1
0
Fork 0
pgcli/tests/test_pgexecute.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

836 lines
24 KiB
Python
Raw Permalink Normal View History

import re
from textwrap import dedent
import psycopg
import pytest
from unittest.mock import patch, MagicMock
from pgspecial.main import PGSpecial, NO_QUERY
from utils import run, dbtest, requires_json, requires_jsonb
from pgcli.main import PGCli, exception_formatter as main_exception_formatter
from pgcli.packages.parseutils.meta import FunctionMetadata
def function_meta_data(
    func_name,
    schema_name="public",
    arg_names=None,
    arg_types=None,
    arg_modes=None,
    return_type=None,
    is_aggregate=False,
    is_window=False,
    is_set_returning=False,
    is_extension=False,
    arg_defaults=None,
):
    """Build a ``FunctionMetadata`` with convenient defaults for tests.

    The function name comes first here so callers can pass just a name;
    ``FunctionMetadata`` itself receives the schema name first.
    """
    # Positional order expected by FunctionMetadata (schema precedes name).
    positional = (
        schema_name,
        func_name,
        arg_names,
        arg_types,
        arg_modes,
        return_type,
        is_aggregate,
        is_window,
        is_set_returning,
        is_extension,
        arg_defaults,
    )
    return FunctionMetadata(*positional)
@dbtest
def test_conn(executor):
    """Create a table, insert one row and check the rendered SELECT output."""
    run(executor, """create table test(a text)""")
    run(executor, """insert into test values('abc')""")
    # The header cell is padded to the column width (3 chars, set by 'abc'),
    # so every row of the table is as wide as the border lines.
    assert run(executor, """select * from test""", join=True) == dedent(
        """\
        +-----+
        | a   |
        |-----|
        | abc |
        +-----+
        SELECT 1"""
    )
@dbtest
def test_copy(executor):
    """Run the basic create/insert/select round trip on ``executor.copy()``."""
    executor_copy = executor.copy()
    run(executor_copy, """create table test(a text)""")
    run(executor_copy, """insert into test values('abc')""")
    # The header cell is padded to the column width (3 chars, set by 'abc'),
    # so every row of the table is as wide as the border lines.
    assert run(executor_copy, """select * from test""", join=True) == dedent(
        """\
        +-----+
        | a   |
        |-----|
        | abc |
        +-----+
        SELECT 1"""
    )
@dbtest
def test_bools_are_treated_as_strings(executor):
    """A boolean column is rendered as the text ``True`` in the output table."""
    run(executor, """create table test(a boolean)""")
    run(executor, """insert into test values(True)""")
    # The header cell is padded to the column width (4 chars, set by 'True'),
    # so every row of the table is as wide as the border lines.
    assert run(executor, """select * from test""", join=True) == dedent(
        """\
        +------+
        | a    |
        |------|
        | True |
        +------+
        SELECT 1"""
    )
@dbtest
def test_expanded_slash_G(executor, pgspecial):
    r"""Tests whether we reset the expanded output after a ``\G``."""
    run(executor, """create table test(a boolean)""")
    run(executor, """insert into test values(True)""")
    # Only the side effect on ``pgspecial`` matters; the output is discarded.
    run(executor, r"""select * from test \G""", pgspecial=pgspecial)
    assert pgspecial.expanded_output is False
@dbtest
def test_schemata_table_views_and_columns_query(executor):
    """Check the executor's metadata queries for schemas, tables and views."""
    setup_statements = [
        "create table a(x text, y text)",
        "create table b(z text)",
        "create view d as select 1 as e",
        "create schema schema1",
        "create table schema1.c (w text DEFAULT 'meow')",
        "create schema schema2",
    ]
    for ddl in setup_statements:
        run(executor, ddl)

    # schemata
    # Only a superset check: postgres may add temporary schemas of its own.
    found_schemas = set(executor.schemata())
    expected_schemas = {
        "public",
        "pg_catalog",
        "information_schema",
        "schema1",
        "schema2",
    }
    assert found_schemas >= expected_schemas
    assert executor.search_path() == ["pg_catalog", "public"]

    # tables
    found_tables = set(executor.tables())
    assert found_tables >= {
        ("public", "a"),
        ("public", "b"),
        ("schema1", "c"),
    }

    found_table_columns = set(executor.table_columns())
    expected_table_columns = {
        ("public", "a", "x", "text", False, None),
        ("public", "a", "y", "text", False, None),
        ("public", "b", "z", "text", False, None),
        ("schema1", "c", "w", "text", True, "'meow'::text"),
    }
    assert found_table_columns >= expected_table_columns

    # views
    found_views = set(executor.views())
    assert found_views >= {("public", "d")}

    found_view_columns = set(executor.view_columns())
    assert found_view_columns >= {("public", "d", "e", "integer", False, None)}
