1
0
Fork 0

Adding upstream version 1.9.1.

Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
Daniel Baumann 2025-02-09 16:57:44 +01:00
parent 2bf0435a35
commit 031879240c
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
356 changed files with 26924 additions and 0 deletions

View file

@ -0,0 +1,56 @@
def test_ip_match(judge_command):
def ip_valid(ip, valid):
if valid:
judge_command(
f"cluster meet {ip} 6379",
{"command": "cluster meet", "ip": ip, "port": "6379"},
)
else:
judge_command(f"cluster meet {ip} 6379", None)
ip_valid("192.168.0.1", True)
ip_valid("255.255.255.255", True)
ip_valid("192.168.0.256", False)
ip_valid("192.256.0.26", False)
ip_valid("192.255.256.26", False)
ip_valid("0.0.0.0", True)
ip_valid("99.999.100.1", False)
ip_valid("300.168.0.1", False)
def test_port_match(judge_command):
def port_valid(port, valid):
if valid:
judge_command(
f"cluster meet 192.168.0.1 {port}",
{"command": "cluster meet", "ip": "192.168.0.1", "port": port},
)
else:
judge_command(f"cluster meet 192.168.0.1 {port}", None)
port_valid("65535", True)
port_valid("0", False)
port_valid("1", True)
port_valid("192.168.0.256", False)
port_valid("65536", False)
port_valid("65545", False)
port_valid("65635", False)
port_valid("66535", False)
port_valid("75535", False)
port_valid("1024", True)
port_valid("6553", True)
port_valid("99999", False)
port_valid("99999999", False)
def test_command_with_key_in_quotes(judge_command):
judge_command(
'cluster keyslot "mykey"', {"command": "cluster keyslot", "key": '"mykey"'}
)
judge_command(
'cluster keyslot "\\"mykey"',
{"command": "cluster keyslot", "key": '"\\"mykey"'},
)
judge_command(
'cluster keyslot "mykey "', {"command": "cluster keyslot", "key": '"mykey "'}
)

View file

@ -0,0 +1,227 @@
"""
redis command in `cluster` group parse test.
"""
def test_command_cluster_addslots(judge_command):
judge_command("cluster addslots 1", {"command": "cluster addslots", "slots": "1"})
judge_command("CLUSTER ADDSLOTS 1", {"command": "CLUSTER ADDSLOTS", "slots": "1"})
judge_command(
"cluster addslots 1 2 3 4", {"command": "cluster addslots", "slots": "1 2 3 4"}
)
judge_command("cluster addslots 1 a", None)
judge_command("cluster addslots a", None)
judge_command("cluster addslots a 4", None)
judge_command("cluster addslots abc", None)
def test_command_cluster_count_failure_reports(judge_command):
judge_command(
"cluster count-failure-reports 1",
{"command": "cluster count-failure-reports", "node": "1"},
)
judge_command(
"CLUSTER COUNT-FAILURE-REPORTS 1",
{"command": "CLUSTER COUNT-FAILURE-REPORTS", "node": "1"},
)
judge_command("cluster count-failure-reports 1 2 3 4", None)
judge_command("cluster count-failure-reports 1 a", None)
judge_command("cluster count-failure-reports a", None)
judge_command("cluster count-failure-reports a 2", None)
judge_command("cluster count-failure-reports abc", None)
def test_command_cluster_countkeysinslot(judge_command):
judge_command(
"cluster countkeysinslot 1", {"command": "cluster countkeysinslot", "slot": "1"}
)
judge_command(
"CLUSTER COUNTKEYSINSLOT 1", {"command": "CLUSTER COUNTKEYSINSLOT", "slot": "1"}
)
judge_command("cluster countkeysinslot 1 2 3 4", None)
judge_command("cluster countkeysinslot 1 a", None)
judge_command("cluster countkeysinslot a", None)
judge_command("cluster countkeysinslot a 4", None)
judge_command("cluster countkeysinslot abc", None)
def test_command_cluster_delslots(judge_command):
judge_command("cluster delslots 1", {"command": "cluster delslots", "slots": "1"})
judge_command("CLUSTER DELSLOTS 1", {"command": "CLUSTER DELSLOTS", "slots": "1"})
judge_command(
"cluster delslots 1 2 3 4", {"command": "cluster delslots", "slots": "1 2 3 4"}
)
judge_command("cluster delslots 1 a", None)
judge_command("cluster delslots a", None)
judge_command("cluster delslots a 4", None)
judge_command("cluster delslots abc", None)
def test_command_cluster_failover(judge_command):
judge_command(
"cluster failover force",
{"command": "cluster failover", "failoverchoice": "force"},
)
judge_command(
"cluster failover takeover",
{"command": "cluster failover", "failoverchoice": "takeover"},
)
judge_command(
"CLUSTER FAILOVER FORCE",
{"command": "CLUSTER FAILOVER", "failoverchoice": "FORCE"},
)
judge_command(
"CLUSTER FAILOVER takeover",
{"command": "CLUSTER FAILOVER", "failoverchoice": "takeover"},
)
judge_command(
"CLUSTER FAILOVER TAKEOVER",
{"command": "CLUSTER FAILOVER", "failoverchoice": "TAKEOVER"},
)
def test_command_cluster_forget(judge_command):
judge_command("cluster forget 1", {"command": "cluster forget", "node": "1"})
judge_command(
"CLUSTER COUNT-FAILURE-REPORTS 1",
{"command": "CLUSTER COUNT-FAILURE-REPORTS", "node": "1"},
)
judge_command("cluster forget 1 2 3 4", None)
judge_command("cluster forget 1 a", None)
judge_command("cluster forget a", None)
judge_command("cluster forget a 2", None)
judge_command("cluster forget abc", None)
def test_command_cluster_getkeysinslot(judge_command):
judge_command(
"cluster getkeysinslot 1 1",
{"command": "cluster getkeysinslot", "slot": "1", "count": "1"},
)
judge_command(
"CLUSTER GETKEYSINSLOT 1 1",
{"command": "CLUSTER GETKEYSINSLOT", "slot": "1", "count": "1"},
)
judge_command(
"cluster getkeysinslot 1123 1121",
{"command": "cluster getkeysinslot", "slot": "1123", "count": "1121"},
)
judge_command("cluster getkeysinslot 1 2 3 4", None)
judge_command("cluster getkeysinslot 1 a", None)
judge_command("cluster getkeysinslot a", None)
judge_command("cluster getkeysinslot a 4", None)
judge_command("cluster getkeysinslot abc", None)
def test_command_cluster_info(judge_command):
judge_command("cluster info", {"command": "cluster info"})
judge_command("CLUSTER INFO", {"command": "CLUSTER INFO"})
judge_command("CLUSTER INFO 1", None)
judge_command("Acluster info", "invalid")
def test_command_cluster_keyslot(judge_command):
judge_command(
"cluster keyslot mykey", {"command": "cluster keyslot", "key": "mykey"}
)
judge_command(
"cluster keyslot MYKEY", {"command": "cluster keyslot", "key": "MYKEY"}
)
judge_command(
"CLUSTER KEYSLOT MYKEY", {"command": "CLUSTER KEYSLOT", "key": "MYKEY"}
)
def test_command_cluster_meet(judge_command):
judge_command(
"cluster meet 192.168.0.1 12200",
{"command": "cluster meet", "ip": "192.168.0.1", "port": "12200"},
)
judge_command(
"CLUSTER MEET 192.168.0.1 12200",
{"command": "CLUSTER MEET", "ip": "192.168.0.1", "port": "12200"},
)
def test_command_cluster_nodes(judge_command):
judge_command("cluster nodes", {"command": "cluster nodes"})
judge_command("CLUSTER NODES", {"command": "CLUSTER NODES"})
def test_command_cluster_reset(judge_command):
judge_command(
"cluster reset hard", {"command": "cluster reset", "resetchoice": "hard"}
)
judge_command(
"cluster reset soft", {"command": "cluster reset", "resetchoice": "soft"}
)
judge_command(
"CLUSTER RESET HARD", {"command": "CLUSTER RESET", "resetchoice": "HARD"}
)
judge_command(
"CLUSTER RESET soft", {"command": "CLUSTER RESET", "resetchoice": "soft"}
)
judge_command(
"CLUSTER RESET SOFT", {"command": "CLUSTER RESET", "resetchoice": "SOFT"}
)
judge_command("CLUSTER RESET SOFT1", None)
judge_command("CLUSTER RESET SAOFT", None)
def test_command_cluster_set_config_epoch(judge_command):
judge_command("cluster set-config-epoch 123123 ad", None)
judge_command(
"cluster set-config-epoch 0 ",
{"command": "cluster set-config-epoch", "epoch": "0"},
)
judge_command(
"cluster set-config-epoch 123123 ",
{"command": "cluster set-config-epoch", "epoch": "123123"},
)
def test_command_cluster_set_slot(judge_command):
judge_command(
"cluster setslot 123 importing 123123",
{
"command": "cluster setslot",
"slot": "123",
"slotsubcmd": "importing",
"node": "123123",
},
)
judge_command(
"cluster setslot 123 migrating 123123",
{
"command": "cluster setslot",
"slot": "123",
"slotsubcmd": "migrating",
"node": "123123",
},
)
judge_command(
"cluster setslot 123 node 123123",
{
"command": "cluster setslot",
"slot": "123",
"slotsubcmd": "node",
"node": "123123",
},
)
judge_command(
"cluster setslot 123 MIGRATING 123123",
{
"command": "cluster setslot",
"slot": "123",
"slotsubcmd": "MIGRATING",
"node": "123123",
},
)
judge_command(
"cluster setslot 123 stable",
{"command": "cluster setslot", "slot": "123", "slotsubcmd": "stable"},
)
judge_command(
"cluster setslot 123 STABLE",
{"command": "cluster setslot", "slot": "123", "slotsubcmd": "STABLE"},
)

View file

@ -0,0 +1,97 @@
def test_auth(judge_command):
judge_command("auth 123", {"command": "auth", "password": "123"})
def test_auth_redis6(judge_command):
from iredis.commands import command2syntax
from iredis.redis_grammar import get_command_grammar
get_command_grammar.cache_clear()
old = command2syntax["AUTH"]
command2syntax["AUTH"] = "command_usernamex_password"
judge_command(
"auth default 123",
{"command": "auth", "password": "123", "username": "default"},
)
judge_command("AUTH 123", {"command": "AUTH", "password": "123"})
command2syntax["AUTH"] = old
def test_echo(judge_command):
judge_command("echo hello", {"command": "echo", "message": "hello"})
def test_ping(judge_command):
judge_command("ping hello", {"command": "ping", "message": "hello"})
judge_command("ping", {"command": "ping", "message": None})
judge_command("ping hello world", None)
def test_select(judge_command):
for index in range(16):
judge_command(f"select {index}", {"command": "select", "index": str(index)})
for index in range(16, 100):
judge_command(f"select {index}", None)
judge_command("select acb", None)
def test_swapdb(judge_command):
for index1 in range(16):
for index2 in range(16):
judge_command(
f"swapdb {index1} {index2}",
{"command": "swapdb", "index": [str(index1), str(index2)]},
)
judge_command("swapdb abc 1", None)
judge_command("swapdb 1", None)
def test_client_caching(judge_command):
judge_command("CLIENT CACHING YES", {"command": "CLIENT CACHING", "yes": "YES"})
judge_command("CLIENT CACHING NO", {"command": "CLIENT CACHING", "yes": "NO"})
judge_command("CLIENT CACHING", None)
judge_command("CLIENT CACHING abc", None)
def test_client_tracking(judge_command):
judge_command("CLIENT TRACKING on", {"command": "CLIENT TRACKING", "on_off": "on"})
judge_command(
"CLIENT TRACKING ON REDIRECT 123",
{
"command": "CLIENT TRACKING",
"on_off": "ON",
"redirect_const": "REDIRECT",
"clientid": "123",
},
)
judge_command(
"CLIENT TRACKING ON PREFIX foo",
{
"command": "CLIENT TRACKING",
"on_off": "ON",
"prefix_const": "PREFIX",
"prefix": "foo",
},
)
judge_command(
"CLIENT TRACKING ON PREFIX foo",
{
"command": "CLIENT TRACKING",
"on_off": "ON",
"prefix_const": "PREFIX",
"prefix": "foo",
},
)
judge_command(
"CLIENT TRACKING ON PREFIX foo BCAST NOLOOP OPTIN",
{
"command": "CLIENT TRACKING",
"on_off": "ON",
"prefix_const": "PREFIX",
"prefix": "foo",
"bcast_const": "BCAST",
"noloop_const": "NOLOOP",
"optin_const": "OPTIN",
},
)

View file

