1
0
Fork 0
iredis/tests/unittests/test_entry.py

273 lines
7 KiB
Python
Raw Normal View History

import pytest
import tempfile
from unittest.mock import patch
from prompt_toolkit.formatted_text import FormattedText
from iredis.entry import (
gather_args,
parse_url,
SkipAuthFileHistory,
write_result,
is_too_tall,
)
from iredis.utils import DSN
@pytest.mark.parametrize(
"is_tty,raw_arg_is_raw,final_config_is_raw",
[
(True, None, False),
(True, True, True),
(True, False, False),
(False, None, True),
(False, True, True),
(False, False, True), # not tty
],
)
def test_command_entry_tty(is_tty, raw_arg_is_raw, final_config_is_raw, config):
# is tty + raw -> raw
with patch("sys.stdout.isatty") as patch_tty:
patch_tty.return_value = is_tty
if raw_arg_is_raw is None:
call = ["iredis"]
elif raw_arg_is_raw is True:
call = ["iredis", "--raw"]
elif raw_arg_is_raw is False:
call = ["iredis", "--no-raw"]
else:
raise Exception()
gather_args.main(call, standalone_mode=False)
assert config.raw == final_config_is_raw
def test_disable_pager():
from iredis.config import config
gather_args.main(["iredis", "--decode", "utf-8"], standalone_mode=False)
assert config.enable_pager
gather_args.main(["iredis", "--no-pager"], standalone_mode=False)
assert not config.enable_pager
def test_command_with_decode_utf_8():
from iredis.config import config
gather_args.main(["iredis", "--decode", "utf-8"], standalone_mode=False)
assert config.decode == "utf-8"
gather_args.main(["iredis"], standalone_mode=False)
assert config.decode == ""
def test_command_with_shell_pipeline():
from iredis.config import config
gather_args.main(["iredis", "--no-shell"], standalone_mode=False)
assert config.shell is False
gather_args.main(["iredis"], standalone_mode=False)
assert config.shell is True
def test_command_shell_options_higher_priority():
from iredis.config import config
from textwrap import dedent
config_content = dedent(
"""
[main]
shell = False
"""
)
with open("/tmp/iredisrc", "w+") as etc_config:
etc_config.write(config_content)
gather_args.main(["iredis", "--iredisrc", "/tmp/iredisrc"], standalone_mode=False)
assert config.shell is False
gather_args.main(
["iredis", "--shell", "--iredisrc", "/tmp/iredisrc"], standalone_mode=False
)
assert config.shell is True
@pytest.mark.parametrize(
"url,dsn",
[
(
"redis://localhost:6379/3",
DSN(
scheme="redis",
host="localhost",
port=6379,
path=None,
db=3,
username=None,
password=None,
),
),
(
"redis://localhost:6379",
DSN(
scheme="redis",
host="localhost",
port=6379,
path=None,
db=0,
username=None,
password=None,
),
),
(
"rediss://localhost:6379",
DSN(
scheme="rediss",
host="localhost",
port=6379,
path=None,
db=0,
username=None,
password=None,
),
),
(
"redis://username:password@localhost:6379",
DSN(
scheme="redis",
host="localhost",
port=6379,
path=None,
db=0,
username="username",
password="password",
),
),
(
"redis://:password@localhost:6379",
DSN(
scheme="redis",
host="localhost",
port=6379,
path=None,
db=0,
username=None,
password="password",
),
),
(
"redis://username@localhost:12345",
DSN(
scheme="redis",
host="localhost",
port=12345,
path=None,
db=0,
username="username",
password=None,
),
),
(
# query string won't work for redis://
"redis://username@localhost:6379?db=2",
DSN(
scheme="redis",
host="localhost",
port=6379,
path=None,
db=0,
username="username",
password=None,
),
),
(
"unix://username:password2@/tmp/to/socket.sock?db=0",
DSN(
scheme="unix",
host=None,
port=None,
path="/tmp/to/socket.sock",
db=0,
username="username",
password="password2",
),
),
(
"unix://:password3@/path/to/socket.sock",
DSN(
scheme="unix",
host=None,
port=None,
path="/path/to/socket.sock",
db=0,
username=None,
password="password3",
),
),
(
"unix:///tmp/socket.sock",
DSN(
scheme="unix",
host=None,
port=None,
path="/tmp/socket.sock",
db=0,
username=None,
password=None,
),
),
],
)
def test_parse_url(url, dsn):
assert parse_url(url) == dsn
@pytest.mark.parametrize(
"command,record",
[
("set foo bar", True),
("set auth bar", True),
("auth 123", False),
("AUTH hello", False),
("AUTH hello world", False),
],
)
def test_history(command, record):
f = tempfile.TemporaryFile("w+")
history = SkipAuthFileHistory(f.name)
assert history._loaded_strings == []
history.append_string(command)
assert (command in history._loaded_strings) is record
def test_write_result_for_str(capsys):
write_result("hello")
captured = capsys.readouterr()
assert captured.out == "hello\n"
def test_write_result_for_bytes(capsys):
write_result(b"hello")
captured = capsys.readouterr()
assert captured.out == "hello\n"
def test_write_result_for_formatted_text():
ft = FormattedText([("class:keyword", "set"), ("class:string", "hello world")])
# just this test not raise means ok
write_result(ft)
def test_is_too_tall_for_formatted_text():
ft = FormattedText([("class:key", f"key-{index}\n") for index in range(21)])
assert is_too_tall(ft, 20)
assert not is_too_tall(ft, 22)
def test_is_too_tall_for_bytes():
byte_text = b"".join([b"key\n" for index in range(21)])
assert is_too_tall(byte_text, 20)
assert not is_too_tall(byte_text, 23)