@dbtest
def test_foreign_key_query(executor):
    """A cross-schema foreign key is reported by ``executor.foreignkeys()``."""
    for ddl in (
        "create schema schema1",
        "create schema schema2",
        "create table schema1.parent(parentid int PRIMARY KEY)",
        "create table schema2.child(childid int PRIMARY KEY, motherid int REFERENCES schema1.parent)",
    ):
        run(executor, ddl)

    found_keys = set(executor.foreignkeys())
    expected_key = ("schema1", "parent", "parentid", "schema2", "child", "motherid")
    assert found_keys >= {expected_key}
@dbtest
def test_functions_query(executor):
    """Functions of several shapes are all reported by ``executor.functions()``."""
    # Plain scalar function in the default schema.
    run(
        executor,
        "create function func1() returns int\n"
        "language sql as $$select 1$$",
    )
    # Scalar function living in a non-default schema.
    run(executor, "create schema schema1")
    run(
        executor,
        "create function schema1.func2() returns int\n"
        "language sql as $$select 2$$",
    )
    # Table-returning function.
    run(
        executor,
        "create function func3()\n"
        "returns table(x int, y int) language sql\n"
        "as $$select 1, 2 from generate_series(1,5)$$;",
    )
    # Set-returning function with one argument.
    run(
        executor,
        "create function func4(x int) returns setof int language sql\n"
        "as $$select generate_series(1,5)$$;",
    )

    funcs = set(executor.functions())

    func1 = function_meta_data(func_name="func1", return_type="integer")
    func3 = function_meta_data(
        func_name="func3",
        arg_names=["x", "y"],
        arg_types=["integer", "integer"],
        arg_modes=["t", "t"],
        return_type="record",
        is_set_returning=True,
    )
    func4 = function_meta_data(
        schema_name="public",
        func_name="func4",
        arg_names=("x",),
        arg_types=("integer",),
        return_type="integer",
        is_set_returning=True,
    )
    func2 = function_meta_data(
        schema_name="schema1", func_name="func2", return_type="integer"
    )
    assert funcs >= {func1, func3, func4, func2}
@dbtest
def test_datatypes_query(executor):
    """A user-defined composite type is the only entry in ``datatypes()``."""
    run(executor, "create type foo AS (a int, b text)")
    found_types = [*executor.datatypes()]
    expected_types = [("public", "foo")]
    assert found_types == expected_types
@dbtest
def test_database_list(executor):
    """The test database shows up in ``executor.databases()``."""
    available = executor.databases()
    assert "_test_db" in available
@dbtest
def test_invalid_syntax(executor, exception_formatter):
    """With verbose errors off, the message has no SQLSTATE details."""

    def terse_formatter(exc):
        return main_exception_formatter(exc, verbose_errors=False)

    output = run(executor, "invalid syntax!", exception_formatter=terse_formatter)
    message = output[0]
    assert 'syntax error at or near "invalid"' in message
    assert "SQLSTATE" not in message
@dbtest
def test_invalid_syntax_verbose(executor):
    """With verbose errors on, the message lists the detailed error fields."""

    def verbose_formatter(exc):
        return main_exception_formatter(exc, verbose_errors=True)

    output = run(executor, "invalid syntax!", exception_formatter=verbose_formatter)

    # One regex fragment per expected output line; the line number varies.
    field_patterns = [
        r"Severity: ERROR",
        r"Severity \(non-localized\): ERROR",
        r"SQLSTATE code: 42601",
        r'Message: syntax error at or near "invalid"',
        r"Position: 1",
        r"File: scan\.l",
        r"Line: \d+",
        r"Routine: scanner_yyerror",
    ]
    fields = "\n".join(field_patterns)
    assert re.search(fields, output[0])
