1
0
Fork 0

Merging upstream version 1.31.1.

Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
Daniel Baumann 2025-04-29 04:24:10 +02:00
parent bf2e4897d0
commit c8f71b8649
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
27 changed files with 387 additions and 394 deletions

View file

@ -4,6 +4,6 @@
## Checklist ## Checklist
<!--- We appreciate your help and want to give you credit. Please take a moment to put an `x` in the boxes below as you complete them. --> <!--- We appreciate your help and want to give you credit. Place an `x` in the boxes below as you complete them. -->
- [ ] I've added this contribution to the `changelog.md`. - [ ] I've added this contribution to the `changelog.md`.
- [ ] I've added my name to the `AUTHORS` file (or it's already there). - [ ] I've added my name to the `AUTHORS` file (or it's already there).

View file

@ -45,6 +45,3 @@ jobs:
TERM: xterm TERM: xterm
run: | run: |
uv run tox -e py${{ matrix.python-version }} uv run tox -e py${{ matrix.python-version }}
- name: Run Style Checks
run: uv run tox -e style

30
.github/workflows/lint.yml vendored Normal file
View file

@ -0,0 +1,30 @@
name: lint
on:
pull_request:
paths-ignore:
- '**.md'
- 'AUTHORS'
jobs:
linters:
name: Linters
runs-on: ubuntu-latest
steps:
- name: Check out Git repository
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
# todo
# remember to sync the ruff-check version number with pyproject.toml
# - name: Run ruff check
# uses: astral-sh/ruff-action@9828f49eb4cadf267b40eaa330295c412c68c1f9 # v3.2.2
# with:
# version: 0.11.5
# remember to sync the ruff-check version number with pyproject.toml
- name: Run ruff format
uses: astral-sh/ruff-action@9828f49eb4cadf267b40eaa330295c412c68c1f9 # v3.2.2
with:
version: 0.11.5
args: 'format --check'

View file

@ -46,8 +46,10 @@ jobs:
run: | run: |
uv run tox -e py${{ matrix.python-version }} uv run tox -e py${{ matrix.python-version }}
- name: Run Style Checks # TODO enable style checks here and in CI for PRs
run: uv run tox -e style #
# - name: Run Style Checks
# run: uv run tox -e style
build: build:
runs-on: ubuntu-latest runs-on: ubuntu-latest

View file

