2025-02-09 19:48:22 +01:00
|
|
|
from textwrap import dedent
|
|
|
|
|
2025-02-09 20:01:39 +01:00
|
|
|
import psycopg
|
2025-02-09 19:48:22 +01:00
|
|
|
import pytest
|
2025-02-09 19:58:08 +01:00
|
|
|
from unittest.mock import patch, MagicMock
|
2025-02-09 19:48:22 +01:00
|
|
|
from pgspecial.main import PGSpecial, NO_QUERY
|
|
|
|
from utils import run, dbtest, requires_json, requires_jsonb
|
|
|
|
|
|
|
|
from pgcli.main import PGCli
|
|
|
|
from pgcli.packages.parseutils.meta import FunctionMetadata
|
|
|
|
|
|
|
|
|
|
|
|
def function_meta_data(
    func_name,
    schema_name="public",
    arg_names=None,
    arg_types=None,
    arg_modes=None,
    return_type=None,
    is_aggregate=False,
    is_window=False,
    is_set_returning=False,
    is_extension=False,
    arg_defaults=None,
):
    """Build a ``FunctionMetadata`` from keyword-friendly arguments.

    This helper takes ``func_name`` first and defaults ``schema_name`` to
    ``"public"``; ``FunctionMetadata`` itself is called positionally with the
    schema name before the function name.
    """
    return FunctionMetadata(
        schema_name,
        func_name,
        arg_names,
        arg_types,
        arg_modes,
        return_type,
        is_aggregate,
        is_window,
        is_set_returning,
        is_extension,
        arg_defaults,
    )
|
|
|
|
|
|
|
|
|
|
|
|
@dbtest
def test_conn(executor):
    """Insert one text row and compare the rendered SELECT output."""
    run(executor, """create table test(a text)""")
    run(executor, """insert into test values('abc')""")
    # The header cell is padded to the width of the widest value ("abc").
    assert run(executor, """select * from test""", join=True) == dedent(
        """\
        +-----+
        | a   |
        |-----|
        | abc |
        +-----+
        SELECT 1"""
    )
|
|
|
|
|
|
|
|
|
|
|
|
@dbtest
def test_copy(executor):
    """Run the create/insert/select round trip through ``executor.copy()``."""
    executor_copy = executor.copy()
    run(executor_copy, """create table test(a text)""")
    run(executor_copy, """insert into test values('abc')""")
    # The header cell is padded to the width of the widest value ("abc").
    assert run(executor_copy, """select * from test""", join=True) == dedent(
        """\
        +-----+
        | a   |
        |-----|
        | abc |
        +-----+
        SELECT 1"""
    )
|
|
|
|
|
|
|
|
|
|
|
|
@dbtest
def test_bools_are_treated_as_strings(executor):
    """A boolean column value is rendered as the text ``True``."""
    run(executor, """create table test(a boolean)""")
    run(executor, """insert into test values(True)""")
    # The header cell is padded to the width of the widest value ("True").
    assert run(executor, """select * from test""", join=True) == dedent(
        """\
        +------+
        | a    |
        |------|
        | True |
        +------+
        SELECT 1"""
    )
|
|
|
|
|
|
|
|
|
|
|
|
@dbtest
def test_expanded_slash_G(executor, pgspecial):
    r"""Check that expanded output is reset after a query ending in \G."""
    run(executor, """create table test(a boolean)""")
    run(executor, """insert into test values(True)""")
    # Only the side effect on pgspecial matters; the rendered rows are ignored.
    run(executor, r"""select * from test \G""", pgspecial=pgspecial)
    assert pgspecial.expanded_output is False