@dbtest
def test_invalid_column_name(executor, exception_formatter):
    """Selecting an unknown column yields a 'does not exist' error message."""
    query = "select invalid command"
    output = run(executor, query, exception_formatter=exception_formatter)
    assert 'column "invalid" does not exist' in output[0]
@pytest.fixture(params=[True, False])
def expanded(request):
    """Parametrized fixture: yields ``True`` and then ``False`` to dependent tests."""
    flag = request.param
    return flag
@dbtest
def test_unicode_support_in_output(executor, expanded):
    """Non-ASCII text survives rendering in both normal and expanded mode."""
    run(executor, "create table unicodechars(t text)")
    run(executor, "insert into unicodechars (t) values ('é')")

    # See issue #24, this raises an exception without proper handling
    rendered = run(executor, "select * from unicodechars", join=True, expanded=expanded)
    assert "é" in rendered
@dbtest
def test_not_is_special(executor, pgspecial):
    """is_special is set to false for database queries."""
    query = "select 1"
    result = list(executor.run(query, pgspecial=pgspecial))
    # The last two fields of each result tuple are (success, is_special).
    success, is_special = result[0][5:]
    assert success is True
    assert is_special is False
@dbtest
def test_execute_from_file_no_arg(executor, pgspecial):
    r"""\i without a filename returns an error."""
    result = list(executor.run(r"\i", pgspecial=pgspecial))
    # Trailing fields of a result tuple: (status, sql, success, is_special).
    status, _sql, success, is_special = result[0][3:]
    assert "missing required argument" in status
    assert success is False
    assert is_special is True
@dbtest
@patch("pgcli.main.os")
def test_execute_from_file_io_error(os, executor, pgspecial):
    r"""\i with an os_error returns an error."""
    # Inject an OSError.
    os.path.expanduser.side_effect = OSError("test")

    # Check the result.
    result = list(executor.run(r"\i test", pgspecial=pgspecial))
    # Trailing fields of a result tuple: (status, sql, success, is_special).
    status, _sql, success, is_special = result[0][3:]
    assert status == "test"
    assert success is False
    assert is_special is True
@dbtest
def test_execute_from_commented_file_that_executes_another_file(
    executor, pgspecial, tmpdir
):
    r"""A ``\h`` preceded by a comment line still produces the help listing."""
    # https://github.com/dbcli/pgcli/issues/1336
    sqlfile1 = tmpdir.join("test01.sql")
    sqlfile1.write("-- asdf \n\\h")
    sqlfile2 = tmpdir.join("test00.sql")
    sqlfile2.write("--An useless comment;\nselect now();\n-- another useless comment")

    rcfile = str(tmpdir.join("rcfile"))
    cli = PGCli(pgexecute=executor, pgclirc_file=rcfile)
    assert cli is not None

    statement = "--comment\n\\h"
    result = run(executor, statement, pgspecial=cli.pgspecial)
    assert result is not None
    # The previous check was ``assert result[0].find("ALTER TABLE")``, which
    # passes when the text is missing (-1 is truthy) and fails when it is at
    # index 0. Look for the help topic in any output line instead.
    assert any("ALTER TABLE" in line for line in result)
