Merging upstream version 1.28.0.
Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
parent
336c651928
commit
f6ae5d2542
16 changed files with 611 additions and 404 deletions
|
@ -181,7 +181,7 @@ $ sudo dnf install mycli
|
|||
|
||||
### Windows
|
||||
|
||||
Follow the instructions on this blogpost: https://www.codewall.co.uk/installing-using-mycli-on-windows/
|
||||
Follow the instructions on this blogpost: http://web.archive.org/web/20221006045208/https://www.codewall.co.uk/installing-using-mycli-on-windows/
|
||||
|
||||
|
||||
### Thanks:
|
||||
|
|
346
changelog.md
346
changelog.md
File diff suppressed because it is too large
Load diff
|
@ -96,6 +96,8 @@ Contributors:
|
|||
* Alfred Wingate
|
||||
* Zhanze Wang
|
||||
* Houston Wong
|
||||
* Mohamed Rezk
|
||||
* Ryosuke Kazami
|
||||
|
||||
|
||||
Created by:
|
||||
|
|
|
@ -1 +1 @@
|
|||
__version__ = '1.27.2'
|
||||
__version__ = "1.28.0"
|
||||
|
|
|
@ -3,6 +3,8 @@ from prompt_toolkit.enums import EditingMode
|
|||
from prompt_toolkit.filters import completion_is_selected, emacs_mode
|
||||
from prompt_toolkit.key_binding import KeyBindings
|
||||
|
||||
from .packages.toolkit.fzf import search_history
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -101,6 +103,12 @@ def mycli_bindings(mycli):
|
|||
cursorpos_abs -= 1
|
||||
b.cursor_position = min(cursorpos_abs, len(b.text))
|
||||
|
||||
@kb.add('c-r', filter=emacs_mode)
|
||||
def _(event):
|
||||
"""Search history using fzf or default reverse incremental search."""
|
||||
_logger.debug('Detected <C-r> key.')
|
||||
search_history(event)
|
||||
|
||||
@kb.add('enter', filter=completion_is_selected)
|
||||
def _(event):
|
||||
"""Makes the enter key work as the tab key only when showing the menu.
|
||||
|
|
|
@ -36,7 +36,6 @@ from prompt_toolkit.formatted_text import ANSI
|
|||
from prompt_toolkit.layout.processors import (HighlightMatchingBracketProcessor,
|
||||
ConditionalProcessor)
|
||||
from prompt_toolkit.lexers import PygmentsLexer
|
||||
from prompt_toolkit.history import FileHistory
|
||||
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
|
||||
|
||||
from .packages.special.main import NO_QUERY
|
||||
|
@ -44,6 +43,7 @@ from .packages.prompt_utils import confirm, confirm_destructive_query
|
|||
from .packages.tabular_output import sql_format
|
||||
from .packages import special
|
||||
from .packages.special.favoritequeries import FavoriteQueries
|
||||
from .packages.toolkit.history import FileHistoryWithTimestamp
|
||||
from .sqlcompleter import SQLCompleter
|
||||
from .clitoolbar import create_toolbar_tokens_func
|
||||
from .clistyle import style_factory, style_factory_output
|
||||
|
@ -626,7 +626,7 @@ class MyCli(object):
|
|||
history_file = os.path.expanduser(
|
||||
os.environ.get('MYCLI_HISTFILE', '~/.mycli-history'))
|
||||
if dir_path_exists(history_file):
|
||||
history = FileHistory(history_file)
|
||||
history = FileHistoryWithTimestamp(history_file)
|
||||
else:
|
||||
history = None
|
||||
self.echo(
|
||||
|
|
|
@ -138,6 +138,8 @@ def suggest_based_on_last_token(token, text_before_cursor, full_text, identifier
|
|||
|
||||
if not token:
|
||||
return [{'type': 'keyword'}, {'type': 'special'}]
|
||||
elif token_v == "*":
|
||||
return [{'type': 'keyword'}]
|
||||
elif token_v.endswith('('):
|
||||
p = sqlparse.parse(text_before_cursor)[0]
|
||||
|
||||
|
|
|
@ -100,7 +100,7 @@ def guess_socket_location():
|
|||
for r, dirs, files in os.walk(directory, topdown=True):
|
||||
for filename in files:
|
||||
name, ext = os.path.splitext(filename)
|
||||
if name.startswith("mysql") and ext in ('.socket', '.sock'):
|
||||
if name.startswith("mysql") and name != "mysqlx" and ext in ('.socket', '.sock'):
|
||||
return os.path.join(r, filename)
|
||||
dirs[:] = [d for d in dirs if d.startswith("mysql")]
|
||||
return None
|
||||
|
|
0
mycli/packages/toolkit/__init__.py
Normal file
0
mycli/packages/toolkit/__init__.py
Normal file
45
mycli/packages/toolkit/fzf.py
Normal file
45
mycli/packages/toolkit/fzf.py
Normal file
|
@ -0,0 +1,45 @@
|
|||
from shutil import which
|
||||
|
||||
from pyfzf import FzfPrompt
|
||||
from prompt_toolkit import search
|
||||
from prompt_toolkit.key_binding.key_processor import KeyPressEvent
|
||||
|
||||
from .history import FileHistoryWithTimestamp
|
||||
|
||||
|
||||
class Fzf(FzfPrompt):
|
||||
def __init__(self):
|
||||
self.executable = which("fzf")
|
||||
if self.executable:
|
||||
super().__init__()
|
||||
|
||||
def is_available(self) -> bool:
|
||||
return self.executable is not None
|
||||
|
||||
|
||||
def search_history(event: KeyPressEvent):
|
||||
buffer = event.current_buffer
|
||||
history = buffer.history
|
||||
|
||||
fzf = Fzf()
|
||||
|
||||
if fzf.is_available() and isinstance(history, FileHistoryWithTimestamp):
|
||||
history_items_with_timestamp = history.load_history_with_timestamp()
|
||||
|
||||
formatted_history_items = []
|
||||
original_history_items = []
|
||||
for item, timestamp in history_items_with_timestamp:
|
||||
formatted_item = item.replace('\n', ' ')
|
||||
timestamp = timestamp.split(".")[0] if "." in timestamp else timestamp
|
||||
formatted_history_items.append(f"{timestamp} {formatted_item}")
|
||||
original_history_items.append(item)
|
||||
|
||||
result = fzf.prompt(formatted_history_items, fzf_options="--tiebreak=index")
|
||||
|
||||
if result:
|
||||
selected_index = formatted_history_items.index(result[0])
|
||||
buffer.text = original_history_items[selected_index]
|
||||
buffer.cursor_position = len(buffer.text)
|
||||
else:
|
||||
# Fallback to default reverse incremental search
|
||||
search.start_search(direction=search.SearchDirection.BACKWARD)
|
52
mycli/packages/toolkit/history.py
Normal file
52
mycli/packages/toolkit/history.py
Normal file
|
@ -0,0 +1,52 @@
|
|||
import os
|
||||
from typing import Iterable, Union, List, Tuple
|
||||
|
||||
from prompt_toolkit.history import FileHistory
|
||||
|
||||
_StrOrBytesPath = Union[str, bytes, os.PathLike]
|
||||
|
||||
|
||||
class FileHistoryWithTimestamp(FileHistory):
|
||||
"""
|
||||
:class:`.FileHistory` class that stores all strings in a file with timestamp.
|
||||
"""
|
||||
|
||||
def __init__(self, filename: _StrOrBytesPath) -> None:
|
||||
self.filename = filename
|
||||
super().__init__(filename)
|
||||
|
||||
def load_history_with_timestamp(self) -> List[Tuple[str, str]]:
|
||||
"""
|
||||
Load history entries along with their timestamps.
|
||||
|
||||
Returns:
|
||||
List[Tuple[str, str]]: A list of tuples where each tuple contains
|
||||
a history entry and its corresponding timestamp.
|
||||
"""
|
||||
history_with_timestamp: List[Tuple[str, str]] = []
|
||||
lines: List[str] = []
|
||||
timestamp: str = ""
|
||||
|
||||
def add() -> None:
|
||||
if lines:
|
||||
# Join and drop trailing newline.
|
||||
string = "".join(lines)[:-1]
|
||||
history_with_timestamp.append((string, timestamp))
|
||||
|
||||
if os.path.exists(self.filename):
|
||||
with open(self.filename, "rb") as f:
|
||||
for line_bytes in f:
|
||||
line = line_bytes.decode("utf-8", errors="replace")
|
||||
|
||||
if line.startswith("#"):
|
||||
# Extract timestamp
|
||||
timestamp = line[2:].strip()
|
||||
elif line.startswith("+"):
|
||||
lines.append(line[1:])
|
||||
else:
|
||||
add()
|
||||
lines = []
|
||||
|
||||
add()
|
||||
|
||||
return list(reversed(history_with_timestamp))
|
|
@ -31,7 +31,7 @@ class SQLCompleter(Completer):
|
|||
'PORT', 'PRIMARY', 'PRIVILEGES', 'PROCESSLIST', 'PURGE',
|
||||
'REFERENCES', 'REGEXP', 'RENAME', 'REPAIR', 'RESET', 'REVOKE',
|
||||
'RIGHT', 'ROLLBACK', 'ROW', 'ROWS', 'ROW_FORMAT', 'SAVEPOINT',
|
||||
'SESSION', 'SET', 'SHARE', 'SHOW', 'SLAVE', 'SMALLINT', 'SMALLINT',
|
||||
'SESSION', 'SET', 'SHARE', 'SHOW', 'SLAVE', 'SMALLINT',
|
||||
'START', 'STOP', 'TABLE', 'THEN', 'TINYINT', 'TO', 'TRANSACTION',
|
||||
'TRIGGER', 'TRUNCATE', 'UNION', 'UNIQUE', 'UNSIGNED', 'USE',
|
||||
'USER', 'USING', 'VALUES', 'VARCHAR', 'VIEW', 'WHEN', 'WITH'
|
||||
|
@ -475,8 +475,6 @@ class SQLCompleter(Completer):
|
|||
|
||||
elif suggestion['type'] == 'keyword':
|
||||
keywords = self.find_matches(word_before_cursor, self.keywords,
|
||||
start_only=True,
|
||||
fuzzy=False,
|
||||
casing=self.keyword_casing)
|
||||
completions.extend(keywords)
|
||||
|
||||
|
@ -513,8 +511,8 @@ class SQLCompleter(Completer):
|
|||
completions.extend(queries)
|
||||
elif suggestion['type'] == 'table_format':
|
||||
formats = self.find_matches(word_before_cursor,
|
||||
self.table_formats,
|
||||
start_only=True, fuzzy=False)
|
||||
self.table_formats)
|
||||
|
||||
completions.extend(formats)
|
||||
elif suggestion['type'] == 'file_name':
|
||||
file_names = self.find_files(word_before_cursor)
|
||||
|
|
|
@ -14,4 +14,4 @@ pyperclip>=1.8.1
|
|||
importlib_resources>=5.0.0
|
||||
pyaes>=1.6.1
|
||||
sqlglot>=5.1.3
|
||||
setuptools
|
||||
setuptools<=71.1.0
|
||||
|
|
3
setup.py
3
setup.py
|
@ -29,7 +29,8 @@ install_requirements = [
|
|||
'configobj >= 5.0.5',
|
||||
'cli_helpers[styles] >= 2.2.1',
|
||||
'pyperclip >= 1.8.1',
|
||||
'pyaes >= 1.6.1'
|
||||
'pyaes >= 1.6.1',
|
||||
'pyfzf >= 0.3.1',
|
||||
]
|
||||
|
||||
if sys.version_info.minor < 9:
|
||||
|
|
|
@ -89,6 +89,9 @@ keyword_casing = auto
|
|||
# disabled pager on startup
|
||||
enable_pager = True
|
||||
|
||||
# Choose a specific pager
|
||||
pager = less
|
||||
|
||||
# Custom colors for the completion menu, toolbar, etc.
|
||||
[colors]
|
||||
completion-menu.completion.current = "bg:#ffffff #000000"
|
||||
|
|
|
@ -5,17 +5,17 @@ from prompt_toolkit.document import Document
|
|||
import mycli.packages.special.main as special
|
||||
|
||||
metadata = {
|
||||
'users': ['id', 'email', 'first_name', 'last_name'],
|
||||
'orders': ['id', 'ordered_date', 'status'],
|
||||
'select': ['id', 'insert', 'ABC'],
|
||||
'réveillé': ['id', 'insert', 'ABC']
|
||||
"users": ["id", "email", "first_name", "last_name"],
|
||||
"orders": ["id", "ordered_date", "status"],
|
||||
"select": ["id", "insert", "ABC"],
|
||||
"réveillé": ["id", "insert", "ABC"],
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def completer():
|
||||
|
||||
import mycli.sqlcompleter as sqlcompleter
|
||||
|
||||
comp = sqlcompleter.SQLCompleter(smart_completion=True)
|
||||
|
||||
tables, columns = [], []
|
||||
|
@ -24,10 +24,10 @@ def completer():
|
|||
tables.append((table,))
|
||||
columns.extend([(table, col) for col in cols])
|
||||
|
||||
comp.set_dbname('test')
|
||||
comp.extend_schemata('test')
|
||||
comp.extend_relations(tables, kind='tables')
|
||||
comp.extend_columns(columns, kind='tables')
|
||||
comp.set_dbname("test")
|
||||
comp.extend_schemata("test")
|
||||
comp.extend_relations(tables, kind="tables")
|
||||
comp.extend_columns(columns, kind="tables")
|
||||
comp.extend_special_commands(special.COMMANDS)
|
||||
|
||||
return comp
|
||||
|
@ -36,59 +36,85 @@ def completer():
|
|||
@pytest.fixture
|
||||
def complete_event():
|
||||
from unittest.mock import Mock
|
||||
|
||||
return Mock()
|
||||
|
||||
|
||||
def test_special_name_completion(completer, complete_event):
|
||||
text = '\\d'
|
||||
position = len('\\d')
|
||||
text = "\\d"
|
||||
position = len("\\d")
|
||||
result = completer.get_completions(
|
||||
Document(text=text, cursor_position=position),
|
||||
complete_event)
|
||||
assert result == [Completion(text='\\dt', start_position=-2)]
|
||||
Document(text=text, cursor_position=position), complete_event
|
||||
)
|
||||
assert result == [Completion(text="\\dt", start_position=-2)]
|
||||
|
||||
|
||||
def test_empty_string_completion(completer, complete_event):
|
||||
text = ''
|
||||
text = ""
|
||||
position = 0
|
||||
result = list(
|
||||
completer.get_completions(
|
||||
Document(text=text, cursor_position=position),
|
||||
complete_event))
|
||||
assert list(map(Completion, completer.keywords +
|
||||
completer.special_commands)) == result
|
||||
Document(text=text, cursor_position=position), complete_event
|
||||
)
|
||||
)
|
||||
assert (
|
||||
list(map(Completion, completer.keywords + completer.special_commands)) == result
|
||||
)
|
||||
|
||||
|
||||
def test_select_keyword_completion(completer, complete_event):
|
||||
text = 'SEL'
|
||||
position = len('SEL')
|
||||
text = "SEL"
|
||||
position = len("SEL")
|
||||
result = completer.get_completions(
|
||||
Document(text=text, cursor_position=position),
|
||||
complete_event)
|
||||
assert list(result) == list([Completion(text='SELECT', start_position=-3)])
|
||||
Document(text=text, cursor_position=position), complete_event
|
||||
)
|
||||
assert list(result) == list([Completion(text="SELECT", start_position=-3)])
|
||||
|
||||
|
||||
def test_select_star(completer, complete_event):
|
||||
text = "SELECT * "
|
||||
position = len(text)
|
||||
result = completer.get_completions(
|
||||
Document(text=text, cursor_position=position), complete_event
|
||||
)
|
||||
assert list(result) == list(map(Completion, completer.keywords))
|
||||
|
||||
|
||||
def test_table_completion(completer, complete_event):
|
||||
text = 'SELECT * FROM '
|
||||
text = "SELECT * FROM "
|
||||
position = len(text)
|
||||
result = completer.get_completions(
|
||||
Document(text=text, cursor_position=position), complete_event)
|
||||
assert list(result) == list([
|
||||
Completion(text='users', start_position=0),
|
||||
Completion(text='orders', start_position=0),
|
||||
Completion(text='`select`', start_position=0),
|
||||
Completion(text='`réveillé`', start_position=0),
|
||||
])
|
||||
Document(text=text, cursor_position=position), complete_event
|
||||
)
|
||||
assert list(result) == list(
|
||||
[
|
||||
Completion(text="users", start_position=0),
|
||||
Completion(text="orders", start_position=0),
|
||||
Completion(text="`select`", start_position=0),
|
||||
Completion(text="`réveillé`", start_position=0),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def test_function_name_completion(completer, complete_event):
|
||||
text = 'SELECT MA'
|
||||
position = len('SELECT MA')
|
||||
text = "SELECT MA"
|
||||
position = len("SELECT MA")
|
||||
result = completer.get_completions(
|
||||
Document(text=text, cursor_position=position), complete_event)
|
||||
assert list(result) == list([Completion(text='MAX', start_position=-2),
|
||||
Completion(text='MASTER', start_position=-2),
|
||||
])
|
||||
Document(text=text, cursor_position=position), complete_event
|
||||
)
|
||||
assert list(result) == list(
|
||||
[
|
||||
Completion(text="MAX", start_position=-2),
|
||||
Completion(text="CHANGE MASTER TO", start_position=-2),
|
||||
Completion(text="CURRENT_TIMESTAMP", start_position=-2),
|
||||
Completion(text="DECIMAL", start_position=-2),
|
||||
Completion(text="FORMAT", start_position=-2),
|
||||
Completion(text="MASTER", start_position=-2),
|
||||
Completion(text="PRIMARY", start_position=-2),
|
||||
Completion(text="ROW_FORMAT", start_position=-2),
|
||||
Completion(text="SMALLINT", start_position=-2),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def test_suggested_column_names(completer, complete_event):
|
||||
|
@ -99,21 +125,25 @@ def test_suggested_column_names(completer, complete_event):
|
|||
:return:
|
||||
|
||||
"""
|
||||
text = 'SELECT from users'
|
||||
position = len('SELECT ')
|
||||
result = list(completer.get_completions(
|
||||
Document(text=text, cursor_position=position),
|
||||
complete_event))
|
||||
assert result == list([
|
||||
Completion(text='*', start_position=0),
|
||||
Completion(text='id', start_position=0),
|
||||
Completion(text='email', start_position=0),
|
||||
Completion(text='first_name', start_position=0),
|
||||
Completion(text='last_name', start_position=0),
|
||||
] +
|
||||
list(map(Completion, completer.functions)) +
|
||||
[Completion(text='users', start_position=0)] +
|
||||
list(map(Completion, completer.keywords)))
|
||||
text = "SELECT from users"
|
||||
position = len("SELECT ")
|
||||
result = list(
|
||||
completer.get_completions(
|
||||
Document(text=text, cursor_position=position), complete_event
|
||||
)
|
||||
)
|
||||
assert result == list(
|
||||
[
|
||||
Completion(text="*", start_position=0),
|
||||
Completion(text="id", start_position=0),
|
||||
Completion(text="email", start_position=0),
|
||||
Completion(text="first_name", start_position=0),
|
||||
Completion(text="last_name", start_position=0),
|
||||
]
|
||||
+ list(map(Completion, completer.functions))
|
||||
+ [Completion(text="users", start_position=0)]
|
||||
+ list(map(Completion, completer.keywords))
|
||||
)
|
||||
|
||||
|
||||
def test_suggested_column_names_in_function(completer, complete_event):
|
||||
|
@ -125,17 +155,20 @@ def test_suggested_column_names_in_function(completer, complete_event):
|
|||
:return:
|
||||
|
||||
"""
|
||||
text = 'SELECT MAX( from users'
|
||||
position = len('SELECT MAX(')
|
||||
text = "SELECT MAX( from users"
|
||||
position = len("SELECT MAX(")
|
||||
result = completer.get_completions(
|
||||
Document(text=text, cursor_position=position),
|
||||
complete_event)
|
||||
assert list(result) == list([
|
||||
Completion(text='*', start_position=0),
|
||||
Completion(text='id', start_position=0),
|
||||
Completion(text='email', start_position=0),
|
||||
Completion(text='first_name', start_position=0),
|
||||
Completion(text='last_name', start_position=0)])
|
||||
Document(text=text, cursor_position=position), complete_event
|
||||
)
|
||||
assert list(result) == list(
|
||||
[
|
||||
Completion(text="*", start_position=0),
|
||||
Completion(text="id", start_position=0),
|
||||
Completion(text="email", start_position=0),
|
||||
Completion(text="first_name", start_position=0),
|
||||
Completion(text="last_name", start_position=0),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def test_suggested_column_names_with_table_dot(completer, complete_event):
|
||||
|
@ -146,17 +179,22 @@ def test_suggested_column_names_with_table_dot(completer, complete_event):
|
|||
:return:
|
||||
|
||||
"""
|
||||
text = 'SELECT users. from users'
|
||||
position = len('SELECT users.')
|
||||
result = list(completer.get_completions(
|
||||
Document(text=text, cursor_position=position),
|
||||
complete_event))
|
||||
assert result == list([
|
||||
Completion(text='*', start_position=0),
|
||||
Completion(text='id', start_position=0),
|
||||
Completion(text='email', start_position=0),
|
||||
Completion(text='first_name', start_position=0),
|
||||
Completion(text='last_name', start_position=0)])
|
||||
text = "SELECT users. from users"
|
||||
position = len("SELECT users.")
|
||||
result = list(
|
||||
completer.get_completions(
|
||||
Document(text=text, cursor_position=position), complete_event
|
||||
)
|
||||
)
|
||||
assert result == list(
|
||||
[
|
||||
Completion(text="*", start_position=0),
|
||||
Completion(text="id", start_position=0),
|
||||
Completion(text="email", start_position=0),
|
||||
Completion(text="first_name", start_position=0),
|
||||
Completion(text="last_name", start_position=0),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def test_suggested_column_names_with_alias(completer, complete_event):
|
||||
|
@ -167,17 +205,22 @@ def test_suggested_column_names_with_alias(completer, complete_event):
|
|||
:return:
|
||||
|
||||
"""
|
||||
text = 'SELECT u. from users u'
|
||||
position = len('SELECT u.')
|
||||
result = list(completer.get_completions(
|
||||
Document(text=text, cursor_position=position),
|
||||
complete_event))
|
||||
assert result == list([
|
||||
Completion(text='*', start_position=0),
|
||||
Completion(text='id', start_position=0),
|
||||
Completion(text='email', start_position=0),
|
||||
Completion(text='first_name', start_position=0),
|
||||
Completion(text='last_name', start_position=0)])
|
||||
text = "SELECT u. from users u"
|
||||
position = len("SELECT u.")
|
||||
result = list(
|
||||
completer.get_completions(
|
||||
Document(text=text, cursor_position=position), complete_event
|
||||
)
|
||||
)
|
||||
assert result == list(
|
||||
[
|
||||
Completion(text="*", start_position=0),
|
||||
Completion(text="id", start_position=0),
|
||||
Completion(text="email", start_position=0),
|
||||
Completion(text="first_name", start_position=0),
|
||||
Completion(text="last_name", start_position=0),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def test_suggested_multiple_column_names(completer, complete_event):
|
||||
|
@ -189,20 +232,25 @@ def test_suggested_multiple_column_names(completer, complete_event):
|
|||
:return:
|
||||
|
||||
"""
|
||||
text = 'SELECT id, from users u'
|
||||
position = len('SELECT id, ')
|
||||
result = list(completer.get_completions(
|
||||
Document(text=text, cursor_position=position),
|
||||
complete_event))
|
||||
assert result == list([
|
||||
Completion(text='*', start_position=0),
|
||||
Completion(text='id', start_position=0),
|
||||
Completion(text='email', start_position=0),
|
||||
Completion(text='first_name', start_position=0),
|
||||
Completion(text='last_name', start_position=0)] +
|
||||
list(map(Completion, completer.functions)) +
|
||||
[Completion(text='u', start_position=0)] +
|
||||
list(map(Completion, completer.keywords)))
|
||||
text = "SELECT id, from users u"
|
||||
position = len("SELECT id, ")
|
||||
result = list(
|
||||
completer.get_completions(
|
||||
Document(text=text, cursor_position=position), complete_event
|
||||
)
|
||||
)
|
||||
assert result == list(
|
||||
[
|
||||
Completion(text="*", start_position=0),
|
||||
Completion(text="id", start_position=0),
|
||||
Completion(text="email", start_position=0),
|
||||
Completion(text="first_name", start_position=0),
|
||||
Completion(text="last_name", start_position=0),
|
||||
]
|
||||
+ list(map(Completion, completer.functions))
|
||||
+ [Completion(text="u", start_position=0)]
|
||||
+ list(map(Completion, completer.keywords))
|
||||
)
|
||||
|
||||
|
||||
def test_suggested_multiple_column_names_with_alias(completer, complete_event):
|
||||
|
@ -214,17 +262,22 @@ def test_suggested_multiple_column_names_with_alias(completer, complete_event):
|
|||
:return:
|
||||
|
||||
"""
|
||||
text = 'SELECT u.id, u. from users u'
|
||||
position = len('SELECT u.id, u.')
|
||||
result = list(completer.get_completions(
|
||||
Document(text=text, cursor_position=position),
|
||||
complete_event))
|
||||
assert result == list([
|
||||
Completion(text='*', start_position=0),
|
||||
Completion(text='id', start_position=0),
|
||||
Completion(text='email', start_position=0),
|
||||
Completion(text='first_name', start_position=0),
|
||||
Completion(text='last_name', start_position=0)])
|
||||
text = "SELECT u.id, u. from users u"
|
||||
position = len("SELECT u.id, u.")
|
||||
result = list(
|
||||
completer.get_completions(
|
||||
Document(text=text, cursor_position=position), complete_event
|
||||
)
|
||||
)
|
||||
assert result == list(
|
||||
[
|
||||
Completion(text="*", start_position=0),
|
||||
Completion(text="id", start_position=0),
|
||||
Completion(text="email", start_position=0),
|
||||
Completion(text="first_name", start_position=0),
|
||||
Completion(text="last_name", start_position=0),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def test_suggested_multiple_column_names_with_dot(completer, complete_event):
|
||||
|
@ -236,154 +289,185 @@ def test_suggested_multiple_column_names_with_dot(completer, complete_event):
|
|||
:return:
|
||||
|
||||
"""
|
||||
text = 'SELECT users.id, users. from users u'
|
||||
position = len('SELECT users.id, users.')
|
||||
result = list(completer.get_completions(
|
||||
Document(text=text, cursor_position=position),
|
||||
complete_event))
|
||||
assert result == list([
|
||||
Completion(text='*', start_position=0),
|
||||
Completion(text='id', start_position=0),
|
||||
Completion(text='email', start_position=0),
|
||||
Completion(text='first_name', start_position=0),
|
||||
Completion(text='last_name', start_position=0)])
|
||||
text = "SELECT users.id, users. from users u"
|
||||
position = len("SELECT users.id, users.")
|
||||
result = list(
|
||||
completer.get_completions(
|
||||
Document(text=text, cursor_position=position), complete_event
|
||||
)
|
||||
)
|
||||
assert result == list(
|
||||
[
|
||||
Completion(text="*", start_position=0),
|
||||
Completion(text="id", start_position=0),
|
||||
Completion(text="email", start_position=0),
|
||||
Completion(text="first_name", start_position=0),
|
||||
Completion(text="last_name", start_position=0),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def test_suggested_aliases_after_on(completer, complete_event):
|
||||
text = 'SELECT u.name, o.id FROM users u JOIN orders o ON '
|
||||
position = len('SELECT u.name, o.id FROM users u JOIN orders o ON ')
|
||||
result = list(completer.get_completions(
|
||||
Document(text=text, cursor_position=position),
|
||||
complete_event))
|
||||
assert result == list([
|
||||
Completion(text='u', start_position=0),
|
||||
Completion(text='o', start_position=0),
|
||||
])
|
||||
text = "SELECT u.name, o.id FROM users u JOIN orders o ON "
|
||||
position = len("SELECT u.name, o.id FROM users u JOIN orders o ON ")
|
||||
result = list(
|
||||
completer.get_completions(
|
||||
Document(text=text, cursor_position=position), complete_event
|
||||
)
|
||||
)
|
||||
assert result == list(
|
||||
[
|
||||
Completion(text="u", start_position=0),
|
||||
Completion(text="o", start_position=0),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def test_suggested_aliases_after_on_right_side(completer, complete_event):
|
||||
text = 'SELECT u.name, o.id FROM users u JOIN orders o ON o.user_id = '
|
||||
position = len(
|
||||
'SELECT u.name, o.id FROM users u JOIN orders o ON o.user_id = ')
|
||||
result = list(completer.get_completions(
|
||||
Document(text=text, cursor_position=position),
|
||||
complete_event))
|
||||
assert result == list([
|
||||
Completion(text='u', start_position=0),
|
||||
Completion(text='o', start_position=0),
|
||||
])
|
||||
text = "SELECT u.name, o.id FROM users u JOIN orders o ON o.user_id = "
|
||||
position = len("SELECT u.name, o.id FROM users u JOIN orders o ON o.user_id = ")
|
||||
result = list(
|
||||
completer.get_completions(
|
||||
Document(text=text, cursor_position=position), complete_event
|
||||
)
|
||||
)
|
||||
assert result == list(
|
||||
[
|
||||
Completion(text="u", start_position=0),
|
||||
Completion(text="o", start_position=0),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def test_suggested_tables_after_on(completer, complete_event):
|
||||
text = 'SELECT users.name, orders.id FROM users JOIN orders ON '
|
||||
position = len('SELECT users.name, orders.id FROM users JOIN orders ON ')
|
||||
result = list(completer.get_completions(
|
||||
Document(text=text, cursor_position=position),
|
||||
complete_event))
|
||||
assert result == list([
|
||||
Completion(text='users', start_position=0),
|
||||
Completion(text='orders', start_position=0),
|
||||
])
|
||||
text = "SELECT users.name, orders.id FROM users JOIN orders ON "
|
||||
position = len("SELECT users.name, orders.id FROM users JOIN orders ON ")
|
||||
result = list(
|
||||
completer.get_completions(
|
||||
Document(text=text, cursor_position=position), complete_event
|
||||
)
|
||||
)
|
||||
assert result == list(
|
||||
[
|
||||
Completion(text="users", start_position=0),
|
||||
Completion(text="orders", start_position=0),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def test_suggested_tables_after_on_right_side(completer, complete_event):
|
||||
text = 'SELECT users.name, orders.id FROM users JOIN orders ON orders.user_id = '
|
||||
text = "SELECT users.name, orders.id FROM users JOIN orders ON orders.user_id = "
|
||||
position = len(
|
||||
'SELECT users.name, orders.id FROM users JOIN orders ON orders.user_id = ')
|
||||
result = list(completer.get_completions(
|
||||
Document(text=text, cursor_position=position),
|
||||
complete_event))
|
||||
assert result == list([
|
||||
Completion(text='users', start_position=0),
|
||||
Completion(text='orders', start_position=0),
|
||||
])
|
||||
"SELECT users.name, orders.id FROM users JOIN orders ON orders.user_id = "
|
||||
)
|
||||
result = list(
|
||||
completer.get_completions(
|
||||
Document(text=text, cursor_position=position), complete_event
|
||||
)
|
||||
)
|
||||
assert result == list(
|
||||
[
|
||||
Completion(text="users", start_position=0),
|
||||
Completion(text="orders", start_position=0),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def test_table_names_after_from(completer, complete_event):
|
||||
text = 'SELECT * FROM '
|
||||
position = len('SELECT * FROM ')
|
||||
result = list(completer.get_completions(
|
||||
Document(text=text, cursor_position=position),
|
||||
complete_event))
|
||||
assert result == list([
|
||||
Completion(text='users', start_position=0),
|
||||
Completion(text='orders', start_position=0),
|
||||
Completion(text='`select`', start_position=0),
|
||||
Completion(text='`réveillé`', start_position=0),
|
||||
])
|
||||
text = "SELECT * FROM "
|
||||
position = len("SELECT * FROM ")
|
||||
result = list(
|
||||
completer.get_completions(
|
||||
Document(text=text, cursor_position=position), complete_event
|
||||
)
|
||||
)
|
||||
assert result == list(
|
||||
[
|
||||
Completion(text="users", start_position=0),
|
||||
Completion(text="orders", start_position=0),
|
||||
Completion(text="`select`", start_position=0),
|
||||
Completion(text="`réveillé`", start_position=0),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def test_auto_escaped_col_names(completer, complete_event):
|
||||
text = 'SELECT from `select`'
|
||||
position = len('SELECT ')
|
||||
result = list(completer.get_completions(
|
||||
Document(text=text, cursor_position=position),
|
||||
complete_event))
|
||||
text = "SELECT from `select`"
|
||||
position = len("SELECT ")
|
||||
result = list(
|
||||
completer.get_completions(
|
||||
Document(text=text, cursor_position=position), complete_event
|
||||
)
|
||||
)
|
||||
assert result == [
|
||||
Completion(text='*', start_position=0),
|
||||
Completion(text='id', start_position=0),
|
||||
Completion(text='`insert`', start_position=0),
|
||||
Completion(text='`ABC`', start_position=0),
|
||||
] + \
|
||||
list(map(Completion, completer.functions)) + \
|
||||
[Completion(text='select', start_position=0)] + \
|
||||
list(map(Completion, completer.keywords))
|
||||
Completion(text="*", start_position=0),
|
||||
Completion(text="id", start_position=0),
|
||||
Completion(text="`insert`", start_position=0),
|
||||
Completion(text="`ABC`", start_position=0),
|
||||
] + list(map(Completion, completer.functions)) + [
|
||||
Completion(text="select", start_position=0)
|
||||
] + list(map(Completion, completer.keywords))
|
||||
|
||||
|
||||
def test_un_escaped_table_names(completer, complete_event):
|
||||
text = 'SELECT from réveillé'
|
||||
position = len('SELECT ')
|
||||
result = list(completer.get_completions(
|
||||
Document(text=text, cursor_position=position),
|
||||
complete_event))
|
||||
assert result == list([
|
||||
Completion(text='*', start_position=0),
|
||||
Completion(text='id', start_position=0),
|
||||
Completion(text='`insert`', start_position=0),
|
||||
Completion(text='`ABC`', start_position=0),
|
||||
] +
|
||||
list(map(Completion, completer.functions)) +
|
||||
[Completion(text='réveillé', start_position=0)] +
|
||||
list(map(Completion, completer.keywords)))
|
||||
text = "SELECT from réveillé"
|
||||
position = len("SELECT ")
|
||||
result = list(
|
||||
completer.get_completions(
|
||||
Document(text=text, cursor_position=position), complete_event
|
||||
)
|
||||
)
|
||||
assert result == list(
|
||||
[
|
||||
Completion(text="*", start_position=0),
|
||||
Completion(text="id", start_position=0),
|
||||
Completion(text="`insert`", start_position=0),
|
||||
Completion(text="`ABC`", start_position=0),
|
||||
]
|
||||
+ list(map(Completion, completer.functions))
|
||||
+ [Completion(text="réveillé", start_position=0)]
|
||||
+ list(map(Completion, completer.keywords))
|
||||
)
|
||||
|
||||
|
||||
def dummy_list_path(dir_name):
|
||||
dirs = {
|
||||
'/': [
|
||||
'dir1',
|
||||
'file1.sql',
|
||||
'file2.sql',
|
||||
"/": [
|
||||
"dir1",
|
||||
"file1.sql",
|
||||
"file2.sql",
|
||||
],
|
||||
'/dir1': [
|
||||
'subdir1',
|
||||
'subfile1.sql',
|
||||
'subfile2.sql',
|
||||
"/dir1": [
|
||||
"subdir1",
|
||||
"subfile1.sql",
|
||||
"subfile2.sql",
|
||||
],
|
||||
'/dir1/subdir1': [
|
||||
'lastfile.sql',
|
||||
"/dir1/subdir1": [
|
||||
"lastfile.sql",
|
||||
],
|
||||
}
|
||||
return dirs.get(dir_name, [])
|
||||
|
||||
|
||||
@patch('mycli.packages.filepaths.list_path', new=dummy_list_path)
|
||||
@pytest.mark.parametrize('text,expected', [
|
||||
@patch("mycli.packages.filepaths.list_path", new=dummy_list_path)
|
||||
@pytest.mark.parametrize(
|
||||
"text,expected",
|
||||
[
|
||||
# ('source ', [('~', 0),
|
||||
# ('/', 0),
|
||||
# ('.', 0),
|
||||
# ('..', 0)]),
|
||||
('source /', [('dir1', 0),
|
||||
('file1.sql', 0),
|
||||
('file2.sql', 0)]),
|
||||
('source /dir1/', [('subdir1', 0),
|
||||
('subfile1.sql', 0),
|
||||
('subfile2.sql', 0)]),
|
||||
('source /dir1/subdir1/', [('lastfile.sql', 0)]),
|
||||
])
|
||||
("source /", [("dir1", 0), ("file1.sql", 0), ("file2.sql", 0)]),
|
||||
("source /dir1/", [("subdir1", 0), ("subfile1.sql", 0), ("subfile2.sql", 0)]),
|
||||
("source /dir1/subdir1/", [("lastfile.sql", 0)]),
|
||||
],
|
||||
)
|
||||
def test_file_name_completion(completer, complete_event, text, expected):
|
||||
position = len(text)
|
||||
result = list(completer.get_completions(
|
||||
Document(text=text, cursor_position=position),
|
||||
complete_event))
|
||||
result = list(
|
||||
completer.get_completions(
|
||||
Document(text=text, cursor_position=position), complete_event
|
||||
)
|
||||
)
|
||||
expected = list((Completion(txt, pos) for txt, pos in expected))
|
||||
assert result == expected
|
||||
|
|
Loading…
Add table
Reference in a new issue