1
0
Fork 0

Adding upstream version 1.31.2.

Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
Daniel Baumann 2025-05-04 09:25:34 +02:00
parent 9aaa9c7fe0
commit 5829a372a2
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
23 changed files with 166 additions and 107 deletions

6
.github/dependabot.yml vendored Normal file
View file

@ -0,0 +1,6 @@
version: 2
updates:
- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: "daily"

View file

@ -15,13 +15,13 @@ jobs:
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"] python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: astral-sh/setup-uv@v1 - uses: astral-sh/setup-uv@6b9c6063abd6010835644d4c2e1bef4cf5cd0fca # v6.0.1
with: with:
version: "latest" version: "latest"
- name: Set up Python ${{ matrix.python-version }} - name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5 uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
with: with:
python-version: ${{ matrix.python-version }} python-version: ${{ matrix.python-version }}

View file

@ -15,16 +15,15 @@ jobs:
- name: Check out Git repository - name: Check out Git repository
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
# todo
# remember to sync the ruff-check version number with pyproject.toml # remember to sync the ruff-check version number with pyproject.toml
# - name: Run ruff check - name: Run ruff check
# uses: astral-sh/ruff-action@9828f49eb4cadf267b40eaa330295c412c68c1f9 # v3.2.2 uses: astral-sh/ruff-action@c6bea5606c33b5d04902374392d9233464b90660 # v3.3.0
# with: with:
# version: 0.11.5 version: 0.11.5
# remember to sync the ruff-check version number with pyproject.toml # remember to sync the ruff-check version number with pyproject.toml
- name: Run ruff format - name: Run ruff format
uses: astral-sh/ruff-action@9828f49eb4cadf267b40eaa330295c412c68c1f9 # v3.2.2 uses: astral-sh/ruff-action@c6bea5606c33b5d04902374392d9233464b90660 # v3.3.0
with: with:
version: 0.11.5 version: 0.11.5
args: 'format --check' args: 'format --check'

View file

@ -16,13 +16,13 @@ jobs:
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"] python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: astral-sh/setup-uv@v1 - uses: astral-sh/setup-uv@6b9c6063abd6010835644d4c2e1bef4cf5cd0fca # v6.0.1
with: with:
version: "latest" version: "latest"
- name: Set up Python ${{ matrix.python-version }} - name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5 uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
with: with:
python-version: ${{ matrix.python-version }} python-version: ${{ matrix.python-version }}
@ -46,23 +46,22 @@ jobs:
run: | run: |
uv run tox -e py${{ matrix.python-version }} uv run tox -e py${{ matrix.python-version }}
# TODO enable style checks here and in CI for PRs # arguably this should be made identical to CI for PRs
# - name: Run Style Checks
# - name: Run Style Checks run: uv run tox -e style
# run: uv run tox -e style
build: build:
runs-on: ubuntu-latest runs-on: ubuntu-latest
needs: [test] needs: [test]
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: astral-sh/setup-uv@v1 - uses: astral-sh/setup-uv@6b9c6063abd6010835644d4c2e1bef4cf5cd0fca # v6.0.1
with: with:
version: "latest" version: "latest"
- name: Set up Python - name: Set up Python
uses: actions/setup-python@v5 uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
with: with:
python-version: '3.13' python-version: '3.13'
@ -73,7 +72,7 @@ jobs:
run: uv build run: uv build
- name: Store the distribution packages - name: Store the distribution packages
uses: actions/upload-artifact@v4 uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
with: with:
name: python-packages name: python-packages
path: dist/ path: dist/
@ -88,9 +87,9 @@ jobs:
id-token: write id-token: write
steps: steps:
- name: Download distribution packages - name: Download distribution packages
uses: actions/download-artifact@v4 uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
with: with:
name: python-packages name: python-packages
path: dist/ path: dist/
- name: Publish to PyPI - name: Publish to PyPI
uses: pypa/gh-action-pypi-publish@release/v1 uses: pypa/gh-action-pypi-publish@76f52bc884231f62b9a034ebfe128415bbaabdfc # v1.12.4

View file

@ -1,3 +1,21 @@
1.31.2 (2025/05/01)
===================
Bug Fixes
---------
* Let table-name extraction work on multi-statement inputs.
Internal
--------
* Work on passing `ruff check` linting.
* Remove backward-compatibility hacks.
* Pin more GitHub Actions and add Dependabot support.
* Enable xpassing test.
1.31.1 (2025/04/25) 1.31.1 (2025/04/25)
=================== ===================

