2025-02-09 19:48:22 +01:00
|
|
|
from metadata import (
|
|
|
|
MetaData,
|
|
|
|
alias,
|
|
|
|
name_join,
|
|
|
|
fk_join,
|
|
|
|
join,
|
|
|
|
keyword,
|
|
|
|
schema,
|
|
|
|
table,
|
|
|
|
view,
|
|
|
|
function,
|
|
|
|
column,
|
|
|
|
wildcard_expansion,
|
|
|
|
get_result,
|
|
|
|
result_set,
|
|
|
|
qual,
|
|
|
|
no_qual,
|
|
|
|
parametrize,
|
|
|
|
)
|
|
|
|
from prompt_toolkit.completion import Completion
|
|
|
|
from utils import completions_to_set
|
|
|
|
|
|
|
|
|
|
|
|
# Raw catalog fixture. The comprehension below nests every entry under the
# "public" schema before it is handed to MetaData.
metadata = {
    "tables": {
        "users": ["id", "parentid", "email", "first_name", "last_name"],
        "Users": ["userid", "username"],
        "orders": ["id", "ordered_date", "status", "email"],
        "select": ["id", "insert", "ABC"],
    },
    "views": {"user_emails": ["id", "email"], "functions": ["function"]},
    "functions": [
        # Four functions that share the same (empty) attribute layout.
        *(
            [func_name, [], [], [], "", False, False, False, False]
            for func_name in (
                "custom_fun",
                "_custom_fun",
                "custom_func1",
                "custom_func2",
            )
        ),
        [
            "set_returning_func",
            ["x", "y"],
            ["integer", "integer"],
            ["b", "b"],
            "",
            False,
            False,
            True,
            False,
        ],
    ],
    "datatypes": ["custom_type1", "custom_type2"],
    "foreignkeys": [
        ("public", "users", "id", "public", "users", "parentid"),
        ("public", "users", "id", "public", "Users", "userid"),
    ],
}
metadata = {kind: {"public": objects} for kind, objects in metadata.items()}

testdata = MetaData(metadata)
|
|
|
|
|
|
|
|
# Preferred spellings handed to the completers as the casing list.
cased_users_col_names = ["ID", "PARENTID", "Email", "First_Name", "last_name"]
cased_users2_col_names = ["UserID", "UserName"]
cased_func_names = [
    "Custom_Fun",
    "_custom_fun",
    "Custom_Func1",
    "custom_func2",
    "set_returning_func",
]
cased_tbls = ["Users", "Orders"]
cased_views = ["User_Emails", "Functions"]
casing = [
    "SELECT",
    "PUBLIC",
    *cased_func_names,
    *cased_tbls,
    *cased_views,
    *cased_users_col_names,
    *cased_users2_col_names,
]
|
|
|
|
# Expected-completion lists shared by the assertions below.
cased_funcs = [
    *(
        function(signature)
        for signature in (
            "Custom_Fun()",
            "_custom_fun()",
            "Custom_Func1()",
            "custom_func2()",
        )
    ),
    function("set_returning_func(x := , y := )", display="set_returning_func(x, y)"),
]
# cased_tbls is rebound here: it held plain names above and now holds the
# objects built by table(...), extended with the two quoted table names.
cased_tbls = [table(name) for name in [*cased_tbls, '"Users"', '"select"']]
cased_rels = [*(view(name) for name in cased_views), *cased_funcs, *cased_tbls]
cased_users_cols = [column(name) for name in cased_users_col_names]
|
|
|
|
# Expected relations when aliasing is on and casing is off.
aliased_rels = [
    *(
        table(rel)
        for rel in ("users u", '"Users" U', "orders o", '"select" s')
    ),
    view("user_emails ue"),
    view("functions f"),
    *(
        function(call)
        for call in (
            "_custom_fun() cf",
            "custom_fun() cf",
            "custom_func1() cf",
            "custom_func2() cf",
        )
    ),
    function(
        "set_returning_func(x := , y := ) srf",
        display="set_returning_func(x, y) srf",
    ),
]
|
|
|
|
# Expected relations when both aliasing and casing are on.
cased_aliased_rels = [
    *(
        table(rel)
        for rel in ("Users U", '"Users" U', "Orders O", '"select" s')
    ),
    view("User_Emails UE"),
    view("Functions F"),
    *(
        function(call)
        for call in (
            "_custom_fun() cf",
            "Custom_Fun() CF",
            "Custom_Func1() CF",
            "custom_func2() cf",
        )
    ),
    function(
        "set_returning_func(x := , y := ) srf",
        display="set_returning_func(x, y) srf",
    ),
]

