1
0
Fork 0

Merging upstream version 2.12.

Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
Daniel Baumann 2025-03-20 08:10:44 +01:00
parent 078c0dbcc0
commit 635faa7346
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
571 changed files with 10718 additions and 2738 deletions

View file

@ -30,12 +30,22 @@ import shutil
import stat
import subprocess
import sys
import time
import unittest
import time
from nvme_test_logger import TestNVMeLogger
def to_decimal(value):
""" Wrapper for converting numbers to base 10 decimal
- Args:
- value: A number in any common base
- Returns:
- Decimal integer
"""
return int(str(value), 0)
class TestNVMe(unittest.TestCase):
"""
@ -58,19 +68,28 @@ class TestNVMe(unittest.TestCase):
self.ctrl = "XXX"
self.ns1 = "XXX"
self.test_log_dir = "XXX"
self.nvme_bin = "nvme"
self.do_validate_pci_device = True
self.default_nsid = 0x1
self.flbas = 0
self.config_file = 'tests/config.json'
self.load_config()
if self.do_validate_pci_device:
self.validate_pci_device()
self.create_and_attach_default_ns()
print(f"\nsetup: ctrl: {self.ctrl}, ns1: {self.ns1}, default_nsid: {self.default_nsid}, flbas: {self.flbas}\n")
def tearDown(self):
""" Post Section for TestNVMe. """
if self.clear_log_dir is True:
shutil.rmtree(self.log_dir, ignore_errors=True)
self.create_and_attach_default_ns()
print(f"\nteardown: ctrl: {self.ctrl}, ns1: {self.ns1}, default_nsid: {self.default_nsid}, flbas: {self.flbas}\n")
@classmethod
def tearDownClass(cls):
print("\n")
def create_and_attach_default_ns(self):
""" Creates a default namespace with the full capacity of the ctrls NVM
@ -103,8 +122,8 @@ class TestNVMe(unittest.TestCase):
- None
"""
x1, x2, dev = self.ctrl.split('/')
cmd = cmd = "find /sys/devices -name \\*" + dev + " | grep -i pci"
err = subprocess.call(cmd, shell=True)
cmd = "find /sys/devices -name \\*" + dev + " | grep -i pci"
err = subprocess.call(cmd, shell=True, stdout=subprocess.DEVNULL)
self.assertEqual(err, 0, "ERROR : Only NVMe PCI subsystem is supported")
def load_config(self):
@ -119,7 +138,10 @@ class TestNVMe(unittest.TestCase):
self.ctrl = config['controller']
self.ns1 = config['ns1']
self.log_dir = config['log_dir']
self.do_validate_pci_device = config.get('do_validate_pci_device', self.do_validate_pci_device)
self.nvme_bin = config.get('nvme_bin', self.nvme_bin)
print(f"\nUsing nvme binary '{self.nvme_bin}'")
self.do_validate_pci_device = config.get(
'do_validate_pci_device', self.do_validate_pci_device)
self.clear_log_dir = False
if self.clear_log_dir is True:
@ -154,20 +176,17 @@ class TestNVMe(unittest.TestCase):
- Returns:
- None
"""
nvme_reset_cmd = "nvme reset " + self.ctrl
nvme_reset_cmd = f"{self.nvme_bin} reset {self.ctrl}"
err = subprocess.call(nvme_reset_cmd,
shell=True,
stdout=subprocess.PIPE,
encoding='utf-8')
stdout=subprocess.DEVNULL)
self.assertEqual(err, 0, "ERROR : nvme reset failed")
time.sleep(5)
rescan_cmd = "echo 1 > /sys/bus/pci/rescan"
proc = subprocess.Popen(rescan_cmd,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding='utf-8')
time.sleep(5)
self.assertEqual(proc.wait(), 0, "ERROR : pci rescan failed")
def get_ctrl_id(self):
@ -177,7 +196,8 @@ class TestNVMe(unittest.TestCase):
- Returns:
- controller id.
"""
get_ctrl_id = f"nvme list-ctrl {self.ctrl} --output-format=json"
get_ctrl_id = f"{self.nvme_bin} list-ctrl {self.ctrl} " + \
"--output-format=json"
proc = subprocess.Popen(get_ctrl_id,
shell=True,
stdout=subprocess.PIPE,
@ -189,7 +209,7 @@ class TestNVMe(unittest.TestCase):
"ERROR : nvme list-ctrl could not find ctrl")
return str(json_output['ctrl_list'][0]['ctrl_id'])
def get_ns_list(self):
def get_nsid_list(self):
""" Wrapper for extracting the namespace list.
- Args:
- None
@ -197,14 +217,17 @@ class TestNVMe(unittest.TestCase):
- List of the namespaces.
"""
ns_list = []
ns_list_cmd = "nvme list-ns " + self.ctrl
ns_list_cmd = f"{self.nvme_bin} list-ns {self.ctrl} " + \
"--output-format=json"
proc = subprocess.Popen(ns_list_cmd,
shell=True,
stdout=subprocess.PIPE,
encoding='utf-8')
self.assertEqual(proc.wait(), 0, "ERROR : nvme list namespace failed")
for line in proc.stdout:
ns_list.append(line.split('x')[-1])
json_output = json.loads(proc.stdout.read())
for ns in json_output['nsid_list']:
ns_list.append(ns['nsid'])
return ns_list
@ -215,22 +238,16 @@ class TestNVMe(unittest.TestCase):
- Returns:
- maximum number of namespaces supported.
"""
pattern = re.compile("^nn[ ]+: [0-9]", re.IGNORECASE)
max_ns = -1
max_ns_cmd = "nvme id-ctrl " + self.ctrl
max_ns_cmd = f"{self.nvme_bin} id-ctrl {self.ctrl} " + \
"--output-format=json"
proc = subprocess.Popen(max_ns_cmd,
shell=True,
stdout=subprocess.PIPE,
encoding='utf-8')
err = proc.wait()
self.assertEqual(err, 0, "ERROR : reading maximum namespace count failed")
for line in proc.stdout:
if pattern.match(line):
max_ns = line.split(":")[1].strip()
break
print(max_ns)
return int(max_ns)
json_output = json.loads(proc.stdout.read())
return int(json_output['nn'])
def get_lba_status_supported(self):
""" Check if 'Get LBA Status' command is supported by the device
@ -239,7 +256,7 @@ class TestNVMe(unittest.TestCase):
- Returns:
- True if 'Get LBA Status' command is supported, otherwise False
"""
return int(self.get_id_ctrl_field_value("oacs"), 16) & (1 << 9)
return to_decimal(self.get_id_ctrl_field_value("oacs")) & (1 << 9)
def get_lba_format_size(self):
""" Wrapper for extracting lba format size of the given flbas
@ -248,7 +265,8 @@ class TestNVMe(unittest.TestCase):
- Returns:
- lba format size as a tuple of (data_size, metadata_size) in bytes.
"""
nvme_id_ns_cmd = f"nvme id-ns {self.ns1} --output-format=json"
nvme_id_ns_cmd = f"{self.nvme_bin} id-ns {self.ns1} " + \
"--output-format=json"
proc = subprocess.Popen(nvme_id_ns_cmd,
shell=True,
stdout=subprocess.PIPE,
@ -274,24 +292,9 @@ class TestNVMe(unittest.TestCase):
- Args:
- None
- Returns:
- maximum number of namespaces supported.
- Total NVM capacity.
"""
pattern = re.compile("^tnvmcap[ ]+: [0-9]", re.IGNORECASE)
ncap = -1
ncap_cmd = "nvme id-ctrl " + self.ctrl
proc = subprocess.Popen(ncap_cmd,
shell=True,
stdout=subprocess.PIPE,
encoding='utf-8')
err = proc.wait()
self.assertEqual(err, 0, "ERROR : reading nvm capacity failed")
for line in proc.stdout:
if pattern.match(line):
ncap = line.split(":")[1].strip()
break
print(ncap)
return int(ncap)
return to_decimal(self.get_id_ctrl_field_value("tnvmcap"))
def get_id_ctrl_field_value(self, field):
""" Wrapper for extracting id-ctrl field values
@ -300,7 +303,8 @@ class TestNVMe(unittest.TestCase):
- Returns:
- Filed value of the given field
"""
id_ctrl_cmd = f"nvme id-ctrl {self.ctrl} --output-format=json"
id_ctrl_cmd = f"{self.nvme_bin} id-ctrl {self.ctrl} " + \
"--output-format=json"
proc = subprocess.Popen(id_ctrl_cmd,
shell=True,
stdout=subprocess.PIPE,
@ -319,30 +323,7 @@ class TestNVMe(unittest.TestCase):
- Returns:
- Optional Copy Formats Supported
"""
return int(self.get_id_ctrl_field_value("ocfs"), 16)
def get_format(self):
""" Wrapper for extracting format.
- Args:
- None
- Returns:
- maximum format of namespace.
"""
# defaulting to 4K
nvm_format = 4096
nvm_format_cmd = "nvme id-ns " + self.ctrl + " -n1"
proc = subprocess.Popen(nvm_format_cmd,
shell=True,
stdout=subprocess.PIPE,
encoding='utf-8')
err = proc.wait()
self.assertEqual(err, 0, "ERROR : reading nvm capacity failed")
for line in proc.stdout:
if "in use" in line:
nvm_format = 2 ** int(line.split(":")[3].split()[0])
print(nvm_format)
return int(nvm_format)
return to_decimal(self.get_id_ctrl_field_value("ocfs"))
def delete_all_ns(self):
""" Wrapper for deleting all the namespaces.
@ -351,15 +332,19 @@ class TestNVMe(unittest.TestCase):
- Returns:
- None
"""
delete_ns_cmd = "nvme delete-ns " + self.ctrl + " -n 0xFFFFFFFF"
delete_ns_cmd = f"{self.nvme_bin} delete-ns {self.ctrl} " + \
"--namespace-id=0xFFFFFFFF"
self.assertEqual(self.exec_cmd(delete_ns_cmd), 0)
list_ns_cmd = "nvme list-ns " + self.ctrl + " --all | wc -l"
list_ns_cmd = f"{self.nvme_bin} list-ns {self.ctrl} --all " + \
"--output-format=json"
proc = subprocess.Popen(list_ns_cmd,
shell=True,
stdout=subprocess.PIPE,
encoding='utf-8')
output = proc.stdout.read().strip()
self.assertEqual(output, '0', "ERROR : deleting all namespace failed")
self.assertEqual(proc.wait(), 0, "ERROR : nvme list-ns failed")
json_output = json.loads(proc.stdout.read())
self.assertEqual(len(json_output['nsid_list']), 0,
"ERROR : deleting all namespace failed")
def create_ns(self, nsze, ncap, flbas, dps):
""" Wrapper for creating a namespace.
@ -371,9 +356,9 @@ class TestNVMe(unittest.TestCase):
- Returns:
- return code of the nvme create namespace command.
"""
create_ns_cmd = "nvme create-ns " + self.ctrl + " --nsze=" + \
str(nsze) + " --ncap=" + str(ncap) + \
" --flbas=" + str(flbas) + " --dps=" + str(dps)
create_ns_cmd = f"{self.nvme_bin} create-ns {self.ctrl} " + \
f"--nsze={str(nsze)} --ncap={str(ncap)} --flbas={str(flbas)} " + \
f"--dps={str(dps)} --verbose"
return self.exec_cmd(create_ns_cmd)
def create_and_validate_ns(self, nsid, nsze, ncap, flbas, dps):
@ -389,15 +374,14 @@ class TestNVMe(unittest.TestCase):
"""
err = self.create_ns(nsze, ncap, flbas, dps)
if err == 0:
time.sleep(2)
id_ns_cmd = "nvme id-ns " + self.ctrl + " -n " + str(nsid)
id_ns_cmd = f"{self.nvme_bin} id-ns {self.ctrl} " + \
f"--namespace-id={str(nsid)}"
err = subprocess.call(id_ns_cmd,
shell=True,
stdout=subprocess.PIPE,
encoding='utf-8')
stdout=subprocess.DEVNULL)
return err
def attach_ns(self, ctrl_id, ns_id):
def attach_ns(self, ctrl_id, nsid):
""" Wrapper for attaching the namespace.
- Args:
- ctrl_id : controller id to which namespace to be attached.
@ -405,21 +389,23 @@ class TestNVMe(unittest.TestCase):
- Returns:
- 0 on success, error code on failure.
"""
attach_ns_cmd = "nvme attach-ns " + self.ctrl + \
" --namespace-id=" + str(ns_id) + \
" --controllers=" + ctrl_id
attach_ns_cmd = f"{self.nvme_bin} attach-ns {self.ctrl} " + \
f"--namespace-id={str(nsid)} --controllers={ctrl_id} --verbose"
err = subprocess.call(attach_ns_cmd,
shell=True,
stdout=subprocess.PIPE,
encoding='utf-8')
time.sleep(5)
if err == 0:
# enumerate new namespace block device
self.nvme_reset_ctrl()
time.sleep(5)
# check if new namespace block device exists
err = 0 if stat.S_ISBLK(os.stat(self.ns1).st_mode) else 1
return err
stdout=subprocess.DEVNULL)
if err != 0:
return err
# Try to find block device for 5 seconds
device_path = f"{self.ctrl}n{str(nsid)}"
stop_time = time.time() + 5
while time.time() < stop_time:
if os.path.exists(device_path) and stat.S_ISBLK(os.stat(device_path).st_mode):
return 0
time.sleep(0.1)
return 1
def detach_ns(self, ctrl_id, nsid):
""" Wrapper for detaching the namespace.
@ -429,13 +415,11 @@ class TestNVMe(unittest.TestCase):
- Returns:
- 0 on success, error code on failure.
"""
detach_ns_cmd = "nvme detach-ns " + self.ctrl + \
" --namespace-id=" + str(nsid) + \
" --controllers=" + ctrl_id
detach_ns_cmd = f"{self.nvme_bin} detach-ns {self.ctrl} " + \
f"--namespace-id={str(nsid)} --controllers={ctrl_id} --verbose"
return subprocess.call(detach_ns_cmd,
shell=True,
stdout=subprocess.PIPE,
encoding='utf-8')
stdout=subprocess.DEVNULL)
def delete_and_validate_ns(self, nsid):
""" Wrapper for deleting and validating that namespace is deleted.
@ -445,11 +429,11 @@ class TestNVMe(unittest.TestCase):
- 0 on success, 1 on failure.
"""
# delete the namespace
delete_ns_cmd = "nvme delete-ns " + self.ctrl + " -n " + str(nsid)
delete_ns_cmd = f"{self.nvme_bin} delete-ns {self.ctrl} " + \
f"--namespace-id={str(nsid)} --verbose"
err = subprocess.call(delete_ns_cmd,
shell=True,
stdout=subprocess.PIPE,
encoding='utf-8')
stdout=subprocess.DEVNULL)
self.assertEqual(err, 0, "ERROR : delete namespace failed")
return err
@ -460,16 +444,14 @@ class TestNVMe(unittest.TestCase):
- Returns:
- 0 on success, error code on failure.
"""
smart_log_cmd = "nvme smart-log " + self.ctrl + " -n " + str(nsid)
print(smart_log_cmd)
smart_log_cmd = f"{self.nvme_bin} smart-log {self.ctrl} " + \
f"--namespace-id={str(nsid)}"
proc = subprocess.Popen(smart_log_cmd,
shell=True,
stdout=subprocess.PIPE,
encoding='utf-8')
err = proc.wait()
self.assertEqual(err, 0, "ERROR : nvme smart log failed")
smart_log_output = proc.communicate()[0]
print(f"{smart_log_output}")
return err
def get_id_ctrl(self, vendor=False):
@ -480,10 +462,10 @@ class TestNVMe(unittest.TestCase):
- 0 on success, error code on failure.
"""
if not vendor:
id_ctrl_cmd = "nvme id-ctrl " + self.ctrl
id_ctrl_cmd = f"{self.nvme_bin} id-ctrl {self.ctrl}"
else:
id_ctrl_cmd = "nvme id-ctrl --vendor-specific " + self.ctrl
print(id_ctrl_cmd)
id_ctrl_cmd = f"{self.nvme_bin} id-ctrl " +\
f"--vendor-specific {self.ctrl}"
proc = subprocess.Popen(id_ctrl_cmd,
shell=True,
stdout=subprocess.PIPE,
@ -500,13 +482,14 @@ class TestNVMe(unittest.TestCase):
- 0 on success, error code on failure.
"""
pattern = re.compile(r"^ Entry\[[ ]*[0-9]+\]")
error_log_cmd = "nvme error-log " + self.ctrl
error_log_cmd = f"{self.nvme_bin} error-log {self.ctrl}"
proc = subprocess.Popen(error_log_cmd,
shell=True,
stdout=subprocess.PIPE,
encoding='utf-8')
err = proc.wait()
self.assertEqual(err, 0, "ERROR : nvme error log failed")
# This sanity checkes the 'normal' output
line = proc.stdout.readline()
err_log_entry_count = int(line.split(" ")[5].strip().split(":")[1])
entry_count = 0
@ -523,42 +506,20 @@ class TestNVMe(unittest.TestCase):
- Returns:
- None
"""
block_size = mmap.PAGESIZE if int(lbads) < 9 else 2 ** int(lbads)
(ds, _) = self.get_lba_format_size()
block_size = ds if int(lbads) < 9 else 2 ** int(lbads)
ns_path = self.ctrl + "n" + str(nsid)
io_cmd = "dd if=" + ns_path + " of=/dev/null" + " bs=" + \
str(block_size) + " count=" + str(count) + " > /dev/null 2>&1"
print(io_cmd)
print(f"Running io: {io_cmd}")
run_io = subprocess.Popen(io_cmd, shell=True, stdout=subprocess.PIPE,
encoding='utf-8')
run_io_result = run_io.communicate()[1]
self.assertEqual(run_io_result, None)
io_cmd = "dd if=/dev/zero of=" + ns_path + " bs=" + \
str(block_size) + " count=" + str(count) + " > /dev/null 2>&1"
print(io_cmd)
print(f"Running io: {io_cmd}")
run_io = subprocess.Popen(io_cmd, shell=True, stdout=subprocess.PIPE,
encoding='utf-8')
run_io_result = run_io.communicate()[1]
self.assertEqual(run_io_result, None)
def supp_check_id_ctrl(self, key):
""" Wrapper for support check.
- Args:
- key : search key.
- Returns:
- value for key requested.
"""
id_ctrl = "nvme id-ctrl " + self.ctrl
print("\n" + id_ctrl)
proc = subprocess.Popen(id_ctrl,
shell=True,
stdout=subprocess.PIPE,
encoding='utf-8')
err = proc.wait()
self.assertEqual(err, 0, "ERROR : nvme Identify controller Data \
structure failed")
for line in proc.stdout:
if key in line:
key = line.replace(",", "", 1)
print(key)
val = (key.split(':'))[1].strip()
return int(val, 16)