@ -0,0 +1,179 @@
def test_del(judge_command):
judge_command("DEL abc", {"command": "DEL", "keys": "abc"})
judge_command("DEL bc1", {"command": "DEL", "keys": "bc1"})
judge_command("DEL 1", {"command": "DEL", "keys": "1"})
judge_command('DEL "hello world"', {"command": "DEL", "keys": '"hello world"'})
judge_command(
r'DEL "hell\"o world"', {"command": "DEL", "keys": r'"hell\"o world"'}
)
judge_command("DEL abc def", {"command": "DEL", "keys": "abc def"})
judge_command("DEL 1 2", {"command": "DEL", "keys": "1 2"})
judge_command("DEL 'he \"llo'", {"command": "DEL", "keys": "'he \"llo'"})
judge_command(
"""DEL 'he "llo' "abc" 'def' """,
{"command": "DEL", "keys": "'he \"llo' \"abc\" 'def'"},
)
def test_exists(judge_command):
judge_command("EXISTS foo bar", {"command": "EXISTS", "keys": "foo bar"})
judge_command("EXISTS foo", {"command": "EXISTS", "keys": "foo"})
judge_command("EXISTS 1", {"command": "EXISTS", "keys": "1"})
judge_command('EXISTS "foo bar"', {"command": "EXISTS", "keys": '"foo bar"'})
judge_command(r'EXISTS "\""', {"command": "EXISTS", "keys": r'"\""'})
def test_expire(judge_command):
judge_command("EXPIRE key 12", {"command": "EXPIRE", "key": "key", "second": "12"})
judge_command("EXPIRE key a12", None)
judge_command("EXPIRE 12 12", {"command": "EXPIRE", "key": "12", "second": "12"})
judge_command("EXPIRE 12", None)
def test_expireat(judge_command):
judge_command(
"EXPIRE key 1565787643",
{"command": "EXPIRE", "key": "key", "second": "1565787643"},
)
judge_command("EXPIRE key a12", None)
def test_keys(judge_command):
judge_command("KEYS *", {"command": "KEYS", "pattern": "*"})
judge_command("KEYS *abc", {"command": "KEYS", "pattern": "*abc"})
judge_command("keys abc*", {"command": "keys", "pattern": "abc*"})
def test_move(judge_command):
judge_command("MOVE key 14", {"command": "MOVE", "key": "key", "index": "14"})
def test_pexpire(judge_command):
judge_command(
"PEXPIRE key 12", {"command": "PEXPIRE", "key": "key", "millisecond": "12"}
)
judge_command("PEXPIRE key a12", None)
judge_command(
"PEXPIRE 12 12", {"command": "PEXPIRE", "key": "12", "millisecond": "12"}
)
judge_command("PEXPIRE 12", None)
def test_pexpireat(judge_command):
judge_command(
"PEXPIREAT key 1565787643",
{"command": "PEXPIREAT", "key": "key", "timestampms": "1565787643"},
)
judge_command("PEXPIREAT key a12", None)
def test_rename(judge_command):
judge_command(
"rename key newkey", {"command": "rename", "key": "key", "newkey": "newkey"}
)
judge_command(
"rename 123 newkey", {"command": "rename", "key": "123", "newkey": "newkey"}
)
judge_command("rename 123 ", None)
def test_scan(judge_command):
judge_command(
"SCAN 0 MATCH task* COUNT 15 TYPE string",
{
"command": "SCAN",
"cursor": "0",
"match": "MATCH",
"pattern": "task*",
"count_const": "COUNT",
"count": "15",
"type_const": "TYPE",
"type": "string",
},
)
judge_command("SCAN 0", {"command": "SCAN", "cursor": "0"})
judge_command(
"SCAN 0 MATCH task*",
{"command": "SCAN", "cursor": "0", "match": "MATCH", "pattern": "task*"},
)
judge_command(
"SCAN 0 COUNT 15 TYPE string",
{
"command": "SCAN",
"cursor": "0",
"count_const": "COUNT",
"count": "15",
"type_const": "TYPE",
"type": "string",
},
)
def test_migrate(judge_command):
judge_command(
'MIGRATE 192.168.1.34 6379 " " 0 5000 KEYS key1 key2 key3',
{
"command": "MIGRATE",
"host": "192.168.1.34",
"port": "6379",
"key": '" "',
"index": "0",
"timeout": "5000",
"const_keys": "KEYS",
"keys": "key1 key2 key3",
},
)
judge_command(
"MIGRATE 192.168.1.34 6379 foo 0 5000 auth password1 KEYS key1 key2 key3",
{
"command": "MIGRATE",
"host": "192.168.1.34",
"port": "6379",
"key": "foo",
"index": "0",
"timeout": "5000",
"const_keys": "KEYS",
"keys": "key1 key2 key3",
"auth": "auth",
"password": "password1",
},
)
judge_command(
"MIGRATE 192.168.1.34 6379 foo 0 5000 auth username1 password1 KEYS key1 key2 key3",
{
"command": "MIGRATE",
"host": "192.168.1.34",
"port": "6379",
"key": "foo",
"index": "0",
"timeout": "5000",
"const_keys": "KEYS",
"keys": "key1 key2 key3",
"auth": "auth",
"password": "password1",
"username": "username1",
},
)
def test_object(judge_command):
judge_command(
"object refcount mylist",
{"command": "object", "object": "refcount", "key": "mylist"},
)
def test_wait(judge_command):
judge_command("WAIT 3 100", {"command": "WAIT", "count": "3", "timeout": "100"})
def test_restore(judge_command):
judge_command(
'RESTORE mykey 0 "\n\x17\x17\x00\x00\x00\x12\x00\x00\x00\x03\x00\x00\xc0\x01\x00\x04\xc0\x02\x00\x04\xc0\x03\x00\xff\x04\x00u#<\xc0;.\xe9\xdd"', # noqa
{
"command": "RESTORE",
"key": "mykey",
"timeout": "0",
"value": '"\n\x17\x17\x00\x00\x00\x12\x00\x00\x00\x03\x00\x00\xc0\x01\x00\x04\xc0\x02\x00\x04\xc0\x03\x00\xff\x04\x00u#<\xc0;.\xe9\xdd"', # noqa
},
)

View file

@ -0,0 +1,39 @@
def test_geoadd(judge_command):
judge_command(
'GEOADD Sicily 13.361389 38.115556 "Palermo" 15.087269 37.502669 "Catania"',
{
"command": "GEOADD",
"key": "Sicily",
"longitude": "15.087269",
"latitude": "37.502669",
"member": '"Catania"',
},
)
def test_georadiusbymember(judge_command):
judge_command(
"GEORADIUSBYMEMBER Sicily Agrigento 100 km",
{
"command": "GEORADIUSBYMEMBER",
"key": "Sicily",
"member": "Agrigento",
"float": "100",
"distunit": "km",
},
)
def test_georadius(judge_command):
judge_command(
"GEORADIUS Sicily 15 37 200 km WITHDIST WITHCOORD ",
{
"command": "GEORADIUS",
"key": "Sicily",
"longitude": "15",
"latitude": "37",
"float": "200",
"distunit": "km",
"geochoice": "WITHCOORD",
},
)

View file

@ -0,0 +1,45 @@
def test_hdel(judge_command):
judge_command("HDEL foo bar", {"command": "HDEL", "key": "foo", "fields": "bar"})
judge_command(
"HDEL foo bar hello world",
{"command": "HDEL", "key": "foo", "fields": "bar hello world"},
)
def test_hmset(judge_command):
judge_command(
"HMSET foo bar hello-world",
{"command": "HMSET", "key": "foo", "field": "bar", "value": "hello-world"},
)
judge_command(
"HMSET foo bar hello-world key2 value2",
{"command": "HMSET", "key": "foo", "field": "key2", "value": "value2"},
)
def test_hexists(judge_command):
judge_command(
"HEXISTS foo bar", {"command": "HEXISTS", "key": "foo", "field": "bar"}
)
judge_command("HEXISTS foo bar hello-world", None)
def test_hincrby(judge_command):
judge_command(
"HINCRBY foo bar 12",
{"command": "HINCRBY", "key": "foo", "field": "bar", "delta": "12"},
)
def test_hincrbyfloat(judge_command):
judge_command(
"HINCRBYFLOAT foo bar 12.1",
{"command": "HINCRBYFLOAT", "key": "foo", "field": "bar", "float": "12.1"},
)
def test_hset(judge_command):
judge_command(
"HSET foo bar hello",
{"command": "HSET", "key": "foo", "field": "bar", "value": "hello"},
)

View file

@ -0,0 +1,5 @@
def test_pfmerge(judge_command):
judge_command(
"PFMERGE hll3 hll1 hll2",
{"command": "PFMERGE", "newkey": "hll3", "keys": "hll1 hll2"},
)

View file

@ -0,0 +1,108 @@
def test_rpush(judge_command):
judge_command(
"RPUSH list1 foo bar hello world",
{"command": "RPUSH", "key": "list1", "values": "foo bar hello world"},
)
judge_command(
"LPUSH list1 foo", {"command": "LPUSH", "key": "list1", "values": "foo"}
)
def test_lindex(judge_command):
judge_command(
"LINDEX list1 10", {"command": "LINDEX", "key": "list1", "position": "10"}
)
judge_command(
"LINDEX list1 -10", {"command": "LINDEX", "key": "list1", "position": "-10"}
)
judge_command("LINDEX list1 1.1", None)
def test_lset(judge_command):
judge_command(
"LSET list1 10 newbie",
{"command": "LSET", "key": "list1", "position": "10", "value": "newbie"},
)
judge_command(
"LSET list1 -1 newbie",
{"command": "LSET", "key": "list1", "position": "-1", "value": "newbie"},
)
def test_brpoplpush(judge_command):
judge_command(
"BRPOPLPUSH list1 list2 10",
{"command": "BRPOPLPUSH", "key": "list1", "newkey": "list2", "timeout": "10"},
)
judge_command(
"BRPOPLPUSH list1 list2 0",
{"command": "BRPOPLPUSH", "key": "list1", "newkey": "list2", "timeout": "0"},
)
judge_command("BRPOPLPUSH list1 list2 -1", None)
def test_linsert(judge_command):
judge_command(
'LINSERT mylist BEFORE "World" "There"',
{
"command": "LINSERT",
"key": "mylist",
"position_choice": "BEFORE",
"value": ['"World"', '"There"'],
},
)
judge_command(
'LINSERT mylist after "World" "There"',
{
"command": "LINSERT",
"key": "mylist",
"position_choice": "after",
"value": ['"World"', '"There"'],
},
)
def test_lpos(judge_command):
judge_command("LPOS mylist c", {"command": "LPOS", "key": "mylist", "element": "c"})
judge_command(
"LPOS mylist c RANK 2",
{
"command": "LPOS",
"key": "mylist",
"element": "c",
"rank_const": "RANK",
"rank": "2",
},
)
judge_command(
"LPOS mylist c RANK -1",
{
"command": "LPOS",
"key": "mylist",
"element": "c",
"rank_const": "RANK",
"rank": "-1",
},
)
judge_command(
"LPOS mylist c COUNT 2",
{
"command": "LPOS",
"key": "mylist",
"element": "c",
"count_const": "COUNT",
"count": "2",
},
)
judge_command(
"LPOS mylist c RANK -1 COUNT 2",
{
"command": "LPOS",
"key": "mylist",
"element": "c",
"count_const": "COUNT",
"count": "2",
"rank_const": "RANK",
"rank": "-1",
},
)

View file

@ -0,0 +1,15 @@
def test_publish(judge_command):
judge_command(
"publish foo bar", {"command": "publish", "channel": "foo", "message": "bar"}
)
def test_subscribe(judge_command):
judge_command("subscribe foo bar", {"command": "subscribe", "channel": "bar"})
def test_pubsub(judge_command):
judge_command(
"PUBSUB NUMSUB foo bar",
{"command": "PUBSUB", "pubsubcmd": "NUMSUB", "channel": "bar"},
)

View file

@ -0,0 +1,22 @@
def test_eval(judge_command):
judge_command(
'eval "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 first second',
{
"command": "eval",
"double_lua": "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}",
"any": "2 key1 key2 first second",
},
)
judge_command(
"eval 'return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}' 2 key1 key2 first second",
{
"command": "eval",
"single_lua": "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}",
"any": "2 key1 key2 first second",
},
)
def test_scriptdebug(judge_command):
judge_command("SCRIPT DEBUG YES", {"command": "SCRIPT DEBUG", "scriptdebug": "YES"})
judge_command("SCRIPT DEBUG no", {"command": "SCRIPT DEBUG", "scriptdebug": "no"})

View file