@ -19,13 +19,12 @@ You'll always get credit for your work.
$ git remote add upstream git@github.com:dbcli/mycli.git $ git remote add upstream git@github.com:dbcli/mycli.git
``` ```
4. Set up a [virtual environment](http://docs.python-guide.org/en/latest/dev/virtualenvs) 4. Set up [uv](https://docs.astral.sh/uv/getting-started/installation/)
for development: for development:
```bash ```bash
$ cd mycli $ cd mycli
$ pip install virtualenv $ uv venv
$ virtualenv mycli_dev
``` ```
We've just created a virtual environment that we'll use to install all the dependencies We've just created a virtual environment that we'll use to install all the dependencies
@ -33,20 +32,14 @@ You'll always get credit for your work.
need to activate the virtual environment: need to activate the virtual environment:
```bash ```bash
$ source mycli_dev/bin/activate $ source .venv/bin/activate
```
When you're done working, you can deactivate the virtual environment:
```bash
$ deactivate
``` ```
5. Install the dependencies and development tools: 5. Install the dependencies and development tools:
```bash ```bash
$ pip install -r requirements-dev.txt $ uv pip install -r requirements-dev.txt
$ pip install --editable . $ uv pip install --editable .
``` ```
6. Create a branch for your bugfix or feature based off the `main` branch: 6. Create a branch for your bugfix or feature based off the `main` branch:
@ -76,18 +69,10 @@ You'll always get credit for your work.
While you work on mycli, it's important to run the tests to make sure your code While you work on mycli, it's important to run the tests to make sure your code
hasn't broken any existing functionality. To run the tests, just type in: hasn't broken any existing functionality. To run the tests, just type in:
```bash
$ ./setup.py test
```
Mycli supports Python 2.7 and 3.4+. You can test against multiple versions of
Python by running tox:
```bash ```bash
$ tox $ tox
``` ```
### Test Database Credentials ### Test Database Credentials
The tests require a database connection to work. You can tell the tests which The tests require a database connection to work. You can tell the tests which
@ -126,42 +111,6 @@ $ readlink -f $(which ex)
``` ```
## Coding Style
Mycli requires code submissions to adhere to
[PEP 8](https://www.python.org/dev/peps/pep-0008/).
It's easy to check the style of your code, just run:
```bash
$ ./setup.py lint
```
If you see any PEP 8 style issues, you can automatically fix them by running:
```bash
$ ./setup.py lint --fix
```
Be sure to commit and push any PEP 8 fixes.
## Releasing a new version of mycli ## Releasing a new version of mycli
You have been made the maintainer of `mycli`? Congratulations! We have a release script to help you: Create a new [release](https://github.com/dbcli/mycli/releases) in Github. This will trigger a Github action which will run all the tests, build the wheel and upload it to PyPI.
```sh
> python release.py --help
Usage: release.py [options]
Options:
-h, --help show this help message and exit
-c, --confirm-steps Confirm every step. If the step is not confirmed, it
will be skipped.
-d, --dry-run Print out, but not actually run any steps.
```
To release a new version of the package:
* Create and merge a PR to bump the version in the changelog ([example PR](https://github.com/dbcli/mycli/pull/1043)).
* Pull `main` and bump the version number inside `mycli/__init__.py`. Do not check in - the release script will do that.
* Make sure you have the dev requirements installed: `pip install -r requirements-dev.txt -U --upgrade-strategy only-if-needed`.
* Finally, run the release script: `python release.py`.

View file

@ -147,13 +147,10 @@ get this running in a development setup.
https://github.com/dbcli/mycli/blob/main/CONTRIBUTING.md https://github.com/dbcli/mycli/blob/main/CONTRIBUTING.md
Please feel free to reach out to me if you need help.
My email: amjith.r@gmail.com ## Additional Install Instructions:
Twitter: [@amjithr](http://twitter.com/amjithr) These are some alternative ways to install mycli that are not managed by our team but provided by OS package maintainers. These packages could be slightly out of date and take time to release the latest version.
## Detailed Install Instructions:
### Arch, Manjaro ### Arch, Manjaro
@ -202,7 +199,7 @@ Thanks to [PyMysql](https://github.com/PyMySQL/PyMySQL) for a pure python adapte
### Compatibility ### Compatibility
Mycli is tested on macOS and Linux, and requires Python 3.7 or better. Mycli is tested on macOS and Linux, and requires Python 3.9 or better.
**Mycli is not tested on Windows**, but the libraries used in this app are Windows-compatible. **Mycli is not tested on Windows**, but the libraries used in this app are Windows-compatible.
This means it should work without any modifications. If you're unable to run it This means it should work without any modifications. If you're unable to run it

View file

@ -1,3 +1,30 @@
1.31.1 (2025/04/25)
===================
Internal
--------
* skip style checks on Publish action
1.31.0 (NEVER RELEASED)
===================
Features
--------
* Added explicit error handle to get_password_from_file with EAFP.
* Use the "history" scheme for fzf searches.
* Deduplicate history in fzf searches.
* Add a preview window to fzf history searches.
Internal
--------
* New Project Lead: [Roland Walker](https://github.com/rolandwalker)
* Update sqlparse to <=0.6.0
* Typing/lint fixes.
1.30.0 (2025/04/19) 1.30.0 (2025/04/19)
=================== ===================

View file

@ -1,3 +1,8 @@
Project Lead:
-------------
* Roland Walker
Core Developers: Core Developers:
---------------- ----------------
@ -15,6 +20,7 @@ Contributors:
* Abirami P * Abirami P
* Adam Chainz * Adam Chainz
* Aljosha Papsch * Aljosha Papsch
* Allrob
* Andy Teijelo Pérez * Andy Teijelo Pérez
* Angelo Lupo * Angelo Lupo
* Artem Bezsmertnyi * Artem Bezsmertnyi

View file

@ -12,12 +12,10 @@ def create_toolbar_tokens_func(mycli, show_fish_help):
if mycli.multi_line: if mycli.multi_line:
delimiter = special.get_current_delimiter() delimiter = special.get_current_delimiter()
result.append( result.append((
( "class:bottom-toolbar",
"class:bottom-toolbar", " ({} [{}] will end the line) ".format("Semi-colon" if delimiter == ";" else "Delimiter", delimiter),
" ({} [{}] will end the line) ".format("Semi-colon" if delimiter == ";" else "Delimiter", delimiter), ))
)
)
if mycli.multi_line: if mycli.multi_line:
result.append(("class:bottom-toolbar.on", "[F3] Multiline: ON ")) result.append(("class:bottom-toolbar.on", "[F3] Multiline: ON "))

View file

@ -1,4 +1,5 @@
from copy import copy from copy import copy
from importlib import resources
from io import BytesIO, TextIOWrapper from io import BytesIO, TextIOWrapper
import logging import logging
import os import os
@ -10,12 +11,6 @@ from typing import Union, IO
from configobj import ConfigObj, ConfigObjError from configobj import ConfigObj, ConfigObjError
import pyaes import pyaes
try:
import importlib.resources as resources
except ImportError:
# Python < 3.7
import importlib_resources as resources
try: try:
basestring basestring
except NameError: except NameError:
@ -51,11 +46,11 @@ def read_config_file(f, list_values=True):
try: try:
config = ConfigObj(f, interpolation=False, encoding="utf8", list_values=list_values) config = ConfigObj(f, interpolation=False, encoding="utf8", list_values=list_values)
except ConfigObjError as e: except ConfigObjError as e:
log(logger, logging.WARNING, "Unable to parse line {0} of config file " "'{1}'.".format(e.line_number, f)) log(logger, logging.WARNING, "Unable to parse line {0} of config file '{1}'.".format(e.line_number, f))
log(logger, logging.WARNING, "Using successfully parsed config values.") log(logger, logging.WARNING, "Using successfully parsed config values.")
return e.config return e.config
except (IOError, OSError) as e: except (IOError, OSError) as e:
log(logger, logging.WARNING, "You don't have permission to read " "config file '{0}'.".format(e.filename)) log(logger, logging.WARNING, "You don't have permission to read config file '{0}'.".format(e.filename))
return None return None
return config return config
@ -78,12 +73,12 @@ def get_included_configs(config_file: Union[str, TextIOWrapper]) -> list:
try: try:
with open(config_file) as f: with open(config_file) as f:
include_directives = filter(lambda s: s.startswith("!includedir"), f) include_directives = filter(lambda s: s.startswith("!includedir"), f)
dirs = map(lambda s: s.strip().split()[-1], include_directives) dirs_split = map(lambda s: s.strip().split()[-1], include_directives)
dirs = filter(os.path.isdir, dirs) dirs = filter(os.path.isdir, dirs_split)
for dir in dirs: for dir_ in dirs:
for filename in os.listdir(dir): for filename in os.listdir(dir_):
if filename.endswith(".cnf"): if filename.endswith(".cnf"):
included_configs.append(os.path.join(dir, filename)) included_configs.append(os.path.join(dir_, filename))
except (PermissionError, UnicodeDecodeError): except (PermissionError, UnicodeDecodeError):
pass pass
return included_configs return included_configs

View file

@ -83,7 +83,13 @@ except ImportError:
# Query tuples are used for maintaining history # Query tuples are used for maintaining history
Query = namedtuple("Query", ["query", "successful", "mutating"]) Query = namedtuple("Query", ["query", "successful", "mutating"])
SUPPORT_INFO = "Home: http://mycli.net\n" "Bug tracker: https://github.com/dbcli/mycli/issues" SUPPORT_INFO = "Home: http://mycli.net\nBug tracker: https://github.com/dbcli/mycli/issues"
class PasswordFileError(Exception):
"""Base exception for errors related to reading password files."""
pass
class MyCli(object): class MyCli(object):
@ -101,10 +107,7 @@ class MyCli(object):
] ]
# check XDG_CONFIG_HOME exists and not an empty string # check XDG_CONFIG_HOME exists and not an empty string
if os.environ.get("XDG_CONFIG_HOME"): xdg_config_home = os.environ.get("XDG_CONFIG_HOME", "~/.config")
xdg_config_home = os.environ.get("XDG_CONFIG_HOME")
else:
xdg_config_home = "~/.config"
system_config_files = ["/etc/myclirc", os.path.join(os.path.expanduser(xdg_config_home), "mycli", "myclirc")] system_config_files = ["/etc/myclirc", os.path.join(os.path.expanduser(xdg_config_home), "mycli", "myclirc")]
pwd_config_file = os.path.join(os.getcwd(), ".myclirc") pwd_config_file = os.path.join(os.getcwd(), ".myclirc")
@ -253,7 +256,7 @@ class MyCli(object):
arg = re.sub(r"``", r"`", arg) arg = re.sub(r"``", r"`", arg)
self.sqlexecute.change_db(arg) self.sqlexecute.change_db(arg)
yield (None, None, None, 'You are now connected to database "%s" as ' 'user "%s"' % (self.sqlexecute.dbname, self.sqlexecute.user)) yield (None, None, None, 'You are now connected to database "%s" as user "%s"' % (self.sqlexecute.dbname, self.sqlexecute.user))
def execute_from_file(self, arg, **_): def execute_from_file(self, arg, **_):
if not arg: if not arg:
@ -305,7 +308,7 @@ class MyCli(object):
self.echo('Error: Unable to open the log file "{}".'.format(log_file), err=True, fg="red") self.echo('Error: Unable to open the log file "{}".'.format(log_file), err=True, fg="red")
return return
formatter = logging.Formatter("%(asctime)s (%(process)d/%(threadName)s) " "%(name)s %(levelname)s - %(message)s") formatter = logging.Formatter("%(asctime)s (%(process)d/%(threadName)s) %(name)s %(levelname)s - %(message)s")
handler.setFormatter(formatter) handler.setFormatter(formatter)
@ -536,14 +539,19 @@ class MyCli(object):
sys.exit(1) sys.exit(1)
def get_password_from_file(self, password_file): def get_password_from_file(self, password_file):
password_from_file = None
if password_file: if password_file:
if (os.path.isfile(password_file) or stat.S_ISFIFO(os.stat(password_file).st_mode)) and os.access(password_file, os.R_OK): try:
with open(password_file) as fp: with open(password_file) as fp:
password_from_file = fp.readline() password = fp.readline().strip()
password_from_file = password_from_file.rstrip().lstrip() return password
except FileNotFoundError:
return password_from_file raise PasswordFileError(f"Password file '{password_file}' not found") from None
except PermissionError:
raise PasswordFileError(f"Permission denied reading password file '{password_file}'") from None
except IsADirectoryError:
raise PasswordFileError(f"Path '{password_file}' is a directory, not a file") from None
except Exception as e:
raise PasswordFileError(f"Error reading password file '{password_file}': {str(e)}") from None
def handle_editor_command(self, text): def handle_editor_command(self, text):
r"""Editor command is any query that is prefixed or suffixed by a '\e'. r"""Editor command is any query that is prefixed or suffixed by a '\e'.
@ -635,7 +643,7 @@ class MyCli(object):
else: else:
history = None history = None
self.echo( self.echo(
'Error: Unable to open the history file "{}". ' "Your query history will not be saved.".format(history_file), 'Error: Unable to open the history file "{}". Your query history will not be saved.'.format(history_file),
err=True, err=True,
fg="red", fg="red",
) )
@ -1105,7 +1113,7 @@ class MyCli(object):
@click.command() @click.command()
@click.option("-h", "--host", envvar="MYSQL_HOST", help="Host address of the database.") @click.option("-h", "--host", envvar="MYSQL_HOST", help="Host address of the database.")
@click.option("-P", "--port", envvar="MYSQL_TCP_PORT", type=int, help="Port number to use for connection. Honors " "$MYSQL_TCP_PORT.") @click.option("-P", "--port", envvar="MYSQL_TCP_PORT", type=int, help="Port number to use for connection. Honors $MYSQL_TCP_PORT.")
@click.option("-u", "--user", help="User name to connect to the database.") @click.option("-u", "--user", help="User name to connect to the database.")
@click.option("-S", "--socket", envvar="MYSQL_UNIX_PORT", help="The socket file to use for connection.") @click.option("-S", "--socket", envvar="MYSQL_UNIX_PORT", help="The socket file to use for connection.")
@click.option("-p", "--password", "password", envvar="MYSQL_PWD", type=str, help="Password to connect to the database.") @click.option("-p", "--password", "password", envvar="MYSQL_PWD", type=str, help="Password to connect to the database.")
@ -1131,7 +1139,7 @@ class MyCli(object):
@click.option( @click.option(
"--ssl-verify-server-cert", "--ssl-verify-server-cert",
is_flag=True, is_flag=True,
help=('Verify server\'s "Common Name" in its cert against ' "hostname used when connecting. This option is disabled " "by default."), help=('Verify server\'s "Common Name" in its cert against hostname used when connecting. This option is disabled by default.'),
) )
# as of 2016-02-15 revocation list is not supported by underling PyMySQL # as of 2016-02-15 revocation list is not supported by underling PyMySQL
# library (--ssl-crl and --ssl-crlpath options in vanilla mysql client) # library (--ssl-crl and --ssl-crlpath options in vanilla mysql client)
@ -1229,7 +1237,7 @@ def cli(
try: try:
alias_dsn = mycli.config["alias_dsn"] alias_dsn = mycli.config["alias_dsn"]
except KeyError: except KeyError:
click.secho("Invalid DSNs found in the config file. " 'Please check the "[alias_dsn]" section in myclirc.', err=True, fg="red") click.secho("Invalid DSNs found in the config file. Please check the \"[alias_dsn]\" section in myclirc.", err=True, fg="red")
sys.exit(1) sys.exit(1)
except Exception as e: except Exception as e:
click.secho(str(e), err=True, fg="red") click.secho(str(e), err=True, fg="red")
@ -1285,7 +1293,7 @@ def cli(
dsn_uri = mycli.config["alias_dsn"][dsn] dsn_uri = mycli.config["alias_dsn"][dsn]
except KeyError: except KeyError:
click.secho( click.secho(
"Could not find the specified DSN in the config file. " 'Please check the "[alias_dsn]" section in your ' "myclirc.", "Could not find the specified DSN in the config file. Please check the \"[alias_dsn]\" section in your myclirc.",
err=True, err=True,
fg="red", fg="red",
) )
@ -1362,7 +1370,7 @@ def cli(
if combined_init_cmd: if combined_init_cmd:
click.echo("Executing init-command: %s" % combined_init_cmd, err=True) click.echo("Executing init-command: %s" % combined_init_cmd, err=True)
mycli.logger.debug("Launch Params: \n" "\tdatabase: %r" "\tuser: %r" "\thost: %r" "\tport: %r", database, user, host, port) mycli.logger.debug("Launch Params: \n\tdatabase: %r\tuser: %r\thost: %r\tport: %r", database, user, host, port)
# --execute argument # --execute argument
if execute: if execute:

View file

@ -4,11 +4,11 @@ import platform
if os.name == "posix": if os.name == "posix":
if platform.system() == "Darwin": if platform.system() == "Darwin":
DEFAULT_SOCKET_DIRS = ("/tmp",) DEFAULT_SOCKET_DIRS = ["/tmp"]
else: else:
DEFAULT_SOCKET_DIRS = ("/var/run", "/var/lib") DEFAULT_SOCKET_DIRS = ["/var/run", "/var/lib"]
else: else:
DEFAULT_SOCKET_DIRS = () DEFAULT_SOCKET_DIRS = []
def list_path(root_dir): def list_path(root_dir):

View file

@ -32,7 +32,7 @@ def confirm_destructive_query(queries):
* False if the query is destructive and the user doesn't want to proceed. * False if the query is destructive and the user doesn't want to proceed.
""" """
prompt_text = "You're about to run a destructive command.\n" "Do you want to proceed? (y/n)" prompt_text = "You're about to run a destructive command.\nDo you want to proceed? (y/n)"
if is_destructive(queries) and sys.stdin.isatty(): if is_destructive(queries) and sys.stdin.isatty():
return prompt(prompt_text, type=BOOLEAN_TYPE) return prompt(prompt_text, type=BOOLEAN_TYPE)

View file

@ -116,7 +116,7 @@ def status(cur, **_):
output.append(("Connection:", host_info)) output.append(("Connection:", host_info))
query = "SELECT @@character_set_server, @@character_set_database, " "@@character_set_client, @@character_set_connection LIMIT 1;" query = "SELECT @@character_set_server, @@character_set_database, @@character_set_client, @@character_set_connection LIMIT 1;"
log.debug(query) log.debug(query)
cur.execute(query) cur.execute(query)
charset = cur.fetchone() charset = cur.fetchone()

View file

@ -98,15 +98,18 @@ def set_expanded_output(val):
def is_expanded_output(): def is_expanded_output():
return use_expanded_output return use_expanded_output
@export @export
def set_forced_horizontal_output(val): def set_forced_horizontal_output(val):
global force_horizontal_output global force_horizontal_output
force_horizontal_output = val force_horizontal_output = val
@export @export
def forced_horizontal(): def forced_horizontal():
return force_horizontal_output return force_horizontal_output
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)

View file

@ -1,3 +1,4 @@
import re
from shutil import which from shutil import which
from pyfzf import FzfPrompt from pyfzf import FzfPrompt
@ -28,13 +29,20 @@ def search_history(event: KeyPressEvent):
formatted_history_items = [] formatted_history_items = []
original_history_items = [] original_history_items = []
seen = {}
for item, timestamp in history_items_with_timestamp: for item, timestamp in history_items_with_timestamp:
formatted_item = item.replace("\n", " ") formatted_item = re.sub(r'\s+', ' ', item)
timestamp = timestamp.split(".")[0] if "." in timestamp else timestamp timestamp = timestamp.split(".")[0] if "." in timestamp else timestamp
if formatted_item in seen:
continue
seen[formatted_item] = True
formatted_history_items.append(f"{timestamp} {formatted_item}") formatted_history_items.append(f"{timestamp} {formatted_item}")
original_history_items.append(item) original_history_items.append(item)
result = fzf.prompt(formatted_history_items, fzf_options="--tiebreak=index") result = fzf.prompt(
formatted_history_items,
fzf_options="--scheme=history --tiebreak=index --preview-window=down:wrap --preview=\"printf '%s' {}\"",
)
if result: if result:
selected_index = formatted_history_items.index(result[0]) selected_index = formatted_history_items.index(result[0])

View file

@ -195,14 +195,12 @@ class SQLExecute(object):
init_command, init_command,
) )
conv = conversions.copy() conv = conversions.copy()
conv.update( conv.update({
{ FIELD_TYPE.TIMESTAMP: lambda obj: (convert_datetime(obj) or obj),
FIELD_TYPE.TIMESTAMP: lambda obj: (convert_datetime(obj) or obj), FIELD_TYPE.DATETIME: lambda obj: (convert_datetime(obj) or obj),
FIELD_TYPE.DATETIME: lambda obj: (convert_datetime(obj) or obj), FIELD_TYPE.TIME: lambda obj: (convert_timedelta(obj) or obj),
FIELD_TYPE.TIME: lambda obj: (convert_timedelta(obj) or obj), FIELD_TYPE.DATE: lambda obj: (convert_date(obj) or obj),
FIELD_TYPE.DATE: lambda obj: (convert_date(obj) or obj), })
}
)
defer_connect = False defer_connect = False

View file

@ -3,7 +3,7 @@ name = "mycli"
dynamic = ["version"] dynamic = ["version"]
description = "CLI for MySQL Database. With auto-completion and syntax highlighting." description = "CLI for MySQL Database. With auto-completion and syntax highlighting."
readme = "README.md" readme = "README.md"
requires-python = ">=3.7" requires-python = ">=3.9"
license = { text = "BSD" } license = { text = "BSD" }
authors = [{ name = "Mycli Core Team", email = "mycli-dev@googlegroups.com" }] authors = [{ name = "Mycli Core Team", email = "mycli-dev@googlegroups.com" }]
urls = { homepage = "http://mycli.net" } urls = { homepage = "http://mycli.net" }
@ -14,7 +14,7 @@ dependencies = [
"Pygments>=1.6", "Pygments>=1.6",
"prompt_toolkit>=3.0.6,<4.0.0", "prompt_toolkit>=3.0.6,<4.0.0",
"PyMySQL >= 0.9.2", "PyMySQL >= 0.9.2",
"sqlparse>=0.3.0,<0.5.0", "sqlparse>=0.3.0,<0.6.0",
"sqlglot>=5.1.3", "sqlglot>=5.1.3",
"configobj >= 5.0.5", "configobj >= 5.0.5",
"cli_helpers[styles] >= 2.2.1", "cli_helpers[styles] >= 2.2.1",
@ -56,4 +56,41 @@ mycli = ["myclirc", "AUTHORS", "SPONSORS"]
include = ["mycli*"] include = ["mycli*"]
[tool.ruff] [tool.ruff]
target-version = 'py39'
line-length = 140 line-length = 140
[tool.ruff.lint]
select = [
'A',
'I',
'E',
'W',
'F',
'C4',
'PIE',
'TID',
]
ignore = [
'E401', # Multiple imports on one line
'E402', # Module level import not at top of file
'PIE808', # range() starting with 0
# https://docs.astral.sh/ruff/formatter/#conflicting-lint-rules
'E111', # indentation-with-invalid-multiple
'E114', # indentation-with-invalid-multiple-comment
'E117', # over-indented
'W191', # tab-indentation
]
[tool.ruff.lint.isort]
force-sort-within-sections = true
known-first-party = [
'mycli',
]
[tool.ruff.format]
preview = true
quote-style = 'preserve'
exclude = [
'build',
'mycli_dev',
]

View file

@ -65,7 +65,7 @@ def before_all(context):
_, my_cnf = mkstemp() _, my_cnf = mkstemp()
with open(my_cnf, "w") as f: with open(my_cnf, "w") as f:
f.write( f.write(
"[client]\n" "pager={0} {1} {2}\n".format( "[client]\npager={0} {1} {2}\n".format(
sys.executable, os.path.join(context.package_root, "test/features/wrappager.py"), context.conf["pager_boundary"] sys.executable, os.path.join(context.package_root, "test/features/wrappager.py"), context.conf["pager_boundary"]
) )
) )

View file

@ -41,7 +41,7 @@ def step_see_small_results(context):
@then("we see large results in vertical format") @then("we see large results in vertical format")
def step_see_large_results(context): def step_see_large_results(context):
rows = ["{n:3}| {n}".format(n=str(n)) for n in range(1, 50)] rows = ["{n:3}| {n}".format(n=str(n)) for n in range(1, 50)]
expected = "***************************[ 1. row ]" "***************************\r\n" + "{}\r\n".format("\r\n".join(rows) + "\r\n") expected = "***************************[ 1. row ]***************************\r\n" + "{}\r\n".format("\r\n".join(rows) + "\r\n")
wrappers.expect_pager(context, expected, timeout=10) wrappers.expect_pager(context, expected, timeout=10)
wrappers.expect_exact(context, "1 row in set", timeout=2) wrappers.expect_exact(context, "1 row in set", timeout=2)

View file

@ -32,7 +32,7 @@ def status_contains(context, expression):
@when("we create my.cnf file") @when("we create my.cnf file")
def step_create_my_cnf_file(context): def step_create_my_cnf_file(context):
my_cnf = "[client]\n" f"host = {HOST}\n" f"port = {PORT}\n" f"user = {USER}\n" f"password = {PASSWORD}\n" my_cnf = f"[client]\nhost = {HOST}\nport = {PORT}\nuser = {USER}\npassword = {PASSWORD}\n"
with open(MY_CNF_PATH, "w") as f: with open(MY_CNF_PATH, "w") as f:
f.write(my_cnf) f.write(my_cnf)
@ -40,7 +40,7 @@ def step_create_my_cnf_file(context):
@when("we create mylogin.cnf file") @when("we create mylogin.cnf file")
def step_create_mylogin_cnf_file(context): def step_create_mylogin_cnf_file(context):
os.environ.pop("MYSQL_TEST_LOGIN_FILE", None) os.environ.pop("MYSQL_TEST_LOGIN_FILE", None)
mylogin_cnf = f"[{TEST_LOGIN_PATH}]\n" f"host = {HOST}\n" f"port = {PORT}\n" f"user = {USER}\n" f"password = {PASSWORD}\n" mylogin_cnf = f"[{TEST_LOGIN_PATH}]\nhost = {HOST}\nport = {PORT}\nuser = {USER}\npassword = {PASSWORD}\n"
with open(MYLOGIN_CNF_PATH, "wb") as f: with open(MYLOGIN_CNF_PATH, "wb") as f:
input_file = io.StringIO(mylogin_cnf) input_file = io.StringIO(mylogin_cnf)
f.write(encrypt_mylogin_cnf(input_file).read()) f.write(encrypt_mylogin_cnf(input_file).read())

View file

@ -81,9 +81,7 @@ def run_cli(context, run_args=None, exclude_args=None):
try: try:
cli_cmd = context.conf["cli_command"] cli_cmd = context.conf["cli_command"]
except KeyError: except KeyError:
cli_cmd = ('{0!s} -c "' "import coverage ; " "coverage.process_startup(); " "import mycli.main; " "mycli.main.cli()" '"').format( cli_cmd = ('{0!s} -c "import coverage ; coverage.process_startup(); import mycli.main; mycli.main.cli()"').format(sys.executable)
sys.executable
)
cmd_parts = [cli_cmd] + rendered_args cmd_parts = [cli_cmd] + rendered_args
cmd = " ".join(cmd_parts) cmd = " ".join(cmd_parts)

View file

@ -9,26 +9,22 @@ def sorted_dicts(dicts):
def test_select_suggests_cols_with_visible_table_scope(): def test_select_suggests_cols_with_visible_table_scope():
suggestions = suggest_type("SELECT FROM tabl", "SELECT ") suggestions = suggest_type("SELECT FROM tabl", "SELECT ")
assert sorted_dicts(suggestions) == sorted_dicts( assert sorted_dicts(suggestions) == sorted_dicts([
[ {"type": "alias", "aliases": ["tabl"]},
{"type": "alias", "aliases": ["tabl"]}, {"type": "column", "tables": [(None, "tabl", None)]},
{"type": "column", "tables": [(None, "tabl", None)]}, {"type": "function", "schema": []},
{"type": "function", "schema": []}, {"type": "keyword"},
{"type": "keyword"}, ])
]
)
def test_select_suggests_cols_with_qualified_table_scope(): def test_select_suggests_cols_with_qualified_table_scope():
suggestions = suggest_type("SELECT FROM sch.tabl", "SELECT ") suggestions = suggest_type("SELECT FROM sch.tabl", "SELECT ")
assert sorted_dicts(suggestions) == sorted_dicts( assert sorted_dicts(suggestions) == sorted_dicts([
[ {"type": "alias", "aliases": ["tabl"]},
{"type": "alias", "aliases": ["tabl"]}, {"type": "column", "tables": [("sch", "tabl", None)]},
{"type": "column", "tables": [("sch", "tabl", None)]}, {"type": "function", "schema": []},
{"type": "function", "schema": []}, {"type": "keyword"},
{"type": "keyword"}, ])
]
)
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -48,14 +44,12 @@ def test_select_suggests_cols_with_qualified_table_scope():
) )
def test_where_suggests_columns_functions(expression): def test_where_suggests_columns_functions(expression):
suggestions = suggest_type(expression, expression) suggestions = suggest_type(expression, expression)
assert sorted_dicts(suggestions) == sorted_dicts( assert sorted_dicts(suggestions) == sorted_dicts([
[ {"type": "alias", "aliases": ["tabl"]},
{"type": "alias", "aliases": ["tabl"]}, {"type": "column", "tables": [(None, "tabl", None)]},
{"type": "column", "tables": [(None, "tabl", None)]}, {"type": "function", "schema": []},
{"type": "function", "schema": []}, {"type": "keyword"},
{"type": "keyword"}, ])
]
)
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -67,27 +61,23 @@ def test_where_suggests_columns_functions(expression):
) )
def test_where_in_suggests_columns(expression): def test_where_in_suggests_columns(expression):
suggestions = suggest_type(expression, expression) suggestions = suggest_type(expression, expression)
assert sorted_dicts(suggestions) == sorted_dicts( assert sorted_dicts(suggestions) == sorted_dicts([
[ {"type": "alias", "aliases": ["tabl"]},
{"type": "alias", "aliases": ["tabl"]}, {"type": "column", "tables": [(None, "tabl", None)]},
{"type": "column", "tables": [(None, "tabl", None)]}, {"type": "function", "schema": []},
{"type": "function", "schema": []}, {"type": "keyword"},
{"type": "keyword"}, ])
]
)
def test_where_equals_any_suggests_columns_or_keywords(): def test_where_equals_any_suggests_columns_or_keywords():
text = "SELECT * FROM tabl WHERE foo = ANY(" text = "SELECT * FROM tabl WHERE foo = ANY("
suggestions = suggest_type(text, text) suggestions = suggest_type(text, text)
assert sorted_dicts(suggestions) == sorted_dicts( assert sorted_dicts(suggestions) == sorted_dicts([
[ {"type": "alias", "aliases": ["tabl"]},
{"type": "alias", "aliases": ["tabl"]}, {"type": "column", "tables": [(None, "tabl", None)]},
{"type": "column", "tables": [(None, "tabl", None)]}, {"type": "function", "schema": []},
{"type": "function", "schema": []}, {"type": "keyword"},
{"type": "keyword"}, ])
]
)
def test_lparen_suggests_cols(): def test_lparen_suggests_cols():
@ -107,14 +97,12 @@ def test_operand_inside_function_suggests_cols2():
def test_select_suggests_cols_and_funcs(): def test_select_suggests_cols_and_funcs():
suggestions = suggest_type("SELECT ", "SELECT ") suggestions = suggest_type("SELECT ", "SELECT ")
assert sorted_dicts(suggestions) == sorted_dicts( assert sorted_dicts(suggestions) == sorted_dicts([
[ {"type": "alias", "aliases": []},
{"type": "alias", "aliases": []}, {"type": "column", "tables": []},
{"type": "column", "tables": []}, {"type": "function", "schema": []},
{"type": "function", "schema": []}, {"type": "keyword"},
{"type": "keyword"}, ])
]
)
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -170,14 +158,12 @@ def test_distinct_suggests_cols():
def test_col_comma_suggests_cols(): def test_col_comma_suggests_cols():
suggestions = suggest_type("SELECT a, b, FROM tbl", "SELECT a, b,") suggestions = suggest_type("SELECT a, b, FROM tbl", "SELECT a, b,")
assert sorted_dicts(suggestions) == sorted_dicts( assert sorted_dicts(suggestions) == sorted_dicts([
[ {"type": "alias", "aliases": ["tbl"]},
{"type": "alias", "aliases": ["tbl"]}, {"type": "column", "tables": [(None, "tbl", None)]},
{"type": "column", "tables": [(None, "tbl", None)]}, {"type": "function", "schema": []},
{"type": "function", "schema": []}, {"type": "keyword"},
{"type": "keyword"}, ])
]
)
def test_table_comma_suggests_tables_and_schemas(): def test_table_comma_suggests_tables_and_schemas():
@ -207,50 +193,42 @@ def test_insert_into_lparen_comma_suggests_cols():
def test_partially_typed_col_name_suggests_col_names(): def test_partially_typed_col_name_suggests_col_names():
suggestions = suggest_type("SELECT * FROM tabl WHERE col_n", "SELECT * FROM tabl WHERE col_n") suggestions = suggest_type("SELECT * FROM tabl WHERE col_n", "SELECT * FROM tabl WHERE col_n")
assert sorted_dicts(suggestions) == sorted_dicts( assert sorted_dicts(suggestions) == sorted_dicts([
[ {"type": "alias", "aliases": ["tabl"]},
{"type": "alias", "aliases": ["tabl"]}, {"type": "column", "tables": [(None, "tabl", None)]},
{"type": "column", "tables": [(None, "tabl", None)]}, {"type": "function", "schema": []},
{"type": "function", "schema": []}, {"type": "keyword"},
{"type": "keyword"}, ])
]
)
def test_dot_suggests_cols_of_a_table_or_schema_qualified_table(): def test_dot_suggests_cols_of_a_table_or_schema_qualified_table():
suggestions = suggest_type("SELECT tabl. FROM tabl", "SELECT tabl.") suggestions = suggest_type("SELECT tabl. FROM tabl", "SELECT tabl.")
assert sorted_dicts(suggestions) == sorted_dicts( assert sorted_dicts(suggestions) == sorted_dicts([
[ {"type": "column", "tables": [(None, "tabl", None)]},
{"type": "column", "tables": [(None, "tabl", None)]}, {"type": "table", "schema": "tabl"},
{"type": "table", "schema": "tabl"}, {"type": "view", "schema": "tabl"},
{"type": "view", "schema": "tabl"}, {"type": "function", "schema": "tabl"},
{"type": "function", "schema": "tabl"}, ])
]
)
def test_dot_suggests_cols_of_an_alias(): def test_dot_suggests_cols_of_an_alias():
suggestions = suggest_type("SELECT t1. FROM tabl1 t1, tabl2 t2", "SELECT t1.") suggestions = suggest_type("SELECT t1. FROM tabl1 t1, tabl2 t2", "SELECT t1.")
assert sorted_dicts(suggestions) == sorted_dicts( assert sorted_dicts(suggestions) == sorted_dicts([
[ {"type": "table", "schema": "t1"},
{"type": "table", "schema": "t1"}, {"type": "view", "schema": "t1"},
{"type": "view", "schema": "t1"}, {"type": "column", "tables": [(None, "tabl1", "t1")]},
{"type": "column", "tables": [(None, "tabl1", "t1")]}, {"type": "function", "schema": "t1"},
{"type": "function", "schema": "t1"}, ])
]
)
def test_dot_col_comma_suggests_cols_or_schema_qualified_table(): def test_dot_col_comma_suggests_cols_or_schema_qualified_table():
suggestions = suggest_type("SELECT t1.a, t2. FROM tabl1 t1, tabl2 t2", "SELECT t1.a, t2.") suggestions = suggest_type("SELECT t1.a, t2. FROM tabl1 t1, tabl2 t2", "SELECT t1.a, t2.")
assert sorted_dicts(suggestions) == sorted_dicts( assert sorted_dicts(suggestions) == sorted_dicts([
[ {"type": "column", "tables": [(None, "tabl2", "t2")]},
{"type": "column", "tables": [(None, "tabl2", "t2")]}, {"type": "table", "schema": "t2"},
{"type": "table", "schema": "t2"}, {"type": "view", "schema": "t2"},
{"type": "view", "schema": "t2"}, {"type": "function", "schema": "t2"},
{"type": "function", "schema": "t2"}, ])
]
)
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -306,34 +284,31 @@ def test_sub_select_table_name_completion(expression):
def test_sub_select_col_name_completion(): def test_sub_select_col_name_completion():
suggestions = suggest_type("SELECT * FROM (SELECT FROM abc", "SELECT * FROM (SELECT ") suggestions = suggest_type("SELECT * FROM (SELECT FROM abc", "SELECT * FROM (SELECT ")
assert sorted_dicts(suggestions) == sorted_dicts( assert sorted_dicts(suggestions) == sorted_dicts([
[ {"type": "alias", "aliases": ["abc"]},
{"type": "alias", "aliases": ["abc"]}, {"type": "column", "tables": [(None, "abc", None)]},
{"type": "column", "tables": [(None, "abc", None)]}, {"type": "function", "schema": []},
{"type": "function", "schema": []}, {"type": "keyword"},
{"type": "keyword"}, ])
]
)
@pytest.mark.xfail @pytest.mark.xfail
def test_sub_select_multiple_col_name_completion(): def test_sub_select_multiple_col_name_completion():
suggestions = suggest_type("SELECT * FROM (SELECT a, FROM abc", "SELECT * FROM (SELECT a, ") suggestions = suggest_type("SELECT * FROM (SELECT a, FROM abc", "SELECT * FROM (SELECT a, ")
assert sorted_dicts(suggestions) == sorted_dicts( assert sorted_dicts(suggestions) == sorted_dicts([
[{"type": "column", "tables": [(None, "abc", None)]}, {"type": "function", "schema": []}] {"type": "column", "tables": [(None, "abc", None)]},
) {"type": "function", "schema": []},
])
def test_sub_select_dot_col_name_completion(): def test_sub_select_dot_col_name_completion():
suggestions = suggest_type("SELECT * FROM (SELECT t. FROM tabl t", "SELECT * FROM (SELECT t.") suggestions = suggest_type("SELECT * FROM (SELECT t. FROM tabl t", "SELECT * FROM (SELECT t.")
assert sorted_dicts(suggestions) == sorted_dicts( assert sorted_dicts(suggestions) == sorted_dicts([
[ {"type": "column", "tables": [(None, "tabl", "t")]},
{"type": "column", "tables": [(None, "tabl", "t")]}, {"type": "table", "schema": "t"},
{"type": "table", "schema": "t"}, {"type": "view", "schema": "t"},
{"type": "view", "schema": "t"}, {"type": "function", "schema": "t"},
{"type": "function", "schema": "t"}, ])
]
)
@pytest.mark.parametrize("join_type", ["", "INNER", "LEFT", "RIGHT OUTER"]) @pytest.mark.parametrize("join_type", ["", "INNER", "LEFT", "RIGHT OUTER"])
@ -353,14 +328,12 @@ def test_join_suggests_tables_and_schemas(tbl_alias, join_type):
) )
def test_join_alias_dot_suggests_cols1(sql): def test_join_alias_dot_suggests_cols1(sql):
suggestions = suggest_type(sql, sql) suggestions = suggest_type(sql, sql)
assert sorted_dicts(suggestions) == sorted_dicts( assert sorted_dicts(suggestions) == sorted_dicts([
[ {"type": "column", "tables": [(None, "abc", "a")]},
{"type": "column", "tables": [(None, "abc", "a")]}, {"type": "table", "schema": "a"},
{"type": "table", "schema": "a"}, {"type": "view", "schema": "a"},
{"type": "view", "schema": "a"}, {"type": "function", "schema": "a"},
{"type": "function", "schema": "a"}, ])
]
)
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -372,14 +345,12 @@ def test_join_alias_dot_suggests_cols1(sql):
) )
def test_join_alias_dot_suggests_cols2(sql): def test_join_alias_dot_suggests_cols2(sql):
suggestions = suggest_type(sql, sql) suggestions = suggest_type(sql, sql)
assert sorted_dicts(suggestions) == sorted_dicts( assert sorted_dicts(suggestions) == sorted_dicts([
[ {"type": "column", "tables": [(None, "def", "d")]},
{"type": "column", "tables": [(None, "def", "d")]}, {"type": "table", "schema": "d"},
{"type": "table", "schema": "d"}, {"type": "view", "schema": "d"},
{"type": "view", "schema": "d"}, {"type": "function", "schema": "d"},
{"type": "function", "schema": "d"}, ])
]
)
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -445,14 +416,12 @@ def test_join_using_suggests_common_columns(col_list):
) )
def test_two_join_alias_dot_suggests_cols1(sql): def test_two_join_alias_dot_suggests_cols1(sql):
suggestions = suggest_type(sql, sql) suggestions = suggest_type(sql, sql)
assert sorted_dicts(suggestions) == sorted_dicts( assert sorted_dicts(suggestions) == sorted_dicts([
[ {"type": "column", "tables": [(None, "ghi", "g")]},
{"type": "column", "tables": [(None, "ghi", "g")]}, {"type": "table", "schema": "g"},
{"type": "table", "schema": "g"}, {"type": "view", "schema": "g"},
{"type": "view", "schema": "g"}, {"type": "function", "schema": "g"},
{"type": "function", "schema": "g"}, ])
]
)
def test_2_statements_2nd_current(): def test_2_statements_2nd_current():
@ -460,14 +429,12 @@ def test_2_statements_2nd_current():
assert sorted_dicts(suggestions) == sorted_dicts([{"type": "table", "schema": []}, {"type": "view", "schema": []}, {"type": "schema"}]) assert sorted_dicts(suggestions) == sorted_dicts([{"type": "table", "schema": []}, {"type": "view", "schema": []}, {"type": "schema"}])
suggestions = suggest_type("select * from a; select from b", "select * from a; select ") suggestions = suggest_type("select * from a; select from b", "select * from a; select ")
assert sorted_dicts(suggestions) == sorted_dicts( assert sorted_dicts(suggestions) == sorted_dicts([
[ {"type": "alias", "aliases": ["b"]},
{"type": "alias", "aliases": ["b"]}, {"type": "column", "tables": [(None, "b", None)]},
{"type": "column", "tables": [(None, "b", None)]}, {"type": "function", "schema": []},
{"type": "function", "schema": []}, {"type": "keyword"},
{"type": "keyword"}, ])
]
)
# Should work even if first statement is invalid # Should work even if first statement is invalid
suggestions = suggest_type("select * from; select * from ", "select * from; select * from ") suggestions = suggest_type("select * from; select * from ", "select * from; select * from ")
@ -479,14 +446,12 @@ def test_2_statements_1st_current():
assert sorted_dicts(suggestions) == sorted_dicts([{"type": "table", "schema": []}, {"type": "view", "schema": []}, {"type": "schema"}]) assert sorted_dicts(suggestions) == sorted_dicts([{"type": "table", "schema": []}, {"type": "view", "schema": []}, {"type": "schema"}])
suggestions = suggest_type("select from a; select * from b", "select ") suggestions = suggest_type("select from a; select * from b", "select ")
assert sorted_dicts(suggestions) == sorted_dicts( assert sorted_dicts(suggestions) == sorted_dicts([
[ {"type": "alias", "aliases": ["a"]},
{"type": "alias", "aliases": ["a"]}, {"type": "column", "tables": [(None, "a", None)]},
{"type": "column", "tables": [(None, "a", None)]}, {"type": "function", "schema": []},
{"type": "function", "schema": []}, {"type": "keyword"},
{"type": "keyword"}, ])
]
)
def test_3_statements_2nd_current(): def test_3_statements_2nd_current():
@ -494,14 +459,12 @@ def test_3_statements_2nd_current():
assert sorted_dicts(suggestions) == sorted_dicts([{"type": "table", "schema": []}, {"type": "view", "schema": []}, {"type": "schema"}]) assert sorted_dicts(suggestions) == sorted_dicts([{"type": "table", "schema": []}, {"type": "view", "schema": []}, {"type": "schema"}])
suggestions = suggest_type("select * from a; select from b; select * from c", "select * from a; select ") suggestions = suggest_type("select * from a; select from b; select * from c", "select * from a; select ")
assert sorted_dicts(suggestions) == sorted_dicts( assert sorted_dicts(suggestions) == sorted_dicts([
[ {"type": "alias", "aliases": ["b"]},
{"type": "alias", "aliases": ["b"]}, {"type": "column", "tables": [(None, "b", None)]},
{"type": "column", "tables": [(None, "b", None)]}, {"type": "function", "schema": []},
{"type": "function", "schema": []}, {"type": "keyword"},
{"type": "keyword"}, ])
]
)
def test_create_db_with_template(): def test_create_db_with_template():