|
|
|
|
|
|
|
|
|
|
|
|
@dbtest
def test_schemata_table_views_and_columns_query(executor):
    """Check the schema, table, view and column metadata queries.

    All comparisons except ``search_path`` use ``>=`` (superset), so extra
    objects already present in the database do not fail the test.
    """
    run(executor, "create table a(x text, y text)")
    run(executor, "create table b(z text)")
    run(executor, "create view d as select 1 as e")
    run(executor, "create schema schema1")
    run(executor, "create table schema1.c (w text DEFAULT 'meow')")
    run(executor, "create schema schema2")

    # schemata
    # don't enforce all members of the schemas since they may include postgres
    # temporary schemas
    assert set(executor.schemata()) >= {
        "public",
        "pg_catalog",
        "information_schema",
        "schema1",
        "schema2",
    }
    assert executor.search_path() == ["pg_catalog", "public"]

    # tables
    assert set(executor.tables()) >= {
        ("public", "a"),
        ("public", "b"),
        ("schema1", "c"),
    }

    assert set(executor.table_columns()) >= {
        ("public", "a", "x", "text", False, None),
        ("public", "a", "y", "text", False, None),
        ("public", "b", "z", "text", False, None),
        ("schema1", "c", "w", "text", True, "'meow'::text"),
    }

    # views
    assert set(executor.views()) >= {("public", "d")}

    assert set(executor.view_columns()) >= {
        ("public", "d", "e", "integer", False, None)
    }
|
2025-02-09 19:48:22 +01:00
|
|
|
|
|
|
|
|
|
|
|
@dbtest
def test_foreign_key_query(executor):
    """A cross-schema foreign key is reported by ``executor.foreignkeys()``."""
    run(executor, "create schema schema1")
    run(executor, "create schema schema2")
    run(executor, "create table schema1.parent(parentid int PRIMARY KEY)")
    run(
        executor,
        "create table schema2.child(childid int PRIMARY KEY, motherid int REFERENCES schema1.parent)",
    )

    # Superset check: other foreign keys in the database are tolerated.
    assert set(executor.foreignkeys()) >= {
        ("schema1", "parent", "parentid", "schema2", "child", "motherid")
    }
|
2025-02-09 19:48:22 +01:00
|
|
|
|
|
|
|
|
|
|
|
@dbtest
def test_functions_query(executor):
    """Create four SQL functions and check the metadata reported for them.

    Covers a plain scalar function, a function in a non-default schema, a
    ``returns table`` function and a ``returns setof`` function.
    """
    run(
        executor,
        """create function func1() returns int
           language sql as $$select 1$$""",
    )
    run(executor, "create schema schema1")
    run(
        executor,
        """create function schema1.func2() returns int
           language sql as $$select 2$$""",
    )

    run(
        executor,
        """create function func3()
           returns table(x int, y int) language sql
           as $$select 1, 2 from generate_series(1,5)$$;""",
    )

    run(
        executor,
        """create function func4(x int) returns setof int language sql
           as $$select generate_series(1,5)$$;""",
    )

    funcs = set(executor.functions())
    # Superset check: functions that already exist in the database are ignored.
    assert funcs >= {
        function_meta_data(func_name="func1", return_type="integer"),
        function_meta_data(
            func_name="func3",
            arg_names=["x", "y"],
            arg_types=["integer", "integer"],
            arg_modes=["t", "t"],
            return_type="record",
            is_set_returning=True,
        ),
        function_meta_data(
            schema_name="public",
            func_name="func4",
            arg_names=("x",),
            arg_types=("integer",),
            return_type="integer",
            is_set_returning=True,
        ),
        function_meta_data(
            schema_name="schema1", func_name="func2", return_type="integer"
        ),
    }
|
2025-02-09 19:48:22 +01:00
|
|
|
|
|
|
|
|
|
|
|
@dbtest
def test_datatypes_query(executor):
    """A user-defined composite type is the only entry in ``datatypes()``."""
    run(executor, "create type foo AS (a int, b text)")

    types = list(executor.datatypes())
    assert types == [("public", "foo")]
|
|
|
|
|
|
|
|
|
|
|
|
@dbtest
def test_database_list(executor):
    """``executor.databases()`` includes the ``_test_db`` database."""
    databases = executor.databases()
    assert "_test_db" in databases
|
|
|
|
|
|
|
|
|
|
|
|
@dbtest
def test_invalid_syntax(executor, exception_formatter):
    """With a formatter supplied, a syntax error appears in the output."""
    result = run(executor, "invalid syntax!", exception_formatter=exception_formatter)
    assert 'syntax error at or near "invalid"' in result[0]
|
|
|
|
|
|
|
|
|
|
|
|
@dbtest
def test_invalid_column_name(executor, exception_formatter):
    """With a formatter supplied, an unknown column error appears in the output."""
    result = run(
        executor, "select invalid command", exception_formatter=exception_formatter
    )
    assert 'column "invalid" does not exist' in result[0]
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture(params=[True, False])
def expanded(request):
    """Parametrized fixture yielding ``True`` and then ``False``."""
    return request.param