@ -0,0 +1,201 @@
def test_client_setname(judge_command):
judge_command(
"CLIENT SETNAME foobar", {"command": "CLIENT SETNAME", "value": "foobar"}
)
def test_client_unblock(judge_command):
judge_command(
"CLIENT UNBLOCK 33 TIMEOUT",
{"command": "CLIENT UNBLOCK", "clientid": "33", "error": "TIMEOUT"},
)
judge_command("CLIENT UNBLOCK 33", {"command": "CLIENT UNBLOCK", "clientid": "33"})
def test_flushdb(judge_command):
judge_command("FLUSHDB async", {"command": "FLUSHDB", "async": "async"})
judge_command("FLUSHDB", {"command": "FLUSHDB"})
judge_command("FLUSHDB ASYNC", {"command": "FLUSHDB", "async": "ASYNC"})
judge_command("FLUSHALL ASYNC", {"command": "FLUSHALL", "async": "ASYNC"})
def test_client_list(judge_command):
judge_command("client list", {"command": "client list"})
judge_command("client list TYPE REPLICA1", None)
judge_command(
"client list type master",
{"command": "client list", "type_const": "type", "conntype": "master"},
)
judge_command(
"client list TYPE REPLICA",
{"command": "client list", "type_const": "TYPE", "conntype": "REPLICA"},
)
def test_configset(judge_command):
judge_command(
"config set foo bar",
{"command": "config set", "parameter": "foo", "value": "bar"},
)
judge_command(
"config set requirepass ''",
{"command": "config set", "parameter": "requirepass", "value": "''"},
)
def test_shutdown(judge_command):
judge_command("shutdown save", {"command": "shutdown", "shutdown": "save"})
judge_command("shutdown NOSAVE", {"command": "shutdown", "shutdown": "NOSAVE"})
def test_clientpause(judge_command):
judge_command("client pause 3000", {"command": "client pause", "timeout": "3000"})
def test_client_reply(judge_command):
judge_command("client reply on", {"command": "client reply", "switch": "on"})
def test_client_kill(judge_command):
judge_command(
"CLIENT KILL addr 127.0.0.1:12345 type pubsub",
{
"command": "CLIENT KILL",
"addr": "addr",
"ip_port": "127.0.0.1:12345",
"type_const": "type",
"conntype": "pubsub",
},
)
judge_command(
"CLIENT KILL 127.0.0.1:12345 ",
{"command": "CLIENT KILL", "ip_port": "127.0.0.1:12345"},
)
judge_command(
"CLIENT KILL id 123455 type pubsub skipme no",
{
"command": "CLIENT KILL",
"const_id": "id",
"clientid": "123455",
"type_const": "type",
"conntype": "pubsub",
"skipme": "skipme",
"yes": "no",
},
)
def test_client_kill_username(judge_command):
"""since redis-server 6.0"""
judge_command(
"client kill USER default",
{"command": "client kill", "const_user": "USER", "username": "default"},
)
def test_client_kill_unordered_arguments(judge_command):
judge_command(
"CLIENT KILL type pubsub addr 127.0.0.1:12345",
{
"command": "CLIENT KILL",
"addr": "addr",
"ip_port": "127.0.0.1:12345",
"type_const": "type",
"conntype": "pubsub",
},
)
def test_psync(judge_command):
judge_command(
"PSYNC abc 123", {"command": "PSYNC", "replicationid": "abc", "offset": "123"}
)
judge_command("PSYNC", None)
def test_latency_graph(judge_command):
judge_command(
"latency graph command", {"command": "latency graph", "graphevent": "command"}
)
judge_command(
"latency graph fork", {"command": "latency graph", "graphevent": "fork"}
)
judge_command("latency graph", None)
def test_latency_reset(judge_command):
judge_command(
"latency reset command fork aof-fsync-always",
{"command": "latency reset", "graphevent": "aof-fsync-always"},
)
judge_command(
"latency reset fork", {"command": "latency reset", "graphevent": "fork"}
)
judge_command("latency reset", {"command": "latency reset"})
def test_lolwut(judge_command):
judge_command("lolwut", {"command": "lolwut"})
# only works before redis 6
judge_command("lolwut 5", {"command": "lolwut", "any": "5"})
judge_command("lolwut 5 1", {"command": "lolwut", "any": "5 1"})
# redis 6
judge_command(
"lolwut VERSION 5 5",
{"command": "lolwut", "version": "VERSION", "version_num": "5", "any": "5"},
)
def test_info(judge_command):
judge_command("info cpu", {"command": "info", "section": "cpu"})
judge_command("info", {"command": "info"})
judge_command("info all", {"command": "info", "section": "all"})
judge_command("info CPU", {"command": "info", "section": "CPU"})
def test_bgsave(judge_command):
judge_command("bgsave", {"command": "bgsave"})
judge_command("bgsave schedule", {"command": "bgsave", "schedule": "schedule"})
judge_command("BGSAVE SCHEDULE", {"command": "BGSAVE", "schedule": "SCHEDULE"})
def test_acl_cat(judge_command):
judge_command("acl cat", {"command": "acl cat"})
judge_command("acl CAT", {"command": "acl CAT"})
judge_command(
"ACL CAT scripting", {"command": "ACL CAT", "categoryname": "scripting"}
)
judge_command("ACL CAT WATCH", {"command": "ACL CAT", "categoryname": "WATCH"})
def test_acl_deluser(judge_command):
judge_command(
"acl deluser laixintao", {"command": "acl deluser", "username": "laixintao"}
)
judge_command(
"acl deluser laixintao antirez",
{"command": "acl deluser", "username": "antirez"},
)
def test_acl_log(judge_command):
judge_command("acl log 2", {"command": "acl log", "count": "2"})
judge_command("acl log reset", {"command": "acl log", "reset_const": "reset"})
judge_command("acl log ", {"command": "acl log"})
def test_acl_setuser(judge_command):
judge_command("ACL SETUSER alice", {"command": "ACL SETUSER", "username": "alice"})
judge_command(
"ACL SETUSER alice on >p1pp0 ~cached:* +get",
{"command": "ACL SETUSER", "username": "alice", "rule": "+get"},
)
judge_command(
"ACL SETUSER alan allkeys +@string +@set -SADD >alanpassword",
{"command": "ACL SETUSER", "username": "alan", "rule": ">alanpassword"},
)
def test_acl_getuser(judge_command):
judge_command("acl getuser alan", {"command": "acl getuser", "username": "alan"})
judge_command("acl getuser", None)

View file

@ -0,0 +1,40 @@
def test_sadd(judge_command):
judge_command(
"SADD foo m1 m2 m3", {"command": "SADD", "key": "foo", "members": "m1 m2 m3"}
)
judge_command("SADD foo m1", {"command": "SADD", "key": "foo", "members": "m1"})
judge_command("SADD foo", None)
def test_sdiffstore(judge_command):
judge_command(
"SDIFFSTORE foo m1 m2 m3",
{"command": "SDIFFSTORE", "destination": "foo", "keys": "m1 m2 m3"},
)
judge_command(
"SDIFFSTORE foo m1",
{"command": "SDIFFSTORE", "destination": "foo", "keys": "m1"},
)
judge_command("SDIFFSTORE foo", None)
def test_is_member(judge_command):
judge_command("SISMEMBER foo m1 m2 m3", None)
judge_command(
"SISMEMBER foo m1", {"command": "SISMEMBER", "key": "foo", "member": "m1"}
)
judge_command("SISMEMBER foo", None)
def test_smove(judge_command):
judge_command(
"SMOVE foo bar m2",
{"command": "SMOVE", "key": "foo", "newkey": "bar", "member": "m2"},
)
judge_command("SMOVE foo m1", None)
judge_command("SMOVE foo", None)
def test_spop(judge_command):
judge_command("SPOP set", {"command": "SPOP", "key": "set"})
judge_command("SPOP set 3", {"command": "SPOP", "key": "set", "count": "3"})

View file

@ -0,0 +1,172 @@
import pytest
def test_zcount(judge_command):
judge_command(
"zcount foo -10 0",
{"command": "zcount", "key": "foo", "min": "-10", "max": "0"},
)
def test_bzpopmax(judge_command):
judge_command(
"bzpopmax set set2 set3 4",
{"command": "bzpopmax", "keys": "set set2 set3", "timeout": "4"},
)
judge_command(
"bzpopmin set 4", {"command": "bzpopmin", "keys": "set", "timeout": "4"}
)
def test_zadd(judge_command):
judge_command(
"zadd t 100 qewqr 23 pp 11 oo",
{
"command": "zadd",
"key": "t",
"score": "11", # FIXME: only have last one
"member": "oo",
},
)
judge_command(
"zadd t incr 100 foo",
{
"command": "zadd",
"key": "t",
"incr": "incr",
"score": "100", # FIXME: only have last one
"member": "foo",
},
)
judge_command(
"zadd t NX CH incr 100 foo",
{
"command": "zadd",
"key": "t",
"condition": "NX",
"changed": "CH",
"incr": "incr",
"score": "100", # FIXME: only have last one
"member": "foo",
},
)
def test_zincrby(judge_command):
judge_command(
"zincrby t 10 foo",
{"command": "zincrby", "key": "t", "float": "10", "member": "foo"},
)
judge_command(
"zincrby t 2.3 foo",
{"command": "zincrby", "key": "t", "float": "2.3", "member": "foo"},
)
def test_zlexcount(judge_command):
judge_command(
"zlexcount a - +",
{"command": "zlexcount", "key": "a", "lexmin": "-", "lexmax": "+"},
)
judge_command(
"zlexcount a (aaaa [z",
{"command": "zlexcount", "key": "a", "lexmin": "(aaaa", "lexmax": "[z"},
)
judge_command(
"ZLEXCOUNT myset - [c",
{"command": "ZLEXCOUNT", "key": "myset", "lexmin": "-", "lexmax": "[c"},
)
judge_command(
"ZLEXCOUNT myset [aaa (g",
{"command": "ZLEXCOUNT", "key": "myset", "lexmin": "[aaa", "lexmax": "(g"},
)
def test_zrange(judge_command):
judge_command(
"zrange foo -1 10",
{"command": "zrange", "key": "foo", "start": "-1", "end": "10"},
)
judge_command(
"zrange foo 0 -1",
{"command": "zrange", "key": "foo", "start": "0", "end": "-1"},
)
judge_command(
"zrange foo 0 -1 withscores",
{
"command": "zrange",
"key": "foo",
"start": "0",
"end": "-1",
"withscores": "withscores",
},
)
@pytest.mark.xfail(reason="Not implemented yet")
def test_zinterstore(judge_command):
judge_command("ZINTERSTORE out 2 zset1 zset2 WEIGHTS 2 3", {})
judge_command("ZINTERSTORE out 2 zset1 zset2 WEIGHTS -1 -2", {})
judge_command("ZINTERSTORE out 2 zset1 zset2 WEIGHTS 0.2 0.3", {})
def test_zrangebylex(judge_command):
judge_command(
"ZRANGEBYLEX myzset [aaa (g",
{"command": "ZRANGEBYLEX", "key": "myzset", "lexmin": "[aaa", "lexmax": "(g"},
)
judge_command(
"ZRANGEBYLEX myzset - (c",
{"command": "ZRANGEBYLEX", "key": "myzset", "lexmin": "-", "lexmax": "(c"},
)
judge_command(
"ZRANGEBYLEX myzset - (c limit 10 100",
{
"command": "ZRANGEBYLEX",
"key": "myzset",
"lexmin": "-",
"lexmax": "(c",
"limit": "limit",
"offset": "10",
"count": "100",
},
)
judge_command(
"ZRANGEBYLEX myzset - (c limit 10 -1",
{
"command": "ZRANGEBYLEX",
"key": "myzset",
"lexmin": "-",
"lexmax": "(c",
"limit": "limit",
"offset": "10",
"count": "-1",
},
)
def test_zrangebyscore(judge_command):
judge_command(
"ZRANGEBYSCORE myzset -inf +inf",
{"command": "ZRANGEBYSCORE", "key": "myzset", "min": "-inf", "max": "+inf"},
)
judge_command(
"ZRANGEBYSCORE myzset 1 2",
{"command": "ZRANGEBYSCORE", "key": "myzset", "min": "1", "max": "2"},
)
judge_command(
"ZRANGEBYSCORE myzset (1 (2",
{"command": "ZRANGEBYSCORE", "key": "myzset", "min": "(1", "max": "(2"},
)
judge_command(
"ZRANGEBYSCORE myzset -inf +inf LIMIT 10 100",
{
"command": "ZRANGEBYSCORE",
"key": "myzset",
"min": "-inf",
"max": "+inf",
"limit": "LIMIT",
"offset": "10",
"count": "100",
},
)

View file