View file

@ -93,7 +93,7 @@ def test_batch_mode(executor):
run(executor, """create table test(a text)""") run(executor, """create table test(a text)""")
run(executor, """insert into test values('abc'), ('def'), ('ghi')""") run(executor, """insert into test values('abc'), ('def'), ('ghi')""")
sql = "select count(*) from test;\n" "select * from test limit 1;" sql = "select count(*) from test;\nselect * from test limit 1;"
runner = CliRunner() runner = CliRunner()
result = runner.invoke(cli, args=CLI_ARGS, input=sql) result = runner.invoke(cli, args=CLI_ARGS, input=sql)
@ -107,7 +107,7 @@ def test_batch_mode_table(executor):
run(executor, """create table test(a text)""") run(executor, """create table test(a text)""")
run(executor, """insert into test values('abc'), ('def'), ('ghi')""") run(executor, """insert into test values('abc'), ('def'), ('ghi')""")
sql = "select count(*) from test;\n" "select * from test limit 1;" sql = "select count(*) from test;\nselect * from test limit 1;"
runner = CliRunner() runner = CliRunner()
result = runner.invoke(cli, args=CLI_ARGS + ["-t"], input=sql) result = runner.invoke(cli, args=CLI_ARGS + ["-t"], input=sql)
@ -543,7 +543,7 @@ def test_init_command_arg(executor):
@dbtest @dbtest
def test_init_command_multiple_arg(executor): def test_init_command_multiple_arg(executor):
init_command = "set sql_select_limit=2000; set max_join_size=20000" init_command = "set sql_select_limit=2000; set max_join_size=20000"
sql = 'show variables like "sql_select_limit";\n' 'show variables like "max_join_size"' sql = 'show variables like "sql_select_limit";\nshow variables like "max_join_size"'
runner = CliRunner() runner = CliRunner()
result = runner.invoke(cli, args=CLI_ARGS + ["--init-command", init_command], input=sql) result = runner.invoke(cli, args=CLI_ARGS + ["--init-command", init_command], input=sql)
@ -554,6 +554,7 @@ def test_init_command_multiple_arg(executor):
assert expected_sql_select_limit in result.output assert expected_sql_select_limit in result.output
assert expected_max_join_size in result.output assert expected_max_join_size in result.output
@dbtest @dbtest
def test_global_init_commands(executor): def test_global_init_commands(executor):
"""Tests that global init-commands from config are executed by default.""" """Tests that global init-commands from config are executed by default."""