|
|
|
|
|
|
|
|
|
|
|
|
@dbtest
def test_unicode_support_in_output(executor, expanded):
    """A non-ASCII value round-trips through the rendered output."""
    run(executor, "create table unicodechars(t text)")
    run(executor, "insert into unicodechars (t) values ('é')")

    # See issue #24, this raises an exception without proper handling
    assert "é" in run(
        executor, "select * from unicodechars", join=True, expanded=expanded
    )
|
|
|
|
|
|
|
|
|
|
|
|
@dbtest
def test_not_is_special(executor, pgspecial):
    """is_special is set to false for database queries."""
    query = "select 1"
    result = list(executor.run(query, pgspecial=pgspecial))
    # The last two fields of each result tuple are (success, is_special).
    success, is_special = result[0][5:]
    assert success is True
    assert is_special is False
|
|
|
|
|
|
|
|
|
|
|
|
@dbtest
def test_execute_from_file_no_arg(executor, pgspecial):
    r"""\i without a filename returns an error."""
    result = list(executor.run(r"\i", pgspecial=pgspecial))
    # Fields 3.. of a result tuple are (status, sql, success, is_special).
    status, _sql, success, is_special = result[0][3:]
    assert "missing required argument" in status
    assert success is False
    assert is_special is True
|
|
|
|
|
|
|
|
|
|
|
|
@dbtest
@patch("pgcli.main.os")
def test_execute_from_file_io_error(os, executor, pgspecial):
    r"""\i with an os_error returns an error."""
    # Inject an OSError.
    os.path.expanduser.side_effect = OSError("test")

    # Check the result.
    result = list(executor.run(r"\i test", pgspecial=pgspecial))
    # Fields 3.. of a result tuple are (status, sql, success, is_special).
    status, _sql, success, is_special = result[0][3:]
    assert status == "test"
    assert success is False
    assert is_special is True
|
|
|
|
|
|
|
|
|
2025-02-09 20:01:39 +01:00
|
|
|
@dbtest
def test_execute_from_commented_file_that_executes_another_file(
    executor, pgspecial, tmpdir
):
    r"""A leading SQL comment must not hide a following \h special command."""
    # https://github.com/dbcli/pgcli/issues/1336
    sqlfile1 = tmpdir.join("test01.sql")
    sqlfile1.write("-- asdf \n\\h")
    sqlfile2 = tmpdir.join("test00.sql")
    sqlfile2.write("--An useless comment;\nselect now();\n-- another useless comment")

    rcfile = str(tmpdir.join("rcfile"))
    cli = PGCli(pgexecute=executor, pgclirc_file=rcfile)
    assert cli is not None
    statement = "--comment\n\\h"
    result = run(executor, statement, pgspecial=cli.pgspecial)
    assert result is not None
    # str.find() returns -1 (truthy) on a miss and 0 (falsy) on a hit at the
    # start, so asserting on its return value checked nothing useful. Look for
    # the command name anywhere in the rendered help listing instead.
    assert any("ALTER TABLE" in line for line in result)
|
|
|
|
|
|
|
|
|
|
|
|
@dbtest
def test_execute_commented_first_line_and_special(executor, pgspecial, tmpdir):
    r"""Statements preceded by a comment still execute, including \h."""
    # https://github.com/dbcli/pgcli/issues/1362

    # just some base cases that should work also
    plain_statements = (
        "--comment\nselect now();",
        "/*comment*/\nselect now();",
        "/*comment\ncomment line2*/\nselect now();",
    )
    for statement in plain_statements:
        result = run(executor, statement, pgspecial=pgspecial)
        assert result is not None
        assert "now" in result[1]

    # The backslash is written as "\\h": a bare "\h" is an invalid escape
    # sequence that only works by accident and triggers a warning.
    special_statements = (
        "--comment\n\\h",
        "/*comment*/\n\\h;",
        " /*comment*/\n\\h;",
        "/*comment\ncomment line2*/\n\\h;",
        " /*comment\ncomment line2*/\n\\h;",
    )
    for statement in special_statements:
        result = run(executor, statement, pgspecial=pgspecial)
        assert result is not None
        assert "ALTER" in result[1]
        assert "ABORT" in result[1]