@dbtest
def test_execute_commented_first_line_and_special(executor, pgspecial, tmpdir):
    r"""Statements preceded by comments still run, including the ``\h`` special.

    Three groups of statements are exercised in order: plain queries after a
    comment, ``\h`` after one or more comments (expects the help listing), and
    ``\h`` followed by a block comment (currently expects "No help").
    """
    # just some base cases that should work also
    plain_query_statements = (
        "--comment\nselect now();",
        "/*comment*/\nselect now();",
    )
    for statement in plain_query_statements:
        result = run(executor, statement, pgspecial=pgspecial)
        assert result is not None
        assert "now" in result[1]

    # https://github.com/dbcli/pgcli/issues/1362
    # Backslashes are written as "\\h" so no string relies on the invalid
    # "\h" escape sequence; the resulting values are unchanged.
    help_listing_statements = (
        "--comment\n\\h",
        "--comment1\n--comment2\n\\h",
        "/*comment*/\n\\h;",
        "/*comment1\ncomment2*/\n\\h",
        "/*comment1\ncomment2*/\n/*comment 3\ncomment4*/\n\\h",
        " /*comment*/\n\\h;",
        "/*comment\ncomment line2*/\n\\h;",
        " /*comment\ncomment line2*/\n\\h;",
    )
    for statement in help_listing_statements:
        result = run(executor, statement, pgspecial=pgspecial)
        assert result is not None
        assert "ALTER" in result[1]
        assert "ABORT" in result[1]

    # TODO: we probably don't want to do this but sqlparse is not parsing things well
    # we really want it to find help but right now, sqlparse isn't dropping the /*comment*/
    # style comments after command
    no_help_statements = (
        "\\h /*comment4 */",
        "/*comment1*/\n\\h\n/*comment4 */",
        "/*comment1\ncomment3\ncomment2*/\n\\h\n/*comment4\ncomment5\ncomment6*/",
    )
    for statement in no_help_statements:
        result = run(executor, statement, pgspecial=pgspecial)
        assert result is not None
        assert "No help" in result[0]
@dbtest
def test_execute_commented_first_line_and_normal(executor, pgspecial, tmpdir):
    """Ordinary statements run with leading/trailing comments and embedded newlines."""
    # https://github.com/dbcli/pgcli/issues/1403
    # just some base cases that should work also
    plain_query_statements = (
        "--comment\nselect now();",
        "/*comment*/\nselect now();",
    )
    for statement in plain_query_statements:
        result = run(executor, statement, pgspecial=pgspecial)
        assert result is not None
        assert "now" in result[1]

    # this simulates the original error (1403) without having to add/drop tables
    # since it was just an error on reading input files and not the actual
    # command itself
    values_statements = (
        # test that the statement works
        "VALUES (1, 'one'), (2, 'two'), (3, 'three');",
        # a newline followed by a space in the middle
        "VALUES (1, 'one'),\n (2, 'two'), (3, 'three');",
        # a bare newline in the middle
        "VALUES (1, 'one'),\n(2, 'two'), (3, 'three');",
        # now add a single comment line
        "--comment\nVALUES (1, 'one'), (2, 'two'), (3, 'three');",
        # single comment line + newline in the middle of the statement
        "--comment\nVALUES (1,'one'),\n(2, 'two'), (3, 'three');",
        # two comment lines
        "--comment\n--comment2\nVALUES (1,'one'), (2, 'two'), (3, 'three');",
        # two comment lines + trailing newline
        "--comment\n--comment2\nVALUES (1,'one'), (2, 'two'), (3, 'three');\n",
        # multiline comment + newline in middle of the statement
        "/*comment\ncomment2\ncomment3*/\nVALUES (1,'one'),\n(2, 'two'), (3, 'three');",
        # multiline comment + newline in middle of the statement
        # + comments after the statement
        "/*comment\ncomment2\ncomment3*/\nVALUES (1,'one'),\n(2, 'two'), (3, 'three');"
        "\n--comment4\n--comment5",
    )
    for statement in values_statements:
        result = run(executor, statement, pgspecial=pgspecial)
        assert result is not None
        # Index 5 is the output line holding the third VALUES row.
        assert "three" in result[5]
@dbtest
def test_multiple_queries_same_line(executor):
    """Two semicolon-separated queries each produce their own output block."""
    lines = run(executor, "select 'foo'; select 'bar'")
    expected_line_count = 12  # 2 * (output+status) * 3 lines
    assert len(lines) == expected_line_count
    assert "foo" in lines[3]
    assert "bar" in lines[9]
