1
0
Fork 0

Merging upstream version 1.15.0.

Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
Daniel Baumann 2025-03-17 07:31:48 +01:00
parent e6604cf449
commit d4dff17dce
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
10 changed files with 141 additions and 23 deletions

View file

@ -1,3 +1,24 @@
## 1.15.0 - 2025-03-15
### Features
* Add logs while invoking `\llm`and `\\m+` command. [(#215)](https://github.com/dbcli/litecli/pull/215)
* Support `--help` in the `\llm`and `\llm+` command. ([#214](https://github.com/dbcli/litecli/pull/214))
* Make the history file location configurable. ([#206](https://github.com/dbcli/litecli/issues/206))
* Add dot command to list views.
### Bug Fixes
* Fix a bug where the `\llm` command on alternate invocations weren't detected correctly. (#211)
* Do not escape upper table or column name. [(#185)](https://github.com/dbcli/litecli/issues/185)
* Return indices when `.schema` command is run. Also update the output to contain the `sql` for the `indexes` command. [(#149)](https://github.com/dbcli/litecli/issues/149)
### Internal
* Fix typo `pormpt`to `prompt` in `special/llm.py`.
* Update pip install to work in both bash and zsh.
## 1.14.4 - 2025-01-31
### Bug Fixes

View file

@ -44,7 +44,7 @@ You'll always get credit for your work.
5. Install the dependencies and development tools:
```bash
$ pip install --editable .[dev]
$ pip install --editable ".[dev]"
```
6. Create a branch for your bugfix or feature based off the `main` branch:

View file

@ -18,6 +18,12 @@ destructive_warning = True
# %USERPROFILE% is typically C:\Users\{username}
log_file = default
# history_file location.
# In Unix/Linux: ~/.config/litecli/history
# In Windows: %USERPROFILE%\AppData\Local\dbcli\litecli\history
# %USERPROFILE% is typically C:\Users\{username}
history_file = default
# Default log level. Possible values: "CRITICAL", "ERROR", "WARNING", "INFO"
# and "DEBUG". "NONE" disables logging.
log_level = INFO

View file

@ -352,7 +352,10 @@ class LiteCli(object):
self.configure_pager()
self.refresh_completions()
history_file = config_location() + "history"
history_file = self.config["main"]["history_file"]
if history_file == "default":
history_file = config_location() + "history"
history_file = os.path.expanduser(history_file)
if dir_path_exists(history_file):
history = FileHistory(history_file)
else:
@ -441,13 +444,16 @@ class LiteCli(object):
self.echo(str(e), err=True, fg="red")
return
if special.is_llm_command(text):
while special.is_llm_command(text):
try:
start = time()
cur = self.sqlexecute.conn and self.sqlexecute.conn.cursor()
context, sql = special.handle_llm(text, cur)
context, sql, duration = special.handle_llm(text, cur)
if context:
click.echo("LLM Reponse:")
click.echo(context)
click.echo('---')
click.echo(f"Time: {duration:.2f} seconds")
text = self.prompt_app.prompt(default=sql)
except KeyboardInterrupt:
return

View file

@ -56,6 +56,40 @@ def list_tables(cur, arg=None, arg_type=PARSED_QUERY, verbose=False):
return [(None, tables, headers, status)]
@special_command(
".views",
"\\dv",
"List views.",
arg_type=PARSED_QUERY,
case_sensitive=True,
aliases=("\\dv",),
)
def list_views(cur, arg=None, arg_type=PARSED_QUERY, verbose=False):
if arg:
args = ("{0}%".format(arg),)
query = """
SELECT name FROM sqlite_master
WHERE type = 'view' AND name LIKE ? AND name NOT LIKE 'sqlite_%'
ORDER BY 1
"""
else:
args = tuple()
query = """
SELECT name FROM sqlite_master
WHERE type = 'view' AND name NOT LIKE 'sqlite_%'
ORDER BY 1
"""
log.debug(query)
cur.execute(query, args)
views = cur.fetchall()
status = ""
if cur.description:
headers = [x[0] for x in cur.description]
else:
return [(None, None, None, "")]
return [(None, views, headers, status)]
@special_command(
".schema",
".schema[+] [table]",
@ -68,7 +102,7 @@ def show_schema(cur, arg=None, **_):
args = (arg,)
query = """
SELECT sql FROM sqlite_master
WHERE name==? AND sql IS NOT NULL
WHERE tbl_name==? AND sql IS NOT NULL
ORDER BY tbl_name, type DESC, name
"""
else:
@ -90,7 +124,6 @@ def show_schema(cur, arg=None, **_):
return [(None, tables, headers, status)]
@special_command(
".databases",
".databases",
@ -109,7 +142,6 @@ def list_databases(cur, **_):
else:
return [(None, None, None, "")]
@special_command(
".indexes",
".indexes [tablename]",
@ -122,14 +154,14 @@ def list_indexes(cur, arg=None, arg_type=PARSED_QUERY, verbose=False):
if arg:
args = ("{0}%".format(arg),)
query = """
SELECT name FROM sqlite_master
SELECT name, sql FROM sqlite_master
WHERE type = 'index' AND tbl_name LIKE ? AND name NOT LIKE 'sqlite_%'
ORDER BY 1
"""
else:
args = tuple()
query = """
SELECT name FROM sqlite_master
SELECT name, sql FROM sqlite_master
WHERE type = 'index' AND name NOT LIKE 'sqlite_%'
ORDER BY 1
"""

View file

@ -7,6 +7,7 @@ import shlex
import sys
from runpy import run_module
from typing import Optional, Tuple
from time import time
import click
@ -19,6 +20,8 @@ try:
except ImportError:
llm = None
cli = None
LLM_CLI_COMMANDS = []
MODELS = {}
from . import export
from .main import parse_special_command
@ -191,9 +194,8 @@ def ensure_litecli_template(replace=False):
run_external_cmd("llm", PROMPT, "--save", "litecli")
return
@export
def handle_llm(text, cur) -> Tuple[str, Optional[str]]:
def handle_llm(text, cur) -> Tuple[str, Optional[str], float]:
"""This function handles the special command `\\llm`.
If it deals with a question that results in a SQL query then it will return
@ -222,7 +224,7 @@ def handle_llm(text, cur) -> Tuple[str, Optional[str]]:
if "-c" in parts:
capture_output = True
use_context = False
# If the parts has `pormpt` command without `-c` then use context to the prompt.
# If the parts has `prompt` command without `-c` then use context to the prompt.
# \llm -m ollama prompt "Most visited urls?"
elif "prompt" in parts: # User might invoke prompt with an option flag in the first argument.
capture_output = True
@ -236,6 +238,10 @@ def handle_llm(text, cur) -> Tuple[str, Optional[str]]:
elif parts[0] in LLM_CLI_COMMANDS:
capture_output = False
use_context = False
# If the user wants to use --help option to see each command and it's description
elif "--help" == parts[0]:
capture_output = False
use_context = False
# If the parts doesn't have any known LLM_CLI_COMMANDS then the user is
# invoking a question. eg: \llm -m ollama "Most visited urls?"
elif not set(parts).intersection(LLM_CLI_COMMANDS):
@ -250,7 +256,10 @@ def handle_llm(text, cur) -> Tuple[str, Optional[str]]:
if not use_context:
args = parts
if capture_output:
click.echo("Calling llm command")
start = time()
_, result = run_external_cmd("llm", *args, capture_output=capture_output)
end = time()
match = re.search(_SQL_CODE_FENCE, result, re.DOTALL)
if match:
sql = match.group(1).strip()
@ -258,17 +267,21 @@ def handle_llm(text, cur) -> Tuple[str, Optional[str]]:
output = [(None, None, None, result)]
raise FinishIteration(output)
return result if verbose else "", sql
return result if verbose else "", sql, end - start
else:
run_external_cmd("llm", *args, restart_cli=restart)
raise FinishIteration(None)
try:
ensure_litecli_template()
# Measure end to end llm command invocation.
# This measures the internal DB command to pull the schema and llm command
start = time()
context, sql = sql_using_llm(cur=cur, question=arg, verbose=verbose)
end = time()
if not verbose:
context = ""
return context, sql
return context, sql, end - start
except Exception as e:
# Something went wrong. Raise an exception and bail.
raise RuntimeError(e)
@ -297,6 +310,7 @@ def sql_using_llm(cur, question=None, verbose=False) -> Tuple[str, Optional[str]
WHERE type IN ('table','view') AND name NOT LIKE 'sqlite_%'
ORDER BY 1
"""
click.echo("Preparing schema information to feed the llm")
sample_row_query = "SELECT * FROM {table} LIMIT 1"
log.debug(schema_query)
cur.execute(schema_query)
@ -328,7 +342,9 @@ def sql_using_llm(cur, question=None, verbose=False) -> Tuple[str, Optional[str]
question,
" ", # Dummy argument to prevent llm from waiting on stdin
]
click.echo("Invoking llm command with schema information")
_, result = run_external_cmd("llm", *args, capture_output=True)
click.echo("Received response from the llm command")
match = re.search(_SQL_CODE_FENCE, result, re.DOTALL)
if match:
sql = match.group(1).strip()

View file

@ -258,7 +258,7 @@ class SQLCompleter(Completer):
self.reserved_words = set()
for x in self.keywords:
self.reserved_words.update(x.split())
self.name_pattern = compile(r"^[_a-z][_a-z0-9\$]*$")
self.name_pattern = compile(r"^[_a-zA-Z][_a-zA-Z0-9\$]*$")
self.special_commands = []
self.table_formats = supported_formats
@ -320,6 +320,7 @@ class SQLCompleter(Completer):
try:
data = [self.escaped_names(d) for d in data]
except Exception:
_logger.exception("Failed to get relation names.")
data = []
# dbmetadata['tables'][$schema_name][$table_name] should be a list of
@ -351,6 +352,7 @@ class SQLCompleter(Completer):
try:
column_data = [self.escaped_names(d) for d in column_data]
except Exception:
_logger.exception("Failed to get column names.")
column_data = []
metadata = self.dbmetadata[kind]
@ -366,6 +368,7 @@ class SQLCompleter(Completer):
try:
func_data = [self.escaped_names(d) for d in func_data]
except Exception:
_logger.exception("Failed to get function names.")
func_data = []
# dbmetadata['functions'][$schema_name][$function_name] should return

View file

@ -38,7 +38,7 @@ class SQLExecute(object):
"""
indexes_query = """
SELECT name
SELECT name, sql
FROM sqlite_master
WHERE type = 'index' AND name NOT LIKE 'sqlite_%'
ORDER BY 1
@ -58,7 +58,7 @@ class SQLExecute(object):
def connect(self, database=None):
db = database or self.dbname
_logger.debug("Connection DB Params: \n" "\tdatabase: %r", database)
_logger.debug("Connection DB Params: \n" "\tdatabase: %r", db)
db_name = os.path.expanduser(db)
db_dir_name = os.path.dirname(os.path.abspath(db_name))

View file

@ -59,7 +59,7 @@ def test_llm_command_with_c_flag_and_fenced_sql(mock_run_cmd, mock_llm, executor
test_text = r"\llm -c 'Rewrite the SQL without CTE'"
result, sql = handle_llm(test_text, executor)
result, sql, duration = handle_llm(test_text, executor)
# We expect the function to return (result, sql), but result might be "" if verbose is not set
# By default, `verbose` is false unless text has something like \llm --verbose?
@ -67,6 +67,7 @@ def test_llm_command_with_c_flag_and_fenced_sql(mock_run_cmd, mock_llm, executor
# Our test_text doesn't set verbose => we expect "" for the returned context.
assert result == ""
assert sql == "SELECT * FROM table;"
assert isinstance(duration, float)
@patch("litecli.packages.special.llm.llm")
@ -87,6 +88,23 @@ def test_llm_command_known_subcommand(mock_run_cmd, mock_llm, executor):
# And the function should raise FinishIteration(None)
assert exc_info.value.args[0] is None
@patch("litecli.packages.special.llm.llm")
@patch("litecli.packages.special.llm.run_external_cmd")
def test_llm_command_with_help_flag(mock_run_cmd, mock_llm, executor):
"""
If the parts[0] is --help, we do NOT capture output, we just call run_external_cmd
and then raise FinishIteration.
"""
# Let's assume 'models' is in LLM_CLI_COMMANDS
test_text = r"\llm --help"
with pytest.raises(FinishIteration) as exc_info:
handle_llm(test_text, executor)
# We check that run_external_cmd was called with these arguments:
mock_run_cmd.assert_called_once_with("llm", "--help", restart_cli=False)
# And the function should raise FinishIteration(None)
assert exc_info.value.args[0] is None
@patch("litecli.packages.special.llm.llm")
@patch("litecli.packages.special.llm.run_external_cmd")
@ -116,7 +134,7 @@ def test_llm_command_with_prompt(mock_sql_using_llm, mock_ensure_template, mock_
mock_sql_using_llm.return_value = ("context from LLM", "SELECT 1;")
test_text = r"\llm prompt 'Magic happening here?'"
context, sql = handle_llm(test_text, executor)
context, sql, duration = handle_llm(test_text, executor)
# ensure_litecli_template should be called
mock_ensure_template.assert_called_once()
@ -126,6 +144,7 @@ def test_llm_command_with_prompt(mock_sql_using_llm, mock_ensure_template, mock_
mock_sql_using_llm.assert_called()
assert context == ""
assert sql == "SELECT 1;"
assert isinstance(duration, float)
@patch("litecli.packages.special.llm.llm")
@ -138,12 +157,13 @@ def test_llm_command_question_with_context(mock_sql_using_llm, mock_ensure_templ
mock_sql_using_llm.return_value = ("You have context!", "SELECT 2;")
test_text = r"\llm 'Top 10 downloads by size.'"
context, sql = handle_llm(test_text, executor)
context, sql, duration = handle_llm(test_text, executor)
mock_ensure_template.assert_called_once()
mock_sql_using_llm.assert_called()
assert context == ""
assert sql == "SELECT 2;"
assert isinstance(duration, float)
@patch("litecli.packages.special.llm.llm")
@ -156,7 +176,9 @@ def test_llm_command_question_verbose(mock_sql_using_llm, mock_ensure_template,
mock_sql_using_llm.return_value = ("Verbose context, oh yeah!", "SELECT 42;")
test_text = r"\llm+ 'Top 10 downloads by size.'"
context, sql = handle_llm(test_text, executor)
context, sql, duration = handle_llm(test_text, executor)
assert context == "Verbose context, oh yeah!"
assert sql == "SELECT 42;"
assert isinstance(duration, float)

View file

@ -40,6 +40,18 @@ def complete_event():
return Mock()
def test_escape_name(completer):
for name, expected_name in [# Upper case name shouldn't be escaped
("BAR", "BAR"),
# This name is escaped and should start with back tick
("2025todos", "`2025todos`"),
# normal case
("people", "people"),
# table name with _underscore should not be escaped
("django_users", "django_users")]:
assert completer.escape_name(name) == expected_name
def test_empty_string_completion(completer, complete_event):
text = ""
position = 0
@ -302,7 +314,7 @@ def test_auto_escaped_col_names(completer, complete_event):
result = list(completer.get_completions(Document(text=text, cursor_position=position), complete_event))
assert result == [
Completion(text="*", start_position=0),
Completion(text="`ABC`", start_position=0),
Completion(text="ABC", start_position=0),
Completion(text="`insert`", start_position=0),
Completion(text="id", start_position=0),
] + list(map(Completion, completer.functions)) + [Completion(text="select", start_position=0)] + list(
@ -317,7 +329,7 @@ def test_un_escaped_table_names(completer, complete_event):
assert result == list(
[
Completion(text="*", start_position=0),
Completion(text="`ABC`", start_position=0),
Completion(text="ABC", start_position=0),
Completion(text="`insert`", start_position=0),
Completion(text="id", start_position=0),
]