2025-02-09 18:53:36 +01:00
|
|
|
import os
|
2025-02-09 18:56:31 +01:00
|
|
|
import shutil
|
2025-02-09 18:53:36 +01:00
|
|
|
import sys
|
|
|
|
from tempfile import mkstemp
|
|
|
|
|
|
|
|
import db_utils as dbutils
|
|
|
|
import fixture_utils as fixutils
|
|
|
|
import pexpect
|
|
|
|
|
|
|
|
from steps.wrappers import run_cli, wait_prompt
|
|
|
|
|
|
|
|
test_log_file = os.path.join(os.environ['HOME'], '.mycli.test.log')
|
|
|
|
|
|
|
|
|
2025-02-09 18:56:31 +01:00
|
|
|
SELF_CONNECTING_FEATURES = (
|
|
|
|
'test/features/connection.feature',
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
MY_CNF_PATH = os.path.expanduser('~/.my.cnf')
|
|
|
|
MY_CNF_BACKUP_PATH = f'{MY_CNF_PATH}.backup'
|
|
|
|
MYLOGIN_CNF_PATH = os.path.expanduser('~/.mylogin.cnf')
|
|
|
|
MYLOGIN_CNF_BACKUP_PATH = f'{MYLOGIN_CNF_PATH}.backup'
|
|
|
|
|
|
|
|
|
|
|
|
def get_db_name_from_context(context):
|
|
|
|
return context.config.userdata.get(
|
|
|
|
'my_test_db', None
|
|
|
|
) or "mycli_behave_tests"
|
|
|
|
|
|
|
|
|
|
|
|
|
2025-02-09 18:53:36 +01:00
|
|
|
def before_all(context):
|
|
|
|
"""Set env parameters."""
|
|
|
|
os.environ['LINES'] = "100"
|
|
|
|
os.environ['COLUMNS'] = "100"
|
|
|
|
os.environ['EDITOR'] = 'ex'
|
|
|
|
os.environ['LC_ALL'] = 'en_US.UTF-8'
|
|
|
|
os.environ['PROMPT_TOOLKIT_NO_CPR'] = '1'
|
|
|
|
os.environ['MYCLI_HISTFILE'] = os.devnull
|
|
|
|
|
|
|
|
test_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
|
|
|
|
login_path_file = os.path.join(test_dir, 'mylogin.cnf')
|
2025-02-09 18:56:31 +01:00
|
|
|
# os.environ['MYSQL_TEST_LOGIN_FILE'] = login_path_file
|
2025-02-09 18:53:36 +01:00
|
|
|
|
|
|
|
context.package_root = os.path.abspath(
|
|
|
|
os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
|
|
|
|
|
|
|
|
os.environ["COVERAGE_PROCESS_START"] = os.path.join(context.package_root,
|
|
|
|
'.coveragerc')
|
|
|
|
|
|
|
|
context.exit_sent = False
|
|
|
|
|
|
|
|
vi = '_'.join([str(x) for x in sys.version_info[:3]])
|
2025-02-09 18:56:31 +01:00
|
|
|
db_name = get_db_name_from_context(context)
|
2025-02-09 18:53:36 +01:00
|
|
|
db_name_full = '{0}_{1}'.format(db_name, vi)
|
|
|
|
|
|
|
|
# Store get params from config/environment variables
|
|
|
|
context.conf = {
|
|
|
|
'host': context.config.userdata.get(
|
|
|
|
'my_test_host',
|
|
|
|
os.getenv('PYTEST_HOST', 'localhost')
|
|
|
|
),
|
|
|
|
'port': context.config.userdata.get(
|
|
|
|
'my_test_port',
|
|
|
|
int(os.getenv('PYTEST_PORT', '3306'))
|
|
|
|
),
|
|
|
|
'user': context.config.userdata.get(
|
|
|
|
'my_test_user',
|
|
|
|
os.getenv('PYTEST_USER', 'root')
|
|
|
|
),
|
|
|
|
'pass': context.config.userdata.get(
|
|
|
|
'my_test_pass',
|
|
|
|
os.getenv('PYTEST_PASSWORD', None)
|
|
|
|
),
|
|
|
|
'cli_command': context.config.userdata.get(
|
|
|
|
'my_cli_command', None) or
|
|
|
|
sys.executable + ' -c "import coverage ; coverage.process_startup(); import mycli.main; mycli.main.cli()"',
|
|
|
|
'dbname': db_name,
|
|
|
|
'dbname_tmp': db_name_full + '_tmp',
|
|
|
|
'vi': vi,
|
|
|
|
'pager_boundary': '---boundary---',
|
|
|
|
}
|
|
|
|
|
|
|
|
_, my_cnf = mkstemp()
|
|
|
|
with open(my_cnf, 'w') as f:
|
|
|
|
f.write(
|
|
|
|
'[client]\n'
|
|
|
|
'pager={0} {1} {2}\n'.format(
|
|
|
|
sys.executable, os.path.join(context.package_root,
|
|
|
|
'test/features/wrappager.py'),
|
|
|
|
context.conf['pager_boundary'])
|
|
|
|
)
|
|
|
|
context.conf['defaults-file'] = my_cnf
|
|
|
|
context.conf['myclirc'] = os.path.join(context.package_root, 'test',
|
|
|
|
'myclirc')
|
|
|
|
|
|
|
|
context.cn = dbutils.create_db(context.conf['host'], context.conf['port'],
|
|
|
|
context.conf['user'],
|
|
|
|
context.conf['pass'],
|
|
|
|
context.conf['dbname'])
|
|
|
|
|
|
|
|
context.fixture_data = fixutils.read_fixture_files()
|
|
|
|
|
|
|
|
|
|
|
|
def after_all(context):
|
|
|
|
"""Unset env parameters."""
|
|
|
|
dbutils.close_cn(context.cn)
|
|
|
|
dbutils.drop_db(context.conf['host'], context.conf['port'],
|
|
|
|
context.conf['user'], context.conf['pass'],
|
|
|
|
context.conf['dbname'])
|
|
|
|
|
|
|
|
# Restore env vars.
|
|
|
|
#for k, v in context.pgenv.items():
|
|
|
|
# if k in os.environ and v is None:
|
|
|
|
# del os.environ[k]
|
|
|
|
# elif v:
|
|
|
|
# os.environ[k] = v
|
|
|
|
|
|
|
|
|
|
|
|
def before_step(context, _):
|
|
|
|
context.atprompt = False
|
|
|
|
|
|
|
|
|
2025-02-09 18:56:31 +01:00
|
|
|
def before_scenario(context, arg):
|
2025-02-09 18:53:36 +01:00
|
|
|
with open(test_log_file, 'w') as f:
|
|
|
|
f.write('')
|
2025-02-09 18:56:31 +01:00
|
|
|
if arg.location.filename not in SELF_CONNECTING_FEATURES:
|
|
|
|
run_cli(context)
|
|
|
|
wait_prompt(context)
|
|
|
|
|
|
|
|
if os.path.exists(MY_CNF_PATH):
|
|
|
|
shutil.move(MY_CNF_PATH, MY_CNF_BACKUP_PATH)
|
|
|
|
|
|
|
|
if os.path.exists(MYLOGIN_CNF_PATH):
|
|
|
|
shutil.move(MYLOGIN_CNF_PATH, MYLOGIN_CNF_BACKUP_PATH)
|
2025-02-09 18:53:36 +01:00
|
|
|
|
|
|
|
|
|
|
|
def after_scenario(context, _):
|
|
|
|
"""Cleans up after each test complete."""
|
|
|
|
with open(test_log_file) as f:
|
|
|
|
for line in f:
|
|
|
|
if 'error' in line.lower():
|
|
|
|
raise RuntimeError(f'Error in log file: {line}')
|
|
|
|
|
|
|
|
if hasattr(context, 'cli') and not context.exit_sent:
|
|
|
|
# Quit nicely.
|
|
|
|
if not context.atprompt:
|
|
|
|
user = context.conf['user']
|
|
|
|
host = context.conf['host']
|
|
|
|
dbname = context.currentdb
|
|
|
|
context.cli.expect_exact(
|
|
|
|
'{0}@{1}:{2}>'.format(
|
|
|
|
user, host, dbname
|
|
|
|
),
|
|
|
|
timeout=5
|
|
|
|
)
|
|
|
|
context.cli.sendcontrol('c')
|
|
|
|
context.cli.sendcontrol('d')
|
|
|
|
context.cli.expect_exact(pexpect.EOF, timeout=5)
|
|
|
|
|
2025-02-09 18:56:31 +01:00
|
|
|
if os.path.exists(MY_CNF_BACKUP_PATH):
|
|
|
|
shutil.move(MY_CNF_BACKUP_PATH, MY_CNF_PATH)
|
|
|
|
|
|
|
|
if os.path.exists(MYLOGIN_CNF_BACKUP_PATH):
|
|
|
|
shutil.move(MYLOGIN_CNF_BACKUP_PATH, MYLOGIN_CNF_PATH)
|
|
|
|
elif os.path.exists(MYLOGIN_CNF_PATH):
|
|
|
|
# This file was moved in `before_scenario`.
|
|
|
|
# If it exists now, it has been created during a test
|
|
|
|
os.remove(MYLOGIN_CNF_PATH)
|
|
|
|
|
|
|
|
|
2025-02-09 18:53:36 +01:00
|
|
|
# TODO: uncomment to debug a failure
|
|
|
|
# def after_step(context, step):
|
|
|
|
# if step.status == "failed":
|
|
|
|
# import ipdb; ipdb.set_trace()
|