1
0
Fork 0
iredis/iredis/completers.py
Daniel Baumann a748cc7b1a
Merging upstream version 1.15.0.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-02-09 17:25:10 +01:00

385 lines
13 KiB
Python

import logging
from typing import Iterable
from datetime import datetime, timezone
from dateutil.relativedelta import relativedelta
from prompt_toolkit.completion import (
CompleteEvent,
Completer,
Completion,
FuzzyWordCompleter,
WordCompleter,
)
from prompt_toolkit.contrib.regular_languages.completion import GrammarCompleter
from prompt_toolkit.document import Document
from .commands import split_command_args, commands_summary, all_commands
from .config import config
from .exceptions import InvalidArguments, AmbiguousCommand
from .redis_grammar import CONST, command_grammar, get_command_grammar
from .utils import strip_quote_args, ensure_str
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class MostRecentlyUsedFirstWordMixin:
    """
    Mixin that keeps a completer's word list in most-recently-used order.

    ``touch()`` moves (or adds) a word to the front of the list, and the list
    is capped at ``max_words`` entries by dropping words from the tail.

    The ``words`` list is forwarded as the first positional argument to the
    next class in the MRO and is only ever mutated in place, so that class
    and this mixin keep looking at the same list object.

    Not thread safe.
    """

    def __init__(self, max_words, words, *args, **kwargs):
        """
        :param max_words: maximum number of words kept in the list.
        :param words: initial word list; mutated in place by ``touch()``.
        """
        self.words = words
        self.max_words = max_words
        super().__init__(words, *args, **kwargs)

    def touch(self, word):
        """
        Make sure ``word`` is in the first place of the completer list,
        evicting the least recently used words when the list is over capacity.
        """
        if word in self.words:
            self.words.remove(word)
        self.words.insert(0, word)
        # Trim in place (never rebind ``self.words``). Doing it after the
        # insert also covers ``max_words == 0`` and lists that started out
        # longer than the cap, both of which the ``len == max_words`` check
        # used to miss.
        del self.words[max(self.max_words, 0):]

    def touch_words(self, words):
        """Touch every word of ``words`` in order; the last one ends up first."""
        for word in words:
            self.touch(word)
class MostRecentlyUsedFirstWordCompleter(
    MostRecentlyUsedFirstWordMixin, FuzzyWordCompleter
):
    """``FuzzyWordCompleter`` combined with the most-recently-used-first mixin."""
class IntegerTypeCompleter(MostRecentlyUsedFirstWordMixin, WordCompleter):
    """
    Word completer for integer type tokens: ``i1``..``i64`` (signed) and
    ``u1``..``u63`` (unsigned), listed from the widest type down.
    """

    def __init__(self):
        # Signed integers go up to 64 bits, unsigned only up to 63 bits.
        int_types = ["i64"]
        for width in range(63, 0, -1):
            int_types.extend((f"u{width}", f"i{width}"))
        # The cap equals the number of words, so nothing is ever evicted.
        super().__init__(len(int_types), int_types)