@ -0,0 +1,459 @@
def test_xrange(judge_command):
judge_command(
"XRANGE somestream - +",
{"command": "XRANGE", "key": "somestream", "stream_id": ["-", "+"]},
)
judge_command(
"XRANGE somestream 1526985054069 1526985055069",
{
"command": "XRANGE",
"key": "somestream",
"stream_id": ["1526985054069", "1526985055069"],
},
)
judge_command(
"XRANGE somestream 1526985054069 1526985055069-10",
{
"command": "XRANGE",
"key": "somestream",
"stream_id": ["1526985054069", "1526985055069-10"],
},
)
judge_command(
"XRANGE somestream 1526985054069 1526985055069-10 count 10",
{
"command": "XRANGE",
"key": "somestream",
"stream_id": ["1526985054069", "1526985055069-10"],
"count_const": "count",
"count": "10",
},
)
def test_xgroup_create(judge_command):
judge_command(
"XGROUP CREATE mykey mygroup 123",
{
"command": "XGROUP",
"stream_create": "CREATE",
"key": "mykey",
"group": "mygroup",
"stream_id": "123",
},
)
judge_command(
"XGROUP CREATE mykey mygroup $",
{
"command": "XGROUP",
"stream_create": "CREATE",
"key": "mykey",
"group": "mygroup",
"stream_id": "$",
},
)
# short of a parameter
judge_command("XGROUP CREATE mykey mygroup", None)
judge_command("XGROUP CREATE mykey", None)
def test_xgroup_setid(judge_command):
judge_command(
"XGROUP SETID mykey mygroup 123",
{
"command": "XGROUP",
"stream_setid": "SETID",
"key": "mykey",
"group": "mygroup",
"stream_id": "123",
},
)
judge_command(
"XGROUP SETID mykey mygroup $",
{
"command": "XGROUP",
"stream_setid": "SETID",
"key": "mykey",
"group": "mygroup",
"stream_id": "$",
},
)
# two subcommand together shouldn't match
judge_command("XGROUP CREATE mykey mygroup 123 SETID mykey mygroup $", None)
def test_xgroup_destroy(judge_command):
judge_command(
"XGROUP destroy mykey mygroup",
{
"command": "XGROUP",
"stream_destroy": "destroy",
"key": "mykey",
"group": "mygroup",
},
)
judge_command("XGROUP destroy mykey", None)
judge_command("XGROUP DESTROY mykey mygroup $", None)
def test_xgroup_delconsumer(judge_command):
judge_command(
"XGROUP delconsumer mykey mygroup myconsumer",
{
"command": "XGROUP",
"stream_delconsumer": "delconsumer",
"key": "mykey",
"group": "mygroup",
"consumer": "myconsumer",
},
)
judge_command(
"XGROUP delconsumer mykey mygroup $",
{
"command": "XGROUP",
"stream_delconsumer": "delconsumer",
"key": "mykey",
"group": "mygroup",
"consumer": "$",
},
)
judge_command("XGROUP delconsumer mykey mygroup", None)
def test_xgroup_stream(judge_command):
judge_command(
"XACK mystream group1 123123",
{
"command": "XACK",
"key": "mystream",
"group": "group1",
"stream_id": "123123",
},
)
judge_command(
"XACK mystream group1 123123 111",
{"command": "XACK", "key": "mystream", "group": "group1", "stream_id": "111"},
)
def test_xinfo(judge_command):
judge_command(
"XINFO consumers mystream mygroup",
{
"command": "XINFO",
"stream_consumers": "consumers",
"key": "mystream",
"group": "mygroup",
},
)
judge_command(
"XINFO GROUPS mystream",
{"command": "XINFO", "stream_groups": "GROUPS", "key": "mystream"},
)
judge_command(
"XINFO STREAM mystream",
{"command": "XINFO", "stream": "STREAM", "key": "mystream"},
)
judge_command("XINFO HELP", {"command": "XINFO", "help": "HELP"})
judge_command("XINFO consumers mystream mygroup GROUPS mystream", None)
judge_command("XINFO groups mystream mygroup", None)
def test_xinfo_with_full(judge_command):
judge_command(
"XINFO STREAM mystream FULL",
{
"command": "XINFO",
"stream": "STREAM",
"key": "mystream",
"full_const": "FULL",
},
)
judge_command(
"XINFO STREAM mystream FULL count 10",
{
"command": "XINFO",
"stream": "STREAM",
"key": "mystream",
"full_const": "FULL",
"count_const": "count",
"count": "10",
},
)
def test_xpending(judge_command):
judge_command(
"XPENDING mystream group55",
{"command": "XPENDING", "key": "mystream", "group": "group55"},
)
judge_command(
"XPENDING mystream group55 myconsumer",
{
"command": "XPENDING",
"key": "mystream",
"group": "group55",
"consumer": "myconsumer",
},
)
judge_command(
"XPENDING mystream group55 - + 10",
{
"command": "XPENDING",
"key": "mystream",
"group": "group55",
"stream_id": ["-", "+"],
"count": "10",
},
)
judge_command(
"XPENDING mystream group55 - + 10 myconsumer",
{
"command": "XPENDING",
"key": "mystream",
"group": "group55",
"stream_id": ["-", "+"],
"count": "10",
"consumer": "myconsumer",
},
)
judge_command("XPENDING mystream group55 - + ", None)
def test_xadd(judge_command):
judge_command(
"xadd mystream MAXLEN ~ 1000 * key value",
{
"command": "xadd",
"key": "mystream",
"maxlen": "MAXLEN",
"approximately": "~",
"count": "1000",
"sfield": "key",
"svalue": "value",
"stream_id": "*",
},
)
# test for MAXLEN option
judge_command(
"xadd mystream MAXLEN 1000 * key value",
{
"command": "xadd",
"key": "mystream",
"maxlen": "MAXLEN",
"count": "1000",
"sfield": "key",
"svalue": "value",
"stream_id": "*",
},
)
judge_command(
"xadd mystream * key value",
{
"command": "xadd",
"key": "mystream",
"sfield": "key",
"svalue": "value",
"stream_id": "*",
},
)
# spcify stream id
judge_command(
"xadd mystream 123-123 key value",
{
"command": "xadd",
"key": "mystream",
"sfield": "key",
"svalue": "value",
"stream_id": "123-123",
},
)
judge_command(
"xadd mystream 123-123 key value foo bar hello world",
{
"command": "xadd",
"key": "mystream",
"sfield": "hello",
"svalue": "world",
"stream_id": "123-123",
},
)
def test_xtrim(judge_command):
judge_command(
" XTRIM mystream MAXLEN 2",
{"command": "XTRIM", "key": "mystream", "maxlen": "MAXLEN", "count": "2"},
)
judge_command(
" XTRIM mystream MAXLEN ~ 2",
{
"command": "XTRIM",
"key": "mystream",
"maxlen": "MAXLEN",
"count": "2",
"approximately": "~",
},
)
judge_command(" XTRIM mystream", None)
def test_xdel(judge_command):
judge_command(
"XDEL mystream 1581165000000 1549611229000 1581060831000",
{"command": "XDEL", "key": "mystream", "stream_id": "1581060831000"},
)
judge_command(
"XDEL mystream 1581165000000",
{"command": "XDEL", "key": "mystream", "stream_id": "1581165000000"},
)
def test_xclaim(judge_command):
judge_command(
"XCLAIM mystream mygroup Alice 3600000 1526569498055-0",
{
"command": "XCLAIM",
"key": "mystream",
"group": "mygroup",
"consumer": "Alice",
"millisecond": "3600000",
"stream_id": "1526569498055-0",
},
)
judge_command(
"XCLAIM mystream mygroup Alice 3600000 1526569498055-0 123 456 789",
{
"command": "XCLAIM",
"key": "mystream",
"group": "mygroup",
"consumer": "Alice",
"millisecond": "3600000",
"stream_id": "789",
},
)
judge_command(
"XCLAIM mystream mygroup Alice 3600000 1526569498055-0 IDEL 300",
{
"command": "XCLAIM",
"key": "mystream",
"group": "mygroup",
"consumer": "Alice",
"millisecond": ["3600000", "300"],
"stream_id": "1526569498055-0",
"idel": "IDEL",
},
)
judge_command(
"XCLAIM mystream mygroup Alice 3600000 1526569498055-0 retrycount 7",
{
"command": "XCLAIM",
"key": "mystream",
"group": "mygroup",
"consumer": "Alice",
"millisecond": "3600000",
"stream_id": "1526569498055-0",
"retrycount": "retrycount",
"count": "7",
},
)
judge_command(
"XCLAIM mystream mygroup Alice 3600000 1526569498055-0 TIME 123456789",
{
"command": "XCLAIM",
"key": "mystream",
"group": "mygroup",
"consumer": "Alice",
"millisecond": "3600000",
"stream_id": "1526569498055-0",
"time": "TIME",
"timestamp": "123456789",
},
)
judge_command(
"XCLAIM mystream mygroup Alice 3600000 1526569498055-0 FORCE",
{
"command": "XCLAIM",
"key": "mystream",
"group": "mygroup",
"consumer": "Alice",
"millisecond": "3600000",
"stream_id": "1526569498055-0",
"force": "FORCE",
},
)
judge_command(
"XCLAIM mystream mygroup Alice 3600000 1526569498055-0 JUSTID",
{
"command": "XCLAIM",
"key": "mystream",
"group": "mygroup",
"consumer": "Alice",
"millisecond": "3600000",
"stream_id": "1526569498055-0",
"justid": "JUSTID",
},
)
def test_xread(judge_command):
judge_command(
"XREAD COUNT 2 STREAMS mystream writers 0-0 0-0",
{
"command": "XREAD",
"count_const": "COUNT",
"count": "2",
"streams": "STREAMS",
# FIXME current grammar can't support multiple tokens
# so the ids will be recongized to keys.
"keys": "mystream writers 0-0",
"stream_id": "0-0",
},
)
judge_command(
"XREAD COUNT 2 BLOCK 1000 STREAMS mystream writers 0-0 0-0",
{
"command": "XREAD",
"count_const": "COUNT",
"count": "2",
"streams": "STREAMS",
"keys": "mystream writers 0-0",
"block": "BLOCK",
"millisecond": "1000",
"stream_id": "0-0",
},
)
def test_xreadgroup(judge_command):
judge_command(
"XREADGROUP GROUP mygroup1 Bob COUNT 1 BLOCK 100 NOACK STREAMS key1 1 key2 2",
{
"command": "XREADGROUP",
"stream_group": "GROUP",
"group": "mygroup1",
"consumer": "Bob",
"count_const": "COUNT",
"count": "1",
"block": "BLOCK",
"millisecond": "100",
"noack": "NOACK",
"streams": "STREAMS",
"keys": "key1 1 key2",
"stream_id": "2",
},
)
judge_command(
"XREADGROUP GROUP mygroup1 Bob STREAMS key1 1 key2 2",
{
"command": "XREADGROUP",
"stream_group": "GROUP",
"group": "mygroup1",
"consumer": "Bob",
"streams": "STREAMS",
"keys": "key1 1 key2",
"stream_id": "2",
},
)
judge_command("XREADGROUP GROUP group consumer", None)

View file