|
|
|
|
|
|
|
|
|
2025-02-09 19:48:22 +01:00
|
|
|
@dbtest
def test_multiple_queries_same_line(executor):
    """Two statements on one line each produce their own output."""
    result = run(executor, "select 'foo'; select 'bar'")
    assert len(result) == 12  # 2 * (output+status) * 3 lines
    assert "foo" in result[3]
    assert "bar" in result[9]
|
|
|
|
|
|
|
|
|
|
|
|
@dbtest
def test_multiple_queries_with_special_command_same_line(executor, pgspecial):
    r"""A query followed by the \d special command on one line runs both."""
    result = run(executor, r"select 'foo'; \d", pgspecial=pgspecial)
    # Combined line count of the select output and the \d listing.
    assert len(result) == 11
    assert "foo" in result[3]
    # This is a lame check. :(
    assert "Schema" in result[7]
|
|
|
|
|
|
|
|
|
|
|
|
@dbtest
def test_multiple_queries_same_line_syntaxerror(executor, exception_formatter):
    """The first query's output is kept when the second has a syntax error."""
    result = run(
        executor,
        "select 'fooé'; invalid syntax é",
        exception_formatter=exception_formatter,
    )
    assert "fooé" in result[3]
    assert 'syntax error at or near "invalid"' in result[-1]
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
def pgspecial():
    """Return the ``pgspecial`` attribute of a freshly constructed ``PGCli``."""
    return PGCli().pgspecial
|
|
|
|
|
|
|
|
|
|
|
|
@dbtest
def test_special_command_help(executor, pgspecial):
    r"""\? renders a table whose header names Command and Description."""
    # Line 1 of the output is the header row; split it into its cells.
    result = run(executor, "\\?", pgspecial=pgspecial)[1].split("|")
    assert "Command" in result[1]
    assert "Description" in result[2]
|
|
|
|
|
|
|
|
|
|
|
|
@dbtest
def test_bytea_field_support_in_output(executor):
    r"""A bytea value is rendered in ``\x``-prefixed hex form."""
    run(executor, "create table binarydata(c bytea)")
    run(executor, "insert into binarydata (c) values (decode('DEADBEEF', 'hex'))")

    assert "\\xdeadbeef" in run(executor, "select * from binarydata", join=True)
|
|
|
|
|
|
|
|
|
|
|
|
@dbtest
def test_unicode_support_in_unknown_type(executor):
    """A non-ASCII string literal selected directly appears in the output."""
    assert "日本語" in run(executor, "SELECT '日本語' AS japanese;", join=True)
|
|
|
|
|
|
|
|
|
|
|
|
@dbtest
def test_unicode_support_in_enum_type(executor):
    """A non-ASCII enum label appears in the rendered output."""
    run(executor, "CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy', '日本語')")
    run(executor, "CREATE TABLE person (name TEXT, current_mood mood)")
    run(executor, "INSERT INTO person VALUES ('Moe', '日本語')")
    assert "日本語" in run(executor, "SELECT * FROM person", join=True)
|
|
|
|
|
|
|
|
|
|
|
|
@requires_json
def test_json_renders_without_u_prefix(executor, expanded):
    """A json value is rendered exactly as the text that was inserted."""
    run(executor, "create table jsontest(d json)")
    run(executor, """insert into jsontest (d) values ('{"name": "Éowyn"}')""")
    result = run(
        executor, "SELECT d FROM jsontest LIMIT 1", join=True, expanded=expanded
    )

    assert '{"name": "Éowyn"}' in result
|
|
|
|
|
|
|
|
|
|
|
|
@requires_jsonb
def test_jsonb_renders_without_u_prefix(executor, expanded):
    """A jsonb value is rendered exactly as the text that was inserted."""
    run(executor, "create table jsonbtest(d jsonb)")
    run(executor, """insert into jsonbtest (d) values ('{"name": "Éowyn"}')""")
    result = run(
        executor, "SELECT d FROM jsonbtest LIMIT 1", join=True, expanded=expanded
    )

    assert '{"name": "Éowyn"}' in result