class TimestampCompleter(Completer):
    """
    Completer for timestamp based on input.

    Features:

    * Auto complete humanize time, like 3 -> 3 minutes ago, 3 hours ago
      (or "later" when ``future_time`` is set). The offset is applied to
      ``datetime.now()``.
    * Auto guess datetime, complete by its timestamp. 2020-01-01 12:00
      -> 1577880000. A datetime typed without a UTC offset is interpreted
      as UTC; an explicit offset in the input is honoured.

    :param is_milliseconds: when true, completions are multiplied by 1000.
    :param future_time: when true, humanized offsets are added to the
        current time instead of subtracted from it.
    """

    def __init__(self, is_milliseconds, future_time, *args, **kwargs):
        self.factor = 1000 if is_milliseconds else 1
        self.future_time = future_time
        super().__init__(*args, **kwargs)

    # A humanized completion for a unit is only offered while the typed
    # number is less than or equal to the bound listed for that unit.
    when_lower_than = {
        "year": 20,
        "month": 12,
        "day": 31,
        "hour": 100,
        "minute": 1000,
        "second": 1000_000,
    }

    def _completion_humanize_time(self, document: Document) -> Iterable[Completion]:
        """Yield "<n> <unit>s ago/later" completions for a plain integer input."""
        text = document.text
        # isdecimal(), unlike isnumeric(), rejects characters such as "²" or
        # "½" that int() cannot parse and would otherwise raise ValueError.
        if not text.isdecimal():
            return
        current = int(text)
        now = datetime.now()
        for unit, upper_bound in self.when_lower_than.items():
            if current > upper_bound:
                continue
            delta = relativedelta(**{f"{unit}s": current})
            if self.future_time:
                dt = now + delta
                offset_text = "later"
            else:
                dt = now - delta
                offset_text = "ago"
            plural = "s" if current > 1 else ""
            stamp = dt.strftime("%Y-%m-%d %H:%M:%S")
            meta = f"{text} {unit}{plural} {offset_text} ({stamp})"
            yield Completion(
                str(int(dt.timestamp()) * self.factor),
                start_position=-len(document.text_before_cursor),
                display_meta=meta,
            )

    def _completion_formatted_time(self, document: Document) -> Iterable[Completion]:
        """Yield the timestamp of the input when it parses as an ISO datetime."""
        try:
            dt = datetime.fromisoformat(document.text)
        except (TypeError, ValueError):
            # Not a datetime: nothing to offer.
            return
        # Only default to UTC when the user gave no offset; replacing tzinfo
        # unconditionally would silently discard an explicit "+08:00".
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        yield Completion(
            str(int(dt.timestamp()) * self.factor),
            start_position=-len(document.text_before_cursor),
            display_meta=str(dt),
        )

    def get_completions(
        self, document: Document, complete_event: CompleteEvent
    ) -> Iterable[Completion]:
        """Yield humanized and formatted completions, sorted by their text."""
        completions = [
            *self._completion_humanize_time(document),
            *self._completion_formatted_time(document),
        ]
        # NOTE(review): this is an ascending, lexicographic sort on the
        # completion text. An earlier comment here claimed "bigger timestamp
        # first"; the order is left unchanged -- confirm which one is intended.
        yield from sorted(completions, key=lambda completion: completion.text)