# Completer factory used by every parametrized test in this module.
completers = testdata.get_completers(casing)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers())
def test_function_column_name(completer):
    """Smoke test: just make sure completion does not crash here.

    Each prefix of the query, from ``...function:`` up to the full text, is
    expected to yield no completions.
    """
    query = "SELECT * FROM Functions WHERE function:text"
    first_cut = len("SELECT * FROM Functions WHERE function:")
    for cut in range(first_cut, len(query) + 1):
        assert [] == get_result(completer, query[:cut])
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("action", ["ALTER", "DROP", "CREATE", "CREATE OR REPLACE"])
@parametrize("completer", completers())
def test_drop_alter_function(completer, action):
    """``<action> FUNCTION set_ret`` completes to the typed signature."""
    typed = "set_ret"
    result = get_result(completer, action + " FUNCTION " + typed)
    expected = [function("set_returning_func(x integer, y integer)", -len(typed))]
    assert result == expected
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers())
def test_empty_string_completion(completer):
    """Empty input offers exactly the keywords plus the specials."""
    result = get_result(completer, "")
    expected = completions_to_set(testdata.keywords() + testdata.specials())
    actual = completions_to_set(result)
    assert expected == actual
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers())
def test_select_keyword_completion(completer):
    """Typing ``SEL`` offers only the ``SELECT`` keyword."""
    result = get_result(completer, "SEL")
    expected = [keyword("SELECT", -3)]
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers())
def test_builtin_function_name_completion(completer):
    """``SELECT MA`` offers the functions and keywords listed below."""
    result = get_result(completer, "SELECT MA")
    function_names = (
        "MAKE_DATE",
        "MAKE_INTERVAL",
        "MAKE_TIME",
        "MAKE_TIMESTAMP",
        "MAKE_TIMESTAMPTZ",
        "MASKLEN",
        "MAX",
    )
    keyword_names = ("MAXEXTENTS", "MATERIALIZED VIEW")
    expected = [function(name, -2) for name in function_names]
    expected += [keyword(name, -2) for name in keyword_names]
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers())
def test_builtin_function_matches_only_at_start(completer):
    """``SELECT IN`` must not offer ``MIN``."""
    suggestions = get_result(completer, "SELECT IN")
    offered_texts = [suggestion.text for suggestion in suggestions]
    assert "MIN" not in offered_texts
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False, aliasing=False))
def test_user_function_name_completion(completer):
    """``SELECT cu`` offers the custom functions plus the listed built-ins."""
    result = get_result(completer, "SELECT cu")
    function_names = (
        "custom_fun()",
        "_custom_fun()",
        "custom_func1()",
        "custom_func2()",
        "CURRENT_DATE",
        "CURRENT_TIMESTAMP",
        "CUME_DIST",
        "CURRENT_TIME",
    )
    expected = [function(name, -2) for name in function_names]
    expected.append(keyword("CURRENT", -2))
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False, aliasing=False))
def test_user_function_name_completion_matches_anywhere(completer):
    """``SELECT om`` offers the four custom functions and nothing else."""
    result = get_result(completer, "SELECT om")
    function_names = (
        "custom_fun()",
        "_custom_fun()",
        "custom_func1()",
        "custom_func2()",
    )
    expected = [function(name, -2) for name in function_names]
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=True))
def test_list_functions_for_special(completer):
    r"""After ``\df `` the schema and the cased function names are offered."""
    result = get_result(completer, r"\df ")
    expected = [schema("PUBLIC")]
    expected.extend(function(name) for name in cased_func_names)
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False, qualify=no_qual))
def test_suggested_column_names_from_visible_table(completer):
    """With the cursor after ``SELECT ``, columns of ``users`` are offered."""
    cursor = len("SELECT ")
    result = get_result(completer, "SELECT from users", cursor)
    expected = testdata.columns_functions_and_keywords("users")
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=True, qualify=no_qual))
def test_suggested_cased_column_names(completer):
    """With casing on, column and function names use their cased spelling."""
    cursor = len("SELECT ")
    result = get_result(completer, "SELECT from users", cursor)
    expected = (
        cased_funcs
        + cased_users_cols
        + testdata.builtin_functions()
        + testdata.keywords()
    )
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False, qualify=no_qual))
@parametrize("text", ["SELECT from users", "INSERT INTO Orders SELECT from users"])
def test_suggested_auto_qualified_column_names(text, completer):
    """A single table in scope yields unqualified, lower-cased column names."""
    # Cursor sits right after the first space in the statement.
    cursor = text.index(" ") + 1
    expected_cols = [column(name.lower()) for name in cased_users_col_names]
    result = get_result(completer, text, cursor)
    expected = expected_cols + testdata.functions_and_keywords()
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False, qualify=qual))
@parametrize(
    "text",
    [
        'SELECT from users U NATURAL JOIN "Users"',
        'INSERT INTO Orders SELECT from users U NATURAL JOIN "Users"',
    ],
)
def test_suggested_auto_qualified_column_names_two_tables(text, completer):
    """Two tables in scope yield columns prefixed by alias or quoted name."""
    # Cursor sits right after the first space in the statement.
    cursor = text.index(" ") + 1
    expected_cols = [column("U." + name.lower()) for name in cased_users_col_names]
    expected_cols += [
        column('"Users".' + name.lower()) for name in cased_users2_col_names
    ]
    result = get_result(completer, text, cursor)
    expected = expected_cols + testdata.functions_and_keywords()
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=True, qualify=["always"]))
@parametrize("text", ["UPDATE users SET ", "INSERT INTO users("])
def test_no_column_qualification(text, completer):
    """UPDATE/INSERT column lists stay unqualified even with qualify=always."""
    expected = [column(name) for name in cased_users_col_names]
    result = get_result(completer, text)
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=True, qualify=["always"]))
def test_suggested_cased_always_qualified_column_names(completer):
    """With qualify=always, SELECT columns are prefixed with ``users.``."""
    query = "SELECT from users"
    cursor = len("SELECT ")
    qualified_cols = [column("users." + name) for name in cased_users_col_names]
    result = get_result(completer, query, cursor)
    expected = (
        cased_funcs + qualified_cols + testdata.builtin_functions() + testdata.keywords()
    )
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False, qualify=no_qual))
def test_suggested_column_names_in_function(completer):
    """Inside ``MAX(`` the columns of ``users`` are offered."""
    cursor = len("SELECT MAX(")
    result = get_result(completer, "SELECT MAX( from users", cursor)
    expected = testdata.columns_functions_and_keywords("users")
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False))
def test_suggested_column_names_with_table_dot(completer):
    """After ``users.`` only the columns of ``users`` are offered."""
    cursor = len("SELECT users.")
    result = get_result(completer, "SELECT users. from users", cursor)
    expected = testdata.columns("users")
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False))
def test_suggested_column_names_with_alias(completer):
    """After alias ``u.`` only the columns of ``users`` are offered."""
    cursor = len("SELECT u.")
    result = get_result(completer, "SELECT u. from users u", cursor)
    expected = testdata.columns("users")
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False, qualify=no_qual))
def test_suggested_multiple_column_names(completer):
    """After ``SELECT id, `` the columns of ``users`` are offered again."""
    cursor = len("SELECT id, ")
    result = get_result(completer, "SELECT id, from users u", cursor)
    expected = testdata.columns_functions_and_keywords("users")
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False))
def test_suggested_multiple_column_names_with_alias(completer):
    """A second ``u.`` in the select list still offers the ``users`` columns."""
    query = "SELECT u.id, u. from users u"
    cursor = len("SELECT u.id, u.")
    result = get_result(completer, query, cursor)
    expected = testdata.columns("users")
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=True))
def test_suggested_cased_column_names_with_alias(completer):
    """With casing on, ``u.`` offers the cased ``users`` columns."""
    query = "SELECT u.id, u. from users u"
    cursor = len("SELECT u.id, u.")
    result = get_result(completer, query, cursor)
    assert completions_to_set(result) == completions_to_set(cased_users_cols)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False))