@dbtest
def test_multiple_queries_with_special_command_same_line(executor, pgspecial):
    r"""A query and a ``\d`` special command on one line both produce output."""
    lines = run(executor, "select 'foo'; \\d", pgspecial=pgspecial)
    # Total number of formatted lines across both results.
    assert len(lines) == 11
    assert "foo" in lines[3]
    # This is a lame check. :(
    assert "Schema" in lines[7]
@dbtest
def test_multiple_queries_same_line_syntaxerror(executor, exception_formatter):
    """A valid query followed by a broken one yields output, then the error."""
    sql = "select 'fooé'; invalid syntax é"
    lines = run(executor, sql, exception_formatter=exception_formatter)
    last_line = lines[-1]
    assert "fooé" in lines[3]
    assert 'syntax error at or near "invalid"' in last_line
@pytest.fixture
def pgspecial():
    """Provide the ``pgspecial`` attribute of a freshly constructed ``PGCli``."""
    cli = PGCli()
    return cli.pgspecial
@dbtest
def test_special_command_help(executor, pgspecial):
    r"""``\?`` renders a table whose header has Command and Description cells."""
    lines = run(executor, "\\?", pgspecial=pgspecial)
    header_cells = lines[1].split("|")
    assert "Command" in header_cells[1]
    assert "Description" in header_cells[2]
@dbtest
def test_bytea_field_support_in_output(executor):
    r"""A bytea value is shown in hex form (``\xdeadbeef``)."""
    run(executor, "create table binarydata(c bytea)")
    run(executor, "insert into binarydata (c) values (decode('DEADBEEF', 'hex'))")

    rendered = run(executor, "select * from binarydata", join=True)
    assert "\\xdeadbeef" in rendered
@dbtest
def test_unicode_support_in_unknown_type(executor):
    """A non-ASCII string literal selected directly is rendered intact."""
    rendered = run(executor, "SELECT '日本語' AS japanese;", join=True)
    assert "日本語" in rendered
@dbtest
def test_unicode_support_in_enum_type(executor):
    """A non-ASCII enum label is rendered intact."""
    for statement in (
        "CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy', '日本語')",
        "CREATE TABLE person (name TEXT, current_mood mood)",
        "INSERT INTO person VALUES ('Moe', '日本語')",
    ):
        run(executor, statement)

    rendered = run(executor, "SELECT * FROM person", join=True)
    assert "日本語" in rendered
@requires_json
def test_json_renders_without_u_prefix(executor, expanded):
    """A json value is printed as the original JSON text."""
    run(executor, "create table jsontest(d json)")
    run(executor, """insert into jsontest (d) values ('{"name": "Éowyn"}')""")

    query = "SELECT d FROM jsontest LIMIT 1"
    rendered = run(executor, query, join=True, expanded=expanded)
    assert '{"name": "Éowyn"}' in rendered
@requires_jsonb
def test_jsonb_renders_without_u_prefix(executor, expanded):
    """A jsonb value is printed as plain JSON text."""
    run(executor, "create table jsonbtest(d jsonb)")
    run(executor, """insert into jsonbtest (d) values ('{"name": "Éowyn"}')""")

    query = "SELECT d FROM jsonbtest LIMIT 1"
    rendered = run(executor, query, join=True, expanded=expanded)
    assert '{"name": "Éowyn"}' in rendered
@dbtest
def test_date_time_types(executor):
    """Date/time values, including extreme ones, render as expected text."""
    run(executor, "SET TIME ZONE UTC")

    # (query, expected data row) pairs; the data row is output line 3.
    cases = [
        ("SELECT (CAST('00:00:00' AS time))", "| 00:00:00 |"),
        ("SELECT (CAST('00:00:00+14:59' AS timetz))", "| 00:00:00+14:59 |"),
        ("SELECT (CAST('4713-01-01 BC' AS date))", "| 4713-01-01 BC |"),
        (
            "SELECT (CAST('4713-01-01 00:00:00 BC' AS timestamp))",
            "| 4713-01-01 00:00:00 BC |",
        ),
        (
            "SELECT (CAST('4713-01-01 00:00:00+00 BC' AS timestamptz))",
            "| 4713-01-01 00:00:00+00 BC |",
        ),
        (
            "SELECT (CAST('-123456789 days 12:23:56' AS interval))",
            "| -123456789 days, 12:23:56 |",
        ),
    ]
    for query, expected_row in cases:
        rendered_lines = run(executor, query, join=True).split("\n")
        assert rendered_lines[3] == expected_row