View file

@ -122,24 +122,24 @@ def test_query_starts_with_comment():
def test_queries_start_with(): def test_queries_start_with():
sql = "# comment\n" "show databases;" "use foo;" sql = "# comment\nshow databases;use foo;"
assert queries_start_with(sql, ("show", "select")) is True assert queries_start_with(sql, ("show", "select")) is True
assert queries_start_with(sql, ("use", "drop")) is True assert queries_start_with(sql, ("use", "drop")) is True
assert queries_start_with(sql, ("delete", "update")) is False assert queries_start_with(sql, ("delete", "update")) is False
def test_is_destructive(): def test_is_destructive():
sql = "use test;\n" "show databases;\n" "drop database foo;" sql = "use test;\nshow databases;\ndrop database foo;"
assert is_destructive(sql) is True assert is_destructive(sql) is True
def test_is_destructive_update_with_where_clause(): def test_is_destructive_update_with_where_clause():
sql = "use test;\n" "show databases;\n" "UPDATE test SET x = 1 WHERE id = 1;" sql = "use test;\nshow databases;\nUPDATE test SET x = 1 WHERE id = 1;"
assert is_destructive(sql) is False assert is_destructive(sql) is False
def test_is_destructive_update_without_where_clause(): def test_is_destructive_update_without_where_clause():
sql = "use test;\n" "show databases;\n" "UPDATE test SET x = 1;" sql = "use test;\nshow databases;\nUPDATE test SET x = 1;"
assert is_destructive(sql) is True assert is_destructive(sql) is True
@ -167,7 +167,7 @@ def test_query_has_where_clause(sql, has_where_clause):
("drop database foo; create database bar", "foo", True), ("drop database foo; create database bar", "foo", True),
("select bar from foo; drop database bazz", "foo", False), ("select bar from foo; drop database bazz", "foo", False),
("select bar from foo; drop database bazz", "bazz", True), ("select bar from foo; drop database bazz", "bazz", True),
("-- dropping database \n " "drop -- really dropping \n " "schema abc -- now it is dropped", "abc", True), ("-- dropping database \n drop -- really dropping \n schema abc -- now it is dropped", "abc", True),
], ],
) )
def test_is_dropping_database(sql, dbname, is_dropping): def test_is_dropping_database(sql, dbname, is_dropping):

