1
0
Fork 0

Adding upstream version 1.31.1.

Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
Daniel Baumann 2025-04-29 04:23:47 +02:00
parent f38abee4bc
commit 9aaa9c7fe0
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
27 changed files with 387 additions and 394 deletions

View file

@ -4,6 +4,6 @@
## Checklist
<!--- We appreciate your help and want to give you credit. Please take a moment to put an `x` in the boxes below as you complete them. -->
<!--- We appreciate your help and want to give you credit. Place an `x` in the boxes below as you complete them. -->
- [ ] I've added this contribution to the `changelog.md`.
- [ ] I've added my name to the `AUTHORS` file (or it's already there).

View file

@ -45,6 +45,3 @@ jobs:
TERM: xterm
run: |
uv run tox -e py${{ matrix.python-version }}
- name: Run Style Checks
run: uv run tox -e style

30
.github/workflows/lint.yml vendored Normal file
View file

@ -0,0 +1,30 @@
name: lint
on:
pull_request:
paths-ignore:
- '**.md'
- 'AUTHORS'
jobs:
linters:
name: Linters
runs-on: ubuntu-latest
steps:
- name: Check out Git repository
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
# todo
# remember to sync the ruff-check version number with pyproject.toml
# - name: Run ruff check
# uses: astral-sh/ruff-action@9828f49eb4cadf267b40eaa330295c412c68c1f9 # v3.2.2
# with:
# version: 0.11.5
# remember to sync the ruff-check version number with pyproject.toml
- name: Run ruff format
uses: astral-sh/ruff-action@9828f49eb4cadf267b40eaa330295c412c68c1f9 # v3.2.2
with:
version: 0.11.5
args: 'format --check'

View file

@ -46,8 +46,10 @@ jobs:
run: |
uv run tox -e py${{ matrix.python-version }}
- name: Run Style Checks
run: uv run tox -e style
# TODO enable style checks here and in CI for PRs
#
# - name: Run Style Checks
# run: uv run tox -e style
build:
runs-on: ubuntu-latest

View file