class IRedisCompleter(Completer):
    """
    Completer that picks a grammar-based completer for the command being typed.

    For every input the command name is split off, the grammar for that
    command is compiled and wrapped in a ``GrammarCompleter``. When the
    command cannot be determined (``InvalidArguments`` / ``AmbiguousCommand``)
    the root completer built from ``command_grammar`` is used instead.

    :param hint: when true, command completions carry their summary as meta
        text.
    :param completion_casing: ``"upper"``, ``"lower"`` or ``"auto"``; selects
        which casing of the command names is offered.
    """

    def __init__(self, hint=False, completion_casing="upper"):
        super().__init__()
        self.completer_mapping = self.get_completer_mapping(hint, completion_casing)
        self.current_completer = self.root_completer = GrammarCompleter(
            command_grammar, self.completer_mapping
        )

    @property
    def key_completer(self) -> MostRecentlyUsedFirstWordCompleter:
        return self.completer_mapping["key"]

    @property
    def member_completer(self) -> MostRecentlyUsedFirstWordCompleter:
        return self.completer_mapping["member"]

    @property
    def field_completer(self) -> MostRecentlyUsedFirstWordCompleter:
        return self.completer_mapping["field"]

    @property
    def group_completer(self) -> MostRecentlyUsedFirstWordCompleter:
        return self.completer_mapping["group"]

    @property
    def catetoryname_completer(self) -> MostRecentlyUsedFirstWordCompleter:
        # Misspelled name kept because it is part of the public interface.
        return self.completer_mapping["categoryname"]

    @property
    def categoryname_completer(self) -> MostRecentlyUsedFirstWordCompleter:
        """Correctly spelled alias of ``catetoryname_completer``."""
        return self.completer_mapping["categoryname"]

    @property
    def username_completer(self) -> MostRecentlyUsedFirstWordCompleter:
        return self.completer_mapping["username"]

    def get_completer(self, input_text):
        """
        Return a ``GrammarCompleter`` for the command at the start of
        ``input_text``, or the root completer when the command cannot be
        resolved.
        """
        try:
            command, _ = split_command_args(input_text)
            # here will compile grammar for this command
            grammar = get_command_grammar(command)
            completer = GrammarCompleter(
                compiled_grammar=grammar, completers=self.completer_mapping
            )
        except (InvalidArguments, AmbiguousCommand):
            completer = self.root_completer

        return completer

    def get_completions(
        self, document: Document, complete_event: CompleteEvent
    ) -> Iterable[Completion]:
        """Delegate to the completer selected for the current input text."""
        input_text = document.text
        self.current_completer = self.get_completer(input_text)
        return self.current_completer.get_completions(document, complete_event)

    def update_completer_for_input(self, command):
        """
        Feed the tokens of ``command`` into the most-recently-used completers.

        The command is matched against its grammar; every matched variable
        whose completer is a ``MostRecentlyUsedFirstWordMixin`` gets its
        values touched. Nothing happens when the command does not match.
        """
        completer = self.get_completer(command)
        grammar = completer.compiled_grammar
        m = grammar.match(command)
        if not m:
            # invalid command!
            return
        variables = m.variables()

        # auto update completion words, if it's LRU strategy.
        for _token, _completer in self.completer_mapping.items():
            if not isinstance(_completer, MostRecentlyUsedFirstWordMixin):
                continue
            # getall always returns a []
            tokens_in_command = variables.getall(_token)
            for _token_in_command in tokens_in_command:
                # prompt_toolkit didn't support multi tokens
                # like DEL key1 key2 key3
                # so we have to split them manually
                for single_token in strip_quote_args(_token_in_command):
                    _completer.touch(single_token)

    def update_completer_for_response(self, command_name, args, response):
        """
        Feed words found in a command's response into the matching completer.

        :param command_name: command name; whitespace is collapsed and the
            name upper-cased before it is compared.
        :param args: command arguments; only checked for emptiness
            (``ACL CAT``).
        :param response: server response, ``None`` is ignored. Passed through
            ``ensure_str`` before use.
        """
        command_name = " ".join(command_name.split()).upper()
        logger.info(
            "Try update completer using response... command_name is %s",
            command_name,
        )
        if response is None:
            return

        response = ensure_str(response)
        if command_name in ("HKEYS",):
            self.field_completer.touch_words(response)
            logger.debug("[Completer] field completer updated with %s.", response)
        if command_name in ("HGETALL",):
            # presumably a flat field/value list, so every other item is a
            # field -- verify against the client.
            fields = response[::2]
            self.field_completer.touch_words(fields)
            logger.debug("[Completer] field completer updated with %s.", fields)
        if command_name in ("ZPOPMAX", "ZPOPMIN", "ZRANGE", "ZRANGEBYSCORE"):
            if config.withscores:
                members = response[::2]
                self.member_completer.touch_words(members)
                logger.debug(
                    "[Completer] member completer updated with %s.", members
                )
            else:
                self.member_completer.touch_words(response)
                logger.debug(
                    "[Completer] member completer updated with %s.", response
                )
        if command_name in ("KEYS",):
            self.key_completer.touch_words(response)
            logger.debug("[Completer] key completer updated with %s.", response)
        if command_name in ("SCAN",):
            self.key_completer.touch_words(response[1])
            logger.debug("[Completer] key completer updated with %s.", response[1])
        if command_name in ("SSCAN", "ZSCAN"):
            # NOTE(review): if ZSCAN replies interleave members and scores,
            # the scores are touched as members here too -- confirm.
            self.member_completer.touch_words(response[1])
            logger.debug(
                "[Completer] member completer updated with %s.", response[1]
            )
        if command_name in ("HSCAN",):
            fields = response[1][::2]
            self.field_completer.touch_words(fields)
            logger.debug("[Completer] field completer updated with %s.", fields)

        # only update categoryname completer when `ACL CAT` without args.
        if command_name == "ACL CAT" and not args:
            self.catetoryname_completer.touch_words(response)
        if command_name == "ACL USERS":
            self.username_completer.touch_words(response)

    def _touch_members(self, items):
        """Touch members of ``items``; every other item when scores are on."""
        _step = 2 if config.withscores else 1
        self.member_completer.touch_words(ensure_str(items)[::_step])

    def _touch_hash_pairs(self, items):
        """Touch every other item of ``items`` on the field completer."""
        self.field_completer.touch_words(ensure_str(items)[::2])

    def _touch_keys(self, items):
        """Touch every item of ``items`` on the key completer."""
        self.key_completer.touch_words(ensure_str(items))

    def __repr__(self) -> str:
        # Use the real class name; the previous text said "DynamicCompleter".
        return "{}({!r} -> {!r})".format(
            type(self).__name__,
            self.get_completer,
            self.current_completer,
        )

    def get_completer_mapping(self, hint_on, completion_casing):
        """
        Build the mapping from grammar variable name to completer.

        :param hint_on: when true, command completions get their summary
            from ``commands_summary`` as meta text.
        :param completion_casing: ``"auto"``, ``"upper"`` or ``"lower"``; any
            other value falls back to upper-case command names.
        :return: dict of variable name -> completer instance.
        """
        completer_mapping = {}
        # One case-insensitive word completer per constant token group.
        completer_mapping.update(
            {
                key: WordCompleter(tokens.split(" "), ignore_case=True)
                for key, tokens in CONST.items()
            }
        )
        key_completer = MostRecentlyUsedFirstWordCompleter(config.completer_max, [])
        member_completer = MostRecentlyUsedFirstWordCompleter(config.completer_max, [])
        field_completer = MostRecentlyUsedFirstWordCompleter(config.completer_max, [])
        group_completer = MostRecentlyUsedFirstWordCompleter(config.completer_max, [])
        username_completer = MostRecentlyUsedFirstWordCompleter(
            config.completer_max, []
        )
        categoryname_completer = MostRecentlyUsedFirstWordCompleter(100, [])

        timestamp_ms_ago_completer = TimestampCompleter(
            is_milliseconds=True, future_time=False
        )
        timestamp_ms_after_completer = TimestampCompleter(
            is_milliseconds=True, future_time=True
        )
        timestamp_after_completer = TimestampCompleter(
            is_milliseconds=False, future_time=True
        )
        integer_type_completer = IntegerTypeCompleter()

        completer_mapping.update(
            {
                # all key related completers share the same completer
                "keys": key_completer,
                "key": key_completer,
                "destination": key_completer,
                "newkey": key_completer,
                # member
                "member": member_completer,
                "members": member_completer,
                # zmember
                # TODO separate sorted set and set
                # hash fields
                "field": field_completer,
                "fields": field_completer,
                # stream groups
                "group": group_completer,
                # stream id
                "stream_id": timestamp_ms_ago_completer,
                "timestampms": timestamp_ms_after_completer,
                "timestamp": timestamp_after_completer,
                "inttype": integer_type_completer,
                "categoryname": categoryname_completer,
                "username": username_completer,
            }
        )

        # command completer
        if hint_on:
            command_hint = {
                key: info["summary"] for key, info in commands_summary.items()
            }
            hint = {
                command: command_hint.get(command.upper()) for command in all_commands
            }
            hint.update(
                {
                    command.lower(): command_hint.get(command.upper())
                    for command in all_commands
                }
            )
        else:
            hint = {}

        upper_commands = all_commands[::-1]
        lower_commands = [command.lower() for command in all_commands[::-1]]
        auto_commands = upper_commands + lower_commands

        ignore_case = completion_casing != "auto"

        # Fall back to upper case for an unknown casing; a plain ``.get()``
        # would hand ``None`` to WordCompleter.
        command_completions = {
            "auto": auto_commands,
            "upper": upper_commands,
            "lower": lower_commands,
        }.get(completion_casing, upper_commands)

        completer_mapping["command"] = WordCompleter(
            command_completions, ignore_case=ignore_case, sentence=True, meta_dict=hint
        )
        return completer_mapping