@dbtest
@pytest.mark.parametrize("value", ["10000000", "10000000.0", "10000000000000"])
def test_large_numbers_render_directly(executor, value):
    """Large numeric values appear in the output exactly as they were inserted."""
    run(executor, "create table numbertest(a numeric)")
    insert_sql = f"insert into numbertest (a) values ({value})"
    run(executor, insert_sql)

    rendered = run(executor, "select * from numbertest", join=True)
    assert value in rendered
@dbtest
@pytest.mark.parametrize("command", ["di", "dv", "ds", "df", "dT"])
@pytest.mark.parametrize("verbose", ["", "+"])
@pytest.mark.parametrize("pattern", ["", "x", "*.*", "x.y", "x.*", "*.y"])
def test_describe_special(executor, command, verbose, pattern, pgspecial):
    """Smoke-test the describe special commands across flags and patterns."""
    # We don't have any tests for the output of any of the special commands,
    # but we can at least make sure they run without error
    sql = f"\\{command}{verbose} {pattern}"
    # Exhaust the generator so the command actually executes.
    list(executor.run(sql, pgspecial=pgspecial))
@dbtest
@pytest.mark.parametrize("sql", ["invalid sql", "SELECT 1; select error;"])
def test_raises_with_no_formatter(executor, sql):
    """Without an exception formatter, SQL errors propagate to the caller."""
    with pytest.raises(psycopg.ProgrammingError):
        # Drain the generator; the error surfaces while iterating.
        for _ in executor.run(sql):
            pass
@dbtest
def test_on_error_resume(executor, exception_formatter):
    """With on_error_resume=True all three statements yield a result."""
    sql = "select 1; error; select 1;"
    outcomes = executor.run(
        sql, on_error_resume=True, exception_formatter=exception_formatter
    )
    collected = list(outcomes)
    assert len(collected) == 3
@dbtest
def test_on_error_stop(executor, exception_formatter):
    """With on_error_resume=False execution stops after the failing statement."""
    sql = "select 1; error; select 1;"
    outcomes = executor.run(
        sql, on_error_resume=False, exception_formatter=exception_formatter
    )
    collected = list(outcomes)
    assert len(collected) == 2
# @dbtest
# def test_unicode_notices(executor):
# sql = "DO language plpgsql $$ BEGIN RAISE NOTICE '有人更改'; END $$;"
# result = list(executor.run(sql))
# assert result[0][0] == u'NOTICE: 有人更改\n'
@dbtest
def test_nonexistent_function_definition(executor):
    """Asking for the definition of an unknown function raises RuntimeError."""
    with pytest.raises(RuntimeError):
        # This used to call view_definition(), so the function lookup named
        # by this test was never exercised.
        executor.function_definition("there_is_no_such_function")
@dbtest
def test_function_definition(executor):
    """The definition of an existing function can be fetched."""
    run(
        executor,
        "\n"
        "CREATE OR REPLACE FUNCTION public.the_number_three()\n"
        "RETURNS int\n"
        "LANGUAGE sql\n"
        "AS $function$\n"
        "select 3;\n"
        "$function$\n",
    )
    result = executor.function_definition("the_number_three")
    # Previously nothing was asserted. Minimal check: the returned text
    # mentions the function (presumably a CREATE FUNCTION statement —
    # NOTE(review): confirm against pgexecute.function_definition).
    assert "the_number_three" in result