@ -19,13 +19,12 @@ You'll always get credit for your work.
$ git remote add upstream git@github.com:dbcli/mycli.git
```
4. Set up a [virtual environment](http://docs.python-guide.org/en/latest/dev/virtualenvs)
4. Set up [uv](https://docs.astral.sh/uv/getting-started/installation/)
for development:
```bash
$ cd mycli
$ pip install virtualenv
$ virtualenv mycli_dev
$ uv venv
```
We've just created a virtual environment that we'll use to install all the dependencies
@ -33,20 +32,14 @@ You'll always get credit for your work.
need to activate the virtual environment:
```bash
$ source mycli_dev/bin/activate
```
When you're done working, you can deactivate the virtual environment:
```bash
$ deactivate
$ source .venv/bin/activate
```
5. Install the dependencies and development tools:
```bash
$ pip install -r requirements-dev.txt
$ pip install --editable .
$ uv pip install -r requirements-dev.txt
$ uv pip install --editable .
```
6. Create a branch for your bugfix or feature based off the `main` branch:
@ -76,18 +69,10 @@ You'll always get credit for your work.
While you work on mycli, it's important to run the tests to make sure your code
hasn't broken any existing functionality. To run the tests, just type in:
```bash
$ ./setup.py test
```
Mycli supports Python 2.7 and 3.4+. You can test against multiple versions of
Python by running tox:
```bash
$ tox
```
### Test Database Credentials
The tests require a database connection to work. You can tell the tests which
@ -126,42 +111,6 @@ $ readlink -f $(which ex)
```
## Coding Style
Mycli requires code submissions to adhere to
[PEP 8](https://www.python.org/dev/peps/pep-0008/).
It's easy to check the style of your code, just run:
```bash
$ ./setup.py lint
```
If you see any PEP 8 style issues, you can automatically fix them by running:
```bash
$ ./setup.py lint --fix
```
Be sure to commit and push any PEP 8 fixes.
## Releasing a new version of mycli
You have been made the maintainer of `mycli`? Congratulations! We have a release script to help you:
```sh
> python release.py --help
Usage: release.py [options]
Options:
-h, --help show this help message and exit
-c, --confirm-steps Confirm every step. If the step is not confirmed, it
will be skipped.
-d, --dry-run Print out, but not actually run any steps.
```
To release a new version of the package:
* Create and merge a PR to bump the version in the changelog ([example PR](https://github.com/dbcli/mycli/pull/1043)).
* Pull `main` and bump the version number inside `mycli/__init__.py`. Do not check in - the release script will do that.
* Make sure you have the dev requirements installed: `pip install -r requirements-dev.txt -U --upgrade-strategy only-if-needed`.
* Finally, run the release script: `python release.py`.
Create a new [release](https://github.com/dbcli/mycli/releases) in Github. This will trigger a Github action which will run all the tests, build the wheel and upload it to PyPI.

View file

@ -147,13 +147,10 @@ get this running in a development setup.
https://github.com/dbcli/mycli/blob/main/CONTRIBUTING.md
Please feel free to reach out to me if you need help.
My email: amjith.r@gmail.com
## Additional Install Instructions:
Twitter: [@amjithr](http://twitter.com/amjithr)
## Detailed Install Instructions:
These are some alternative ways to install mycli that are not managed by our team but provided by OS package maintainers. These packages could be slightly out of date and take time to release the latest version.
### Arch, Manjaro
@ -202,7 +199,7 @@ Thanks to [PyMysql](https://github.com/PyMySQL/PyMySQL) for a pure python adapte
### Compatibility
Mycli is tested on macOS and Linux, and requires Python 3.7 or better.
Mycli is tested on macOS and Linux, and requires Python 3.9 or better.
**Mycli is not tested on Windows**, but the libraries used in this app are Windows-compatible.
This means it should work without any modifications. If you're unable to run it

View file

@ -1,3 +1,30 @@
1.31.1 (2025/04/25)
===================
Internal
--------
* skip style checks on Publish action
1.31.0 (NEVER RELEASED)
===================
Features
--------
* Added explicit error handle to get_password_from_file with EAFP.
* Use the "history" scheme for fzf searches.
* Deduplicate history in fzf searches.
* Add a preview window to fzf history searches.
Internal
--------
* New Project Lead: [Roland Walker](https://github.com/rolandwalker)
* Update sqlparse to <=0.6.0
* Typing/lint fixes.
1.30.0 (2025/04/19)
===================

View file

@ -1,3 +1,8 @@
Project Lead:
-------------
* Roland Walker
Core Developers:
----------------
@ -15,6 +20,7 @@ Contributors:
* Abirami P
* Adam Chainz
* Aljosha Papsch
* Allrob
* Andy Teijelo Pérez
* Angelo Lupo
* Artem Bezsmertnyi

View file

@ -12,12 +12,10 @@ def create_toolbar_tokens_func(mycli, show_fish_help):
if mycli.multi_line:
delimiter = special.get_current_delimiter()
result.append(
(
result.append((
"class:bottom-toolbar",
" ({} [{}] will end the line) ".format("Semi-colon" if delimiter == ";" else "Delimiter", delimiter),
)
)
))
if mycli.multi_line:
result.append(("class:bottom-toolbar.on", "[F3] Multiline: ON "))

View file

@ -1,4 +1,5 @@
from copy import copy
from importlib import resources
from io import BytesIO, TextIOWrapper
import logging
import os
@ -10,12 +11,6 @@ from typing import Union, IO
from configobj import ConfigObj, ConfigObjError
import pyaes
try:
import importlib.resources as resources
except ImportError:
# Python < 3.7
import importlib_resources as resources
try:
basestring
except NameError:
@ -51,11 +46,11 @@ def read_config_file(f, list_values=True):
try:
config = ConfigObj(f, interpolation=False, encoding="utf8", list_values=list_values)
except ConfigObjError as e:
log(logger, logging.WARNING, "Unable to parse line {0} of config file " "'{1}'.".format(e.line_number, f))
log(logger, logging.WARNING, "Unable to parse line {0} of config file '{1}'.".format(e.line_number, f))
log(logger, logging.WARNING, "Using successfully parsed config values.")
return e.config
except (IOError, OSError) as e:
log(logger, logging.WARNING, "You don't have permission to read " "config file '{0}'.".format(e.filename))
log(logger, logging.WARNING, "You don't have permission to read config file '{0}'.".format(e.filename))
return None
return config
@ -78,12 +73,12 @@ def get_included_configs(config_file: Union[str, TextIOWrapper]) -> list:
try:
with open(config_file) as f:
include_directives = filter(lambda s: s.startswith("!includedir"), f)
dirs = map(lambda s: s.strip().split()[-1], include_directives)
dirs = filter(os.path.isdir, dirs)
for dir in dirs:
for filename in os.listdir(dir):
dirs_split = map(lambda s: s.strip().split()[-1], include_directives)
dirs = filter(os.path.isdir, dirs_split)
for dir_ in dirs:
for filename in os.listdir(dir_):
if filename.endswith(".cnf"):
included_configs.append(os.path.join(dir, filename))
included_configs.append(os.path.join(dir_, filename))
except (PermissionError, UnicodeDecodeError):
pass
return included_configs

View file

@ -83,7 +83,13 @@ except ImportError:
# Query tuples are used for maintaining history
Query = namedtuple("Query", ["query", "successful", "mutating"])
SUPPORT_INFO = "Home: http://mycli.net\n" "Bug tracker: https://github.com/dbcli/mycli/issues"
SUPPORT_INFO = "Home: http://mycli.net\nBug tracker: https://github.com/dbcli/mycli/issues"
class PasswordFileError(Exception):
"""Base exception for errors related to reading password files."""
pass
class MyCli(object):
@ -101,10 +107,7 @@ class MyCli(object):
]
# check XDG_CONFIG_HOME exists and not an empty string
if os.environ.get("XDG_CONFIG_HOME"):
xdg_config_home = os.environ.get("XDG_CONFIG_HOME")
else:
xdg_config_home = "~/.config"
xdg_config_home = os.environ.get("XDG_CONFIG_HOME", "~/.config")
system_config_files = ["/etc/myclirc", os.path.join(os.path.expanduser(xdg_config_home), "mycli", "myclirc")]
pwd_config_file = os.path.join(os.getcwd(), ".myclirc")
@ -253,7 +256,7 @@ class MyCli(object):
arg = re.sub(r"``", r"`", arg)
self.sqlexecute.change_db(arg)
yield (None, None, None, 'You are now connected to database "%s" as ' 'user "%s"' % (self.sqlexecute.dbname, self.sqlexecute.user))
yield (None, None, None, 'You are now connected to database "%s" as user "%s"' % (self.sqlexecute.dbname, self.sqlexecute.user))
def execute_from_file(self, arg, **_):
if not arg:
@ -305,7 +308,7 @@ class MyCli(object):
self.echo('Error: Unable to open the log file "{}".'.format(log_file), err=True, fg="red")
return
formatter = logging.Formatter("%(asctime)s (%(process)d/%(threadName)s) " "%(name)s %(levelname)s - %(message)s")
formatter = logging.Formatter("%(asctime)s (%(process)d/%(threadName)s) %(name)s %(levelname)s - %(message)s")
handler.setFormatter(formatter)
@ -536,14 +539,19 @@ class MyCli(object):
sys.exit(1)
def get_password_from_file(self, password_file):
password_from_file = None
if password_file:
if (os.path.isfile(password_file) or stat.S_ISFIFO(os.stat(password_file).st_mode)) and os.access(password_file, os.R_OK):
try:
with open(password_file) as fp:
password_from_file = fp.readline()
password_from_file = password_from_file.rstrip().lstrip()
return password_from_file
password = fp.readline().strip()
return password
except FileNotFoundError:
raise PasswordFileError(f"Password file '{password_file}' not found") from None
except PermissionError:
raise PasswordFileError(f"Permission denied reading password file '{password_file}'") from None
except IsADirectoryError:
raise PasswordFileError(f"Path '{password_file}' is a directory, not a file") from None
except Exception as e:
raise PasswordFileError(f"Error reading password file '{password_file}': {str(e)}") from None
def handle_editor_command(self, text):
r"""Editor command is any query that is prefixed or suffixed by a '\e'.
@ -635,7 +643,7 @@ class MyCli(object):
else:
history = None
self.echo(
'Error: Unable to open the history file "{}". ' "Your query history will not be saved.".format(history_file),
'Error: Unable to open the history file "{}". Your query history will not be saved.'.format(history_file),
err=True,
fg="red",
)
@ -1105,7 +1113,7 @@ class MyCli(object):
@click.command()
@click.option("-h", "--host", envvar="MYSQL_HOST", help="Host address of the database.")
@click.option("-P", "--port", envvar="MYSQL_TCP_PORT", type=int, help="Port number to use for connection. Honors " "$MYSQL_TCP_PORT.")
@click.option("-P", "--port", envvar="MYSQL_TCP_PORT", type=int, help="Port number to use for connection. Honors $MYSQL_TCP_PORT.")
@click.option("-u", "--user", help="User name to connect to the database.")
@click.option("-S", "--socket", envvar="MYSQL_UNIX_PORT", help="The socket file to use for connection.")
@click.option("-p", "--password", "password", envvar="MYSQL_PWD", type=str, help="Password to connect to the database.")
@ -1131,7 +1139,7 @@ class MyCli(object):
@click.option(
"--ssl-verify-server-cert",
is_flag=True,
help=('Verify server\'s "Common Name" in its cert against ' "hostname used when connecting. This option is disabled " "by default."),
help=('Verify server\'s "Common Name" in its cert against hostname used when connecting. This option is disabled by default.'),
)
# as of 2016-02-15 revocation list is not supported by underling PyMySQL
# library (--ssl-crl and --ssl-crlpath options in vanilla mysql client)
@ -1229,7 +1237,7 @@ def cli(
try:
alias_dsn = mycli.config["alias_dsn"]
except KeyError:
click.secho("Invalid DSNs found in the config file. " 'Please check the "[alias_dsn]" section in myclirc.', err=True, fg="red")
click.secho("Invalid DSNs found in the config file. Please check the \"[alias_dsn]\" section in myclirc.", err=True, fg="red")
sys.exit(1)
except Exception as e:
click.secho(str(e), err=True, fg="red")
@ -1285,7 +1293,7 @@ def cli(
dsn_uri = mycli.config["alias_dsn"][dsn]
except KeyError:
click.secho(
"Could not find the specified DSN in the config file. " 'Please check the "[alias_dsn]" section in your ' "myclirc.",
"Could not find the specified DSN in the config file. Please check the \"[alias_dsn]\" section in your myclirc.",
err=True,
fg="red",
)
@ -1362,7 +1370,7 @@ def cli(
if combined_init_cmd:
click.echo("Executing init-command: %s" % combined_init_cmd, err=True)
mycli.logger.debug("Launch Params: \n" "\tdatabase: %r" "\tuser: %r" "\thost: %r" "\tport: %r", database, user, host, port)
mycli.logger.debug("Launch Params: \n\tdatabase: %r\tuser: %r\thost: %r\tport: %r", database, user, host, port)
# --execute argument
if execute:

View file

@ -4,11 +4,11 @@ import platform
if os.name == "posix":
if platform.system() == "Darwin":
DEFAULT_SOCKET_DIRS = ("/tmp",)
DEFAULT_SOCKET_DIRS = ["/tmp"]
else:
DEFAULT_SOCKET_DIRS = ("/var/run", "/var/lib")
DEFAULT_SOCKET_DIRS = ["/var/run", "/var/lib"]
else:
DEFAULT_SOCKET_DIRS = ()
DEFAULT_SOCKET_DIRS = []
def list_path(root_dir):

View file

@ -32,7 +32,7 @@ def confirm_destructive_query(queries):
* False if the query is destructive and the user doesn't want to proceed.
"""
prompt_text = "You're about to run a destructive command.\n" "Do you want to proceed? (y/n)"
prompt_text = "You're about to run a destructive command.\nDo you want to proceed? (y/n)"
if is_destructive(queries) and sys.stdin.isatty():
return prompt(prompt_text, type=BOOLEAN_TYPE)

View file

@ -116,7 +116,7 @@ def status(cur, **_):
output.append(("Connection:", host_info))
query = "SELECT @@character_set_server, @@character_set_database, " "@@character_set_client, @@character_set_connection LIMIT 1;"
query = "SELECT @@character_set_server, @@character_set_database, @@character_set_client, @@character_set_connection LIMIT 1;"
log.debug(query)
cur.execute(query)
charset = cur.fetchone()

View file

@ -98,15 +98,18 @@ def set_expanded_output(val):
def is_expanded_output():
return use_expanded_output
@export
def set_forced_horizontal_output(val):
global force_horizontal_output
force_horizontal_output = val
@export
def forced_horizontal():
return force_horizontal_output
_logger = logging.getLogger(__name__)

View file

@ -1,3 +1,4 @@
import re
from shutil import which
from pyfzf import FzfPrompt
@ -28,13 +29,20 @@ def search_history(event: KeyPressEvent):
formatted_history_items = []
original_history_items = []
seen = {}
for item, timestamp in history_items_with_timestamp:
formatted_item = item.replace("\n", " ")
formatted_item = re.sub(r'\s+', ' ', item)
timestamp = timestamp.split(".")[0] if "." in timestamp else timestamp
if formatted_item in seen:
continue
seen[formatted_item] = True
formatted_history_items.append(f"{timestamp} {formatted_item}")
original_history_items.append(item)
result = fzf.prompt(formatted_history_items, fzf_options="--tiebreak=index")
result = fzf.prompt(
formatted_history_items,
fzf_options="--scheme=history --tiebreak=index --preview-window=down:wrap --preview=\"printf '%s' {}\"",
)
if result:
selected_index = formatted_history_items.index(result[0])

View file

@ -195,14 +195,12 @@ class SQLExecute(object):
init_command,
)
conv = conversions.copy()
conv.update(
{
conv.update({
FIELD_TYPE.TIMESTAMP: lambda obj: (convert_datetime(obj) or obj),
FIELD_TYPE.DATETIME: lambda obj: (convert_datetime(obj) or obj),
FIELD_TYPE.TIME: lambda obj: (convert_timedelta(obj) or obj),
FIELD_TYPE.DATE: lambda obj: (convert_date(obj) or obj),
}
)
})
defer_connect = False

View file

@ -3,7 +3,7 @@ name = "mycli"
dynamic = ["version"]
description = "CLI for MySQL Database. With auto-completion and syntax highlighting."
readme = "README.md"
requires-python = ">=3.7"
requires-python = ">=3.9"
license = { text = "BSD" }
authors = [{ name = "Mycli Core Team", email = "mycli-dev@googlegroups.com" }]
urls = { homepage = "http://mycli.net" }
@ -14,7 +14,7 @@ dependencies = [
"Pygments>=1.6",
"prompt_toolkit>=3.0.6,<4.0.0",
"PyMySQL >= 0.9.2",
"sqlparse>=0.3.0,<0.5.0",
"sqlparse>=0.3.0,<0.6.0",
"sqlglot>=5.1.3",
"configobj >= 5.0.5",
"cli_helpers[styles] >= 2.2.1",
@ -56,4 +56,41 @@ mycli = ["myclirc", "AUTHORS", "SPONSORS"]
include = ["mycli*"]
[tool.ruff]
target-version = 'py39'
line-length = 140
[tool.ruff.lint]
select = [
'A',
'I',
'E',
'W',
'F',
'C4',
'PIE',
'TID',
]
ignore = [
'E401', # Multiple imports on one line
'E402', # Module level import not at top of file
'PIE808', # range() starting with 0
# https://docs.astral.sh/ruff/formatter/#conflicting-lint-rules
'E111', # indentation-with-invalid-multiple
'E114', # indentation-with-invalid-multiple-comment
'E117', # over-indented
'W191', # tab-indentation
]
[tool.ruff.lint.isort]
force-sort-within-sections = true
known-first-party = [
'mycli',
]
[tool.ruff.format]
preview = true
quote-style = 'preserve'
exclude = [
'build',
'mycli_dev',
]

View file

@ -65,7 +65,7 @@ def before_all(context):
_, my_cnf = mkstemp()
with open(my_cnf, "w") as f:
f.write(
"[client]\n" "pager={0} {1} {2}\n".format(
"[client]\npager={0} {1} {2}\n".format(
sys.executable, os.path.join(context.package_root, "test/features/wrappager.py"), context.conf["pager_boundary"]
)
)

View file

@ -41,7 +41,7 @@ def step_see_small_results(context):
@then("we see large results in vertical format")
def step_see_large_results(context):
rows = ["{n:3}| {n}".format(n=str(n)) for n in range(1, 50)]
expected = "***************************[ 1. row ]" "***************************\r\n" + "{}\r\n".format("\r\n".join(rows) + "\r\n")
expected = "***************************[ 1. row ]***************************\r\n" + "{}\r\n".format("\r\n".join(rows) + "\r\n")
wrappers.expect_pager(context, expected, timeout=10)
wrappers.expect_exact(context, "1 row in set", timeout=2)

View file

@ -32,7 +32,7 @@ def status_contains(context, expression):
@when("we create my.cnf file")
def step_create_my_cnf_file(context):
my_cnf = "[client]\n" f"host = {HOST}\n" f"port = {PORT}\n" f"user = {USER}\n" f"password = {PASSWORD}\n"
my_cnf = f"[client]\nhost = {HOST}\nport = {PORT}\nuser = {USER}\npassword = {PASSWORD}\n"
with open(MY_CNF_PATH, "w") as f:
f.write(my_cnf)
@ -40,7 +40,7 @@ def step_create_my_cnf_file(context):
@when("we create mylogin.cnf file")
def step_create_mylogin_cnf_file(context):
os.environ.pop("MYSQL_TEST_LOGIN_FILE", None)
mylogin_cnf = f"[{TEST_LOGIN_PATH}]\n" f"host = {HOST}\n" f"port = {PORT}\n" f"user = {USER}\n" f"password = {PASSWORD}\n"
mylogin_cnf = f"[{TEST_LOGIN_PATH}]\nhost = {HOST}\nport = {PORT}\nuser = {USER}\npassword = {PASSWORD}\n"
with open(MYLOGIN_CNF_PATH, "wb") as f:
input_file = io.StringIO(mylogin_cnf)
f.write(encrypt_mylogin_cnf(input_file).read())

View file

@ -81,9 +81,7 @@ def run_cli(context, run_args=None, exclude_args=None):
try:
cli_cmd = context.conf["cli_command"]
except KeyError:
cli_cmd = ('{0!s} -c "' "import coverage ; " "coverage.process_startup(); " "import mycli.main; " "mycli.main.cli()" '"').format(
sys.executable
)
cli_cmd = ('{0!s} -c "import coverage ; coverage.process_startup(); import mycli.main; mycli.main.cli()"').format(sys.executable)
cmd_parts = [cli_cmd] + rendered_args
cmd = " ".join(cmd_parts)

View file

@ -9,26 +9,22 @@ def sorted_dicts(dicts):
def test_select_suggests_cols_with_visible_table_scope():
suggestions = suggest_type("SELECT FROM tabl", "SELECT ")
assert sorted_dicts(suggestions) == sorted_dicts(
[
assert sorted_dicts(suggestions) == sorted_dicts([
{"type": "alias", "aliases": ["tabl"]},
{"type": "column", "tables": [(None, "tabl", None)]},
{"type": "function", "schema": []},
{"type": "keyword"},
]
)
])
def test_select_suggests_cols_with_qualified_table_scope():
suggestions = suggest_type("SELECT FROM sch.tabl", "SELECT ")
assert sorted_dicts(suggestions) == sorted_dicts(
[
assert sorted_dicts(suggestions) == sorted_dicts([
{"type": "alias", "aliases": ["tabl"]},
{"type": "column", "tables": [("sch", "tabl", None)]},
{"type": "function", "schema": []},
{"type": "keyword"},
]
)
])
@pytest.mark.parametrize(
@ -48,14 +44,12 @@ def test_select_suggests_cols_with_qualified_table_scope():
)
def test_where_suggests_columns_functions(expression):
suggestions = suggest_type(expression, expression)
assert sorted_dicts(suggestions) == sorted_dicts(
[
assert sorted_dicts(suggestions) == sorted_dicts([
{"type": "alias", "aliases": ["tabl"]},
{"type": "column", "tables": [(None, "tabl", None)]},
{"type": "function", "schema": []},
{"type": "keyword"},
]
)
])
@pytest.mark.parametrize(
@ -67,27 +61,23 @@ def test_where_suggests_columns_functions(expression):
)
def test_where_in_suggests_columns(expression):
suggestions = suggest_type(expression, expression)
assert sorted_dicts(suggestions) == sorted_dicts(
[
assert sorted_dicts(suggestions) == sorted_dicts([
{"type": "alias", "aliases": ["tabl"]},
{"type": "column", "tables": [(None, "tabl", None)]},
{"type": "function", "schema": []},
{"type": "keyword"},
]
)
])
def test_where_equals_any_suggests_columns_or_keywords():
text = "SELECT * FROM tabl WHERE foo = ANY("
suggestions = suggest_type(text, text)
assert sorted_dicts(suggestions) == sorted_dicts(
[
assert sorted_dicts(suggestions) == sorted_dicts([
{"type": "alias", "aliases": ["tabl"]},
{"type": "column", "tables": [(None, "tabl", None)]},
{"type": "function", "schema": []},
{"type": "keyword"},
]
)
])
def test_lparen_suggests_cols():
@ -107,14 +97,12 @@ def test_operand_inside_function_suggests_cols2():
def test_select_suggests_cols_and_funcs():
suggestions = suggest_type("SELECT ", "SELECT ")
assert sorted_dicts(suggestions) == sorted_dicts(
[
assert sorted_dicts(suggestions) == sorted_dicts([
{"type": "alias", "aliases": []},
{"type": "column", "tables": []},
{"type": "function", "schema": []},
{"type": "keyword"},
]
)
])
@pytest.mark.parametrize(
@ -170,14 +158,12 @@ def test_distinct_suggests_cols():
def test_col_comma_suggests_cols():
suggestions = suggest_type("SELECT a, b, FROM tbl", "SELECT a, b,")
assert sorted_dicts(suggestions) == sorted_dicts(
[
assert sorted_dicts(suggestions) == sorted_dicts([
{"type": "alias", "aliases": ["tbl"]},
{"type": "column", "tables": [(None, "tbl", None)]},
{"type": "function", "schema": []},
{"type": "keyword"},
]
)
])
def test_table_comma_suggests_tables_and_schemas():
@ -207,50 +193,42 @@ def test_insert_into_lparen_comma_suggests_cols():
def test_partially_typed_col_name_suggests_col_names():
suggestions = suggest_type("SELECT * FROM tabl WHERE col_n", "SELECT * FROM tabl WHERE col_n")
assert sorted_dicts(suggestions) == sorted_dicts(
[
assert sorted_dicts(suggestions) == sorted_dicts([
{"type": "alias", "aliases": ["tabl"]},
{"type": "column", "tables": [(None, "tabl", None)]},
{"type": "function", "schema": []},
{"type": "keyword"},
]
)
])
def test_dot_suggests_cols_of_a_table_or_schema_qualified_table():
suggestions = suggest_type("SELECT tabl. FROM tabl", "SELECT tabl.")
assert sorted_dicts(suggestions) == sorted_dicts(
[
assert sorted_dicts(suggestions) == sorted_dicts([
{"type": "column", "tables": [(None, "tabl", None)]},
{"type": "table", "schema": "tabl"},
{"type": "view", "schema": "tabl"},
{"type": "function", "schema": "tabl"},
]
)
])
def test_dot_suggests_cols_of_an_alias():
suggestions = suggest_type("SELECT t1. FROM tabl1 t1, tabl2 t2", "SELECT t1.")
assert sorted_dicts(suggestions) == sorted_dicts(
[
assert sorted_dicts(suggestions) == sorted_dicts([
{"type": "table", "schema": "t1"},
{"type": "view", "schema": "t1"},
{"type": "column", "tables": [(None, "tabl1", "t1")]},
{"type": "function", "schema": "t1"},
]
)
])
def test_dot_col_comma_suggests_cols_or_schema_qualified_table():
suggestions = suggest_type("SELECT t1.a, t2. FROM tabl1 t1, tabl2 t2", "SELECT t1.a, t2.")
assert sorted_dicts(suggestions) == sorted_dicts(
[
assert sorted_dicts(suggestions) == sorted_dicts([
{"type": "column", "tables": [(None, "tabl2", "t2")]},
{"type": "table", "schema": "t2"},
{"type": "view", "schema": "t2"},
{"type": "function", "schema": "t2"},
]
)
])
@pytest.mark.parametrize(
@ -306,34 +284,31 @@ def test_sub_select_table_name_completion(expression):
def test_sub_select_col_name_completion():
suggestions = suggest_type("SELECT * FROM (SELECT FROM abc", "SELECT * FROM (SELECT ")
assert sorted_dicts(suggestions) == sorted_dicts(
[
assert sorted_dicts(suggestions) == sorted_dicts([
{"type": "alias", "aliases": ["abc"]},
{"type": "column", "tables": [(None, "abc", None)]},
{"type": "function", "schema": []},
{"type": "keyword"},
]
)
])
@pytest.mark.xfail
def test_sub_select_multiple_col_name_completion():
suggestions = suggest_type("SELECT * FROM (SELECT a, FROM abc", "SELECT * FROM (SELECT a, ")
assert sorted_dicts(suggestions) == sorted_dicts(
[{"type": "column", "tables": [(None, "abc", None)]}, {"type": "function", "schema": []}]
)
assert sorted_dicts(suggestions) == sorted_dicts([
{"type": "column", "tables": [(None, "abc", None)]},
{"type": "function", "schema": []},
])
def test_sub_select_dot_col_name_completion():
suggestions = suggest_type("SELECT * FROM (SELECT t. FROM tabl t", "SELECT * FROM (SELECT t.")
assert sorted_dicts(suggestions) == sorted_dicts(
[
assert sorted_dicts(suggestions) == sorted_dicts([
{"type": "column", "tables": [(None, "tabl", "t")]},
{"type": "table", "schema": "t"},
{"type": "view", "schema": "t"},
{"type": "function", "schema": "t"},
]
)
])
@pytest.mark.parametrize("join_type", ["", "INNER", "LEFT", "RIGHT OUTER"])
@ -353,14 +328,12 @@ def test_join_suggests_tables_and_schemas(tbl_alias, join_type):
)
def test_join_alias_dot_suggests_cols1(sql):
suggestions = suggest_type(sql, sql)
assert sorted_dicts(suggestions) == sorted_dicts(
[
assert sorted_dicts(suggestions) == sorted_dicts([
{"type": "column", "tables": [(None, "abc", "a")]},
{"type": "table", "schema": "a"},
{"type": "view", "schema": "a"},
{"type": "function", "schema": "a"},
]
)
])
@pytest.mark.parametrize(
@ -372,14 +345,12 @@ def test_join_alias_dot_suggests_cols1(sql):
)
def test_join_alias_dot_suggests_cols2(sql):
suggestions = suggest_type(sql, sql)
assert sorted_dicts(suggestions) == sorted_dicts(
[
assert sorted_dicts(suggestions) == sorted_dicts([
{"type": "column", "tables": [(None, "def", "d")]},
{"type": "table", "schema": "d"},
{"type": "view", "schema": "d"},
{"type": "function", "schema": "d"},
]
)
])
@pytest.mark.parametrize(
@ -445,14 +416,12 @@ def test_join_using_suggests_common_columns(col_list):
)
def test_two_join_alias_dot_suggests_cols1(sql):
suggestions = suggest_type(sql, sql)
assert sorted_dicts(suggestions) == sorted_dicts(
[
assert sorted_dicts(suggestions) == sorted_dicts([
{"type": "column", "tables": [(None, "ghi", "g")]},
{"type": "table", "schema": "g"},
{"type": "view", "schema": "g"},
{"type": "function", "schema": "g"},
]
)
])
def test_2_statements_2nd_current():
@ -460,14 +429,12 @@ def test_2_statements_2nd_current():
assert sorted_dicts(suggestions) == sorted_dicts([{"type": "table", "schema": []}, {"type": "view", "schema": []}, {"type": "schema"}])
suggestions = suggest_type("select * from a; select from b", "select * from a; select ")
assert sorted_dicts(suggestions) == sorted_dicts(
[
assert sorted_dicts(suggestions) == sorted_dicts([
{"type": "alias", "aliases": ["b"]},
{"type": "column", "tables": [(None, "b", None)]},
{"type": "function", "schema": []},
{"type": "keyword"},
]
)
])
# Should work even if first statement is invalid
suggestions = suggest_type("select * from; select * from ", "select * from; select * from ")
@ -479,14 +446,12 @@ def test_2_statements_1st_current():
assert sorted_dicts(suggestions) == sorted_dicts([{"type": "table", "schema": []}, {"type": "view", "schema": []}, {"type": "schema"}])
suggestions = suggest_type("select from a; select * from b", "select ")
assert sorted_dicts(suggestions) == sorted_dicts(
[
assert sorted_dicts(suggestions) == sorted_dicts([
{"type": "alias", "aliases": ["a"]},
{"type": "column", "tables": [(None, "a", None)]},
{"type": "function", "schema": []},
{"type": "keyword"},
]
)
])
def test_3_statements_2nd_current():
@ -494,14 +459,12 @@ def test_3_statements_2nd_current():
assert sorted_dicts(suggestions) == sorted_dicts([{"type": "table", "schema": []}, {"type": "view", "schema": []}, {"type": "schema"}])
suggestions = suggest_type("select * from a; select from b; select * from c", "select * from a; select ")
assert sorted_dicts(suggestions) == sorted_dicts(
[
assert sorted_dicts(suggestions) == sorted_dicts([
{"type": "alias", "aliases": ["b"]},
{"type": "column", "tables": [(None, "b", None)]},
{"type": "function", "schema": []},
{"type": "keyword"},
]
)
])
def test_create_db_with_template():

View file

@ -93,7 +93,7 @@ def test_batch_mode(executor):
run(executor, """create table test(a text)""")
run(executor, """insert into test values('abc'), ('def'), ('ghi')""")
sql = "select count(*) from test;\n" "select * from test limit 1;"
sql = "select count(*) from test;\nselect * from test limit 1;"
runner = CliRunner()
result = runner.invoke(cli, args=CLI_ARGS, input=sql)
@ -107,7 +107,7 @@ def test_batch_mode_table(executor):
run(executor, """create table test(a text)""")
run(executor, """insert into test values('abc'), ('def'), ('ghi')""")
sql = "select count(*) from test;\n" "select * from test limit 1;"
sql = "select count(*) from test;\nselect * from test limit 1;"
runner = CliRunner()
result = runner.invoke(cli, args=CLI_ARGS + ["-t"], input=sql)
@ -543,7 +543,7 @@ def test_init_command_arg(executor):
@dbtest
def test_init_command_multiple_arg(executor):
init_command = "set sql_select_limit=2000; set max_join_size=20000"
sql = 'show variables like "sql_select_limit";\n' 'show variables like "max_join_size"'
sql = 'show variables like "sql_select_limit";\nshow variables like "max_join_size"'
runner = CliRunner()
result = runner.invoke(cli, args=CLI_ARGS + ["--init-command", init_command], input=sql)
@ -554,6 +554,7 @@ def test_init_command_multiple_arg(executor):
assert expected_sql_select_limit in result.output
assert expected_max_join_size in result.output
@dbtest
def test_global_init_commands(executor):
"""Tests that global init-commands from config are executed by default."""

View file

@ -122,24 +122,24 @@ def test_query_starts_with_comment():
def test_queries_start_with():
sql = "# comment\n" "show databases;" "use foo;"
sql = "# comment\nshow databases;use foo;"
assert queries_start_with(sql, ("show", "select")) is True
assert queries_start_with(sql, ("use", "drop")) is True
assert queries_start_with(sql, ("delete", "update")) is False
def test_is_destructive():
sql = "use test;\n" "show databases;\n" "drop database foo;"
sql = "use test;\nshow databases;\ndrop database foo;"
assert is_destructive(sql) is True
def test_is_destructive_update_with_where_clause():
sql = "use test;\n" "show databases;\n" "UPDATE test SET x = 1 WHERE id = 1;"
sql = "use test;\nshow databases;\nUPDATE test SET x = 1 WHERE id = 1;"
assert is_destructive(sql) is False
def test_is_destructive_update_without_where_clause():
sql = "use test;\n" "show databases;\n" "UPDATE test SET x = 1;"
sql = "use test;\nshow databases;\nUPDATE test SET x = 1;"
assert is_destructive(sql) is True
@ -167,7 +167,7 @@ def test_query_has_where_clause(sql, has_where_clause):
("drop database foo; create database bar", "foo", True),
("select bar from foo; drop database bazz", "foo", False),
("select bar from foo; drop database bazz", "bazz", True),
("-- dropping database \n " "drop -- really dropping \n " "schema abc -- now it is dropped", "abc", True),
("-- dropping database \n drop -- really dropping \n schema abc -- now it is dropped", "abc", True),
],
)
def test_is_dropping_database(sql, dbname, is_dropping):

View file

@ -72,22 +72,19 @@ def test_table_completion(completer, complete_event):
text = "SELECT * FROM "
position = len(text)
result = completer.get_completions(Document(text=text, cursor_position=position), complete_event)
assert list(result) == list(
[
assert list(result) == list([
Completion(text="users", start_position=0),
Completion(text="orders", start_position=0),
Completion(text="`select`", start_position=0),
Completion(text="`réveillé`", start_position=0),
]
)
])
def test_function_name_completion(completer, complete_event):
text = "SELECT MA"
position = len("SELECT MA")
result = completer.get_completions(Document(text=text, cursor_position=position), complete_event)
assert list(result) == list(
[
assert list(result) == list([
Completion(text="MAX", start_position=-2),
Completion(text="CHANGE MASTER TO", start_position=-2),
Completion(text="CURRENT_TIMESTAMP", start_position=-2),
@ -97,8 +94,7 @@ def test_function_name_completion(completer, complete_event):
Completion(text="PRIMARY", start_position=-2),
Completion(text="ROW_FORMAT", start_position=-2),
Completion(text="SMALLINT", start_position=-2),
]
)
])
def test_suggested_column_names(completer, complete_event):
@ -138,15 +134,13 @@ def test_suggested_column_names_in_function(completer, complete_event):
text = "SELECT MAX( from users"
position = len("SELECT MAX(")
result = completer.get_completions(Document(text=text, cursor_position=position), complete_event)
assert list(result) == list(
[
assert list(result) == list([
Completion(text="*", start_position=0),
Completion(text="id", start_position=0),
Completion(text="email", start_position=0),
Completion(text="first_name", start_position=0),
Completion(text="last_name", start_position=0),
]
)
])
def test_suggested_column_names_with_table_dot(completer, complete_event):
@ -160,15 +154,13 @@ def test_suggested_column_names_with_table_dot(completer, complete_event):
text = "SELECT users. from users"
position = len("SELECT users.")
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list(
[
assert result == list([
Completion(text="*", start_position=0),
Completion(text="id", start_position=0),
Completion(text="email", start_position=0),
Completion(text="first_name", start_position=0),
Completion(text="last_name", start_position=0),
]
)
])
def test_suggested_column_names_with_alias(completer, complete_event):
@ -182,15 +174,13 @@ def test_suggested_column_names_with_alias(completer, complete_event):
text = "SELECT u. from users u"
position = len("SELECT u.")
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list(
[
assert result == list([
Completion(text="*", start_position=0),
Completion(text="id", start_position=0),
Completion(text="email", start_position=0),
Completion(text="first_name", start_position=0),
Completion(text="last_name", start_position=0),
]
)
])
def test_suggested_multiple_column_names(completer, complete_event):
@ -231,15 +221,13 @@ def test_suggested_multiple_column_names_with_alias(completer, complete_event):
text = "SELECT u.id, u. from users u"
position = len("SELECT u.id, u.")
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list(
[
assert result == list([
Completion(text="*", start_position=0),
Completion(text="id", start_position=0),
Completion(text="email", start_position=0),
Completion(text="first_name", start_position=0),
Completion(text="last_name", start_position=0),
]
)
])
def test_suggested_multiple_column_names_with_dot(completer, complete_event):
@ -254,77 +242,65 @@ def test_suggested_multiple_column_names_with_dot(completer, complete_event):
text = "SELECT users.id, users. from users u"
position = len("SELECT users.id, users.")
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list(
[
assert result == list([
Completion(text="*", start_position=0),
Completion(text="id", start_position=0),
Completion(text="email", start_position=0),
Completion(text="first_name", start_position=0),
Completion(text="last_name", start_position=0),
]
)
])
def test_suggested_aliases_after_on(completer, complete_event):
text = "SELECT u.name, o.id FROM users u JOIN orders o ON "
position = len("SELECT u.name, o.id FROM users u JOIN orders o ON ")
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list(
[
assert result == list([
Completion(text="u", start_position=0),
Completion(text="o", start_position=0),
]
)
])
def test_suggested_aliases_after_on_right_side(completer, complete_event):
text = "SELECT u.name, o.id FROM users u JOIN orders o ON o.user_id = "
position = len("SELECT u.name, o.id FROM users u JOIN orders o ON o.user_id = ")
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list(
[
assert result == list([
Completion(text="u", start_position=0),
Completion(text="o", start_position=0),
]
)
])
def test_suggested_tables_after_on(completer, complete_event):
text = "SELECT users.name, orders.id FROM users JOIN orders ON "
position = len("SELECT users.name, orders.id FROM users JOIN orders ON ")
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list(
[
assert result == list([
Completion(text="users", start_position=0),
Completion(text="orders", start_position=0),
]
)
])
def test_suggested_tables_after_on_right_side(completer, complete_event):
text = "SELECT users.name, orders.id FROM users JOIN orders ON orders.user_id = "
position = len("SELECT users.name, orders.id FROM users JOIN orders ON orders.user_id = ")
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list(
[
assert result == list([
Completion(text="users", start_position=0),
Completion(text="orders", start_position=0),
]
)
])
def test_table_names_after_from(completer, complete_event):
text = "SELECT * FROM "
position = len("SELECT * FROM ")
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list(
[
assert result == list([
Completion(text="users", start_position=0),
Completion(text="orders", start_position=0),
Completion(text="`select`", start_position=0),
Completion(text="`réveillé`", start_position=0),
]
)
])
def test_auto_escaped_col_names(completer, complete_event):

View file

@ -44,7 +44,7 @@ def test_bools(executor):
@dbtest
def test_binary(executor):
run(executor, """create table bt(geom linestring NOT NULL)""")
run(executor, "INSERT INTO bt VALUES " "(ST_GeomFromText('LINESTRING(116.37604 39.73979,116.375 39.73965)'));")
run(executor, "INSERT INTO bt VALUES (ST_GeomFromText('LINESTRING(116.37604 39.73979,116.375 39.73965)'));")
results = run(executor, """select * from bt""")
geom = (
@ -139,7 +139,7 @@ def test_favorite_query_multiple_statement(executor):
run(executor, "insert into test values('abc')")
run(executor, "insert into test values('def')")
results = run(executor, "\\fs test-ad select * from test where a like 'a%'; " "select * from test where a like 'd%'")
results = run(executor, "\\fs test-ad select * from test where a like 'a%'; select * from test where a like 'd%'")
assert_result_equal(results, status="Saved.")
results = run(executor, "\\f test-ad")