import re
from textwrap import dedent

import psycopg
import pytest
from unittest.mock import patch, MagicMock
from pgspecial.main import PGSpecial, NO_QUERY
from utils import run, dbtest, requires_json, requires_jsonb

from pgcli.main import PGCli, exception_formatter as main_exception_formatter
from pgcli.packages.parseutils.meta import FunctionMetadata


def function_meta_data(
    func_name,
    schema_name="public",
    arg_names=None,
    arg_types=None,
    arg_modes=None,
    return_type=None,
    is_aggregate=False,
    is_window=False,
    is_set_returning=False,
    is_extension=False,
    arg_defaults=None,
):
    return FunctionMetadata(
        schema_name,
        func_name,
        arg_names,
        arg_types,
        arg_modes,
        return_type,
        is_aggregate,
        is_window,
        is_set_returning,
        is_extension,
        arg_defaults,
    )


@dbtest
def test_conn(executor):
    run(executor, """create table test(a text)""")
    run(executor, """insert into test values('abc')""")
    assert run(executor, """select * from test""", join=True) == dedent(
        """\
        +-----+
        | a   |
        |-----|
        | abc |
        +-----+
        SELECT 1"""
    )


@dbtest
def test_copy(executor):
    executor_copy = executor.copy()
    run(executor_copy, """create table test(a text)""")
    run(executor_copy, """insert into test values('abc')""")
    assert run(executor_copy, """select * from test""", join=True) == dedent(
        """\
        +-----+
        | a   |
        |-----|
        | abc |
        +-----+
        SELECT 1"""
    )


@dbtest
def test_bools_are_treated_as_strings(executor):
    run(executor, """create table test(a boolean)""")
    run(executor, """insert into test values(True)""")
    assert run(executor, """select * from test""", join=True) == dedent(
        """\
        +------+
        | a    |
        |------|
        | True |
        +------+
        SELECT 1"""
    )


@dbtest
def test_expanded_slash_G(executor, pgspecial):
    # Tests whether we reset the expanded output after a \G.
    run(executor, """create table test(a boolean)""")
    run(executor, """insert into test values(True)""")
    results = run(executor, r"""select * from test \G""", pgspecial=pgspecial)
    assert pgspecial.expanded_output == False


@dbtest
def test_schemata_table_views_and_columns_query(executor):
    run(executor, "create table a(x text, y text)")
    run(executor, "create table b(z text)")
    run(executor, "create view d as select 1 as e")
    run(executor, "create schema schema1")
    run(executor, "create table schema1.c (w text DEFAULT 'meow')")
    run(executor, "create schema schema2")

    # schemata
    # don't enforce all members of the schemas since they may include postgres
    # temporary schemas
    assert set(executor.schemata()) >= {
        "public",
        "pg_catalog",
        "information_schema",
        "schema1",
        "schema2",
    }
    assert executor.search_path() == ["pg_catalog", "public"]

    # tables
    assert set(executor.tables()) >= {
        ("public", "a"),
        ("public", "b"),
        ("schema1", "c"),
    }

    assert set(executor.table_columns()) >= {
        ("public", "a", "x", "text", False, None),
        ("public", "a", "y", "text", False, None),
        ("public", "b", "z", "text", False, None),
        ("schema1", "c", "w", "text", True, "'meow'::text"),
    }

    # views
    assert set(executor.views()) >= {("public", "d")}

    assert set(executor.view_columns()) >= {
        ("public", "d", "e", "integer", False, None)
    }


@dbtest
def test_foreign_key_query(executor):
    run(executor, "create schema schema1")
    run(executor, "create schema schema2")
    run(executor, "create table schema1.parent(parentid int PRIMARY KEY)")
    run(
        executor,
        "create table schema2.child(childid int PRIMARY KEY, motherid int REFERENCES schema1.parent)",
    )

    assert set(executor.foreignkeys()) >= {
        ("schema1", "parent", "parentid", "schema2", "child", "motherid")
    }


