1
0
Fork 0

Merging upstream version 3.2.0.

Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
Daniel Baumann 2025-02-09 19:58:08 +01:00
parent a868bb3d29
commit 39b7cc8559
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
50 changed files with 952 additions and 634 deletions

View file

@ -2,7 +2,7 @@ from textwrap import dedent
import psycopg2
import pytest
from mock import patch, MagicMock
from unittest.mock import patch, MagicMock
from pgspecial.main import PGSpecial, NO_QUERY
from utils import run, dbtest, requires_json, requires_jsonb
@ -89,7 +89,7 @@ def test_expanded_slash_G(executor, pgspecial):
# Tests whether we reset the expanded output after a \G.
run(executor, """create table test(a boolean)""")
run(executor, """insert into test values(True)""")
results = run(executor, """select * from test \G""", pgspecial=pgspecial)
results = run(executor, r"""select * from test \G""", pgspecial=pgspecial)
assert pgspecial.expanded_output == False
@ -105,31 +105,35 @@ def test_schemata_table_views_and_columns_query(executor):
# schemata
# don't enforce all members of the schemas since they may include postgres
# temporary schemas
assert set(executor.schemata()) >= set(
["public", "pg_catalog", "information_schema", "schema1", "schema2"]
)
assert set(executor.schemata()) >= {
"public",
"pg_catalog",
"information_schema",
"schema1",
"schema2",
}
assert executor.search_path() == ["pg_catalog", "public"]
# tables
assert set(executor.tables()) >= set(
[("public", "a"), ("public", "b"), ("schema1", "c")]
)
assert set(executor.tables()) >= {
("public", "a"),
("public", "b"),
("schema1", "c"),
}
assert set(executor.table_columns()) >= set(
[
("public", "a", "x", "text", False, None),
("public", "a", "y", "text", False, None),
("public", "b", "z", "text", False, None),
("schema1", "c", "w", "text", True, "'meow'::text"),
]
)
assert set(executor.table_columns()) >= {
("public", "a", "x", "text", False, None),
("public", "a", "y", "text", False, None),
("public", "b", "z", "text", False, None),
("schema1", "c", "w", "text", True, "'meow'::text"),
}
# views
assert set(executor.views()) >= set([("public", "d")])
assert set(executor.views()) >= {("public", "d")}
assert set(executor.view_columns()) >= set(
[("public", "d", "e", "integer", False, None)]
)
assert set(executor.view_columns()) >= {
("public", "d", "e", "integer", False, None)
}
@dbtest
@ -142,9 +146,9 @@ def test_foreign_key_query(executor):
"create table schema2.child(childid int PRIMARY KEY, motherid int REFERENCES schema1.parent)",
)
assert set(executor.foreignkeys()) >= set(
[("schema1", "parent", "parentid", "schema2", "child", "motherid")]
)
assert set(executor.foreignkeys()) >= {
("schema1", "parent", "parentid", "schema2", "child", "motherid")
}
@dbtest
@ -175,30 +179,28 @@ def test_functions_query(executor):
)
funcs = set(executor.functions())
assert funcs >= set(
[
function_meta_data(func_name="func1", return_type="integer"),
function_meta_data(
func_name="func3",
arg_names=["x", "y"],
arg_types=["integer", "integer"],
arg_modes=["t", "t"],
return_type="record",
is_set_returning=True,
),
function_meta_data(
schema_name="public",
func_name="func4",
arg_names=("x",),
arg_types=("integer",),
return_type="integer",
is_set_returning=True,
),
function_meta_data(
schema_name="schema1", func_name="func2", return_type="integer"
),
]
)
assert funcs >= {
function_meta_data(func_name="func1", return_type="integer"),
function_meta_data(
func_name="func3",
arg_names=["x", "y"],
arg_types=["integer", "integer"],
arg_modes=["t", "t"],
return_type="record",
is_set_returning=True,
),
function_meta_data(
schema_name="public",
func_name="func4",
arg_names=("x",),
arg_types=("integer",),
return_type="integer",
is_set_returning=True,
),
function_meta_data(
schema_name="schema1", func_name="func2", return_type="integer"
),
}
@dbtest
@ -257,8 +259,8 @@ def test_not_is_special(executor, pgspecial):
@dbtest
def test_execute_from_file_no_arg(executor, pgspecial):
"""\i without a filename returns an error."""
result = list(executor.run("\i", pgspecial=pgspecial))
r"""\i without a filename returns an error."""
result = list(executor.run(r"\i", pgspecial=pgspecial))
status, sql, success, is_special = result[0][3:]
assert "missing required argument" in status
assert success == False
@ -268,12 +270,12 @@ def test_execute_from_file_no_arg(executor, pgspecial):
@dbtest
@patch("pgcli.main.os")
def test_execute_from_file_io_error(os, executor, pgspecial):
"""\i with an io_error returns an error."""
# Inject an IOError.
os.path.expanduser.side_effect = IOError("test")
r"""\i with an os_error returns an error."""
# Inject an OSError.
os.path.expanduser.side_effect = OSError("test")
# Check the result.
result = list(executor.run("\i test", pgspecial=pgspecial))
result = list(executor.run(r"\i test", pgspecial=pgspecial))
status, sql, success, is_special = result[0][3:]
assert status == "test"
assert success == False
@ -290,7 +292,7 @@ def test_multiple_queries_same_line(executor):
@dbtest
def test_multiple_queries_with_special_command_same_line(executor, pgspecial):
result = run(executor, "select 'foo'; \d", pgspecial=pgspecial)
result = run(executor, r"select 'foo'; \d", pgspecial=pgspecial)
assert len(result) == 11 # 2 * (output+status) * 3 lines
assert "foo" in result[3]
# This is a lame check. :(
@ -408,7 +410,7 @@ def test_date_time_types(executor):
@pytest.mark.parametrize("value", ["10000000", "10000000.0", "10000000000000"])
def test_large_numbers_render_directly(executor, value):
run(executor, "create table numbertest(a numeric)")
run(executor, "insert into numbertest (a) values ({0})".format(value))
run(executor, f"insert into numbertest (a) values ({value})")
assert value in run(executor, "select * from numbertest", join=True)
@ -511,13 +513,28 @@ def test_short_host(executor):
assert executor.short_host == "localhost1"
class BrokenConnection(object):
class BrokenConnection:
"""Mock a connection that failed."""
def cursor(self):
raise psycopg2.InterfaceError("I'm broken!")
class VirtualCursor:
"""Mock a cursor to virtual database like pgbouncer."""
def __init__(self):
self.protocol_error = False
self.protocol_message = ""
self.description = None
self.status = None
self.statusmessage = "Error"
def execute(self, *args, **kwargs):
self.protocol_error = True
self.protocol_message = "Command not supported"
@dbtest
def test_exit_without_active_connection(executor):
quit_handler = MagicMock()
@ -540,3 +557,12 @@ def test_exit_without_active_connection(executor):
# an exception should be raised when running a query without active connection
with pytest.raises(psycopg2.InterfaceError):
run(executor, "select 1", pgspecial=pgspecial)
@dbtest
def test_virtual_database(executor):
virtual_connection = MagicMock()
virtual_connection.cursor.return_value = VirtualCursor()
with patch.object(executor, "conn", virtual_connection):
result = run(executor, "select 1")
assert "Command not supported" in result