@ -0,0 +1,345 @@
def test_set(judge_command):
judge_command("SET abc bar", {"command": "SET", "key": "abc", "value": "bar"})
judge_command(
"SET abc bar EX 10",
{
"command": "SET",
"key": "abc",
"value": "bar",
"expiration": "EX",
"millisecond": "10",
},
)
judge_command(
"SET abc bar px 10000",
{
"command": "SET",
"key": "abc",
"value": "bar",
"expiration": "px",
"millisecond": "10000",
},
)
judge_command(
"SET abc bar px 10000 nx",
{
"command": "SET",
"key": "abc",
"value": "bar",
"expiration": "px",
"millisecond": "10000",
"condition": "nx",
},
)
judge_command(
"SET abc bar px 10000 XX",
{
"command": "SET",
"key": "abc",
"value": "bar",
"expiration": "px",
"millisecond": "10000",
"condition": "XX",
},
)
judge_command(
"SET abc bar XX px 10000",
{
"command": "SET",
"key": "abc",
"value": "bar",
"expiration": "px",
"millisecond": "10000",
"condition": "XX",
},
)
judge_command(
"SET abc bar XX",
{"command": "SET", "key": "abc", "value": "bar", "condition": "XX"},
)
# keepttl
judge_command(
"SET abc bar XX keepttl",
{
"command": "SET",
"key": "abc",
"value": "bar",
"condition": "XX",
"keepttl": "keepttl",
},
)
judge_command(
"SET abc bar keepttl XX",
{
"command": "SET",
"key": "abc",
"value": "bar",
"condition": "XX",
"keepttl": "keepttl",
},
)
judge_command(
"SET abc bar XX px 10000 KEEPTTL",
{
"command": "SET",
"key": "abc",
"value": "bar",
"expiration": "px",
"millisecond": "10000",
"condition": "XX",
"keepttl": "KEEPTTL",
},
)
def test_append(judge_command):
judge_command("append foo bar", {"command": "append", "key": "foo", "value": "bar"})
judge_command(
"APPEND foo 'bar'", {"command": "APPEND", "key": "foo", "value": "'bar'"}
)
judge_command("APPEND foo", None)
def test_bitcount(judge_command):
judge_command("bitcount foo", {"command": "bitcount", "key": "foo"})
judge_command(
"bitcount foo 1 5",
{"command": "bitcount", "key": "foo", "start": "1", "end": "5"},
)
judge_command(
"bitcount foo 1 -5",
{"command": "bitcount", "key": "foo", "start": "1", "end": "-5"},
)
judge_command(
"bitcount foo -2 -1",
{"command": "bitcount", "key": "foo", "start": "-2", "end": "-1"},
)
judge_command("bitcount foo -2", None)
def test_getrange(judge_command):
judge_command("getrange foo", None)
judge_command(
"getrange foo 1 5",
{"command": "getrange", "key": "foo", "start": "1", "end": "5"},
)
judge_command(
"getrange foo 1 -5",
{"command": "getrange", "key": "foo", "start": "1", "end": "-5"},
)
judge_command(
"getrange foo -2 -1",
{"command": "getrange", "key": "foo", "start": "-2", "end": "-1"},
)
judge_command("getrange foo -2", None)
def test_get_set(judge_command):
judge_command("GETSET abc bar", {"command": "GETSET", "key": "abc", "value": "bar"})
def test_incr(judge_command):
judge_command("INCR foo", {"command": "INCR", "key": "foo"})
judge_command("INCR", None)
judge_command("INCR foo 1", None)
def test_incr_by(judge_command):
judge_command("INCRBY foo", None)
judge_command("INCRBY", None)
judge_command("INCRBY foo 1", {"command": "INCRBY", "key": "foo", "delta": "1"})
judge_command("INCRBY foo 200", {"command": "INCRBY", "key": "foo", "delta": "200"})
judge_command("INCRBY foo -21", {"command": "INCRBY", "key": "foo", "delta": "-21"})
def test_decr(judge_command):
judge_command("DECR foo", {"command": "DECR", "key": "foo"})
judge_command("DECR", None)
judge_command("DECR foo 1", None)
def test_decr_by(judge_command):
judge_command("DECRBY foo", None)
judge_command("DECRBY", None)
judge_command("DECRBY foo 1", {"command": "DECRBY", "key": "foo", "delta": "1"})
judge_command("DECRBY foo 200", {"command": "DECRBY", "key": "foo", "delta": "200"})
judge_command("DECRBY foo -21", {"command": "DECRBY", "key": "foo", "delta": "-21"})
def test_command_set_range(judge_command):
judge_command(
"SETRANGE foo 10 bar",
{"command": "SETRANGE", "key": "foo", "offset": "10", "value": "bar"},
)
judge_command("SETRANGE foo bar", None)
judge_command(
"SETRANGE Redis 10 'hello world'",
{
"command": "SETRANGE",
"key": "Redis",
"offset": "10",
"value": "'hello world'",
},
)
def test_command_set_ex(judge_command):
judge_command(
"SETEX key 10 value",
{"command": "SETEX", "key": "key", "second": "10", "value": "value"},
)
judge_command("SETEX foo 10", None)
judge_command(
"setex Redis 10 'hello world'",
{"command": "setex", "key": "Redis", "second": "10", "value": "'hello world'"},
)
def test_command_setbit(judge_command):
judge_command(
"SETBIT key 10 0",
{"command": "SETBIT", "key": "key", "offset": "10", "bit": "0"},
)
judge_command(
"SETBIT foo 10 1",
{"command": "SETBIT", "key": "foo", "offset": "10", "bit": "1"},
)
judge_command("SETBIT foo 10 10", None)
judge_command("SETBIT foo 10 abc", None)
judge_command("SETBIT foo 10", None)
judge_command("SETBIT foo", None)
def test_command_getbit(judge_command):
judge_command("GETBIT key 10", {"command": "GETBIT", "key": "key", "offset": "10"})
judge_command("GETBIT foo 0", {"command": "GETBIT", "key": "foo", "offset": "0"})
judge_command("GETBIT foo -1", None)
judge_command("SETBIT foo abc", None)
judge_command("SETBIT foo", None)
def test_command_incrbyfloat(judge_command):
judge_command("INCRBYFLOAT key", None)
judge_command(
"INCRBYFLOAT key 1.1", {"command": "INCRBYFLOAT", "key": "key", "float": "1.1"}
)
judge_command(
"INCRBYFLOAT key .1", {"command": "INCRBYFLOAT", "key": "key", "float": ".1"}
)
judge_command(
"INCRBYFLOAT key 1.", {"command": "INCRBYFLOAT", "key": "key", "float": "1."}
)
judge_command(
"INCRBYFLOAT key 5.0e3",
{"command": "INCRBYFLOAT", "key": "key", "float": "5.0e3"},
)
judge_command(
"INCRBYFLOAT key -5.0e3",
{"command": "INCRBYFLOAT", "key": "key", "float": "-5.0e3"},
)
def test_command_mget(judge_command):
judge_command("mget foo bar", {"command": "mget", "keys": "foo bar"})
def test_mset(judge_command):
judge_command("mset foo bar", {"command": "mset", "key": "foo", "value": "bar"})
judge_command(
"mset foo bar hello world",
{"command": "mset", "key": "hello", "value": "world"},
)
def test_psetex(judge_command):
judge_command(
"psetex foo 1000 bar",
{"command": "psetex", "key": "foo", "value": "bar", "millisecond": "1000"},
)
judge_command("psetex foo bar", None)
def test_bitop(judge_command):
judge_command(
"BITOP AND dest key1 key2",
{"command": "BITOP", "operation": "AND", "key": "dest", "keys": "key1 key2"},
)
judge_command(
"BITOP AND dest key1",
{"command": "BITOP", "operation": "AND", "key": "dest", "keys": "key1"},
)
judge_command("BITOP AND dest", None)
def test_bitpos(judge_command):
judge_command(
"BITPOS mykey 1 3 5",
{"command": "BITPOS", "key": "mykey", "bit": "1", "start": "3", "end": "5"},
)
judge_command("BITPOS mykey 1", {"command": "BITPOS", "key": "mykey", "bit": "1"})
judge_command(
"BITPOS mykey 1 3",
{"command": "BITPOS", "key": "mykey", "bit": "1", "start": "3"},
)
def test_bitfield(judge_command):
judge_command(
"BITFIELD mykey INCRBY i5 100 1 GET u4 0",
{
"command": "BITFIELD",
"key": "mykey",
"incrby": "INCRBY",
"inttype": ["i5", "u4"],
"offset": ["100", "0"],
"value": "1",
"get": "GET",
},
)
judge_command(
"BITFIELD mystring SET i8 #0 100",
{
"command": "BITFIELD",
"key": "mystring",
"set": "SET",
"inttype": "i8",
"offset": "#0",
"value": "100",
},
)
judge_command(
"BITFIELD mykey incrby u2 100 1 OVERFLOW SAT incrby u2 102 1",
{
"command": "BITFIELD",
"key": "mykey",
"incrby": "incrby",
"inttype": "u2",
"offset": "102",
"value": "1",
"overflow": "OVERFLOW",
"overflow_option": "SAT",
},
)
def test_stralgo(judge_command):
judge_command(
"STRALGO LCS STRINGS ohmytext mynewtext",
{
"command": "STRALGO",
"str_algo": "LCS",
"strings_const": "STRINGS",
"values": "ohmytext mynewtext",
},
)
# Due to redis' command design, this can't be fix in any ways.
judge_command(
"STRALGO LCS STRINGS ohmytext mynewtext LEN",
{
"command": "STRALGO",
"str_algo": "LCS",
"strings_const": "STRINGS",
"values": "ohmytext mynewtext LEN",
},
)

View file

@ -0,0 +1,504 @@
import re
import pytest
import redis
from unittest.mock import MagicMock, patch
from textwrap import dedent
from prompt_toolkit.formatted_text import FormattedText
from iredis.client import Client
from iredis.config import config, load_config_files
from iredis.completers import IRedisCompleter
from iredis.entry import Rainbow, prompt_message
from iredis.exceptions import NotSupport
@pytest.fixture
def completer():
return IRedisCompleter()
@pytest.mark.parametrize(
"_input, command_name, expect_args",
[
("keys *", "keys", ["*"]),
("DEL abc foo bar", "DEL", ["abc", "foo", "bar"]),
("cluster info", "cluster info", []),
("CLUSTER failover FORCE", "CLUSTER failover", ["FORCE"]),
],
)
def test_send_command(_input, command_name, expect_args):
client = Client("127.0.0.1", "6379", None)
client.execute = MagicMock()
next(client.send_command(_input, None))
args, kwargs = client.execute.call_args
assert args == (command_name, *expect_args)
def test_client_not_support_hello_command(iredis_client):
with pytest.raises(NotSupport):
iredis_client.pre_hook("hello 3", "hello", "3", None)
def test_patch_completer():
client = Client("127.0.0.1", "6379", None)
completer = IRedisCompleter()
client.pre_hook(
"MGET foo bar hello world", "MGET", "foo bar hello world", completer
)
assert completer.key_completer.words == ["world", "hello", "bar", "foo"]
assert completer.key_completer.words == ["world", "hello", "bar", "foo"]
client.pre_hook("GET bar", "GET", "bar", completer)
assert completer.key_completer.words == ["bar", "world", "hello", "foo"]
def test_get_server_verison_after_client(config):
Client("127.0.0.1", "6379", None)
assert re.match(r"\d+\..*", config.version)
config.version = "Unknown"
config.no_info = True
Client("127.0.0.1", "6379", None)
assert config.version == "Unknown"
def test_do_help(config):
client = Client("127.0.0.1", "6379", None)
config.version = "5.0.0"
resp = client.do_help("SET")
assert resp[10] == ("", "1.0.0 (Avaiable on your redis-server: 5.0.0)")
config.version = "2.0.0"
resp = client.do_help("cluster", "addslots")
assert resp[10] == ("", "3.0.0 (Not avaiable on your redis-server: 2.0.0)")
def test_rainbow_iterator():
"test color infinite iterator"
original_color = Rainbow.color
Rainbow.color = list(range(0, 3))
assert list(zip(range(10), Rainbow())) == [
(0, 0),
(1, 1),
(2, 2),
(3, 1),
(4, 0),
(5, 1),
(6, 2),
(7, 1),
(8, 0),
(9, 1),
]
Rainbow.color = original_color
def test_prompt_message(iredis_client, config):
config.rainbow = False
assert prompt_message(iredis_client) == "127.0.0.1:6379[15]> "
config.rainbow = True
assert prompt_message(iredis_client)[:3] == [
("#cc2244", "1"),
("#bb4444", "2"),
("#996644", "7"),
]
def test_on_connection_error_retry(iredis_client, config):
config.retry_times = 1
mock_connection = MagicMock()
mock_connection.read_response.side_effect = [
redis.exceptions.ConnectionError(
"Error 61 connecting to 127.0.0.1:7788. Connection refused."
),
"hello",
]
original_connection = iredis_client.connection
iredis_client.connection = mock_connection
value = iredis_client.execute("None", "GET", ["foo"])
assert value == "hello" # be rendered
mock_connection.disconnect.assert_called_once()
mock_connection.connect.assert_called_once()
iredis_client.connection = original_connection
def test_on_connection_error_retry_without_retrytimes(iredis_client, config):
config.retry_times = 0
mock_connection = MagicMock()
mock_connection.read_response.side_effect = [
redis.exceptions.ConnectionError(
"Error 61 connecting to 127.0.0.1:7788. Connection refused."
),
"hello",
]
iredis_client.connection = mock_connection
with pytest.raises(redis.exceptions.ConnectionError):
iredis_client.execute("None", "GET", ["foo"])
mock_connection.disconnect.assert_not_called()
mock_connection.connect.assert_not_called()
def test_socket_keepalive(config):
config.socket_keepalive = True
from iredis.client import Client
newclient = Client("127.0.0.1", "6379", 0)
assert newclient.connection.socket_keepalive
# keepalive off
config.socket_keepalive = False
newclient = Client("127.0.0.1", "6379", 0)
assert not newclient.connection.socket_keepalive
def test_not_retry_on_authentication_error(iredis_client, config):
config.retry_times = 2
mock_connection = MagicMock()
mock_connection.read_response.side_effect = [
redis.exceptions.AuthenticationError("Authentication required."),
"hello",
]
iredis_client.connection = mock_connection
with pytest.raises(redis.exceptions.ConnectionError):
iredis_client.execute("None", "GET", ["foo"])
@pytest.mark.skipif("int(os.environ['REDIS_VERSION']) < 6")
def test_auto_select_db_and_auth_for_reconnect_only_6(iredis_client, config):
config.retry_times = 2
config.raw = True
next(iredis_client.send_command("select 2"))
assert iredis_client.connection.db == 2
resp = next(iredis_client.send_command("auth 123"))
assert (
b"ERROR AUTH <password> called without any "
b"password configured for the default user. "
b"Are you sure your configuration is correct?" in resp
)
assert iredis_client.connection.password is None
next(iredis_client.send_command("config set requirepass 'abc'"))
next(iredis_client.send_command("auth abc"))
assert iredis_client.connection.password == "abc"
assert (
iredis_client.execute("ACL SETUSER", "default", "on", "nopass", "~*", "+@all")
== b"OK"
)
@pytest.mark.skipif("int(os.environ['REDIS_VERSION']) > 5")
def test_auto_select_db_and_auth_for_reconnect_only_5(iredis_client, config):
config.retry_times = 2
config.raw = True
next(iredis_client.send_command("select 2"))
assert iredis_client.connection.db == 2
resp = next(iredis_client.send_command("auth 123"))
assert b"Client sent AUTH, but no password is set" in resp
assert iredis_client.connection.password is None
next(iredis_client.send_command("config set requirepass 'abc'"))
next(iredis_client.send_command("auth abc"))
assert iredis_client.connection.password == "abc"
next(iredis_client.send_command("config set requirepass ''"))
def test_split_shell_command(iredis_client, completer):
assert iredis_client.split_command_and_pipeline(" get json | rg . ", completer) == (
" get json ",
"rg . ",
)
assert iredis_client.split_command_and_pipeline(
""" get "json | \\" hello" | rg . """, completer
) == (""" get "json | \\" hello" """, "rg . ")
def test_running_with_pipeline(clean_redis, iredis_client, capfd, completer):
config.shell = True
clean_redis.set("foo", "hello \n world")
with pytest.raises(StopIteration):
next(iredis_client.send_command("get foo | grep w", completer))
out, err = capfd.readouterr()
assert out == " world\n"
def test_running_with_multiple_pipeline(clean_redis, iredis_client, capfd, completer):
config.shell = True
clean_redis.set("foo", "hello world\nhello iredis")
with pytest.raises(StopIteration):
next(
iredis_client.send_command("get foo | grep hello | grep iredis", completer)
)
out, err = capfd.readouterr()
assert out == "hello iredis\n"
def test_can_not_connect_on_startup(capfd):
with pytest.raises(SystemExit):
Client("localhost", "16111", 15)
out, err = capfd.readouterr()
assert "connecting to localhost:16111." in err
def test_peek_key_not_exist(iredis_client, clean_redis, config):
config.raw = False
peek_result = list(iredis_client.do_peek("non-exist-key"))
assert peek_result == ["non-exist-key doesn't exist."]
def test_peek_string(iredis_client, clean_redis):
clean_redis.set("foo", "bar")
peek_result = list(iredis_client.do_peek("foo"))
assert peek_result == [
FormattedText(
[
("class:dockey", "key: "),
("", "string (embstr) mem: 50 bytes, ttl: -1"),
("", "\n"),
("class:dockey", "strlen: "),
("", "3"),
("", "\n"),
("class:dockey", "value: "),
("", '"bar"'),
]
)
]
def test_peek_list_fetch_all(iredis_client, clean_redis):
clean_redis.lpush("mylist", *[f"hello-{index}" for index in range(5)])
peek_result = list(iredis_client.do_peek("mylist"))
assert peek_result == [
FormattedText(
[
("class:dockey", "key: "),
("", "list (quicklist) mem: 176 bytes, ttl: -1"),
("", "\n"),
("class:dockey", "llen: "),
("", "5"),
("", "\n"),
("class:dockey", "elements: "),
("", "\n"),
("", "1)"),
("", " "),
("class:string", '"hello-4"'),
("", "\n"),
("", "2)"),
("", " "),
("class:string", '"hello-3"'),
("", "\n"),
("", "3)"),
("", " "),
("class:string", '"hello-2"'),
("", "\n"),
("", "4)"),
("", " "),
("class:string", '"hello-1"'),
("", "\n"),
("", "5)"),
("", " "),
("class:string", '"hello-0"'),
]
)
]
def test_peek_list_fetch_part(iredis_client, clean_redis):
clean_redis.lpush("mylist", *[f"hello-{index}" for index in range(40)])
peek_result = list(iredis_client.do_peek("mylist"))
assert len(peek_result[0]) == 91
def test_peek_set_fetch_all(iredis_client, clean_redis):
clean_redis.sadd("myset", *[f"hello-{index}" for index in range(5)])
peek_result = list(iredis_client.do_peek("myset"))
assert len(peek_result[0]) == 27
def test_peek_set_fetch_part(iredis_client, clean_redis):
clean_redis.sadd("myset", *[f"hello-{index}" for index in range(40)])
peek_result = list(iredis_client.do_peek("myset"))
assert peek_result[0][0] == ("class:dockey", "key: ")
assert peek_result[0][1][1].startswith("set (hashtable) mem: 2")
def test_peek_zset_fetch_all(iredis_client, clean_redis):
clean_redis.zadd(
"myzset", dict(zip([f"hello-{index}" for index in range(3)], range(3)))
)
peek_result = list(iredis_client.do_peek("myzset"))
assert peek_result[0][0:9] == FormattedText(
[
("class:dockey", "key: "),
("", "zset (ziplist) mem: 92 bytes, ttl: -1"),
("", "\n"),
("class:dockey", "zcount: "),
("", "3"),
("", "\n"),
("class:dockey", "members: "),
("", "\n"),
("", "1)"),
]
)
def test_peek_zset_fetch_part(iredis_client, clean_redis):
clean_redis.zadd(
"myzset", dict(zip([f"hello-{index}" for index in range(40)], range(40)))
)
peek_result = list(iredis_client.do_peek("myzset"))
assert peek_result[0][0:8] == FormattedText(
[
("class:dockey", "key: "),
("", "zset (ziplist) mem: 556 bytes, ttl: -1"),
("", "\n"),
("class:dockey", "zcount: "),
("", "40"),
("", "\n"),
("class:dockey", "members (first 40): "),
("", "\n"),
]
)
def test_peek_hash_fetch_all(iredis_client, clean_redis):
for key, value in zip(
[f"hello-{index}" for index in range(3)], [f"hi-{index}" for index in range(3)]
):
clean_redis.hset("myhash", key, value)
peek_result = list(iredis_client.do_peek("myhash"))
assert len(peek_result[0]) == 28
def test_peek_hash_fetch_part(iredis_client, clean_redis):
for key, value in zip(
[f"hello-{index}" for index in range(100)],
[f"hi-{index}" for index in range(100)],
):
clean_redis.hset("myhash", key, value)
peek_result = list(iredis_client.do_peek("myhash"))
assert len(peek_result[0]) == 707
def test_peek_stream(iredis_client, clean_redis):
clean_redis.xadd("mystream", {"foo": "bar", "hello": "world"})
peek_result = list(iredis_client.do_peek("mystream"))
assert peek_result[0][0:18] == FormattedText(
[
("class:dockey", "key: "),
("", "stream (unknown) mem: 601 bytes, ttl: -1"),
("", "\n"),
("class:dockey", "XINFO: "),
("", "\n"),
("", " 1)"),
("", " "),
("class:string", '"length"'),
("", "\n"),
("", " 2)"),
("", " "),
("class:string", '"1"'),
("", "\n"),
("", " 3)"),
("", " "),
("class:string", '"radix-tree-keys"'),
("", "\n"),
("", " 4)"),
]
)
def test_mem_not_called_before_redis_4(config, iredis_client, clean_redis):
config.version = "3.2.9"
def wrapper(func):
def execute(command_name, *args):
print(command_name)
if command_name.upper() == "MEMORY USAGE":
raise Exception("MEMORY USAGE not supported!")
return func(command_name, *args)
return execute
iredis_client.execute = wrapper(iredis_client.execute)
clean_redis.set("foo", "bar")
result = list(iredis_client.do_peek("foo"))
assert result[0][1] == ("", "string (embstr), ttl: -1")
def test_mem_not_called_when_cant_get_server_version(
config, iredis_client, clean_redis
):
config.version = None
def wrapper(func):
def execute(command_name, *args):
print(command_name)
if command_name.upper() == "MEMORY USAGE":
raise Exception("MEMORY USAGE not supported!")
return func(command_name, *args)
return execute
iredis_client.execute = wrapper(iredis_client.execute)
clean_redis.set("foo", "bar")
result = list(iredis_client.do_peek("foo"))
assert result[0][1] == ("", "string (embstr), ttl: -1")
def test_reissue_command_on_redis_cluster(iredis_client, clean_redis):
mock_response = iredis_client.connection = MagicMock()
mock_response.read_response.side_effect = redis.exceptions.ResponseError(
"MOVED 12182 127.0.0.1:7002"
)
iredis_client.reissue_with_redirect = MagicMock()
iredis_client.execute("set", "foo", "bar")
assert iredis_client.reissue_with_redirect.call_args == (
(
"MOVED 12182 127.0.0.1:7002",
"set",
"foo",
"bar",
),
)
def test_reissue_command_on_redis_cluster_with_password_in_dsn(
iredis_client, clean_redis
):
config_content = dedent(
"""
[main]
log_location = /tmp/iredis1.log
no_info=True
[alias_dsn]
cluster-7003=redis://foo:bar@127.0.0.1:7003
"""
)
with open("/tmp/iredisrc", "w+") as etc_config:
etc_config.write(config_content)
config_obj = load_config_files("/tmp/iredisrc")
config.alias_dsn = config_obj["alias_dsn"]
mock_execute_by_connection = iredis_client.execute_by_connection = MagicMock()
with patch("redis.connection.Connection.connect"):
iredis_client.reissue_with_redirect(
"MOVED 12182 127.0.0.1:7003", "set", "foo", "bar"
)
call_args = mock_execute_by_connection.call_args[0]
print(call_args)
assert list(call_args[1:]) == ["set", "foo", "bar"]
assert call_args[0].password == "bar"