def test_suggested_multiple_column_names_with_dot(completer):
    """A second ``users.`` in the select list offers the ``users`` columns."""
    query = "SELECT users.id, users. from users u"
    cursor = len("SELECT users.id, users.")
    result = get_result(completer, query, cursor)
    expected = testdata.columns("users")
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False))
def test_suggest_columns_after_three_way_join(completer):
    """After ``u3.`` in the third join condition, ``id`` is among the offers."""
    # NOTE(review): the indentation of continuation lines inside the original
    # triple-quoted SQL is not visible in this copy; the lines are joined with
    # a bare newline here -- confirm against the upstream file.
    query = (
        "SELECT * FROM users u1\n"
        "INNER JOIN users u2 ON u1.id = u2.id\n"
        "INNER JOIN users u3 ON u2.id = u3."
    )
    result = get_result(completer, query)
    assert column("id") in result
|
|
|
|
|
|
|
|
|
|
|
|
# Statements that all end at a join condition between ``users`` (alias U) and
# ``"Users"`` (alias U2); shared by the join-condition tests.
# NOTE(review): the indentation of continuation lines inside the original
# triple-quoted SQL is not visible in this copy; multi-line entries are joined
# with a bare newline here -- confirm against the upstream file.
join_condition_texts = [
    'INSERT INTO orders SELECT * FROM users U JOIN "Users" U2 ON ',
    (
        "INSERT INTO public.orders(orderid)\n"
        'SELECT * FROM users U JOIN "Users" U2 ON '
    ),
    'SELECT * FROM users U JOIN "Users" U2 ON ',
    'SELECT * FROM users U INNER join "Users" U2 ON ',
    'SELECT * FROM USERS U right JOIN "Users" U2 ON ',
    'SELECT * FROM users U LEFT JOIN "Users" U2 ON ',
    'SELECT * FROM Users U FULL JOIN "Users" U2 ON ',
    'SELECT * FROM users U right outer join "Users" U2 ON ',
    'SELECT * FROM Users U LEFT OUTER JOIN "Users" U2 ON ',
    'SELECT * FROM users U FULL OUTER JOIN "Users" U2 ON ',
    (
        "SELECT *\n"
        "FROM users U\n"
        'FULL OUTER JOIN "Users" U2 ON\n'
    ),
]
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False))
@parametrize("text", join_condition_texts)
def test_suggested_join_conditions(completer, text):
    """After ``ON`` both aliases and the foreign-key join are offered."""
    result = get_result(completer, text)
    expected = [alias("U"), alias("U2"), fk_join("U2.userid = U.id")]
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=True))
@parametrize("text", join_condition_texts)
def test_cased_join_conditions(completer, text):
    """With casing on, the foreign-key join uses cased column names."""
    result = get_result(completer, text)
    expected = [alias("U"), alias("U2"), fk_join("U2.UserID = U.ID")]
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
# NOTE(review): the indentation of continuation lines inside the original
# triple-quoted SQL is not visible in this copy; the lines are joined with a
# bare newline here -- confirm against the upstream file.
@parametrize("completer", completers(casing=False))
@parametrize(
    "text",
    [
        (
            "SELECT *\n"
            "FROM users\n"
            'CROSS JOIN "Users"\n'
            "NATURAL JOIN users u\n"
            'JOIN "Users" u2 ON\n'
        )
    ],
)
def test_suggested_join_conditions_with_same_table_twice(completer, text):
    """Join suggestions come back in exactly the order listed below."""
    result = get_result(completer, text)
    # Compared as a list on purpose: the order of suggestions is asserted.
    expected = [
        fk_join("u2.userid = u.id"),
        fk_join("u2.userid = users.id"),
        name_join('u2.userid = "Users".userid'),
        name_join('u2.username = "Users".username'),
        alias("u"),
        alias("u2"),
        alias("users"),
        alias('"Users"'),
    ]
    assert result == expected
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers())
@parametrize("text", ["SELECT * FROM users JOIN users u2 on foo."])
def test_suggested_join_conditions_with_invalid_qualifier(completer, text):
    """A qualifier that names no table in scope yields no completions."""
    suggestions = get_result(completer, text)
    assert suggestions == []
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False))
@parametrize(
    ("text", "ref"),
    [
        ("SELECT * FROM users JOIN NonTable on ", "NonTable"),
        ("SELECT * FROM users JOIN nontable nt on ", "nt"),
    ],
)
def test_suggested_join_conditions_with_invalid_table(completer, text, ref):
    """Joining an unknown table still offers ``users`` and the given ref."""
    result = get_result(completer, text)
    expected = [alias("users"), alias(ref)]
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False, aliasing=False))
@parametrize(
    "text",
    [
        'SELECT * FROM "Users" u JOIN u',
        'SELECT * FROM "Users" u JOIN uid',
        'SELECT * FROM "Users" u JOIN userid',
        'SELECT * FROM "Users" u JOIN id',
    ],
)
def test_suggested_joins_fuzzy(completer, text):
    """The ``users`` join is offered for each partial word after ``JOIN``."""
    result = get_result(completer, text)
    # The completion replaces the partially typed final word.
    typed = text.split()[-1]
    wanted = join("users ON users.id = u.userid", -len(typed))
    assert wanted in result
