Adding upstream version 2.3~rc2.
Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
parent
a8d8344f07
commit
54e93f8d6d
6 changed files with 296 additions and 95 deletions
278
staslib/avahi.py
278
staslib/avahi.py
|
@ -18,7 +18,7 @@ import dasbus.connection
|
|||
import dasbus.client.proxy
|
||||
import dasbus.client.observer
|
||||
from gi.repository import GLib
|
||||
from staslib import defs, conf, gutil
|
||||
from staslib import defs, conf, gutil, iputil
|
||||
|
||||
|
||||
def _txt2dict(txt: list):
|
||||
|
@ -54,6 +54,141 @@ def _proto2trans(protocol):
|
|||
return None
|
||||
|
||||
|
||||
def mk_service_key(interface, protocol, name, stype, domain):
|
||||
'''Return a tuple used as a service key (unique identifier)'''
|
||||
return (interface, protocol, name, stype, domain)
|
||||
|
||||
|
||||
def fmt_service_str(interface, protocol, name, stype, domain, flags): # pylint: disable=too-many-arguments
|
||||
'''Return service identifier as a string'''
|
||||
return (
|
||||
f'interface={interface}:{(socket.if_indextoname(interface) + ","):<9} '
|
||||
f'protocol={Avahi.protocol_as_string(protocol)}, '
|
||||
f'stype={stype}, '
|
||||
f'domain={domain}, '
|
||||
f'flags={flags}:{(Avahi.result_flags_as_string(flags) + ","):<12} '
|
||||
f'name={name}'
|
||||
)
|
||||
|
||||
|
||||
# ******************************************************************************
|
||||
class Service: # pylint: disable=too-many-instance-attributes
|
||||
'''Object used to keep track of the services discovered from the avahi-daemon'''
|
||||
|
||||
interface_name = property(lambda self: self._interface_name)
|
||||
interface = property(lambda self: self._interface_id)
|
||||
ip_family = property(lambda self: self._ip_family)
|
||||
reachable = property(lambda self: self._reachable)
|
||||
protocol = property(lambda self: self._protocol_id)
|
||||
key_str = property(lambda self: self._key_str)
|
||||
domain = property(lambda self: self._domain)
|
||||
stype = property(lambda self: self._stype)
|
||||
data = property(lambda self: self._data)
|
||||
name = property(lambda self: self._name)
|
||||
key = property(lambda self: self._key)
|
||||
ip = property(lambda self: self._ip)
|
||||
|
||||
def __init__(self, args, identified_cback):
|
||||
self._identified_cback = identified_cback
|
||||
self._interface_id = args[0]
|
||||
self._protocol_id = args[1]
|
||||
self._name = args[2]
|
||||
self._stype = args[3]
|
||||
self._domain = args[4]
|
||||
self._flags = args[5]
|
||||
self._ip_family = 4 if self._protocol_id == Avahi.PROTO_INET else 6
|
||||
|
||||
self._interface_name = socket.if_indextoname(self._interface_id).strip()
|
||||
self._protocol_name = Avahi.protocol_as_string(self._protocol_id)
|
||||
self._flags_str = '(' + Avahi.result_flags_as_string(self._flags) + '),'
|
||||
|
||||
self._key = mk_service_key(self._interface_id, self._protocol_id, self._name, self._stype, self._domain)
|
||||
self._key_str = f'({self._interface_name}, {self._protocol_name}, {self._name}.{self._domain}, {self._stype})'
|
||||
|
||||
self._id = fmt_service_str(
|
||||
self._interface_id, self._protocol_id, self._name, self._stype, self._domain, self._flags
|
||||
)
|
||||
|
||||
self._ip = None
|
||||
self._resolver = None
|
||||
self._data = {}
|
||||
self._reachable = False
|
||||
self._connect_checker = None
|
||||
|
||||
def info(self):
|
||||
'''Return debug info'''
|
||||
info = self._data
|
||||
info['reachable'] = str(self._reachable)
|
||||
return info
|
||||
|
||||
def __str__(self):
|
||||
return self._id
|
||||
|
||||
def set_identity(self, transport, address, port, txt): # pylint: disable=too-many-arguments
|
||||
'''Complete identification and check connectivity (if needed)
|
||||
Return True if identification is complete. Return False if
|
||||
we need to check connectivity.
|
||||
'''
|
||||
traddr = address.strip()
|
||||
trsvcid = str(port).strip()
|
||||
# host-iface permitted for tcp alone and not rdma
|
||||
host_iface = self._interface_name if transport == 'tcp' else ''
|
||||
self._data = {
|
||||
'transport': transport,
|
||||
'traddr': traddr,
|
||||
'trsvcid': trsvcid,
|
||||
# host-iface permitted for tcp alone and not rdma
|
||||
'host-iface': host_iface,
|
||||
'subsysnqn': txt.get('nqn', defs.WELL_KNOWN_DISC_NQN).strip()
|
||||
if conf.NvmeOptions().discovery_supp
|
||||
else defs.WELL_KNOWN_DISC_NQN,
|
||||
}
|
||||
|
||||
self._ip = iputil.get_ipaddress_obj(traddr, ipv4_mapped_convert=True)
|
||||
|
||||
if transport != 'tcp':
|
||||
self._reachable = True
|
||||
self._identified_cback()
|
||||
return
|
||||
|
||||
self._reachable = False
|
||||
connect_checker = gutil.TcpChecker(traddr, trsvcid, host_iface, self._tcp_connect_check_cback)
|
||||
|
||||
try:
|
||||
connect_checker.connect()
|
||||
except RuntimeError as err:
|
||||
logging.error('Unable to verify connectivity: %s', err)
|
||||
connect_checker.close()
|
||||
connect_checker = None
|
||||
|
||||
self._connect_checker = connect_checker
|
||||
|
||||
def _tcp_connect_check_cback(self, connected):
|
||||
if self._connect_checker is not None:
|
||||
self._connect_checker.close()
|
||||
self._connect_checker = None
|
||||
self._reachable = connected
|
||||
self._identified_cback()
|
||||
|
||||
def set_resolver(self, resolver):
|
||||
'''Set the resolver object'''
|
||||
self._resolver = resolver
|
||||
|
||||
def close(self):
|
||||
'''Close this object and release all resources'''
|
||||
if self._connect_checker is not None:
|
||||
self._connect_checker.close()
|
||||
self._connect_checker = None
|
||||
|
||||
if self._resolver is not None:
|
||||
try:
|
||||
self._resolver.Free()
|
||||
dasbus.client.proxy.disconnect_proxy(self._resolver)
|
||||
except (AttributeError, dasbus.error.DBusError) as ex:
|
||||
logging.debug('Service.close() - Failed to Free() resolver. %s', ex)
|
||||
self._resolver = None
|
||||
|
||||
|
||||
# ******************************************************************************
|
||||
class Avahi: # pylint: disable=too-many-instance-attributes
|
||||
'''@brief Avahi Server proxy. Set up the D-Bus connection to the Avahi
|
||||
|
@ -182,16 +317,10 @@ class Avahi: # pylint: disable=too-many-instance-attributes
|
|||
|
||||
def info(self) -> dict:
|
||||
'''@brief return debug info about this object'''
|
||||
services = dict()
|
||||
for service, obj in self._services.items():
|
||||
interface, protocol, name, stype, domain = service
|
||||
key = f'({socket.if_indextoname(interface)}, {Avahi.protos.get(protocol, "unknown")}, {name}.{domain}, {stype})'
|
||||
services[key] = obj.get('data', {})
|
||||
|
||||
info = {
|
||||
'avahi wake up timer': str(self._kick_avahi_tmr),
|
||||
'service types': list(self._stypes),
|
||||
'services': services,
|
||||
'services': {service.key_str: service.info() for service in self._services.values()},
|
||||
}
|
||||
|
||||
return info
|
||||
|
@ -217,7 +346,7 @@ class Avahi: # pylint: disable=too-many-instance-attributes
|
|||
[...]
|
||||
]
|
||||
'''
|
||||
return [service['data'] for service in self._services.values() if len(service['data'])]
|
||||
return [service.data for service in self._services.values() if service.reachable]
|
||||
|
||||
def config_stypes(self, stypes: list):
|
||||
'''@brief Configure the service types that we want to discover.
|
||||
|
@ -234,18 +363,17 @@ class Avahi: # pylint: disable=too-many-instance-attributes
|
|||
'''
|
||||
self._kick_avahi_tmr.clear()
|
||||
|
||||
def _remove_service(self, service_to_rm: typing.Tuple[int, int, str, str, str]):
|
||||
service = self._services.pop(service_to_rm)
|
||||
if service is not None:
|
||||
service.close()
|
||||
|
||||
def _disconnect(self):
|
||||
logging.debug('Avahi._disconnect()')
|
||||
for service in self._services.values():
|
||||
resolver = service.pop('resolver', None)
|
||||
if resolver is not None:
|
||||
try:
|
||||
resolver.Free()
|
||||
dasbus.client.proxy.disconnect_proxy(resolver)
|
||||
except (AttributeError, dasbus.error.DBusError) as ex:
|
||||
logging.debug('Avahi._disconnect() - Failed to Free() resolver. %s', ex)
|
||||
service.close()
|
||||
|
||||
self._services = dict()
|
||||
self._services.clear()
|
||||
|
||||
for browser in self._service_browsers.values():
|
||||
try:
|
||||
|
@ -296,15 +424,9 @@ class Avahi: # pylint: disable=too-many-instance-attributes
|
|||
logging.debug('Avahi._configure_browsers() - Failed to Free() browser. %s', ex)
|
||||
|
||||
# Find the cached services corresponding to stype_to_rm and remove them
|
||||
services_to_rm = [service for service in self._services if service[3] == stype_to_rm]
|
||||
for service in services_to_rm:
|
||||
resolver = self._services.pop(service, {}).pop('resolver', None)
|
||||
if resolver is not None:
|
||||
try:
|
||||
resolver.Free()
|
||||
dasbus.client.proxy.disconnect_proxy(resolver)
|
||||
except (AttributeError, dasbus.error.DBusError) as ex:
|
||||
logging.debug('Avahi._configure_browsers() - Failed to Free() resolver. %s', ex)
|
||||
services_to_rm = [service.key for service in self._services.values() if service.stype == stype_to_rm]
|
||||
for service_to_rm in services_to_rm:
|
||||
self._remove_service(service_to_rm)
|
||||
|
||||
for stype in stypes_to_add:
|
||||
try:
|
||||
|
@ -329,31 +451,25 @@ class Avahi: # pylint: disable=too-many-instance-attributes
|
|||
args: typing.Tuple[int, int, str, str, str, int],
|
||||
*_user_data,
|
||||
):
|
||||
(interface, protocol, name, stype, domain, flags) = args
|
||||
logging.debug(
|
||||
'Avahi._service_discovered() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s',
|
||||
interface,
|
||||
socket.if_indextoname(interface),
|
||||
Avahi.protocol_as_string(protocol),
|
||||
stype,
|
||||
domain,
|
||||
flags,
|
||||
'(' + Avahi.result_flags_as_string(flags) + '),',
|
||||
name,
|
||||
)
|
||||
service = Service(args, self._change_cb)
|
||||
logging.debug('Avahi._service_discovered() - %s', service)
|
||||
|
||||
service = (interface, protocol, name, stype, domain)
|
||||
if service not in self._services:
|
||||
if service.key not in self._services:
|
||||
try:
|
||||
obj_path = self._avahi.ServiceResolverNew(
|
||||
interface, protocol, name, stype, domain, Avahi.PROTO_UNSPEC, Avahi.LOOKUP_USE_MULTICAST
|
||||
service.interface,
|
||||
service.protocol,
|
||||
service.name,
|
||||
service.stype,
|
||||
service.domain,
|
||||
Avahi.PROTO_UNSPEC,
|
||||
Avahi.LOOKUP_USE_MULTICAST,
|
||||
)
|
||||
self._services[service] = {
|
||||
'resolver': self._sysbus.get_proxy(Avahi.DBUS_NAME, obj_path),
|
||||
'data': {},
|
||||
}
|
||||
service.set_resolver(self._sysbus.get_proxy(Avahi.DBUS_NAME, obj_path))
|
||||
except dasbus.error.DBusError as ex:
|
||||
logging.warning('Failed to create resolver: "%s", "%s", "%s". %s', interface, name, stype, ex)
|
||||
logging.warning('Failed to create resolver - %s: %s', service, ex)
|
||||
|
||||
self._services[service.key] = service
|
||||
|
||||
def _service_removed(
|
||||
self,
|
||||
|
@ -367,27 +483,14 @@ class Avahi: # pylint: disable=too-many-instance-attributes
|
|||
):
|
||||
(interface, protocol, name, stype, domain, flags) = args
|
||||
logging.debug(
|
||||
'Avahi._service_removed() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s',
|
||||
interface,
|
||||
socket.if_indextoname(interface),
|
||||
Avahi.protocol_as_string(protocol),
|
||||
stype,
|
||||
domain,
|
||||
flags,
|
||||
'(' + Avahi.result_flags_as_string(flags) + '),',
|
||||
name,
|
||||
'Avahi._service_removed() - %s',
|
||||
fmt_service_str(interface, protocol, name, stype, domain, flags),
|
||||
)
|
||||
|
||||
service = (interface, protocol, name, stype, domain)
|
||||
resolver = self._services.pop(service, {}).pop('resolver', None)
|
||||
if resolver is not None:
|
||||
try:
|
||||
resolver.Free()
|
||||
dasbus.client.proxy.disconnect_proxy(resolver)
|
||||
except (AttributeError, dasbus.error.DBusError) as ex:
|
||||
logging.debug('Avahi._service_removed() - Failed to Free() resolver. %s', ex)
|
||||
|
||||
self._change_cb()
|
||||
service_key = mk_service_key(interface, protocol, name, stype, domain)
|
||||
self._remove_service(service_key)
|
||||
if self._change_cb is not None:
|
||||
self._change_cb()
|
||||
|
||||
def _service_identified( # pylint: disable=too-many-locals
|
||||
self,
|
||||
|
@ -402,38 +505,21 @@ class Avahi: # pylint: disable=too-many-instance-attributes
|
|||
(interface, protocol, name, stype, domain, host, aprotocol, address, port, txt, flags) = args
|
||||
txt = _txt2dict(txt)
|
||||
logging.debug(
|
||||
'Avahi._service_identified() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s, host=%s, aprotocol=%s, address=%s, port=%s, txt=%s',
|
||||
interface,
|
||||
socket.if_indextoname(interface),
|
||||
Avahi.protocol_as_string(protocol),
|
||||
stype,
|
||||
domain,
|
||||
flags,
|
||||
'(' + Avahi.result_flags_as_string(flags) + '),',
|
||||
name,
|
||||
'Avahi._service_identified() - %s, host=%s, aprotocol=%s, port=%s, address=%s, txt=%s',
|
||||
fmt_service_str(interface, protocol, name, stype, domain, flags),
|
||||
host,
|
||||
Avahi.protocol_as_string(aprotocol),
|
||||
address,
|
||||
port,
|
||||
address,
|
||||
txt,
|
||||
)
|
||||
|
||||
service = (interface, protocol, name, stype, domain)
|
||||
if service in self._services:
|
||||
service_key = mk_service_key(interface, protocol, name, stype, domain)
|
||||
service = self._services.get(service_key, None)
|
||||
if service is not None:
|
||||
transport = _proto2trans(txt.get('p'))
|
||||
if transport is not None:
|
||||
self._services[service]['data'] = {
|
||||
'transport': transport,
|
||||
'traddr': address.strip(),
|
||||
'trsvcid': str(port).strip(),
|
||||
# host-iface permitted for tcp alone and not rdma
|
||||
'host-iface': socket.if_indextoname(interface).strip() if transport == 'tcp' else '',
|
||||
'subsysnqn': txt.get('nqn', defs.WELL_KNOWN_DISC_NQN).strip()
|
||||
if conf.NvmeOptions().discovery_supp
|
||||
else defs.WELL_KNOWN_DISC_NQN,
|
||||
}
|
||||
|
||||
self._change_cb()
|
||||
service.set_identity(transport, address, port, txt)
|
||||
else:
|
||||
logging.error(
|
||||
'Received invalid/undefined protocol in mDNS TXT field: address=%s, iface=%s, TXT=%s',
|
||||
|
@ -442,6 +528,8 @@ class Avahi: # pylint: disable=too-many-instance-attributes
|
|||
txt,
|
||||
)
|
||||
|
||||
self._check_for_duplicate_ips()
|
||||
|
||||
def _failure_handler( # pylint: disable=no-self-use
|
||||
self,
|
||||
_connection,
|
||||
|
@ -456,3 +544,15 @@ class Avahi: # pylint: disable=too-many-instance-attributes
|
|||
if 'ServiceResolver' not in interface_name or 'TimeoutError' not in error:
|
||||
# ServiceResolver may fire a timeout event after being Free'd(). This seems to be normal.
|
||||
logging.error('Avahi._failure_handler() - name=%s, error=%s', interface_name, error)
|
||||
|
||||
def _check_for_duplicate_ips(self):
|
||||
'''This is to identify misconfigured networks where the
|
||||
same IP addresses are discovered on two or more interfaces.'''
|
||||
ips = {}
|
||||
for service in self._services.values():
|
||||
if service.ip is not None:
|
||||
ips.setdefault(service.ip.compressed, []).append(service.interface_name)
|
||||
|
||||
for ip, ifaces in ips.items():
|
||||
if len(ifaces) > 1:
|
||||
logging.error('IP address %s was found on multiple interfaces: %s', ip, ','.join(ifaces))
|
||||
|
|
|
@ -169,9 +169,9 @@ class Controller(stas.ControllerABC): # pylint: disable=too-many-instance-attri
|
|||
self._udev.unregister_for_device_events(self._on_udev_notification)
|
||||
self._kill_ops() # Kill all pending operations
|
||||
self._ctrl = None
|
||||
|
||||
# Defer removal of this object to the next main loop's idle period.
|
||||
GLib.idle_add(self._serv.remove_controller, self, True)
|
||||
self._device = None
|
||||
self._connect_attempts = 0
|
||||
self._retry_connect_tmr.start()
|
||||
|
||||
def _get_cfg(self):
|
||||
'''Get configuration parameters. These may either come from the [Global]
|
||||
|
|
|
@ -11,6 +11,7 @@ access to GLib/Gio/Gobject resources.
|
|||
'''
|
||||
|
||||
import logging
|
||||
import socket
|
||||
from gi.repository import Gio, GLib, GObject
|
||||
from staslib import conf, iputil, trid
|
||||
|
||||
|
@ -416,3 +417,100 @@ class Deferred:
|
|||
if self.is_scheduled():
|
||||
self._source.destroy()
|
||||
self._source = None
|
||||
|
||||
|
||||
# ******************************************************************************
|
||||
class TcpChecker: # pylint: disable=too-many-instance-attributes
|
||||
'''@brief Verify that a TCP connection can be established with an enpoint'''
|
||||
|
||||
def __init__(self, traddr, trsvcid, host_iface, user_cback, *user_data):
|
||||
self._user_cback = user_cback
|
||||
self._host_iface = host_iface
|
||||
self._user_data = user_data
|
||||
self._trsvcid = trsvcid
|
||||
self._traddr = iputil.get_ipaddress_obj(traddr, ipv4_mapped_convert=True)
|
||||
self._cancellable = None
|
||||
self._gio_sock = None
|
||||
self._native_sock = None
|
||||
|
||||
def connect(self):
|
||||
'''Attempt to connect'''
|
||||
self.close()
|
||||
|
||||
# Gio has limited setsockopt() capabilities. To set SO_BINDTODEVICE
|
||||
# we need to use a generic socket.socket() and then convert to a
|
||||
# Gio.Socket() object to perform async connect operation within
|
||||
# the GLib context.
|
||||
family = socket.AF_INET if self._traddr.version == 4 else socket.AF_INET6
|
||||
self._native_sock = socket.socket(family, socket.SOCK_STREAM | socket.SOCK_NONBLOCK, socket.IPPROTO_TCP)
|
||||
if isinstance(self._host_iface, str):
|
||||
self._native_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BINDTODEVICE, self._host_iface.encode('utf-8'))
|
||||
|
||||
# Convert socket.socket() to a Gio.Socket() object
|
||||
try:
|
||||
self._gio_sock = Gio.Socket.new_from_fd(self._native_sock.fileno()) # returns None on error
|
||||
except GLib.Error as err:
|
||||
logging.error('Cannot create socket: %s', err.message) # pylint: disable=no-member
|
||||
self._gio_sock = None
|
||||
|
||||
if self._gio_sock is None:
|
||||
self._native_sock.close()
|
||||
raise RuntimeError(f'Unable to connect to {self._traddr}, {self._trsvcid}, {self._host_iface}')
|
||||
|
||||
g_addr = Gio.InetSocketAddress.new_from_string(self._traddr.compressed, int(self._trsvcid))
|
||||
|
||||
self._cancellable = Gio.Cancellable()
|
||||
|
||||
g_sockconn = self._gio_sock.connection_factory_create_connection()
|
||||
g_sockconn.connect_async(g_addr, self._cancellable, self._connect_async_cback)
|
||||
|
||||
def close(self):
|
||||
'''Terminate/Cancel current connection attempt and free resources'''
|
||||
if self._cancellable is not None:
|
||||
self._cancellable.cancel()
|
||||
self._cancellable = None
|
||||
|
||||
if self._gio_sock is not None:
|
||||
try:
|
||||
self._gio_sock.close()
|
||||
except GLib.Error as err:
|
||||
logging.debug('TcpChecker.close() gio_sock.close - %s', err.message) # pylint: disable=no-member
|
||||
|
||||
self._gio_sock = None
|
||||
|
||||
if self._native_sock is not None:
|
||||
try:
|
||||
# This is expected to fail because the socket
|
||||
# is already closed by self._gio_sock.close() above.
|
||||
# This code is just for completeness.
|
||||
self._native_sock.close()
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
self._native_sock = None
|
||||
|
||||
def _connect_async_cback(self, source_object, result):
|
||||
'''
|
||||
@param source_object: The Gio.SocketConnection object used to
|
||||
invoke the connect_async() API.
|
||||
'''
|
||||
try:
|
||||
connected = source_object.connect_finish(result)
|
||||
except GLib.Error as err:
|
||||
connected = False
|
||||
# We don't need to report "cancellation" errors.
|
||||
if err.matches(Gio.io_error_quark(), Gio.IOErrorEnum.CANCELLED):
|
||||
logging.debug('TcpChecker._connect_async_cback() - %s', err.message) # pylint: disable=no-member
|
||||
else:
|
||||
logging.info(
|
||||
'Unable to verify TCP connectivity - (%-10s %-14s %s): %s',
|
||||
self._host_iface + ',',
|
||||
self._traddr.compressed + ',',
|
||||
self._trsvcid,
|
||||
err.message, # pylint: disable=no-member
|
||||
)
|
||||
|
||||
self.close()
|
||||
|
||||
if self._user_cback is not None:
|
||||
self._user_cback(connected, *self._user_data)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue