71 lines
2 KiB
Python
71 lines
2 KiB
Python
import re
|
|
import pexpect
|
|
from pgcli.main import COLOR_CODE_REGEX
|
|
import textwrap
|
|
|
|
from io import StringIO
|
|
|
|
|
|
def expect_exact(context, expected, timeout):
    """Wait for ``expected`` on the spawned CLI; raise with diagnostics on timeout.

    Args:
        context: Object exposing ``cli`` (whose ``expect_exact`` method and
            ``before`` attribute are used) and ``logfile`` (whose
            ``getvalue()`` supplies the full session log).
        expected: Value forwarded unchanged to ``context.cli.expect_exact``.
        timeout: Forwarded as the ``timeout`` keyword of that call.

    Raises:
        Exception: If ``context.cli.expect_exact`` raises ``pexpect.TIMEOUT``.
            The message shows the expected value, the pending output with
            terminal escape sequences removed, and the full log.
    """
    try:
        context.cli.expect_exact(expected, timeout=timeout)
    except pexpect.TIMEOUT:
        pass
    else:
        return

    # The error is raised here, outside the ``except`` block, so the
    # pexpect.TIMEOUT traceback is not chained onto the report below.
    #
    # Strip terminal escape (CSI) sequences out of the output.  The sequence is
    # ESC [, optional parameter bytes (0x30-0x3F), optional intermediate bytes
    # (0x20-0x2F) and exactly one final byte (0x40-0x7E).  Restricting the
    # parameter part this way keeps ordinary text that directly follows a
    # colour code (e.g. the "red" in "\x1b[31mred") from being swallowed.
    actual = re.sub(r"\x1b\[[0-?]*[ -/]*[@-~]", "", context.cli.before)
    raise Exception(
        textwrap.dedent(
            """\
            Expected:
            ---
            {0!r}
            ---
            Actual:
            ---
            {1!r}
            ---
            Full log:
            ---
            {2!r}
            ---
            """
        ).format(expected, actual, context.logfile.getvalue())
    )
|
|
|
|
|
|
def expect_pager(context, expected, timeout):
    """Wait for ``expected`` framed by the configured pager boundary.

    ``expected`` may be a single value or a list of alternatives.  Each
    alternative is wrapped as ``<boundary>\\r\\n<text><boundary>\\r\\n``, where
    the boundary is ``context.conf['pager_boundary']``, and the resulting list
    is handed to ``expect_exact`` with the same ``timeout``.
    """
    alternatives = [expected] if not isinstance(expected, list) else expected

    framed = []
    for body in alternatives:
        framed.append(
            f"{context.conf['pager_boundary']}\r\n{body}{context.conf['pager_boundary']}\r\n"
        )

    expect_exact(context, framed, timeout=timeout)
|
|
|
|
|
|
def run_cli(context, run_args=None, prompt_check=True, currentdb=None):
    """Spawn the CLI under pexpect and record the session on ``context``.

    The command line is ``context.conf["cli_command"]`` followed by
    ``run_args``, joined with single spaces, and is started with
    ``context.package_root`` as the working directory.

    Sets ``context.cli``, ``context.logfile`` (a ``StringIO`` also installed
    as the child's ``logfile``), ``context.exit_sent`` and
    ``context.currentdb`` (``currentdb`` if truthy, otherwise
    ``context.conf["dbname"]``).  After sending ``\\pset pager always`` it
    waits for the prompt unless ``prompt_check`` is false.
    """
    extra_args = run_args if run_args else []
    command_line = " ".join([context.conf.get("cli_command")] + extra_args)

    child = pexpect.spawnu(command_line, cwd=context.package_root)
    context.cli = child
    # One shared buffer: kept on the context and used as the child's log.
    context.logfile = child.logfile = StringIO()

    context.exit_sent = False
    context.currentdb = currentdb if currentdb else context.conf["dbname"]

    child.sendline(r"\pset pager always")
    if not prompt_check:
        return
    wait_prompt(context)
|
|
|
|
|
|
def wait_prompt(context):
    """Block until the ``<currentdb>>`` prompt (or end of output) is seen.

    Delegates to ``expect_exact`` with a 3 second timeout.
    """
    prompt = f"{context.currentdb}>"
    # Accept the prompt with or without a trailing space; pexpect.EOF is
    # also listed, so reaching end-of-file counts as a match.
    candidates = [prompt + " ", prompt, pexpect.EOF]
    expect_exact(context, candidates, timeout=3)
|