499 lines
14 KiB
Python
499 lines
14 KiB
Python
#!/usr/bin/env python
|
|
# SPDX-License-Identifier: GPL-2.0-or-later
|
|
|
|
# Copyright 2017, LabN Consulting, L.L.C.
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
import json
|
|
import math
|
|
import time
|
|
from lib.topolog import logger
|
|
from lib.topotest import json_cmp
|
|
|
|
|
|
# L utility functions
|
|
#
|
|
# These functions are intended to provide support for CI testing within MiniNet
|
|
# environments.
|
|
|
|
|
|
class lUtil:
    """Run commands on network targets and record pass/fail results.

    A command is executed through ``self.net[target].cmd(...)``; its output
    is matched against a regular expression (or compared with expected JSON)
    and the outcome is written to a detailed log file (``fout_name``) and a
    tabular summary file (``fsum_name``).
    """

    # to be made configurable in the future
    base_script_dir = "."
    base_log_dir = "."
    fout_name = "output.log"
    fsum_name = "summary.txt"
    l_level = 6
    CallOnFail = False

    l_total = 0
    l_pass = 0
    l_fail = 0
    l_filename = ""
    l_last = None
    l_line = 0
    l_dotall_experiment = False
    l_last_nl = None
    l_wait_strict = 1

    # "" is the "not opened yet" / "not configured" sentinel for these three.
    fout = ""
    fsum = ""
    net = ""

    # Horizontal rule used to frame the summary header and the totals.
    _rule = "******************************************************************************"

    def log(self, str, level=6):
        """Write *str* to the log file and echo it when verbose enough.

        The log file is opened lazily on first use and only written when
        ``l_level`` is positive; the message is printed when *level* is
        less than or equal to ``l_level``.
        """
        if self.l_level > 0:
            if self.fout == "":
                self.fout = open(self.fout_name, "w")
            self.fout.write(str + "\n")
        if level <= self.l_level:
            print(str)

    def summary(self, str):
        """Append *str* to the summary file, emitting the header on first use."""
        if self.fsum == "":
            self.fsum = open(self.fsum_name, "w")
            # Header columns use the same widths as the rows written by
            # result(), so titles always sit above their values.
            header = "%-4s %-6s %-56s %-4s %s" % (
                "Test",
                "Target",
                "Summary",
                "Pass",
                "Fail",
            )
            self.fsum.write(self._rule + "\n")
            self.fsum.write(header + "\n")
            self.fsum.write(self._rule + "\n")
        self.fsum.write(str + "\n")

    def result(self, target, success, str, logstr=None):
        """Record one test result.

        Updates the pass/fail/total counters, logs the row, appends it to
        the summary and, on failure, invokes ``CallOnFail`` if one is set.
        *logstr*, when given, is logged as extra detail before the row.
        """
        if success:
            p, f = 1, 0
            self.l_pass += 1
            sstr = "PASS"
        else:
            p, f = 0, 1
            self.l_fail += 1
            sstr = "FAIL"
        self.l_total += 1
        if logstr is not None:
            self.log("R:%d %s: %s" % (self.l_total, sstr, logstr))
        res = "%-4d %-6s %-56s %-4d %d" % (self.l_total, target, str, p, f)
        self.log("R:" + res)
        self.summary(res)
        # Only call the hook when it really is a callable; the default is False.
        if f == 1 and callable(self.CallOnFail):
            self.CallOnFail()

    def closeFiles(self):
        """Write the totals, close both files and return the totals text.

        The totals are appended to the summary file if it is open. If the
        log file is open, the content of the summary file (when it exists
        on disk) is copied to its end before it is closed.
        """
        # Pass/fail counts are placed under the Pass/Fail header columns.
        totals = "%-63s %-4d %-4d %d" % (
            "Total",
            self.l_total,
            self.l_pass,
            self.l_fail,
        )
        ret = "\n".join((self._rule, totals, self._rule))
        if self.fsum != "":
            self.fsum.write(ret + "\n")
            self.fsum.close()
            self.fsum = ""
        if self.fout != "":
            try:
                if os.path.isfile(self.fsum_name):
                    with open(self.fsum_name, "r") as r:
                        self.fout.write(r.read())
            finally:
                # Close the log even if copying the summary failed.
                self.fout.close()
                self.fout = ""
        return ret

    def setFilename(self, name):
        """Note *name* as the current script in log and summary; reset the line counter."""
        str = "FILE: " + name
        self.log(str)
        self.summary(str)
        self.l_filename = name
        # l_line is the counter reported in log messages; restart it per file.
        self.l_line = 0

    def getCallOnFail(self):
        """Return the current failure hook (``False`` when none is set)."""
        return self.CallOnFail

    def setCallOnFail(self, CallOnFail):
        """Install *CallOnFail* as the hook invoked after a failed result."""
        self.CallOnFail = CallOnFail

    def strToArray(self, string):
        """Split a test-file line into arguments.

        Words are separated by whitespace. A word starting with ``"`` opens
        a quoted argument that continues (words re-joined by single spaces,
        quotes kept) through the next word ending with ``"``. A word ending
        with a backslash is joined with the following word. Blank lines and
        lines whose first word starts with ``#`` yield an empty list.

        NOTE(review): a quoted argument made of a single word (``"abc"``)
        still opens a quote that swallows the following word, as before.
        """
        a = []
        end = ""
        words = string.split()
        if len(words) < 1 or words[0].startswith("#"):
            return a
        for word in words:
            if end:
                # The argument being continued is always the newest entry.
                a[-1] += " " + word
            else:
                a.append(word)
            if end == "\\":
                end = ""
            if word.endswith("\\"):
                end = "\\"
            elif end != '"':
                if word.startswith('"'):
                    end = '"'
            elif word.endswith('"'):
                end = ""
        return a

    def execTestFile(self, tstFile):
        """Execute the directives found in text test file *tstFile*.

        Lines that split into six or more arguments are run through
        ``luCommand`` (arguments 1 to 5). Shorter lines are counted and
        logged, and may be ``sleep <seconds>`` or ``include <file>``
        directives. If the file does not exist the process exits with
        status 1.
        """
        if not os.path.isfile(tstFile):
            self.log("unable to read: " + tstFile)
            sys.exit(1)
        with open(tstFile) as f:
            for line in f:
                if len(line) <= 1:
                    continue
                a = self.strToArray(line)
                if len(a) >= 6:
                    luCommand(a[1], a[2], a[3], a[4], a[5])
                    continue
                self.l_line += 1
                self.log("%s:%s %s" % (self.l_filename, self.l_line, line))
                if len(a) >= 2:
                    if a[0] == "sleep":
                        time.sleep(int(a[1]))
                    elif a[0] == "include":
                        self.execTestFile(a[1])

    def command(
        self,
        target,
        command,
        regexp,
        op,
        result,
        returnJson,
        startt=None,
        force_result=False,
    ):
        """Run *command* on *target* and evaluate its output.

        For ``op`` of ``jsoncmp_pass``/``jsoncmp_fail`` the output is parsed
        as JSON and compared with the JSON text in *regexp* using
        ``json_cmp``; a result is always recorded. Otherwise *regexp* is
        searched in the output with newlines replaced by spaces; a result
        is recorded for ``op`` ``pass``/``fail``, or, when *startt* is
        given, whenever something was found or *force_result* is set (the
        elapsed time is appended to the result text).

        Returns ``False`` when no network is configured, the parsed JSON
        when JSON parsing succeeded, the matched text when the regexp
        matched, and otherwise the success flag.
        """
        json_ops = ("jsoncmp_pass", "jsoncmp_fail")
        if op in json_ops:
            returnJson = True

        self.log(
            "%s (#%d) %s:%s COMMAND:%s:%s:%s:%s:%s:"
            % (
                time.asctime(),
                self.l_total + 1,
                self.l_filename,
                self.l_line,
                target,
                command,
                regexp,
                op,
                result,
            )
        )
        if self.net == "":
            return False
        js = None
        out = self.net[target].cmd(command).rstrip()
        report = out if len(out) > 0 else "<no output>"
        if returnJson == True:  # kept as an equality test, as callers pass bools
            try:
                js = json.loads(out)
            except ValueError:
                # json.JSONDecodeError is a ValueError subclass.
                js = None
                self.log(
                    "WARNING: JSON load failed -- confirm command output is in JSON format."
                )
        self.log("COMMAND OUTPUT:%s:" % report)

        # JSON comparison
        if op in json_ops:
            try:
                expect = json.loads(regexp)
            except (TypeError, ValueError):
                expect = None
                self.log(
                    "WARNING: JSON load failed -- confirm regex input is in JSON format."
                )
            json_diff = json_cmp(js, expect)
            if json_diff is not None:
                success = op == "jsoncmp_fail"
                self.log("JSON DIFF:%s:" % json_diff)
            else:
                success = op != "jsoncmp_fail"
            # Always defined, so the fallback return below cannot fail.
            ret = success
            self.result(target, success, result)
            if js is not None:
                return js
            return ret

        # Experiment: can we achieve the same match behavior via DOTALL
        # without converting newlines to spaces?
        search_nl = re.search(regexp, out, re.DOTALL)
        self.l_last_nl = search_nl
        # Set up for comparison
        if search_nl is not None:
            group_nl_converted = " ".join(search_nl.group().splitlines())
        else:
            group_nl_converted = None

        out = " ".join(out.splitlines())
        search = re.search(regexp, out)
        self.l_last = search
        if search is None:
            success = op == "fail"
            ret = success
        else:
            ret = search.group()
            if op != "fail":
                success = True
                level = 7
            else:
                success = False
                level = 5
            self.log("found:%s:" % ret, level)
            # Experiment: compare matched strings obtained each way
            if self.l_dotall_experiment and (group_nl_converted != ret):
                self.log(
                    "DOTALL experiment: strings differ dotall=[%s] orig=[%s]"
                    % (group_nl_converted, ret),
                    9,
                )
        if startt is not None:
            if js is not None or ret is not False or force_result is not False:
                delta = time.time() - startt
                self.result(target, success, "%s +%4.2f secs" % (result, delta))
        elif op == "pass" or op == "fail":
            self.result(target, success, result)
        if js is not None:
            return js
        return ret

    def wait(
        self, target, command, regexp, op, result, wait, returnJson, wait_time=0.5
    ):
        """Repeat ``command()`` until it returns something other than False.

        The command is tried ``ceil(wait / wait_time) + 1`` times at most,
        sleeping *wait_time* seconds between tries. In strict mode
        (``op == "wait-strict"``, or ``op == "wait"`` while
        ``l_wait_strict`` is set) the last try is forced to record a
        result even when nothing matched. Returns the value of the last
        ``command()`` call.
        """
        self.log(
            "%s:%s WAIT:%s:%s:%s:%s:%s:%s:%s:"
            % (
                self.l_filename,
                self.l_line,
                target,
                command,
                regexp,
                op,
                result,
                wait,
                wait_time,
            )
        )
        found = False
        n = 0
        startt = time.time()

        strict = bool((op == "wait-strict") or ((op == "wait") and self.l_wait_strict))

        # Calculate the amount of `sleep`s we are going to perform.
        wait_count = int(math.ceil(wait / wait_time)) + 1

        force_result = False
        while wait_count > 0:
            n += 1

            # log a failure on last iteration if we don't get desired regexp
            if strict and (wait_count == 1):
                force_result = True

            found = self.command(
                target, command, regexp, op, result, returnJson, startt, force_result
            )
            if found is not False:
                break

            wait_count -= 1
            if wait_count > 0:
                time.sleep(wait_time)

        delta = time.time() - startt
        self.log("Done after %d loops, time=%s, Found=%s" % (n, delta, found))
        return found
