- properly handles big-endian data in `iputils.py` (Closes: #1057031). Signed-off-by: Daniel Baumann <daniel@debian.org>
520 lines
20 KiB
Python
520 lines
20 KiB
Python
# Copyright (c) 2022, Dell Inc. or its subsidiaries. All rights reserved.
|
|
# SPDX-License-Identifier: Apache-2.0
|
|
# See the LICENSE file for details.
|
|
#
|
|
# This file is part of NVMe STorage Appliance Services (nvme-stas).
|
|
#
|
|
# Authors: Martin Belanger <Martin.Belanger@dell.com>
|
|
#
|
|
'''This module provides utility functions (or classes) that simplify
|
|
the use of certain GLib/Gio/Gobject functions/resources.
|
|
'''
|
|
|
|
import logging
|
|
import socket
|
|
from gi.repository import Gio, GLib, GObject
|
|
from staslib import conf, iputil, trid
|
|
|
|
|
|
# ******************************************************************************
|
|
class GTimer:
|
|
'''@brief Convenience class to wrap GLib timers'''
|
|
|
|
def __init__(
|
|
self, interval_sec: float = 0, user_cback=lambda: GLib.SOURCE_REMOVE, *user_data, priority=GLib.PRIORITY_DEFAULT
|
|
): # pylint: disable=keyword-arg-before-vararg
|
|
self._source = None
|
|
self._interval_sec = float(interval_sec)
|
|
self._user_cback = user_cback
|
|
self._user_data = user_data
|
|
self._priority = priority if priority is not None else GLib.PRIORITY_DEFAULT
|
|
|
|
def _release_resources(self):
|
|
self.stop()
|
|
self._user_cback = None
|
|
self._user_data = None
|
|
|
|
def kill(self):
|
|
'''@brief Used to release all resources associated with a timer.'''
|
|
self._release_resources()
|
|
|
|
def __str__(self):
|
|
if self._source is not None:
|
|
return f'{self._interval_sec}s [{self.time_remaining()}s]'
|
|
|
|
return f'{self._interval_sec}s [off]'
|
|
|
|
def _callback(self, *_):
|
|
retval = self._user_cback(*self._user_data)
|
|
if retval == GLib.SOURCE_REMOVE:
|
|
self._source = None
|
|
return retval
|
|
|
|
def stop(self):
|
|
'''@brief Stop timer'''
|
|
if self._source is not None:
|
|
self._source.destroy()
|
|
self._source = None
|
|
|
|
def start(self, new_interval_sec: float = -1.0):
|
|
'''@brief Start (or restart) timer'''
|
|
if new_interval_sec >= 0:
|
|
self._interval_sec = float(new_interval_sec)
|
|
|
|
if self._source is not None:
|
|
self._source.set_ready_time(
|
|
self._source.get_time() + (self._interval_sec * 1000000)
|
|
) # ready time is in micro-seconds (monotonic time)
|
|
else:
|
|
if self._interval_sec.is_integer():
|
|
self._source = GLib.timeout_source_new_seconds(int(self._interval_sec)) # seconds resolution
|
|
else:
|
|
self._source = GLib.timeout_source_new(self._interval_sec * 1000.0) # mili-seconds resolution
|
|
|
|
self._source.set_priority(self._priority)
|
|
self._source.set_callback(self._callback)
|
|
self._source.attach()
|
|
|
|
def clear(self):
|
|
'''@brief Make timer expire now. The callback function
|
|
will be invoked immediately by the main loop.
|
|
'''
|
|
if self._source is not None:
|
|
self._source.set_ready_time(0) # Expire now!
|
|
|
|
def set_callback(self, user_cback, *user_data):
|
|
'''@brief set the callback function to invoke when timer expires'''
|
|
self._user_cback = user_cback
|
|
self._user_data = user_data
|
|
|
|
def set_timeout(self, new_interval_sec: float):
|
|
'''@brief set the timer's duration'''
|
|
if new_interval_sec >= 0:
|
|
self._interval_sec = float(new_interval_sec)
|
|
|
|
def get_timeout(self):
|
|
'''@brief get the timer's duration'''
|
|
return self._interval_sec
|
|
|
|
def time_remaining(self) -> float:
|
|
'''@brief Get how much time remains on a timer before it fires.'''
|
|
if self._source is not None:
|
|
delta_us = self._source.get_ready_time() - self._source.get_time() # monotonic time in micro-seconds
|
|
if delta_us > 0:
|
|
return delta_us / 1000000.0
|
|
|
|
return 0
|
|
|
|
|
|
# ******************************************************************************
|
|
class NameResolver: # pylint: disable=too-few-public-methods
|
|
'''@brief DNS resolver to convert host names to IP addresses.'''
|
|
|
|
def __init__(self):
|
|
self._resolver = Gio.Resolver.get_default()
|
|
|
|
def resolve_ctrl_async(self, cancellable, controllers_in: list, callback):
|
|
'''@brief The traddr fields may specify a hostname instead of an IP
|
|
address. We need to resolve all the host names to addresses.
|
|
Resolving hostnames may take a while as a DNS server may need
|
|
to be contacted. For that reason, we're using async APIs with
|
|
callbacks to resolve all the hostnames.
|
|
|
|
The callback @callback will be called once all hostnames have
|
|
been resolved.
|
|
|
|
@param controllers: List of trid.TID
|
|
'''
|
|
pending_resolution_count = 0
|
|
controllers_out = []
|
|
service_conf = conf.SvcConf()
|
|
|
|
def addr_resolved(resolver, result, controller):
|
|
try:
|
|
addresses = resolver.lookup_by_name_finish(result) # List of Gio.InetAddress objects
|
|
|
|
except GLib.GError as err:
|
|
# We don't need to report "cancellation" errors.
|
|
if err.matches(Gio.io_error_quark(), Gio.IOErrorEnum.CANCELLED):
|
|
# pylint: disable=no-member
|
|
logging.debug('NameResolver.resolve_ctrl_async() - %s %s', err.message, controller)
|
|
else:
|
|
logging.error('%s', err.message) # pylint: disable=no-member
|
|
|
|
# if err.matches(Gio.resolver_error_quark(), Gio.ResolverError.TEMPORARY_FAILURE):
|
|
# elif err.matches(Gio.resolver_error_quark(), Gio.ResolverError.NOT_FOUND):
|
|
# elif err.matches(Gio.resolver_error_quark(), Gio.ResolverError.INTERNAL):
|
|
|
|
else:
|
|
traddr = None
|
|
|
|
# If multiple addresses are returned (which is often the case),
|
|
# prefer IPv4 addresses over IPv6.
|
|
if 4 in service_conf.ip_family:
|
|
for address in addresses:
|
|
# There may be multiple IPv4 addresses. Pick 1st one.
|
|
if address.get_family() == Gio.SocketFamily.IPV4:
|
|
traddr = address.to_string()
|
|
break
|
|
|
|
if traddr is None and 6 in service_conf.ip_family:
|
|
for address in addresses:
|
|
# There may be multiple IPv6 addresses. Pick 1st one.
|
|
if address.get_family() == Gio.SocketFamily.IPV6:
|
|
traddr = address.to_string()
|
|
break
|
|
|
|
if traddr is not None:
|
|
logging.debug(
|
|
'NameResolver.resolve_ctrl_async() - resolved \'%s\' -> %s', controller.traddr, traddr
|
|
)
|
|
cid = controller.as_dict()
|
|
cid['traddr'] = traddr
|
|
nonlocal controllers_out
|
|
controllers_out.append(trid.TID(cid))
|
|
|
|
# Invoke callback after all hostnames have been resolved
|
|
nonlocal pending_resolution_count
|
|
pending_resolution_count -= 1
|
|
if pending_resolution_count == 0:
|
|
callback(controllers_out)
|
|
|
|
for controller in controllers_in:
|
|
if controller.transport in ('tcp', 'rdma'):
|
|
hostname_or_addr = controller.traddr
|
|
if not hostname_or_addr:
|
|
logging.error('Invalid traddr: %s', controller)
|
|
else:
|
|
# Try to convert to an ipaddress object. If this
|
|
# succeeds, then we don't need to call the resolver.
|
|
ip = iputil.get_ipaddress_obj(hostname_or_addr)
|
|
if ip is None:
|
|
logging.debug('NameResolver.resolve_ctrl_async() - resolving \'%s\'', hostname_or_addr)
|
|
pending_resolution_count += 1
|
|
self._resolver.lookup_by_name_async(hostname_or_addr, cancellable, addr_resolved, controller)
|
|
elif ip.version in service_conf.ip_family:
|
|
controllers_out.append(controller)
|
|
else:
|
|
logging.warning(
|
|
'Excluding configured IP address %s based on "ip-family" setting', hostname_or_addr
|
|
)
|
|
else:
|
|
controllers_out.append(controller)
|
|
|
|
if pending_resolution_count == 0: # No names are pending asynchronous resolution
|
|
callback(controllers_out)
|
|
|
|
|
|
# ******************************************************************************
|
|
class _TaskRunner(GObject.Object):
|
|
'''@brief This class allows running methods asynchronously in a thread.'''
|
|
|
|
def __init__(self, user_function, *user_args):
|
|
'''@param user_function: function to run inside a thread
|
|
@param user_args: arguments passed to @user_function
|
|
'''
|
|
super().__init__()
|
|
self._user_function = user_function
|
|
self._user_args = user_args
|
|
|
|
def communicate(self, cancellable, cb_function, *cb_args):
|
|
'''@param cancellable: A Gio.Cancellable object that can be used to
|
|
cancel an in-flight async command.
|
|
@param cb_function: User callback function to call when the async
|
|
command has completed. The callback function
|
|
will be passed these arguments:
|
|
|
|
(runner, result, *cb_args)
|
|
|
|
Where:
|
|
runner: This _TaskRunner object instance
|
|
result: A GObject.Object instance that contains the result
|
|
cb_args: The cb_args arguments passed to communicate()
|
|
|
|
@param cb_args: User arguments to pass to @cb_function
|
|
'''
|
|
|
|
def in_thread_exec(task, self, task_data, cancellable): # pylint: disable=unused-argument
|
|
if task.return_error_if_cancelled():
|
|
return # Bail out if task has been cancelled
|
|
|
|
try:
|
|
value = GObject.Object()
|
|
value.result = self._user_function(*self._user_args)
|
|
task.return_value(value)
|
|
except Exception as ex: # pylint: disable=broad-except
|
|
task.return_error(GLib.Error(message=str(ex), domain=type(ex).__name__))
|
|
|
|
task = Gio.Task.new(self, cancellable, cb_function, *cb_args)
|
|
task.set_return_on_cancel(False)
|
|
task.run_in_thread(in_thread_exec)
|
|
return task
|
|
|
|
def communicate_finish(self, result):
|
|
'''@brief Use this function in your callback (see @cb_function) to
|
|
extract data from the result object.
|
|
|
|
@return On success (True, data, None),
|
|
On failure (False, None, err: GLib.Error)
|
|
'''
|
|
try:
|
|
success, value = result.propagate_value()
|
|
return success, value.result, None
|
|
except GLib.Error as err:
|
|
return False, None, err
|
|
|
|
|
|
# ******************************************************************************
|
|
class AsyncTask: # pylint: disable=too-many-instance-attributes
|
|
'''Object used to manage an asynchronous GLib operation. The operation
|
|
can be cancelled or retried.
|
|
'''
|
|
|
|
def __init__(self, on_success_callback, on_failure_callback, operation, *op_args):
|
|
'''@param on_success_callback: Callback method invoked when @operation completes successfully
|
|
@param on_failure_callback: Callback method invoked when @operation fails
|
|
@param operation: Operation (i.e. a function) to execute asynchronously
|
|
@param op_args: Arguments passed to operation
|
|
'''
|
|
self._cancellable = Gio.Cancellable()
|
|
self._operation = operation
|
|
self._op_args = op_args
|
|
self._success_cb = on_success_callback
|
|
self._fail_cb = on_failure_callback
|
|
self._retry_tmr = None
|
|
self._errmsg = None
|
|
self._task = None
|
|
self._fail_cnt = 0
|
|
|
|
def _release_resources(self):
|
|
if self._alive():
|
|
self._cancellable.cancel()
|
|
|
|
if self._retry_tmr is not None:
|
|
self._retry_tmr.kill()
|
|
|
|
self._operation = None
|
|
self._op_args = None
|
|
self._success_cb = None
|
|
self._fail_cb = None
|
|
self._retry_tmr = None
|
|
self._errmsg = None
|
|
self._task = None
|
|
self._fail_cnt = None
|
|
self._cancellable = None
|
|
|
|
def __str__(self):
|
|
return str(self.as_dict())
|
|
|
|
def as_dict(self):
|
|
'''Return object members as a dictionary'''
|
|
info = {
|
|
'fail count': self._fail_cnt,
|
|
'completed': self._task.get_completed() if self._task else None,
|
|
'alive': self._alive(),
|
|
}
|
|
|
|
if self._retry_tmr:
|
|
info['retry timer'] = str(self._retry_tmr)
|
|
|
|
if self._errmsg:
|
|
info['error'] = self._errmsg
|
|
|
|
return info
|
|
|
|
def _alive(self):
|
|
return self._cancellable and not self._cancellable.is_cancelled()
|
|
|
|
def completed(self):
|
|
'''@brief Returns True if the task has completed, False otherwise.'''
|
|
return self._task is not None and self._task.get_completed()
|
|
|
|
def cancel(self):
|
|
'''@brief cancel async operation'''
|
|
if self._alive():
|
|
self._cancellable.cancel()
|
|
|
|
def kill(self):
|
|
'''@brief kill and clean up this object'''
|
|
self._release_resources()
|
|
|
|
def run_async(self, *args):
|
|
'''@brief
|
|
Method used to initiate an asynchronous operation with the
|
|
Controller. When the operation completes (or fails) the
|
|
callback method @_on_operation_complete() will be invoked.
|
|
'''
|
|
runner = _TaskRunner(self._operation, *self._op_args)
|
|
self._task = runner.communicate(self._cancellable, self._on_operation_complete, *args)
|
|
|
|
def retry(self, interval_sec, *args):
|
|
'''@brief Tell this object that the async operation is to be retried
|
|
in @interval_sec seconds.
|
|
|
|
'''
|
|
if self._retry_tmr is None:
|
|
self._retry_tmr = GTimer()
|
|
self._retry_tmr.set_callback(self._on_retry_timeout, *args)
|
|
self._retry_tmr.start(interval_sec)
|
|
|
|
def _on_retry_timeout(self, *args):
|
|
'''@brief
|
|
When an operation fails, the application has the option to
|
|
retry at a later time by calling the retry() method. The
|
|
retry() method starts a timer at the end of which the operation
|
|
will be executed again. This is the method that is called when
|
|
the timer expires.
|
|
'''
|
|
if self._alive():
|
|
self.run_async(*args)
|
|
return GLib.SOURCE_REMOVE
|
|
|
|
def _on_operation_complete(self, runner, result, *args):
|
|
'''@brief
|
|
This callback method is invoked when the operation with the
|
|
Controller has completed (be it successful or not).
|
|
'''
|
|
# The operation might have been cancelled.
|
|
# Only proceed if it hasn't been cancelled.
|
|
if self._operation is None or not self._alive():
|
|
return
|
|
|
|
success, data, err = runner.communicate_finish(result)
|
|
|
|
if success:
|
|
self._errmsg = None
|
|
self._fail_cnt = 0
|
|
self._success_cb(self, data, *args)
|
|
else:
|
|
self._errmsg = str(err)
|
|
self._fail_cnt += 1
|
|
self._fail_cb(self, err, self._fail_cnt, *args)
|
|
|
|
|
|
# ******************************************************************************
|
|
class Deferred:
|
|
'''Implement a deferred function call. A deferred is a function that gets
|
|
added to the main loop to be executed during the next idle slot.'''
|
|
|
|
def __init__(self, func, *user_data):
|
|
self._source = None
|
|
self._func = func
|
|
self._user_data = user_data
|
|
|
|
def schedule(self):
|
|
'''Schedule the function to be called by the main loop. If the
|
|
function is already scheduled, then do nothing'''
|
|
if not self.is_scheduled():
|
|
srce_id = GLib.idle_add(self._func, *self._user_data)
|
|
self._source = GLib.main_context_default().find_source_by_id(srce_id)
|
|
|
|
def is_scheduled(self):
|
|
'''Check if deferred is currently schedules to run'''
|
|
return self._source and not self._source.is_destroyed()
|
|
|
|
def cancel(self):
|
|
'''Remove deferred from main loop'''
|
|
if self.is_scheduled():
|
|
self._source.destroy()
|
|
self._source = None
|
|
|
|
|
|
# ******************************************************************************
|
|
class TcpChecker: # pylint: disable=too-many-instance-attributes
|
|
'''@brief Verify that a TCP connection can be established with an enpoint'''
|
|
|
|
def __init__(
|
|
self, traddr, trsvcid, host_iface, verbose, user_cback, *user_data
|
|
): # pylint: disable=too-many-arguments
|
|
self._user_cback = user_cback
|
|
self._host_iface = host_iface
|
|
self._user_data = user_data
|
|
self._trsvcid = trsvcid
|
|
self._traddr = iputil.get_ipaddress_obj(traddr, ipv4_mapped_convert=True)
|
|
self._cancellable = None
|
|
self._gio_sock = None
|
|
self._native_sock = None
|
|
self._verbose = verbose
|
|
|
|
def connect(self):
|
|
'''Attempt to connect'''
|
|
self.close()
|
|
|
|
# Gio has limited setsockopt() capabilities. To set SO_BINDTODEVICE
|
|
# we need to use a generic socket.socket() and then convert to a
|
|
# Gio.Socket() object to perform async connect operation within
|
|
# the GLib context.
|
|
family = socket.AF_INET if self._traddr.version == 4 else socket.AF_INET6
|
|
self._native_sock = socket.socket(family, socket.SOCK_STREAM | socket.SOCK_NONBLOCK, socket.IPPROTO_TCP)
|
|
if self._host_iface and isinstance(self._host_iface, str):
|
|
self._native_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BINDTODEVICE, self._host_iface.encode('utf-8'))
|
|
|
|
# Convert socket.socket() to a Gio.Socket() object
|
|
try:
|
|
self._gio_sock = Gio.Socket.new_from_fd(self._native_sock.fileno()) # returns None on error
|
|
except GLib.Error as err:
|
|
logging.error('Cannot create socket: %s', err.message) # pylint: disable=no-member
|
|
self._gio_sock = None
|
|
|
|
if self._gio_sock is None:
|
|
self._native_sock.close()
|
|
raise RuntimeError(f'Unable to connect to {self._traddr}, {self._trsvcid}, {self._host_iface}')
|
|
|
|
g_addr = Gio.InetSocketAddress.new_from_string(self._traddr.compressed, int(self._trsvcid))
|
|
|
|
self._cancellable = Gio.Cancellable()
|
|
|
|
g_sockconn = self._gio_sock.connection_factory_create_connection()
|
|
g_sockconn.connect_async(g_addr, self._cancellable, self._connect_async_cback)
|
|
|
|
def close(self):
|
|
'''Terminate/Cancel current connection attempt and free resources'''
|
|
if self._cancellable is not None:
|
|
self._cancellable.cancel()
|
|
self._cancellable = None
|
|
|
|
if self._gio_sock is not None:
|
|
try:
|
|
self._gio_sock.close()
|
|
except GLib.Error as err:
|
|
logging.debug('TcpChecker.close() gio_sock.close - %s', err.message) # pylint: disable=no-member
|
|
|
|
self._gio_sock = None
|
|
|
|
if self._native_sock is not None:
|
|
try:
|
|
# This is expected to fail because the socket
|
|
# is already closed by self._gio_sock.close() above.
|
|
# This code is just for completeness.
|
|
self._native_sock.close()
|
|
except OSError:
|
|
pass
|
|
|
|
self._native_sock = None
|
|
|
|
def _connect_async_cback(self, source_object, result):
|
|
'''
|
|
@param source_object: The Gio.SocketConnection object used to
|
|
invoke the connect_async() API.
|
|
'''
|
|
try:
|
|
connected = source_object.connect_finish(result)
|
|
except GLib.Error as err:
|
|
connected = False
|
|
# We don't need to report "cancellation" errors.
|
|
if err.matches(Gio.io_error_quark(), Gio.IOErrorEnum.CANCELLED):
|
|
logging.debug('TcpChecker._connect_async_cback() - %s', err.message) # pylint: disable=no-member
|
|
else:
|
|
if self._verbose:
|
|
logging.info(
|
|
'Unable to verify TCP connectivity - (%-10s %-14s %s): %s',
|
|
self._host_iface + ',',
|
|
self._traddr.compressed + ',',
|
|
self._trsvcid,
|
|
err.message, # pylint: disable=no-member
|
|
)
|
|
|
|
self.close()
|
|
|
|
if self._user_cback is not None:
|
|
self._user_cback(connected, *self._user_data)
|