View file

@ -35,11 +35,13 @@ def _multiline_exception(text):
text.lower().startswith("delimiter") text.lower().startswith("delimiter")
or or
# Ended with the current delimiter (usually a semi-column) # Ended with the current delimiter (usually a semi-column)
text.endswith(special.get_current_delimiter()) text.endswith((
or text.endswith("\\g") special.get_current_delimiter(),
or text.endswith("\\G") "\\g",
or text.endswith(r"\e") "\\G",
or text.endswith(r"\clip") r"\e",
r"\clip",
))
or or
# Exit doesn't need semi-column` # Exit doesn't need semi-column`
(text == "exit") (text == "exit")

View file

@ -11,11 +11,6 @@ from typing import Union, IO
from configobj import ConfigObj, ConfigObjError from configobj import ConfigObj, ConfigObjError
import pyaes import pyaes
try:
basestring
except NameError:
basestring = str
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -40,7 +35,7 @@ def read_config_file(f, list_values=True):
""" """
if isinstance(f, basestring): if isinstance(f, str):
f = os.path.expanduser(f) f = os.path.expanduser(f)
try: try:
@ -73,7 +68,7 @@ def get_included_configs(config_file: Union[str, TextIOWrapper]) -> list:
try: try:
with open(config_file) as f: with open(config_file) as f:
include_directives = filter(lambda s: s.startswith("!includedir"), f) include_directives = filter(lambda s: s.startswith("!includedir"), f)
dirs_split = map(lambda s: s.strip().split()[-1], include_directives) dirs_split = (s.strip().split()[-1] for s in include_directives)
dirs = filter(os.path.isdir, dirs_split) dirs = filter(os.path.isdir, dirs_split)
for dir_ in dirs: for dir_ in dirs:
for filename in os.listdir(dir_): for filename in os.listdir(dir_):
@ -284,7 +279,7 @@ def str_to_bool(s):
"""Convert a string value to its corresponding boolean value.""" """Convert a string value to its corresponding boolean value."""
if isinstance(s, bool): if isinstance(s, bool):
return s return s
elif not isinstance(s, basestring): elif not isinstance(s, str):
raise TypeError("argument must be a string") raise TypeError("argument must be a string")
true_values = ("true", "on", "1") true_values = ("true", "on", "1")
@ -305,7 +300,7 @@ def strip_matching_quotes(s):
values. values.
""" """
if isinstance(s, basestring) and len(s) >= 2 and s[0] == s[-1] and s[0] in ('"', "'"): if isinstance(s, str) and len(s) >= 2 and s[0] == s[-1] and s[0] in ('"', "'"):
s = s[1:-1] s = s[1:-1]
return s return s

View file

@ -1,5 +1,4 @@
from collections import defaultdict from collections import defaultdict
from io import open
import os import os
import sys import sys
import shutil import shutil
@ -7,11 +6,8 @@ import traceback
import logging import logging
import threading import threading
import re import re
import stat
from collections import namedtuple from collections import namedtuple
from pygments.lexer import combined
try: try:
from pwd import getpwuid from pwd import getpwuid
except ImportError: except ImportError:
@ -62,18 +58,10 @@ import itertools
click.disable_unicode_literals_warning = True click.disable_unicode_literals_warning = True
try: from urllib.parse import urlparse
from urlparse import urlparse from urllib.parse import unquote
from urlparse import unquote
except ImportError:
from urllib.parse import urlparse
from urllib.parse import unquote
try: from importlib import resources
import importlib.resources as resources
except ImportError:
# Python < 3.7
import importlib_resources as resources
try: try:
import paramiko import paramiko
@ -89,8 +77,6 @@ SUPPORT_INFO = "Home: http://mycli.net\nBug tracker: https://github.com/dbcli/my
class PasswordFileError(Exception): class PasswordFileError(Exception):
"""Base exception for errors related to reading password files.""" """Base exception for errors related to reading password files."""
pass
class MyCli(object): class MyCli(object):
default_prompt = "\\t \\u@\\h:\\d> " default_prompt = "\\t \\u@\\h:\\d> "
@ -1458,7 +1444,7 @@ def is_mutating(status):
if not status: if not status:
return False return False
mutating = set(["insert", "update", "delete", "alter", "create", "drop", "replace", "truncate", "load", "rename"]) mutating = {"insert", "update", "delete", "alter", "create", "drop", "replace", "truncate", "load", "rename"}
return status.split(None, 1)[0].lower() in mutating return status.split(None, 1)[0].lower() in mutating

