1
0
Fork 0

Merging upstream version 1.12.3.

Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
Daniel Baumann 2025-02-09 17:55:14 +01:00
parent 7565ea4dc1
commit 47976d33df
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
38 changed files with 323 additions and 677 deletions

View file

@ -1,3 +0,0 @@
[run]
parallel = True
source = litecli

View file

@ -4,6 +4,7 @@ on:
pull_request:
paths-ignore:
- '**.md'
- 'AUTHORS'
jobs:
build:
@ -14,31 +15,21 @@ jobs:
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- uses: astral-sh/setup-uv@v1
with:
version: "latest"
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install requirements
run: |
python -m pip install -U pip setuptools
pip install --no-cache-dir -e .
pip install -r requirements-dev.txt -U --upgrade-strategy=only-if-needed
- name: Install dependencies
run: uv sync --all-extras -p ${{ matrix.python-version }}
- name: Run unit tests
env:
PYTEST_PASSWORD: root
run: |
./setup.py test --pytest-args="--cov-report= --cov=litecli"
run: uv run tox -e py${{ matrix.python-version }}
- name: Run Black
run: |
./setup.py lint
if: matrix.python-version == '3.7'
- name: Coverage
run: |
coverage report
codecov
- name: Run Style Checks
run: uv run tox -e style

80
.github/workflows/publish.yml vendored Normal file
View file

@ -0,0 +1,80 @@
name: Publish Python Package
on:
release:
types: [created]
permissions:
contents: read
jobs:
test:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
steps:
- uses: actions/checkout@v4
- uses: astral-sh/setup-uv@v1
with:
version: "latest"
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: uv sync --all-extras -p ${{ matrix.python-version }}
- name: Run unit tests
run: uv run tox -e py${{ matrix.python-version }}
- name: Run Style Checks
run: uv run tox -e style
build:
runs-on: ubuntu-latest
needs: [test]
steps:
- uses: actions/checkout@v4
- uses: astral-sh/setup-uv@v1
with:
version: "latest"
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Install dependencies
run: uv sync --all-extras -p 3.12
- name: Build
run: uv build
- name: Store the distribution packages
uses: actions/upload-artifact@v4
with:
name: python-packages
path: dist/
publish:
name: Publish to PyPI
runs-on: ubuntu-latest
if: startsWith(github.ref, 'refs/tags/')
needs: [build]
environment: release
permissions:
id-token: write
steps:
- name: Download distribution packages
uses: actions/download-artifact@v4
with:
name: python-packages
path: dist/
- name: Publish to PyPI
uses: pypa/gh-action-pypi-publish@release/v1

View file

@ -1,5 +1,10 @@
repos:
- repo: https://github.com/psf/black
rev: 23.11.0
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.6.4
hooks:
- id: black
# Run the linter.
- id: ruff
args: [ --fix ]
# Run the formatter.
- id: ruff-format

View file

@ -1,3 +1,39 @@
## Upcoming - TBD
### Features
### Bug Fixes
### Internal Changes
## 1.12.3 - 2024-09-10
### Bug Fixes
* Specify build system in `pyproject.toml`
* Don't install tests
## 1.12.2 - 2024-09-07
### Bug Fixes
* Fix the missing packages due to invalid pyproject.toml config
## 1.12.1 - 2024-09-07 (Yanked)
### Internal Changes
* Modernize the project with following changes:
* pyproject.toml instead of setup.py
* Use ruff for linting and formatting
* Update GH actions to use uv and tox
* Use GH actions to release a new version
## 1.11.1 - 2024-07-04
### Bug Fixes

View file

