1
0
Fork 0
nvme-stas/staslib/stas.py
Daniel Baumann a8f39c03aa
Merging upstream version 2.3.1:
- properly handles big-endian data in `iputils.py` (Closes: #1057031).

Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-02-16 12:56:36 +01:00

604 lines
22 KiB
Python

# Copyright (c) 2021, Dell Inc. or its subsidiaries. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# See the LICENSE file for details.
#
# This file is part of NVMe STorage Appliance Services (nvme-stas).
#
# Authors: Martin Belanger <Martin.Belanger@dell.com>
#
'''Library for staf/stac. You will find here common code for stafd and stacd
including the Abstract Base Classes (ABC) for Controllers and Services'''
import os
import sys
import abc
import signal
import pickle
import logging
import dasbus.connection
from gi.repository import Gio, GLib
from systemd.daemon import notify as sd_notify
from staslib import conf, defs, gutil, iputil, log, trid
try:
    # Python 3.9 or later
    # Preferred API, although it may be missing before Python 3.9
    from importlib.resources import files
except ImportError:
    try:
        # Pre Python 3.9 backport of importlib.resources (if installed)
        from importlib_resources import files
    except ImportError:
        # Less efficient, but available on older versions of Python
        import pkg_resources

        def load_idl(idl_fname):
            '''@brief Load a D-Bus Interface Description Language (IDL) file
            from the 'staslib' package using pkg_resources.
            @param idl_fname: Name of the IDL file inside the package
            @return The file's content as a string, or '' if it can't be loaded
            '''
            try:
                idl_text = pkg_resources.resource_string('staslib', idl_fname).decode()
            except (FileNotFoundError, AttributeError):
                idl_text = ''
            return idl_text

    else:

        def load_idl(idl_fname):
            '''@brief Load a D-Bus Interface Description Language (IDL) file
            from the 'staslib' package using the importlib_resources backport.
            @param idl_fname: Name of the IDL file inside the package
            @return The file's content as a string, or '' if the file is missing
            '''
            try:
                idl_text = files('staslib').joinpath(idl_fname).read_text()  # pylint: disable=unspecified-encoding
            except FileNotFoundError:
                idl_text = ''
            return idl_text

else:

    def load_idl(idl_fname):
        '''@brief Load a D-Bus Interface Description Language (IDL) file
        from the 'staslib' package using importlib.resources.
        @param idl_fname: Name of the IDL file inside the package
        @return The file's content as a string, or '' if the file is missing
        '''
        try:
            idl_text = files('staslib').joinpath(idl_fname).read_text()  # pylint: disable=unspecified-encoding
        except FileNotFoundError:
            idl_text = ''
        return idl_text
# ******************************************************************************
def check_if_allowed_to_continue():
    '''@brief Verify the pre-requisites needed to run this program and
    exit right away when one of them is not met:
        1) The effective user must be root
        2) The device node /dev/nvme-fabrics must exist (it is missing
           when the required nvme kernel module has not been loaded)

    @return Nothing. The function only returns when both checks pass;
            otherwise sys.exit() is invoked with an error message.
    '''
    # 1) Check root privileges
    running_as_root = os.geteuid() == 0
    if not running_as_root:
        sys.exit(f'Permission denied. You need root privileges to run {defs.PROG_NAME}.')

    # 2) Check that nvme-tcp kernel module is running. There is no
    #    point going any further when the fabrics device is absent.
    fabrics_dev_present = os.path.exists('/dev/nvme-fabrics')
    if not fabrics_dev_present:
        sys.exit('Fatal error: missing nvme-tcp kernel module')
# ******************************************************************************
def remove_invalid_addresses(controllers: list):
    '''@brief Filter out the controllers that cannot be used, either because
    their address is invalid or because their transport is unknown.
    @param controllers: List of TIDs
    @return A new list containing only the controllers that passed the checks
    '''
    svc_conf = conf.SvcConf()
    kept = []
    for ctrl in controllers:
        transport = ctrl.transport

        if transport in ('fc', 'loop'):
            # At some point, need to validate FC addresses as well...
            kept.append(ctrl)
            continue

        if transport not in ('tcp', 'rdma'):
            logging.warning('Invalid transport %s', transport)
            continue

        # IP based transports: traddr must be
        # syntactically a valid IPv4 or IPv6 address.
        ip_obj = iputil.get_ipaddress_obj(ctrl.traddr)
        if ip_obj is None:
            logging.warning('%s IP address is not valid', ctrl)
            continue

        # The address family must also be enabled in the configuration.
        if ip_obj.version not in svc_conf.ip_family:
            logging.debug(
                '%s ignored because IPv%s is disabled in %s',
                ctrl,
                ip_obj.version,
                svc_conf.conf_file,
            )
            continue

        kept.append(ctrl)

    return kept
# ******************************************************************************
def tid_from_dlpe(dlpe, host_traddr, host_iface, host_nqn):
    '''@brief Build a Transport ID (trid.TID) from a Discovery Log Page Entry.
    @param dlpe: Discovery Log Page Entry (indexed by 'trtype', 'traddr',
                 'trsvcid' and 'subnqn')
    @param host_traddr: Value stored under 'host-traddr'
    @param host_iface: Value stored under 'host-iface'
    @param host_nqn: Value stored under 'host-nqn'. The key is left out
                     when this argument is empty/None.
    @return A trid.TID object
    '''
    base_cid = {
        'transport': dlpe['trtype'],
        'traddr': dlpe['traddr'],
        'trsvcid': dlpe['trsvcid'],
        'host-traddr': host_traddr,
        'host-iface': host_iface,
        'subsysnqn': dlpe['subnqn'],
    }
    full_cid = {**base_cid, 'host-nqn': host_nqn} if host_nqn else base_cid
    return trid.TID(full_cid)
# ******************************************************************************
def _excluded(excluded_ctrl_list, controller: dict):
    '''@brief Tell whether @controller matches one of the exclusion entries.
    An entry matches when every key/value pair it contains is also found,
    with an equal value, in @controller (a key missing from @controller
    compares as None).
    @param excluded_ctrl_list: Iterable of dicts, one per exclusion entry
    @param controller: The controller's attributes as a dict
    @return True if at least one entry matches, False otherwise
    '''
    return any(
        all(expected == controller.get(attr, None) for attr, expected in entry.items())
        for entry in excluded_ctrl_list
    )
# ******************************************************************************
def remove_excluded(controllers: list):
    '''@brief Drop the controllers that match an exclusion entry of the
    service configuration.
    @param controllers: List of TIDs
    @return @controllers itself when no exclusion is configured, otherwise
            a new list without the excluded controllers
    '''
    exclusions = conf.SvcConf().get_excluded()
    if not exclusions:
        return controllers

    logging.debug('remove_excluded() - excluded_ctrl_list = %s', exclusions)
    return [ctrl for ctrl in controllers if not _excluded(exclusions, ctrl.as_dict())]
# ******************************************************************************
class ControllerABC(abc.ABC):
    '''@brief Abstract base class used to manage the connection to a
    controller. It holds the connect-retry timer, the deferred connect
    request and the cancellable used to abort pending operations. Child
    classes provide the actual connect/disconnect implementation.
    '''

    CONNECT_RETRY_PERIOD_SEC = 60
    FAST_CONNECT_RETRY_PERIOD_SEC = 3

    def __init__(self, tid: trid.TID, service, discovery_ctrl: bool = False):
        self._tid = tid
        self._serv = service  # Parent service object (either Staf or Stac)
        self.set_level_from_tron(self._serv.tron)

        self._cancellable = Gio.Cancellable()
        self._connect_attempts = 0
        self._retry_connect_tmr = gutil.GTimer(self.CONNECT_RETRY_PERIOD_SEC, self._on_try_to_connect)
        self._discovery_ctrl = discovery_ctrl

        # Schedule the deferred that will invoke self._try_to_connect()
        self._try_to_connect_deferred = gutil.Deferred(self._try_to_connect)
        self._try_to_connect_deferred.schedule()

    def _release_resources(self):
        '''Stop pending activities (deferred, timer, cancellable) and drop
        every reference held by this object.'''
        deferred = self._try_to_connect_deferred
        if deferred:
            # Remove pending deferred from main loop
            deferred.cancel()

        retry_tmr = self._retry_connect_tmr
        if retry_tmr is not None:
            retry_tmr.kill()

        if self._alive():
            self._cancellable.cancel()

        self._tid = self._serv = None
        self._cancellable = None
        self._retry_connect_tmr = self._try_to_connect_deferred = None

    @property
    def id(self) -> str:
        '''@brief The Transport ID rendered as a printable string'''
        return str(self.tid)

    @property
    def tid(self):
        '''@brief The Transport ID object'''
        return self._tid

    def controller_id_dict(self) -> dict:
        '''@brief Return the controller ID as a dict whose values are strings.'''
        id_dict = {}
        for key, value in self.tid.as_dict().items():
            id_dict[key] = str(value)
        return id_dict

    def details(self) -> dict:
        '''@brief Return detailed debug info about this controller.
        In this base class it is the same as info().'''
        return self.info()

    def info(self) -> dict:
        '''@brief Return the controller info: the controller ID plus the
        number of connect attempts and the retry timer's state.'''
        ctrl_info = self.controller_id_dict()
        ctrl_info.update(
            {
                'connect attempts': str(self._connect_attempts),
                'retry connect timer': str(self._retry_connect_tmr),
            }
        )
        return ctrl_info

    def cancel(self):
        '''@brief Cancel pending operations (no-op if not alive).'''
        if not self._alive():
            return
        logging.debug('ControllerABC.cancel() - %s', self.id)
        self._cancellable.cancel()

    def kill(self):
        '''@brief Release all the resources associated with this object.'''
        logging.debug('ControllerABC.kill() - %s', self.id)
        self._release_resources()

    def _alive(self):
        '''A queued event may get processed after this object is no longer
        configured (i.e. alive). Callback functions can use this method to
        confirm the object is still alive before processing further.

        The result is falsy when the cancellable has been released or
        cancelled, and True otherwise.
        '''
        cancellable = self._cancellable
        if not cancellable:
            return cancellable
        return not cancellable.is_cancelled()

    def _on_try_to_connect(self):
        '''Retry timer callback: re-schedule the deferred connect request
        as long as this object is alive.'''
        if self._alive():
            self._try_to_connect_deferred.schedule()
        return GLib.SOURCE_REMOVE

    def _should_try_to_reconnect(self):
        '''Base implementation: always True.'''
        return True

    def _try_to_connect(self):
        '''Deferred callback: count the attempt and invoke _do_connect(),
        unless this object or the source of the deferred is gone.'''
        if self._alive():
            # This is a deferred function call. Make sure
            # the source of the deferred is still good.
            current_source = GLib.main_current_source()
            if not (current_source and current_source.is_destroyed()):
                self._connect_attempts += 1
                self._do_connect()
        return GLib.SOURCE_REMOVE

    @abc.abstractmethod
    def set_level_from_tron(self, tron):
        '''Set log level based on TRON'''

    @abc.abstractmethod
    def _do_connect(self):
        '''Perform connection'''

    @abc.abstractmethod
    def _on_aen(self, aen: int):
        '''Event handler when an AEN is received'''

    @abc.abstractmethod
    def _on_nvme_event(self, nvme_event):
        '''Event handler when an nvme_event is received'''

    @abc.abstractmethod
    def _on_ctrl_removed(self, udev_obj):
        '''Called when the associated nvme device (/dev/nvmeX) is removed
        from the system by the kernel.
        '''

    @abc.abstractmethod
    def _find_existing_connection(self):
        '''Check if there is an existing connection that matches this Controller's TID'''

    @abc.abstractmethod
    def all_ops_completed(self) -> bool:
        '''@brief Returns True if all operations have completed. False otherwise.'''

    @abc.abstractmethod
    def connected(self):
        '''@brief Return whether a connection is established'''

    @abc.abstractmethod
    def disconnect(self, disconnected_cb, keep_connection):
        '''@brief Issue an asynchronous disconnect command to a Controller.
        Once the async command has completed, the callback 'disconnected_cb'
        will be invoked. If a controller is already disconnected, then the
        callback will be added to the main loop's next idle slot to be executed
        ASAP.
        '''

    @abc.abstractmethod
    def reload_hdlr(self):
        '''@brief This is called when a "reload" signal is received.'''
# ******************************************************************************
class ServiceABC(abc.ABC):  # pylint: disable=too-many-instance-attributes
    '''@brief Base class used to manage a STorage Appliance Service.
    It owns the GLib main loop, the controller dictionary (keyed by TID),
    the D-Bus connection and the "last known config" file handling.
    '''

    CONF_STABILITY_SOAK_TIME_SEC = 1.5

    def __init__(self, args, default_conf, reload_hdlr):
        '''@param args: Parsed command-line arguments (conf_file and tron are read)
        @param default_conf: Default configuration passed to conf.SvcConf
        @param reload_hdlr: Callback registered for the SIGHUP signal
        '''
        service_conf = conf.SvcConf(default_conf=default_conf)
        service_conf.set_conf_file(args.conf_file)  # reload configuration
        self._tron = args.tron or service_conf.tron
        log.set_level_from_tron(self._tron)

        self._lkc_file = os.path.join(
            os.environ.get('RUNTIME_DIRECTORY', os.path.join('/run', defs.PROG_NAME)), 'last-known-config.pickle'
        )
        self._loop = GLib.MainLoop()
        self._cancellable = Gio.Cancellable()
        self._resolver = gutil.NameResolver()
        self._controllers = self._load_last_known_config()
        self._dbus_iface = None
        self._cfg_soak_tmr = gutil.GTimer(self.CONF_STABILITY_SOAK_TIME_SEC, self._on_config_ctrls)
        self._sysbus = dasbus.connection.SystemMessageBus()

        GLib.unix_signal_add(GLib.PRIORITY_HIGH, signal.SIGINT, self._stop_hdlr)  # CTRL-C
        GLib.unix_signal_add(GLib.PRIORITY_HIGH, signal.SIGTERM, self._stop_hdlr)  # systemctl stop stafd
        GLib.unix_signal_add(GLib.PRIORITY_HIGH, signal.SIGHUP, reload_hdlr)  # systemctl reload stafd

        nvme_options = conf.NvmeOptions()
        if not nvme_options.host_iface_supp or not nvme_options.discovery_supp:
            logging.warning(
                'Kernel does not appear to support all the options needed to run this program. Consider updating to a later kernel version.'
            )

        # We don't want to apply configuration changes right away.
        # Often, multiple changes will occur in a short amount of time (sub-second).
        # We want to wait until there are no more changes before applying them
        # to the system. The following timer acts as a "soak period". Changes
        # will be applied by calling self._on_config_ctrls() at the end of
        # the soak period.
        self._cfg_soak_tmr.start()

    def _release_resources(self):
        '''Cancel pending operations, stop the soak timer, forget all
        controllers, disconnect from the system bus and drop references.'''
        logging.debug('ServiceABC._release_resources()')

        if self._alive():
            self._cancellable.cancel()

        if self._cfg_soak_tmr is not None:
            self._cfg_soak_tmr.kill()

        self._controllers.clear()

        if self._sysbus:
            self._sysbus.disconnect()

        self._cfg_soak_tmr = None
        self._cancellable = None
        self._resolver = None
        self._lkc_file = None
        self._sysbus = None

    def _config_dbus(self, iface_obj, bus_name: str, obj_name: str):
        '''Publish @iface_obj at @obj_name on the system bus and register
        the service under @bus_name.'''
        self._dbus_iface = iface_obj
        self._sysbus.publish_object(obj_name, iface_obj)
        self._sysbus.register_service(bus_name)

    @property
    def tron(self):
        '''@brief Get Trace ON property'''
        return self._tron

    @tron.setter
    def tron(self, value):
        '''@brief Set Trace ON property and propagate the new value to the
        logger and to every controller.'''
        self._tron = value
        log.set_level_from_tron(self._tron)
        for controller in self._controllers.values():
            controller.set_level_from_tron(self._tron)

    def run(self):
        '''@brief Start the main loop execution. Returns once the loop has
        ended (or raised); the loop object is then discarded.'''
        try:
            self._loop.run()
        except Exception as ex:  # pylint: disable=broad-except
            logging.critical('exception: %s', ex)

        self._loop = None

    def info(self) -> dict:
        '''@brief Get the status info for this object (used for debug)'''
        nvme_options = conf.NvmeOptions()
        info = conf.SysConf().as_dict()
        info['last known config file'] = self._lkc_file
        info['config soak timer'] = str(self._cfg_soak_tmr)
        info['kernel support.TP8013'] = str(nvme_options.discovery_supp)
        info['kernel support.host_iface'] = str(nvme_options.host_iface_supp)
        return info

    def get_controllers(self):
        '''@brief Return the controller objects.
        @return The values view of the internal controller dictionary
                (not a dict, and not a copy).
        '''
        return self._controllers.values()

    def get_controller(
        self,
        transport: str,
        traddr: str,
        trsvcid: str,
        subsysnqn: str,
        host_traddr: str,
        host_iface: str,
        host_nqn: str,
    ):  # pylint: disable=too-many-arguments
        '''@brief Get the specified controller object from the list of controllers.
        @return The matching controller object, or None if there is no match.
        '''
        cid = {
            'transport': transport,
            'traddr': traddr,
            'trsvcid': trsvcid,
            'subsysnqn': subsysnqn,
            'host-traddr': host_traddr,
            'host-iface': host_iface,
            'host-nqn': host_nqn,
        }
        return self._controllers.get(trid.TID(cid))

    def _remove_ctrl_from_dict(self, controller, shutdown=False):
        '''Remove @controller from the controller dictionary.
        @param controller: The controller object to remove
        @param shutdown: When True, the soak timer is not restarted after
                         the removal.
        '''
        tid_to_pop = controller.tid
        if not tid_to_pop:
            # Being paranoid. This should not happen, but let's say the
            # controller object has been purged, but it is somehow still
            # listed in self._controllers.
            for tid, _controller in self._controllers.items():
                if _controller is controller:
                    tid_to_pop = tid
                    break

        if tid_to_pop:
            logging.debug('ServiceABC._remove_ctrl_from_dict()- %s | %s', tid_to_pop, controller.device)
            popped = self._controllers.pop(tid_to_pop, None)
            # Restart the soak timer so that the controller
            # configuration gets re-evaluated after the removal.
            if not shutdown and popped is not None and self._cfg_soak_tmr:
                self._cfg_soak_tmr.start()
        else:
            logging.debug('ServiceABC._remove_ctrl_from_dict()- already removed')

    def remove_controller(self, controller, success):  # pylint: disable=unused-argument
        '''@brief Remove the specified controller object from the list of controllers
        @param controller: the controller object
        @param success: whether the disconnect was successful'''
        logging.debug('ServiceABC.remove_controller()')
        if isinstance(controller, ControllerABC):
            self._remove_ctrl_from_dict(controller)
            controller.kill()

    def _alive(self):
        '''It's a good idea to check that this object hasn't been
        cancelled (i.e. is still alive) when entering a callback function.
        Callback functions can be invoked after, for example, a process has
        been signalled to stop or restart, in which case it makes no sense to
        proceed with the callback.
        '''
        return self._cancellable and not self._cancellable.is_cancelled()

    def _cancel(self):
        '''Cancel this object's pending operations and those of every controller.'''
        logging.debug('ServiceABC._cancel()')
        if self._alive():
            self._cancellable.cancel()

        for controller in self._controllers.values():
            controller.cancel()

    def _stop_hdlr(self):
        '''Signal handler for SIGINT/SIGTERM: save the last known config,
        then either exit right away (no controllers) or start disconnecting
        all controllers.'''
        logging.debug('ServiceABC._stop_hdlr()')
        sd_notify('STOPPING=1')

        self._cancel()  # Cancel pending operations

        self._dump_last_known_config(self._controllers)

        if not self._controllers:
            GLib.idle_add(self._exit)
        else:
            self._disconnect_all()

        return GLib.SOURCE_REMOVE

    def _on_final_disconnect(self, controller, success):
        '''Callback invoked after a controller is disconnected.
        THIS IS USED DURING PROCESS SHUTDOWN TO WAIT FOR ALL CONTROLLERS TO BE
        DISCONNECTED BEFORE EXITING THE PROGRAM. ONLY CALL ON SHUTDOWN!
        @param controller: the controller object
        @param success: whether the disconnect operation was successful
        '''
        logging.debug(
            'ServiceABC._on_final_disconnect() - %s | %s: disconnect %s',
            controller.id,
            controller.device,
            'succeeded' if success else 'failed',
        )

        self._remove_ctrl_from_dict(controller, True)
        controller.kill()

        # When all controllers have disconnected, we can finish the clean up
        if not self._controllers:
            # Defer exit to the next main loop's idle period.
            GLib.idle_add(self._exit)

    def _exit(self):
        '''Release all resources and quit the main loop.'''
        logging.debug('ServiceABC._exit()')
        self._release_resources()
        self._loop.quit()

    def _on_config_ctrls(self, *_user_data):
        '''Soak timer callback: configure the controllers if still alive.'''
        if self._alive():
            self._config_ctrls()
        return GLib.SOURCE_REMOVE

    def _config_ctrls(self):
        '''@brief Start controllers configuration.'''
        # The configuration file may contain controllers and/or excluded
        # controllers with traddr specified as hostname instead of IP address.
        # Because of this, we need to remove those excluded elements before
        # running name resolution. And we will need to remove excluded
        # elements after name resolution is complete (i.e. in the callback
        # function _config_ctrls_finish)
        logging.debug('ServiceABC._config_ctrls()')
        configured_controllers = [
            trid.TID(cid) for cid in conf.SvcConf().get_controllers() + conf.NbftConf().get_controllers()
        ]
        configured_controllers = remove_excluded(configured_controllers)
        self._resolver.resolve_ctrl_async(self._cancellable, configured_controllers, self._config_ctrls_finish)

    def _read_lkc(self):
        '''@brief Read Last Known Config from file
        @return The unpickled object, or None when the file is missing,
                empty, or cannot be unpickled.
        '''
        # NOTE(security): pickle must only be fed trusted data. The file is
        # located under RUNTIME_DIRECTORY (or /run/<prog name>); it is
        # assumed that only this service can write there - TODO confirm.
        try:
            with open(self._lkc_file, 'rb') as file:
                return pickle.load(file)
        except (FileNotFoundError, AttributeError, EOFError):
            # Nothing saved yet, an attribute lookup failed while
            # unpickling, or the file was emptied by _write_lkc().
            return None
        except (pickle.UnpicklingError, ImportError, IndexError) as ex:
            # A corrupted file must not prevent the service from running.
            logging.warning('Ignoring unreadable last known config %s: %s', self._lkc_file, ex)
            return None

    def _write_lkc(self, config):
        '''@brief Write Last Known Config to file, and if config is empty
        make sure the file is emptied.
        @param config: The object to pickle. A failure to write is logged
                       and otherwise ignored (best effort).
        '''
        try:
            # Note that if config is empty we still
            # want to open/close the file to empty it.
            with open(self._lkc_file, 'wb') as file:
                if config:
                    pickle.dump(config, file)
        except OSError as ex:
            # OSError covers FileNotFoundError as well as permission, disk
            # and path errors. This runs from the stop handler, where an
            # uncaught exception would interrupt the shutdown sequence.
            logging.error('Unable to save last known config: %s', ex)

    @abc.abstractmethod
    def _disconnect_all(self):
        '''Tell all controller objects to disconnect'''

    @abc.abstractmethod
    def _keep_connections_on_exit(self):
        '''@brief Determine whether connections should remain when the
        process exits.

        NOTE) This is the base class method used to define the interface.
        It must be overloaded by a child class.
        '''

    @abc.abstractmethod
    def _config_ctrls_finish(self, configured_ctrl_list):
        '''@brief Finish controllers configuration after hostnames (if any)
        have been resolved.

        Configuring controllers must be done asynchronously in 2 steps.
        In the first step, host names get resolved to find their IP addresses.
        Name resolution can take a while, especially when an external name
        resolution server is used. Once that step completed, the callback
        method _config_ctrls_finish() (i.e. this method), gets invoked to
        complete the controller configuration.

        NOTE) This is the base class method used to define the interface.
        It must be overloaded by a child class.
        '''

    @abc.abstractmethod
    def _load_last_known_config(self):
        '''Load last known config from file (if any)'''

    @abc.abstractmethod
    def _dump_last_known_config(self, controllers):
        '''Save last known config to file'''