|
|
|
|
|
|
|
|
|
|
|
|
# Statements ending right after a JOIN keyword on ``Users``; shared by the
# join-suggestion tests.
# NOTE(review): the indentation of continuation lines inside the original
# triple-quoted SQL is not visible in this copy; multi-line entries are joined
# with a bare newline here -- confirm against the upstream file.
join_texts = [
    "SELECT * FROM Users JOIN ",
    (
        'INSERT INTO "Users"\n'
        "SELECT *\n"
        "FROM Users\n"
        "INNER JOIN "
    ),
    (
        'INSERT INTO public."Users"(username)\n'
        "SELECT *\n"
        "FROM Users\n"
        "INNER JOIN "
    ),
    (
        "SELECT *\n"
        "FROM Users\n"
        "INNER JOIN "
    ),
]
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False, aliasing=False))
@parametrize("text", join_texts)
def test_suggested_joins(completer, text):
    """After ``JOIN`` the FROM-clause items plus three joins are offered."""
    result = get_result(completer, text)
    join_suggestions = [
        join('"Users" ON "Users".userid = Users.id'),
        join("users users2 ON users2.id = Users.parentid"),
        join("users users2 ON users2.parentid = Users.id"),
    ]
    expected = testdata.schemas_and_from_clause_items() + join_suggestions
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=True, aliasing=False))
@parametrize("text", join_texts)
def test_cased_joins(completer, text):
    """With casing on, relations and join suggestions use cased names."""
    result = get_result(completer, text)
    join_suggestions = [
        join('"Users" ON "Users".UserID = Users.ID'),
        join("Users Users2 ON Users2.ID = Users.PARENTID"),
        join("Users Users2 ON Users2.PARENTID = Users.ID"),
    ]
    expected = [schema("PUBLIC")] + cased_rels + join_suggestions
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False, aliasing=True))
@parametrize("text", join_texts)
def test_aliased_joins(completer, text):
    """With aliasing on, relations and join suggestions carry aliases."""
    result = get_result(completer, text)
    join_suggestions = [
        join('"Users" U ON U.userid = Users.id'),
        join("users u ON u.id = Users.parentid"),
        join("users u ON u.parentid = Users.id"),
    ]
    expected = testdata.schemas() + aliased_rels + join_suggestions
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
# NOTE(review): the indentation of continuation lines inside the original
# triple-quoted SQL is not visible in this copy; the lines are joined with a
# bare newline here -- confirm against the upstream file.
@parametrize("completer", completers(casing=False, aliasing=False))
@parametrize(
    "text",
    [
        'SELECT * FROM public."Users" JOIN ',
        'SELECT * FROM public."Users" RIGHT OUTER JOIN ',
        (
            "SELECT *\n"
            'FROM public."Users"\n'
            "LEFT JOIN "
        ),
    ],
)
def test_suggested_joins_quoted_schema_qualified_table(completer, text):
    """Joining from ``public."Users"`` offers a schema-qualified users join."""
    result = get_result(completer, text)
    join_suggestions = [join('public.users ON users.id = "Users".userid')]
    expected = testdata.schemas_and_from_clause_items() + join_suggestions
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False))
@parametrize(
    "text",
    [
        "SELECT u.name, o.id FROM users u JOIN orders o ON ",
        "SELECT u.name, o.id FROM users u JOIN orders o ON JOIN orders o2 ON",
    ],
)
def test_suggested_aliases_after_on(completer, text):
    """Right after the first ``ON `` aliases and name joins are offered."""
    # Cursor is fixed just past the first ``ON ``, even when text continues.
    cursor = len("SELECT u.name, o.id FROM users u JOIN orders o ON ")
    result = get_result(completer, text, cursor)
    expected = [
        alias("u"),
        name_join("o.id = u.id"),
        name_join("o.email = u.email"),
        alias("o"),
    ]
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers())
@parametrize(
    "text",
    [
        "SELECT u.name, o.id FROM users u JOIN orders o ON o.user_id = ",
        "SELECT u.name, o.id FROM users u JOIN orders o ON o.user_id = JOIN orders o2 ON",
    ],
)
def test_suggested_aliases_after_on_right_side(completer, text):
    """On the right side of the join condition only the aliases are offered."""
    # Cursor is fixed just past ``o.user_id = ``, even when text continues.
    cursor = len("SELECT u.name, o.id FROM users u JOIN orders o ON o.user_id = ")
    result = get_result(completer, text, cursor)
    expected = [alias("u"), alias("o")]
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False))
@parametrize(
    "text",
    [
        "SELECT users.name, orders.id FROM users JOIN orders ON ",
        "SELECT users.name, orders.id FROM users JOIN orders ON JOIN orders orders2 ON",
    ],
)
def test_suggested_tables_after_on(completer, text):
    """Without aliases, table names and name joins are offered after ``ON``."""
    # Cursor is fixed just past the first ``ON ``, even when text continues.
    cursor = len("SELECT users.name, orders.id FROM users JOIN orders ON ")
    result = get_result(completer, text, cursor)
    expected = [
        name_join("orders.id = users.id"),
        name_join("orders.email = users.email"),
        alias("users"),
        alias("orders"),
    ]
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False))
@parametrize(
    "text",
    [
        "SELECT users.name, orders.id FROM users JOIN orders ON orders.user_id = JOIN orders orders2 ON",
        "SELECT users.name, orders.id FROM users JOIN orders ON orders.user_id = ",
    ],
)
def test_suggested_tables_after_on_right_side(completer, text):
    """On the right side of the join condition only table names are offered."""
    # Cursor is fixed just past ``orders.user_id = ``, even when text continues.
    prefix = "SELECT users.name, orders.id FROM users JOIN orders ON orders.user_id = "
    cursor = len(prefix)
    result = get_result(completer, text, cursor)
    expected = [alias("users"), alias("orders")]
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False))
@parametrize(
    "text",
    [
        "SELECT * FROM users INNER JOIN orders USING (",
        "SELECT * FROM users INNER JOIN orders USING(",
    ],
)
def test_join_using_suggests_common_columns(completer, text):
    """``USING (`` offers ``id`` and ``email`` for users joined to orders."""
    result = get_result(completer, text)
    expected = [column("id"), column("email")]
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False))
@parametrize(
    "text",
    [
        "SELECT * FROM users u1 JOIN users u2 USING (email) JOIN user_emails ue USING()",
        "SELECT * FROM users u1 JOIN users u2 USING(email) JOIN user_emails ue USING ()",
        "SELECT * FROM users u1 JOIN user_emails ue USING () JOIN users u2 ue USING(first_name, last_name)",
        "SELECT * FROM users u1 JOIN user_emails ue USING() JOIN users u2 ue USING (first_name, last_name)",
    ],
)
def test_join_using_suggests_from_last_table(completer, text):
    """Inside the empty ``USING()`` the columns ``id`` and ``email`` are offered."""
    # Place the cursor between the parentheses of the empty USING clause.
    cursor = text.index("()") + 1
    result = get_result(completer, text, cursor)
    expected = [column("id"), column("email")]
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False))
@parametrize(
    "text",
    [
        "SELECT * FROM users INNER JOIN orders USING (id,",
        "SELECT * FROM users INNER JOIN orders USING(id,",
    ],
)
def test_join_using_suggests_columns_after_first_column(completer, text):
    """After ``USING (id,`` the columns ``id`` and ``email`` are offered."""
    result = get_result(completer, text)
    expected = [column("id"), column("email")]
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False, aliasing=False))
@parametrize(
    "text",
    [
        "SELECT * FROM ",
        "SELECT * FROM users CROSS JOIN ",
        "SELECT * FROM users natural join ",
    ],
)
def test_table_names_after_from(completer, text):
    """After FROM/JOIN the FROM-clause items come back in a fixed order."""
    result = get_result(completer, text)
    expected_items = testdata.schemas_and_from_clause_items()
    assert completions_to_set(result) == completions_to_set(expected_items)

    # The exact ordering of the suggestions is asserted as well.
    expected_order = [
        "public",
        "orders",
        '"select"',
        "users",
        '"Users"',
        "functions",
        "user_emails",
        "_custom_fun()",
        "custom_fun()",
        "custom_func1()",
        "custom_func2()",
        "set_returning_func(x := , y := )",
    ]
    assert [suggestion.text for suggestion in result] == expected_order
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False, qualify=no_qual))
def test_auto_escaped_col_names(completer):
    """Columns of the quoted table ``"select"`` are offered after ``SELECT ``."""
    cursor = len("SELECT ")
    result = get_result(completer, 'SELECT from "select"', cursor)
    expected = testdata.columns_functions_and_keywords("select")
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(aliasing=False))
def test_allow_leading_double_quote_in_last_word(completer):
    """A word starting with ``"`` still matches the quoted ``"select"`` table."""
    suggestions = get_result(completer, 'SELECT * from "sele')
    wanted = table('"select"', -5)
    assert wanted in suggestions
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False))
@parametrize(
    "text",
    [
        "SELECT 1::",
        "CREATE TABLE foo (bar ",
        "CREATE FUNCTION foo (bar INT, baz ",
        "ALTER TABLE foo ALTER COLUMN bar TYPE ",
    ],
)
def test_suggest_datatype(text, completer):
    """Where a type is expected, schemas, types and built-in types are offered."""
    result = get_result(completer, text)
    expected = testdata.schemas() + testdata.types() + testdata.builtin_datatypes()
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False))
def test_suggest_columns_from_escaped_table_alias(completer):
    """Alias ``s.`` of the quoted ``"select"`` table offers that table's columns."""
    result = get_result(completer, 'select * from "select" s where s.')
    expected = testdata.columns("select")
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False, qualify=no_qual))
def test_suggest_columns_from_set_returning_function(completer):
    """Selecting from ``set_returning_func()`` offers that function's columns."""
    cursor = len("select ")
    result = get_result(completer, "select from set_returning_func()", cursor)
    expected = testdata.columns_functions_and_keywords(
        "set_returning_func", typ="functions"
    )
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False))
def test_suggest_columns_from_aliased_set_returning_function(completer):
    """Alias ``f.`` of ``set_returning_func()`` offers that function's columns."""
    query = "select f. from set_returning_func() f"
    cursor = len("select f.")
    result = get_result(completer, query, cursor)
    expected = testdata.columns("set_returning_func", typ="functions")
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False))
def test_join_functions_using_suggests_common_columns(completer):
    """``USING (`` between two function calls offers the function's columns."""
    # NOTE(review): the indentation of the continuation line inside the
    # original triple-quoted SQL is not visible in this copy; the lines are
    # joined with a bare newline here -- confirm against the upstream file.
    query = (
        "SELECT * FROM set_returning_func() f1\n"
        "INNER JOIN set_returning_func() f2 USING ("
    )
    result = get_result(completer, query)
    expected = testdata.columns("set_returning_func", typ="functions")
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False))
def test_join_functions_on_suggests_columns_and_join_conditions(completer):
    """After ``ON f1.`` both name joins and the function's columns are offered."""
    # NOTE(review): the indentation of the continuation line inside the
    # original triple-quoted SQL is not visible in this copy; the lines are
    # joined with a bare newline here -- confirm against the upstream file.
    query = (
        "SELECT * FROM set_returning_func() f1\n"
        "INNER JOIN set_returning_func() f2 ON f1."
    )
    result = get_result(completer, query)
    join_suggestions = [name_join("y = f2.y"), name_join("x = f2.x")]
    expected = join_suggestions + testdata.columns(
        "set_returning_func", typ="functions"
    )
    assert completions_to_set(result) == completions_to_set(expected)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers())
