import os import shutil import click from click.testing import CliRunner from mycli.main import MyCli, cli, thanks_picker from mycli.packages.special.main import COMMANDS as SPECIAL_COMMANDS from mycli.sqlexecute import ServerInfo from .utils import USER, HOST, PORT, PASSWORD, dbtest, run from textwrap import dedent from collections import namedtuple from tempfile import NamedTemporaryFile test_dir = os.path.abspath(os.path.dirname(__file__)) project_dir = os.path.dirname(test_dir) default_config_file = os.path.join(project_dir, "test", "myclirc") login_path_file = os.path.join(test_dir, "mylogin.cnf") os.environ["MYSQL_TEST_LOGIN_FILE"] = login_path_file CLI_ARGS = [ "--user", USER, "--host", HOST, "--port", PORT, "--password", PASSWORD, "--myclirc", default_config_file, "--defaults-file", default_config_file, "mycli_test_db", ] @dbtest def test_execute_arg(executor): run(executor, "create table test (a text)") run(executor, 'insert into test values("abc")') sql = "select * from test;" runner = CliRunner() result = runner.invoke(cli, args=CLI_ARGS + ["-e", sql]) assert result.exit_code == 0 assert "abc" in result.output result = runner.invoke(cli, args=CLI_ARGS + ["--execute", sql]) assert result.exit_code == 0 assert "abc" in result.output expected = "a\nabc\n" assert expected in result.output @dbtest def test_execute_arg_with_table(executor): run(executor, "create table test (a text)") run(executor, 'insert into test values("abc")') sql = "select * from test;" runner = CliRunner() result = runner.invoke(cli, args=CLI_ARGS + ["-e", sql] + ["--table"]) expected = "+-----+\n| a |\n+-----+\n| abc |\n+-----+\n" assert result.exit_code == 0 assert expected in result.output @dbtest def test_execute_arg_with_csv(executor): run(executor, "create table test (a text)") run(executor, 'insert into test values("abc")') sql = "select * from test;" runner = CliRunner() result = runner.invoke(cli, args=CLI_ARGS + ["-e", sql] + ["--csv"]) expected = '"a"\n"abc"\n' assert result.exit_code == 0 assert expected in "".join(result.output) @dbtest def test_batch_mode(executor): run(executor, """create table test(a text)""") run(executor, """insert into test values('abc'), ('def'), ('ghi')""") sql = "select count(*) from test;\n" "select * from test limit 1;" runner = CliRunner() result = runner.invoke(cli, args=CLI_ARGS, input=sql) assert result.exit_code == 0 assert "count(*)\n3\na\nabc\n" in "".join(result.output) @dbtest def test_batch_mode_table(executor): run(executor, """create table test(a text)""") run(executor, """insert into test values('abc'), ('def'), ('ghi')""") sql = "select count(*) from test;\n" "select * from test limit 1;" runner = CliRunner() result = runner.invoke(cli, args=CLI_ARGS + ["-t"], input=sql) expected = dedent("""\ +----------+ | count(*) | +----------+ | 3 | +----------+ +-----+ | a | +-----+ | abc | +-----+""") assert result.exit_code == 0 assert expected in result.output @dbtest def test_batch_mode_csv(executor): run(executor, """create table test(a text, b text)""") run(executor, """insert into test (a, b) values('abc', 'de\nf'), ('ghi', 'jkl')""") sql = "select * from test;" runner = CliRunner() result = runner.invoke(cli, args=CLI_ARGS + ["--csv"], input=sql) expected = '"a","b"\n"abc","de\nf"\n"ghi","jkl"\n' assert result.exit_code == 0 assert expected in "".join(result.output) def test_thanks_picker_utf8(): name = thanks_picker() assert name and isinstance(name, str) def test_help_strings_end_with_periods(): """Make sure click options have help text that end with a period.""" for param in cli.params: if isinstance(param, click.core.Option): assert hasattr(param, "help") assert param.help.endswith(".") def test_command_descriptions_end_with_periods(): """Make sure that mycli commands' descriptions end with a period.""" MyCli() for _, command in SPECIAL_COMMANDS.items(): assert command[3].endswith(".") def output(monkeypatch, terminal_size, testdata, explicit_pager, expect_pager): global clickoutput clickoutput = "" m = MyCli(myclirc=default_config_file) class TestOutput: def get_size(self): size = namedtuple("Size", "rows columns") size.columns, size.rows = terminal_size return size class TestExecute: host = "test" user = "test" dbname = "test" server_info = ServerInfo.from_version_string("unknown") port = 0 def server_type(self): return ["test"] class PromptBuffer: output = TestOutput() m.prompt_app = PromptBuffer() m.sqlexecute = TestExecute() m.explicit_pager = explicit_pager def echo_via_pager(s): assert expect_pager global clickoutput clickoutput += "".join(s) def secho(s): assert not expect_pager global clickoutput clickoutput += s + "\n" monkeypatch.setattr(click, "echo_via_pager", echo_via_pager) monkeypatch.setattr(click, "secho", secho) m.output(testdata) if clickoutput.endswith("\n"): clickoutput = clickoutput[:-1] assert clickoutput == "\n".join(testdata) def test_conditional_pager(monkeypatch): testdata = "Lorem ipsum dolor sit amet consectetur adipiscing elit sed do".split(" ") # User didn't set pager, output doesn't fit screen -> pager output(monkeypatch, terminal_size=(5, 10), testdata=testdata, explicit_pager=False, expect_pager=True) # User didn't set pager, output fits screen -> no pager output(monkeypatch, terminal_size=(20, 20), testdata=testdata, explicit_pager=False, expect_pager=False) # User manually configured pager, output doesn't fit screen -> pager output(monkeypatch, terminal_size=(5, 10), testdata=testdata, explicit_pager=True, expect_pager=True) # User manually configured pager, output fit screen -> pager output(monkeypatch, terminal_size=(20, 20), testdata=testdata, explicit_pager=True, expect_pager=True) SPECIAL_COMMANDS["nopager"].handler() output(monkeypatch, terminal_size=(5, 10), testdata=testdata, explicit_pager=False, expect_pager=False) SPECIAL_COMMANDS["pager"].handler("") def test_reserved_space_is_integer(monkeypatch): """Make sure that reserved space is returned as an integer.""" def stub_terminal_size(): return (5, 5) with monkeypatch.context() as m: m.setattr(shutil, "get_terminal_size", stub_terminal_size) mycli = MyCli() assert isinstance(mycli.get_reserved_space(), int) def test_list_dsn(): runner = CliRunner() # keep Windows from locking the file with delete=False with NamedTemporaryFile(mode="w", delete=False) as myclirc: myclirc.write( dedent("""\ [alias_dsn] test = mysql://test/test """) ) myclirc.flush() args = ["--list-dsn", "--myclirc", myclirc.name] result = runner.invoke(cli, args=args) assert result.output == "test\n" result = runner.invoke(cli, args=args + ["--verbose"]) assert result.output == "test : mysql://test/test\n" # delete=False means we should try to clean up try: if os.path.exists(myclirc.name): os.remove(myclirc.name) except Exception as e: print(f"An error occurred while attempting to delete the file: {e}") def test_prettify_statement(): statement = "SELECT 1" m = MyCli() pretty_statement = m.handle_prettify_binding(statement) assert pretty_statement == "SELECT\n 1;" def test_unprettify_statement(): statement = "SELECT\n 1" m = MyCli() unpretty_statement = m.handle_unprettify_binding(statement) assert unpretty_statement == "SELECT 1;" def test_list_ssh_config(): runner = CliRunner() # keep Windows from locking the file with delete=False with NamedTemporaryFile(mode="w", delete=False) as ssh_config: ssh_config.write( dedent("""\ Host test Hostname test.example.com User joe Port 22222 IdentityFile ~/.ssh/gateway """) ) ssh_config.flush() args = ["--list-ssh-config", "--ssh-config-path", ssh_config.name] result = runner.invoke(cli, args=args) assert "test\n" in result.output result = runner.invoke(cli, args=args + ["--verbose"]) assert "test : test.example.com\n" in result.output # delete=False means we should try to clean up try: if os.path.exists(ssh_config.name): os.remove(ssh_config.name) except Exception as e: print(f"An error occurred while attempting to delete the file: {e}") def test_dsn(monkeypatch): # Setup classes to mock mycli.main.MyCli class Formatter: format_name = None class Logger: def debug(self, *args, **args_dict): pass def warning(self, *args, **args_dict): pass class MockMyCli: config = {"alias_dsn": {}} def __init__(self, **args): self.logger = Logger() self.destructive_warning = False self.formatter = Formatter() def connect(self, **args): MockMyCli.connect_args = args def run_query(self, query, new_line=True): pass import mycli.main monkeypatch.setattr(mycli.main, "MyCli", MockMyCli) runner = CliRunner() # When a user supplies a DSN as database argument to mycli, # use these values. result = runner.invoke(mycli.main.cli, args=["mysql://dsn_user:dsn_passwd@dsn_host:1/dsn_database"]) assert result.exit_code == 0, result.output + " " + str(result.exception) assert ( MockMyCli.connect_args["user"] == "dsn_user" and MockMyCli.connect_args["passwd"] == "dsn_passwd" and MockMyCli.connect_args["host"] == "dsn_host" and MockMyCli.connect_args["port"] == 1 and MockMyCli.connect_args["database"] == "dsn_database" ) MockMyCli.connect_args = None # When a use supplies a DSN as database argument to mycli, # and used command line arguments, use the command line # arguments. result = runner.invoke( mycli.main.cli, args=[ "mysql://dsn_user:dsn_passwd@dsn_host:2/dsn_database", "--user", "arg_user", "--password", "arg_password", "--host", "arg_host", "--port", "3", "--database", "arg_database", ], ) assert result.exit_code == 0, result.output + " " + str(result.exception) assert ( MockMyCli.connect_args["user"] == "arg_user" and MockMyCli.connect_args["passwd"] == "arg_password" and MockMyCli.connect_args["host"] == "arg_host" and MockMyCli.connect_args["port"] == 3 and MockMyCli.connect_args["database"] == "arg_database" ) MockMyCli.config = {"alias_dsn": {"test": "mysql://alias_dsn_user:alias_dsn_passwd@alias_dsn_host:4/alias_dsn_database"}} MockMyCli.connect_args = None # When a user uses a DSN from the configuration file (alias_dsn), # use these values. result = runner.invoke(cli, args=["--dsn", "test"]) assert result.exit_code == 0, result.output + " " + str(result.exception) assert ( MockMyCli.connect_args["user"] == "alias_dsn_user" and MockMyCli.connect_args["passwd"] == "alias_dsn_passwd" and MockMyCli.connect_args["host"] == "alias_dsn_host" and MockMyCli.connect_args["port"] == 4 and MockMyCli.connect_args["database"] == "alias_dsn_database" ) MockMyCli.config = {"alias_dsn": {"test": "mysql://alias_dsn_user:alias_dsn_passwd@alias_dsn_host:4/alias_dsn_database"}} MockMyCli.connect_args = None # When a user uses a DSN from the configuration file (alias_dsn) # and used command line arguments, use the command line arguments. result = runner.invoke( cli, args=[ "--dsn", "test", "", "--user", "arg_user", "--password", "arg_password", "--host", "arg_host", "--port", "5", "--database", "arg_database", ], ) assert result.exit_code == 0, result.output + " " + str(result.exception) assert ( MockMyCli.connect_args["user"] == "arg_user" and MockMyCli.connect_args["passwd"] == "arg_password" and MockMyCli.connect_args["host"] == "arg_host" and MockMyCli.connect_args["port"] == 5 and MockMyCli.connect_args["database"] == "arg_database" ) # Use a DSN without password result = runner.invoke(mycli.main.cli, args=["mysql://dsn_user@dsn_host:6/dsn_database"]) assert result.exit_code == 0, result.output + " " + str(result.exception) assert ( MockMyCli.connect_args["user"] == "dsn_user" and MockMyCli.connect_args["passwd"] is None and MockMyCli.connect_args["host"] == "dsn_host" and MockMyCli.connect_args["port"] == 6 and MockMyCli.connect_args["database"] == "dsn_database" ) def test_ssh_config(monkeypatch): # Setup classes to mock mycli.main.MyCli class Formatter: format_name = None class Logger: def debug(self, *args, **args_dict): pass def warning(self, *args, **args_dict): pass class MockMyCli: config = {"alias_dsn": {}} def __init__(self, **args): self.logger = Logger() self.destructive_warning = False self.formatter = Formatter() def connect(self, **args): MockMyCli.connect_args = args def run_query(self, query, new_line=True): pass import mycli.main monkeypatch.setattr(mycli.main, "MyCli", MockMyCli) runner = CliRunner() # Setup temporary configuration # keep Windows from locking the file with delete=False with NamedTemporaryFile(mode="w", delete=False) as ssh_config: ssh_config.write( dedent("""\ Host test Hostname test.example.com User joe Port 22222 IdentityFile ~/.ssh/gateway """) ) ssh_config.flush() # When a user supplies a ssh config. result = runner.invoke(mycli.main.cli, args=["--ssh-config-path", ssh_config.name, "--ssh-config-host", "test"]) assert result.exit_code == 0, result.output + " " + str(result.exception) assert ( MockMyCli.connect_args["ssh_user"] == "joe" and MockMyCli.connect_args["ssh_host"] == "test.example.com" and MockMyCli.connect_args["ssh_port"] == 22222 and MockMyCli.connect_args["ssh_key_filename"] == os.path.expanduser("~") + "/.ssh/gateway" ) # When a user supplies a ssh config host as argument to mycli, # and used command line arguments, use the command line # arguments. result = runner.invoke( mycli.main.cli, args=[ "--ssh-config-path", ssh_config.name, "--ssh-config-host", "test", "--ssh-user", "arg_user", "--ssh-host", "arg_host", "--ssh-port", "3", "--ssh-key-filename", "/path/to/key", ], ) assert result.exit_code == 0, result.output + " " + str(result.exception) assert ( MockMyCli.connect_args["ssh_user"] == "arg_user" and MockMyCli.connect_args["ssh_host"] == "arg_host" and MockMyCli.connect_args["ssh_port"] == 3 and MockMyCli.connect_args["ssh_key_filename"] == "/path/to/key" ) # delete=False means we should try to clean up try: if os.path.exists(ssh_config.name): os.remove(ssh_config.name) except Exception as e: print(f"An error occurred while attempting to delete the file: {e}") @dbtest def test_init_command_arg(executor): init_command = "set sql_select_limit=1000" sql = 'show variables like "sql_select_limit";' runner = CliRunner() result = runner.invoke(cli, args=CLI_ARGS + ["--init-command", init_command], input=sql) expected = "sql_select_limit\t1000\n" assert result.exit_code == 0 assert expected in result.output @dbtest def test_init_command_multiple_arg(executor): init_command = "set sql_select_limit=2000; set max_join_size=20000" sql = 'show variables like "sql_select_limit";\n' 'show variables like "max_join_size"' runner = CliRunner() result = runner.invoke(cli, args=CLI_ARGS + ["--init-command", init_command], input=sql) expected_sql_select_limit = "sql_select_limit\t2000\n" expected_max_join_size = "max_join_size\t20000\n" assert result.exit_code == 0 assert expected_sql_select_limit in result.output assert expected_max_join_size in result.output