@dbtest
def test_functions_query(executor):
    run(
        executor,
        """create function func1() returns int
                     language sql as $$select 1$$""",
    )
    run(executor, "create schema schema1")
    run(
        executor,
        """create function schema1.func2() returns int
                     language sql as $$select 2$$""",
    )

    run(
        executor,
        """create function func3()
                     returns table(x int, y int) language sql
                     as $$select 1, 2 from generate_series(1,5)$$;""",
    )

    run(
        executor,
        """create function func4(x int) returns setof int language sql
                     as $$select generate_series(1,5)$$;""",
    )

    funcs = set(executor.functions())
    assert funcs >= {
        function_meta_data(func_name="func1", return_type="integer"),
        function_meta_data(
            func_name="func3",
            arg_names=["x", "y"],
            arg_types=["integer", "integer"],
            arg_modes=["t", "t"],
            return_type="record",
            is_set_returning=True,
        ),
        function_meta_data(
            schema_name="public",
            func_name="func4",
            arg_names=("x",),
            arg_types=("integer",),
            return_type="integer",
            is_set_returning=True,
        ),
        function_meta_data(
            schema_name="schema1", func_name="func2", return_type="integer"
        ),
    }


@dbtest
def test_datatypes_query(executor):
    run(executor, "create type foo AS (a int, b text)")

    types = list(executor.datatypes())
    assert types == [("public", "foo")]


@dbtest
def test_database_list(executor):
    databases = executor.databases()
    assert "_test_db" in databases


@dbtest
def test_invalid_syntax(executor, exception_formatter):
    result = run(
        executor,
        "invalid syntax!",
        exception_formatter=lambda x: main_exception_formatter(x, verbose_errors=False),
    )
    assert 'syntax error at or near "invalid"' in result[0]
    assert "SQLSTATE" not in result[0]


@dbtest
def test_invalid_syntax_verbose(executor):
    result = run(
        executor,
        "invalid syntax!",
        exception_formatter=lambda x: main_exception_formatter(x, verbose_errors=True),
    )
    fields = r"""
Severity: ERROR
Severity \(non-localized\): ERROR
SQLSTATE code: 42601
Message: syntax error at or near "invalid"
Position: 1
File: scan\.l
Line: \d+
Routine: scanner_yyerror
    """.strip()
    assert re.search(fields, result[0])


@dbtest
def test_invalid_column_name(executor, exception_formatter):
    result = run(
        executor, "select invalid command", exception_formatter=exception_formatter
    )
    assert 'column "invalid" does not exist' in result[0]


@pytest.fixture(params=[True, False])
def expanded(request):
    return request.param


@dbtest
def test_unicode_support_in_output(executor, expanded):
    run(executor, "create table unicodechars(t text)")
    run(executor, "insert into unicodechars (t) values ('é')")

    # See issue #24, this raises an exception without proper handling
    assert "é" in run(
        executor, "select * from unicodechars", join=True, expanded=expanded
    )


@dbtest
def test_not_is_special(executor, pgspecial):
    """is_special is set to false for database queries."""
    query = "select 1"
    result = list(executor.run(query, pgspecial=pgspecial))
    success, is_special = result[0][5:]
    assert success == True
    assert is_special == False


@dbtest
def test_execute_from_file_no_arg(executor, pgspecial):
    r"""\i without a filename returns an error."""
    result = list(executor.run(r"\i", pgspecial=pgspecial))
    status, sql, success, is_special = result[0][3:]
    assert "missing required argument" in status
    assert success == False
    assert is_special == True


@dbtest
@patch("pgcli.main.os")
def test_execute_from_file_io_error(os, executor, pgspecial):
    r"""\i with an os_error returns an error."""
    # Inject an OSError.
    os.path.expanduser.side_effect = OSError("test")

    # Check the result.
    result = list(executor.run(r"\i test", pgspecial=pgspecial))
    status, sql, success, is_special = result[0][3:]
    assert status == "test"
    assert success == False
    assert is_special == True


@dbtest
def test_execute_from_commented_file_that_executes_another_file(
    executor, pgspecial, tmpdir
):
    # https://github.com/dbcli/pgcli/issues/1336
    sqlfile1 = tmpdir.join("test01.sql")
    sqlfile1.write("-- asdf \n\\h")
    sqlfile2 = tmpdir.join("test00.sql")
    sqlfile2.write("--An useless comment;\nselect now();\n-- another useless comment")

    rcfile = str(tmpdir.join("rcfile"))
    print(rcfile)
    cli = PGCli(pgexecute=executor, pgclirc_file=rcfile)
    assert cli != None
    statement = "--comment\n\\h"
    result = run(executor, statement, pgspecial=cli.pgspecial)
    assert result != None
    assert result[0].find("ALTER TABLE")


