1
0
Fork 0
nvme-stas/staslib/service.py
Daniel Baumann a8f39c03aa
Merging upstream version 2.3.1:
- properly handles big-endian data in `iputils.py` (Closes: #1057031).

Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-02-16 12:56:36 +01:00

886 lines
38 KiB
Python

# Copyright (c) 2022, Dell Inc. or its subsidiaries. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# See the LICENSE file for details.
#
# This file is part of NVMe STorage Appliance Services (nvme-stas).
#
# Authors: Martin Belanger <Martin.Belanger@dell.com>
#
'''This module defines the base Service object from
which the Staf and the Stac objects are derived.'''
import json
import logging
import pathlib
import subprocess
from itertools import filterfalse
import dasbus.error
import dasbus.client.observer
import dasbus.client.proxy
from gi.repository import GLib
from systemd.daemon import notify as sd_notify
from staslib import avahi, conf, ctrl, defs, gutil, stas, timeparse, trid, udev
# ******************************************************************************
class CtrlTerminator:
'''The Controller Terminator is used to gracefully disconnect from
controllers. All communications with controllers is handled by the kernel.
Once we make a request to the kernel to perform an operation (e.g. connect),
we have to wait for it to complete before requesting another operation. This
is particularly important when we want to disconnect from a controller while
there are pending operations, especially a pending connect.
The "connect" operation is especially unpredictable because all connect
requests are made through the blocking interface "/dev/nvme-fabrics". This
means that once a "connect" operation has been submitted, and depending on
how many connect requests are made concurrently, it can take several seconds
for a connect to be processed by the kernel.
While connect or other operations are being performed, it is possible
that a disconnect may be requested (e.g. someone or something changes the
configuration to remove a controller). Because it is not possible to
terminate a pending operation request, we have to wait for it to complete
before we can issue a disconnect. Failure to do that will result in
operations being performed by the kernel in reverse order. For example,
a disconnect may be executed before a pending connect has had a chance to
complete. And this will result in controllers that are supposed to be
disconnected to be connected without nvme-stas knowing about it.
The Controller Terminator is used when we need to disconnect from a
controller. It will make sure that there are no pending operations before
issuing a disconnect.
'''
DISPOSAL_AUDIT_PERIOD_SEC = 30
def __init__(self):
self._udev = udev.UDEV
self._controllers = list() # The list of controllers to dispose of.
self._audit_tmr = gutil.GTimer(self.DISPOSAL_AUDIT_PERIOD_SEC, self._on_disposal_check)
def dispose(self, controller: ctrl.Controller, on_controller_removed_cb, keep_connection: bool):
'''Invoked by a service (stafd or stacd) to dispose of a controller'''
if controller.all_ops_completed():
logging.debug(
'CtrlTerminator.dispose() - %s | %s: Invoke disconnect()', controller.tid, controller.device
)
controller.disconnect(on_controller_removed_cb, keep_connection)
else:
logging.debug(
'CtrlTerminator.dispose() - %s | %s: Add controller to garbage disposal',
controller.tid,
controller.device,
)
self._controllers.append((controller, keep_connection, on_controller_removed_cb, controller.tid))
self._udev.register_for_action_events('add', self._on_kernel_events)
self._udev.register_for_action_events('remove', self._on_kernel_events)
if self._audit_tmr.time_remaining() == 0:
self._audit_tmr.start()
def pending_disposal(self, tid):
'''Check whether @tid is pending disposal'''
for controller in self._controllers:
if controller.tid == tid:
return True
return False
def info(self):
'''@brief Get info about this object (used for debug)'''
info = {
'terminator.audit timer': str(self._audit_tmr),
}
for controller, _, _, tid in self._controllers:
info[f'terminator.controller.{tid}'] = str(controller.info())
return info
def kill(self):
'''Stop Controller Terminator and release resources.'''
self._audit_tmr.stop()
self._audit_tmr = None
if self._udev:
self._udev.unregister_for_action_events('add', self._on_kernel_events)
self._udev.unregister_for_action_events('remove', self._on_kernel_events)
self._udev = None
for controller, keep_connection, on_controller_removed_cb, _ in self._controllers:
controller.disconnect(on_controller_removed_cb, keep_connection)
self._controllers.clear()
def _on_kernel_events(self, udev_obj):
logging.debug('CtrlTerminator._on_kernel_events() - %s event received', udev_obj.action)
self._disposal_check()
def _on_disposal_check(self, *_user_data):
logging.debug('CtrlTerminator._on_disposal_check()- Periodic audit')
return GLib.SOURCE_REMOVE if self._disposal_check() else GLib.SOURCE_CONTINUE
@staticmethod
def _keep_or_terminate(args):
'''Return False if controller is to be kept. True if controller
was terminated and can be removed from the list.'''
controller, keep_connection, on_controller_removed_cb, tid = args
if controller.all_ops_completed():
logging.debug(
'CtrlTerminator._keep_or_terminate()- %s | %s: Disconnecting controller',
tid,
controller.device,
)
controller.disconnect(on_controller_removed_cb, keep_connection)
return True
return False
def _disposal_check(self):
# Iterate over the list, terminating (disconnecting) those controllers
# that have no pending operations, and remove those controllers from the
# list (only keep controllers that still have operations pending).
self._controllers[:] = filterfalse(self._keep_or_terminate, self._controllers)
disposal_complete = len(self._controllers) == 0
if disposal_complete:
logging.debug('CtrlTerminator._disposal_check() - Disposal complete')
self._audit_tmr.stop()
self._udev.unregister_for_action_events('add', self._on_kernel_events)
self._udev.unregister_for_action_events('remove', self._on_kernel_events)
else:
self._audit_tmr.start() # Restart timer
return disposal_complete
# ******************************************************************************
class Service(stas.ServiceABC):
'''@brief Base class used to manage a STorage Appliance Service'''
def __init__(self, args, default_conf, reload_hdlr):
self._udev = udev.UDEV
self._terminator = CtrlTerminator()
super().__init__(args, default_conf, reload_hdlr)
def _release_resources(self):
logging.debug('Service._release_resources()')
super()._release_resources()
if self._terminator:
self._terminator.kill()
self._udev = None
self._terminator = None
def _disconnect_all(self):
'''Tell all controller objects to disconnect'''
keep_connections = self._keep_connections_on_exit()
controllers = self._controllers.values()
logging.debug(
'Service._stop_hdlr() - Controller count = %s, keep_connections = %s',
len(controllers),
keep_connections,
)
for controller in controllers:
self._terminator.dispose(controller, self._on_final_disconnect, keep_connections)
def info(self) -> dict:
'''@brief Get the status info for this object (used for debug)'''
info = super().info()
if self._terminator:
info.update(self._terminator.info())
return info
@stas.ServiceABC.tron.setter
def tron(self, value):
'''@brief Set Trace ON property'''
super(__class__, self.__class__).tron.__set__(self, value)
# ******************************************************************************
class Stac(Service):
'''STorage Appliance Connector (STAC)'''
CONF_STABILITY_LONG_SOAK_TIME_SEC = 10 # pylint: disable=invalid-name
ADD_EVENT_SOAK_TIME_SEC = 1
def __init__(self, args, dbus):
default_conf = {
('Global', 'tron'): False,
('Global', 'hdr-digest'): False,
('Global', 'data-digest'): False,
('Global', 'kato'): None, # None to let the driver decide the default
('Global', 'nr-io-queues'): None, # None to let the driver decide the default
('Global', 'nr-write-queues'): None, # None to let the driver decide the default
('Global', 'nr-poll-queues'): None, # None to let the driver decide the default
('Global', 'queue-size'): None, # None to let the driver decide the default
('Global', 'reconnect-delay'): None, # None to let the driver decide the default
('Global', 'ctrl-loss-tmo'): None, # None to let the driver decide the default
('Global', 'disable-sqflow'): None, # None to let the driver decide the default
('Global', 'ignore-iface'): False,
('Global', 'ip-family'): (4, 6),
('Controllers', 'controller'): list(),
('Controllers', 'exclude'): list(),
('I/O controller connection management', 'disconnect-scope'): 'only-stas-connections',
('I/O controller connection management', 'disconnect-trtypes'): ['tcp'],
('I/O controller connection management', 'connect-attempts-on-ncc'): 0,
}
super().__init__(args, default_conf, self._reload_hdlr)
self._add_event_soak_tmr = gutil.GTimer(self.ADD_EVENT_SOAK_TIME_SEC, self._on_add_event_soaked)
self._config_connections_audit()
# Create the D-Bus instance.
self._config_dbus(dbus, defs.STACD_DBUS_NAME, defs.STACD_DBUS_PATH)
# Connect to STAF D-Bus interface
self._staf = None
self._staf_watcher = dasbus.client.observer.DBusObserver(self._sysbus, defs.STAFD_DBUS_NAME)
self._staf_watcher.service_available.connect(self._connect_to_staf)
self._staf_watcher.service_unavailable.connect(self._disconnect_from_staf)
self._staf_watcher.connect_once_available()
def _release_resources(self):
logging.debug('Stac._release_resources()')
if self._add_event_soak_tmr:
self._add_event_soak_tmr.kill()
if self._udev:
self._udev.unregister_for_action_events('add', self._on_add_event)
self._destroy_staf_comlink(self._staf_watcher)
if self._staf_watcher is not None:
self._staf_watcher.disconnect()
super()._release_resources()
self._staf = None
self._staf_watcher = None
self._add_event_soak_tmr = None
def _dump_last_known_config(self, controllers):
config = list(controllers.keys())
logging.debug('Stac._dump_last_known_config() - IOC count = %s', len(config))
self._write_lkc(config)
def _load_last_known_config(self):
config = self._read_lkc() or list()
logging.debug('Stac._load_last_known_config() - IOC count = %s', len(config))
controllers = {}
for tid in config:
# Only create Ioc objects if there is already a connection in the kernel
# First, regenerate the TID (in case of soft. upgrade and TID object
# has changed internally)
tid = trid.TID(tid.as_dict())
if udev.UDEV.find_nvme_ioc_device(tid) is not None:
controllers[tid] = ctrl.Ioc(self, tid)
return controllers
def _audit_all_connections(self, tids):
'''A host should only connect to I/O controllers that have been zoned
for that host or a manual "controller" entry exists in stacd.conf.
A host should disconnect from an I/O controller when that I/O controller
is removed from the zone or a "controller" entry is manually removed
from stacd.conf. stacd will audit connections if "disconnect-scope=
all-connections-matching-disconnect-trtypes". stacd will delete any
connection that is not supposed to exist.
'''
logging.debug('Stac._audit_all_connections() - tids = %s', tids)
num_controllers = len(self._controllers)
for tid in tids:
if tid not in self._controllers and not self._terminator.pending_disposal(tid):
self._controllers[tid] = ctrl.Ioc(self, tid)
if num_controllers != len(self._controllers):
self._cfg_soak_tmr.start(self.CONF_STABILITY_SOAK_TIME_SEC)
def _on_add_event(self, udev_obj):
'''@brief This function is called when a "add" event is received from
the kernel for an NVMe device. This is used to trigger an audit and make
sure that the connection to an I/O controller is allowed.
WARNING: There is a race condition with the "add" event from the kernel.
The kernel sends the "add" event a bit early and the sysfs attributes
associated with the nvme object are not always fully initialized.
To workaround this problem we use a soaking timer to give time for the
sysfs attributes to stabilize.
'''
logging.debug('Stac._on_add_event(() - Received "add" event: %s', udev_obj.sys_name)
self._add_event_soak_tmr.start()
def _on_add_event_soaked(self):
'''@brief After the add event has been soaking for ADD_EVENT_SOAK_TIME_SEC
seconds, we can audit the connections.
'''
if self._alive():
svc_conf = conf.SvcConf()
if svc_conf.disconnect_scope == 'all-connections-matching-disconnect-trtypes':
self._audit_all_connections(self._udev.get_nvme_ioc_tids(svc_conf.disconnect_trtypes))
return GLib.SOURCE_REMOVE
def _config_connections_audit(self):
'''This function checks the "disconnect_scope" parameter to determine
whether audits should be performed. Audits are enabled when
"disconnect_scope == all-connections-matching-disconnect-trtypes".
'''
svc_conf = conf.SvcConf()
if svc_conf.disconnect_scope == 'all-connections-matching-disconnect-trtypes':
if not self._udev.is_action_cback_registered('add', self._on_add_event):
self._udev.register_for_action_events('add', self._on_add_event)
self._audit_all_connections(self._udev.get_nvme_ioc_tids(svc_conf.disconnect_trtypes))
else:
self._udev.unregister_for_action_events('add', self._on_add_event)
def _keep_connections_on_exit(self):
'''@brief Determine whether connections should remain when the
process exits.
'''
return True
def _reload_hdlr(self):
'''@brief Reload configuration file. This is triggered by the SIGHUP
signal, which can be sent with "systemctl reload stacd".
'''
if not self._alive():
return GLib.SOURCE_REMOVE
sd_notify('RELOADING=1')
service_cnf = conf.SvcConf()
service_cnf.reload()
self.tron = service_cnf.tron
self._config_connections_audit()
self._cfg_soak_tmr.start(self.CONF_STABILITY_SOAK_TIME_SEC)
for controller in self._controllers.values():
controller.reload_hdlr()
sd_notify('READY=1')
return GLib.SOURCE_CONTINUE
def _get_log_pages_from_stafd(self):
if self._staf:
try:
return json.loads(self._staf.get_all_log_pages(True))
except dasbus.error.DBusError:
pass
return list()
def _config_ctrls_finish(self, configured_ctrl_list: list): # pylint: disable=too-many-locals
'''@param configured_ctrl_list: list of TIDs'''
# This is a callback function, which may be called after the service
# has been signalled to stop. So let's make sure the service is still
# alive and well before continuing.
if not self._alive():
logging.debug('Stac._config_ctrls_finish() - Exiting because service is no longer alive')
return
# Eliminate invalid entries from stacd.conf "controller list".
configured_ctrl_list = [
tid for tid in configured_ctrl_list if '' not in (tid.transport, tid.traddr, tid.trsvcid, tid.subsysnqn)
]
logging.debug('Stac._config_ctrls_finish() - configured_ctrl_list = %s', configured_ctrl_list)
discovered_ctrls = dict()
for staf_data in self._get_log_pages_from_stafd():
host_traddr = staf_data['discovery-controller']['host-traddr']
host_iface = staf_data['discovery-controller']['host-iface']
host_nqn = staf_data['discovery-controller']['host-nqn']
for dlpe in staf_data['log-pages']:
if dlpe.get('subtype') == 'nvme': # eliminate discovery controllers
tid = stas.tid_from_dlpe(dlpe, host_traddr, host_iface, host_nqn)
discovered_ctrls[tid] = dlpe
discovered_ctrl_list = list(discovered_ctrls.keys())
logging.debug('Stac._config_ctrls_finish() - discovered_ctrl_list = %s', discovered_ctrl_list)
controllers = stas.remove_excluded(configured_ctrl_list + discovered_ctrl_list)
controllers = stas.remove_invalid_addresses(controllers)
new_controller_tids = set(controllers)
cur_controller_tids = set(self._controllers.keys())
controllers_to_add = new_controller_tids - cur_controller_tids
controllers_to_del = cur_controller_tids - new_controller_tids
logging.debug('Stac._config_ctrls_finish() - controllers_to_add = %s', list(controllers_to_add))
logging.debug('Stac._config_ctrls_finish() - controllers_to_del = %s', list(controllers_to_del))
svc_conf = conf.SvcConf()
no_disconnect = svc_conf.disconnect_scope == 'no-disconnect'
match_trtypes = svc_conf.disconnect_scope == 'all-connections-matching-disconnect-trtypes'
logging.debug(
'Stac._config_ctrls_finish() - no_disconnect=%s, match_trtypes=%s, svc_conf.disconnect_trtypes=%s',
no_disconnect,
match_trtypes,
svc_conf.disconnect_trtypes,
)
for tid in controllers_to_del:
controller = self._controllers.pop(tid, None)
if controller is not None:
keep_connection = no_disconnect or (match_trtypes and tid.transport not in svc_conf.disconnect_trtypes)
self._terminator.dispose(controller, self.remove_controller, keep_connection)
for tid in controllers_to_add:
self._controllers[tid] = ctrl.Ioc(self, tid)
for tid, controller in self._controllers.items():
if tid in discovered_ctrls:
dlpe = discovered_ctrls[tid]
controller.update_dlpe(dlpe)
self._dump_last_known_config(self._controllers)
def _connect_to_staf(self, _):
'''@brief Hook up DBus signal handlers for signals from stafd.'''
if not self._alive():
return
try:
self._staf = self._sysbus.get_proxy(defs.STAFD_DBUS_NAME, defs.STAFD_DBUS_PATH)
self._staf.log_pages_changed.connect(self._log_pages_changed)
self._staf.dc_removed.connect(self._dc_removed)
self._cfg_soak_tmr.start()
# Make sure timer is set back to its normal value.
self._cfg_soak_tmr.set_timeout(self.CONF_STABILITY_SOAK_TIME_SEC)
logging.debug('Stac._connect_to_staf() - Connected to staf')
except dasbus.error.DBusError:
logging.error('Failed to connect to staf')
def _destroy_staf_comlink(self, watcher): # pylint: disable=unused-argument
if self._staf:
self._staf.log_pages_changed.disconnect(self._log_pages_changed)
self._staf.dc_removed.disconnect(self._dc_removed)
dasbus.client.proxy.disconnect_proxy(self._staf)
self._staf = None
def _disconnect_from_staf(self, watcher):
self._destroy_staf_comlink(watcher)
# When we lose connectivity with stafd, the most logical explanation
# is that stafd restarted. In that case, it may take some time for stafd
# to re-populate its log pages cache. So let's give stafd plenty of time
# to update its log pages cache and send log pages change notifications
# before triggering a stacd re-config. We do this by momentarily
# increasing the config soak timer to a longer period.
if self._cfg_soak_tmr:
self._cfg_soak_tmr.set_timeout(self.CONF_STABILITY_LONG_SOAK_TIME_SEC)
logging.debug('Stac._disconnect_from_staf() - Disconnected from staf')
def _log_pages_changed( # pylint: disable=too-many-arguments
self, transport, traddr, trsvcid, subsysnqn, host_traddr, host_iface, host_nqn, device
):
if not self._alive():
return
logging.debug(
'Stac._log_pages_changed() - transport=%s, traddr=%s, trsvcid=%s, subsysnqn=%s, host_traddr=%s, host_iface=%s, host_nqn=%s, device=%s',
transport,
traddr,
trsvcid,
subsysnqn,
host_traddr,
host_iface,
host_nqn,
device,
)
if self._cfg_soak_tmr:
self._cfg_soak_tmr.start(self.CONF_STABILITY_SOAK_TIME_SEC)
def _dc_removed(self):
if not self._alive():
return
logging.debug('Stac._dc_removed()')
if self._cfg_soak_tmr:
self._cfg_soak_tmr.start(self.CONF_STABILITY_SOAK_TIME_SEC)
# ******************************************************************************
# Only keep legacy FC rule (not even sure this is still in use today, but just to be safe).
UDEV_RULE_OVERRIDE = r'''
ACTION=="change", SUBSYSTEM=="fc", ENV{FC_EVENT}=="nvmediscovery", \
ENV{NVMEFC_HOST_TRADDR}=="*", ENV{NVMEFC_TRADDR}=="*", \
RUN+="%s --no-block start nvmf-connect@--transport=fc\t--traddr=$env{NVMEFC_TRADDR}\t--trsvcid=none\t--host-traddr=$env{NVMEFC_HOST_TRADDR}.service"
'''
def _udev_rule_ctrl(suppress):
'''@brief We override the standard udev rule installed by nvme-cli, i.e.
'/usr/lib/udev/rules.d/70-nvmf-autoconnect.rules', with a copy into
/run/udev/rules.d. The goal is to suppress the udev rule that controls TCP
connections to I/O controllers. This is to avoid race conditions between
stacd and udevd. This is configurable. See "udev-rule" in stacd.conf
for details.
@param enable: When True, override nvme-cli's udev rule and prevent TCP I/O
Controller connections by nvme-cli. When False, allow nvme-cli's udev rule
to make TCP I/O connections.
@type enable: bool
'''
udev_rule_file = pathlib.Path('/run/udev/rules.d', '70-nvmf-autoconnect.rules')
if suppress:
if not udev_rule_file.exists():
pathlib.Path('/run/udev/rules.d').mkdir(parents=True, exist_ok=True)
text = UDEV_RULE_OVERRIDE % (defs.SYSTEMCTL)
udev_rule_file.write_text(text) # pylint: disable=unspecified-encoding
else:
try:
udev_rule_file.unlink()
except FileNotFoundError:
pass
def _is_dlp_changed_aen(udev_obj):
'''Check whether we received a Change of Discovery Log Page AEN'''
nvme_aen = udev_obj.get('NVME_AEN')
if not isinstance(nvme_aen, str):
return False
aen = int(nvme_aen, 16)
if aen != ctrl.DLP_CHANGED:
return False
logging.info(
'%s - Received AEN: Change of Discovery Log Page (%s)',
udev_obj.sys_name,
nvme_aen,
)
return True
def _event_matches(udev_obj, nvme_events):
'''Check whether we received an NVMe Event matching
one of the events listed in @nvme_events'''
nvme_event = udev_obj.get('NVME_EVENT')
if nvme_event not in nvme_events:
return False
logging.info('%s - Received "%s" event', udev_obj.sys_name, nvme_event)
return True
# ******************************************************************************
class Staf(Service):
'''STorage Appliance Finder (STAF)'''
def __init__(self, args, dbus):
default_conf = {
('Global', 'tron'): False,
('Global', 'hdr-digest'): False,
('Global', 'data-digest'): False,
('Global', 'kato'): 30,
('Global', 'queue-size'): None, # None to let the driver decide the default
('Global', 'reconnect-delay'): None, # None to let the driver decide the default
('Global', 'ctrl-loss-tmo'): None, # None to let the driver decide the default
('Global', 'disable-sqflow'): None, # None to let the driver decide the default
('Global', 'persistent-connections'): False, # Deprecated
('Discovery controller connection management', 'persistent-connections'): True,
('Discovery controller connection management', 'zeroconf-connections-persistence'): timeparse.timeparse(
'72hours'
),
('Global', 'ignore-iface'): False,
('Global', 'ip-family'): (4, 6),
('Global', 'pleo'): True,
('Service Discovery', 'zeroconf'): True,
('Controllers', 'controller'): list(),
('Controllers', 'exclude'): list(),
}
super().__init__(args, default_conf, self._reload_hdlr)
self._avahi = avahi.Avahi(self._sysbus, self._avahi_change)
self._avahi.config_stypes(conf.SvcConf().stypes)
# Create the D-Bus instance.
self._config_dbus(dbus, defs.STAFD_DBUS_NAME, defs.STAFD_DBUS_PATH)
self._udev.register_for_action_events('change', self._nvme_cli_interop)
_udev_rule_ctrl(True)
def info(self) -> dict:
'''@brief Get the status info for this object (used for debug)'''
info = super().info()
info['avahi'] = self._avahi.info()
return info
def _release_resources(self):
logging.debug('Staf._release_resources()')
if self._udev:
self._udev.unregister_for_action_events('change', self._nvme_cli_interop)
super()._release_resources()
_udev_rule_ctrl(False)
if self._avahi:
self._avahi.kill()
self._avahi = None
def _dump_last_known_config(self, controllers):
config = {tid: {'log_pages': dc.log_pages(), 'origin': dc.origin} for tid, dc in controllers.items()}
logging.debug('Staf._dump_last_known_config() - DC count = %s', len(config))
self._write_lkc(config)
def _load_last_known_config(self):
config = self._read_lkc() or dict()
logging.debug('Staf._load_last_known_config() - DC count = %s', len(config))
controllers = {}
for tid, data in config.items():
if isinstance(data, dict):
log_pages = data.get('log_pages')
origin = data.get('origin')
else:
log_pages = data
origin = None
# Regenerate the TID (in case of soft. upgrade and TID object
# has changed internally)
tid = trid.TID(tid.as_dict())
controllers[tid] = ctrl.Dc(self, tid, log_pages, origin)
return controllers
def _keep_connections_on_exit(self):
'''@brief Determine whether connections should remain when the
process exits.
'''
return conf.SvcConf().persistent_connections
def _reload_hdlr(self):
'''@brief Reload configuration file. This is triggered by the SIGHUP
signal, which can be sent with "systemctl reload stafd".
'''
if not self._alive():
return GLib.SOURCE_REMOVE
sd_notify('RELOADING=1')
service_cnf = conf.SvcConf()
service_cnf.reload()
self.tron = service_cnf.tron
self._avahi.kick_start() # Make sure Avahi is running
self._avahi.config_stypes(service_cnf.stypes)
self._cfg_soak_tmr.start()
for controller in self._controllers.values():
controller.reload_hdlr()
sd_notify('READY=1')
return GLib.SOURCE_CONTINUE
def is_avahi_reported(self, tid):
'''@brief Return whether @tid is being reported by the Avahi daemon.
@return: True if the Avahi daemon is reporting it, False otherwise.
'''
for cid in self._avahi.get_controllers():
if trid.TID(cid) == tid:
return True
return False
def log_pages_changed(self, controller, device):
'''@brief Function invoked when a controller's cached log pages
have changed. This will emit a D-Bus signal to inform
other applications that the cached log pages have changed.
'''
self._dbus_iface.log_pages_changed.emit(
controller.tid.transport,
controller.tid.traddr,
controller.tid.trsvcid,
controller.tid.subsysnqn,
controller.tid.host_traddr,
controller.tid.host_iface,
controller.tid.host_nqn,
device,
)
def dc_removed(self):
'''@brief Function invoked when a controller's cached log pages
have changed. This will emit a D-Bus signal to inform
other applications that the cached log pages have changed.
'''
self._dbus_iface.dc_removed.emit()
def _referrals(self) -> list:
return [
stas.tid_from_dlpe(
dlpe,
controller.tid.host_traddr,
controller.tid.host_iface,
controller.tid.host_nqn,
)
for controller in self.get_controllers()
for dlpe in controller.referrals()
]
def _config_ctrls_finish(self, configured_ctrl_list: list):
'''@brief Finish discovery controllers configuration after
hostnames (if any) have been resolved. All the logic associated
with discovery controller creation/deletion is found here. To
avoid calling this algorithm repetitively for each and every events,
it is called after a soaking period controlled by self._cfg_soak_tmr.
@param configured_ctrl_list: List of TIDs configured in stafd.conf with
all hostnames resolved to their corresponding IP addresses.
'''
# This is a callback function, which may be called after the service
# has been signalled to stop. So let's make sure the service is still
# alive and well before continuing.
if not self._alive():
logging.debug('Staf._config_ctrls_finish() - Exiting because service is no longer alive')
return
# Eliminate invalid entries from stafd.conf "controller list".
controllers = list()
for tid in configured_ctrl_list:
if '' in (tid.transport, tid.traddr, tid.trsvcid):
continue
if not tid.subsysnqn:
cid = tid.as_dict()
cid['subsysnqn'] = defs.WELL_KNOWN_DISC_NQN
controllers.append(trid.TID(cid))
else:
controllers.append(tid)
configured_ctrl_list = controllers
# Get the Avahi-discovered list and the referrals.
discovered_ctrl_list = [trid.TID(cid) for cid in self._avahi.get_controllers()]
referral_ctrl_list = self._referrals()
logging.debug('Staf._config_ctrls_finish() - configured_ctrl_list = %s', configured_ctrl_list)
logging.debug('Staf._config_ctrls_finish() - discovered_ctrl_list = %s', discovered_ctrl_list)
logging.debug('Staf._config_ctrls_finish() - referral_ctrl_list = %s', referral_ctrl_list)
all_ctrls = configured_ctrl_list + discovered_ctrl_list + referral_ctrl_list
controllers = stas.remove_excluded(all_ctrls)
controllers = stas.remove_invalid_addresses(controllers)
new_controller_tids = set(controllers)
cur_controller_tids = set(self._controllers.keys())
controllers_to_add = new_controller_tids - cur_controller_tids
controllers_to_del = cur_controller_tids - new_controller_tids
# Make a list list of excluded and invalid controllers
must_remove_list = set(all_ctrls) - new_controller_tids
# Find "discovered" controllers that have not responded
# in a while and add them to controllers that must be removed.
must_remove_list.update({tid for tid, controller in self._controllers.items() if controller.is_unresponsive()})
# Do not remove Avahi-discovered DCs from controllers_to_del unless
# marked as "must-be-removed" (must_remove_list). This is to account for
# the case where mDNS discovery is momentarily disabled (e.g. Avahi
# daemon restarts). We don't want to delete connections because of
# temporary mDNS impairments. Removal of Avahi-discovered DCs will be
# handled differently and only if the connection cannot be established
# for a long period of time.
logging.debug('Staf._config_ctrls_finish() - must_remove_list = %s', list(must_remove_list))
controllers_to_del = {
tid
for tid in controllers_to_del
if tid in must_remove_list or self._controllers[tid].origin != 'discovered'
}
logging.debug('Staf._config_ctrls_finish() - controllers_to_add = %s', list(controllers_to_add))
logging.debug('Staf._config_ctrls_finish() - controllers_to_del = %s', list(controllers_to_del))
# Delete controllers
for tid in controllers_to_del:
controller = self._controllers.pop(tid, None)
if controller is not None:
self._terminator.dispose(controller, self.remove_controller, keep_connection=False)
if len(controllers_to_del) > 0:
self.dc_removed() # Let other apps (e.g. stacd) know that discovery controllers were removed.
# Add controllers
for tid in controllers_to_add:
self._controllers[tid] = ctrl.Dc(self, tid)
# Update "origin" on all DC objects
for tid, controller in self._controllers.items():
origin = (
'configured'
if tid in configured_ctrl_list
else 'referral'
if tid in referral_ctrl_list
else 'discovered'
if tid in discovered_ctrl_list
else None
)
if origin is not None:
controller.origin = origin
self._dump_last_known_config(self._controllers)
def _avahi_change(self):
if self._alive() and self._cfg_soak_tmr is not None:
self._cfg_soak_tmr.start()
def controller_unresponsive(self, tid):
'''@brief Function invoked when a controller becomes unresponsive and
needs to be removed.
'''
if self._alive() and self._cfg_soak_tmr is not None:
logging.debug('Staf.controller_unresponsive() - tid = %s', tid)
self._cfg_soak_tmr.start()
def referrals_changed(self):
'''@brief Function invoked when a controller's cached referrals
have changed.
'''
if self._alive() and self._cfg_soak_tmr is not None:
logging.debug('Staf.referrals_changed()')
self._cfg_soak_tmr.start()
def _nvme_cli_interop(self, udev_obj):
'''Interoperability with nvme-cli:
stafd will invoke nvme-cli's connect-all the same way nvme-cli's udev
rules would do normally. This is for the case where a user has an hybrid
configuration where some controllers are configured through nvme-stas
and others through nvme-cli. This is not an optimal configuration. It
would be better if everything was configured through nvme-stas, however
support for hybrid configuration was requested by users (actually only
one user requested this).'''
# Looking for 'change' events only
if udev_obj.action != 'change':
return
# Looking for events from Discovery Controllers only
if not udev.Udev.is_dc_device(udev_obj):
return
# Is the controller already being monitored by stafd?
for controller in self.get_controllers():
if controller.device == udev_obj.sys_name:
return
# Did we receive a Change of DLP AEN or an NVME Event indicating 'connect' or 'rediscover'?
if not _is_dlp_changed_aen(udev_obj) and not _event_matches(udev_obj, ('rediscover',)):
return
# We need to invoke "nvme connect-all" using nvme-cli's nvmf-connect@.service
# NOTE: Eventually, we'll be able to drop --host-traddr and --host-iface from
# the parameters passed to nvmf-connect@.service. A fix was added to connect-all
# to infer these two values from the device used to connect to the DC.
# Ref: https://github.com/linux-nvme/nvme-cli/pull/1812
cnf = [
('--device', udev_obj.sys_name),
('--host-traddr', udev_obj.properties.get('NVME_HOST_TRADDR', None)),
('--host-iface', udev_obj.properties.get('NVME_HOST_IFACE', None)),
]
# Use systemd's escaped syntax (i.e. '=' is replaced by '\x3d', '\t' by '\x09', etc.
options = r'\x09'.join(
[fr'{option}\x3d{value}' for option, value in cnf if value not in (None, 'none', 'None', '')]
)
logging.debug('Invoking: systemctl restart nvmf-connect@%s.service', options)
cmd = [defs.SYSTEMCTL, '--quiet', '--no-block', 'restart', fr'nvmf-connect@{options}.service']
subprocess.run(cmd, check=False)