def test_learn_keywords(completer):
    """A keyword seen in query history is suggested first."""
    completer.extend_query_history("CREATE VIEW v AS SELECT 1")

    # `VIEW` has now been used once, so it should come ahead of the other
    # keywords starting with v.
    suggestions = get_result(completer, "create v")
    top_suggestion = suggestions[0]
    assert top_suggestion.text == "VIEW"
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False, aliasing=False))
def test_learn_table_names(completer):
    """A table used more often in history is suggested earlier."""
    past_queries = "SELECT * FROM users; SELECT * FROM orders; SELECT * FROM users"
    completer.extend_query_history(past_queries)

    suggestions = get_result(completer, "SELECT * FROM ")

    # `users` appears twice in the history and `orders` once, so `users`
    # should be listed first.
    users_rank = suggestions.index(table("users"))
    orders_rank = suggestions.index(table("orders"))
    assert users_rank < orders_rank
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False, qualify=no_qual))
def test_columns_before_keywords(completer):
    """The ``status`` column is listed ahead of the ``SELECT`` keyword."""
    suggestions = get_result(completer, "SELECT * FROM orders WHERE s")

    column_rank = suggestions.index(column("status", -1))
    keyword_rank = suggestions.index(keyword("SELECT", -1))
    assert column_rank < keyword_rank