@dbtest
def test_execute_commented_first_line_and_special(executor, pgspecial, tmpdir):
    # just some base cases that should work also
    statement = "--comment\nselect now();"
    result = run(executor, statement, pgspecial=pgspecial)
    assert result != None
    assert result[1].find("now") >= 0

    statement = "/*comment*/\nselect now();"
    result = run(executor, statement, pgspecial=pgspecial)
    assert result != None
    assert result[1].find("now") >= 0

    # https://github.com/dbcli/pgcli/issues/1362
    statement = "--comment\n\\h"
    result = run(executor, statement, pgspecial=pgspecial)
    assert result != None
    assert result[1].find("ALTER") >= 0
    assert result[1].find("ABORT") >= 0

    statement = "--comment1\n--comment2\n\\h"
    result = run(executor, statement, pgspecial=pgspecial)
    assert result != None
    assert result[1].find("ALTER") >= 0
    assert result[1].find("ABORT") >= 0

    statement = "/*comment*/\n\h;"
    result = run(executor, statement, pgspecial=pgspecial)
    assert result != None
    assert result[1].find("ALTER") >= 0
    assert result[1].find("ABORT") >= 0

    statement = """/*comment1
    comment2*/
    \h"""
    result = run(executor, statement, pgspecial=pgspecial)
    assert result != None
    assert result[1].find("ALTER") >= 0
    assert result[1].find("ABORT") >= 0

    statement = """/*comment1
    comment2*/
    /*comment 3
    comment4*/
    \\h"""
    result = run(executor, statement, pgspecial=pgspecial)
    assert result != None
    assert result[1].find("ALTER") >= 0
    assert result[1].find("ABORT") >= 0

    statement = "    /*comment*/\n\h;"
    result = run(executor, statement, pgspecial=pgspecial)
    assert result != None
    assert result[1].find("ALTER") >= 0
    assert result[1].find("ABORT") >= 0

    statement = "/*comment\ncomment line2*/\n\h;"
    result = run(executor, statement, pgspecial=pgspecial)
    assert result != None
    assert result[1].find("ALTER") >= 0
    assert result[1].find("ABORT") >= 0

    statement = "          /*comment\ncomment line2*/\n\h;"
    result = run(executor, statement, pgspecial=pgspecial)
    assert result != None
    assert result[1].find("ALTER") >= 0
    assert result[1].find("ABORT") >= 0

    statement = """\\h /*comment4 */"""
    result = run(executor, statement, pgspecial=pgspecial)
    print(result)
    assert result != None
    assert result[0].find("No help") >= 0

    # TODO: we probably don't want to do this but sqlparse is not parsing things well
    # we relly want it to find help but right now, sqlparse isn't dropping the /*comment*/
    # style comments after command

    statement = """/*comment1*/
    \h
    /*comment4 */"""
    result = run(executor, statement, pgspecial=pgspecial)
    assert result != None
    assert result[0].find("No help") >= 0

    # TODO: same for this one
    statement = """/*comment1
    comment3
    comment2*/
    \\h
    /*comment4
    comment5
    comment6*/"""
    result = run(executor, statement, pgspecial=pgspecial)
    assert result != None
    assert result[0].find("No help") >= 0