View file

@ -129,7 +129,7 @@ def suggest_based_on_last_token(token, text_before_cursor, full_text, identifier
else: else:
token_v = token.value.lower() token_v = token.value.lower()
is_operand = lambda x: x and any([x.endswith(op) for op in ["+", "-", "*", "/"]]) # noqa: E731 is_operand = lambda x: x and any(x.endswith(op) for op in ["+", "-", "*", "/"]) # noqa: E731
if not token: if not token:
return [{"type": "keyword"}, {"type": "special"}] return [{"type": "keyword"}, {"type": "special"}]
@ -289,5 +289,5 @@ def suggest_based_on_last_token(token, text_before_cursor, full_text, identifier
return [{"type": "keyword"}] return [{"type": "keyword"}]
def identifies(id, schema, table, alias): def identifies(identifier, schema, table, alias):
return id == alias or id == table or (schema and (id == schema + "." + table)) return identifier == alias or identifier == table or (schema and (identifier == schema + "." + table))

View file

@ -16,9 +16,9 @@ class Paramiko:
print( print(
dedent(""" dedent("""
To enable certain SSH features you need to install paramiko and sshtunnel: To enable certain SSH features you need to install paramiko and sshtunnel:
pip install paramiko sshtunnel pip install paramiko sshtunnel
It is required for the following configuration options: It is required for the following configuration options:
--list-ssh-config --list-ssh-config
--ssh-config-host --ssh-config-host

View file

@ -1,4 +1,5 @@
import re import re
import sqlglot
import sqlparse import sqlparse
from sqlparse.sql import IdentifierList, Identifier, Function from sqlparse.sql import IdentifierList, Identifier, Function
from sqlparse.tokens import Keyword, DML, Punctuation from sqlparse.tokens import Keyword, DML, Punctuation
@ -166,6 +167,42 @@ def extract_tables(sql):
return list(extract_table_identifiers(stream)) return list(extract_table_identifiers(stream))
def extract_tables_from_complete_statements(sql):
"""Extract the table names from a complete and valid series of SQL
statements.
Returns a list of (schema, table, alias) tuples
"""
# sqlglot chokes entirely on things like "\T" that it doesn't know about,
# but is much better at extracting table names from complete statements.
# sqlparse can extract the series of statements, though it also doesn't
# understand "\T".
roughly_parsed = sqlparse.parse(sql)
if not roughly_parsed:
return []
finely_parsed = []
for statement in roughly_parsed:
try:
finely_parsed.append(sqlglot.parse_one(str(statement), read='mysql'))
except sqlglot.errors.ParseError:
pass
tables = []
for statement in finely_parsed:
for identifier in statement.find_all(sqlglot.exp.Table):
if identifier.parent_select.sql().startswith('WITH'):
continue
tables.append((
None if identifier.db == '' else identifier.db,
identifier.name,
None if identifier.alias == '' else identifier.alias,
))
return tables
def find_prev_keyword(sql): def find_prev_keyword(sql):
"""Find the last sql keyword in an SQL statement """Find the last sql keyword in an SQL statement

View file

@ -26,10 +26,10 @@ class DelimiterCommand(object):
return [stmt.replace(";", self._delimiter).replace(placeholder, ";") for stmt in split] return [stmt.replace(";", self._delimiter).replace(placeholder, ";") for stmt in split]
def queries_iter(self, input): def queries_iter(self, input_str):
"""Iterate over queries in the input string.""" """Iterate over queries in the input string."""
queries = self._split(input) queries = self._split(input_str)
while queries: while queries:
for sql in queries: for sql in queries:
delimiter = self._delimiter delimiter = self._delimiter

View file

@ -4,7 +4,6 @@ import locale
import logging import logging
import subprocess import subprocess
import shlex import shlex
from io import open
from time import sleep from time import sleep
import click import click
@ -547,6 +546,6 @@ def get_current_delimiter():
@export @export
def split_queries(input): def split_queries(input_str):
for query in delimiter_command.queries_iter(input): for query in delimiter_command.queries_iter(input_str):
yield query yield query

View file

@ -108,7 +108,7 @@ def show_keyword_help(cur, arg):
@special_command("exit", "\\q", "Exit.", arg_type=NO_QUERY, aliases=("\\q",)) @special_command("exit", "\\q", "Exit.", arg_type=NO_QUERY, aliases=("\\q",))
@special_command("quit", "\\q", "Quit.", arg_type=NO_QUERY) @special_command("quit", "\\q", "Quit.", arg_type=NO_QUERY)
def quit(*_args): def quit_(*_args):
raise EOFError raise EOFError

View file

@ -1,6 +1,6 @@
"""Format adapter for sql.""" """Format adapter for sql."""
from mycli.packages.parseutils import extract_tables from mycli.packages.parseutils import extract_tables_from_complete_statements
supported_formats = ( supported_formats = (
"sql-insert", "sql-insert",
@ -20,7 +20,7 @@ def escape_for_sql_statement(value):
def adapter(data, headers, table_format=None, **kwargs): def adapter(data, headers, table_format=None, **kwargs):
tables = extract_tables(formatter.query) tables = extract_tables_from_complete_statements(formatter.query)
if len(tables) > 0: if len(tables) > 0:
table = tables[0] table = tables[0]
if table[0]: if table[0]:

View file

@ -1,5 +1,5 @@
import logging import logging
from re import compile, escape import re
from collections import Counter from collections import Counter
from prompt_toolkit.completion import Completer, Completion from prompt_toolkit.completion import Completer, Completion
@ -900,7 +900,7 @@ class SQLCompleter(Completer):
self.reserved_words = set() self.reserved_words = set()
for x in self.keywords: for x in self.keywords:
self.reserved_words.update(x.split()) self.reserved_words.update(x.split())
self.name_pattern = compile(r"^[_a-z][_a-z0-9\$]*$") self.name_pattern = re.compile(r"^[_a-z][_a-z0-9\$]*$")
self.special_commands = [] self.special_commands = []
self.table_formats = supported_formats self.table_formats = supported_formats
@ -1075,8 +1075,8 @@ class SQLCompleter(Completer):
completions = [] completions = []
if fuzzy: if fuzzy:
regex = ".*?".join(map(escape, text)) regex = ".*?".join(map(re.escape, text))
pat = compile("(%s)" % regex) pat = re.compile("(%s)" % regex)
for item in collection: for item in collection:
r = pat.search(item.lower()) r = pat.search(item.lower())
if r: if r:

View file

@ -62,7 +62,7 @@ line-length = 140
[tool.ruff.lint] [tool.ruff.lint]
select = [ select = [
'A', 'A',
'I', # 'I', # todo enableme imports
'E', 'E',
'W', 'W',
'F', 'F',
@ -79,12 +79,16 @@ ignore = [
'E114', # indentation-with-invalid-multiple-comment 'E114', # indentation-with-invalid-multiple-comment
'E117', # over-indented 'E117', # over-indented
'W191', # tab-indentation 'W191', # tab-indentation
# TODO
'PIE796', # todo enableme Enum contains duplicate value
] ]
[tool.ruff.lint.isort] [tool.ruff.lint.isort]
force-sort-within-sections = true force-sort-within-sections = true
known-first-party = [ known-first-party = [
'mycli', 'mycli',
'test',
'steps',
] ]
[tool.ruff.format] [tool.ruff.format]

View file

@ -28,7 +28,7 @@ def test_select_keyword_completion(completer, complete_event):
text = "SEL" text = "SEL"
position = len("SEL") position = len("SEL")
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event)) result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list([Completion(text="SELECT", start_position=-3)]) assert result == [Completion(text="SELECT", start_position=-3)]
def test_function_name_completion(completer, complete_event): def test_function_name_completion(completer, complete_event):

View file

@ -1,6 +1,7 @@
import pytest import pytest
from mycli.packages.parseutils import ( from mycli.packages.parseutils import (
extract_tables, extract_tables,
extract_tables_from_complete_statements,
query_starts_with, query_starts_with,
queries_start_with, queries_start_with,
is_destructive, is_destructive,
@ -77,7 +78,6 @@ def test_simple_insert_single_table():
assert tables == [(None, "abc", "abc")] assert tables == [(None, "abc", "abc")]
@pytest.mark.xfail
def test_simple_insert_single_table_schema_qualified(): def test_simple_insert_single_table_schema_qualified():
tables = extract_tables('insert into abc.def (id, name) values (1, "def")') tables = extract_tables('insert into abc.def (id, name) values (1, "def")')
assert tables == [("abc", "def", None)] assert tables == [("abc", "def", None)]
@ -108,6 +108,22 @@ def test_join_as_table():
assert tables == [(None, "my_table", "m")] assert tables == [(None, "my_table", "m")]
def test_extract_tables_from_complete_statements():
tables = extract_tables_from_complete_statements("SELECT * FROM my_table AS m WHERE m.a > 5")
assert tables == [(None, "my_table", "m")]
def test_extract_tables_from_complete_statements_cte():
tables = extract_tables_from_complete_statements("WITH my_cte (id, num) AS ( SELECT id, COUNT(1) FROM my_table GROUP BY id ) SELECT *")
assert tables == [(None, "my_table", None)]
# this would confuse plain extract_tables() per #1122
def test_extract_tables_from_multiple_complete_statements():
tables = extract_tables_from_complete_statements(r'\T sql-insert; SELECT * FROM my_table AS m WHERE m.a > 5')
assert tables == [(None, "my_table", "m")]
def test_query_starts_with(): def test_query_starts_with():
query = "USE test;" query = "USE test;"
assert query_starts_with(query, ("use",)) is True assert query_starts_with(query, ("use",)) is True

View file

@ -58,7 +58,7 @@ def test_select_keyword_completion(completer, complete_event):
text = "SEL" text = "SEL"
position = len("SEL") position = len("SEL")
result = completer.get_completions(Document(text=text, cursor_position=position), complete_event) result = completer.get_completions(Document(text=text, cursor_position=position), complete_event)
assert list(result) == list([Completion(text="SELECT", start_position=-3)]) assert list(result) == [Completion(text="SELECT", start_position=-3)]
def test_select_star(completer, complete_event): def test_select_star(completer, complete_event):
@ -72,19 +72,19 @@ def test_table_completion(completer, complete_event):
text = "SELECT * FROM " text = "SELECT * FROM "
position = len(text) position = len(text)
result = completer.get_completions(Document(text=text, cursor_position=position), complete_event) result = completer.get_completions(Document(text=text, cursor_position=position), complete_event)
assert list(result) == list([ assert list(result) == [
Completion(text="users", start_position=0), Completion(text="users", start_position=0),
Completion(text="orders", start_position=0), Completion(text="orders", start_position=0),
Completion(text="`select`", start_position=0), Completion(text="`select`", start_position=0),
Completion(text="`réveillé`", start_position=0), Completion(text="`réveillé`", start_position=0),
]) ]
def test_function_name_completion(completer, complete_event): def test_function_name_completion(completer, complete_event):
text = "SELECT MA" text = "SELECT MA"
position = len("SELECT MA") position = len("SELECT MA")
result = completer.get_completions(Document(text=text, cursor_position=position), complete_event) result = completer.get_completions(Document(text=text, cursor_position=position), complete_event)
assert list(result) == list([ assert list(result) == [
Completion(text="MAX", start_position=-2), Completion(text="MAX", start_position=-2),
Completion(text="CHANGE MASTER TO", start_position=-2), Completion(text="CHANGE MASTER TO", start_position=-2),
Completion(text="CURRENT_TIMESTAMP", start_position=-2), Completion(text="CURRENT_TIMESTAMP", start_position=-2),
@ -94,7 +94,7 @@ def test_function_name_completion(completer, complete_event):
Completion(text="PRIMARY", start_position=-2), Completion(text="PRIMARY", start_position=-2),
Completion(text="ROW_FORMAT", start_position=-2), Completion(text="ROW_FORMAT", start_position=-2),
Completion(text="SMALLINT", start_position=-2), Completion(text="SMALLINT", start_position=-2),
]) ]
def test_suggested_column_names(completer, complete_event): def test_suggested_column_names(completer, complete_event):
@ -134,13 +134,13 @@ def test_suggested_column_names_in_function(completer, complete_event):
text = "SELECT MAX( from users" text = "SELECT MAX( from users"
position = len("SELECT MAX(") position = len("SELECT MAX(")
result = completer.get_completions(Document(text=text, cursor_position=position), complete_event) result = completer.get_completions(Document(text=text, cursor_position=position), complete_event)
assert list(result) == list([ assert list(result) == [
Completion(text="*", start_position=0), Completion(text="*", start_position=0),
Completion(text="id", start_position=0), Completion(text="id", start_position=0),
Completion(text="email", start_position=0), Completion(text="email", start_position=0),
Completion(text="first_name", start_position=0), Completion(text="first_name", start_position=0),
Completion(text="last_name", start_position=0), Completion(text="last_name", start_position=0),
]) ]
def test_suggested_column_names_with_table_dot(completer, complete_event): def test_suggested_column_names_with_table_dot(completer, complete_event):
@ -154,13 +154,13 @@ def test_suggested_column_names_with_table_dot(completer, complete_event):
text = "SELECT users. from users" text = "SELECT users. from users"
position = len("SELECT users.") position = len("SELECT users.")
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event)) result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list([ assert result == [
Completion(text="*", start_position=0), Completion(text="*", start_position=0),
Completion(text="id", start_position=0), Completion(text="id", start_position=0),
Completion(text="email", start_position=0), Completion(text="email", start_position=0),
Completion(text="first_name", start_position=0), Completion(text="first_name", start_position=0),
Completion(text="last_name", start_position=0), Completion(text="last_name", start_position=0),
]) ]
def test_suggested_column_names_with_alias(completer, complete_event): def test_suggested_column_names_with_alias(completer, complete_event):
@ -174,13 +174,13 @@ def test_suggested_column_names_with_alias(completer, complete_event):
text = "SELECT u. from users u" text = "SELECT u. from users u"
position = len("SELECT u.") position = len("SELECT u.")
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event)) result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list([ assert result == [
Completion(text="*", start_position=0), Completion(text="*", start_position=0),
Completion(text="id", start_position=0), Completion(text="id", start_position=0),
Completion(text="email", start_position=0), Completion(text="email", start_position=0),
Completion(text="first_name", start_position=0), Completion(text="first_name", start_position=0),
Completion(text="last_name", start_position=0), Completion(text="last_name", start_position=0),
]) ]
def test_suggested_multiple_column_names(completer, complete_event): def test_suggested_multiple_column_names(completer, complete_event):
@ -221,13 +221,13 @@ def test_suggested_multiple_column_names_with_alias(completer, complete_event):
text = "SELECT u.id, u. from users u" text = "SELECT u.id, u. from users u"
position = len("SELECT u.id, u.") position = len("SELECT u.id, u.")
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event)) result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list([ assert result == [
Completion(text="*", start_position=0), Completion(text="*", start_position=0),
Completion(text="id", start_position=0), Completion(text="id", start_position=0),
Completion(text="email", start_position=0), Completion(text="email", start_position=0),
Completion(text="first_name", start_position=0), Completion(text="first_name", start_position=0),
Completion(text="last_name", start_position=0), Completion(text="last_name", start_position=0),
]) ]
def test_suggested_multiple_column_names_with_dot(completer, complete_event): def test_suggested_multiple_column_names_with_dot(completer, complete_event):
@ -242,65 +242,65 @@ def test_suggested_multiple_column_names_with_dot(completer, complete_event):
text = "SELECT users.id, users. from users u" text = "SELECT users.id, users. from users u"
position = len("SELECT users.id, users.") position = len("SELECT users.id, users.")
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event)) result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list([ assert result == [
Completion(text="*", start_position=0), Completion(text="*", start_position=0),
Completion(text="id", start_position=0), Completion(text="id", start_position=0),
Completion(text="email", start_position=0), Completion(text="email", start_position=0),
Completion(text="first_name", start_position=0), Completion(text="first_name", start_position=0),
Completion(text="last_name", start_position=0), Completion(text="last_name", start_position=0),
]) ]
def test_suggested_aliases_after_on(completer, complete_event): def test_suggested_aliases_after_on(completer, complete_event):
text = "SELECT u.name, o.id FROM users u JOIN orders o ON " text = "SELECT u.name, o.id FROM users u JOIN orders o ON "
position = len("SELECT u.name, o.id FROM users u JOIN orders o ON ") position = len("SELECT u.name, o.id FROM users u JOIN orders o ON ")
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event)) result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list([ assert result == [
Completion(text="u", start_position=0), Completion(text="u", start_position=0),
Completion(text="o", start_position=0), Completion(text="o", start_position=0),
]) ]
def test_suggested_aliases_after_on_right_side(completer, complete_event): def test_suggested_aliases_after_on_right_side(completer, complete_event):
text = "SELECT u.name, o.id FROM users u JOIN orders o ON o.user_id = " text = "SELECT u.name, o.id FROM users u JOIN orders o ON o.user_id = "
position = len("SELECT u.name, o.id FROM users u JOIN orders o ON o.user_id = ") position = len("SELECT u.name, o.id FROM users u JOIN orders o ON o.user_id = ")
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event)) result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list([ assert result == [
Completion(text="u", start_position=0), Completion(text="u", start_position=0),
Completion(text="o", start_position=0), Completion(text="o", start_position=0),
]) ]
def test_suggested_tables_after_on(completer, complete_event): def test_suggested_tables_after_on(completer, complete_event):
text = "SELECT users.name, orders.id FROM users JOIN orders ON " text = "SELECT users.name, orders.id FROM users JOIN orders ON "
position = len("SELECT users.name, orders.id FROM users JOIN orders ON ") position = len("SELECT users.name, orders.id FROM users JOIN orders ON ")
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event)) result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list([ assert result == [
Completion(text="users", start_position=0), Completion(text="users", start_position=0),
Completion(text="orders", start_position=0), Completion(text="orders", start_position=0),
]) ]
def test_suggested_tables_after_on_right_side(completer, complete_event): def test_suggested_tables_after_on_right_side(completer, complete_event):
text = "SELECT users.name, orders.id FROM users JOIN orders ON orders.user_id = " text = "SELECT users.name, orders.id FROM users JOIN orders ON orders.user_id = "
position = len("SELECT users.name, orders.id FROM users JOIN orders ON orders.user_id = ") position = len("SELECT users.name, orders.id FROM users JOIN orders ON orders.user_id = ")
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event)) result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list([ assert result == [
Completion(text="users", start_position=0), Completion(text="users", start_position=0),
Completion(text="orders", start_position=0), Completion(text="orders", start_position=0),
]) ]
def test_table_names_after_from(completer, complete_event): def test_table_names_after_from(completer, complete_event):
text = "SELECT * FROM " text = "SELECT * FROM "
position = len("SELECT * FROM ") position = len("SELECT * FROM ")
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event)) result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list([ assert result == [
Completion(text="users", start_position=0), Completion(text="users", start_position=0),
Completion(text="orders", start_position=0), Completion(text="orders", start_position=0),
Completion(text="`select`", start_position=0), Completion(text="`select`", start_position=0),
Completion(text="`réveillé`", start_position=0), Completion(text="`réveillé`", start_position=0),
]) ]
def test_auto_escaped_col_names(completer, complete_event): def test_auto_escaped_col_names(completer, complete_event):
@ -369,5 +369,5 @@ def dummy_list_path(dir_name):
def test_file_name_completion(completer, complete_event, text, expected): def test_file_name_completion(completer, complete_event, text, expected):
position = len(text) position = len(text)
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event)) result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
expected = list((Completion(txt, pos) for txt, pos in expected)) expected = [Completion(txt, pos) for txt, pos in expected]
assert result == expected assert result == expected