|
|
|
|
|
|
|
|
|
|
|
|
# NOTE(review): the indentation of continuation lines inside the original
# triple-quoted SQL is not visible in this copy; the lines are joined with a
# bare newline here -- confirm against the upstream file.
@parametrize("completer", completers(casing=False, qualify=no_qual))
@parametrize(
    "text",
    [
        "SELECT * FROM users",
        "INSERT INTO users SELECT * FROM users u",
        (
            "INSERT INTO users(id, parentid, email, first_name, last_name)\n"
            "SELECT *\n"
            "FROM users u"
        ),
    ],
)
def test_wildcard_column_expansion(completer, text):
    """Completing right after ``*`` expands it to the ``users`` column list."""
    # Cursor sits immediately after the first ``*``.
    cursor = text.find("*") + 1
    suggestions = get_result(completer, text, cursor)

    wanted = [wildcard_expansion("id, parentid, email, first_name, last_name")]
    assert wanted == suggestions
|
|
|
|
|
|
|
|
|
|
|
|
# NOTE(review): the indentation of continuation lines inside the original
# triple-quoted SQL is not visible in this copy; the lines are joined with a
# bare newline here -- confirm against the upstream file.
@parametrize("completer", completers(casing=False))
@parametrize(
    "text",
    [
        "SELECT u.* FROM users u",
        "INSERT INTO public.users SELECT u.* FROM users u",
        (
            "INSERT INTO users(id, parentid, email, first_name, last_name)\n"
            "SELECT u.*\n"
            "FROM users u"
        ),
    ],
)
def test_wildcard_column_expansion_with_alias(completer, text):
    """Completing after ``u.*`` expands to alias-qualified column names."""
    # Cursor sits immediately after the first ``*``.
    cursor = text.find("*") + 1
    suggestions = get_result(completer, text, cursor)

    # The first column is unqualified in the expected expansion text.
    wanted = [
        wildcard_expansion("id, u.parentid, u.email, u.first_name, u.last_name")
    ]
    assert wanted == suggestions
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False))
@parametrize(
    "text,expected",
    [
        (
            "SELECT users.* FROM users",
            "id, users.parentid, users.email, users.first_name, users.last_name",
        ),
        (
            "SELECT Users.* FROM Users",
            "id, Users.parentid, Users.email, Users.first_name, Users.last_name",
        ),
    ],
)
def test_wildcard_column_expansion_with_table_qualifier(completer, text, expected):
    """Completing after ``<table>.*`` expands using the qualifier as typed."""
    # Both parametrized texts share this prefix length.
    cursor = len("SELECT users.*")
    suggestions = get_result(completer, text, cursor)

    wanted = [wildcard_expansion(expected)]
    assert wanted == suggestions
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False, qualify=qual))
def test_wildcard_column_expansion_with_two_tables(completer):
    """Check that a bare ``*`` over a two-table join expands to both tables.

    The expected expansion lists the columns of ``"select"`` first and
    then those of ``users`` (aliased ``u``), each column qualified.
    """
    cursor_prefix = "SELECT *"
    query = 'SELECT * FROM "select" JOIN users u ON true'
    completions = get_result(completer, query, len(cursor_prefix))

    qualified_columns = [
        '"select".id',
        '"select".insert',
        '"select"."ABC"',
        "u.id",
        "u.parentid",
        "u.email",
        "u.first_name",
        "u.last_name",
    ]
    assert completions == [wildcard_expansion(", ".join(qualified_columns))]
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False))
def test_wildcard_column_expansion_with_two_tables_and_parent(completer):
    """Check that ``"select".*`` expands only the ``"select"`` table.

    The joined ``users`` table must not contribute any columns to the
    expected expansion.
    """
    cursor_prefix = 'SELECT "select".*'
    query = 'SELECT "select".* FROM "select" JOIN users u ON true'
    completions = get_result(completer, query, len(cursor_prefix))

    assert [wildcard_expansion('id, "select".insert, "select"."ABC"')] == completions
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False))
@parametrize(
    "text",
    [
        "SELECT U. FROM Users U",
        "SELECT U. FROM USERS U",
        "SELECT U. FROM users U",
    ],
)
def test_suggest_columns_from_unquoted_table(completer, text):
    """Check that every unquoted spelling of the table yields ``users`` columns.

    The cursor sits right after ``U.`` in each statement.
    """
    cursor = len("SELECT U.")
    suggested = completions_to_set(get_result(completer, text, cursor))

    assert suggested == completions_to_set(testdata.columns("users"))
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False))
def test_suggest_columns_from_quoted_table(completer):
    """Check that the quoted ``"Users"`` table yields the ``Users`` columns."""
    query = 'SELECT U. FROM "Users" U'
    cursor = len("SELECT U.")
    suggested = completions_to_set(get_result(completer, query, cursor))

    assert suggested == completions_to_set(testdata.columns("Users"))
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False, aliasing=False))
@parametrize(
    "text",
    [
        "SELECT * FROM ",
        "SELECT * FROM Orders o CROSS JOIN ",
    ],
)
def test_schema_or_visible_table_completion(completer, text):
    """Check the completions offered at the end of a FROM / CROSS JOIN clause.

    With aliasing disabled, the result must equal
    ``testdata.schemas_and_from_clause_items()``.
    """
    suggested = completions_to_set(get_result(completer, text))

    assert suggested == completions_to_set(testdata.schemas_and_from_clause_items())
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False, aliasing=True))
@parametrize("text", ["SELECT * FROM "])
def test_table_aliases(completer, text):
    """Check that, with aliasing on, FROM offers schemas plus ``aliased_rels``."""
    suggested = completions_to_set(get_result(completer, text))

    assert suggested == completions_to_set(testdata.schemas() + aliased_rels)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False, aliasing=True))