|
|
|
|
|
|
# Module-wide lUtil instance used by the lu* entry points below.
# Created by luStart() and reset to None by luFinish().
LUtil = None
|
|
|
|
|
|
# entry calls
|
|
def luStart(
    baseScriptDir=".",
    baseLogDir=".",
    net="",
    fout="output.log",
    fsum="summary.txt",
    level=None,
):
    """Create the module-wide lUtil instance and configure it.

    *baseScriptDir* and *baseLogDir* are stored on the instance, *net* is
    the object later indexed by target name, *fout*/*fsum* are file names
    placed under *baseLogDir*, and *level*, when given, overrides the
    default log level. The DOTALL comparison experiment is switched on.
    """
    global LUtil
    # init class
    LUtil = util = lUtil()
    util.base_script_dir = baseScriptDir
    util.base_log_dir = baseLogDir
    util.net = net
    if fout != "":
        util.fout_name = baseLogDir + "/" + fout
    if fsum is not None:
        util.fsum_name = baseLogDir + "/" + fsum
    if level is not None:
        util.l_level = level
    util.l_dotall_experiment = True
|
|
|
|
|
|
def luCommand(
    target,
    command,
    regexp=".",
    op="none",
    result="",
    time=10,
    returnJson=False,
    wait_time=0.5,
):
    """Run *command* on *target* through the active LUtil instance.

    The wait operations ("wait", "wait-strict", "wait-nostrict") are
    retried via ``LUtil.wait`` with *time* as the overall wait and
    *wait_time* as the pause between tries; every other *op* runs once via
    ``LUtil.command``. The return value of the delegated call is returned.
    """
    if op not in ("wait", "wait-strict", "wait-nostrict"):
        return LUtil.command(target, command, regexp, op, result, returnJson)
    return LUtil.wait(
        target, command, regexp, op, result, time, returnJson, wait_time
    )
