1
0
Fork 0

Merging upstream version 2.3~rc2.

Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
Daniel Baumann 2025-02-16 12:54:46 +01:00
parent fb45a5d8b7
commit 91f6ff82fe
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
6 changed files with 296 additions and 95 deletions

View file

@ -48,14 +48,14 @@ jobs:
# https://github.com/docker/metadata-action
- name: Extract Docker metadata
id: meta
uses: docker/metadata-action@2c0bd771b40637d97bf205cbccdd294a32112176
uses: docker/metadata-action@818d4b7b91585d195f67373fd9cb0332e31a7175
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
# Build and push Docker image with Buildx (don't push on PR)
# https://github.com/docker/build-push-action
- name: Build and push Docker image
uses: docker/build-push-action@44ea916f6c540f9302d50c2b1e5a8dc071f15cdf
uses: docker/build-push-action@2eb1c1961a95fc15694676618e422e8ba1d63825
with:
context: .
push: ${{ github.event_name != 'pull_request' }}

View file

@ -5,11 +5,14 @@
New features:
- Support for nBFT (NVMe-oF Boot Table).
- The Avahi driver will now verify reachability of services discovered through mDNS to make sure all discovered IP addresses can be connected to. This avoids invoking the NVMe kernel driver with invalid IP addresses and getting error messages in the syslog.
- The Avahi driver will now print an error message if the same IP address is found on multiple interfaces. This indicates a misconfiguration of the network.
Bug fixes:
* For TCP transport: use `sysfs` controller `src_addr` attribute when matching to a configured "candidate" controller. This is to determine when an existing controller (located under the `sysfs`) can be reused instead of creating a new one. This avoids creating unnecessary duplicate connections.
* Udev event handling: use `systemctl restart` instead of `systemctl start`. There is a small chance that a `start` operation has not completed when a new `start` is required. Issuing a `start` while a `start` is being performed has no effect. However, a `restart` will be handled properly.
* `stafd`: Do not delete and recreate DC objects on kernel events indicating that an nvme device associated to a discovery controller was removed by the kernel. This was done to kick start the reconnect process, but was also causing the DLPE (Discovery Log Page Entries) cache to be lost. This could potentially result in `stacd` disconnecting from I/O controllers. Instead, keep the existing DC object which contains a valid DLPE cache and simply restart the "retry to connect" timer. This way the DLPE cache is maintained throughout the reconnect to DC process.
## Changes with release 2.2.3

View file