View file

@ -0,0 +1,356 @@
from unittest.mock import MagicMock, patch
import pendulum
from prompt_toolkit.formatted_text import FormattedText
from prompt_toolkit.completion import Completion
from iredis.completers import MostRecentlyUsedFirstWordCompleter
from iredis.completers import IRedisCompleter, TimestampCompleter, IntegerTypeCompleter
def test_LUF_completer_touch():
c = MostRecentlyUsedFirstWordCompleter(3, ["one", "two"])
c.touch("hello")
assert c.words == ["hello", "one", "two"]
c.touch("foo")
assert c.words == ["foo", "hello", "one"]
c.touch("hello")
assert c.words == ["hello", "foo", "one"]
def test_LUF_completer_touch_words():
c = MostRecentlyUsedFirstWordCompleter(3, [])
c.touch_words(["hello", "world", "foo", "bar"])
assert c.words == ["bar", "foo", "world"]
c.touch_words(["one", "two"])
assert c.words == ["two", "one", "bar"]
def test_newbie_mode_complete_without_meta_dict():
fake_document = MagicMock()
fake_document.text_before_cursor = fake_document.text = "GEOR"
completer = IRedisCompleter(hint=False)
completions = list(completer.get_completions(fake_document, None))
assert [word.text for word in completions] == ["GEORADIUS", "GEORADIUSBYMEMBER"]
assert [word.display_meta for word in completions] == [
FormattedText([("", "")]),
FormattedText([("", "")]),
]
def test_newbie_mode_complete_with_meta_dict():
fake_document = MagicMock()
fake_document.text_before_cursor = fake_document.text = "GEOR"
completer = IRedisCompleter(hint=True)
completions = list(completer.get_completions(fake_document, None))
assert sorted([completion.display_meta for completion in completions]) == [
FormattedText(
[
(
"",
"Query a sorted set representing a geospatial index to fetch members matching a given maximum distance from a member", # noqa
)
]
),
FormattedText(
[
(
"",
"Query a sorted set representing a geospatial index to fetch members matching a given maximum distance from a point", # noqa
)
]
),
]
def test_newbie_mode_complete_with_meta_dict_command_is_lowercase():
fake_document = MagicMock()
fake_document.text_before_cursor = fake_document.text = "geor"
completer = IRedisCompleter(hint=True)
completions = list(completer.get_completions(fake_document, None))
assert sorted([completion.display_meta for completion in completions]) == [
FormattedText(
[
(
"",
"Query a sorted set representing a geospatial index to fetch members matching a given maximum distance from a member", # noqa
)
]
),
FormattedText(
[
(
"",
"Query a sorted set representing a geospatial index to fetch members matching a given maximum distance from a point", # noqa
)
]
),
]
def test_iredis_completer_update_for_response():
c = IRedisCompleter()
c.update_completer_for_response(
"HGETALL",
(),
[
b"Behave",
b"misbehave",
b"Interpret",
b"misinterpret",
b"Lead",
b"mislead",
b"Trust",
b"mistrust",
],
)
assert c.field_completer.words == ["Trust", "Lead", "Interpret", "Behave"]
def test_categoryname_completer_update_for_response():
c = IRedisCompleter()
c.update_completer_for_response(
"ACL CAT",
(),
[b"scripting", b"watch"],
)
assert sorted(c.catetoryname_completer.words) == ["scripting", "watch"]
c.update_completer_for_response(
"ACL CAT",
("scripting"),
[b"foo", b"bar"],
)
assert sorted(c.catetoryname_completer.words) == ["scripting", "watch"]
def test_completer_when_there_are_spaces_in_command():
c = IRedisCompleter()
c.update_completer_for_response(
"ACL cat",
(),
[b"scripting", b"watch"],
)
assert sorted(c.catetoryname_completer.words) == ["scripting", "watch"]
c.update_completer_for_response(
"acl \t cat",
(),
[b"hello", b"world"],
)
assert sorted(c.catetoryname_completer.words) == [
"hello",
"scripting",
"watch",
"world",
]
def test_iredis_completer_no_exception_for_none_response():
c = IRedisCompleter()
c.update_completer_for_response("XPENDING", None, None)
c.update_completer_for_response("KEYS", None, None)
def test_group_completer():
fake_document = MagicMock()
previous_commands = ["xgroup create abc world 123", "xgroup setid abc hello 123"]
fake_document.text = fake_document.text_before_cursor = "XGROUP DESTROY key "
completer = IRedisCompleter()
for command in previous_commands:
completer.update_completer_for_input(command)
completions = list(completer.get_completions(fake_document, None))
assert completions == [
Completion(
text="hello",
start_position=0,
display=FormattedText([("", "hello")]),
display_meta=FormattedText([("", "")]),
),
Completion(
text="world",
start_position=0,
display=FormattedText([("", "world")]),
display_meta=FormattedText([("", "")]),
),
]
@patch("iredis.completers.pendulum.now")
def test_timestamp_completer_humanize_time_completion(fake_now):
fake_now.return_value = pendulum.from_timestamp(1578487013)
c = TimestampCompleter()
fake_document = MagicMock()
fake_document.text = fake_document.text_before_cursor = "30"
completions = list(c.get_completions(fake_document, None))
assert completions == [
Completion(
text="1575895013000",
start_position=-2,
display=FormattedText([("", "1575895013000")]),
display_meta="30 days ago (2019-12-09 12:36:53)",
),
Completion(
text="1578379013000",
start_position=-2,
display=FormattedText([("", "1578379013000")]),
display_meta="30 hours ago (2020-01-07 06:36:53)",
),
Completion(
text="1578485213000",
start_position=-2,
display=FormattedText([("", "1578485213000")]),
display_meta="30 minutes ago (2020-01-08 12:06:53)",
),
Completion(
text="1578486983000",
start_position=-2,
display=FormattedText([("", "1578486983000")]),
display_meta="30 seconds ago (2020-01-08 12:36:23)",
),
]
# No plural
fake_document.text = fake_document.text_before_cursor = "1"
completions = list(c.get_completions(fake_document, None))
assert completions == [
Completion(
text="1546951013000",
start_position=-1,
display=FormattedText([("", "1546951013000")]),
display_meta="1 year ago (2019-01-08 12:36:53)",
),
Completion(
text="1575808613000",
start_position=-1,
display=FormattedText([("", "1575808613000")]),
display_meta="1 month ago (2019-12-08 12:36:53)",
),
Completion(
text="1578400613000",
start_position=-1,
display=FormattedText([("", "1578400613000")]),
display_meta="1 day ago (2020-01-07 12:36:53)",
),
Completion(
text="1578483413000",
start_position=-1,
display=FormattedText([("", "1578483413000")]),
display_meta="1 hour ago (2020-01-08 11:36:53)",
),
Completion(
text="1578486953000",
start_position=-1,
display=FormattedText([("", "1578486953000")]),
display_meta="1 minute ago (2020-01-08 12:35:53)",
),
Completion(
text="1578487012000",
start_position=-1,
display=FormattedText([("", "1578487012000")]),
display_meta="1 second ago (2020-01-08 12:36:52)",
),
]
def test_timestamp_completer_datetime_format_time_completion():
c = TimestampCompleter()
fake_document = MagicMock()
fake_document.text = fake_document.text_before_cursor = "2020-02-07"
completions = list(c.get_completions(fake_document, None))
assert completions == [
Completion(
text="1581033600000",
start_position=-10,
display=FormattedText([("", "1581033600000")]),
display_meta="2020-02-07T00:00:00+00:00",
)
]
def test_integer_type_completer():
c = IntegerTypeCompleter()
fake_document = MagicMock()
fake_document.text = fake_document.get_word_before_cursor.return_value = "i"
completions = list(c.get_completions(fake_document, None))
assert len(completions) == 64
fake_document.text = fake_document.get_word_before_cursor.return_value = "u"
completions = list(c.get_completions(fake_document, None))
assert len(completions) == 63
c.touch("u4")
assert list(c.get_completions(fake_document, None))[0].text == "u4"
def test_completion_casing():
c = IRedisCompleter(completion_casing="auto")
fake_document = MagicMock()
fake_document.text = fake_document.text_before_cursor = "ge"
assert [
completion.text for completion in c.get_completions(fake_document, None)
] == [
"get",
"getset",
"getbit",
"geopos",
"geoadd",
"geohash",
"geodist",
"getrange",
"georadius",
"georadiusbymember",
]
c = IRedisCompleter(completion_casing="auto")
fake_document.text = fake_document.text_before_cursor = "GET"
assert [
completion.text for completion in c.get_completions(fake_document, None)
] == ["GET", "GETSET", "GETBIT", "GETRANGE"]
c = IRedisCompleter(completion_casing="upper")
fake_document.text = fake_document.text_before_cursor = "get"
assert [
completion.text for completion in c.get_completions(fake_document, None)
] == ["GET", "GETSET", "GETBIT", "GETRANGE"]
c = IRedisCompleter(completion_casing="lower")
fake_document.text = fake_document.text_before_cursor = "GET"
assert [
completion.text for completion in c.get_completions(fake_document, None)
] == ["get", "getset", "getbit", "getrange"]
def test_username_completer():
completer = IRedisCompleter()
completer.update_completer_for_input("acl deluser laixintao")
completer.update_completer_for_input("acl deluser antirez")
fake_document = MagicMock()
fake_document.text_before_cursor = fake_document.text = "acl deluser "
completions = list(completer.get_completions(fake_document, None))
assert sorted([completion.text for completion in completions]) == [
"antirez",
"laixintao",
]
def test_username_touch_for_response():
c = IRedisCompleter()
c.update_completer_for_response(
"acl users",
(),
[b"hello", b"world"],
)
assert sorted(c.username_completer.words) == [
"hello",
"world",
]