@dbtest
def test_execute_commented_first_line_and_normal(executor, pgspecial, tmpdir):
    # https://github.com/dbcli/pgcli/issues/1403

    # just some base cases that should work also
    statement = "--comment\nselect now();"
    result = run(executor, statement, pgspecial=pgspecial)
    assert result != None
    assert result[1].find("now") >= 0

    statement = "/*comment*/\nselect now();"
    result = run(executor, statement, pgspecial=pgspecial)
    assert result != None
    assert result[1].find("now") >= 0

    # this simulates the original error (1403) without having to add/drop tables
    # since it was just an error on reading input files and not the actual
    # command itself

    # test that the statement works
    statement = """VALUES (1, 'one'), (2, 'two'), (3, 'three');"""
    result = run(executor, statement, pgspecial=pgspecial)
    assert result != None
    assert result[5].find("three") >= 0

    # test the statement with a \n in the middle
    statement = """VALUES (1, 'one'),\n (2, 'two'), (3, 'three');"""
    result = run(executor, statement, pgspecial=pgspecial)
    assert result != None
    assert result[5].find("three") >= 0

    # test the statement with a newline in the middle
    statement = """VALUES (1, 'one'),
     (2, 'two'), (3, 'three');"""
    result = run(executor, statement, pgspecial=pgspecial)
    assert result != None
    assert result[5].find("three") >= 0

    # now add a single comment line
    statement = """--comment\nVALUES (1, 'one'), (2, 'two'), (3, 'three');"""
    result = run(executor, statement, pgspecial=pgspecial)
    assert result != None
    assert result[5].find("three") >= 0

    # doing without special char \n
    statement = """--comment
    VALUES (1,'one'),
    (2, 'two'), (3, 'three');"""
    result = run(executor, statement, pgspecial=pgspecial)
    assert result != None
    assert result[5].find("three") >= 0

    # two comment lines
    statement = """--comment\n--comment2\nVALUES (1,'one'), (2, 'two'), (3, 'three');"""
    result = run(executor, statement, pgspecial=pgspecial)
    assert result != None
    assert result[5].find("three") >= 0

    # doing without special char \n
    statement = """--comment
    --comment2
    VALUES (1,'one'), (2, 'two'), (3, 'three');
    """
    result = run(executor, statement, pgspecial=pgspecial)
    assert result != None
    assert result[5].find("three") >= 0

    # multiline comment + newline in middle of the statement
    statement = """/*comment
comment2
comment3*/
VALUES (1,'one'),
(2, 'two'), (3, 'three');"""
    result = run(executor, statement, pgspecial=pgspecial)
    assert result != None
    assert result[5].find("three") >= 0

    # multiline comment + newline in middle of the statement
    # + comments after the statement
    statement = """/*comment
comment2
comment3*/
VALUES (1,'one'),
(2, 'two'), (3, 'three');
--comment4
--comment5"""
    result = run(executor, statement, pgspecial=pgspecial)
    assert result != None
    assert result[5].find("three") >= 0


@dbtest
def test_multiple_queries_same_line(executor):
    result = run(executor, "select 'foo'; select 'bar'")
    assert len(result) == 12  # 2 * (output+status) * 3 lines
    assert "foo" in result[3]
    assert "bar" in result[9]


@dbtest
def test_multiple_queries_with_special_command_same_line(executor, pgspecial):
    result = run(executor, r"select 'foo'; \d", pgspecial=pgspecial)
    assert len(result) == 11  # 2 * (output+status) * 3 lines
    assert "foo" in result[3]
    # This is a lame check. :(
    assert "Schema" in result[7]


@dbtest
def test_multiple_queries_same_line_syntaxerror(executor, exception_formatter):
    result = run(
        executor,
        "select 'fooé'; invalid syntax é",
        exception_formatter=exception_formatter,
    )
    assert "fooé" in result[3]
    assert 'syntax error at or near "invalid"' in result[-1]


@pytest.fixture
def pgspecial():
    return PGCli().pgspecial


@dbtest
def test_special_command_help(executor, pgspecial):
    result = run(executor, "\\?", pgspecial=pgspecial)[1].split("|")
    assert "Command" in result[1]
    assert "Description" in result[2]


@dbtest
def test_bytea_field_support_in_output(executor):
    run(executor, "create table binarydata(c bytea)")
    run(executor, "insert into binarydata (c) values (decode('DEADBEEF', 'hex'))")

    assert "\\xdeadbeef" in run(executor, "select * from binarydata", join=True)


@dbtest
def test_unicode_support_in_unknown_type(executor):
    assert "日本語" in run(executor, "SELECT '日本語' AS japanese;", join=True)


@dbtest
def test_unicode_support_in_enum_type(executor):
    run(executor, "CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy', '日本語')")
    run(executor, "CREATE TABLE person (name TEXT, current_mood mood)")
    run(executor, "INSERT INTO person VALUES ('Moe', '日本語')")
    assert "日本語" in run(executor, "SELECT * FROM person", join=True)