View file

@ -223,9 +223,7 @@ def test_watch_query_full():
expected_results = 4 expected_results = 4
ctrl_c_process = send_ctrl_c(wait_interval) ctrl_c_process = send_ctrl_c(wait_interval)
with db_connection().cursor() as cur: with db_connection().cursor() as cur:
results = list( results = list(mycli.packages.special.iocommands.watch_query(arg="{0!s} {1!s}".format(watch_seconds, query), cur=cur))
result for result in mycli.packages.special.iocommands.watch_query(arg="{0!s} {1!s}".format(watch_seconds, query), cur=cur)
)
ctrl_c_process.join(1) ctrl_c_process.join(1)
assert len(results) == expected_results assert len(results) == expected_results
for result in results: for result in results:

View file

@ -61,8 +61,8 @@ def test_table_and_columns_query(executor):
run(executor, "create table a(x text, y text)") run(executor, "create table a(x text, y text)")
run(executor, "create table b(z text)") run(executor, "create table b(z text)")
assert set(executor.tables()) == set([("a",), ("b",)]) assert set(executor.tables()) == {("a",), ("b",)}
assert set(executor.table_columns()) == set([("a", "x"), ("a", "y"), ("b", "z")]) assert set(executor.table_columns()) == {("a", "x"), ("a", "y"), ("b", "z")}
@dbtest @dbtest

View file

@ -17,5 +17,5 @@ commands = uv pip install -e .[dev,ssh]
[testenv:style] [testenv:style]
skip_install = true skip_install = true
deps = ruff deps = ruff
commands = ruff check --fix commands = ruff check
ruff format ruff format --diff