View file

@ -72,33 +72,29 @@ def test_table_completion(completer, complete_event):
text = "SELECT * FROM " text = "SELECT * FROM "
position = len(text) position = len(text)
result = completer.get_completions(Document(text=text, cursor_position=position), complete_event) result = completer.get_completions(Document(text=text, cursor_position=position), complete_event)
assert list(result) == list( assert list(result) == list([
[ Completion(text="users", start_position=0),
Completion(text="users", start_position=0), Completion(text="orders", start_position=0),
Completion(text="orders", start_position=0), Completion(text="`select`", start_position=0),
Completion(text="`select`", start_position=0), Completion(text="`réveillé`", start_position=0),
Completion(text="`réveillé`", start_position=0), ])
]
)
def test_function_name_completion(completer, complete_event): def test_function_name_completion(completer, complete_event):
text = "SELECT MA" text = "SELECT MA"
position = len("SELECT MA") position = len("SELECT MA")
result = completer.get_completions(Document(text=text, cursor_position=position), complete_event) result = completer.get_completions(Document(text=text, cursor_position=position), complete_event)
assert list(result) == list( assert list(result) == list([
[ Completion(text="MAX", start_position=-2),
Completion(text="MAX", start_position=-2), Completion(text="CHANGE MASTER TO", start_position=-2),
Completion(text="CHANGE MASTER TO", start_position=-2), Completion(text="CURRENT_TIMESTAMP", start_position=-2),
Completion(text="CURRENT_TIMESTAMP", start_position=-2), Completion(text="DECIMAL", start_position=-2),
Completion(text="DECIMAL", start_position=-2), Completion(text="FORMAT", start_position=-2),
Completion(text="FORMAT", start_position=-2), Completion(text="MASTER", start_position=-2),
Completion(text="MASTER", start_position=-2), Completion(text="PRIMARY", start_position=-2),
Completion(text="PRIMARY", start_position=-2), Completion(text="ROW_FORMAT", start_position=-2),
Completion(text="ROW_FORMAT", start_position=-2), Completion(text="SMALLINT", start_position=-2),
Completion(text="SMALLINT", start_position=-2), ])
]
)
def test_suggested_column_names(completer, complete_event): def test_suggested_column_names(completer, complete_event):
@ -138,15 +134,13 @@ def test_suggested_column_names_in_function(completer, complete_event):
text = "SELECT MAX( from users" text = "SELECT MAX( from users"
position = len("SELECT MAX(") position = len("SELECT MAX(")
result = completer.get_completions(Document(text=text, cursor_position=position), complete_event) result = completer.get_completions(Document(text=text, cursor_position=position), complete_event)
assert list(result) == list( assert list(result) == list([
[ Completion(text="*", start_position=0),
Completion(text="*", start_position=0), Completion(text="id", start_position=0),
Completion(text="id", start_position=0), Completion(text="email", start_position=0),
Completion(text="email", start_position=0), Completion(text="first_name", start_position=0),
Completion(text="first_name", start_position=0), Completion(text="last_name", start_position=0),
Completion(text="last_name", start_position=0), ])
]
)
def test_suggested_column_names_with_table_dot(completer, complete_event): def test_suggested_column_names_with_table_dot(completer, complete_event):
@ -160,15 +154,13 @@ def test_suggested_column_names_with_table_dot(completer, complete_event):
text = "SELECT users. from users" text = "SELECT users. from users"
position = len("SELECT users.") position = len("SELECT users.")
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event)) result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list( assert result == list([
[ Completion(text="*", start_position=0),
Completion(text="*", start_position=0), Completion(text="id", start_position=0),
Completion(text="id", start_position=0), Completion(text="email", start_position=0),
Completion(text="email", start_position=0), Completion(text="first_name", start_position=0),
Completion(text="first_name", start_position=0), Completion(text="last_name", start_position=0),
Completion(text="last_name", start_position=0), ])
]
)
def test_suggested_column_names_with_alias(completer, complete_event): def test_suggested_column_names_with_alias(completer, complete_event):
@ -182,15 +174,13 @@ def test_suggested_column_names_with_alias(completer, complete_event):
text = "SELECT u. from users u" text = "SELECT u. from users u"
position = len("SELECT u.") position = len("SELECT u.")
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event)) result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list( assert result == list([
[ Completion(text="*", start_position=0),
Completion(text="*", start_position=0), Completion(text="id", start_position=0),
Completion(text="id", start_position=0), Completion(text="email", start_position=0),
Completion(text="email", start_position=0), Completion(text="first_name", start_position=0),
Completion(text="first_name", start_position=0), Completion(text="last_name", start_position=0),
Completion(text="last_name", start_position=0), ])
]
)
def test_suggested_multiple_column_names(completer, complete_event): def test_suggested_multiple_column_names(completer, complete_event):
@ -231,15 +221,13 @@ def test_suggested_multiple_column_names_with_alias(completer, complete_event):
text = "SELECT u.id, u. from users u" text = "SELECT u.id, u. from users u"
position = len("SELECT u.id, u.") position = len("SELECT u.id, u.")
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event)) result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list( assert result == list([
[ Completion(text="*", start_position=0),
Completion(text="*", start_position=0), Completion(text="id", start_position=0),
Completion(text="id", start_position=0), Completion(text="email", start_position=0),
Completion(text="email", start_position=0), Completion(text="first_name", start_position=0),
Completion(text="first_name", start_position=0), Completion(text="last_name", start_position=0),
Completion(text="last_name", start_position=0), ])
]
)
def test_suggested_multiple_column_names_with_dot(completer, complete_event): def test_suggested_multiple_column_names_with_dot(completer, complete_event):
@ -254,77 +242,65 @@ def test_suggested_multiple_column_names_with_dot(completer, complete_event):
text = "SELECT users.id, users. from users u" text = "SELECT users.id, users. from users u"
position = len("SELECT users.id, users.") position = len("SELECT users.id, users.")
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event)) result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list( assert result == list([
[ Completion(text="*", start_position=0),
Completion(text="*", start_position=0), Completion(text="id", start_position=0),
Completion(text="id", start_position=0), Completion(text="email", start_position=0),
Completion(text="email", start_position=0), Completion(text="first_name", start_position=0),
Completion(text="first_name", start_position=0), Completion(text="last_name", start_position=0),
Completion(text="last_name", start_position=0), ])
]
)
def test_suggested_aliases_after_on(completer, complete_event): def test_suggested_aliases_after_on(completer, complete_event):
text = "SELECT u.name, o.id FROM users u JOIN orders o ON " text = "SELECT u.name, o.id FROM users u JOIN orders o ON "
position = len("SELECT u.name, o.id FROM users u JOIN orders o ON ") position = len("SELECT u.name, o.id FROM users u JOIN orders o ON ")
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event)) result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list( assert result == list([
[ Completion(text="u", start_position=0),
Completion(text="u", start_position=0), Completion(text="o", start_position=0),
Completion(text="o", start_position=0), ])
]
)
def test_suggested_aliases_after_on_right_side(completer, complete_event): def test_suggested_aliases_after_on_right_side(completer, complete_event):
text = "SELECT u.name, o.id FROM users u JOIN orders o ON o.user_id = " text = "SELECT u.name, o.id FROM users u JOIN orders o ON o.user_id = "
position = len("SELECT u.name, o.id FROM users u JOIN orders o ON o.user_id = ") position = len("SELECT u.name, o.id FROM users u JOIN orders o ON o.user_id = ")
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event)) result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list( assert result == list([
[ Completion(text="u", start_position=0),
Completion(text="u", start_position=0), Completion(text="o", start_position=0),
Completion(text="o", start_position=0), ])
]
)
def test_suggested_tables_after_on(completer, complete_event): def test_suggested_tables_after_on(completer, complete_event):
text = "SELECT users.name, orders.id FROM users JOIN orders ON " text = "SELECT users.name, orders.id FROM users JOIN orders ON "
position = len("SELECT users.name, orders.id FROM users JOIN orders ON ") position = len("SELECT users.name, orders.id FROM users JOIN orders ON ")
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event)) result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list( assert result == list([
[ Completion(text="users", start_position=0),
Completion(text="users", start_position=0), Completion(text="orders", start_position=0),
Completion(text="orders", start_position=0), ])
]
)
def test_suggested_tables_after_on_right_side(completer, complete_event): def test_suggested_tables_after_on_right_side(completer, complete_event):
text = "SELECT users.name, orders.id FROM users JOIN orders ON orders.user_id = " text = "SELECT users.name, orders.id FROM users JOIN orders ON orders.user_id = "
position = len("SELECT users.name, orders.id FROM users JOIN orders ON orders.user_id = ") position = len("SELECT users.name, orders.id FROM users JOIN orders ON orders.user_id = ")
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event)) result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list( assert result == list([
[ Completion(text="users", start_position=0),
Completion(text="users", start_position=0), Completion(text="orders", start_position=0),
Completion(text="orders", start_position=0), ])
]
)
def test_table_names_after_from(completer, complete_event): def test_table_names_after_from(completer, complete_event):
text = "SELECT * FROM " text = "SELECT * FROM "
position = len("SELECT * FROM ") position = len("SELECT * FROM ")
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event)) result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list( assert result == list([
[ Completion(text="users", start_position=0),
Completion(text="users", start_position=0), Completion(text="orders", start_position=0),
Completion(text="orders", start_position=0), Completion(text="`select`", start_position=0),
Completion(text="`select`", start_position=0), Completion(text="`réveillé`", start_position=0),
Completion(text="`réveillé`", start_position=0), ])
]
)
def test_auto_escaped_col_names(completer, complete_event): def test_auto_escaped_col_names(completer, complete_event):