@dbtest
def test_function_notice_order(executor):
    """Notices raised by a function are reported in the order they were raised."""
    run(
        executor,
        "\n"
        "CREATE OR REPLACE FUNCTION demo_order() RETURNS VOID AS\n"
        "$$\n"
        "BEGIN\n"
        "RAISE NOTICE 'first';\n"
        "RAISE NOTICE 'second';\n"
        "RAISE NOTICE 'third';\n"
        "RAISE NOTICE 'fourth';\n"
        "RAISE NOTICE 'fifth';\n"
        "RAISE NOTICE 'sixth';\n"
        "END;\n"
        "$$\n"
        "LANGUAGE plpgsql;\n",
    )

    executor.function_definition("demo_order")

    result = run(executor, "select demo_order()")
    assert "first\nsecond\nthird\nfourth\nfifth\nsixth" in result[0]
    assert "+------------+" in result[1]
    assert "| demo_order |" in result[2]
    assert "|------------|" in result[3]
    # The VOID result is an empty cell padded to the column width, so the
    # row is as wide as the borders (the old "| |" could never match).
    assert "|            |" in result[4]
    assert "+------------+" in result[5]
    assert "SELECT 1" in result[6]
@dbtest
def test_view_definition(executor):
    """Definitions can be fetched for plain and materialized views."""
    for ddl in (
        "create table tbl1 (a text, b numeric)",
        "create view vw1 AS SELECT * FROM tbl1",
        "create materialized view mvw1 AS SELECT * FROM tbl1",
    ):
        run(executor, ddl)

    plain_view_sql = executor.view_definition("vw1")
    assert 'VIEW "public"."vw1" AS' in plain_view_sql
    assert "FROM tbl1" in plain_view_sql

    materialized_view_sql = executor.view_definition("mvw1")
    assert "MATERIALIZED VIEW" in materialized_view_sql
@dbtest
def test_nonexistent_view_definition(executor):
    """Looking up a view that was never created raises RuntimeError."""
    for missing_view in ("there_is_no_such_view", "mvw1"):
        with pytest.raises(RuntimeError):
            executor.view_definition(missing_view)
@dbtest
def test_short_host(executor):
    """``short_host`` shortens host names but leaves an IP address whole."""
    # (configured host, expected short_host)
    cases = [
        ("localhost", "localhost"),
        ("localhost.example.org", "localhost"),
        ("localhost1.example.org,localhost2.example.org", "localhost1"),
        ("ec2-11-222-333-444.compute-1.amazonaws.com", "ec2-11-222-333-444"),
        ("1.2.3.4", "1.2.3.4"),
    ]
    for host, expected in cases:
        with patch.object(executor, "host", host):
            assert executor.short_host == expected
class VirtualCursor:
    """Mock a cursor to virtual database like pgbouncer."""

    def __init__(self):
        # No protocol error has been recorded yet.
        self.protocol_error = False
        self.protocol_message = ""
        # No result-set metadata or status.
        self.description = self.status = None
        self.statusmessage = "Error"

    def execute(self, *args, **kwargs):
        """Ignore the arguments and record a protocol error on the cursor."""
        self.protocol_message = "Command not supported"
        self.protocol_error = True
@dbtest
def test_exit_without_active_connection(executor):
    r"""``\q`` still works when the connection cannot hand out a cursor."""
    on_quit = MagicMock()
    special = PGSpecial()
    special.register(
        on_quit,
        "\\q",
        "\\q",
        "Quit pgcli.",
        arg_type=NO_QUERY,
        case_sensitive=True,
        aliases=(":q",),
    )

    # Every attempt to open a cursor now fails with InterfaceError.
    broken_cursor = patch.object(
        executor.conn, "cursor", side_effect=psycopg.InterfaceError("I'm broken!")
    )
    with broken_cursor:
        # we should be able to quit the app, even without active connection
        run(executor, "\\q", pgspecial=special)
        on_quit.assert_called_once()

        # an exception should be raised when running a query without active connection
        with pytest.raises(psycopg.InterfaceError):
            run(executor, "select 1", pgspecial=special)
@dbtest
def test_virtual_database(executor):
    """The protocol message set by a ``VirtualCursor`` shows up in the output."""
    fake_conn = MagicMock()
    fake_conn.cursor.return_value = VirtualCursor()

    with patch.object(executor, "conn", fake_conn):
        output = run(executor, "select 1")
        assert "Command not supported" in output