1
0
Fork 0
mycli/test/features/environment.py

146 lines
5 KiB
Python
Raw Normal View History

import os
import shutil
import sys
from tempfile import mkstemp
import db_utils as dbutils
import fixture_utils as fixutils
import pexpect
from steps.wrappers import run_cli, wait_prompt
test_log_file = os.path.join(os.environ["HOME"], ".mycli.test.log")
SELF_CONNECTING_FEATURES = ("test/features/connection.feature",)
MY_CNF_PATH = os.path.expanduser("~/.my.cnf")
MY_CNF_BACKUP_PATH = f"{MY_CNF_PATH}.backup"
MYLOGIN_CNF_PATH = os.path.expanduser("~/.mylogin.cnf")
MYLOGIN_CNF_BACKUP_PATH = f"{MYLOGIN_CNF_PATH}.backup"
def get_db_name_from_context(context):
return context.config.userdata.get("my_test_db", None) or "mycli_behave_tests"
def before_all(context):
"""Set env parameters."""
os.environ["LINES"] = "100"
os.environ["COLUMNS"] = "100"
os.environ["EDITOR"] = "ex"
os.environ["LC_ALL"] = "en_US.UTF-8"
os.environ["PROMPT_TOOLKIT_NO_CPR"] = "1"
os.environ["MYCLI_HISTFILE"] = os.devnull
# test_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
# login_path_file = os.path.join(test_dir, "mylogin.cnf")
# os.environ['MYSQL_TEST_LOGIN_FILE'] = login_path_file
context.package_root = os.path.abspath(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
os.environ["COVERAGE_PROCESS_START"] = os.path.join(context.package_root, ".coveragerc")
context.exit_sent = False
vi = "_".join([str(x) for x in sys.version_info[:3]])
db_name = get_db_name_from_context(context)
db_name_full = "{0}_{1}".format(db_name, vi)
# Store get params from config/environment variables
context.conf = {
"host": context.config.userdata.get("my_test_host", os.getenv("PYTEST_HOST", "localhost")),
"port": context.config.userdata.get("my_test_port", int(os.getenv("PYTEST_PORT", "3306"))),
"user": context.config.userdata.get("my_test_user", os.getenv("PYTEST_USER", "root")),
"pass": context.config.userdata.get("my_test_pass", os.getenv("PYTEST_PASSWORD", None)),
"cli_command": context.config.userdata.get("my_cli_command", None)
or sys.executable + ' -c "import coverage ; coverage.process_startup(); import mycli.main; mycli.main.cli()"',
"dbname": db_name,
"dbname_tmp": db_name_full + "_tmp",
"vi": vi,
"pager_boundary": "---boundary---",
}
_, my_cnf = mkstemp()
with open(my_cnf, "w") as f:
f.write(
"[client]\n" "pager={0} {1} {2}\n".format(
sys.executable, os.path.join(context.package_root, "test/features/wrappager.py"), context.conf["pager_boundary"]
)
)
context.conf["defaults-file"] = my_cnf
context.conf["myclirc"] = os.path.join(context.package_root, "test", "myclirc")
context.cn = dbutils.create_db(
context.conf["host"], context.conf["port"], context.conf["user"], context.conf["pass"], context.conf["dbname"]
)
context.fixture_data = fixutils.read_fixture_files()
def after_all(context):
"""Unset env parameters."""
dbutils.close_cn(context.cn)
dbutils.drop_db(context.conf["host"], context.conf["port"], context.conf["user"], context.conf["pass"], context.conf["dbname"])
# Restore env vars.
# for k, v in context.pgenv.items():
# if k in os.environ and v is None:
# del os.environ[k]
# elif v:
# os.environ[k] = v
def before_step(context, _):
context.atprompt = False
def before_scenario(context, arg):
with open(test_log_file, "w") as f:
f.write("")
if arg.location.filename not in SELF_CONNECTING_FEATURES:
run_cli(context)
wait_prompt(context)
if os.path.exists(MY_CNF_PATH):
shutil.move(MY_CNF_PATH, MY_CNF_BACKUP_PATH)
if os.path.exists(MYLOGIN_CNF_PATH):
shutil.move(MYLOGIN_CNF_PATH, MYLOGIN_CNF_BACKUP_PATH)
def after_scenario(context, _):
"""Cleans up after each test complete."""
with open(test_log_file) as f:
for line in f:
if "error" in line.lower():
raise RuntimeError(f"Error in log file: {line}")
if hasattr(context, "cli") and not context.exit_sent:
# Quit nicely.
if not context.atprompt:
user = context.conf["user"]
host = context.conf["host"]
dbname = context.currentdb
context.cli.expect_exact("{0}@{1}:{2}>".format(user, host, dbname), timeout=5)
context.cli.sendcontrol("c")
context.cli.sendcontrol("d")
context.cli.expect_exact(pexpect.EOF, timeout=5)
if os.path.exists(MY_CNF_BACKUP_PATH):
shutil.move(MY_CNF_BACKUP_PATH, MY_CNF_PATH)
if os.path.exists(MYLOGIN_CNF_BACKUP_PATH):
shutil.move(MYLOGIN_CNF_BACKUP_PATH, MYLOGIN_CNF_PATH)
elif os.path.exists(MYLOGIN_CNF_PATH):
# This file was moved in `before_scenario`.
# If it exists now, it has been created during a test
os.remove(MYLOGIN_CNF_PATH)
# TODO: uncomment to debug a failure
# def after_step(context, step):
# if step.status == "failed":
# import ipdb; ipdb.set_trace()