View file

@ -44,7 +44,7 @@ def test_bools(executor):
@dbtest @dbtest
def test_binary(executor): def test_binary(executor):
run(executor, """create table bt(geom linestring NOT NULL)""") run(executor, """create table bt(geom linestring NOT NULL)""")
run(executor, "INSERT INTO bt VALUES " "(ST_GeomFromText('LINESTRING(116.37604 39.73979,116.375 39.73965)'));") run(executor, "INSERT INTO bt VALUES (ST_GeomFromText('LINESTRING(116.37604 39.73979,116.375 39.73965)'));")
results = run(executor, """select * from bt""") results = run(executor, """select * from bt""")
geom = ( geom = (
@ -139,7 +139,7 @@ def test_favorite_query_multiple_statement(executor):
run(executor, "insert into test values('abc')") run(executor, "insert into test values('abc')")
run(executor, "insert into test values('def')") run(executor, "insert into test values('def')")
results = run(executor, "\\fs test-ad select * from test where a like 'a%'; " "select * from test where a like 'd%'") results = run(executor, "\\fs test-ad select * from test where a like 'a%'; select * from test where a like 'd%'")
assert_result_equal(results, status="Saved.") assert_result_equal(results, status="Saved.")
results = run(executor, "\\f test-ad") results = run(executor, "\\f test-ad")