@ -1,6 +1,6 @@
# Development Guide
This is a guide for developers who would like to contribute to this project. It is recommended to use Python 3.7 and above for development.
This is a guide for developers who would like to contribute to this project. It is recommended to use Python 3.10 and above for development.
If you're interested in contributing to litecli, thank you. We'd love your help!
You'll always get credit for your work.
@ -24,8 +24,7 @@ You'll always get credit for your work.
```bash
$ cd litecli
$ pip install virtualenv
$ virtualenv litecli_dev
$ python -m venv .venv
```
We've just created a virtual environment that we'll use to install all the dependencies
@ -33,7 +32,7 @@ You'll always get credit for your work.
need to activate the virtual environment:
```bash
$ source litecli_dev/bin/activate
$ source .venv/bin/activate
```
When you're done working, you can deactivate the virtual environment:
@ -45,8 +44,7 @@ You'll always get credit for your work.
5. Install the dependencies and development tools:
```bash
$ pip install -r requirements-dev.txt
$ pip install --editable .
$ pip install --editable .[dev]
```
6. Create a branch for your bugfix or feature based off the `main` branch:
@ -75,18 +73,10 @@ You'll always get credit for your work.
While you work on litecli, it's important to run the tests to make sure your code
hasn't broken any existing functionality. To run the tests, just type in:
```bash
$ ./setup.py test
```
litecli supports Python 3.7+. You can test against multiple versions of
Python by running tox:
```bash
$ tox
```
### CLI Tests
Some CLI tests expect the program `ex` to be a symbolic link to `vim`.
@ -102,18 +92,12 @@ $ readlink -f $(which ex)
## Coding Style
litecli uses [black](https://github.com/ambv/black) to format the source code. Make sure to install black.
Litecli uses [ruff](https://docs.astral.sh/ruff/) to format the source code.
It's easy to check the style of your code, just run:
To check the style and fix any violations, run:
```bash
$ ./setup.py lint
```
If you see any style issues, you can automatically fix them by running:
```bash
$ ./setup.py lint --fix
$ tox -e style
```
Be sure to commit and push any stylistic fixes.

View file

@ -1 +1 @@
__version__ = "1.11.1"
__version__ = "1.12.3"

View file

@ -56,7 +56,7 @@ def parse_pygments_style(token_name, style_object, style_dict):
try:
other_token_type = string_to_tokentype(style_dict[token_name])
return token_type, style_object.styles[other_token_type]
except AttributeError as err:
except AttributeError:
return token_type, style_dict[token_name]
@ -85,9 +85,7 @@ def style_factory(name, cli_style):
prompt_styles.append((token, cli_style[token]))
override_style = Style([("bottom-toolbar", "noreverse")])
return merge_styles(
[style_from_pygments_cls(style), override_style, Style(prompt_styles)]
)
return merge_styles([style_from_pygments_cls(style), override_style, Style(prompt_styles)])
def style_factory_output(name, cli_style):

View file

@ -15,23 +15,17 @@ def create_toolbar_tokens_func(cli, show_fish_help):
result.append(("class:bottom-toolbar", " "))
if cli.multi_line:
result.append(
("class:bottom-toolbar", " (Semi-colon [;] will end the line) ")
)
result.append(("class:bottom-toolbar", " (Semi-colon [;] will end the line) "))
if cli.multi_line:
result.append(("class:bottom-toolbar.on", "[F3] Multiline: ON "))
else:
result.append(("class:bottom-toolbar.off", "[F3] Multiline: OFF "))
if cli.prompt_app.editing_mode == EditingMode.VI:
result.append(
("class:botton-toolbar.on", "Vi-mode ({})".format(_get_vi_mode()))
)
result.append(("class:botton-toolbar.on", "Vi-mode ({})".format(_get_vi_mode())))
if show_fish_help():
result.append(
("class:bottom-toolbar", " Right-arrow to complete suggestion")
)
result.append(("class:bottom-toolbar", " Right-arrow to complete suggestion"))
if cli.completion_refresher.is_refreshing():
result.append(("class:bottom-toolbar", " Refreshing completions..."))

View file

@ -1,9 +0,0 @@
# -*- coding: utf-8 -*-
"""Platform and Python version compatibility support."""
import sys
PY2 = sys.version_info[0] == 2
PY3 = sys.version_info[0] == 3
WIN = sys.platform in ("win32", "cygwin")

View file

@ -1,38 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from litecli.compat import PY2
if PY2:
binary_type = str
string_types = basestring
text_type = unicode
else:
binary_type = bytes
string_types = str
text_type = str
def unicode2utf8(arg):
"""Convert strings to UTF8-encoded bytes.
Only in Python 2. In Python 3 the args are expected as unicode.
"""
if PY2 and isinstance(arg, text_type):
return arg.encode("utf-8")
return arg
def utf8tounicode(arg):
"""Convert UTF8-encoded bytes to strings.
Only in Python 2. In Python 3 the errors are returned as strings.
"""
if PY2 and isinstance(arg, binary_type):
return arg.decode("utf-8")
return arg

View file

@ -20,7 +20,6 @@ import sqlparse
from prompt_toolkit.completion import DynamicCompleter
from prompt_toolkit.enums import DEFAULT_BUFFER, EditingMode
from prompt_toolkit.shortcuts import PromptSession, CompleteStyle
from prompt_toolkit.styles.pygments import style_from_pygments_cls
from prompt_toolkit.document import Document
from prompt_toolkit.filters import HasFocus, IsDone
from prompt_toolkit.formatted_text import ANSI
@ -43,7 +42,6 @@ from .clibuffer import cli_is_multiline
from .completion_refresher import CompletionRefresher
from .config import config_location, ensure_dir_exists, get_config
from .key_bindings import cli_bindings
from .encodingutils import utf8tounicode, text_type
from .lexer import LiteCliLexer
from .__init__ import __version__
from .packages.filepaths import dir_path_exists
@ -94,9 +92,7 @@ class LiteCli(object):
self.login_path_as_host = c["main"].as_bool("login_path_as_host")
# read from cli argument or user config file
self.auto_vertical_output = auto_vertical_output or c["main"].as_bool(
"auto_vertical_output"
)
self.auto_vertical_output = auto_vertical_output or c["main"].as_bool("auto_vertical_output")
# audit log
if self.logfile is None and "audit_log" in c["main"]:
@ -112,9 +108,7 @@ class LiteCli(object):
# Load startup commands.
try:
self.startup_commands = c["startup_commands"]
except (
KeyError
): # Redundant given the load_config() function that merges in the standard config, but put here to avoid fail if user do not have updated config file.
except KeyError: # Redundant given the load_config() function that merges in the standard config, but put here to avoid fail if user do not have updated config file.
self.startup_commands = None
self.completion_refresher = CompletionRefresher()
@ -123,9 +117,7 @@ class LiteCli(object):
self.initialize_logging()
prompt_cnf = self.read_my_cnf_files(["prompt"])["prompt"]
self.prompt_format = (
prompt or prompt_cnf or c["main"]["prompt"] or self.default_prompt
)
self.prompt_format = prompt or prompt_cnf or c["main"]["prompt"] or self.default_prompt
self.prompt_continuation_format = c["main"]["prompt_continuation"]
keyword_casing = c["main"].get("keyword_casing", "auto")
@ -270,10 +262,7 @@ class LiteCli(object):
)
return
formatter = logging.Formatter(
"%(asctime)s (%(process)d/%(threadName)s) "
"%(name)s %(levelname)s - %(message)s"
)
formatter = logging.Formatter("%(asctime)s (%(process)d/%(threadName)s) " "%(name)s %(levelname)s - %(message)s")
handler.setFormatter(formatter)
@ -371,8 +360,7 @@ class LiteCli(object):
else:
history = None
self.echo(
'Error: Unable to open the history file "{}". '
"Your query history will not be saved.".format(history_file),
'Error: Unable to open the history file "{}". ' "Your query history will not be saved.".format(history_file),
err=True,
fg="red",
)
@ -385,10 +373,7 @@ class LiteCli(object):
def get_message():
prompt = self.get_prompt(self.prompt_format)
if (
self.prompt_format == self.default_prompt
and len(prompt) > self.max_len_prompt
):
if self.prompt_format == self.default_prompt and len(prompt) > self.max_len_prompt:
prompt = self.get_prompt("\\d> ")
prompt = prompt.replace("\\x1b", "\x1b")
return ANSI(prompt)
@ -469,9 +454,7 @@ class LiteCli(object):
else:
max_width = None
formatted = self.format_output(
title, cur, headers, special.is_expanded_output(), max_width
)
formatted = self.format_output(title, cur, headers, special.is_expanded_output(), max_width)
t = time() - start
try:
@ -601,9 +584,7 @@ class LiteCli(object):
for title, cur, headers, status in res:
if title == "dot command not implemented":
self.echo(
"The SQLite dot command '"
+ command.split(" ", 1)[0]
+ "' is not yet implemented.",
"The SQLite dot command '" + command.split(" ", 1)[0] + "' is not yet implemented.",
fg="yellow",
)
else:
@ -619,9 +600,7 @@ class LiteCli(object):
try:
startup_commands()
except Exception as e:
self.echo(
"Could not execute all startup commands: \n" + str(e), fg="yellow"
)
self.echo("Could not execute all startup commands: \n" + str(e), fg="yellow")
try:
while True:
@ -635,7 +614,7 @@ class LiteCli(object):
def log_output(self, output):
"""Log the output in the audit log, if it's enabled."""
if self.logfile:
click.echo(utf8tounicode(output), file=self.logfile)
click.echo(output, file=self.logfile)
def echo(self, s, **kwargs):
"""Print a message to stdout.
@ -651,11 +630,7 @@ class LiteCli(object):
def get_output_margin(self, status=None):
"""Get the output margin (number of rows for the prompt, footer and
timing message."""
margin = (
self.get_reserved_space()
+ self.get_prompt(self.prompt_format).count("\n")
+ 2
)
margin = self.get_reserved_space() + self.get_prompt(self.prompt_format).count("\n") + 2
if status:
margin += 1 + status.count("\n")
@ -741,9 +716,7 @@ class LiteCli(object):
},
)
return [
(None, None, None, "Auto-completion refresh started in the background.")
]
return [(None, None, None, "Auto-completion refresh started in the background.")]
def _on_completions_refreshed(self, new_completer):
"""Swap the completer object in cli with the newly created completer."""
@ -757,9 +730,7 @@ class LiteCli(object):
def get_completions(self, text, cursor_positition):
with self._completer_lock:
return self.completer.get_completions(
Document(text=text, cursor_position=cursor_positition), None
)
return self.completer.get_completions(Document(text=text, cursor_position=cursor_positition), None)
def get_prompt(self, string):
self.logger.debug("Getting prompt")
@ -805,13 +776,7 @@ class LiteCli(object):
if cur:
column_types = None
if hasattr(cur, "description"):
def get_col_type(col):
# col_type = FIELD_TYPES.get(col[1], text_type)
# return col_type if type(col_type) is type else text_type
return text_type
column_types = [get_col_type(col) for col in cur.description]
column_types = [str(col) for col in cur.description]
if max_width is not None:
cur = list(cur)
@ -824,20 +789,14 @@ class LiteCli(object):
**output_kwargs,
)
if isinstance(formatted, (text_type)):
if isinstance(formatted, str):
formatted = formatted.splitlines()
formatted = iter(formatted)
first_line = next(formatted)
formatted = itertools.chain([first_line], formatted)
if (
not expanded
and max_width
and headers
and cur
and len(first_line) > max_width
):
if not expanded and max_width and headers and cur and len(first_line) > max_width:
formatted = self.formatter.format_output(
cur,
headers,
@ -845,7 +804,7 @@ class LiteCli(object):
column_types=column_types,
**output_kwargs,
)
if isinstance(formatted, (text_type)):
if isinstance(formatted, str):
formatted = iter(formatted.splitlines())
output = itertools.chain(output, formatted)
@ -890,13 +849,9 @@ class LiteCli(object):
is_flag=True,
help="Automatically switch to vertical output mode if the result is wider than the terminal width.",
)
@click.option(
"-t", "--table", is_flag=True, help="Display batch output in table format."
)
@click.option("-t", "--table", is_flag=True, help="Display batch output in table format.")
@click.option("--csv", is_flag=True, help="Display batch output in CSV format.")
@click.option(
"--warn/--no-warn", default=None, help="Warn before running a destructive query."
)
@click.option("--warn/--no-warn", default=None, help="Warn before running a destructive query.")
@click.option("-e", "--execute", type=str, help="Execute command and quit.")
@click.argument("database", default="", nargs=1)
def cli(
@ -964,10 +919,7 @@ def cli(
except (FileNotFoundError, OSError):
litecli.logger.warning("Unable to open TTY as stdin.")
if (
litecli.destructive_warning
and confirm_destructive_query(stdin_text) is False
):
if litecli.destructive_warning and confirm_destructive_query(stdin_text) is False:
exit(0)
try:
new_line = True

View file

@ -1,8 +1,6 @@
from __future__ import print_function
import sys
import sqlparse
from sqlparse.sql import Comparison, Identifier, Where
from litecli.encodingutils import string_types, text_type
from .parseutils import last_word, extract_tables, find_prev_keyword
from .special import parse_special_command
@ -52,7 +50,7 @@ def suggest_type(full_text, text_before_cursor):
stmt_start, stmt_end = 0, 0
for statement in parsed:
stmt_len = len(text_type(statement))
stmt_len = len(str(statement))
stmt_start, stmt_end = stmt_end, stmt_end + stmt_len
if stmt_end >= current_pos:
@ -83,9 +81,7 @@ def suggest_type(full_text, text_before_cursor):
last_token = statement and statement.token_prev(len(statement.tokens))[1] or ""
return suggest_based_on_last_token(
last_token, text_before_cursor, full_text, identifier
)
return suggest_based_on_last_token(last_token, text_before_cursor, full_text, identifier)
def suggest_special(text):
@ -142,7 +138,7 @@ def _expecting_arg_idx(arg, text):
def suggest_based_on_last_token(token, text_before_cursor, full_text, identifier):
if isinstance(token, string_types):
if isinstance(token, str):
token_v = token.lower()
elif isinstance(token, Comparison):
# If 'token' is a Comparison type such as
@ -158,13 +154,12 @@ def suggest_based_on_last_token(token, text_before_cursor, full_text, identifier
# 'where foo > 5 and '. We need to look "inside" token.tokens to handle
# suggestions in complicated where clauses correctly
prev_keyword, text_before_cursor = find_prev_keyword(text_before_cursor)
return suggest_based_on_last_token(
prev_keyword, text_before_cursor, full_text, identifier
)
return suggest_based_on_last_token(prev_keyword, text_before_cursor, full_text, identifier)
else:
token_v = token.value.lower()
is_operand = lambda x: x and any([x.endswith(op) for op in ["+", "-", "*", "/"]])
def is_operand(x):
return x and any([x.endswith(op) for op in ["+", "-", "*", "/"]])
if not token:
return [{"type": "keyword"}, {"type": "special"}]
@ -183,9 +178,7 @@ def suggest_based_on_last_token(token, text_before_cursor, full_text, identifier
# Suggest columns/functions AND keywords. (If we wanted to be
# really fancy, we could suggest only array-typed columns)
column_suggestions = suggest_based_on_last_token(
"where", text_before_cursor, full_text, identifier
)
column_suggestions = suggest_based_on_last_token("where", text_before_cursor, full_text, identifier)
# Check for a subquery expression (cases 3 & 4)
where = p.tokens[-1]
@ -256,8 +249,7 @@ def suggest_based_on_last_token(token, text_before_cursor, full_text, identifier
{"type": "keyword"},
]
elif (token_v.endswith("join") and token.is_keyword) or (
token_v
in ("copy", "from", "update", "into", "describe", "truncate", "desc", "explain")
token_v in ("copy", "from", "update", "into", "describe", "truncate", "desc", "explain")
):
schema = (identifier and identifier.get_parent_name()) or []
@ -318,9 +310,7 @@ def suggest_based_on_last_token(token, text_before_cursor, full_text, identifier
elif token_v.endswith(",") or is_operand(token_v) or token_v in ["=", "and", "or"]:
prev_keyword, text_before_cursor = find_prev_keyword(text_before_cursor)
if prev_keyword:
return suggest_based_on_last_token(
prev_keyword, text_before_cursor, full_text, identifier
)
return suggest_based_on_last_token(prev_keyword, text_before_cursor, full_text, identifier)
else:
return []
else:

View file

@ -2,7 +2,6 @@
from __future__ import unicode_literals
from litecli.encodingutils import text_type
import os
@ -64,10 +63,10 @@ def suggest_path(root_dir):
"""
if not root_dir:
return map(text_type, [os.path.abspath(os.sep), "~", os.curdir, os.pardir])
return map(str, [os.path.abspath(os.sep), "~", os.curdir, os.pardir])
if "~" in root_dir:
root_dir = text_type(os.path.expanduser(root_dir))
root_dir = str(os.path.expanduser(root_dir))
if not os.path.exists(root_dir):
root_dir, _ = os.path.split(root_dir)

View file

@ -96,17 +96,18 @@ def extract_from_part(parsed, stop_at_punctuation=True):
# Also 'SELECT * FROM abc JOIN def' will trigger this elif
# condition. So we need to ignore the keyword JOIN and its variants
# INNER JOIN, FULL OUTER JOIN, etc.
elif (
item.ttype is Keyword
and (not item.value.upper() == "FROM")
and (not item.value.upper().endswith("JOIN"))
):
elif item.ttype is Keyword and (not item.value.upper() == "FROM") and (not item.value.upper().endswith("JOIN")):
return
else:
yield item
elif (
item.ttype is Keyword or item.ttype is Keyword.DML
) and item.value.upper() in ("COPY", "FROM", "INTO", "UPDATE", "TABLE", "JOIN"):
elif (item.ttype is Keyword or item.ttype is Keyword.DML) and item.value.upper() in (
"COPY",
"FROM",
"INTO",
"UPDATE",
"TABLE",
"JOIN",
):
tbl_prefix_seen = True
# 'SELECT a, FROM abc' will detect FROM as part of the column list.
# So this check here is necessary.
@ -180,9 +181,7 @@ def find_prev_keyword(sql):
logical_operators = ("AND", "OR", "NOT", "BETWEEN")
for t in reversed(flattened):
if t.value == "(" or (
t.is_keyword and (t.value.upper() not in logical_operators)
):
if t.value == "(" or (t.is_keyword and (t.value.upper() not in logical_operators)):
# Find the location of token t in the original parsed statement
# We can't use parsed.token_index(t) because t may be a child token
# inside a TokenList, in which case token_index thows an error

View file

@ -16,9 +16,7 @@ def confirm_destructive_query(queries):
* False if the query is destructive and the user doesn't want to proceed.
"""
prompt_text = (
"You're about to run a destructive command.\n" "Do you want to proceed? (y/n)"
)
prompt_text = "You're about to run a destructive command.\n" "Do you want to proceed? (y/n)"
if is_destructive(queries) and sys.stdin.isatty():
return prompt(prompt_text, type=bool)

View file

@ -1,3 +1,5 @@
# ruff: noqa
__all__ = []

View file

@ -5,12 +5,10 @@ import os
import sys
import platform
import shlex
from sqlite3 import ProgrammingError
from litecli import __version__
from litecli.packages.special import iocommands
from litecli.packages.special.utils import format_uptime
from .main import special_command, RAW_QUERY, PARSED_QUERY, ArgumentMissing
from .main import special_command, RAW_QUERY, PARSED_QUERY
log = logging.getLogger(__name__)
@ -220,9 +218,7 @@ def describe(cur, arg, **_):
args = (arg,)
query = """
PRAGMA table_info({})
""".format(
arg
)
""".format(args)
else:
return list_tables(cur)
@ -275,8 +271,7 @@ def import_file(cur, arg=None, **_):
for i, row in enumerate(reader):
if len(row) != ncols:
print(
"%s:%d expected %d columns but found %d - ignored"
% (filename, i, ncols, len(row)),
"%s:%d expected %d columns but found %d - ignored" % (filename, i, ncols, len(row)),
file=sys.stderr,
)
nignored += 1

View file

@ -241,8 +241,7 @@ def subst_favorite_query_args(query, args):
else:
return [
None,
"Too many arguments.\nQuery does not have enough place holders to substitute.\n"
+ query,
"Too many arguments.\nQuery does not have enough place holders to substitute.\n" + query,
]
match = re.search(r"\?|\$\d+", query)
@ -393,9 +392,7 @@ def write_once(output):
once_file = open(**once_file_args)
except (IOError, OSError) as e:
once_file = None
raise OSError(
"Cannot write to file '{}': {}".format(e.filename, e.strerror)
)
raise OSError("Cannot write to file '{}': {}".format(e.filename, e.strerror))
click.echo(output, file=once_file, nl=False)
click.echo("\n", file=once_file, nl=False)
@ -451,9 +448,7 @@ def watch_query(arg, **kwargs):
elif destructive_prompt is True:
click.secho("Your call!")
cur = kwargs["cur"]
sql_list = [
(sql.rstrip(";"), "> {0!s}".format(sql)) for sql in sqlparse.split(statement)
]
sql_list = [(sql.rstrip(";"), "> {0!s}".format(sql)) for sql in sqlparse.split(statement)]
old_pager_enabled = is_pager_enabled()
while True:
if clear_screen:

View file

@ -82,9 +82,7 @@ def register_special_command(
aliases=(),
):
cmd = command.lower() if not case_sensitive else command
COMMANDS[cmd] = SpecialCommand(
handler, command, shortcut, description, arg_type, hidden, case_sensitive
)
COMMANDS[cmd] = SpecialCommand(handler, command, shortcut, description, arg_type, hidden, case_sensitive)
for alias in aliases:
cmd = alias.lower() if not case_sensitive else alias
COMMANDS[cmd] = SpecialCommand(
@ -123,9 +121,7 @@ def execute(cur, sql):
return special_cmd.handler(cur=cur, query=sql)
@special_command(
"help", "\\?", "Show this help.", arg_type=NO_QUERY, aliases=("\\?", "?")
)
@special_command("help", "\\?", "Show this help.", arg_type=NO_QUERY, aliases=("\\?", "?"))
def show_help(): # All the parameters are ignored.
headers = ["Command", "Shortcut", "Description"]
result = []

View file

@ -267,11 +267,7 @@ class SQLCompleter(Completer):
self.reset_completions()
def escape_name(self, name):
if name and (
(not self.name_pattern.match(name))
or (name.upper() in self.reserved_words)
or (name.upper() in self.functions)
):
if name and ((not self.name_pattern.match(name)) or (name.upper() in self.reserved_words) or (name.upper() in self.functions)):
name = "`%s`" % name
return name
@ -437,10 +433,7 @@ class SQLCompleter(Completer):
return kw.upper()
return kw.lower()
return (
Completion(z if casing is None else apply_case(z), -len(text))
for x, y, z in sorted(completions)
)
return (Completion(z if casing is None else apply_case(z), -len(text)) for x, y, z in sorted(completions))
def get_completions(self, document, complete_event):
word_before_cursor = document.get_word_before_cursor(WORD=True)
@ -458,11 +451,7 @@ class SQLCompleter(Completer):
# drop_unique is used for 'tb11 JOIN tbl2 USING (...'
# which should suggest only columns that appear in more than
# one table
scoped_cols = [
col
for (col, count) in Counter(scoped_cols).items()
if count > 1 and col != "*"
]
scoped_cols = [col for (col, count) in Counter(scoped_cols).items() if count > 1 and col != "*"]
cols = self.find_matches(word_before_cursor, scoped_cols)
completions.extend(cols)
@ -535,9 +524,7 @@ class SQLCompleter(Completer):
)
completions.extend(queries)
elif suggestion["type"] == "table_format":
formats = self.find_matches(
word_before_cursor, self.table_formats, start_only=True, fuzzy=False
)
formats = self.find_matches(word_before_cursor, self.table_formats, start_only=True, fuzzy=False)
completions.extend(formats)
elif suggestion["type"] == "file_name":
file_names = self.find_files(word_before_cursor)

View file

@ -112,9 +112,7 @@ class SQLExecute(object):
or sql.startswith("exit")
or sql.startswith("quit")
):
_logger.debug(
"Not connected to database. Will not run statement: %s.", sql
)
_logger.debug("Not connected to database. Will not run statement: %s.", sql)
raise OperationalError("Not connected to database.")
# yield ('Not connected to database', None, None, None)
# return

49
pyproject.toml Normal file
View file

@ -0,0 +1,49 @@
[project]
name = "litecli"
dynamic = ["version"]
description = "CLI for SQLite Databases with auto-completion and syntax highlighting."
readme = "README.md"
requires-python = ">=3.7"
license = {text = "BSD"}
authors = [
{name = "dbcli", email = "litecli-users@googlegroups.com"}
]
urls = { "homepage" = "https://github.com/dbcli/litecli" }
dependencies = [
"cli-helpers[styles]>=2.2.1",
"click>=4.1",
"configobj>=5.0.5",
"prompt-toolkit>=3.0.3,<4.0.0",
"pygments>=1.6",
"sqlparse>=0.4.4",
]
[build-system]
requires = ["setuptools >= 61.0"]
build-backend = "setuptools.build_meta"
[project.scripts]
litecli = "litecli.main:cli"
[project.optional-dependencies]
dev = [
"behave>=1.2.6",
"coverage>=7.2.7",
"pexpect>=4.9.0",
"pytest>=7.4.4",
"pytest-cov>=4.1.0",
"tox>=4.8.0",
"pdbpp>=0.10.3",
]
[tool.setuptools.packages.find]
exclude = ["screenshots", "tests*"]
[tool.setuptools.package-data]
litecli = ["liteclirc", "AUTHORS"]
[tool.setuptools.dynamic]
version = {attr = "litecli.__version__"}
[tool.ruff]
line-length = 140

View file

@ -46,9 +46,7 @@ def run_step(*args):
def version(version_file):
_version_re = re.compile(
r'__version__\s+=\s+(?P<quote>[\'"])(?P<version>.*)(?P=quote)'
)
_version_re = re.compile(r'__version__\s+=\s+(?P<quote>[\'"])(?P<version>.*)(?P=quote)')
with io.open(version_file, encoding="utf-8") as f:
ver = _version_re.search(f.read()).group("version")
@ -102,9 +100,7 @@ if __name__ == "__main__":
action="store_true",
dest="confirm_steps",
default=False,
help=(
"Confirm every step. If the step is not " "confirmed, it will be skipped."
),
help=("Confirm every step. If the step is not " "confirmed, it will be skipped."),
)
parser.add_option(
"-d",

View file

@ -1,10 +0,0 @@
mock
pytest>=3.6
pytest-cov
tox
behave
pexpect
coverage
codecov
click
black

View file

@ -1,18 +0,0 @@
[bdist_wheel]
universal = 1
[tool:pytest]
addopts = --capture=sys
--showlocals
--doctest-modules
--doctest-ignore-import-errors
--ignore=setup.py
--ignore=litecli/magic.py
--ignore=litecli/packages/parseutils.py
--ignore=test/features
[pep8]
rev = master
docformatter = True
diff = True
error-status = True

View file

@ -1,70 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import ast
from io import open
import re
from setuptools import setup, find_packages
_version_re = re.compile(r"__version__\s+=\s+(.*)")
with open("litecli/__init__.py", "rb") as f:
version = str(
ast.literal_eval(_version_re.search(f.read().decode("utf-8")).group(1))
)
def open_file(filename):
"""Open and read the file *filename*."""
with open(filename) as f:
return f.read()
readme = open_file("README.md")
install_requirements = [
"click >= 4.1",
"Pygments>=1.6",
"prompt_toolkit>=3.0.3,<4.0.0",
"sqlparse",
"configobj >= 5.0.5",
"cli_helpers[styles] >= 2.2.1",
]
setup(
name="litecli",
author="dbcli",
author_email="litecli-users@googlegroups.com",
license="BSD",
version=version,
url="https://github.com/dbcli/litecli",
packages=find_packages(),
package_data={"litecli": ["liteclirc", "AUTHORS"]},
description="CLI for SQLite Databases with auto-completion and syntax "
"highlighting.",
long_description=readme,
long_description_content_type="text/markdown",
install_requires=install_requirements,
# cmdclass={"test": test, "lint": lint},
entry_points={
"console_scripts": ["litecli = litecli.main:cli"],
"distutils.commands": ["lint = tasks:lint", "test = tasks:test"],
},
classifiers=[
"Intended Audience :: Developers",
"License :: OSI Approved :: BSD License",
"Operating System :: Unix",
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: SQL",
"Topic :: Database",
"Topic :: Database :: Front-Ends",
"Topic :: Software Development",
"Topic :: Software Development :: Libraries :: Python Modules",
],
)

129
tasks.py
View file

@ -1,129 +0,0 @@
# -*- coding: utf-8 -*-
"""Common development tasks for setup.py to use."""
import re
import subprocess
import sys
from setuptools import Command
from setuptools.command.test import test as TestCommand
class BaseCommand(Command, object):
"""The base command for project tasks."""
user_options = []
default_cmd_options = ("verbose", "quiet", "dry_run")
def __init__(self, *args, **kwargs):
super(BaseCommand, self).__init__(*args, **kwargs)
self.verbose = False
def initialize_options(self):
"""Override the distutils abstract method."""
pass
def finalize_options(self):
"""Override the distutils abstract method."""
# Distutils uses incrementing integers for verbosity.
self.verbose = bool(self.verbose)
def call_and_exit(self, cmd, shell=True):
"""Run the *cmd* and exit with the proper exit code."""
sys.exit(subprocess.call(cmd, shell=shell))
def call_in_sequence(self, cmds, shell=True):
"""Run multiple commands in a row, exiting if one fails."""
for cmd in cmds:
if subprocess.call(cmd, shell=shell) == 1:
sys.exit(1)
def apply_options(self, cmd, options=()):
"""Apply command-line options."""
for option in self.default_cmd_options + options:
cmd = self.apply_option(cmd, option, active=getattr(self, option, False))
return cmd
def apply_option(self, cmd, option, active=True):
"""Apply a command-line option."""
return re.sub(
r"{{{}\:(?P<option>[^}}]*)}}".format(option),
r"\g<option>" if active else "",
cmd,
)
class lint(BaseCommand):
description = "check code using black (and fix violations)"
user_options = [("fix", "f", "fix the violations in place")]
def initialize_options(self):
"""Set the default options."""
self.fix = False
def finalize_options(self):
pass
def run(self):
cmd = "black"
if not self.fix:
cmd += " --check"
cmd += " ."
sys.exit(subprocess.call(cmd, shell=True))
class test(TestCommand):
user_options = [("pytest-args=", "a", "Arguments to pass to pytest")]
def initialize_options(self):
TestCommand.initialize_options(self)
self.pytest_args = ""
def run_tests(self):
unit_test_errno = subprocess.call(
"pytest tests " + self.pytest_args, shell=True
)
# cli_errno = subprocess.call('behave test/features', shell=True)
# sys.exit(unit_test_errno or cli_errno)
sys.exit(unit_test_errno)
# class test(BaseCommand):
# """Run the test suites for this project."""
# description = "run the test suite"
# user_options = [
# ("all", "a", "test against all supported versions of Python"),
# ("coverage", "c", "measure test coverage"),
# ]
# unit_test_cmd = (
# "py.test{quiet: -q}{verbose: -v}{dry_run: --setup-only}"
# "{coverage: --cov-report= --cov=litecli}"
# )
# # cli_test_cmd = 'behave{quiet: -q}{verbose: -v}{dry_run: -d} test/features'
# test_all_cmd = "tox{verbose: -v}{dry_run: --notest}"
# coverage_cmd = "coverage combine && coverage report"
# def initialize_options(self):
# """Set the default options."""
# self.all = False
# self.coverage = False
# super(test, self).initialize_options()
# def run(self):
# """Run the test suites."""
# if self.all:
# cmd = self.apply_options(self.test_all_cmd)
# self.call_and_exit(cmd)
# else:
# cmds = (
# self.apply_options(self.unit_test_cmd, ("coverage",)),
# # self.apply_options(self.cli_test_cmd)
# )
# if self.coverage:
# cmds += (self.apply_options(self.coverage_cmd),)
# self.call_in_sequence(cmds)

View file

@ -6,7 +6,7 @@ from utils import create_db, db_connection, drop_tables
import litecli.sqlexecute
@pytest.yield_fixture(scope="function")
@pytest.fixture(scope="function")
def connection():
create_db("_test_db")
connection = db_connection("_test_db")

View file

@ -135,10 +135,5 @@ Token.Toolbar.Arg.Text = nobold
[favorite_queries]
q_param = select * from test where name=?
sh_param = select * from test where id=$1
# Startup commands
# litecli commands or sqlite commands to be executed on startup.
# some of them will require you to have a database attached.
# they will be executed in the same order as they appear in the list.
[startup_commands]
commands = "create table startupcommands(a text)", "insert into startupcommands values('abc')"
commands = create table startupcommands(a text), insert into startupcommands values('abc')

View file

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
"""Test the litecli.clistyle module."""
import pytest
from pygments.style import Style

View file

@ -32,9 +32,7 @@ def test_select_suggests_cols_with_qualified_table_scope():
def test_order_by_suggests_cols_with_qualified_table_scope():
suggestions = suggest_type(
"SELECT * FROM sch.tabl ORDER BY ", "SELECT * FROM sch.tabl ORDER BY "
)
suggestions = suggest_type("SELECT * FROM sch.tabl ORDER BY ", "SELECT * FROM sch.tabl ORDER BY ")
assert sorted_dicts(suggestions) == sorted_dicts(
[
{"type": "column", "tables": [("sch", "tabl", None)]},
@ -109,9 +107,7 @@ def test_operand_inside_function_suggests_cols1():
def test_operand_inside_function_suggests_cols2():
suggestion = suggest_type(
"SELECT MAX(col1 + col2 + FROM tbl", "SELECT MAX(col1 + col2 + "
)
suggestion = suggest_type("SELECT MAX(col1 + col2 + FROM tbl", "SELECT MAX(col1 + col2 + ")
assert suggestion == [{"type": "column", "tables": [(None, "tbl", None)]}]
@ -166,23 +162,17 @@ def test_expression_suggests_tables_views_and_schemas(expression):
)
def test_expression_suggests_qualified_tables_views_and_schemas(expression):
suggestions = suggest_type(expression, expression)
assert sorted_dicts(suggestions) == sorted_dicts(
[{"type": "table", "schema": "sch"}, {"type": "view", "schema": "sch"}]
)
assert sorted_dicts(suggestions) == sorted_dicts([{"type": "table", "schema": "sch"}, {"type": "view", "schema": "sch"}])
def test_truncate_suggests_tables_and_schemas():
suggestions = suggest_type("TRUNCATE ", "TRUNCATE ")
assert sorted_dicts(suggestions) == sorted_dicts(
[{"type": "table", "schema": []}, {"type": "schema"}]
)
assert sorted_dicts(suggestions) == sorted_dicts([{"type": "table", "schema": []}, {"type": "schema"}])
def test_truncate_suggests_qualified_tables():
suggestions = suggest_type("TRUNCATE sch.", "TRUNCATE sch.")
assert sorted_dicts(suggestions) == sorted_dicts(
[{"type": "table", "schema": "sch"}]
)
assert sorted_dicts(suggestions) == sorted_dicts([{"type": "table", "schema": "sch"}])
def test_distinct_suggests_cols():
@ -240,9 +230,7 @@ def test_insert_into_lparen_comma_suggests_cols():
def test_partially_typed_col_name_suggests_col_names():
suggestions = suggest_type(
"SELECT * FROM tabl WHERE col_n", "SELECT * FROM tabl WHERE col_n"
)
suggestions = suggest_type("SELECT * FROM tabl WHERE col_n", "SELECT * FROM tabl WHERE col_n")
assert sorted_dicts(suggestions) == sorted_dicts(
[
{"type": "alias", "aliases": ["tabl"]},
@ -278,9 +266,7 @@ def test_dot_suggests_cols_of_an_alias():
def test_dot_col_comma_suggests_cols_or_schema_qualified_table():
suggestions = suggest_type(
"SELECT t1.a, t2. FROM tabl1 t1, tabl2 t2", "SELECT t1.a, t2."
)
suggestions = suggest_type("SELECT t1.a, t2. FROM tabl1 t1, tabl2 t2", "SELECT t1.a, t2.")
assert sorted_dicts(suggestions) == sorted_dicts(
[
{"type": "column", "tables": [(None, "tabl2", "t2")]},
@ -349,9 +335,7 @@ def test_sub_select_table_name_completion(expression):
def test_sub_select_col_name_completion():
suggestions = suggest_type(
"SELECT * FROM (SELECT FROM abc", "SELECT * FROM (SELECT "
)
suggestions = suggest_type("SELECT * FROM (SELECT FROM abc", "SELECT * FROM (SELECT ")
assert sorted_dicts(suggestions) == sorted_dicts(
[
{"type": "alias", "aliases": ["abc"]},
@ -364,9 +348,7 @@ def test_sub_select_col_name_completion():
@pytest.mark.xfail
def test_sub_select_multiple_col_name_completion():
suggestions = suggest_type(
"SELECT * FROM (SELECT a, FROM abc", "SELECT * FROM (SELECT a, "
)
suggestions = suggest_type("SELECT * FROM (SELECT a, FROM abc", "SELECT * FROM (SELECT a, ")
assert sorted_dicts(suggestions) == sorted_dicts(
[
{"type": "column", "tables": [(None, "abc", None)]},
@ -376,9 +358,7 @@ def test_sub_select_multiple_col_name_completion():
def test_sub_select_dot_col_name_completion():
suggestions = suggest_type(
"SELECT * FROM (SELECT t. FROM tabl t", "SELECT * FROM (SELECT t."
)
suggestions = suggest_type("SELECT * FROM (SELECT t. FROM tabl t", "SELECT * FROM (SELECT t.")
assert sorted_dicts(suggestions) == sorted_dicts(
[
{"type": "column", "tables": [(None, "tabl", "t")]},
@ -502,9 +482,7 @@ def test_join_using_suggests_common_columns(col_list):
def test_2_statements_2nd_current():
suggestions = suggest_type(
"select * from a; select * from ", "select * from a; select * from "
)
suggestions = suggest_type("select * from a; select * from ", "select * from a; select * from ")
assert sorted_dicts(suggestions) == sorted_dicts(
[
{"type": "table", "schema": []},
@ -513,9 +491,7 @@ def test_2_statements_2nd_current():
]
)
suggestions = suggest_type(
"select * from a; select from b", "select * from a; select "
)
suggestions = suggest_type("select * from a; select from b", "select * from a; select ")
assert sorted_dicts(suggestions) == sorted_dicts(
[
{"type": "alias", "aliases": ["b"]},
@ -526,9 +502,7 @@ def test_2_statements_2nd_current():
)
# Should work even if first statement is invalid
suggestions = suggest_type(
"select * from; select * from ", "select * from; select * from "
)
suggestions = suggest_type("select * from; select * from ", "select * from; select * from ")
assert sorted_dicts(suggestions) == sorted_dicts(
[
{"type": "table", "schema": []},
@ -572,9 +546,7 @@ def test_3_statements_2nd_current():
]
)
suggestions = suggest_type(
"select * from a; select from b; select * from c", "select * from a; select "
)
suggestions = suggest_type("select * from a; select from b; select * from c", "select * from a; select ")
assert sorted_dicts(suggestions) == sorted_dicts(
[
{"type": "alias", "aliases": ["b"]},
@ -586,9 +558,7 @@ def test_3_statements_2nd_current():
def test_create_db_with_template():
suggestions = suggest_type(
"create database foo with template ", "create database foo with template "
)
suggestions = suggest_type("create database foo with template ", "create database foo with template ")
assert sorted_dicts(suggestions) == sorted_dicts([{"type": "database"}])
@ -597,9 +567,7 @@ def test_create_db_with_template():
def test_specials_included_for_initial_completion(initial_text):
suggestions = suggest_type(initial_text, initial_text)
assert sorted_dicts(suggestions) == sorted_dicts(
[{"type": "keyword"}, {"type": "special"}]
)
assert sorted_dicts(suggestions) == sorted_dicts([{"type": "keyword"}, {"type": "special"}])
def test_specials_not_included_after_initial_token():

View file

@ -1,6 +1,6 @@
import time
import pytest
from mock import Mock, patch
from unittest.mock import Mock, patch
@pytest.fixture

View file

@ -1,7 +1,6 @@
import os
from collections import namedtuple
from textwrap import dedent
from tempfile import NamedTemporaryFile
import shutil
import click
@ -180,9 +179,7 @@ def output(monkeypatch, terminal_size, testdata, explicit_pager, expect_pager):
def test_conditional_pager(monkeypatch):
testdata = "Lorem ipsum dolor sit amet consectetur adipiscing elit sed do".split(
" "
)
testdata = "Lorem ipsum dolor sit amet consectetur adipiscing elit sed do".split(" ")
# User didn't set pager, output doesn't fit screen -> pager
output(
monkeypatch,

View file

@ -1,7 +1,7 @@
# coding: utf-8
from __future__ import unicode_literals
import pytest
from mock import patch
from unittest.mock import patch
from prompt_toolkit.completion import Completion
from prompt_toolkit.document import Document
@ -35,7 +35,7 @@ def completer():
@pytest.fixture
def complete_event():
from mock import Mock
from unittest.mock import Mock
return Mock()
@ -43,29 +43,21 @@ def complete_event():
def test_empty_string_completion(completer, complete_event):
text = ""
position = 0
result = list(
completer.get_completions(
Document(text=text, cursor_position=position), complete_event
)
)
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert list(map(Completion, sorted(completer.keywords))) == result
def test_select_keyword_completion(completer, complete_event):
text = "SEL"
position = len("SEL")
result = completer.get_completions(
Document(text=text, cursor_position=position), complete_event
)
result = completer.get_completions(Document(text=text, cursor_position=position), complete_event)
assert list(result) == list([Completion(text="SELECT", start_position=-3)])
def test_table_completion(completer, complete_event):
text = "SELECT * FROM "
position = len(text)
result = completer.get_completions(
Document(text=text, cursor_position=position), complete_event
)
result = completer.get_completions(Document(text=text, cursor_position=position), complete_event)
assert list(result) == list(
[
Completion(text="`réveillé`", start_position=0),
@ -79,9 +71,7 @@ def test_table_completion(completer, complete_event):
def test_function_name_completion(completer, complete_event):
text = "SELECT MA"
position = len("SELECT MA")
result = completer.get_completions(
Document(text=text, cursor_position=position), complete_event
)
result = completer.get_completions(Document(text=text, cursor_position=position), complete_event)
assert list(result) == list(
[
Completion(text="MAX", start_position=-2),
@ -100,11 +90,7 @@ def test_suggested_column_names(completer, complete_event):
"""
text = "SELECT from users"
position = len("SELECT ")
result = list(
completer.get_completions(
Document(text=text, cursor_position=position), complete_event
)
)
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list(
[
Completion(text="*", start_position=0),
@ -130,9 +116,7 @@ def test_suggested_column_names_in_function(completer, complete_event):
"""
text = "SELECT MAX( from users"
position = len("SELECT MAX(")
result = completer.get_completions(
Document(text=text, cursor_position=position), complete_event
)
result = completer.get_completions(Document(text=text, cursor_position=position), complete_event)
assert list(result) == list(
[
Completion(text="*", start_position=0),
@ -154,11 +138,7 @@ def test_suggested_column_names_with_table_dot(completer, complete_event):
"""
text = "SELECT users. from users"
position = len("SELECT users.")
result = list(
completer.get_completions(
Document(text=text, cursor_position=position), complete_event
)
)
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list(
[
Completion(text="*", start_position=0),
@ -180,11 +160,7 @@ def test_suggested_column_names_with_alias(completer, complete_event):
"""
text = "SELECT u. from users u"
position = len("SELECT u.")
result = list(
completer.get_completions(
Document(text=text, cursor_position=position), complete_event
)
)
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list(
[
Completion(text="*", start_position=0),
@ -207,11 +183,7 @@ def test_suggested_multiple_column_names(completer, complete_event):
"""
text = "SELECT id, from users u"
position = len("SELECT id, ")
result = list(
completer.get_completions(
Document(text=text, cursor_position=position), complete_event
)
)
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list(
[
Completion(text="*", start_position=0),
@ -237,11 +209,7 @@ def test_suggested_multiple_column_names_with_alias(completer, complete_event):
"""
text = "SELECT u.id, u. from users u"
position = len("SELECT u.id, u.")
result = list(
completer.get_completions(
Document(text=text, cursor_position=position), complete_event
)
)
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list(
[
Completion(text="*", start_position=0),
@ -264,11 +232,7 @@ def test_suggested_multiple_column_names_with_dot(completer, complete_event):
"""
text = "SELECT users.id, users. from users u"
position = len("SELECT users.id, users.")
result = list(
completer.get_completions(
Document(text=text, cursor_position=position), complete_event
)
)
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list(
[
Completion(text="*", start_position=0),
@ -283,37 +247,21 @@ def test_suggested_multiple_column_names_with_dot(completer, complete_event):
def test_suggested_aliases_after_on(completer, complete_event):
text = "SELECT u.name, o.id FROM users u JOIN orders o ON "
position = len("SELECT u.name, o.id FROM users u JOIN orders o ON ")
result = list(
completer.get_completions(
Document(text=text, cursor_position=position), complete_event
)
)
assert result == list(
[Completion(text="o", start_position=0), Completion(text="u", start_position=0)]
)
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list([Completion(text="o", start_position=0), Completion(text="u", start_position=0)])
def test_suggested_aliases_after_on_right_side(completer, complete_event):
text = "SELECT u.name, o.id FROM users u JOIN orders o ON o.user_id = "
position = len("SELECT u.name, o.id FROM users u JOIN orders o ON o.user_id = ")
result = list(
completer.get_completions(
Document(text=text, cursor_position=position), complete_event
)
)
assert result == list(
[Completion(text="o", start_position=0), Completion(text="u", start_position=0)]
)
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list([Completion(text="o", start_position=0), Completion(text="u", start_position=0)])
def test_suggested_tables_after_on(completer, complete_event):
text = "SELECT users.name, orders.id FROM users JOIN orders ON "
position = len("SELECT users.name, orders.id FROM users JOIN orders ON ")
result = list(
completer.get_completions(
Document(text=text, cursor_position=position), complete_event
)
)
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list(
[
Completion(text="orders", start_position=0),
@ -324,14 +272,8 @@ def test_suggested_tables_after_on(completer, complete_event):
def test_suggested_tables_after_on_right_side(completer, complete_event):
text = "SELECT users.name, orders.id FROM users JOIN orders ON orders.user_id = "
position = len(
"SELECT users.name, orders.id FROM users JOIN orders ON orders.user_id = "
)
result = list(
completer.get_completions(
Document(text=text, cursor_position=position), complete_event
)
)
position = len("SELECT users.name, orders.id FROM users JOIN orders ON orders.user_id = ")
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert list(result) == list(
[
Completion(text="orders", start_position=0),
@ -343,11 +285,7 @@ def test_suggested_tables_after_on_right_side(completer, complete_event):
def test_table_names_after_from(completer, complete_event):
text = "SELECT * FROM "
position = len("SELECT * FROM ")
result = list(
completer.get_completions(
Document(text=text, cursor_position=position), complete_event
)
)
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert list(result) == list(
[
Completion(text="`réveillé`", start_position=0),
@ -361,19 +299,13 @@ def test_table_names_after_from(completer, complete_event):
def test_auto_escaped_col_names(completer, complete_event):
text = "SELECT from `select`"
position = len("SELECT ")
result = list(
completer.get_completions(
Document(text=text, cursor_position=position), complete_event
)
)
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == [
Completion(text="*", start_position=0),
Completion(text="`ABC`", start_position=0),
Completion(text="`insert`", start_position=0),
Completion(text="id", start_position=0),
] + list(map(Completion, completer.functions)) + [
Completion(text="select", start_position=0)
] + list(
] + list(map(Completion, completer.functions)) + [Completion(text="select", start_position=0)] + list(
map(Completion, sorted(completer.keywords))
)
@ -381,11 +313,7 @@ def test_auto_escaped_col_names(completer, complete_event):
def test_un_escaped_table_names(completer, complete_event):
text = "SELECT from réveillé"
position = len("SELECT ")
result = list(
completer.get_completions(
Document(text=text, cursor_position=position), complete_event
)
)
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == list(
[
Completion(text="*", start_position=0),
@ -420,10 +348,6 @@ def dummy_list_path(dir_name):
)
def test_file_name_completion(completer, complete_event, text, expected):
position = len(text)
result = list(
completer.get_completions(
Document(text=text, cursor_position=position), complete_event
)
)
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
expected = list([Completion(txt, pos) for txt, pos in expected])
assert result == expected

View file

@ -274,8 +274,7 @@ def test_favorite_query_multiple_statement(executor):
results = run(
executor,
"\\fs test-ad select * from test where a like 'a%'; "
"select * from test where a like 'd%'",
"\\fs test-ad select * from test where a like 'a%'; " "select * from test where a like 'd%'",
)
assert_result_equal(results, status="Saved.")
@ -346,9 +345,7 @@ def test_cd_command_without_a_folder_name(executor):
@dbtest
def test_system_command_not_found(executor):
results = run(executor, "system xyz")
assert_result_equal(
results, status="OSError: No such file or directory", assert_contains=True
)
assert_result_equal(results, status="OSError: No such file or directory", assert_contains=True)
@dbtest

View file

@ -12,7 +12,7 @@ import pytest
from litecli.main import special
DATABASE = os.getenv("PYTEST_DATABASE", "test.sqlite3")
DATABASE = "test.sqlite3"
def db_connection(dbname=":memory:"):
@ -23,12 +23,10 @@ def db_connection(dbname=":memory:"):
try:
db_connection()
CAN_CONNECT_TO_DB = True
except Exception as ex:
except Exception:
CAN_CONNECT_TO_DB = False
dbtest = pytest.mark.skipif(
not CAN_CONNECT_TO_DB, reason="Error creating sqlite connection"
)
dbtest = pytest.mark.skipif(not CAN_CONNECT_TO_DB, reason="Error creating sqlite connection")
def create_db(dbname):
@ -36,7 +34,7 @@ def create_db(dbname):
try:
cur.execute("""DROP DATABASE IF EXISTS _test_db""")
cur.execute("""CREATE DATABASE _test_db""")
except:
except Exception:
pass
@ -44,7 +42,7 @@ def drop_tables(dbname):
with closing(db_connection().cursor()) as cur:
try:
cur.execute("""DROP DATABASE IF EXISTS _test_db""")
except:
except Exception:
pass
@ -54,9 +52,7 @@ def run(executor, sql, rows_as_list=True):
for title, rows, headers, status in executor.run(sql):
rows = list(rows) if (rows_as_list and rows) else rows
result.append(
{"title": title, "rows": rows, "headers": headers, "status": status}
)
result.append({"title": title, "rows": rows, "headers": headers, "status": status})
return result
@ -89,8 +85,6 @@ def send_ctrl_c(wait_seconds):
Returns the `multiprocessing.Process` created.
"""
ctrl_c_process = multiprocessing.Process(
target=send_ctrl_c_to_pid, args=(os.getpid(), wait_seconds)
)
ctrl_c_process = multiprocessing.Process(target=send_ctrl_c_to_pid, args=(os.getpid(), wait_seconds))
ctrl_c_process.start()
return ctrl_c_process

19
tox.ini
View file

@ -1,11 +1,14 @@
[tox]
envlist = py37, py38, py39, py310
envlist = py,style
[testenv]
deps = pytest
mock
pexpect
behave
coverage
commands = python setup.py test
passenv = PYTEST_DATABASE
deps = uv
commands = uv pip install -e .[dev]
coverage run -m pytest -v tests
coverage report -m
[testenv:style]
skip_install = true
deps = ruff
commands = ruff check --fix
ruff format