@requires_json
def test_json_renders_without_u_prefix(executor, expanded):
    run(executor, "create table jsontest(d json)")
    run(executor, """insert into jsontest (d) values ('{"name": "Éowyn"}')""")
    result = run(
        executor, "SELECT d FROM jsontest LIMIT 1", join=True, expanded=expanded
    )

    assert '{"name": "Éowyn"}' in result


@requires_jsonb
def test_jsonb_renders_without_u_prefix(executor, expanded):
    run(executor, "create table jsonbtest(d jsonb)")
    run(executor, """insert into jsonbtest (d) values ('{"name": "Éowyn"}')""")
    result = run(
        executor, "SELECT d FROM jsonbtest LIMIT 1", join=True, expanded=expanded
    )

    assert '{"name": "Éowyn"}' in result


@dbtest
def test_date_time_types(executor):
    run(executor, "SET TIME ZONE UTC")
    assert (
        run(executor, "SELECT (CAST('00:00:00' AS time))", join=True).split("\n")[3]
        == "| 00:00:00 |"
    )
    assert (
        run(executor, "SELECT (CAST('00:00:00+14:59' AS timetz))", join=True).split(
            "\n"
        )[3]
        == "| 00:00:00+14:59 |"
    )
    assert (
        run(executor, "SELECT (CAST('4713-01-01 BC' AS date))", join=True).split("\n")[
            3
        ]
        == "| 4713-01-01 BC |"
    )
    assert (
        run(
            executor, "SELECT (CAST('4713-01-01 00:00:00 BC' AS timestamp))", join=True
        ).split("\n")[3]
        == "| 4713-01-01 00:00:00 BC |"
    )
    assert (
        run(
            executor,
            "SELECT (CAST('4713-01-01 00:00:00+00 BC' AS timestamptz))",
            join=True,
        ).split("\n")[3]
        == "| 4713-01-01 00:00:00+00 BC |"
    )
    assert (
        run(
            executor, "SELECT (CAST('-123456789 days 12:23:56' AS interval))", join=True
        ).split("\n")[3]
        == "| -123456789 days, 12:23:56 |"
    )


@dbtest
@pytest.mark.parametrize("value", ["10000000", "10000000.0", "10000000000000"])
def test_large_numbers_render_directly(executor, value):
    run(executor, "create table numbertest(a numeric)")
    run(executor, f"insert into numbertest (a) values ({value})")
    assert value in run(executor, "select * from numbertest", join=True)


@dbtest
@pytest.mark.parametrize("command", ["di", "dv", "ds", "df", "dT"])
@pytest.mark.parametrize("verbose", ["", "+"])
@pytest.mark.parametrize("pattern", ["", "x", "*.*", "x.y", "x.*", "*.y"])
def test_describe_special(executor, command, verbose, pattern, pgspecial):
    # We don't have any tests for the output of any of the special commands,
    # but we can at least make sure they run without error
    sql = r"\{command}{verbose} {pattern}".format(**locals())
    list(executor.run(sql, pgspecial=pgspecial))


@dbtest
@pytest.mark.parametrize("sql", ["invalid sql", "SELECT 1; select error;"])
def test_raises_with_no_formatter(executor, sql):
    with pytest.raises(psycopg.ProgrammingError):
        list(executor.run(sql))


@dbtest
def test_on_error_resume(executor, exception_formatter):
    sql = "select 1; error; select 1;"
    result = list(
        executor.run(sql, on_error_resume=True, exception_formatter=exception_formatter)
    )
    assert len(result) == 3


@dbtest
def test_on_error_stop(executor, exception_formatter):
    sql = "select 1; error; select 1;"
    result = list(
        executor.run(
            sql, on_error_resume=False, exception_formatter=exception_formatter
        )
    )
    assert len(result) == 2


# @dbtest
# def test_unicode_notices(executor):
#     sql = "DO language plpgsql $$ BEGIN RAISE NOTICE '有人更改'; END $$;"
#     result = list(executor.run(sql))
#     assert result[0][0] == u'NOTICE:  有人更改\n'