|
|
|
|
|
|
def luLast(usenl=False):
    """Return the match object saved by the most recent regexp command.

    With *usenl* true the match made against the raw multi-line output
    (DOTALL search) is returned, otherwise the match made against the
    output with newlines turned into spaces. A non-None match is also
    logged at level 7.
    """
    match = LUtil.l_last_nl if usenl else LUtil.l_last
    if match is not None:
        LUtil.log("luLast:%s:" % match.group(), 7)
    return match
|
|
|
|
|
|
def luInclude(filename, CallOnFail=None):
    """Run the test script *filename* found under the base script directory.

    Files ending in ".py" are read and executed with ``exec``; any other
    file is processed by ``LUtil.execTestFile``. When *CallOnFail* is
    given it is installed as the failure hook for the duration of the
    include, and the previous hook is put back afterwards, including when
    the script raises or exits.
    """
    tstFile = LUtil.base_script_dir + "/" + filename
    LUtil.setFilename(filename)
    if CallOnFail is not None:
        oldCallOnFail = LUtil.getCallOnFail()
        LUtil.setCallOnFail(CallOnFail)
    try:
        if filename.endswith(".py"):
            LUtil.log("luInclude: execfile " + tstFile)
            with open(tstFile) as infile:
                # NOTE(review): exec runs the file's content with this
                # module's names in scope; only include trusted scripts.
                exec(infile.read())
        else:
            LUtil.log("luInclude: execTestFile " + tstFile)
            LUtil.execTestFile(tstFile)
    finally:
        if CallOnFail is not None:
            LUtil.setCallOnFail(oldCallOnFail)