View file

@ -0,0 +1,272 @@
import pytest
import tempfile
from unittest.mock import patch
from prompt_toolkit.formatted_text import FormattedText
from iredis.entry import (
gather_args,
parse_url,
SkipAuthFileHistory,
write_result,
is_too_tall,
)
from iredis.utils import DSN
@pytest.mark.parametrize(
"is_tty,raw_arg_is_raw,final_config_is_raw",
[
(True, None, False),
(True, True, True),
(True, False, False),
(False, None, True),
(False, True, True),
(False, False, True), # not tty
],
)
def test_command_entry_tty(is_tty, raw_arg_is_raw, final_config_is_raw, config):
# is tty + raw -> raw
with patch("sys.stdout.isatty") as patch_tty:
patch_tty.return_value = is_tty
if raw_arg_is_raw is None:
call = ["iredis"]
elif raw_arg_is_raw is True:
call = ["iredis", "--raw"]
elif raw_arg_is_raw is False:
call = ["iredis", "--no-raw"]
else:
raise Exception()
gather_args.main(call, standalone_mode=False)
assert config.raw == final_config_is_raw
def test_disable_pager():
from iredis.config import config
gather_args.main(["iredis", "--decode", "utf-8"], standalone_mode=False)
assert config.enable_pager
gather_args.main(["iredis", "--no-pager"], standalone_mode=False)
assert not config.enable_pager
def test_command_with_decode_utf_8():
from iredis.config import config
gather_args.main(["iredis", "--decode", "utf-8"], standalone_mode=False)
assert config.decode == "utf-8"
gather_args.main(["iredis"], standalone_mode=False)
assert config.decode == ""
def test_command_with_shell_pipeline():
from iredis.config import config
gather_args.main(["iredis", "--no-shell"], standalone_mode=False)
assert config.shell is False
gather_args.main(["iredis"], standalone_mode=False)
assert config.shell is True
def test_command_shell_options_higher_priority():
from iredis.config import config
from textwrap import dedent
config_content = dedent(
"""
[main]
shell = False
"""
)
with open("/tmp/iredisrc", "w+") as etc_config:
etc_config.write(config_content)
gather_args.main(["iredis", "--iredisrc", "/tmp/iredisrc"], standalone_mode=False)
assert config.shell is False
gather_args.main(
["iredis", "--shell", "--iredisrc", "/tmp/iredisrc"], standalone_mode=False
)
assert config.shell is True
@pytest.mark.parametrize(
"url,dsn",
[
(
"redis://localhost:6379/3",
DSN(
scheme="redis",
host="localhost",
port=6379,
path=None,
db=3,
username=None,
password=None,
),
),
(
"redis://localhost:6379",
DSN(
scheme="redis",
host="localhost",
port=6379,
path=None,
db=0,
username=None,
password=None,
),
),
(
"rediss://localhost:6379",
DSN(
scheme="rediss",
host="localhost",
port=6379,
path=None,
db=0,
username=None,
password=None,
),
),
(
"redis://username:password@localhost:6379",
DSN(
scheme="redis",
host="localhost",
port=6379,
path=None,
db=0,
username="username",
password="password",
),
),
(
"redis://:password@localhost:6379",
DSN(
scheme="redis",
host="localhost",
port=6379,
path=None,
db=0,
username=None,
password="password",
),
),
(
"redis://username@localhost:12345",
DSN(
scheme="redis",
host="localhost",
port=12345,
path=None,
db=0,
username="username",
password=None,
),
),
(
# query string won't work for redis://
"redis://username@localhost:6379?db=2",
DSN(
scheme="redis",
host="localhost",
port=6379,
path=None,
db=0,
username="username",
password=None,
),
),
(
"unix://username:password2@/tmp/to/socket.sock?db=0",
DSN(
scheme="unix",
host=None,
port=None,
path="/tmp/to/socket.sock",
db=0,
username="username",
password="password2",
),
),
(
"unix://:password3@/path/to/socket.sock",
DSN(
scheme="unix",
host=None,
port=None,
path="/path/to/socket.sock",
db=0,
username=None,
password="password3",
),
),
(
"unix:///tmp/socket.sock",
DSN(
scheme="unix",
host=None,
port=None,
path="/tmp/socket.sock",
db=0,
username=None,
password=None,
),
),
],
)
def test_parse_url(url, dsn):
assert parse_url(url) == dsn
@pytest.mark.parametrize(
"command,record",
[
("set foo bar", True),
("set auth bar", True),
("auth 123", False),
("AUTH hello", False),
("AUTH hello world", False),
],
)
def test_history(command, record):
f = tempfile.TemporaryFile("w+")
history = SkipAuthFileHistory(f.name)
assert history._loaded_strings == []
history.append_string(command)
assert (command in history._loaded_strings) is record
def test_write_result_for_str(capsys):
write_result("hello")
captured = capsys.readouterr()
assert captured.out == "hello\n"
def test_write_result_for_bytes(capsys):
write_result(b"hello")
captured = capsys.readouterr()
assert captured.out == "hello\n"
def test_write_result_for_formatted_text():
ft = FormattedText([("class:keyword", "set"), ("class:string", "hello world")])
# just this test not raise means ok
write_result(ft)
def test_is_too_tall_for_formatted_text():
ft = FormattedText([("class:key", f"key-{index}\n") for index in range(21)])
assert is_too_tall(ft, 20)
assert not is_too_tall(ft, 22)
def test_is_too_tall_for_bytes():
byte_text = b"".join([b"key\n" for index in range(21)])
assert is_too_tall(byte_text, 20)
assert not is_too_tall(byte_text, 23)

View file

@ -0,0 +1,28 @@
"""
This test ensures that all of redis-doc's markdown can be rendered.
Why do we need this?
see:
https://github.com/antirez/redis-doc/commit/02b3d1a345093c1794fd86273e9d516fffd3b819
"""
import pytest
from importlib_resources import read_text
from iredis.commands import commands_summary
from iredis.data import commands as commands_data
from iredis.markdown import render
doc_files = []
for command, info in commands_summary.items():
command_docs_name = "-".join(command.split()).lower()
if info["group"] == "iredis":
continue
doc_files.append(f"{command_docs_name}.md")
@pytest.mark.parametrize("filename", doc_files)
def test_markdown_render(filename):
print(filename)
doc = read_text(commands_data, filename)
render(doc)

View file

