# File: nvme-stas/staslib/avahi.py

# Copyright (c) 2021, Dell Inc. or its subsidiaries. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# See the LICENSE file for details.
#
# This file is part of NVMe STorage Appliance Services (nvme-stas).
#
# Authors: Martin Belanger <Martin.Belanger@dell.com>
#
''' Module that provides a way to retrieve discovered
services from the Avahi daemon over D-Bus.
'''
import socket
import typing
import logging
import functools
import dasbus.error
import dasbus.connection
import dasbus.client.proxy
import dasbus.client.observer
from gi.repository import GLib
from staslib import defs, conf, gutil
def _txt2dict(txt: list):
'''@param txt: A list of list of integers. The integers are the ASCII value
of printable text characters.
'''
the_dict = dict()
for list_of_chars in txt:
try:
string = functools.reduce(lambda accumulator, c: accumulator + chr(c), list_of_chars, '')
key, val = string.split("=")
the_dict[key.lower()] = val
except Exception: # pylint: disable=broad-except
pass
return the_dict
def _proto2trans(protocol):
'''Return the matching transport for the given protocol.'''
if protocol is None:
return None
protocol = protocol.strip().lower()
if protocol == 'tcp':
return 'tcp'
if protocol in ('roce', 'iwarp', 'rdma'):
return 'rdma'
return None
# ******************************************************************************
class Avahi:  # pylint: disable=too-many-instance-attributes
    '''@brief Avahi Server proxy. Set up the D-Bus connection to the Avahi
              daemon and register to be notified when services of a certain
              type (stype) are discovered or lost.
    '''

    DBUS_NAME = 'org.freedesktop.Avahi'
    DBUS_INTERFACE_SERVICE_BROWSER = DBUS_NAME + '.ServiceBrowser'
    DBUS_INTERFACE_SERVICE_RESOLVER = DBUS_NAME + '.ServiceResolver'
    LOOKUP_USE_MULTICAST = 2

    IF_UNSPEC = -1
    PROTO_INET = 0
    PROTO_INET6 = 1
    PROTO_UNSPEC = -1

    LOOKUP_RESULT_LOCAL = 8  # This record/service resides on and was announced by the local host
    LOOKUP_RESULT_CACHED = 1  # This response originates from the cache
    LOOKUP_RESULT_STATIC = 32  # The returned data has been defined statically by some configuration option
    LOOKUP_RESULT_OUR_OWN = 16  # This service belongs to the same local client as the browser object
    LOOKUP_RESULT_WIDE_AREA = 2  # This response originates from wide area DNS
    LOOKUP_RESULT_MULTICAST = 4  # This response originates from multicast DNS

    # Human-readable label for each lookup result flag (bit).
    result_flags = {
        LOOKUP_RESULT_LOCAL: 'local',
        LOOKUP_RESULT_CACHED: 'cache',
        LOOKUP_RESULT_STATIC: 'static',
        LOOKUP_RESULT_OUR_OWN: 'own',
        LOOKUP_RESULT_WIDE_AREA: 'wan',
        LOOKUP_RESULT_MULTICAST: 'mcast',
    }

    # Human-readable label for each protocol code.
    protos = {PROTO_INET: 'IPv4', PROTO_INET6: 'IPv6', PROTO_UNSPEC: 'unspecified'}

    @classmethod
    def result_flags_as_string(cls, flags):
        '''@brief Convert flags to human-readable string

        @param flags: Bitwise OR of LOOKUP_RESULT_* values.
        @return The labels of all the bits set in @flags joined by "+"
                (empty string when no known bit is set).
        '''
        return '+'.join(label for flag, label in cls.result_flags.items() if (flags & flag) != 0)

    @classmethod
    def protocol_as_string(cls, proto):
        '''@brief Convert protocol codes to human-readable strings

        @param proto: One of the PROTO_* values.
        @return The matching label, or 'unknown' for any other value.
        '''
        return cls.protos.get(proto, 'unknown')

    @staticmethod
    def _ifname(interface, default='unknown'):
        '''@brief Return the name of the network interface with index
                  @interface, or @default when the index cannot be
                  resolved to a name.

        socket.if_indextoname() raises when there is no interface with the
        given index (e.g. the interface was removed) or when the index is
        not a valid unsigned integer (e.g. IF_UNSPEC). We do not want such
        an exception to abort the processing of a D-Bus signal.
        '''
        try:
            return socket.if_indextoname(interface)
        except (OSError, OverflowError, TypeError, ValueError):
            return default

    @staticmethod
    def _release(proxy, caller, kind):
        '''@brief Free() an Avahi browser/resolver object and disconnect
                  its D-Bus proxy. Failures are logged and ignored.

        @param proxy: The D-Bus proxy to release. Nothing is done for None.
        @param caller: Name of the calling method (for the debug log).
        @param kind: 'browser' or 'resolver' (for the debug log).
        '''
        if proxy is None:
            return

        try:
            proxy.Free()
            dasbus.client.proxy.disconnect_proxy(proxy)
        except (AttributeError, dasbus.error.DBusError) as ex:
            logging.debug('Avahi.%s() - Failed to Free() %s. %s', caller, kind, ex)

    # ==========================================================================
    def __init__(self, sysbus, change_cb):
        '''@param sysbus: D-Bus connection object used to reach the
                          avahi-daemon.
        @param change_cb: Callback (no arguments) invoked whenever the set
                          of identified services changes.
        '''
        self._change_cb = change_cb
        self._services = {}  # Indexed by (interface, protocol, name, stype, domain)
        self._sysbus = sysbus
        self._stypes = set()
        self._service_browsers = {}  # Indexed by stype

        # Avahi is an on-demand service. If, for some reason, the avahi-daemon
        # were to stop, we need to try to contact it for it to restart. For
        # example, when installing the avahi-daemon package on a running system,
        # the daemon doesn't get started right away. It needs another process to
        # access it over D-Bus to wake it up. The following timer is used to
        # periodically query the avahi-daemon until we successfully establish
        # first contact.
        self._kick_avahi_tmr = gutil.GTimer(60, self._on_kick_avahi)

        # Subscribe for Avahi signals (i.e. events). This must be done before
        # any Browser or Resolver is created to avoid race conditions and
        # missed events.
        signals = (
            (Avahi.DBUS_INTERFACE_SERVICE_BROWSER, 'ItemNew', self._service_discovered),
            (Avahi.DBUS_INTERFACE_SERVICE_BROWSER, 'ItemRemove', self._service_removed),
            (Avahi.DBUS_INTERFACE_SERVICE_BROWSER, 'Failure', self._failure_handler),
            (Avahi.DBUS_INTERFACE_SERVICE_RESOLVER, 'Found', self._service_identified),
            (Avahi.DBUS_INTERFACE_SERVICE_RESOLVER, 'Failure', self._failure_handler),
        )
        self._subscriptions = [
            self._sysbus.connection.signal_subscribe(Avahi.DBUS_NAME, interface, signal, None, None, 0, handler)
            for interface, signal, handler in signals
        ]

        self._avahi = self._sysbus.get_proxy(Avahi.DBUS_NAME, '/')

        self._avahi_watcher = dasbus.client.observer.DBusObserver(self._sysbus, Avahi.DBUS_NAME)
        self._avahi_watcher.service_available.connect(self._avahi_available)
        self._avahi_watcher.service_unavailable.connect(self._avahi_unavailable)
        self._avahi_watcher.connect_once_available()

    def kill(self):
        '''@brief Clean up object'''
        logging.debug('Avahi.kill()')

        self._kick_avahi_tmr.kill()
        self._kick_avahi_tmr = None

        for subscription in self._subscriptions:
            self._sysbus.connection.signal_unsubscribe(subscription)
        self._subscriptions = []

        self._disconnect()

        self._avahi_watcher.service_available.disconnect()
        self._avahi_watcher.service_unavailable.disconnect()
        self._avahi_watcher.disconnect()
        self._avahi_watcher = None

        dasbus.client.proxy.disconnect_proxy(self._avahi)
        self._avahi = None

        self._change_cb = None
        self._sysbus = None

    def info(self) -> dict:
        '''@brief return debug info about this object'''
        services = {}
        for service, obj in self._services.items():
            interface, protocol, name, stype, domain = service
            key = f'({self._ifname(interface)}, {Avahi.protocol_as_string(protocol)}, {name}.{domain}, {stype})'
            services[key] = obj.get('data', {})

        return {
            'avahi wake up timer': str(self._kick_avahi_tmr),
            'service types': list(self._stypes),
            'services': services,
        }

    def get_controllers(self) -> list:
        '''@brief Get the discovery controllers as a list of dict()
        as follows:
        [
            {
                'transport': tcp,
                'traddr': str(),
                'trsvcid': str(),
                'host-iface': str(),
                'subsysnqn': 'nqn.2014-08.org.nvmexpress.discovery',
            },
            {
                'transport': tcp,
                'traddr': str(),
                'trsvcid': str(),
                'host-iface': str(),
                'subsysnqn': 'nqn.2014-08.org.nvmexpress.discovery',
            },
            [...]
        ]

        Services that have been discovered but not yet identified (i.e.
        whose 'data' is still empty) are not included.
        '''
        return [service['data'] for service in self._services.values() if service['data']]

    def config_stypes(self, stypes: list):
        '''@brief Configure the service types that we want to discover.
        @param stypes: A list of services types, e.g. ['_nvme-disc._tcp']
        '''
        self._stypes = set(stypes)
        success = self._configure_browsers()
        if not success:
            # Could not reach the avahi-daemon. Keep trying periodically.
            self._kick_avahi_tmr.start()

    def kick_start(self):
        '''@brief We use this to kick start the Avahi
        daemon (i.e. socket activation).
        '''
        self._kick_avahi_tmr.clear()

    def _disconnect(self):
        '''@brief Release all the resolvers and browsers and forget all
                  the cached services.
        '''
        logging.debug('Avahi._disconnect()')
        for service in self._services.values():
            self._release(service.pop('resolver', None), '_disconnect', 'resolver')
        self._services = {}

        for browser in self._service_browsers.values():
            self._release(browser, '_disconnect', 'browser')
        self._service_browsers = {}

    def _on_kick_avahi(self):
        '''@brief Timer callback. Keep the timer running
                  (GLib.SOURCE_CONTINUE) until the avahi-daemon answers.
        '''
        try:
            # try to contact avahi-daemon. This is just a wake
            # up call in case the avahi-daemon was sleeping.
            self._avahi.GetVersionString()
        except dasbus.error.DBusError:
            return GLib.SOURCE_CONTINUE

        return GLib.SOURCE_REMOVE

    def _avahi_available(self, _avahi_watcher):
        '''@brief Invoked when the avahi-daemon becomes available on D-Bus.
                  Create the service browsers for the configured stypes.
        '''
        logging.info('avahi-daemon service available, zeroconf supported.')
        success = self._configure_browsers()
        if not success:
            self._kick_avahi_tmr.start()

    def _avahi_unavailable(self, _avahi_watcher):
        '''@brief Invoked when the avahi-daemon is no longer available on
                  D-Bus. Drop everything and start polling for its return.
        '''
        self._disconnect()
        logging.warning('avahi-daemon not available, zeroconf not supported.')
        self._kick_avahi_tmr.start()

    def _configure_browsers(self):
        '''@brief Make the set of service browsers match the configured
                  stypes: release the browsers (and cached services) for
                  stypes no longer wanted, and create browsers for the new
                  stypes.

        @return True on success. False as soon as the avahi-daemon cannot
                be contacted to create a browser.
        '''
        stypes_cur = set(self._service_browsers.keys())
        stypes_to_add = self._stypes - stypes_cur
        stypes_to_rm = stypes_cur - self._stypes

        logging.debug('Avahi._configure_browsers() - stypes_to_rm  = %s', list(stypes_to_rm))
        logging.debug('Avahi._configure_browsers() - stypes_to_add = %s', list(stypes_to_add))

        for stype_to_rm in stypes_to_rm:
            self._release(self._service_browsers.pop(stype_to_rm, None), '_configure_browsers', 'browser')

            # Find the cached services corresponding to stype_to_rm and remove them
            services_to_rm = [service for service in self._services if service[3] == stype_to_rm]
            for service in services_to_rm:
                resolver = self._services.pop(service, {}).pop('resolver', None)
                self._release(resolver, '_configure_browsers', 'resolver')

        for stype in stypes_to_add:
            try:
                obj_path = self._avahi.ServiceBrowserNew(
                    Avahi.IF_UNSPEC, Avahi.PROTO_UNSPEC, stype, 'local', Avahi.LOOKUP_USE_MULTICAST
                )
                self._service_browsers[stype] = self._sysbus.get_proxy(Avahi.DBUS_NAME, obj_path)
            except dasbus.error.DBusError as ex:
                logging.debug('Avahi._configure_browsers() - Failed to contact avahi-daemon. %s', ex)
                logging.warning('avahi-daemon not available, operating w/o mDNS discovery.')
                return False

        return True

    def _service_discovered(
        self,
        _connection,
        _sender_name: str,
        _object_path: str,
        _interface_name: str,
        _signal_name: str,
        args: typing.Tuple[int, int, str, str, str, int],
        *_user_data,
    ):
        '''@brief Handler for the ServiceBrowser "ItemNew" signal. Create a
                  resolver for the newly discovered service (unless we
                  already track that service).
        '''
        (interface, protocol, name, stype, domain, flags) = args
        logging.debug(
            'Avahi._service_discovered() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s',
            interface,
            self._ifname(interface),
            Avahi.protocol_as_string(protocol),
            stype,
            domain,
            flags,
            '(' + Avahi.result_flags_as_string(flags) + '),',
            name,
        )

        service = (interface, protocol, name, stype, domain)
        if service in self._services:
            return

        try:
            obj_path = self._avahi.ServiceResolverNew(
                interface, protocol, name, stype, domain, Avahi.PROTO_UNSPEC, Avahi.LOOKUP_USE_MULTICAST
            )
            self._services[service] = {
                'resolver': self._sysbus.get_proxy(Avahi.DBUS_NAME, obj_path),
                'data': {},
            }
        except dasbus.error.DBusError as ex:
            logging.warning('Failed to create resolver: "%s", "%s", "%s". %s', interface, name, stype, ex)

    def _service_removed(
        self,
        _connection,
        _sender_name: str,
        _object_path: str,
        _interface_name: str,
        _signal_name: str,
        args: typing.Tuple[int, int, str, str, str, int],
        *_user_data,
    ):
        '''@brief Handler for the ServiceBrowser "ItemRemove" signal. Forget
                  the service, release its resolver, and notify the owner.
        '''
        (interface, protocol, name, stype, domain, flags) = args
        # NOTE: A service is typically removed when its interface goes away,
        # in which case the interface index can no longer be resolved to a
        # name. Hence the use of _ifname(), which never raises.
        logging.debug(
            'Avahi._service_removed() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s',
            interface,
            self._ifname(interface),
            Avahi.protocol_as_string(protocol),
            stype,
            domain,
            flags,
            '(' + Avahi.result_flags_as_string(flags) + '),',
            name,
        )

        service = (interface, protocol, name, stype, domain)
        resolver = self._services.pop(service, {}).pop('resolver', None)
        self._release(resolver, '_service_removed', 'resolver')

        self._change_cb()

    def _service_identified(  # pylint: disable=too-many-locals
        self,
        _connection,
        _sender_name: str,
        _object_path: str,
        _interface_name: str,
        _signal_name: str,
        args: typing.Tuple[int, int, str, str, str, str, int, str, int, list, int],
        *_user_data,
    ):
        '''@brief Handler for the ServiceResolver "Found" signal. Save the
                  connection parameters of the resolved service and notify
                  the owner. Services that we do not track, or whose TXT
                  record does not specify a supported protocol ("p=..."),
                  are ignored.
        '''
        (interface, protocol, name, stype, domain, host, aprotocol, address, port, txt, flags) = args
        txt = _txt2dict(txt)
        logging.debug(
            'Avahi._service_identified() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s, host=%s, aprotocol=%s, address=%s, port=%s, txt=%s',
            interface,
            self._ifname(interface),
            Avahi.protocol_as_string(protocol),
            stype,
            domain,
            flags,
            '(' + Avahi.result_flags_as_string(flags) + '),',
            name,
            host,
            Avahi.protocol_as_string(aprotocol),
            address,
            port,
            txt,
        )

        service = (interface, protocol, name, stype, domain)
        if service not in self._services:
            return

        transport = _proto2trans(txt.get('p'))
        if transport is None:
            logging.error(
                'Received invalid/undefined protocol in mDNS TXT field: address=%s, iface=%s, TXT=%s',
                address,
                self._ifname(interface).strip(),
                txt,
            )
            return

        self._services[service]['data'] = {
            'transport': transport,
            'traddr': address.strip(),
            'trsvcid': str(port).strip(),
            # host-iface permitted for tcp alone and not rdma. Left empty
            # when the interface index cannot be resolved to a name.
            'host-iface': self._ifname(interface, '').strip() if transport == 'tcp' else '',
            'subsysnqn': txt.get('nqn', defs.WELL_KNOWN_DISC_NQN).strip()
            if conf.NvmeOptions().discovery_supp
            else defs.WELL_KNOWN_DISC_NQN,
        }
        self._change_cb()

    def _failure_handler(  # pylint: disable=no-self-use
        self,
        _connection,
        _sender_name: str,
        _object_path: str,
        interface_name: str,
        _signal_name: str,
        args: typing.Tuple[str],
        *_user_data,
    ):
        '''@brief Handler for the ServiceBrowser and ServiceResolver
                  "Failure" signals. Log the error.
        '''
        (error,) = args
        # ServiceResolver may fire a timeout event after being Free'd(). This
        # seems to be normal, so that specific case is not reported.
        if 'ServiceResolver' not in interface_name or 'TimeoutError' not in error:
            logging.error('Avahi._failure_handler() - name=%s, error=%s', interface_name, error)