|
|
|
|
|
|
|
|
|
|
|
|
@dbtest
def test_date_time_types(executor):
    """Date/time values, including BC dates and a huge interval, render as text."""

    def value_row(sql):
        # Line 3 of the rendered table is the single data row.
        return run(executor, sql, join=True).split("\n")[3]

    run(executor, "SET TIME ZONE UTC")
    assert value_row("SELECT (CAST('00:00:00' AS time))") == "| 00:00:00 |"
    assert (
        value_row("SELECT (CAST('00:00:00+14:59' AS timetz))")
        == "| 00:00:00+14:59 |"
    )
    assert (
        value_row("SELECT (CAST('4713-01-01 BC' AS date))") == "| 4713-01-01 BC |"
    )
    assert (
        value_row("SELECT (CAST('4713-01-01 00:00:00 BC' AS timestamp))")
        == "| 4713-01-01 00:00:00 BC |"
    )
    assert (
        value_row("SELECT (CAST('4713-01-01 00:00:00+00 BC' AS timestamptz))")
        == "| 4713-01-01 00:00:00+00 BC |"
    )
    assert (
        value_row("SELECT (CAST('-123456789 days 12:23:56' AS interval))")
        == "| -123456789 days, 12:23:56 |"
    )
|
|
|
|
|
|
|
|
|
|
|
|
@dbtest
@pytest.mark.parametrize("value", ["10000000", "10000000.0", "10000000000000"])
def test_large_numbers_render_directly(executor, value):
    """Large numeric values appear in the output exactly as they were written."""
    run(executor, "create table numbertest(a numeric)")
    run(executor, f"insert into numbertest (a) values ({value})")
    assert value in run(executor, "select * from numbertest", join=True)
|
|
|
|
|
|
|
|
|
|
|
|
@dbtest
@pytest.mark.parametrize("command", ["di", "dv", "ds", "df", "dT"])
@pytest.mark.parametrize("verbose", ["", "+"])
@pytest.mark.parametrize("pattern", ["", "x", "*.*", "x.y", "x.*", "*.y"])
def test_describe_special(executor, command, verbose, pattern, pgspecial):
    """Smoke-test every describe command/verbosity/pattern combination."""
    # We don't have any tests for the output of any of the special commands,
    # but we can at least make sure they run without error
    sql = f"\\{command}{verbose} {pattern}"
    # Consume the result so the command actually executes.
    list(executor.run(sql, pgspecial=pgspecial))
|
|
|
|
|
|
|
|
|
|
|
|
@dbtest
@pytest.mark.parametrize("sql", ["invalid sql", "SELECT 1; select error;"])
def test_raises_with_no_formatter(executor, sql):
    """Without an exception formatter, SQL errors propagate to the caller."""
    with pytest.raises(psycopg.ProgrammingError):
        list(executor.run(sql))
|
|
|
|
|
|
|
|
|
|
|
|
@dbtest
def test_on_error_resume(executor, exception_formatter):
    """With ``on_error_resume=True`` all three statements yield a result."""
    sql = "select 1; error; select 1;"
    result = list(
        executor.run(sql, on_error_resume=True, exception_formatter=exception_formatter)
    )
    assert len(result) == 3
|
|
|
|
|
|
|
|
|
|
|
|
@dbtest
def test_on_error_stop(executor, exception_formatter):
    """With ``on_error_resume=False`` execution stops after the failing statement."""
    sql = "select 1; error; select 1;"
    result = list(
        executor.run(
            sql, on_error_resume=False, exception_formatter=exception_formatter
        )
    )
    # One result for the first select and one for the error; the last
    # statement is never run.
    assert len(result) == 2
|
|
|
|
|
|
|
|
|
|
|
|
# @dbtest
|
|
|
|
# def test_unicode_notices(executor):
|
|
|
|
# sql = "DO language plpgsql $$ BEGIN RAISE NOTICE '有人更改'; END $$;"
|
|
|
|
# result = list(executor.run(sql))
|
|
|
|
# assert result[0][0] == u'NOTICE: 有人更改\n'
|
|
|
|
|
|
|
|
|
|
|
|
@dbtest
def test_nonexistent_function_definition(executor):
    """Asking for the definition of an unknown function raises RuntimeError."""
    # Exercise function_definition (not view_definition) so the test matches
    # its name.
    # NOTE(review): assumes function_definition raises RuntimeError for an
    # unknown name, as view_definition does - confirm against the executor.
    with pytest.raises(RuntimeError):
        executor.function_definition("there_is_no_such_function")