|
|
|
|
|
|
def luFinish():
    """Close the log and summary files and drop the active LUtil instance.

    Returns the totals text produced by ``closeFiles()``.
    """
    global LUtil
    totals_text = LUtil.closeFiles()
    # done
    LUtil = None
    return totals_text
|
|
|
|
|
|
def luNumFail():
    """Return the number of failed results counted by the active LUtil instance."""
    failed = LUtil.l_fail
    return failed
|
|
|
|
|
|
def luNumPass():
    """Return the number of passed results counted by the active LUtil instance."""
    passed = LUtil.l_pass
    return passed
|
|
|
|
|
|
def luResult(target, success, str, logstr=None):
    """Record a result for *target* through the active LUtil instance.

    *success* selects pass or fail, *str* is the summary text and *logstr*
    optional extra detail for the log. Returns whatever ``LUtil.result``
    returns.
    """
    outcome = LUtil.result(target, success, str, logstr)
    return outcome
|
|
|
|
|
|
def luShowResults(prFunction):
    """Pass every line of the summary file to *prFunction*.

    Each line is handed over with trailing whitespace removed. The file is
    closed even if *prFunction* raises.
    """
    with open(LUtil.fsum_name, "r") as sf:
        for line in sf:
            prFunction(line.rstrip())
|
|
|
|
|
|
def luShowFail():
    """Log as errors the summary lines whose last field is not "0".

    Result rows end with their fail count, so rows of passing tests are
    skipped; header, rule and "FILE:" lines are still reported. Blank
    lines are ignored. When at least one line was reported, a pointer to
    the detailed log file is logged as well.
    """
    printed = 0
    with open(LUtil.fsum_name, "r") as sf:
        for line in sf:
            fields = line.split()
            # Compare the whole last field so counts such as 10 or 20 are
            # not mistaken for zero, and blank lines cannot raise.
            if fields and fields[-1] != "0":
                printed += 1
                logger.error(line.rstrip())
    if printed > 0:
        logger.error("See %s for details of errors" % LUtil.fout_name)
|
|
|
|
|
|
#
|
|
# Sets default wait type for luCommand(op="wait") (may be overridden by
|
|
# specifying luCommand(op="wait-strict") or luCommand(op="wait-nostrict")).
|
|
#
|
|
# "nostrict" is the historical default behavior, which is to ignore
|
|
# failures to match the specified regexp in the specified time.
|
|
#
|
|
# "strict" means that failure to match the specified regexp in the
|
|
# specified time yields an explicit, logged failure result
|
|
#
|
|
def luSetWaitType(waittype):
    """Select how a plain luCommand(op="wait") treats a timeout.

    "strict" sets ``l_wait_strict`` to 1 and "nostrict" sets it to 0 on
    the active LUtil instance.

    Raises:
        ValueError: if *waittype* is neither "strict" nor "nostrict".
    """
    if waittype == "strict":
        LUtil.l_wait_strict = 1
    elif waittype == "nostrict":
        LUtil.l_wait_strict = 0
    else:
        raise ValueError('waittype must be one of "strict" or "nostrict"')
|
|
|
|
|
|
# for testing
|
|
if __name__ == "__main__":
    # Print the path of the "lib" directory located one level above the
    # directory that holds this file.
    this_file = os.path.abspath(__file__)
    print(os.path.dirname(os.path.dirname(this_file)) + "/lib")
    # Run every script named on the command line with default settings,
    # then write the totals and exit successfully.
    luStart()
    for arg in sys.argv[1:]:
        luInclude(arg)
    luFinish()
    sys.exit(0)
|