- properly handles big-endian data in `iputils.py` (Closes: #1057031). Signed-off-by: Daniel Baumann <daniel@debian.org>
604 lines
22 KiB
Python
604 lines
22 KiB
Python
# Copyright (c) 2021, Dell Inc. or its subsidiaries. All rights reserved.
|
|
# SPDX-License-Identifier: Apache-2.0
|
|
# See the LICENSE file for details.
|
|
#
|
|
# This file is part of NVMe STorage Appliance Services (nvme-stas).
|
|
#
|
|
# Authors: Martin Belanger <Martin.Belanger@dell.com>
|
|
#
|
|
'''Library for staf/stac. You will find here common code for stafd and stacd
|
|
including the Abstract Base Classes (ABC) for Controllers and Services'''
|
|
|
|
import os
|
|
import sys
|
|
import abc
|
|
import signal
|
|
import pickle
|
|
import logging
|
|
import dasbus.connection
|
|
from gi.repository import Gio, GLib
|
|
from systemd.daemon import notify as sd_notify
|
|
from staslib import conf, defs, gutil, iputil, log, trid
|
|
|
|
# load_idl() is defined once, by whichever resource-loading backend can be
# imported. All three variants share the same contract: return the text of
# the IDL file shipped inside the 'staslib' package, decoded as UTF-8, or
# an empty string when the file cannot be found.
try:
    # Python 3.9 or later
    # This is the preferred way, but may not be available before Python 3.9
    from importlib.resources import files
except ImportError:
    try:
        # Pre Python 3.9 backport of importlib.resources (if installed)
        from importlib_resources import files
    except ImportError:
        # Less efficient, but available on older versions of Python
        import pkg_resources

        def load_idl(idl_fname):
            '''@brief Load D-Bus Interface Description Language File
            @param idl_fname: Name of the IDL file inside the staslib package
            @return The file's contents as a string, or '' if it cannot be loaded
            '''
            try:
                return pkg_resources.resource_string('staslib', idl_fname).decode('utf-8')
            except (FileNotFoundError, AttributeError):
                return ''

    else:

        def load_idl(idl_fname):
            '''@brief Load D-Bus Interface Description Language File
            @param idl_fname: Name of the IDL file inside the staslib package
            @return The file's contents as a string, or '' if the file is missing
            '''
            try:
                # Explicit encoding: do not depend on the process locale.
                return files('staslib').joinpath(idl_fname).read_text(encoding='utf-8')
            except FileNotFoundError:
                return ''

else:

    def load_idl(idl_fname):
        '''@brief Load D-Bus Interface Description Language File
        @param idl_fname: Name of the IDL file inside the staslib package
        @return The file's contents as a string, or '' if the file is missing
        '''
        try:
            # Explicit encoding: do not depend on the process locale.
            return files('staslib').joinpath(idl_fname).read_text(encoding='utf-8')
        except FileNotFoundError:
            return ''
|
|
|
|
|
|
# ******************************************************************************
|
|
def check_if_allowed_to_continue():
    '''@brief Startup gatekeeper. Verifies the pre-requisites below and
           terminates the program (via sys.exit) as soon as one is not met:

           1) The effective user must be root
           2) The nvme-tcp kernel module must be loaded, which is detected
              by the presence of /dev/nvme-fabrics

    @return Returns (None) only when every pre-requisite is satisfied.
    '''
    # Pre-requisite 1: root privileges
    running_as_root = os.geteuid() == 0
    if not running_as_root:
        sys.exit(f'Permission denied. You need root privileges to run {defs.PROG_NAME}.')

    # Pre-requisite 2: kernel module loaded. Without the fabrics device
    # node there is no point in going any further.
    fabrics_dev_present = os.path.exists('/dev/nvme-fabrics')
    if not fabrics_dev_present:
        sys.exit('Fatal error: missing nvme-tcp kernel module')
|
|
|
|
|
|
# ******************************************************************************
|
|
def remove_invalid_addresses(controllers: list):
    '''@brief Filter a list of controllers, keeping only those whose
           transport and address are acceptable.
    @param controllers: List of TIDs
    @return A new list holding the accepted controllers, in their original
            order. Rejected entries are logged.
    '''
    svc_conf = conf.SvcConf()
    kept = []
    for ctrl in controllers:
        transport = ctrl.transport

        if transport in ('fc', 'loop'):
            # At some point, need to validate FC addresses as well...
            kept.append(ctrl)
            continue

        if transport not in ('tcp', 'rdma'):
            logging.warning('Invalid transport %s', transport)
            continue

        # IP-based transport: traddr must be a syntactically
        # valid IPv4 or IPv6 address...
        addr = iputil.get_ipaddress_obj(ctrl.traddr)
        if addr is None:
            logging.warning('%s IP address is not valid', ctrl)
        elif addr.version not in svc_conf.ip_family:
            # ...and its address family must be enabled in the configuration.
            logging.debug(
                '%s ignored because IPv%s is disabled in %s',
                ctrl,
                addr.version,
                svc_conf.conf_file,
            )
        else:
            kept.append(ctrl)

    return kept
|
|
|
|
|
|
# ******************************************************************************
|
|
def tid_from_dlpe(dlpe, host_traddr, host_iface, host_nqn):
    '''@brief Build a Transport ID (trid.TID) from a Discovery Log Page Entry.
    @param dlpe: Discovery Log Page Entry; read by key ('trtype', 'traddr',
                 'trsvcid', 'subnqn')
    @param host_traddr: Value stored under 'host-traddr'
    @param host_iface: Value stored under 'host-iface'
    @param host_nqn: Value stored under 'host-nqn'; the key is left out
                     entirely when this argument is falsy
    @return A trid.TID object
    '''
    optional = {'host-nqn': host_nqn} if host_nqn else {}
    return trid.TID(
        {
            'transport': dlpe['trtype'],
            'traddr': dlpe['traddr'],
            'trsvcid': dlpe['trsvcid'],
            'host-traddr': host_traddr,
            'host-iface': host_iface,
            'subsysnqn': dlpe['subnqn'],
            **optional,
        }
    )
|
|
|
|
|
|
# ******************************************************************************
|
|
def _excluded(excluded_ctrl_list, controller: dict):
    '''@brief Tell whether @controller matches at least one exclusion entry.
    @param excluded_ctrl_list: Iterable of dicts. An entry matches when every
                               one of its key/value pairs equals the value
                               found under the same key in @controller
                               (a key missing from @controller reads as None).
    @param controller: The controller's parameters as a dict
    @return True if any entry matches, False otherwise
    '''
    return any(
        all(val == controller.get(key) for key, val in criteria.items())
        for criteria in excluded_ctrl_list
    )
|
|
|
|
|
|
# ******************************************************************************
|
|
def remove_excluded(controllers: list):
    '''@brief Drop the controllers that match the configured exclusion list.
    @param controllers: List of TIDs
    @return The input list itself when no exclusion is configured; otherwise
            a new list without the excluded controllers.
    '''
    exclusions = conf.SvcConf().get_excluded()
    if not exclusions:
        return controllers

    logging.debug('remove_excluded() - excluded_ctrl_list = %s', exclusions)
    return [tid for tid in controllers if not _excluded(exclusions, tid.as_dict())]
|
|
|
|
|
|
# ******************************************************************************
|
|
class ControllerABC(abc.ABC):
    '''@brief Abstract base for objects managing the connection to one controller.'''

    # Period handed to the retry-connect timer created in __init__().
    CONNECT_RETRY_PERIOD_SEC = 60
    # Not referenced by this base class; presumably used by subclasses.
    FAST_CONNECT_RETRY_PERIOD_SEC = 3

    def __init__(self, tid: trid.TID, service, discovery_ctrl: bool = False):
        '''@brief Initialize the object and schedule a first connection attempt.
        @param tid: Transport ID of the controller
        @param service: The parent service (either Staf or Stac)
        @param discovery_ctrl: Stored as-is in self._discovery_ctrl
        '''
        self._tid = tid
        self._serv = service  # Parent service object (either Staf or Stac)
        self.set_level_from_tron(self._serv.tron)
        self._cancellable = Gio.Cancellable()
        self._connect_attempts = 0
        self._retry_connect_tmr = gutil.GTimer(self.CONNECT_RETRY_PERIOD_SEC, self._on_try_to_connect)
        self._discovery_ctrl = discovery_ctrl
        self._try_to_connect_deferred = gutil.Deferred(self._try_to_connect)
        self._try_to_connect_deferred.schedule()

    def _release_resources(self):
        '''Stop everything that is still pending, then drop all references.'''
        # Take the pending deferred (if any) off the main loop
        if self._try_to_connect_deferred:
            self._try_to_connect_deferred.cancel()

        if self._retry_connect_tmr is not None:
            self._retry_connect_tmr.kill()

        if self._alive():
            self._cancellable.cancel()

        self._tid = None
        self._serv = None
        self._cancellable = None
        self._retry_connect_tmr = None
        self._try_to_connect_deferred = None

    @property
    def id(self) -> str:
        '''@brief The Transport ID rendered as a printable string'''
        return str(self.tid)

    @property
    def tid(self):
        '''@brief The Transport ID object'''
        return self._tid

    def controller_id_dict(self) -> dict:
        '''@brief Return the controller ID as a dict whose values are all strings.'''
        return {name: str(value) for name, value in self.tid.as_dict().items()}

    def details(self) -> dict:
        '''@brief Return detailed debug info about this controller.
        In this base class it is the same dict as info().
        '''
        return self.info()

    def info(self) -> dict:
        '''@brief Return the controller info for this object: the controller
        ID dict plus the connect attempts count and retry timer, as strings.
        '''
        report = self.controller_id_dict()
        report.update(
            {
                'connect attempts': str(self._connect_attempts),
                'retry connect timer': str(self._retry_connect_tmr),
            }
        )
        return report

    def cancel(self):
        '''@brief Cancel pending operations. No-op if the object is not alive.'''
        if not self._alive():
            return

        logging.debug('ControllerABC.cancel() - %s', self.id)
        self._cancellable.cancel()

    def kill(self):
        '''@brief Release all resources associated with this object.'''
        logging.debug('ControllerABC.kill() - %s', self.id)
        self._release_resources()

    def _alive(self):
        '''A queued event may get processed after this object is no longer
        configured (i.e. no longer alive). Callback functions can call this
        method to make sure the object is still alive before going further.

        @return A truthy value while the cancellable exists and has not
                been cancelled; a falsy value otherwise.
        '''
        cancellable = self._cancellable
        if not cancellable:
            return cancellable
        return not cancellable.is_cancelled()

    def _on_try_to_connect(self):
        '''Callback given to the retry-connect timer: schedules the deferred
        connection attempt when the object is still alive.
        '''
        if self._alive():
            self._try_to_connect_deferred.schedule()
        return GLib.SOURCE_REMOVE

    def _should_try_to_reconnect(self):
        '''Always True in this base class.'''
        return True

    def _try_to_connect(self):
        '''Deferred callback: count the attempt and invoke _do_connect(),
        unless this object is no longer alive or the main-loop source
        behind the deferred has been destroyed.
        '''
        if self._alive():
            # This is a deferred function call. Make sure
            # the source of the deferred is still good.
            current = GLib.main_current_source()
            if not (current and current.is_destroyed()):
                self._connect_attempts += 1
                self._do_connect()

        return GLib.SOURCE_REMOVE

    @abc.abstractmethod
    def set_level_from_tron(self, tron):
        '''Set the log level based on TRON (Trace ON)'''

    @abc.abstractmethod
    def _do_connect(self):
        '''Perform the connection'''

    @abc.abstractmethod
    def _on_aen(self, aen: int):
        '''Event handler invoked when an AEN is received'''

    @abc.abstractmethod
    def _on_nvme_event(self, nvme_event):
        '''Event handler invoked when an nvme_event is received'''

    @abc.abstractmethod
    def _on_ctrl_removed(self, udev_obj):
        '''Invoked when the kernel removes the associated nvme device
        (/dev/nvmeX) from the system.
        '''

    @abc.abstractmethod
    def _find_existing_connection(self):
        '''Check whether an existing connection matches this Controller's TID'''

    @abc.abstractmethod
    def all_ops_completed(self) -> bool:
        '''@brief Return True if all operations have completed, False otherwise.'''

    @abc.abstractmethod
    def connected(self):
        '''@brief Return whether a connection is established'''

    @abc.abstractmethod
    def disconnect(self, disconnected_cb, keep_connection):
        '''@brief Issue an asynchronous disconnect command to a Controller.
        The callback 'disconnected_cb' is invoked once the async command has
        completed. If the controller is already disconnected, the callback
        is added to the main loop's next idle slot so that it executes ASAP.
        '''

    @abc.abstractmethod
    def reload_hdlr(self):
        '''@brief Invoked when a "reload" signal is received.'''
|
|
|
|
|
|
# ******************************************************************************
|
|
class ServiceABC(abc.ABC):  # pylint: disable=too-many-instance-attributes
    '''@brief Base class used to manage a STorage Appliance Service'''

    # Duration given to the configuration "soak" timer (see __init__).
    CONF_STABILITY_SOAK_TIME_SEC = 1.5

    def __init__(self, args, default_conf, reload_hdlr):
        '''@brief Set up the service: configuration, main loop, last known
        config, D-Bus system bus connection and UNIX signal handlers.
        @param args: Object providing 'conf_file' and 'tron' attributes
        @param default_conf: Passed to conf.SvcConf() as default_conf
        @param reload_hdlr: Callback registered for SIGHUP
        '''
        service_conf = conf.SvcConf(default_conf=default_conf)
        service_conf.set_conf_file(args.conf_file)  # reload configuration
        self._tron = args.tron or service_conf.tron
        log.set_level_from_tron(self._tron)

        self._lkc_file = os.path.join(
            os.environ.get('RUNTIME_DIRECTORY', os.path.join('/run', defs.PROG_NAME)), 'last-known-config.pickle'
        )
        self._loop = GLib.MainLoop()
        self._cancellable = Gio.Cancellable()
        self._resolver = gutil.NameResolver()
        self._controllers = self._load_last_known_config()
        self._dbus_iface = None
        self._cfg_soak_tmr = gutil.GTimer(self.CONF_STABILITY_SOAK_TIME_SEC, self._on_config_ctrls)
        self._sysbus = dasbus.connection.SystemMessageBus()

        GLib.unix_signal_add(GLib.PRIORITY_HIGH, signal.SIGINT, self._stop_hdlr)  # CTRL-C
        GLib.unix_signal_add(GLib.PRIORITY_HIGH, signal.SIGTERM, self._stop_hdlr)  # systemctl stop stafd
        GLib.unix_signal_add(GLib.PRIORITY_HIGH, signal.SIGHUP, reload_hdlr)  # systemctl reload stafd

        nvme_options = conf.NvmeOptions()
        if not nvme_options.host_iface_supp or not nvme_options.discovery_supp:
            logging.warning(
                'Kernel does not appear to support all the options needed to run this program. Consider updating to a later kernel version.'
            )

        # We don't want to apply configuration changes right away.
        # Often, multiple changes will occur in a short amount of time (sub-second).
        # We want to wait until there are no more changes before applying them
        # to the system. The following timer acts as a "soak period". Changes
        # will be applied by calling self._on_config_ctrls() at the end of
        # the soak period.
        self._cfg_soak_tmr.start()

    def _release_resources(self):
        '''Cancel pending operations, stop the soak timer, forget all
        controllers, disconnect from the system bus and drop references.
        '''
        logging.debug('ServiceABC._release_resources()')

        if self._alive():
            self._cancellable.cancel()

        if self._cfg_soak_tmr is not None:
            self._cfg_soak_tmr.kill()

        self._controllers.clear()

        if self._sysbus:
            self._sysbus.disconnect()

        self._cfg_soak_tmr = None
        self._cancellable = None
        self._resolver = None
        self._lkc_file = None
        self._sysbus = None

    def _config_dbus(self, iface_obj, bus_name: str, obj_name: str):
        '''Publish @iface_obj on the system bus under @obj_name and register
        the service name @bus_name.
        '''
        self._dbus_iface = iface_obj
        self._sysbus.publish_object(obj_name, iface_obj)
        self._sysbus.register_service(bus_name)

    @property
    def tron(self):
        '''@brief Get Trace ON property'''
        return self._tron

    @tron.setter
    def tron(self, value):
        '''@brief Set Trace ON property and propagate it to the logger and
        to every controller.
        '''
        self._tron = value
        log.set_level_from_tron(self._tron)
        for controller in self._controllers.values():
            controller.set_level_from_tron(self._tron)

    def run(self):
        '''@brief Start the main loop execution. Returns once the loop has
        ended; an exception escaping the loop is logged, not re-raised.
        '''
        try:
            self._loop.run()
        except Exception as ex:  # pylint: disable=broad-except
            logging.critical('exception: %s', ex)

        self._loop = None

    def info(self) -> dict:
        '''@brief Get the status info for this object (used for debug)'''
        nvme_options = conf.NvmeOptions()
        info = conf.SysConf().as_dict()
        info['last known config file'] = self._lkc_file
        info['config soak timer'] = str(self._cfg_soak_tmr)
        info['kernel support.TP8013'] = str(nvme_options.discovery_supp)
        info['kernel support.host_iface'] = str(nvme_options.host_iface_supp)
        return info

    def get_controllers(self) -> dict:
        '''@brief return the controller objects'''
        # NOTE(review): this returns the values() view of the internal
        # container, not a dict as the annotation says. The annotation is
        # kept unchanged to leave the public signature untouched.
        return self._controllers.values()

    def get_controller(
        self,
        transport: str,
        traddr: str,
        trsvcid: str,
        subsysnqn: str,
        host_traddr: str,
        host_iface: str,
        host_nqn: str,
    ):  # pylint: disable=too-many-arguments
        '''@brief get the specified controller object from the list of controllers
        @return The matching controller object, or None if there is none
        '''
        cid = {
            'transport': transport,
            'traddr': traddr,
            'trsvcid': trsvcid,
            'subsysnqn': subsysnqn,
            'host-traddr': host_traddr,
            'host-iface': host_iface,
            'host-nqn': host_nqn,
        }
        return self._controllers.get(trid.TID(cid))

    def _remove_ctrl_from_dict(self, controller, shutdown=False):
        '''Remove @controller from self._controllers. Unless @shutdown is
        set, a successful removal restarts the configuration soak timer.
        '''
        tid_to_pop = controller.tid
        if not tid_to_pop:
            # Being paranoid. This should not happen, but let's say the
            # controller object has been purged, but it is somehow still
            # listed in self._controllers.
            for tid, _controller in self._controllers.items():
                if _controller is controller:
                    tid_to_pop = tid
                    break

        if tid_to_pop:
            logging.debug('ServiceABC._remove_ctrl_from_dict()- %s | %s', tid_to_pop, controller.device)
            popped = self._controllers.pop(tid_to_pop, None)
            if not shutdown and popped is not None and self._cfg_soak_tmr:
                self._cfg_soak_tmr.start()
        else:
            logging.debug('ServiceABC._remove_ctrl_from_dict()- already removed')

    def remove_controller(self, controller, success):  # pylint: disable=unused-argument
        '''@brief remove the specified controller object from the list of controllers
        @param controller: the controller object
        @param success: whether the disconnect was successful'''
        logging.debug('ServiceABC.remove_controller()')
        if isinstance(controller, ControllerABC):
            self._remove_ctrl_from_dict(controller)
            controller.kill()

    def _alive(self):
        '''It's a good idea to check that this object hasn't been
        cancelled (i.e. is still alive) when entering a callback function.
        Callback functions can be invoked after, for example, a process has
        been signalled to stop or restart, in which case it makes no sense to
        proceed with the callback.
        '''
        return self._cancellable and not self._cancellable.is_cancelled()

    def _cancel(self):
        '''Cancel this object's pending operations and those of every controller.'''
        logging.debug('ServiceABC._cancel()')
        if self._alive():
            self._cancellable.cancel()

        for controller in self._controllers.values():
            controller.cancel()

    def _stop_hdlr(self):
        '''Handler registered for SIGINT and SIGTERM: start the shutdown.'''
        logging.debug('ServiceABC._stop_hdlr()')
        sd_notify('STOPPING=1')

        self._cancel()  # Cancel pending operations

        self._dump_last_known_config(self._controllers)

        if not self._controllers:
            # Nothing to disconnect: exit at the main loop's next idle period.
            GLib.idle_add(self._exit)
        else:
            self._disconnect_all()

        return GLib.SOURCE_REMOVE

    def _on_final_disconnect(self, controller, success):
        '''Callback invoked after a controller is disconnected.
        THIS IS USED DURING PROCESS SHUTDOWN TO WAIT FOR ALL CONTROLLERS TO BE
        DISCONNECTED BEFORE EXITING THE PROGRAM. ONLY CALL ON SHUTDOWN!
        @param controller: the controller object
        @param success: whether the disconnect operation was successful
        '''
        logging.debug(
            'ServiceABC._on_final_disconnect() - %s | %s: disconnect %s',
            controller.id,
            controller.device,
            'succeeded' if success else 'failed',
        )

        self._remove_ctrl_from_dict(controller, True)
        controller.kill()

        # When all controllers have disconnected, we can finish the clean up
        if not self._controllers:
            # Defer exit to the next main loop's idle period.
            GLib.idle_add(self._exit)

    def _exit(self):
        '''Release all resources and quit the main loop.'''
        logging.debug('ServiceABC._exit()')
        self._release_resources()
        self._loop.quit()

    def _on_config_ctrls(self, *_user_data):
        '''Soak timer callback: apply the configuration if still alive.'''
        if self._alive():
            self._config_ctrls()
        return GLib.SOURCE_REMOVE

    def _config_ctrls(self):
        '''@brief Start controllers configuration.'''
        # The configuration file may contain controllers and/or excluded
        # controllers with traddr specified as hostname instead of IP address.
        # Because of this, we need to remove those excluded elements before
        # running name resolution. And we will need to remove excluded
        # elements after name resolution is complete (i.e. in the callback
        # function _config_ctrls_finish)
        logging.debug('ServiceABC._config_ctrls()')
        configured_controllers = [
            trid.TID(cid) for cid in conf.SvcConf().get_controllers() + conf.NbftConf().get_controllers()
        ]
        configured_controllers = remove_excluded(configured_controllers)
        self._resolver.resolve_ctrl_async(self._cancellable, configured_controllers, self._config_ctrls_finish)

    def _read_lkc(self):
        '''@brief Read Last Known Config from file
        @return The unpickled object, or None when the file is missing,
                empty, or cannot be unpickled.
        '''
        try:
            with open(self._lkc_file, 'rb') as file:
                # NOTE(review): pickle.load() can execute arbitrary code.
                # Presumably this file is only writable by this (root)
                # service - confirm the runtime directory's permissions.
                return pickle.load(file)
        except (FileNotFoundError, AttributeError, EOFError):
            return None
        except (pickle.UnpicklingError, ImportError, IndexError) as ex:
            # A corrupt file must not prevent the service from starting.
            logging.warning('Ignoring unreadable last known config %s: %s', self._lkc_file, ex)
            return None

    def _write_lkc(self, config):
        '''@brief Write Last Known Config to file, and if config is empty
        make sure the file is emptied. This is best-effort: failure to
        open or write the file is logged, not raised.'''
        try:
            # Note that if config is empty we still
            # want to open/close the file to empty it.
            with open(self._lkc_file, 'wb') as file:
                if config:
                    pickle.dump(config, file)
        except OSError as ex:
            # OSError covers FileNotFoundError as well as permission
            # errors, read-only file systems, etc.
            logging.error('Unable to save last known config: %s', ex)

    @abc.abstractmethod
    def _disconnect_all(self):
        '''Tell all controller objects to disconnect'''

    @abc.abstractmethod
    def _keep_connections_on_exit(self):
        '''@brief Determine whether connections should remain when the
        process exits.

        NOTE) This is the base class method used to define the interface.
        It must be overloaded by a child class.
        '''

    @abc.abstractmethod
    def _config_ctrls_finish(self, configured_ctrl_list):
        '''@brief Finish controllers configuration after hostnames (if any)
        have been resolved.

        Configuring controllers must be done asynchronously in 2 steps.
        In the first step, host names get resolved to find their IP addresses.
        Name resolution can take a while, especially when an external name
        resolution server is used. Once that step completed, the callback
        method _config_ctrls_finish() (i.e. this method), gets invoked to
        complete the controller configuration.

        NOTE) This is the base class method used to define the interface.
        It must be overloaded by a child class.
        '''

    @abc.abstractmethod
    def _load_last_known_config(self):
        '''Load last known config from file (if any)'''

    @abc.abstractmethod
    def _dump_last_known_config(self, controllers):
        '''Save last known config to file'''
|