@parametrize("text", ["SELECT * FROM Orders o CROSS JOIN "])
def test_duplicate_table_aliases(completer, text):
    """Check the aliases suggested when alias ``o`` is already in use.

    The statement already binds ``Orders`` to ``o``, so the expected
    suggestion for ``orders`` carries the alias ``o2`` instead.
    """
    suggested = completions_to_set(get_result(completer, text))

    aliased_relations = [
        table("orders o2"),
        table("users u"),
        table('"Users" U'),
        table('"select" s'),
        view("user_emails ue"),
        view("functions f"),
        function("_custom_fun() cf"),
        function("custom_fun() cf"),
        function("custom_func1() cf"),
        function("custom_func2() cf"),
        function(
            "set_returning_func(x := , y := ) srf",
            display="set_returning_func(x, y) srf",
        ),
    ]
    assert suggested == completions_to_set(testdata.schemas() + aliased_relations)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=True, aliasing=True))
@parametrize("text", ["SELECT * FROM Orders o CROSS JOIN "])
def test_duplicate_aliases_with_casing(completer, text):
    """Check cased, aliased suggestions when alias ``o`` is already in use.

    Same scenario as the uncased duplicate-alias test, but the completer
    is built with ``casing=True``; the expected names and aliases below
    follow the configured casing (for example ``Orders O2``).
    """
    suggested = completions_to_set(get_result(completer, text))

    cased_relations = [
        schema("PUBLIC"),
        table("Orders O2"),
        table("Users U"),
        table('"Users" U'),
        table('"select" s'),
        view("User_Emails UE"),
        view("Functions F"),
        function("_custom_fun() cf"),
        function("Custom_Fun() CF"),
        function("Custom_Func1() CF"),
        function("custom_func2() cf"),
        function(
            "set_returning_func(x := , y := ) srf",
            display="set_returning_func(x, y) srf",
        ),
    ]
    assert suggested == completions_to_set(cased_relations)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=True, aliasing=True))