@dbtest
def test_nonexistent_function_definition(executor):
    with pytest.raises(RuntimeError):
        result = executor.view_definition("there_is_no_such_function")


@dbtest
def test_function_definition(executor):
    run(
        executor,
        """
            CREATE OR REPLACE FUNCTION public.the_number_three()
            RETURNS int
            LANGUAGE sql
            AS $function$
              select 3;
            $function$
    """,
    )
    result = executor.function_definition("the_number_three")


@dbtest
def test_function_notice_order(executor):
    run(
        executor,
        """
        CREATE OR REPLACE FUNCTION demo_order() RETURNS VOID AS
        $$
        BEGIN
            RAISE NOTICE 'first';
            RAISE NOTICE 'second';
            RAISE NOTICE 'third';
            RAISE NOTICE 'fourth';
            RAISE NOTICE 'fifth';
            RAISE NOTICE 'sixth';
        END;
        $$
        LANGUAGE plpgsql;
    """,
    )

    executor.function_definition("demo_order")

    result = run(executor, "select demo_order()")
    assert "first\nsecond\nthird\nfourth\nfifth\nsixth" in result[0]
    assert "+------------+" in result[1]
    assert "| demo_order |" in result[2]
    assert "|------------|" in result[3]
    assert "|            |" in result[4]
    assert "+------------+" in result[5]
    assert "SELECT 1" in result[6]


@dbtest
def test_view_definition(executor):
    run(executor, "create table tbl1 (a text, b numeric)")
    run(executor, "create view vw1 AS SELECT * FROM tbl1")
    run(executor, "create materialized view mvw1 AS SELECT * FROM tbl1")
    result = executor.view_definition("vw1")
    assert 'VIEW "public"."vw1" AS' in result
    assert "FROM tbl1" in result
    # import pytest; pytest.set_trace()
    result = executor.view_definition("mvw1")
    assert "MATERIALIZED VIEW" in result


@dbtest
def test_nonexistent_view_definition(executor):
    with pytest.raises(RuntimeError):
        result = executor.view_definition("there_is_no_such_view")
    with pytest.raises(RuntimeError):
        result = executor.view_definition("mvw1")


@dbtest
def test_short_host(executor):
    with patch.object(executor, "host", "localhost"):
        assert executor.short_host == "localhost"
    with patch.object(executor, "host", "localhost.example.org"):
        assert executor.short_host == "localhost"
    with patch.object(
        executor, "host", "localhost1.example.org,localhost2.example.org"
    ):
        assert executor.short_host == "localhost1"
    with patch.object(executor, "host", "ec2-11-222-333-444.compute-1.amazonaws.com"):
        assert executor.short_host == "ec2-11-222-333-444"
    with patch.object(executor, "host", "1.2.3.4"):
        assert executor.short_host == "1.2.3.4"


class VirtualCursor:
    """Mock a cursor to virtual database like pgbouncer."""

    def __init__(self):
        self.protocol_error = False
        self.protocol_message = ""
        self.description = None
        self.status = None
        self.statusmessage = "Error"

    def execute(self, *args, **kwargs):
        self.protocol_error = True
        self.protocol_message = "Command not supported"


@dbtest
def test_exit_without_active_connection(executor):
    quit_handler = MagicMock()
    pgspecial = PGSpecial()
    pgspecial.register(
        quit_handler,
        "\\q",
        "\\q",
        "Quit pgcli.",
        arg_type=NO_QUERY,
        case_sensitive=True,
        aliases=(":q",),
    )

    with patch.object(
        executor.conn, "cursor", side_effect=psycopg.InterfaceError("I'm broken!")
    ):
        # we should be able to quit the app, even without active connection
        run(executor, "\\q", pgspecial=pgspecial)
        quit_handler.assert_called_once()

        # an exception should be raised when running a query without active connection
        with pytest.raises(psycopg.InterfaceError):
            run(executor, "select 1", pgspecial=pgspecial)


@dbtest
def test_virtual_database(executor):
    virtual_connection = MagicMock()
    virtual_connection.cursor.return_value = VirtualCursor()
    with patch.object(executor, "conn", virtual_connection):
        result = run(executor, "select 1")
        assert "Command not supported" in result