Adding upstream version 2.2.1.
Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
parent
1d36de0179
commit
757b718eff
129 changed files with 16110 additions and 0 deletions
456
staslib/avahi.py
Normal file
456
staslib/avahi.py
Normal file
|
@ -0,0 +1,456 @@
|
|||
# Copyright (c) 2021, Dell Inc. or its subsidiaries. All rights reserved.
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
# See the LICENSE file for details.
|
||||
#
|
||||
# This file is part of NVMe STorage Appliance Services (nvme-stas).
|
||||
#
|
||||
# Authors: Martin Belanger <Martin.Belanger@dell.com>
|
||||
#
|
||||
''' Module that provides a way to retrieve discovered
|
||||
services from the Avahi daemon over D-Bus.
|
||||
'''
|
||||
import socket
|
||||
import typing
|
||||
import logging
|
||||
import functools
|
||||
import dasbus.error
|
||||
import dasbus.connection
|
||||
import dasbus.client.proxy
|
||||
import dasbus.client.observer
|
||||
from gi.repository import GLib
|
||||
from staslib import defs, conf, gutil
|
||||
|
||||
|
||||
def _txt2dict(txt: list):
|
||||
'''@param txt: A list of list of integers. The integers are the ASCII value
|
||||
of printable text characters.
|
||||
'''
|
||||
the_dict = dict()
|
||||
for list_of_chars in txt:
|
||||
try:
|
||||
string = functools.reduce(lambda accumulator, c: accumulator + chr(c), list_of_chars, '')
|
||||
key, val = string.split("=")
|
||||
the_dict[key.lower()] = val
|
||||
except Exception: # pylint: disable=broad-except
|
||||
pass
|
||||
|
||||
return the_dict
|
||||
|
||||
|
||||
def _proto2trans(protocol):
|
||||
'''Return the matching transport for the given protocol.'''
|
||||
if protocol is None:
|
||||
return None
|
||||
|
||||
protocol = protocol.strip().lower()
|
||||
if protocol == 'tcp':
|
||||
return 'tcp'
|
||||
|
||||
if protocol in ('roce', 'iwarp', 'rdma'):
|
||||
return 'rdma'
|
||||
|
||||
return None
|
||||
|
||||
|
||||
# ******************************************************************************
|
||||
class Avahi: # pylint: disable=too-many-instance-attributes
|
||||
'''@brief Avahi Server proxy. Set up the D-Bus connection to the Avahi
|
||||
daemon and register to be notified when services of a certain
|
||||
type (stype) are discovered or lost.
|
||||
'''
|
||||
|
||||
DBUS_NAME = 'org.freedesktop.Avahi'
|
||||
DBUS_INTERFACE_SERVICE_BROWSER = DBUS_NAME + '.ServiceBrowser'
|
||||
DBUS_INTERFACE_SERVICE_RESOLVER = DBUS_NAME + '.ServiceResolver'
|
||||
LOOKUP_USE_MULTICAST = 2
|
||||
|
||||
IF_UNSPEC = -1
|
||||
PROTO_INET = 0
|
||||
PROTO_INET6 = 1
|
||||
PROTO_UNSPEC = -1
|
||||
|
||||
LOOKUP_RESULT_LOCAL = 8 # This record/service resides on and was announced by the local host
|
||||
LOOKUP_RESULT_CACHED = 1 # This response originates from the cache
|
||||
LOOKUP_RESULT_STATIC = 32 # The returned data has been defined statically by some configuration option
|
||||
LOOKUP_RESULT_OUR_OWN = 16 # This service belongs to the same local client as the browser object
|
||||
LOOKUP_RESULT_WIDE_AREA = 2 # This response originates from wide area DNS
|
||||
LOOKUP_RESULT_MULTICAST = 4 # This response originates from multicast DNS
|
||||
|
||||
result_flags = {
|
||||
LOOKUP_RESULT_LOCAL: 'local',
|
||||
LOOKUP_RESULT_CACHED: 'cache',
|
||||
LOOKUP_RESULT_STATIC: 'static',
|
||||
LOOKUP_RESULT_OUR_OWN: 'own',
|
||||
LOOKUP_RESULT_WIDE_AREA: 'wan',
|
||||
LOOKUP_RESULT_MULTICAST: 'mcast',
|
||||
}
|
||||
|
||||
protos = {PROTO_INET: 'IPv4', PROTO_INET6: 'IPv6', PROTO_UNSPEC: 'uspecified'}
|
||||
|
||||
@classmethod
|
||||
def result_flags_as_string(cls, flags):
|
||||
'''Convert flags to human-readable string'''
|
||||
return '+'.join((value for flag, value in Avahi.result_flags.items() if (flags & flag) != 0))
|
||||
|
||||
@classmethod
|
||||
def protocol_as_string(cls, proto):
|
||||
'''Convert protocol codes to human-readable strings'''
|
||||
return Avahi.protos.get(proto, 'unknown')
|
||||
|
||||
# ==========================================================================
|
||||
def __init__(self, sysbus, change_cb):
|
||||
self._change_cb = change_cb
|
||||
self._services = dict()
|
||||
self._sysbus = sysbus
|
||||
self._stypes = set()
|
||||
self._service_browsers = dict()
|
||||
|
||||
# Avahi is an on-demand service. If, for some reason, the avahi-daemon
|
||||
# were to stop, we need to try to contact it for it to restart. For
|
||||
# example, when installing the avahi-daemon package on a running system,
|
||||
# the daemon doesn't get started right away. It needs another process to
|
||||
# access it over D-Bus to wake it up. The following timer is used to
|
||||
# periodically query the avahi-daemon until we successfully establish
|
||||
# first contact.
|
||||
self._kick_avahi_tmr = gutil.GTimer(60, self._on_kick_avahi)
|
||||
|
||||
# Subscribe for Avahi signals (i.e. events). This must be done before
|
||||
# any Browser or Resolver is created to avoid race conditions and
|
||||
# missed events.
|
||||
self._subscriptions = [
|
||||
self._sysbus.connection.signal_subscribe(
|
||||
Avahi.DBUS_NAME,
|
||||
Avahi.DBUS_INTERFACE_SERVICE_BROWSER,
|
||||
'ItemNew',
|
||||
None,
|
||||
None,
|
||||
0,
|
||||
self._service_discovered,
|
||||
),
|
||||
self._sysbus.connection.signal_subscribe(
|
||||
Avahi.DBUS_NAME,
|
||||
Avahi.DBUS_INTERFACE_SERVICE_BROWSER,
|
||||
'ItemRemove',
|
||||
None,
|
||||
None,
|
||||
0,
|
||||
self._service_removed,
|
||||
),
|
||||
self._sysbus.connection.signal_subscribe(
|
||||
Avahi.DBUS_NAME, Avahi.DBUS_INTERFACE_SERVICE_BROWSER, 'Failure', None, None, 0, self._failure_handler
|
||||
),
|
||||
self._sysbus.connection.signal_subscribe(
|
||||
Avahi.DBUS_NAME, Avahi.DBUS_INTERFACE_SERVICE_RESOLVER, 'Found', None, None, 0, self._service_identified
|
||||
),
|
||||
self._sysbus.connection.signal_subscribe(
|
||||
Avahi.DBUS_NAME, Avahi.DBUS_INTERFACE_SERVICE_RESOLVER, 'Failure', None, None, 0, self._failure_handler
|
||||
),
|
||||
]
|
||||
|
||||
self._avahi = self._sysbus.get_proxy(Avahi.DBUS_NAME, '/')
|
||||
|
||||
self._avahi_watcher = dasbus.client.observer.DBusObserver(self._sysbus, Avahi.DBUS_NAME)
|
||||
self._avahi_watcher.service_available.connect(self._avahi_available)
|
||||
self._avahi_watcher.service_unavailable.connect(self._avahi_unavailable)
|
||||
self._avahi_watcher.connect_once_available()
|
||||
|
||||
def kill(self):
|
||||
'''@brief Clean up object'''
|
||||
logging.debug('Avahi.kill()')
|
||||
|
||||
self._kick_avahi_tmr.kill()
|
||||
self._kick_avahi_tmr = None
|
||||
|
||||
for subscription in self._subscriptions:
|
||||
self._sysbus.connection.signal_unsubscribe(subscription)
|
||||
self._subscriptions = list()
|
||||
|
||||
self._disconnect()
|
||||
|
||||
self._avahi_watcher.service_available.disconnect()
|
||||
self._avahi_watcher.service_unavailable.disconnect()
|
||||
self._avahi_watcher.disconnect()
|
||||
self._avahi_watcher = None
|
||||
|
||||
dasbus.client.proxy.disconnect_proxy(self._avahi)
|
||||
self._avahi = None
|
||||
|
||||
self._change_cb = None
|
||||
self._sysbus = None
|
||||
|
||||
def info(self) -> dict:
|
||||
'''@brief return debug info about this object'''
|
||||
services = dict()
|
||||
for service, obj in self._services.items():
|
||||
interface, protocol, name, stype, domain = service
|
||||
key = f'({socket.if_indextoname(interface)}, {Avahi.protos.get(protocol, "unknown")}, {name}.{domain}, {stype})'
|
||||
services[key] = obj.get('data', {})
|
||||
|
||||
info = {
|
||||
'avahi wake up timer': str(self._kick_avahi_tmr),
|
||||
'service types': list(self._stypes),
|
||||
'services': services,
|
||||
}
|
||||
|
||||
return info
|
||||
|
||||
def get_controllers(self) -> list:
|
||||
'''@brief Get the discovery controllers as a list of dict()
|
||||
as follows:
|
||||
[
|
||||
{
|
||||
'transport': tcp,
|
||||
'traddr': str(),
|
||||
'trsvcid': str(),
|
||||
'host-iface': str(),
|
||||
'subsysnqn': 'nqn.2014-08.org.nvmexpress.discovery',
|
||||
},
|
||||
{
|
||||
'transport': tcp,
|
||||
'traddr': str(),
|
||||
'trsvcid': str(),
|
||||
'host-iface': str(),
|
||||
'subsysnqn': 'nqn.2014-08.org.nvmexpress.discovery',
|
||||
},
|
||||
[...]
|
||||
]
|
||||
'''
|
||||
return [service['data'] for service in self._services.values() if len(service['data'])]
|
||||
|
||||
def config_stypes(self, stypes: list):
|
||||
'''@brief Configure the service types that we want to discover.
|
||||
@param stypes: A list of services types, e.g. ['_nvme-disc._tcp']
|
||||
'''
|
||||
self._stypes = set(stypes)
|
||||
success = self._configure_browsers()
|
||||
if not success:
|
||||
self._kick_avahi_tmr.start()
|
||||
|
||||
def kick_start(self):
|
||||
'''@brief We use this to kick start the Avahi
|
||||
daemon (i.e. socket activation).
|
||||
'''
|
||||
self._kick_avahi_tmr.clear()
|
||||
|
||||
def _disconnect(self):
|
||||
logging.debug('Avahi._disconnect()')
|
||||
for service in self._services.values():
|
||||
resolver = service.pop('resolver', None)
|
||||
if resolver is not None:
|
||||
try:
|
||||
resolver.Free()
|
||||
dasbus.client.proxy.disconnect_proxy(resolver)
|
||||
except (AttributeError, dasbus.error.DBusError) as ex:
|
||||
logging.debug('Avahi._disconnect() - Failed to Free() resolver. %s', ex)
|
||||
|
||||
self._services = dict()
|
||||
|
||||
for browser in self._service_browsers.values():
|
||||
try:
|
||||
browser.Free()
|
||||
dasbus.client.proxy.disconnect_proxy(browser)
|
||||
except (AttributeError, dasbus.error.DBusError) as ex:
|
||||
logging.debug('Avahi._disconnect() - Failed to Free() browser. %s', ex)
|
||||
|
||||
self._service_browsers = dict()
|
||||
|
||||
def _on_kick_avahi(self):
|
||||
try:
|
||||
# try to contact avahi-daemon. This is just a wake
|
||||
# up call in case the avahi-daemon was sleeping.
|
||||
self._avahi.GetVersionString()
|
||||
except dasbus.error.DBusError:
|
||||
return GLib.SOURCE_CONTINUE
|
||||
|
||||
return GLib.SOURCE_REMOVE
|
||||
|
||||
def _avahi_available(self, _avahi_watcher):
|
||||
'''@brief Hook up DBus signal handlers for signals from stafd.'''
|
||||
logging.info('avahi-daemon service available, zeroconf supported.')
|
||||
success = self._configure_browsers()
|
||||
if not success:
|
||||
self._kick_avahi_tmr.start()
|
||||
|
||||
def _avahi_unavailable(self, _avahi_watcher):
|
||||
self._disconnect()
|
||||
logging.warning('avahi-daemon not available, zeroconf not supported.')
|
||||
self._kick_avahi_tmr.start()
|
||||
|
||||
def _configure_browsers(self):
|
||||
stypes_cur = set(self._service_browsers.keys())
|
||||
stypes_to_add = self._stypes - stypes_cur
|
||||
stypes_to_rm = stypes_cur - self._stypes
|
||||
|
||||
logging.debug('Avahi._configure_browsers() - stypes_to_rm = %s', list(stypes_to_rm))
|
||||
logging.debug('Avahi._configure_browsers() - stypes_to_add = %s', list(stypes_to_add))
|
||||
|
||||
for stype_to_rm in stypes_to_rm:
|
||||
browser = self._service_browsers.pop(stype_to_rm, None)
|
||||
if browser is not None:
|
||||
try:
|
||||
browser.Free()
|
||||
dasbus.client.proxy.disconnect_proxy(browser)
|
||||
except (AttributeError, dasbus.error.DBusError) as ex:
|
||||
logging.debug('Avahi._configure_browsers() - Failed to Free() browser. %s', ex)
|
||||
|
||||
# Find the cached services corresponding to stype_to_rm and remove them
|
||||
services_to_rm = [service for service in self._services if service[3] == stype_to_rm]
|
||||
for service in services_to_rm:
|
||||
resolver = self._services.pop(service, {}).pop('resolver', None)
|
||||
if resolver is not None:
|
||||
try:
|
||||
resolver.Free()
|
||||
dasbus.client.proxy.disconnect_proxy(resolver)
|
||||
except (AttributeError, dasbus.error.DBusError) as ex:
|
||||
logging.debug('Avahi._configure_browsers() - Failed to Free() resolver. %s', ex)
|
||||
|
||||
for stype in stypes_to_add:
|
||||
try:
|
||||
obj_path = self._avahi.ServiceBrowserNew(
|
||||
Avahi.IF_UNSPEC, Avahi.PROTO_UNSPEC, stype, 'local', Avahi.LOOKUP_USE_MULTICAST
|
||||
)
|
||||
self._service_browsers[stype] = self._sysbus.get_proxy(Avahi.DBUS_NAME, obj_path)
|
||||
except dasbus.error.DBusError as ex:
|
||||
logging.debug('Avahi._configure_browsers() - Failed to contact avahi-daemon. %s', ex)
|
||||
logging.warning('avahi-daemon not available, operating w/o mDNS discovery.')
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def _service_discovered(
|
||||
self,
|
||||
_connection,
|
||||
_sender_name: str,
|
||||
_object_path: str,
|
||||
_interface_name: str,
|
||||
_signal_name: str,
|
||||
args: typing.Tuple[int, int, str, str, str, int],
|
||||
*_user_data,
|
||||
):
|
||||
(interface, protocol, name, stype, domain, flags) = args
|
||||
logging.debug(
|
||||
'Avahi._service_discovered() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s',
|
||||
interface,
|
||||
socket.if_indextoname(interface),
|
||||
Avahi.protocol_as_string(protocol),
|
||||
stype,
|
||||
domain,
|
||||
flags,
|
||||
'(' + Avahi.result_flags_as_string(flags) + '),',
|
||||
name,
|
||||
)
|
||||
|
||||
service = (interface, protocol, name, stype, domain)
|
||||
if service not in self._services:
|
||||
try:
|
||||
obj_path = self._avahi.ServiceResolverNew(
|
||||
interface, protocol, name, stype, domain, Avahi.PROTO_UNSPEC, Avahi.LOOKUP_USE_MULTICAST
|
||||
)
|
||||
self._services[service] = {
|
||||
'resolver': self._sysbus.get_proxy(Avahi.DBUS_NAME, obj_path),
|
||||
'data': {},
|
||||
}
|
||||
except dasbus.error.DBusError as ex:
|
||||
logging.warning('Failed to create resolver: "%s", "%s", "%s". %s', interface, name, stype, ex)
|
||||
|
||||
def _service_removed(
|
||||
self,
|
||||
_connection,
|
||||
_sender_name: str,
|
||||
_object_path: str,
|
||||
_interface_name: str,
|
||||
_signal_name: str,
|
||||
args: typing.Tuple[int, int, str, str, str, int],
|
||||
*_user_data,
|
||||
):
|
||||
(interface, protocol, name, stype, domain, flags) = args
|
||||
logging.debug(
|
||||
'Avahi._service_removed() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s',
|
||||
interface,
|
||||
socket.if_indextoname(interface),
|
||||
Avahi.protocol_as_string(protocol),
|
||||
stype,
|
||||
domain,
|
||||
flags,
|
||||
'(' + Avahi.result_flags_as_string(flags) + '),',
|
||||
name,
|
||||
)
|
||||
|
||||
service = (interface, protocol, name, stype, domain)
|
||||
resolver = self._services.pop(service, {}).pop('resolver', None)
|
||||
if resolver is not None:
|
||||
try:
|
||||
resolver.Free()
|
||||
dasbus.client.proxy.disconnect_proxy(resolver)
|
||||
except (AttributeError, dasbus.error.DBusError) as ex:
|
||||
logging.debug('Avahi._service_removed() - Failed to Free() resolver. %s', ex)
|
||||
|
||||
self._change_cb()
|
||||
|
||||
def _service_identified( # pylint: disable=too-many-locals
|
||||
self,
|
||||
_connection,
|
||||
_sender_name: str,
|
||||
_object_path: str,
|
||||
_interface_name: str,
|
||||
_signal_name: str,
|
||||
args: typing.Tuple[int, int, str, str, str, str, int, str, int, list, int],
|
||||
*_user_data,
|
||||
):
|
||||
(interface, protocol, name, stype, domain, host, aprotocol, address, port, txt, flags) = args
|
||||
txt = _txt2dict(txt)
|
||||
logging.debug(
|
||||
'Avahi._service_identified() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s, host=%s, aprotocol=%s, address=%s, port=%s, txt=%s',
|
||||
interface,
|
||||
socket.if_indextoname(interface),
|
||||
Avahi.protocol_as_string(protocol),
|
||||
stype,
|
||||
domain,
|
||||
flags,
|
||||
'(' + Avahi.result_flags_as_string(flags) + '),',
|
||||
name,
|
||||
host,
|
||||
Avahi.protocol_as_string(aprotocol),
|
||||
address,
|
||||
port,
|
||||
txt,
|
||||
)
|
||||
|
||||
service = (interface, protocol, name, stype, domain)
|
||||
if service in self._services:
|
||||
transport = _proto2trans(txt.get('p'))
|
||||
if transport is not None:
|
||||
self._services[service]['data'] = {
|
||||
'transport': transport,
|
||||
'traddr': address.strip(),
|
||||
'trsvcid': str(port).strip(),
|
||||
# host-iface permitted for tcp alone and not rdma
|
||||
'host-iface': socket.if_indextoname(interface).strip() if transport == 'tcp' else '',
|
||||
'subsysnqn': txt.get('nqn', defs.WELL_KNOWN_DISC_NQN).strip()
|
||||
if conf.NvmeOptions().discovery_supp
|
||||
else defs.WELL_KNOWN_DISC_NQN,
|
||||
}
|
||||
|
||||
self._change_cb()
|
||||
else:
|
||||
logging.error(
|
||||
'Received invalid/undefined protocol in mDNS TXT field: address=%s, iface=%s, TXT=%s',
|
||||
address,
|
||||
socket.if_indextoname(interface).strip(),
|
||||
txt,
|
||||
)
|
||||
|
||||
def _failure_handler( # pylint: disable=no-self-use
|
||||
self,
|
||||
_connection,
|
||||
_sender_name: str,
|
||||
_object_path: str,
|
||||
interface_name: str,
|
||||
_signal_name: str,
|
||||
args: typing.Tuple[str],
|
||||
*_user_data,
|
||||
):
|
||||
(error,) = args
|
||||
if 'ServiceResolver' not in interface_name or 'TimeoutError' not in error:
|
||||
# ServiceResolver may fire a timeout event after being Free'd(). This seems to be normal.
|
||||
logging.error('Avahi._failure_handler() - name=%s, error=%s', interface_name, error)
|
Loading…
Add table
Add a link
Reference in a new issue