1
0
Fork 0
mycli/test/test_special_iocommands.py
Daniel Baumann f15fe4f59f
Merging upstream version 1.29.2.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-02-09 19:16:42 +01:00

313 lines
11 KiB
Python

import os
import stat
import tempfile
from time import time
from unittest.mock import patch
import pytest
from pymysql import ProgrammingError
import mycli.packages.special
from .utils import dbtest, db_connection, send_ctrl_c
def test_set_get_pager():
mycli.packages.special.set_pager_enabled(True)
assert mycli.packages.special.is_pager_enabled()
mycli.packages.special.set_pager_enabled(False)
assert not mycli.packages.special.is_pager_enabled()
mycli.packages.special.set_pager("less")
assert os.environ["PAGER"] == "less"
mycli.packages.special.set_pager(False)
assert os.environ["PAGER"] == "less"
del os.environ["PAGER"]
mycli.packages.special.set_pager(False)
mycli.packages.special.disable_pager()
assert not mycli.packages.special.is_pager_enabled()
def test_set_get_timing():
mycli.packages.special.set_timing_enabled(True)
assert mycli.packages.special.is_timing_enabled()
mycli.packages.special.set_timing_enabled(False)
assert not mycli.packages.special.is_timing_enabled()
def test_set_get_expanded_output():
mycli.packages.special.set_expanded_output(True)
assert mycli.packages.special.is_expanded_output()
mycli.packages.special.set_expanded_output(False)
assert not mycli.packages.special.is_expanded_output()
def test_editor_command():
assert mycli.packages.special.editor_command(r"hello\e")
assert mycli.packages.special.editor_command(r"\ehello")
assert not mycli.packages.special.editor_command(r"hello")
assert mycli.packages.special.get_filename(r"\e filename") == "filename"
os.environ["EDITOR"] = "true"
os.environ["VISUAL"] = "true"
# Set the editor to Notepad on Windows
if os.name != "nt":
mycli.packages.special.open_external_editor(sql=r"select 1") == "select 1"
else:
pytest.skip("Skipping on Windows platform.")
def test_tee_command():
mycli.packages.special.write_tee("hello world") # write without file set
# keep Windows from locking the file with delete=False
with tempfile.NamedTemporaryFile(delete=False) as f:
mycli.packages.special.execute(None, "tee " + f.name)
mycli.packages.special.write_tee("hello world")
if os.name == "nt":
assert f.read() == b"hello world\r\n"
else:
assert f.read() == b"hello world\n"
mycli.packages.special.execute(None, "tee -o " + f.name)
mycli.packages.special.write_tee("hello world")
f.seek(0)
if os.name == "nt":
assert f.read() == b"hello world\r\n"
else:
assert f.read() == b"hello world\n"
mycli.packages.special.execute(None, "notee")
mycli.packages.special.write_tee("hello world")
f.seek(0)
if os.name == "nt":
assert f.read() == b"hello world\r\n"
else:
assert f.read() == b"hello world\n"
# remove temp file
# delete=False means we should try to clean up
try:
if os.path.exists(f.name):
os.remove(f.name)
except Exception as e:
print(f"An error occurred while attempting to delete the file: {e}")
def test_tee_command_error():
with pytest.raises(TypeError):
mycli.packages.special.execute(None, "tee")
with pytest.raises(OSError):
with tempfile.NamedTemporaryFile() as f:
os.chmod(f.name, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
mycli.packages.special.execute(None, "tee {}".format(f.name))
@dbtest
@pytest.mark.skipif(os.name == "nt", reason="Bug: fails on Windows, needs fixing, singleton of FQ not working right")
def test_favorite_query():
with db_connection().cursor() as cur:
query = 'select ""'
mycli.packages.special.execute(cur, "\\fs check {0}".format(query))
assert next(mycli.packages.special.execute(cur, "\\f check"))[0] == "> " + query
def test_once_command():
with pytest.raises(TypeError):
mycli.packages.special.execute(None, "\\once")
with pytest.raises(OSError):
mycli.packages.special.execute(None, "\\once /proc/access-denied")
mycli.packages.special.write_once("hello world") # write without file set
# keep Windows from locking the file with delete=False
with tempfile.NamedTemporaryFile(delete=False) as f:
mycli.packages.special.execute(None, "\\once " + f.name)
mycli.packages.special.write_once("hello world")
if os.name == "nt":
assert f.read() == b"hello world\r\n"
else:
assert f.read() == b"hello world\n"
mycli.packages.special.execute(None, "\\once -o " + f.name)
mycli.packages.special.write_once("hello world line 1")
mycli.packages.special.write_once("hello world line 2")
f.seek(0)
if os.name == "nt":
assert f.read() == b"hello world line 1\r\nhello world line 2\r\n"
else:
assert f.read() == b"hello world line 1\nhello world line 2\n"
# delete=False means we should try to clean up
try:
if os.path.exists(f.name):
os.remove(f.name)
except Exception as e:
print(f"An error occurred while attempting to delete the file: {e}")
def test_pipe_once_command():
with pytest.raises(IOError):
mycli.packages.special.execute(None, "\\pipe_once")
with pytest.raises(OSError):
mycli.packages.special.execute(None, "\\pipe_once /proc/access-denied")
if os.name == "nt":
mycli.packages.special.execute(None, '\\pipe_once python -c "import sys; print(len(sys.stdin.read().strip()))"')
mycli.packages.special.write_once("hello world")
mycli.packages.special.unset_pipe_once_if_written()
else:
with tempfile.NamedTemporaryFile() as f:
mycli.packages.special.execute(None, "\\pipe_once tee " + f.name)
mycli.packages.special.write_pipe_once("hello world")
mycli.packages.special.unset_pipe_once_if_written()
f.seek(0)
assert f.read() == b"hello world\n"
def test_parseargfile():
"""Test that parseargfile expands the user directory."""
expected = {"file": os.path.join(os.path.expanduser("~"), "filename"), "mode": "a"}
if os.name == "nt":
assert expected == mycli.packages.special.iocommands.parseargfile("~\\filename")
else:
assert expected == mycli.packages.special.iocommands.parseargfile("~/filename")
expected = {"file": os.path.join(os.path.expanduser("~"), "filename"), "mode": "w"}
if os.name == "nt":
assert expected == mycli.packages.special.iocommands.parseargfile("-o ~\\filename")
else:
assert expected == mycli.packages.special.iocommands.parseargfile("-o ~/filename")
def test_parseargfile_no_file():
"""Test that parseargfile raises a TypeError if there is no filename."""
with pytest.raises(TypeError):
mycli.packages.special.iocommands.parseargfile("")
with pytest.raises(TypeError):
mycli.packages.special.iocommands.parseargfile("-o ")
@dbtest
def test_watch_query_iteration():
"""Test that a single iteration of the result of `watch_query` executes
the desired query and returns the given results."""
expected_value = "1"
query = "SELECT {0!s}".format(expected_value)
expected_title = "> {0!s}".format(query)
with db_connection().cursor() as cur:
result = next(mycli.packages.special.iocommands.watch_query(arg=query, cur=cur))
assert result[0] == expected_title
assert result[2][0] == expected_value
@dbtest
@pytest.mark.skipif(os.name == "nt", reason="Bug: Win handles this differently. May need to refactor watch_query to work for Win")
def test_watch_query_full():
"""Test that `watch_query`:
* Returns the expected results.
* Executes the defined times inside the given interval, in this case with
a 0.3 seconds wait, it should execute 4 times inside a 1 seconds
interval.
* Stops at Ctrl-C
"""
watch_seconds = 0.3
wait_interval = 1
expected_value = "1"
query = "SELECT {0!s}".format(expected_value)
expected_title = "> {0!s}".format(query)
expected_results = 4
ctrl_c_process = send_ctrl_c(wait_interval)
with db_connection().cursor() as cur:
results = list(
result for result in mycli.packages.special.iocommands.watch_query(arg="{0!s} {1!s}".format(watch_seconds, query), cur=cur)
)
ctrl_c_process.join(1)
assert len(results) == expected_results
for result in results:
assert result[0] == expected_title
assert result[2][0] == expected_value
@dbtest
@patch("click.clear")
def test_watch_query_clear(clear_mock):
"""Test that the screen is cleared with the -c flag of `watch` command
before execute the query."""
with db_connection().cursor() as cur:
watch_gen = mycli.packages.special.iocommands.watch_query(arg="0.1 -c select 1;", cur=cur)
assert not clear_mock.called
next(watch_gen)
assert clear_mock.called
clear_mock.reset_mock()
next(watch_gen)
assert clear_mock.called
clear_mock.reset_mock()
@dbtest
def test_watch_query_bad_arguments():
"""Test different incorrect combinations of arguments for `watch`
command."""
watch_query = mycli.packages.special.iocommands.watch_query
with db_connection().cursor() as cur:
with pytest.raises(ProgrammingError):
next(watch_query("a select 1;", cur=cur))
with pytest.raises(ProgrammingError):
next(watch_query("-a select 1;", cur=cur))
with pytest.raises(ProgrammingError):
next(watch_query("1 -a select 1;", cur=cur))
with pytest.raises(ProgrammingError):
next(watch_query("-c -a select 1;", cur=cur))
@dbtest
@patch("click.clear")
def test_watch_query_interval_clear(clear_mock):
"""Test `watch` command with interval and clear flag."""
def test_asserts(gen):
clear_mock.reset_mock()
start = time()
next(gen)
assert clear_mock.called
next(gen)
exec_time = time() - start
assert exec_time > seconds and exec_time < (seconds + seconds)
seconds = 1.0
watch_query = mycli.packages.special.iocommands.watch_query
with db_connection().cursor() as cur:
test_asserts(watch_query("{0!s} -c select 1;".format(seconds), cur=cur))
test_asserts(watch_query("-c {0!s} select 1;".format(seconds), cur=cur))
def test_split_sql_by_delimiter():
for delimiter_str in (";", "$", "😀"):
mycli.packages.special.set_delimiter(delimiter_str)
sql_input = "select 1{} select \ufffc2".format(delimiter_str)
queries = ("select 1", "select \ufffc2")
for query, parsed_query in zip(queries, mycli.packages.special.split_queries(sql_input)):
assert query == parsed_query
def test_switch_delimiter_within_query():
mycli.packages.special.set_delimiter(";")
sql_input = "select 1; delimiter $$ select 2 $$ select 3 $$"
queries = ("select 1", "delimiter $$ select 2 $$ select 3 $$", "select 2", "select 3")
for query, parsed_query in zip(queries, mycli.packages.special.split_queries(sql_input)):
assert query == parsed_query
def test_set_delimiter():
for delim in ("foo", "bar"):
mycli.packages.special.set_delimiter(delim)
assert mycli.packages.special.get_current_delimiter() == delim
def teardown_function():
mycli.packages.special.set_delimiter(";")