@ -0,0 +1,486 @@
import os
import time
from prompt_toolkit.formatted_text import FormattedText
from iredis import renders
from iredis.config import config
from iredis.completers import IRedisCompleter
def strip_formatted_text(formatted_text):
return "".join(text[1] for text in formatted_text)
def test_render_simple_string_raw_using_raw_render():
assert renders.OutputRender.render_raw(b"OK") == b"OK"
assert renders.OutputRender.render_raw(b"BUMPED 1") == b"BUMPED 1"
assert renders.OutputRender.render_raw(b"STILL 1") == b"STILL 1"
def test_render_simple_string():
assert renders.OutputRender.render_simple_string(b"OK") == FormattedText(
[("class:success", "OK")]
)
assert renders.OutputRender.render_simple_string(b"BUMPED 1") == FormattedText(
[("class:success", "BUMPED 1")]
)
assert renders.OutputRender.render_simple_string(b"STILL 1") == FormattedText(
[("class:success", "STILL 1")]
)
def test_render_list_index():
raw = ["hello", "world", "foo"]
out = renders._render_list([item.encode() for item in raw], raw)
out = strip_formatted_text(out)
assert isinstance(out, str)
assert "3)" in out
assert "1)" in out
assert "4)" not in out
def test_render_list_index_const_width():
raw = ["hello"] * 100
out = renders._render_list([item.encode() for item in raw], raw)
out = strip_formatted_text(out)
assert isinstance(out, str)
assert " 1)" in out
assert "\n100)" in out
raw = ["hello"] * 1000
out = renders._render_list([item.encode() for item in raw], raw)
out = strip_formatted_text(out)
assert " 1)" in out
assert "\n 999)" in out
assert "\n1000)" in out
raw = ["hello"] * 10
out = renders._render_list([item.encode() for item in raw], raw)
out = strip_formatted_text(out)
assert " 1)" in out
assert "\n 9)" in out
assert "\n10)" in out
def test_render_list_using_raw_render():
raw = ["hello", "world", "foo"]
out = renders.OutputRender.render_raw([item.encode() for item in raw])
assert b"hello\nworld\nfoo" == out
def test_render_list_with_nil_init():
raw = [b"hello", None, b"world"]
out = renders.OutputRender.render_list(raw)
out = strip_formatted_text(out)
assert out == '1) "hello"\n2) (nil)\n3) "world"'
def test_render_list_with_nil_init_while_config_raw():
raw = [b"hello", None, b"world"]
out = renders.OutputRender.render_raw(raw)
assert out == b"hello\n\nworld"
def test_render_list_with_empty_list_raw():
raw = []
out = renders.OutputRender.render_raw(raw)
assert out == b""
def test_render_list_with_empty_list():
raw = []
out = renders.OutputRender.render_list(raw)
out = strip_formatted_text(out)
assert out == "(empty list or set)"
def test_ensure_str_bytes():
assert renders.ensure_str(b"hello world") == r"hello world"
assert renders.ensure_str(b"hello'world") == r"hello'world"
assert renders.ensure_str("你好".encode()) == r"\xe4\xbd\xa0\xe5\xa5\xbd"
def test_double_quotes():
assert renders.double_quotes('hello"world') == r'"hello\"world"'
assert renders.double_quotes('"hello\\world"') == '"\\"hello\\world\\""'
assert renders.double_quotes("'") == '"\'"'
assert renders.double_quotes("\\") == '"\\"'
assert renders.double_quotes('"') == '"\\""'
def test_render_int():
config.raw = False
assert renders.OutputRender.render_int(12) == FormattedText(
[("class:type", "(integer) "), ("", "12")]
)
def test_render_int_raw():
assert renders.OutputRender.render_raw(12) == b"12"
def test_render_list_or_string():
assert renders.OutputRender.render_list_or_string("") == '""'
assert renders.OutputRender.render_list_or_string("foo") == '"foo"'
assert renders.OutputRender.render_list_or_string(
[b"foo", b"bar"]
) == FormattedText(
[
("", "1)"),
("", " "),
("class:string", '"foo"'),
("", "\n"),
("", "2)"),
("", " "),
("class:string", '"bar"'),
]
)
def test_render_list_or_string_nil_and_empty_list():
assert renders.OutputRender.render_list_or_string(None) == FormattedText(
[("class:type", "(nil)")]
)
assert renders.OutputRender.render_list_or_string([]) == FormattedText(
[("class:type", "(empty list or set)")]
)
def test_render_raw_nil_and_empty_list():
assert renders.OutputRender.render_raw(None) == b""
assert renders.OutputRender.render_raw([]) == b""
def test_list_or_string():
config.raw = False
assert renders.OutputRender.render_string_or_int(b"10.1") == '"10.1"'
assert renders.OutputRender.render_string_or_int(3) == FormattedText(
[("class:type", "(integer) "), ("", "3")]
)
def test_command_keys():
completer = IRedisCompleter()
completer.key_completer.words = []
config.raw = False
rendered = renders.OutputRender.command_keys([b"cat", b"dog", b"banana"])
completer.update_completer_for_response("KEYS", None, [b"cat", b"dog", b"banana"])
assert rendered == FormattedText(
[
("", "1)"),
("", " "),
("class:key", '"cat"'),
("", "\n"),
("", "2)"),
("", " "),
("class:key", '"dog"'),
("", "\n"),
("", "3)"),
("", " "),
("class:key", '"banana"'),
]
)
assert completer.key_completer.words == ["banana", "dog", "cat"]
def test_command_scan():
completer = IRedisCompleter()
completer.key_completer.words = []
config.raw = False
rendered = renders.OutputRender.command_scan(
[b"44", [b"a", b"key:__rand_int__", b"dest", b" a"]]
)
completer.update_completer_for_response(
"SCAN", ("0",), [b"44", [b"a", b"key:__rand_int__", b"dest", b" a"]]
)
assert rendered == FormattedText(
[
("class:type", "(cursor) "),
("class:integer", "44"),
("", "\n"),
("", "1)"),
("", " "),
("class:key", '"a"'),
("", "\n"),
("", "2)"),
("", " "),
("class:key", '"key:__rand_int__"'),
("", "\n"),
("", "3)"),
("", " "),
("class:key", '"dest"'),
("", "\n"),
("", "4)"),
("", " "),
("class:key", '" a"'),
]
)
assert completer.key_completer.words == [" a", "dest", "key:__rand_int__", "a"]
def test_command_sscan():
completer = IRedisCompleter()
completer.member_completer.words = []
rendered = renders.OutputRender.command_sscan(
[b"44", [b"a", b"member:__rand_int__", b"dest", b" a"]]
)
completer.update_completer_for_response(
"SSCAN", (0), [b"44", [b"a", b"member:__rand_int__", b"dest", b" a"]]
)
assert rendered == FormattedText(
[
("class:type", "(cursor) "),
("class:integer", "44"),
("", "\n"),
("", "1)"),
("", " "),
("class:member", '"a"'),
("", "\n"),
("", "2)"),
("", " "),
("class:member", '"member:__rand_int__"'),
("", "\n"),
("", "3)"),
("", " "),
("class:member", '"dest"'),
("", "\n"),
("", "4)"),
("", " "),
("class:member", '" a"'),
]
)
assert completer.member_completer.words == [
" a",
"dest",
"member:__rand_int__",
"a",
]
def test_command_sscan_config_raw():
completer = IRedisCompleter()
completer.member_completer.words = []
rendered = renders.OutputRender.render_raw(
[b"44", [b"a", b"member:__rand_int__", b"dest", b" a"]]
)
completer.update_completer_for_response(
"SSCAN", (0), [b"44", [b"a", b"member:__rand_int__", b"dest", b" a"]]
)
assert rendered == b"44\na\nmember:__rand_int__\ndest\n a"
assert completer.member_completer.words == [
" a",
"dest",
"member:__rand_int__",
"a",
]
def test_render_members():
completer = IRedisCompleter()
completer.member_completer.words = []
config.withscores = True
resp = [b"duck", b"667", b"camel", b"708"]
rendered = renders.OutputRender.render_members(resp)
completer.update_completer_for_response("ZRANGE", ("foo", "0", "-1"), resp)
assert rendered == FormattedText(
[
("", "1)"),
("", " "),
("class:integer", "667 "),
("class:member", '"duck"'),
("", "\n"),
("", "2)"),
("", " "),
("class:integer", "708 "),
("class:member", '"camel"'),
]
)
assert completer.member_completer.words == ["camel", "duck"]
def test_render_members_config_raw():
completer = IRedisCompleter()
completer.member_completer.words = []
config.withscores = True
resp = [b"duck", b"667", b"camel", b"708"]
rendered = renders.OutputRender.render_raw(resp)
completer.update_completer_for_response("ZRANGE", (), resp)
assert rendered == b"duck\n667\ncamel\n708"
assert completer.member_completer.words == ["camel", "duck"]
def test_render_unixtime_config_raw():
# fake the timezone and reload
os.environ["TZ"] = "Asia/Shanghai"
time.tzset()
rendered = renders.OutputRender.render_unixtime(1570469891)
assert rendered == FormattedText(
[
("class:type", "(integer) "),
("", "1570469891"),
("", "\n"),
("class:type", "(local time)"),
("", " "),
("", "2019-10-08 01:38:11"),
]
)
def test_render_unixtime():
rendered = renders.OutputRender.render_raw(1570469891)
assert rendered == b"1570469891"
def test_bulk_string_reply():
assert renders.OutputRender.render_bulk_string(b"'\"") == '''"'\\""'''
def test_bulk_string_reply_raw():
assert renders.OutputRender.render_raw(b"hello") == b"hello"
def test_render_bulk_string_decoded():
EXPECTED_RENDER = """# Server\nredis_version:5.0.5\nredis_git_sha1:00000000\nredis_git_dirty:0\nredis_build_id:31cd6e21ec924b46""" # noqa
_input = b"# Server\r\nredis_version:5.0.5\r\nredis_git_sha1:00000000\r\nredis_git_dirty:0\r\nredis_build_id:31cd6e21ec924b46" # noqa
assert renders.OutputRender.render_bulk_string_decode(_input) == EXPECTED_RENDER
def test_render_bulk_string_decoded_with_decoded_utf8():
EXPECTED_RENDER = """# Server\nredis_version:5.0.5\nredis_git_sha1:00000000\nredis_git_dirty:0\nredis_build_id:31cd6e21ec924b46""" # noqa
_input = "# Server\r\nredis_version:5.0.5\r\nredis_git_sha1:00000000\r\nredis_git_dirty:0\r\nredis_build_id:31cd6e21ec924b46" # noqa
assert renders.OutputRender.render_bulk_string_decode(_input) == EXPECTED_RENDER
def test_render_time():
value = [b"1571305643", b"765481"]
assert renders.OutputRender.render_time(value) == FormattedText(
[
("class:type", "(unix timestamp) "),
("", "1571305643"),
("", "\n"),
("class:type", "(millisecond) "),
("", "765481"),
("", "\n"),
("class:type", "(convert to local timezone) "),
("", "2019-10-17 17:47:23.765481"),
]
)
assert renders.OutputRender.render_raw(value) == b"1571305643\n765481"
def test_render_nested_pairs():
text = [
b"peak.allocated",
10160336,
b"lua.caches",
0,
b"db.0",
[b"overhead.hashtable.main", 648, b"overhead.hashtable.expires", 32],
b"db.1",
[b"overhead.hashtable.main", 112, b"overhead.hashtable.expires", 32],
b"fragmentation",
b"0.062980629503726959",
b"fragmentation.bytes",
-9445680,
]
assert renders.OutputRender.render_raw(text) == (
b"peak.allocated\n10160336\nlua.caches\n0\ndb.0\noverhead.hashtable.main\n64"
b"8\noverhead.hashtable.expires\n32\ndb.1\noverhead.hashtable.main\n112\nove"
b"rhead.hashtable.expires\n32\nfragmentation\n0.062980629503726959\nfragmentat"
b"ion.bytes\n-9445680"
)
assert renders.OutputRender.render_nested_pair(text) == FormattedText(
[
("class:string", "peak.allocated: "),
("class:value", "10160336"),
("", "\n"),
("class:string", "lua.caches: "),
("class:value", "0"),
("", "\n"),
("class:string", "db.0: "),
("", "\n"),
("class:string", " overhead.hashtable.main: "),
("class:value", "648"),
("", "\n"),
("class:string", " overhead.hashtable.expires: "),
("class:value", "32"),
("", "\n"),
("class:string", "db.1: "),
("", "\n"),
("class:string", " overhead.hashtable.main: "),
("class:value", "112"),
("", "\n"),
("class:string", " overhead.hashtable.expires: "),
("class:value", "32"),
("", "\n"),
("class:string", "fragmentation: "),
("class:value", "0.062980629503726959"),
("", "\n"),
("class:string", "fragmentation.bytes: "),
("class:value", "-9445680"),
]
)
def test_render_nested_list():
text = [[b"get", 2, [b"readonly", b"fast"], 1, 1, 1]]
assert renders.OutputRender.render_list(text) == FormattedText(
[
("", "1)"),
("", " "),
("", "1)"),
("", " "),
("class:string", '"get"'),
("", "\n"),
("", " "),
("", "2)"),
("", " "),
("class:string", '"2"'),
("", "\n"),
("", " "),
("", "3)"),
("", " "),
("", "1)"),
("", " "),
("class:string", '"readonly"'),
("", "\n"),
("", " "),
("", "2)"),
("", " "),
("class:string", '"fast"'),
("", "\n"),
("", " "),
("", "4)"),
("", " "),
("class:string", '"1"'),
("", "\n"),
("", " "),
("", "5)"),
("", " "),
("class:string", '"1"'),
("", "\n"),
("", " "),
("", "6)"),
("", " "),
("class:string", '"1"'),
]
)
def test_render_bytes(config):
assert renders.OutputRender.render_bytes(b"bytes\n") == b"bytes"
def test_render_bytes_raw(config):
assert renders.OutputRender.render_raw(b"bytes\n") == b"bytes\n"

View file

@ -0,0 +1,122 @@
import re
import time
import pytest
from unittest.mock import patch
from iredis.utils import timer, strip_quote_args
from iredis.commands import split_command_args, split_unknown_args
from iredis.utils import command_syntax
from iredis.style import STYLE
from iredis.exceptions import InvalidArguments, AmbiguousCommand
from iredis.commands import commands_summary
from prompt_toolkit import print_formatted_text
def test_timer():
with patch("iredis.utils.logger") as mock_logger:
timer("foo")
time.sleep(0.1)
timer("bar")
mock_logger.debug.assert_called()
args, kwargs = mock_logger.debug.call_args
matched = re.match(r"\[timer (\d)\] (0\.\d+) -> bar", args[0])
assert matched.group(1) == str(3)
assert 0.1 <= float(matched.group(2)) <= 0.2
# --- test again ---
timer("foo")
time.sleep(0.2)
timer("bar")
mock_logger.debug.assert_called()
args, kwargs = mock_logger.debug.call_args
matched = re.match(r"\[timer (\d)\] (0\.\d+) -> bar", args[0])
assert matched.group(1) == str(5)
assert 0.2 <= float(matched.group(2)) <= 0.3
@pytest.mark.parametrize(
"test_input,expected",
[
("hello world", ["hello", "world"]),
("'hello world'", ["hello world"]),
('''hello"world"''', ["helloworld"]),
(r'''hello\"world"''', [r"hello\world"]),
(r'"\\"', [r"\\"]),
(r"\\", [r"\\"]),
(r"\abcd ef", [r"\abcd", "ef"]),
# quotes in quotes
(r""" 'hello"world' """, ['hello"world']),
(r""" "hello'world" """, ["hello'world"]),
(r""" 'hello\'world'""", ["hello'world"]),
(r""" "hello\"world" """, ['hello"world']),
(r"''", [""]), # set foo "" is a legal command
(r'""', [""]), # set foo "" is a legal command
(r"\\", ["\\\\"]), # blackslash are legal
("\\hello\\", ["\\hello\\"]), # blackslash are legal
],
)
def test_stipe_quote_escaple_in_quote(test_input, expected):
assert list(strip_quote_args(test_input)) == expected
@pytest.mark.parametrize(
"command,expected,args",
[
("GET a", "GET", ["a"]),
("cluster info", "cluster info", []),
("getbit foo 17", "getbit", ["foo", "17"]),
("command ", "command", []),
(" command count ", "command count", []),
(" command count ", "command count", []), # command with multi space
(" command count ' hello world'", "command count", [" hello world"]),
("set foo 'hello world'", "set", ["foo", "hello world"]),
],
)
def test_split_commands(command, expected, args):
parsed_command, parsed_args = split_command_args(command)
assert expected == parsed_command
assert args == parsed_args
def test_split_commands_fail_on_unknown_command():
with pytest.raises(InvalidArguments):
split_command_args("FOO BAR")
@pytest.mark.parametrize(
"command",
["command in", "command in", "Command in", "COMMAND in"],
)
def test_split_commands_fail_on_partially_input(command):
with pytest.raises(AmbiguousCommand):
split_command_args(command)
def test_split_commands_fail_on_unfinished_command():
with pytest.raises(InvalidArguments):
split_command_args("setn")
def test_render_bottom_with_command_json():
for command, info in commands_summary.items():
print_formatted_text(command_syntax(command, info), style=STYLE)
@pytest.mark.parametrize(
"raw,command,args",
[
("abc 123", "abc", ["123"]),
("abc", "abc", []),
("abc foo bar", "abc", ["foo", "bar"]),
("abc 'foo bar'", "abc", ["foo bar"]),
('abc "foo bar"', "abc", ["foo bar"]),
('abc "foo bar" 3 hello', "abc", ["foo bar", "3", "hello"]),
('abc "foo \nbar"', "abc", ["foo \nbar"]),
],
)
def test_split_unknown_commands(raw, command, args):
parsed_command, parsed_args = split_unknown_args(raw)
assert command == parsed_command
assert args == parsed_args