@ -9,7 +9,7 @@
project(
'nvme-stas',
meson_version: '>= 0.53.0',
version: '2.3-rc1',
version: '2.3-rc2',
license: 'Apache-2.0',
default_options: [
'buildtype=release',

View file

@ -18,7 +18,7 @@ import dasbus.connection
import dasbus.client.proxy
import dasbus.client.observer
from gi.repository import GLib
from staslib import defs, conf, gutil
from staslib import defs, conf, gutil, iputil
def _txt2dict(txt: list):
@ -54,6 +54,141 @@ def _proto2trans(protocol):
return None
def mk_service_key(interface, protocol, name, stype, domain):
'''Return a tuple used as a service key (unique identifier)'''
return (interface, protocol, name, stype, domain)
def fmt_service_str(interface, protocol, name, stype, domain, flags): # pylint: disable=too-many-arguments
'''Return service identifier as a string'''
return (
f'interface={interface}:{(socket.if_indextoname(interface) + ","):<9} '
f'protocol={Avahi.protocol_as_string(protocol)}, '
f'stype={stype}, '
f'domain={domain}, '
f'flags={flags}:{(Avahi.result_flags_as_string(flags) + ","):<12} '
f'name={name}'
)
# ******************************************************************************
class Service: # pylint: disable=too-many-instance-attributes
'''Object used to keep track of the services discovered from the avahi-daemon'''
interface_name = property(lambda self: self._interface_name)
interface = property(lambda self: self._interface_id)
ip_family = property(lambda self: self._ip_family)
reachable = property(lambda self: self._reachable)
protocol = property(lambda self: self._protocol_id)
key_str = property(lambda self: self._key_str)
domain = property(lambda self: self._domain)
stype = property(lambda self: self._stype)
data = property(lambda self: self._data)
name = property(lambda self: self._name)
key = property(lambda self: self._key)
ip = property(lambda self: self._ip)
def __init__(self, args, identified_cback):
self._identified_cback = identified_cback
self._interface_id = args[0]
self._protocol_id = args[1]
self._name = args[2]
self._stype = args[3]
self._domain = args[4]
self._flags = args[5]
self._ip_family = 4 if self._protocol_id == Avahi.PROTO_INET else 6
self._interface_name = socket.if_indextoname(self._interface_id).strip()
self._protocol_name = Avahi.protocol_as_string(self._protocol_id)
self._flags_str = '(' + Avahi.result_flags_as_string(self._flags) + '),'
self._key = mk_service_key(self._interface_id, self._protocol_id, self._name, self._stype, self._domain)
self._key_str = f'({self._interface_name}, {self._protocol_name}, {self._name}.{self._domain}, {self._stype})'
self._id = fmt_service_str(
self._interface_id, self._protocol_id, self._name, self._stype, self._domain, self._flags
)
self._ip = None
self._resolver = None
self._data = {}
self._reachable = False
self._connect_checker = None
def info(self):
'''Return debug info'''
info = self._data
info['reachable'] = str(self._reachable)
return info
def __str__(self):
return self._id
def set_identity(self, transport, address, port, txt): # pylint: disable=too-many-arguments
'''Complete identification and check connectivity (if needed)
Return True if identification is complete. Return False if
we need to check connectivity.
'''
traddr = address.strip()
trsvcid = str(port).strip()
# host-iface permitted for tcp alone and not rdma
host_iface = self._interface_name if transport == 'tcp' else ''
self._data = {
'transport': transport,
'traddr': traddr,
'trsvcid': trsvcid,
# host-iface permitted for tcp alone and not rdma
'host-iface': host_iface,
'subsysnqn': txt.get('nqn', defs.WELL_KNOWN_DISC_NQN).strip()
if conf.NvmeOptions().discovery_supp
else defs.WELL_KNOWN_DISC_NQN,
}
self._ip = iputil.get_ipaddress_obj(traddr, ipv4_mapped_convert=True)
if transport != 'tcp':
self._reachable = True
self._identified_cback()
return
self._reachable = False
connect_checker = gutil.TcpChecker(traddr, trsvcid, host_iface, self._tcp_connect_check_cback)
try:
connect_checker.connect()
except RuntimeError as err:
logging.error('Unable to verify connectivity: %s', err)
connect_checker.close()
connect_checker = None
self._connect_checker = connect_checker
def _tcp_connect_check_cback(self, connected):
if self._connect_checker is not None:
self._connect_checker.close()
self._connect_checker = None
self._reachable = connected
self._identified_cback()
def set_resolver(self, resolver):
'''Set the resolver object'''
self._resolver = resolver
def close(self):
'''Close this object and release all resources'''
if self._connect_checker is not None:
self._connect_checker.close()
self._connect_checker = None
if self._resolver is not None:
try:
self._resolver.Free()
dasbus.client.proxy.disconnect_proxy(self._resolver)
except (AttributeError, dasbus.error.DBusError) as ex:
logging.debug('Service.close() - Failed to Free() resolver. %s', ex)
self._resolver = None
# ******************************************************************************
class Avahi: # pylint: disable=too-many-instance-attributes
'''@brief Avahi Server proxy. Set up the D-Bus connection to the Avahi
@ -182,16 +317,10 @@ class Avahi: # pylint: disable=too-many-instance-attributes
def info(self) -> dict:
'''@brief return debug info about this object'''
services = dict()
for service, obj in self._services.items():
interface, protocol, name, stype, domain = service
key = f'({socket.if_indextoname(interface)}, {Avahi.protos.get(protocol, "unknown")}, {name}.{domain}, {stype})'
services[key] = obj.get('data', {})
info = {
'avahi wake up timer': str(self._kick_avahi_tmr),
'service types': list(self._stypes),
'services': services,
'services': {service.key_str: service.info() for service in self._services.values()},
}
return info
@ -217,7 +346,7 @@ class Avahi: # pylint: disable=too-many-instance-attributes
[...]
]
'''
return [service['data'] for service in self._services.values() if len(service['data'])]
return [service.data for service in self._services.values() if service.reachable]
def config_stypes(self, stypes: list):
'''@brief Configure the service types that we want to discover.
@ -234,18 +363,17 @@ class Avahi: # pylint: disable=too-many-instance-attributes
'''
self._kick_avahi_tmr.clear()
def _remove_service(self, service_to_rm: typing.Tuple[int, int, str, str, str]):
service = self._services.pop(service_to_rm)
if service is not None:
service.close()
def _disconnect(self):
logging.debug('Avahi._disconnect()')
for service in self._services.values():
resolver = service.pop('resolver', None)
if resolver is not None:
try:
resolver.Free()
dasbus.client.proxy.disconnect_proxy(resolver)
except (AttributeError, dasbus.error.DBusError) as ex:
logging.debug('Avahi._disconnect() - Failed to Free() resolver. %s', ex)
service.close()
self._services = dict()
self._services.clear()
for browser in self._service_browsers.values():
try:
@ -296,15 +424,9 @@ class Avahi: # pylint: disable=too-many-instance-attributes
logging.debug('Avahi._configure_browsers() - Failed to Free() browser. %s', ex)
# Find the cached services corresponding to stype_to_rm and remove them
services_to_rm = [service for service in self._services if service[3] == stype_to_rm]
for service in services_to_rm:
resolver = self._services.pop(service, {}).pop('resolver', None)
if resolver is not None:
try:
resolver.Free()
dasbus.client.proxy.disconnect_proxy(resolver)
except (AttributeError, dasbus.error.DBusError) as ex:
logging.debug('Avahi._configure_browsers() - Failed to Free() resolver. %s', ex)
services_to_rm = [service.key for service in self._services.values() if service.stype == stype_to_rm]
for service_to_rm in services_to_rm:
self._remove_service(service_to_rm)
for stype in stypes_to_add:
try:
@ -329,31 +451,25 @@ class Avahi: # pylint: disable=too-many-instance-attributes
args: typing.Tuple[int, int, str, str, str, int],
*_user_data,
):
(interface, protocol, name, stype, domain, flags) = args
logging.debug(
'Avahi._service_discovered() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s',
interface,
socket.if_indextoname(interface),
Avahi.protocol_as_string(protocol),
stype,
domain,
flags,
'(' + Avahi.result_flags_as_string(flags) + '),',
name,
)
service = Service(args, self._change_cb)
logging.debug('Avahi._service_discovered() - %s', service)
service = (interface, protocol, name, stype, domain)
if service not in self._services:
if service.key not in self._services:
try:
obj_path = self._avahi.ServiceResolverNew(
interface, protocol, name, stype, domain, Avahi.PROTO_UNSPEC, Avahi.LOOKUP_USE_MULTICAST
service.interface,
service.protocol,
service.name,
service.stype,
service.domain,
Avahi.PROTO_UNSPEC,
Avahi.LOOKUP_USE_MULTICAST,
)
self._services[service] = {
'resolver': self._sysbus.get_proxy(Avahi.DBUS_NAME, obj_path),
'data': {},
}
service.set_resolver(self._sysbus.get_proxy(Avahi.DBUS_NAME, obj_path))
except dasbus.error.DBusError as ex:
logging.warning('Failed to create resolver: "%s", "%s", "%s". %s', interface, name, stype, ex)
logging.warning('Failed to create resolver - %s: %s', service, ex)
self._services[service.key] = service
def _service_removed(
self,
@ -367,27 +483,14 @@ class Avahi: # pylint: disable=too-many-instance-attributes
):
(interface, protocol, name, stype, domain, flags) = args
logging.debug(
'Avahi._service_removed() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s',
interface,
socket.if_indextoname(interface),
Avahi.protocol_as_string(protocol),
stype,
domain,
flags,
'(' + Avahi.result_flags_as_string(flags) + '),',
name,
'Avahi._service_removed() - %s',
fmt_service_str(interface, protocol, name, stype, domain, flags),
)
service = (interface, protocol, name, stype, domain)
resolver = self._services.pop(service, {}).pop('resolver', None)
if resolver is not None:
try:
resolver.Free()
dasbus.client.proxy.disconnect_proxy(resolver)
except (AttributeError, dasbus.error.DBusError) as ex:
logging.debug('Avahi._service_removed() - Failed to Free() resolver. %s', ex)
self._change_cb()
service_key = mk_service_key(interface, protocol, name, stype, domain)
self._remove_service(service_key)
if self._change_cb is not None:
self._change_cb()
def _service_identified( # pylint: disable=too-many-locals
self,
@ -402,38 +505,21 @@ class Avahi: # pylint: disable=too-many-instance-attributes
(interface, protocol, name, stype, domain, host, aprotocol, address, port, txt, flags) = args
txt = _txt2dict(txt)
logging.debug(
'Avahi._service_identified() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s, host=%s, aprotocol=%s, address=%s, port=%s, txt=%s',
interface,
socket.if_indextoname(interface),
Avahi.protocol_as_string(protocol),
stype,
domain,
flags,
'(' + Avahi.result_flags_as_string(flags) + '),',
name,
'Avahi._service_identified() - %s, host=%s, aprotocol=%s, port=%s, address=%s, txt=%s',
fmt_service_str(interface, protocol, name, stype, domain, flags),
host,
Avahi.protocol_as_string(aprotocol),
address,
port,
address,
txt,
)
service = (interface, protocol, name, stype, domain)
if service in self._services:
service_key = mk_service_key(interface, protocol, name, stype, domain)
service = self._services.get(service_key, None)
if service is not None:
transport = _proto2trans(txt.get('p'))
if transport is not None:
self._services[service]['data'] = {
'transport': transport,
'traddr': address.strip(),
'trsvcid': str(port).strip(),
# host-iface permitted for tcp alone and not rdma
'host-iface': socket.if_indextoname(interface).strip() if transport == 'tcp' else '',
'subsysnqn': txt.get('nqn', defs.WELL_KNOWN_DISC_NQN).strip()
if conf.NvmeOptions().discovery_supp
else defs.WELL_KNOWN_DISC_NQN,
}
self._change_cb()
service.set_identity(transport, address, port, txt)
else:
logging.error(
'Received invalid/undefined protocol in mDNS TXT field: address=%s, iface=%s, TXT=%s',
@ -442,6 +528,8 @@ class Avahi: # pylint: disable=too-many-instance-attributes
txt,
)
self._check_for_duplicate_ips()
def _failure_handler( # pylint: disable=no-self-use
self,
_connection,
@ -456,3 +544,15 @@ class Avahi: # pylint: disable=too-many-instance-attributes
if 'ServiceResolver' not in interface_name or 'TimeoutError' not in error:
# ServiceResolver may fire a timeout event after being Free'd(). This seems to be normal.
logging.error('Avahi._failure_handler() - name=%s, error=%s', interface_name, error)
def _check_for_duplicate_ips(self):
'''This is to identify misconfigured networks where the
same IP addresses are discovered on two or more interfaces.'''
ips = {}
for service in self._services.values():
if service.ip is not None:
ips.setdefault(service.ip.compressed, []).append(service.interface_name)
for ip, ifaces in ips.items():
if len(ifaces) > 1:
logging.error('IP address %s was found on multiple interfaces: %s', ip, ','.join(ifaces))

View file

@ -169,9 +169,9 @@ class Controller(stas.ControllerABC): # pylint: disable=too-many-instance-attri
self._udev.unregister_for_device_events(self._on_udev_notification)
self._kill_ops() # Kill all pending operations
self._ctrl = None
# Defer removal of this object to the next main loop's idle period.
GLib.idle_add(self._serv.remove_controller, self, True)
self._device = None
self._connect_attempts = 0
self._retry_connect_tmr.start()
def _get_cfg(self):
'''Get configuration parameters. These may either come from the [Global]

View file

@ -11,6 +11,7 @@ access to GLib/Gio/Gobject resources.
'''
import logging
import socket
from gi.repository import Gio, GLib, GObject
from staslib import conf, iputil, trid
@ -416,3 +417,100 @@ class Deferred:
if self.is_scheduled():
self._source.destroy()
self._source = None
# ******************************************************************************
class TcpChecker: # pylint: disable=too-many-instance-attributes
'''@brief Verify that a TCP connection can be established with an enpoint'''
def __init__(self, traddr, trsvcid, host_iface, user_cback, *user_data):
self._user_cback = user_cback
self._host_iface = host_iface
self._user_data = user_data
self._trsvcid = trsvcid
self._traddr = iputil.get_ipaddress_obj(traddr, ipv4_mapped_convert=True)
self._cancellable = None
self._gio_sock = None
self._native_sock = None
def connect(self):
'''Attempt to connect'''
self.close()
# Gio has limited setsockopt() capabilities. To set SO_BINDTODEVICE
# we need to use a generic socket.socket() and then convert to a
# Gio.Socket() object to perform async connect operation within
# the GLib context.
family = socket.AF_INET if self._traddr.version == 4 else socket.AF_INET6
self._native_sock = socket.socket(family, socket.SOCK_STREAM | socket.SOCK_NONBLOCK, socket.IPPROTO_TCP)
if isinstance(self._host_iface, str):
self._native_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BINDTODEVICE, self._host_iface.encode('utf-8'))
# Convert socket.socket() to a Gio.Socket() object
try:
self._gio_sock = Gio.Socket.new_from_fd(self._native_sock.fileno()) # returns None on error
except GLib.Error as err:
logging.error('Cannot create socket: %s', err.message) # pylint: disable=no-member
self._gio_sock = None
if self._gio_sock is None:
self._native_sock.close()
raise RuntimeError(f'Unable to connect to {self._traddr}, {self._trsvcid}, {self._host_iface}')
g_addr = Gio.InetSocketAddress.new_from_string(self._traddr.compressed, int(self._trsvcid))
self._cancellable = Gio.Cancellable()
g_sockconn = self._gio_sock.connection_factory_create_connection()
g_sockconn.connect_async(g_addr, self._cancellable, self._connect_async_cback)
def close(self):
'''Terminate/Cancel current connection attempt and free resources'''
if self._cancellable is not None:
self._cancellable.cancel()
self._cancellable = None
if self._gio_sock is not None:
try:
self._gio_sock.close()
except GLib.Error as err:
logging.debug('TcpChecker.close() gio_sock.close - %s', err.message) # pylint: disable=no-member
self._gio_sock = None
if self._native_sock is not None:
try:
# This is expected to fail because the socket
# is already closed by self._gio_sock.close() above.
# This code is just for completeness.
self._native_sock.close()
except OSError:
pass
self._native_sock = None
def _connect_async_cback(self, source_object, result):
'''
@param source_object: The Gio.SocketConnection object used to
invoke the connect_async() API.
'''
try:
connected = source_object.connect_finish(result)
except GLib.Error as err:
connected = False
# We don't need to report "cancellation" errors.
if err.matches(Gio.io_error_quark(), Gio.IOErrorEnum.CANCELLED):
logging.debug('TcpChecker._connect_async_cback() - %s', err.message) # pylint: disable=no-member
else:
logging.info(
'Unable to verify TCP connectivity - (%-10s %-14s %s): %s',
self._host_iface + ',',
self._traddr.compressed + ',',
self._trsvcid,
err.message, # pylint: disable=no-member
)
self.close()
if self._user_cback is not None:
self._user_cback(connected, *self._user_data)