492 lines
15 KiB
Python
492 lines
15 KiB
Python
import os
|
|
import time
|
|
from prompt_toolkit.formatted_text import FormattedText
|
|
from iredis import renders
|
|
from iredis.config import config
|
|
from iredis.completers import IRedisCompleter
|
|
|
|
|
|
def strip_formatted_text(formatted_text):
|
|
return "".join(text[1] for text in formatted_text)
|
|
|
|
|
|
def test_render_simple_string_raw_using_raw_render():
|
|
assert renders.OutputRender.render_raw(b"OK") == b"OK"
|
|
assert renders.OutputRender.render_raw(b"BUMPED 1") == b"BUMPED 1"
|
|
assert renders.OutputRender.render_raw(b"STILL 1") == b"STILL 1"
|
|
|
|
|
|
def test_render_simple_string():
|
|
assert renders.OutputRender.render_simple_string(b"OK") == FormattedText(
|
|
[("class:success", "OK")]
|
|
)
|
|
assert renders.OutputRender.render_simple_string(b"BUMPED 1") == FormattedText(
|
|
[("class:success", "BUMPED 1")]
|
|
)
|
|
assert renders.OutputRender.render_simple_string(b"STILL 1") == FormattedText(
|
|
[("class:success", "STILL 1")]
|
|
)
|
|
|
|
|
|
def test_render_list_index():
|
|
raw = ["hello", "world", "foo"]
|
|
out = renders._render_list([item.encode() for item in raw], raw)
|
|
out = strip_formatted_text(out)
|
|
assert isinstance(out, str)
|
|
assert "3)" in out
|
|
assert "1)" in out
|
|
assert "4)" not in out
|
|
|
|
|
|
def test_render_list_index_const_width():
|
|
raw = ["hello"] * 100
|
|
out = renders._render_list([item.encode() for item in raw], raw)
|
|
out = strip_formatted_text(out)
|
|
assert isinstance(out, str)
|
|
assert " 1)" in out
|
|
assert "\n100)" in out
|
|
|
|
raw = ["hello"] * 1000
|
|
out = renders._render_list([item.encode() for item in raw], raw)
|
|
out = strip_formatted_text(out)
|
|
assert " 1)" in out
|
|
assert "\n 999)" in out
|
|
assert "\n1000)" in out
|
|
|
|
raw = ["hello"] * 10
|
|
out = renders._render_list([item.encode() for item in raw], raw)
|
|
out = strip_formatted_text(out)
|
|
assert " 1)" in out
|
|
assert "\n 9)" in out
|
|
assert "\n10)" in out
|
|
|
|
|
|
def test_render_list_using_raw_render():
|
|
raw = ["hello", "world", "foo"]
|
|
out = renders.OutputRender.render_raw([item.encode() for item in raw])
|
|
assert b"hello\nworld\nfoo" == out
|
|
|
|
|
|
def test_render_list_with_nil_init():
|
|
raw = [b"hello", None, b"world"]
|
|
out = renders.OutputRender.render_list(raw)
|
|
out = strip_formatted_text(out)
|
|
assert out == '1) "hello"\n2) (nil)\n3) "world"'
|
|
|
|
|
|
def test_render_list_with_nil_init_while_config_raw():
|
|
raw = [b"hello", None, b"world"]
|
|
out = renders.OutputRender.render_raw(raw)
|
|
assert out == b"hello\n\nworld"
|
|
|
|
|
|
def test_render_list_with_empty_list_raw():
|
|
raw = []
|
|
out = renders.OutputRender.render_raw(raw)
|
|
assert out == b""
|
|
|
|
|
|
def test_render_list_with_empty_list():
|
|
raw = []
|
|
out = renders.OutputRender.render_list(raw)
|
|
out = strip_formatted_text(out)
|
|
assert out == "(empty list or set)"
|
|
|
|
|
|
def test_ensure_str_bytes():
|
|
assert renders.ensure_str(b"hello world") == r"hello world"
|
|
assert renders.ensure_str(b"hello'world") == r"hello'world"
|
|
assert renders.ensure_str("你好".encode()) == r"\xe4\xbd\xa0\xe5\xa5\xbd"
|
|
|
|
|
|
def test_double_quotes():
|
|
assert renders.double_quotes('hello"world') == r'"hello\"world"'
|
|
assert renders.double_quotes('"hello\\world"') == '"\\"hello\\world\\""'
|
|
|
|
assert renders.double_quotes("'") == '"\'"'
|
|
assert renders.double_quotes("\\") == '"\\"'
|
|
assert renders.double_quotes('"') == '"\\""'
|
|
|
|
|
|
def test_render_int():
|
|
config.raw = False
|
|
assert renders.OutputRender.render_int(12) == FormattedText(
|
|
[("class:type", "(integer) "), ("", "12")]
|
|
)
|
|
|
|
|
|
def test_render_int_raw():
|
|
assert renders.OutputRender.render_raw(12) == b"12"
|
|
|
|
|
|
def test_render_list_or_string():
|
|
assert renders.OutputRender.render_list_or_string("") == '""'
|
|
assert renders.OutputRender.render_list_or_string("foo") == '"foo"'
|
|
assert renders.OutputRender.render_list_or_string(
|
|
[b"foo", b"bar"]
|
|
) == FormattedText(
|
|
[
|
|
("", "1)"),
|
|
("", " "),
|
|
("class:string", '"foo"'),
|
|
("", "\n"),
|
|
("", "2)"),
|
|
("", " "),
|
|
("class:string", '"bar"'),
|
|
]
|
|
)
|
|
|
|
|
|
def test_render_list_or_string_nil_and_empty_list():
|
|
assert renders.OutputRender.render_list_or_string(None) == FormattedText(
|
|
[("class:type", "(nil)")]
|
|
)
|
|
assert renders.OutputRender.render_list_or_string([]) == FormattedText(
|
|
[("class:type", "(empty list or set)")]
|
|
)
|
|
|
|
|
|
def test_render_raw_nil_and_empty_list():
|
|
assert renders.OutputRender.render_raw(None) == b""
|
|
assert renders.OutputRender.render_raw([]) == b""
|
|
|
|
|
|
def test_list_or_string():
|
|
config.raw = False
|
|
assert renders.OutputRender.render_string_or_int(b"10.1") == '"10.1"'
|
|
assert renders.OutputRender.render_string_or_int(3) == FormattedText(
|
|
[("class:type", "(integer) "), ("", "3")]
|
|
)
|
|
|
|
|
|
def test_command_keys():
|
|
completer = IRedisCompleter()
|
|
completer.key_completer.words = []
|
|
config.raw = False
|
|
rendered = renders.OutputRender.command_keys([b"cat", b"dog", b"banana"])
|
|
completer.update_completer_for_response("KEYS", None, [b"cat", b"dog", b"banana"])
|
|
|
|
assert rendered == FormattedText(
|
|
[
|
|
("", "1)"),
|
|
("", " "),
|
|
("class:key", '"cat"'),
|
|
("", "\n"),
|
|
("", "2)"),
|
|
("", " "),
|
|
("class:key", '"dog"'),
|
|
("", "\n"),
|
|
("", "3)"),
|
|
("", " "),
|
|
("class:key", '"banana"'),
|
|
]
|
|
)
|
|
assert completer.key_completer.words == ["banana", "dog", "cat"]
|
|
|
|
|
|
def test_command_scan():
|
|
completer = IRedisCompleter()
|
|
completer.key_completer.words = []
|
|
config.raw = False
|
|
rendered = renders.OutputRender.command_scan(
|
|
[b"44", [b"a", b"key:__rand_int__", b"dest", b" a"]]
|
|
)
|
|
completer.update_completer_for_response(
|
|
"SCAN", ("0",), [b"44", [b"a", b"key:__rand_int__", b"dest", b" a"]]
|
|
)
|
|
|
|
assert rendered == FormattedText(
|
|
[
|
|
("class:type", "(cursor) "),
|
|
("class:integer", "44"),
|
|
("", "\n"),
|
|
("", "1)"),
|
|
("", " "),
|
|
("class:key", '"a"'),
|
|
("", "\n"),
|
|
("", "2)"),
|
|
("", " "),
|
|
("class:key", '"key:__rand_int__"'),
|
|
("", "\n"),
|
|
("", "3)"),
|
|
("", " "),
|
|
("class:key", '"dest"'),
|
|
("", "\n"),
|
|
("", "4)"),
|
|
("", " "),
|
|
("class:key", '" a"'),
|
|
]
|
|
)
|
|
assert completer.key_completer.words == [" a", "dest", "key:__rand_int__", "a"]
|
|
|
|
|
|
def test_command_sscan():
|
|
completer = IRedisCompleter()
|
|
completer.member_completer.words = []
|
|
rendered = renders.OutputRender.command_sscan(
|
|
[b"44", [b"a", b"member:__rand_int__", b"dest", b" a"]]
|
|
)
|
|
completer.update_completer_for_response(
|
|
"SSCAN", (0), [b"44", [b"a", b"member:__rand_int__", b"dest", b" a"]]
|
|
)
|
|
|
|
assert rendered == FormattedText(
|
|
[
|
|
("class:type", "(cursor) "),
|
|
("class:integer", "44"),
|
|
("", "\n"),
|
|
("", "1)"),
|
|
("", " "),
|
|
("class:member", '"a"'),
|
|
("", "\n"),
|
|
("", "2)"),
|
|
("", " "),
|
|
("class:member", '"member:__rand_int__"'),
|
|
("", "\n"),
|
|
("", "3)"),
|
|
("", " "),
|
|
("class:member", '"dest"'),
|
|
("", "\n"),
|
|
("", "4)"),
|
|
("", " "),
|
|
("class:member", '" a"'),
|
|
]
|
|
)
|
|
assert completer.member_completer.words == [
|
|
" a",
|
|
"dest",
|
|
"member:__rand_int__",
|
|
"a",
|
|
]
|
|
|
|
|
|
def test_command_sscan_config_raw():
|
|
completer = IRedisCompleter()
|
|
completer.member_completer.words = []
|
|
rendered = renders.OutputRender.render_raw(
|
|
[b"44", [b"a", b"member:__rand_int__", b"dest", b" a"]]
|
|
)
|
|
completer.update_completer_for_response(
|
|
"SSCAN", (0), [b"44", [b"a", b"member:__rand_int__", b"dest", b" a"]]
|
|
)
|
|
|
|
assert rendered == b"44\na\nmember:__rand_int__\ndest\n a"
|
|
assert completer.member_completer.words == [
|
|
" a",
|
|
"dest",
|
|
"member:__rand_int__",
|
|
"a",
|
|
]
|
|
|
|
|
|
def test_render_members():
|
|
completer = IRedisCompleter()
|
|
completer.member_completer.words = []
|
|
config.withscores = True
|
|
resp = [b"duck", b"667", b"camel", b"708"]
|
|
rendered = renders.OutputRender.render_members(resp)
|
|
completer.update_completer_for_response("ZRANGE", ("foo", "0", "-1"), resp)
|
|
|
|
assert rendered == FormattedText(
|
|
[
|
|
("", "1)"),
|
|
("", " "),
|
|
("class:integer", "667 "),
|
|
("class:member", '"duck"'),
|
|
("", "\n"),
|
|
("", "2)"),
|
|
("", " "),
|
|
("class:integer", "708 "),
|
|
("class:member", '"camel"'),
|
|
]
|
|
)
|
|
assert completer.member_completer.words == ["camel", "duck"]
|
|
|
|
|
|
def test_render_members_config_raw():
|
|
completer = IRedisCompleter()
|
|
completer.member_completer.words = []
|
|
config.withscores = True
|
|
resp = [b"duck", b"667", b"camel", b"708"]
|
|
rendered = renders.OutputRender.render_raw(resp)
|
|
completer.update_completer_for_response("ZRANGE", (), resp)
|
|
|
|
assert rendered == b"duck\n667\ncamel\n708"
|
|
assert completer.member_completer.words == ["camel", "duck"]
|
|
|
|
|
|
def test_render_unixtime_config_raw():
|
|
# fake the timezone and reload
|
|
os.environ["TZ"] = "Asia/Shanghai"
|
|
time.tzset()
|
|
rendered = renders.OutputRender.render_unixtime(1570469891)
|
|
|
|
assert rendered == FormattedText(
|
|
[
|
|
("class:type", "(integer) "),
|
|
("", "1570469891"),
|
|
("", "\n"),
|
|
("class:type", "(local time)"),
|
|
("", " "),
|
|
("", "2019-10-08 01:38:11"),
|
|
]
|
|
)
|
|
|
|
|
|
def test_render_unixtime():
|
|
rendered = renders.OutputRender.render_raw(1570469891)
|
|
|
|
assert rendered == b"1570469891"
|
|
|
|
|
|
def test_bulk_string_reply():
|
|
assert renders.OutputRender.render_bulk_string(b"'\"") == '''"'\\""'''
|
|
|
|
|
|
def test_bulk_string_reply_raw():
|
|
assert renders.OutputRender.render_raw(b"hello") == b"hello"
|
|
|
|
|
|
def test_render_bulk_string_decoded():
|
|
EXPECTED_RENDER = """# Server\nredis_version:5.0.5\nredis_git_sha1:00000000\nredis_git_dirty:0\nredis_build_id:31cd6e21ec924b46""" # noqa
|
|
_input = b"# Server\r\nredis_version:5.0.5\r\nredis_git_sha1:00000000\r\nredis_git_dirty:0\r\nredis_build_id:31cd6e21ec924b46" # noqa
|
|
assert renders.OutputRender.render_bulk_string_decode(_input) == EXPECTED_RENDER
|
|
|
|
|
|
def test_render_bulk_string_decoded_with_decoded_utf8():
|
|
EXPECTED_RENDER = """# Server\nredis_version:5.0.5\nredis_git_sha1:00000000\nredis_git_dirty:0\nredis_build_id:31cd6e21ec924b46""" # noqa
|
|
_input = "# Server\r\nredis_version:5.0.5\r\nredis_git_sha1:00000000\r\nredis_git_dirty:0\r\nredis_build_id:31cd6e21ec924b46" # noqa
|
|
assert renders.OutputRender.render_bulk_string_decode(_input) == EXPECTED_RENDER
|
|
|
|
|
|
def test_render_time():
|
|
value = [b"1571305643", b"765481"]
|
|
assert renders.OutputRender.render_time(value) == FormattedText(
|
|
[
|
|
("class:type", "(unix timestamp) "),
|
|
("", "1571305643"),
|
|
("", "\n"),
|
|
("class:type", "(millisecond) "),
|
|
("", "765481"),
|
|
("", "\n"),
|
|
("class:type", "(convert to local timezone) "),
|
|
("", "2019-10-17 17:47:23.765481"),
|
|
]
|
|
)
|
|
|
|
assert renders.OutputRender.render_raw(value) == b"1571305643\n765481"
|
|
|
|
|
|
def test_render_nested_pairs():
|
|
text = [
|
|
b"peak.allocated",
|
|
10160336,
|
|
b"lua.caches",
|
|
0,
|
|
b"db.0",
|
|
[b"overhead.hashtable.main", 648, b"overhead.hashtable.expires", 32],
|
|
b"db.1",
|
|
[b"overhead.hashtable.main", 112, b"overhead.hashtable.expires", 32],
|
|
b"fragmentation",
|
|
b"0.062980629503726959",
|
|
b"fragmentation.bytes",
|
|
-9445680,
|
|
]
|
|
|
|
assert renders.OutputRender.render_raw(text) == (
|
|
b"peak.allocated\n10160336\nlua.caches\n0\ndb.0\noverhead.hashtable.main\n64"
|
|
b"8\noverhead.hashtable.expires\n32\ndb.1\noverhead.hashtable.main\n112\nove"
|
|
b"rhead.hashtable.expires\n32\nfragmentation\n0.062980629503726959\nfragmentat"
|
|
b"ion.bytes\n-9445680"
|
|
)
|
|
|
|
assert renders.OutputRender.render_nested_pair(text) == FormattedText(
|
|
[
|
|
("class:string", "peak.allocated: "),
|
|
("class:value", "10160336"),
|
|
("", "\n"),
|
|
("class:string", "lua.caches: "),
|
|
("class:value", "0"),
|
|
("", "\n"),
|
|
("class:string", "db.0: "),
|
|
("", "\n"),
|
|
("class:string", " overhead.hashtable.main: "),
|
|
("class:value", "648"),
|
|
("", "\n"),
|
|
("class:string", " overhead.hashtable.expires: "),
|
|
("class:value", "32"),
|
|
("", "\n"),
|
|
("class:string", "db.1: "),
|
|
("", "\n"),
|
|
("class:string", " overhead.hashtable.main: "),
|
|
("class:value", "112"),
|
|
("", "\n"),
|
|
("class:string", " overhead.hashtable.expires: "),
|
|
("class:value", "32"),
|
|
("", "\n"),
|
|
("class:string", "fragmentation: "),
|
|
("class:value", "0.062980629503726959"),
|
|
("", "\n"),
|
|
("class:string", "fragmentation.bytes: "),
|
|
("class:value", "-9445680"),
|
|
]
|
|
)
|
|
|
|
|
|
def test_render_nested_list():
|
|
text = [[b"get", 2, [b"readonly", b"fast"], 1, 1, 1]]
|
|
assert renders.OutputRender.render_list(text) == FormattedText(
|
|
[
|
|
("", "1)"),
|
|
("", " "),
|
|
("", "1)"),
|
|
("", " "),
|
|
("class:string", '"get"'),
|
|
("", "\n"),
|
|
("", " "),
|
|
("", "2)"),
|
|
("", " "),
|
|
("class:string", '"2"'),
|
|
("", "\n"),
|
|
("", " "),
|
|
("", "3)"),
|
|
("", " "),
|
|
("", "1)"),
|
|
("", " "),
|
|
("class:string", '"readonly"'),
|
|
("", "\n"),
|
|
("", " "),
|
|
("", "2)"),
|
|
("", " "),
|
|
("class:string", '"fast"'),
|
|
("", "\n"),
|
|
("", " "),
|
|
("", "4)"),
|
|
("", " "),
|
|
("class:string", '"1"'),
|
|
("", "\n"),
|
|
("", " "),
|
|
("", "5)"),
|
|
("", " "),
|
|
("class:string", '"1"'),
|
|
("", "\n"),
|
|
("", " "),
|
|
("", "6)"),
|
|
("", " "),
|
|
("class:string", '"1"'),
|
|
]
|
|
)
|
|
|
|
|
|
def test_render_bytes(config):
|
|
assert renders.OutputRender.render_bytes(b"bytes\n") == b"bytes"
|
|
|
|
|
|
def test_render_bytes_raw(config):
|
|
assert renders.OutputRender.render_raw(b"bytes\n") == b"bytes\n"
|
|
|
|
|
|
def test_render_help(config):
|
|
assert renders.OutputRender.render_help([b"foo", b"bar"]) == FormattedText(
|
|
[("class:string", "foo\nbar")]
|
|
)
|