@parametrize("text", ["SELECT * FROM "])
def test_aliases_with_casing(completer, text):
    """Check that casing + aliasing yields ``PUBLIC`` plus ``cased_aliased_rels``."""
    suggested = completions_to_set(get_result(completer, text))

    assert suggested == completions_to_set([schema("PUBLIC")] + cased_aliased_rels)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=True, aliasing=False))
@parametrize("text", ["SELECT * FROM "])
def test_table_casing(completer, text):
    """Check that casing without aliasing yields ``PUBLIC`` plus ``cased_rels``."""
    suggested = completions_to_set(get_result(completer, text))

    assert suggested == completions_to_set([schema("PUBLIC")] + cased_rels)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False))
@parametrize(
    "text",
    [
        "INSERT INTO users ()",
        "INSERT INTO users()",
        "INSERT INTO users () SELECT * FROM orders;",
        "INSERT INTO users() SELECT * FROM users u cross join orders o",
    ],
)
def test_insert(completer, text):
    """Check that the INSERT column list suggests the ``users`` columns.

    The cursor is placed just inside the first opening parenthesis; any
    trailing SELECT in the statement must not change the suggestions.
    """
    cursor = text.index("(") + 1
    suggested = completions_to_set(get_result(completer, text, cursor))

    assert suggested == completions_to_set(testdata.columns("users"))
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False, aliasing=False))
def test_suggest_cte_names(completer):
    """Check that both CTE names are among the FROM-clause suggestions.

    This is a subset check: other completions may be present as well.
    """
    query = """
        WITH cte1 AS (SELECT a, b, c FROM foo),
        cte2 AS (SELECT d, e, f FROM bar)
        SELECT * FROM
    """
    completions = get_result(completer, query)

    cte_completions = completions_to_set(
        [Completion(cte_name, 0, display_meta="table") for cte_name in ("cte1", "cte2")]
    )
    assert cte_completions <= completions_to_set(completions)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False, qualify=no_qual))
def test_suggest_columns_from_cte(completer):
    """Check that the select list of a query over a CTE suggests its columns.

    The expected set is the CTE's two columns together with
    ``testdata.functions_and_keywords()``.
    """
    query = "WITH cte AS (SELECT foo, bar FROM baz) SELECT FROM cte"
    cursor = len("WITH cte AS (SELECT foo, bar FROM baz) SELECT ")
    completions = get_result(completer, query, cursor)

    cte_columns = [
        Completion(column_name, 0, display_meta="column")
        for column_name in ("foo", "bar")
    ]
    wanted = cte_columns + testdata.functions_and_keywords()

    assert completions_to_set(wanted) == completions_to_set(completions)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers(casing=False, qualify=no_qual))
@parametrize(
    "text",
    [
        "WITH cte AS (SELECT foo FROM bar) SELECT * FROM cte WHERE cte.",
        "WITH cte AS (SELECT foo FROM bar) SELECT * FROM cte c WHERE c.",
    ],
)
def test_cte_qualified_columns(completer, text):
    """Check that qualifying by CTE name or alias yields only its column ``foo``."""
    completions = get_result(completer, text)
    only_foo = [Completion("foo", 0, display_meta="column")]

    assert completions_to_set(only_foo) == completions_to_set(completions)
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize(
    "keyword_casing,expected,texts",
    [
        ("upper", "SELECT", ("", "s", "S", "Sel")),
        ("lower", "select", ("", "s", "S", "Sel")),
        ("auto", "SELECT", ("", "S", "SEL", "seL")),
        ("auto", "select", ("s", "sel", "SEl")),
    ],
)
def test_keyword_casing_upper(keyword_casing, expected, texts):
    """Check the casing of the suggested SELECT keyword per casing setting.

    Despite the name, this covers the ``upper``, ``lower`` and ``auto``
    settings. For every input in ``texts`` the completion texts must
    contain ``expected`` exactly as spelled.
    """
    settings = {"keyword_casing": keyword_casing}
    for typed in texts:
        # A fresh completer is built for every input, as before.
        completer = testdata.get_completer(settings)
        suggested_texts = [cpl.text for cpl in get_result(completer, typed)]
        assert expected in suggested_texts
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers())
def test_keyword_after_alter(completer):
    """Check that ``COLUMN`` is offered as a keyword after ``ALTER TABLE … ALTER``."""
    completions = get_result(completer, "ALTER TABLE users ALTER ")
    column_keyword = Completion("COLUMN", start_position=0, display_meta="keyword")

    assert column_keyword in completions
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers())
def test_set_schema(completer):
    """Check that ``SET SCHEMA`` suggests exactly the quoted schema ``'public'``."""
    suggested = completions_to_set(get_result(completer, "SET SCHEMA "))

    assert suggested == completions_to_set([schema("'public'")])
|
|
|
|
|
|
|
|
|
|
|
|
@parametrize("completer", completers())
def test_special_name_completion(completer):
    """Check that typing ``\\t`` completes to the ``\\timing`` special command.

    The completion replaces the two characters already typed
    (``start_position=-2``).
    """
    suggested = completions_to_set(get_result(completer, "\\t"))

    timing_command = Completion(
        text="\\timing",
        start_position=-2,
        display_meta="Toggle timing of commands.",
    )
    assert suggested == completions_to_set([timing_command])
|