231 lines
6.1 KiB
Python
231 lines
6.1 KiB
Python
"""
|
|
Steps for behavioral style tests are defined in this module.
|
|
Each step is defined by the string decorating it.
|
|
This string is used to call the step in "*.feature" file.
|
|
"""
|
|
|
|
import pexpect
|
|
import subprocess
|
|
import tempfile
|
|
|
|
from behave import when, then
|
|
from textwrap import dedent
|
|
import wrappers
|
|
|
|
|
|
@when("we list databases")
|
|
def step_list_databases(context):
|
|
cmd = ["pgcli", "--list"]
|
|
context.cmd_output = subprocess.check_output(cmd, cwd=context.package_root)
|
|
|
|
|
|
@then("we see list of databases")
|
|
def step_see_list_databases(context):
|
|
assert b"List of databases" in context.cmd_output
|
|
assert b"postgres" in context.cmd_output
|
|
context.cmd_output = None
|
|
|
|
|
|
@when("we run dbcli")
|
|
def step_run_cli(context):
|
|
wrappers.run_cli(context)
|
|
|
|
|
|
@when("we launch dbcli using {arg}")
|
|
def step_run_cli_using_arg(context, arg):
|
|
prompt_check = False
|
|
currentdb = None
|
|
if arg == "--username":
|
|
arg = "--username={}".format(context.conf["user"])
|
|
if arg == "--user":
|
|
arg = "--user={}".format(context.conf["user"])
|
|
if arg == "--port":
|
|
arg = "--port={}".format(context.conf["port"])
|
|
if arg == "--password":
|
|
arg = "--password"
|
|
prompt_check = False
|
|
# This uses the mock_pg_service.conf file in fixtures folder.
|
|
if arg == "dsn_password":
|
|
arg = "service=mock_postgres --password"
|
|
prompt_check = False
|
|
currentdb = "postgres"
|
|
wrappers.run_cli(
|
|
context, run_args=[arg], prompt_check=prompt_check, currentdb=currentdb
|
|
)
|
|
|
|
|
|
@when("we wait for prompt")
|
|
def step_wait_prompt(context):
|
|
wrappers.wait_prompt(context)
|
|
|
|
|
|
@when('we send "ctrl + d"')
|
|
def step_ctrl_d(context):
|
|
"""
|
|
Send Ctrl + D to hopefully exit.
|
|
"""
|
|
step_try_to_ctrl_d(context)
|
|
context.cli.expect(pexpect.EOF, timeout=5)
|
|
context.exit_sent = True
|
|
|
|
|
|
@when('we try to send "ctrl + d"')
|
|
def step_try_to_ctrl_d(context):
|
|
"""
|
|
Send Ctrl + D, perhaps exiting, perhaps not (if a transaction is
|
|
ongoing).
|
|
"""
|
|
# turn off pager before exiting
|
|
context.cli.sendcontrol("c")
|
|
context.cli.sendline(r"\pset pager off")
|
|
wrappers.wait_prompt(context)
|
|
context.cli.sendcontrol("d")
|
|
|
|
|
|
@when('we send "ctrl + c"')
|
|
def step_ctrl_c(context):
|
|
"""Send Ctrl + c to hopefully interrupt."""
|
|
context.cli.sendcontrol("c")
|
|
|
|
|
|
@then("we see cancelled query warning")
|
|
def step_see_cancelled_query_warning(context):
|
|
"""
|
|
Make sure we receive the warning that the current query was cancelled.
|
|
"""
|
|
wrappers.expect_exact(context, "cancelled query", timeout=2)
|
|
|
|
|
|
@then("we see ongoing transaction message")
|
|
def step_see_ongoing_transaction_error(context):
|
|
"""
|
|
Make sure we receive the warning that a transaction is ongoing.
|
|
"""
|
|
context.cli.expect("A transaction is ongoing.", timeout=2)
|
|
|
|
|
|
@when("we send sleep query")
|
|
def step_send_sleep_15_seconds(context):
|
|
"""
|
|
Send query to sleep for 15 seconds.
|
|
"""
|
|
context.cli.sendline("select pg_sleep(15)")
|
|
|
|
|
|
@when("we check for any non-idle sleep queries")
|
|
def step_check_for_active_sleep_queries(context):
|
|
"""
|
|
Send query to check for any non-idle pg_sleep queries.
|
|
"""
|
|
context.cli.sendline(
|
|
"select state from pg_stat_activity where query not like '%pg_stat_activity%' and query like '%pg_sleep%' and state != 'idle';"
|
|
)
|
|
|
|
|
|
@then("we don't see any non-idle sleep queries")
|
|
def step_no_active_sleep_queries(context):
|
|
"""Confirm that any pg_sleep queries are either idle or not active."""
|
|
wrappers.expect_exact(
|
|
context,
|
|
context.conf["pager_boundary"]
|
|
+ "\r"
|
|
+ dedent(
|
|
"""
|
|
+-------+\r
|
|
| state |\r
|
|
|-------|\r
|
|
+-------+\r
|
|
SELECT 0\r
|
|
"""
|
|
)
|
|
+ context.conf["pager_boundary"],
|
|
timeout=5,
|
|
)
|
|
|
|
|
|
@when(r'we send "\?" command')
|
|
def step_send_help(context):
|
|
r"""
|
|
Send \? to see help.
|
|
"""
|
|
context.cli.sendline(r"\?")
|
|
|
|
|
|
@when("we send partial select command")
|
|
def step_send_partial_select_command(context):
|
|
"""
|
|
Send `SELECT a` to see completion.
|
|
"""
|
|
context.cli.sendline("SELECT a")
|
|
|
|
|
|
@then("we see error message")
|
|
def step_see_error_message(context):
|
|
wrappers.expect_exact(context, 'column "a" does not exist', timeout=2)
|
|
|
|
|
|
@when("we send source command")
|
|
def step_send_source_command(context):
|
|
context.tmpfile_sql_help = tempfile.NamedTemporaryFile(prefix="pgcli_")
|
|
context.tmpfile_sql_help.write(rb"\?")
|
|
context.tmpfile_sql_help.flush()
|
|
context.cli.sendline(rf"\i {context.tmpfile_sql_help.name}")
|
|
wrappers.expect_exact(context, context.conf["pager_boundary"] + "\r\n", timeout=5)
|
|
|
|
|
|
@when("we run query to check application_name")
|
|
def step_check_application_name(context):
|
|
context.cli.sendline(
|
|
"SELECT 'found' FROM pg_stat_activity WHERE application_name = 'pgcli' HAVING COUNT(*) > 0;"
|
|
)
|
|
|
|
|
|
@then("we see found")
|
|
def step_see_found(context):
|
|
wrappers.expect_exact(
|
|
context,
|
|
context.conf["pager_boundary"]
|
|
+ "\r"
|
|
+ dedent(
|
|
"""
|
|
+----------+\r
|
|
| ?column? |\r
|
|
|----------|\r
|
|
| found |\r
|
|
+----------+\r
|
|
SELECT 1\r
|
|
"""
|
|
)
|
|
+ context.conf["pager_boundary"],
|
|
timeout=5,
|
|
)
|
|
|
|
|
|
@then("we respond to the destructive warning: {response}")
|
|
def step_resppond_to_destructive_command(context, response):
|
|
"""Respond to destructive command."""
|
|
wrappers.expect_exact(
|
|
context,
|
|
"You're about to run a destructive command.\r\nDo you want to proceed? [y/N]:",
|
|
timeout=2,
|
|
)
|
|
context.cli.sendline(response.strip())
|
|
|
|
|
|
@then("we send password")
|
|
def step_send_password(context):
|
|
wrappers.expect_exact(context, "Password for", timeout=5)
|
|
context.cli.sendline(context.conf["pass"] or "DOES NOT MATTER")
|
|
|
|
|
|
@when('we send "{text}"')
|
|
def step_send_text(context, text):
|
|
context.cli.sendline(text)
|
|
# Try to detect whether we are exiting. If so, set `exit_sent`
|
|
# so that `after_scenario` correctly cleans up.
|
|
try:
|
|
context.cli.expect(pexpect.EOF, timeout=0.2)
|
|
except pexpect.TIMEOUT:
|
|
pass
|
|
else:
|
|
context.exit_sent = True
|