|
|
|
|
|
|
|
|
|
|
|
|
@dbtest
def test_function_definition(executor):
    """``function_definition`` returns the definition of an existing function."""
    run(
        executor,
        """
            CREATE OR REPLACE FUNCTION public.the_number_three()
            RETURNS int
            LANGUAGE sql
            AS $function$
              select 3;
            $function$
    """,
    )
    result = executor.function_definition("the_number_three")
    # The returned definition must at least name the function it describes.
    assert "the_number_three" in result
|
|
|
|
|
|
|
|
|
|
|
|
@dbtest
def test_view_definition(executor):
    """``view_definition`` handles both plain and materialized views."""
    run(executor, "create table tbl1 (a text, b numeric)")
    run(executor, "create view vw1 AS SELECT * FROM tbl1")
    run(executor, "create materialized view mvw1 AS SELECT * FROM tbl1")
    result = executor.view_definition("vw1")
    assert "FROM tbl1" in result
    result = executor.view_definition("mvw1")
    assert "MATERIALIZED VIEW" in result
|
|
|
|
|
|
|
|
|
|
|
|
@dbtest
def test_nonexistent_view_definition(executor):
    """Asking for the definition of an unknown view raises RuntimeError."""
    with pytest.raises(RuntimeError):
        executor.view_definition("there_is_no_such_view")
    # "mvw1" is not created in this test, so it must not resolve here either.
    with pytest.raises(RuntimeError):
        executor.view_definition("mvw1")
|
|
|
|
|
|
|
|
|
|
|
|
@dbtest
def test_short_host(executor):
    """``short_host`` is the first label of the first configured host."""
    with patch.object(executor, "host", "localhost"):
        assert executor.short_host == "localhost"
    with patch.object(executor, "host", "localhost.example.org"):
        assert executor.short_host == "localhost"
    # A comma-separated host list: only the first host is used.
    with patch.object(
        executor, "host", "localhost1.example.org,localhost2.example.org"
    ):
        assert executor.short_host == "localhost1"
|
|
|
|
|
|
|
|
|
2025-02-09 19:58:08 +01:00
|
|
|
class VirtualCursor:
    """Mock a cursor to virtual database like pgbouncer."""

    def __init__(self):
        # No protocol error has been recorded until execute() is called.
        self.protocol_error = False
        self.protocol_message = ""
        # No result set and no status are ever produced by this cursor.
        self.description = None
        self.status = None
        self.statusmessage = "Error"

    def execute(self, *args, **kwargs):
        """Ignore the arguments and record a "Command not supported" error."""
        self.protocol_error = True
        self.protocol_message = "Command not supported"
|
|
|
|
|
|
|
|
|
2025-02-09 19:48:22 +01:00
|
|
|
@dbtest
def test_exit_without_active_connection(executor):
    r"""\q still reaches its handler when the connection cannot give a cursor."""
    quit_handler = MagicMock()
    pgspecial = PGSpecial()
    pgspecial.register(
        quit_handler,
        "\\q",
        "\\q",
        "Quit pgcli.",
        arg_type=NO_QUERY,
        case_sensitive=True,
        aliases=(":q",),
    )

    # Make every attempt to open a cursor fail.
    with patch.object(
        executor.conn, "cursor", side_effect=psycopg.InterfaceError("I'm broken!")
    ):
        # we should be able to quit the app, even without active connection
        run(executor, "\\q", pgspecial=pgspecial)
        quit_handler.assert_called_once()

        # an exception should be raised when running a query without active connection
        with pytest.raises(psycopg.InterfaceError):
            run(executor, "select 1", pgspecial=pgspecial)
|
2025-02-09 19:58:08 +01:00
|
|
|
|
|
|
|
|
|
|
|
@dbtest
def test_virtual_database(executor):
    """A protocol error reported by the cursor is surfaced in the output."""
    virtual_connection = MagicMock()
    # Every cursor() call on the fake connection returns the same VirtualCursor.
    virtual_connection.cursor.return_value = VirtualCursor()
    with patch.object(executor, "conn", virtual_connection):
        result = run(executor, "select 1")
        assert "Command not supported" in result
|