Adding upstream version 1.27.2.
Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
parent
69f0f331c6
commit
5e6d9cc7a9
15 changed files with 248 additions and 118 deletions
|
@ -254,23 +254,21 @@ def test_conditional_pager(monkeypatch):
|
|||
SPECIAL_COMMANDS['pager'].handler('')
|
||||
|
||||
|
||||
def test_reserved_space_is_integer():
|
||||
def test_reserved_space_is_integer(monkeypatch):
|
||||
"""Make sure that reserved space is returned as an integer."""
|
||||
def stub_terminal_size():
|
||||
return (5, 5)
|
||||
|
||||
old_func = shutil.get_terminal_size
|
||||
|
||||
shutil.get_terminal_size = stub_terminal_size
|
||||
mycli = MyCli()
|
||||
assert isinstance(mycli.get_reserved_space(), int)
|
||||
|
||||
shutil.get_terminal_size = old_func
|
||||
with monkeypatch.context() as m:
|
||||
m.setattr(shutil, 'get_terminal_size', stub_terminal_size)
|
||||
mycli = MyCli()
|
||||
assert isinstance(mycli.get_reserved_space(), int)
|
||||
|
||||
|
||||
def test_list_dsn():
|
||||
runner = CliRunner()
|
||||
with NamedTemporaryFile(mode="w") as myclirc:
|
||||
# keep Windows from locking the file with delete=False
|
||||
with NamedTemporaryFile(mode="w",delete=False) as myclirc:
|
||||
myclirc.write(dedent("""\
|
||||
[alias_dsn]
|
||||
test = mysql://test/test
|
||||
|
@ -281,6 +279,15 @@ def test_list_dsn():
|
|||
assert result.output == "test\n"
|
||||
result = runner.invoke(cli, args=args + ['--verbose'])
|
||||
assert result.output == "test : mysql://test/test\n"
|
||||
|
||||
# delete=False means we should try to clean up
|
||||
try:
|
||||
if os.path.exists(myclirc.name):
|
||||
os.remove(myclirc.name)
|
||||
except Exception as e:
|
||||
print(f"An error occurred while attempting to delete the file: {e}")
|
||||
|
||||
|
||||
|
||||
|
||||
def test_prettify_statement():
|
||||
|
@ -299,7 +306,8 @@ def test_unprettify_statement():
|
|||
|
||||
def test_list_ssh_config():
|
||||
runner = CliRunner()
|
||||
with NamedTemporaryFile(mode="w") as ssh_config:
|
||||
# keep Windows from locking the file with delete=False
|
||||
with NamedTemporaryFile(mode="w",delete=False) as ssh_config:
|
||||
ssh_config.write(dedent("""\
|
||||
Host test
|
||||
Hostname test.example.com
|
||||
|
@ -313,6 +321,13 @@ def test_list_ssh_config():
|
|||
assert "test\n" in result.output
|
||||
result = runner.invoke(cli, args=args + ['--verbose'])
|
||||
assert "test : test.example.com\n" in result.output
|
||||
|
||||
# delete=False means we should try to clean up
|
||||
try:
|
||||
if os.path.exists(ssh_config.name):
|
||||
os.remove(ssh_config.name)
|
||||
except Exception as e:
|
||||
print(f"An error occurred while attempting to delete the file: {e}")
|
||||
|
||||
|
||||
def test_dsn(monkeypatch):
|
||||
|
@ -466,7 +481,8 @@ def test_ssh_config(monkeypatch):
|
|||
runner = CliRunner()
|
||||
|
||||
# Setup temporary configuration
|
||||
with NamedTemporaryFile(mode="w") as ssh_config:
|
||||
# keep Windows from locking the file with delete=False
|
||||
with NamedTemporaryFile(mode="w",delete=False) as ssh_config:
|
||||
ssh_config.write(dedent("""\
|
||||
Host test
|
||||
Hostname test.example.com
|
||||
|
@ -489,8 +505,8 @@ def test_ssh_config(monkeypatch):
|
|||
MockMyCli.connect_args["ssh_user"] == "joe" and \
|
||||
MockMyCli.connect_args["ssh_host"] == "test.example.com" and \
|
||||
MockMyCli.connect_args["ssh_port"] == 22222 and \
|
||||
MockMyCli.connect_args["ssh_key_filename"] == os.getenv(
|
||||
"HOME") + "/.ssh/gateway"
|
||||
MockMyCli.connect_args["ssh_key_filename"] == os.path.expanduser(
|
||||
"~") + "/.ssh/gateway"
|
||||
|
||||
# When a user supplies a ssh config host as argument to mycli,
|
||||
# and used command line arguments, use the command line
|
||||
|
@ -512,6 +528,13 @@ def test_ssh_config(monkeypatch):
|
|||
MockMyCli.connect_args["ssh_host"] == "arg_host" and \
|
||||
MockMyCli.connect_args["ssh_port"] == 3 and \
|
||||
MockMyCli.connect_args["ssh_key_filename"] == "/path/to/key"
|
||||
|
||||
# delete=False means we should try to clean up
|
||||
try:
|
||||
if os.path.exists(ssh_config.name):
|
||||
os.remove(ssh_config.name)
|
||||
except Exception as e:
|
||||
print(f"An error occurred while attempting to delete the file: {e}")
|
||||
|
||||
|
||||
@dbtest
|
||||
|
|
|
@ -50,25 +50,49 @@ def test_editor_command():
|
|||
|
||||
os.environ['EDITOR'] = 'true'
|
||||
os.environ['VISUAL'] = 'true'
|
||||
mycli.packages.special.open_external_editor(sql=r'select 1') == "select 1"
|
||||
# Set the editor to Notepad on Windows
|
||||
if os.name != 'nt':
|
||||
mycli.packages.special.open_external_editor(sql=r'select 1') == "select 1"
|
||||
else:
|
||||
pytest.skip('Skipping on Windows platform.')
|
||||
|
||||
|
||||
|
||||
def test_tee_command():
|
||||
mycli.packages.special.write_tee(u"hello world") # write without file set
|
||||
with tempfile.NamedTemporaryFile() as f:
|
||||
# keep Windows from locking the file with delete=False
|
||||
with tempfile.NamedTemporaryFile(delete=False) as f:
|
||||
mycli.packages.special.execute(None, u"tee " + f.name)
|
||||
mycli.packages.special.write_tee(u"hello world")
|
||||
assert f.read() == b"hello world\n"
|
||||
if os.name=='nt':
|
||||
assert f.read() == b"hello world\r\n"
|
||||
else:
|
||||
assert f.read() == b"hello world\n"
|
||||
|
||||
mycli.packages.special.execute(None, u"tee -o " + f.name)
|
||||
mycli.packages.special.write_tee(u"hello world")
|
||||
f.seek(0)
|
||||
assert f.read() == b"hello world\n"
|
||||
if os.name=='nt':
|
||||
assert f.read() == b"hello world\r\n"
|
||||
else:
|
||||
assert f.read() == b"hello world\n"
|
||||
|
||||
mycli.packages.special.execute(None, u"notee")
|
||||
mycli.packages.special.write_tee(u"hello world")
|
||||
f.seek(0)
|
||||
assert f.read() == b"hello world\n"
|
||||
if os.name=='nt':
|
||||
assert f.read() == b"hello world\r\n"
|
||||
else:
|
||||
assert f.read() == b"hello world\n"
|
||||
|
||||
# remove temp file
|
||||
# delete=False means we should try to clean up
|
||||
try:
|
||||
if os.path.exists(f.name):
|
||||
os.remove(f.name)
|
||||
except Exception as e:
|
||||
print(f"An error occurred while attempting to delete the file: {e}")
|
||||
|
||||
|
||||
|
||||
def test_tee_command_error():
|
||||
|
@ -82,6 +106,8 @@ def test_tee_command_error():
|
|||
|
||||
|
||||
@dbtest
|
||||
|
||||
@pytest.mark.skipif(os.name == "nt", reason="Bug: fails on Windows, needs fixing, singleton of FQ not working right")
|
||||
def test_favorite_query():
|
||||
with db_connection().cursor() as cur:
|
||||
query = u'select "✔"'
|
||||
|
@ -98,16 +124,29 @@ def test_once_command():
|
|||
mycli.packages.special.execute(None, u"\\once /proc/access-denied")
|
||||
|
||||
mycli.packages.special.write_once(u"hello world") # write without file set
|
||||
with tempfile.NamedTemporaryFile() as f:
|
||||
# keep Windows from locking the file with delete=False
|
||||
with tempfile.NamedTemporaryFile(delete=False) as f:
|
||||
mycli.packages.special.execute(None, u"\\once " + f.name)
|
||||
mycli.packages.special.write_once(u"hello world")
|
||||
assert f.read() == b"hello world\n"
|
||||
if os.name=='nt':
|
||||
assert f.read() == b"hello world\r\n"
|
||||
else:
|
||||
assert f.read() == b"hello world\n"
|
||||
|
||||
mycli.packages.special.execute(None, u"\\once -o " + f.name)
|
||||
mycli.packages.special.write_once(u"hello world line 1")
|
||||
mycli.packages.special.write_once(u"hello world line 2")
|
||||
f.seek(0)
|
||||
assert f.read() == b"hello world line 1\nhello world line 2\n"
|
||||
if os.name=='nt':
|
||||
assert f.read() == b"hello world line 1\r\nhello world line 2\r\n"
|
||||
else:
|
||||
assert f.read() == b"hello world line 1\nhello world line 2\n"
|
||||
# delete=False means we should try to clean up
|
||||
try:
|
||||
if os.path.exists(f.name):
|
||||
os.remove(f.name)
|
||||
except Exception as e:
|
||||
print(f"An error occurred while attempting to delete the file: {e}")
|
||||
|
||||
|
||||
def test_pipe_once_command():
|
||||
|
@ -118,9 +157,14 @@ def test_pipe_once_command():
|
|||
mycli.packages.special.execute(
|
||||
None, u"\\pipe_once /proc/access-denied")
|
||||
|
||||
mycli.packages.special.execute(None, u"\\pipe_once wc")
|
||||
mycli.packages.special.write_once(u"hello world")
|
||||
mycli.packages.special.unset_pipe_once_if_written()
|
||||
if os.name == 'nt':
|
||||
mycli.packages.special.execute(None, u'\\pipe_once python -c "import sys; print(len(sys.stdin.read().strip()))"')
|
||||
mycli.packages.special.write_once(u"hello world")
|
||||
mycli.packages.special.unset_pipe_once_if_written()
|
||||
else:
|
||||
mycli.packages.special.execute(None, u"\\pipe_once wc")
|
||||
mycli.packages.special.write_once(u"hello world")
|
||||
mycli.packages.special.unset_pipe_once_if_written()
|
||||
# how to assert on wc output?
|
||||
|
||||
|
||||
|
@ -128,12 +172,21 @@ def test_parseargfile():
|
|||
"""Test that parseargfile expands the user directory."""
|
||||
expected = {'file': os.path.join(os.path.expanduser('~'), 'filename'),
|
||||
'mode': 'a'}
|
||||
assert expected == mycli.packages.special.iocommands.parseargfile(
|
||||
'~/filename')
|
||||
|
||||
if os.name=='nt':
|
||||
assert expected == mycli.packages.special.iocommands.parseargfile(
|
||||
'~\\filename')
|
||||
else:
|
||||
assert expected == mycli.packages.special.iocommands.parseargfile(
|
||||
'~/filename')
|
||||
|
||||
expected = {'file': os.path.join(os.path.expanduser('~'), 'filename'),
|
||||
'mode': 'w'}
|
||||
assert expected == mycli.packages.special.iocommands.parseargfile(
|
||||
if os.name=='nt':
|
||||
assert expected == mycli.packages.special.iocommands.parseargfile(
|
||||
'-o ~\\filename')
|
||||
else:
|
||||
assert expected == mycli.packages.special.iocommands.parseargfile(
|
||||
'-o ~/filename')
|
||||
|
||||
|
||||
|
@ -162,6 +215,7 @@ def test_watch_query_iteration():
|
|||
|
||||
|
||||
@dbtest
|
||||
@pytest.mark.skipif(os.name == "nt", reason="Bug: Win handles this differently. May need to refactor watch_query to work for Win")
|
||||
def test_watch_query_full():
|
||||
"""Test that `watch_query`:
|
||||
|
||||
|
|
|
@ -117,6 +117,7 @@ def test_multiple_queries_same_line_syntaxerror(executor):
|
|||
|
||||
|
||||
@dbtest
|
||||
@pytest.mark.skipif(os.name == "nt", reason="Bug: fails on Windows, needs fixing, singleton of FQ not working right")
|
||||
def test_favorite_query(executor):
|
||||
set_expanded_output(False)
|
||||
run(executor, "create table test(a text)")
|
||||
|
@ -136,6 +137,7 @@ def test_favorite_query(executor):
|
|||
|
||||
|
||||
@dbtest
|
||||
@pytest.mark.skipif(os.name == "nt", reason="Bug: fails on Windows, needs fixing, singleton of FQ not working right")
|
||||
def test_favorite_query_multiple_statement(executor):
|
||||
set_expanded_output(False)
|
||||
run(executor, "create table test(a text)")
|
||||
|
@ -159,6 +161,7 @@ def test_favorite_query_multiple_statement(executor):
|
|||
|
||||
|
||||
@dbtest
|
||||
@pytest.mark.skipif(os.name == "nt", reason="Bug: fails on Windows, needs fixing, singleton of FQ not working right")
|
||||
def test_favorite_query_expanded_output(executor):
|
||||
set_expanded_output(False)
|
||||
run(executor, '''create table test(a text)''')
|
||||
|
@ -195,16 +198,21 @@ def test_cd_command_without_a_folder_name(executor):
|
|||
@dbtest
|
||||
def test_system_command_not_found(executor):
|
||||
results = run(executor, 'system xyz')
|
||||
assert_result_equal(results, status='OSError: No such file or directory',
|
||||
assert_contains=True)
|
||||
if os.name=='nt':
|
||||
assert_result_equal(results, status='OSError: The system cannot find the file specified',
|
||||
assert_contains=True)
|
||||
else:
|
||||
assert_result_equal(results, status='OSError: No such file or directory',
|
||||
assert_contains=True)
|
||||
|
||||
|
||||
@dbtest
|
||||
def test_system_command_output(executor):
|
||||
eol = os.linesep
|
||||
test_dir = os.path.abspath(os.path.dirname(__file__))
|
||||
test_file_path = os.path.join(test_dir, 'test.txt')
|
||||
results = run(executor, 'system cat {0}'.format(test_file_path))
|
||||
assert_result_equal(results, status='mycli rocks!\n')
|
||||
assert_result_equal(results, status=f'mycli rocks!{eol}')
|
||||
|
||||
|
||||
@dbtest
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue