1
0
Fork 0
nvme-stas/staslib/gutil.py
Daniel Baumann a8f39c03aa
Merging upstream version 2.3.1:
- properly handles big-endian data in `iputils.py` (Closes: #1057031).

Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-02-16 12:56:36 +01:00

520 lines
20 KiB
Python

# Copyright (c) 2022, Dell Inc. or its subsidiaries. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# See the LICENSE file for details.
#
# This file is part of NVMe STorage Appliance Services (nvme-stas).
#
# Authors: Martin Belanger <Martin.Belanger@dell.com>
#
'''This module provides utility functions (or classes) that simplify
the use of certain GLib/Gio/Gobject functions/resources.
'''
import logging
import socket
from gi.repository import Gio, GLib, GObject
from staslib import conf, iputil, trid
# ******************************************************************************
class GTimer:
    '''@brief Convenience class to wrap GLib timers'''

    def __init__(
        self, interval_sec: float = 0, user_cback=lambda: GLib.SOURCE_REMOVE, *user_data, priority=GLib.PRIORITY_DEFAULT
    ):  # pylint: disable=keyword-arg-before-vararg
        '''@param interval_sec: Timer duration in seconds (fractions allowed)
        @param user_cback: Function invoked when the timer expires. Its
                           return value tells GLib whether to keep the
                           source (truthy) or remove it (falsy).
        @param user_data: Positional arguments passed to @user_cback
        @param priority: GLib priority of the timeout source. None selects
                         GLib.PRIORITY_DEFAULT.
        '''
        self._source = None
        self._interval_sec = float(interval_sec)
        self._user_cback = user_cback
        self._user_data = user_data
        self._priority = priority if priority is not None else GLib.PRIORITY_DEFAULT

    def _release_resources(self):
        # Stop first so that no callback can fire once the references
        # to the user's callback and data have been dropped.
        self.stop()
        self._user_cback = None
        self._user_data = None

    def kill(self):
        '''@brief Used to release all resources associated with a timer.'''
        self._release_resources()

    def __str__(self):
        if self._source is not None:
            return f'{self._interval_sec}s [{self.time_remaining()}s]'

        return f'{self._interval_sec}s [off]'

    def _callback(self, *_):
        '''@brief Trampoline installed on the GLib source. Invokes the user
        callback and keeps self._source in sync with what GLib does with
        the source afterward.
        '''
        # Remember which source is firing. The user callback is allowed to
        # stop()/start() the timer, which replaces self._source.
        fired_source = self._source
        retval = self._user_cback(*self._user_data)
        if retval:
            return retval  # Truthy (e.g. GLib.SOURCE_CONTINUE): keep the source

        # Any falsy value (False, 0, None) makes GLib remove the source, so
        # forget it in every such case, not only on an exact SOURCE_REMOVE.
        # Otherwise a later start() would re-arm a destroyed source and the
        # timer would never fire. Only forget the source that actually
        # fired; a source created by the callback itself must stay tracked.
        if self._source is fired_source:
            self._source = None

        return GLib.SOURCE_REMOVE

    def stop(self):
        '''@brief Stop timer'''
        if self._source is not None:
            self._source.destroy()
            self._source = None

    def start(self, new_interval_sec: float = -1.0):
        '''@brief Start (or restart) timer
        @param new_interval_sec: New duration in seconds. A negative value
                                 (the default) keeps the current duration.
        '''
        if new_interval_sec >= 0:
            self._interval_sec = float(new_interval_sec)

        if self._source is not None:
            # Timer already running: push its expiry out instead of
            # creating a new source. The ready time is in micro-seconds
            # (monotonic time) and must be an integer.
            self._source.set_ready_time(self._source.get_time() + int(self._interval_sec * 1000000))
        else:
            if self._interval_sec.is_integer():
                self._source = GLib.timeout_source_new_seconds(int(self._interval_sec))  # seconds resolution
            else:
                self._source = GLib.timeout_source_new(int(self._interval_sec * 1000.0))  # milli-seconds resolution

            self._source.set_priority(self._priority)
            self._source.set_callback(self._callback)
            self._source.attach()

    def clear(self):
        '''@brief Make timer expire now. The callback function
        will be invoked immediately by the main loop.
        '''
        if self._source is not None:
            self._source.set_ready_time(0)  # Expire now!

    def set_callback(self, user_cback, *user_data):
        '''@brief set the callback function to invoke when timer expires'''
        self._user_cback = user_cback
        self._user_data = user_data

    def set_timeout(self, new_interval_sec: float):
        '''@brief set the timer's duration. Negative values are ignored.
        A running timer is not re-armed by this call.
        '''
        if new_interval_sec >= 0:
            self._interval_sec = float(new_interval_sec)

    def get_timeout(self):
        '''@brief get the timer's duration'''
        return self._interval_sec

    def time_remaining(self) -> float:
        '''@brief Get how much time remains on a timer before it fires.
        @return Remaining time in seconds, or 0 if the timer is not
                running or is already due.
        '''
        if self._source is not None:
            delta_us = self._source.get_ready_time() - self._source.get_time()  # monotonic time in micro-seconds
            if delta_us > 0:
                return delta_us / 1000000.0

        return 0
# ******************************************************************************
class NameResolver:  # pylint: disable=too-few-public-methods
    '''@brief DNS resolver to convert host names to IP addresses.'''

    def __init__(self):
        self._resolver = Gio.Resolver.get_default()

    def resolve_ctrl_async(self, cancellable, controllers_in: list, callback):
        '''@brief Replace host names found in the traddr field of tcp/rdma
        controllers by IP addresses. Name lookups go through the
        asynchronous Gio resolver API because a DNS server may have
        to be contacted.

        @callback is invoked with the list of resulting controllers
        once every pending lookup has completed, or right away when
        no lookup was needed.

        @param cancellable: Object passed to the resolver to cancel lookups
        @param controllers_in: List of trid.TID
        @param callback: Function called with the list of controllers
        '''
        resolved = []
        outstanding = 0
        svc_conf = conf.SvcConf()

        def first_of_family(addresses, family):
            # First address of the requested family, or None if there is none.
            return next((addr.to_string() for addr in addresses if addr.get_family() == family), None)

        def addr_resolved(resolver, result, controller):
            nonlocal outstanding

            try:
                addresses = resolver.lookup_by_name_finish(result)  # List of Gio.InetAddress objects
            except GLib.GError as err:
                # "Cancellation" errors are only worth a debug trace.
                if err.matches(Gio.io_error_quark(), Gio.IOErrorEnum.CANCELLED):
                    # pylint: disable=no-member
                    logging.debug('NameResolver.resolve_ctrl_async() - %s %s', err.message, controller)
                else:
                    logging.error('%s', err.message)  # pylint: disable=no-member

                # if err.matches(Gio.resolver_error_quark(), Gio.ResolverError.TEMPORARY_FAILURE):
                # elif err.matches(Gio.resolver_error_quark(), Gio.ResolverError.NOT_FOUND):
                # elif err.matches(Gio.resolver_error_quark(), Gio.ResolverError.INTERNAL):
            else:
                # Several addresses are often returned. IPv4 is preferred
                # over IPv6, and only families enabled in the service
                # configuration are considered.
                ip_addr = None
                if 4 in svc_conf.ip_family:
                    ip_addr = first_of_family(addresses, Gio.SocketFamily.IPV4)

                if ip_addr is None and 6 in svc_conf.ip_family:
                    ip_addr = first_of_family(addresses, Gio.SocketFamily.IPV6)

                if ip_addr is not None:
                    logging.debug(
                        'NameResolver.resolve_ctrl_async() - resolved \'%s\' -> %s', controller.traddr, ip_addr
                    )
                    cid = controller.as_dict()
                    cid['traddr'] = ip_addr
                    resolved.append(trid.TID(cid))

            # Success or failure, this lookup is done. The last one to
            # complete hands the result to the caller.
            outstanding -= 1
            if outstanding == 0:
                callback(resolved)

        for controller in controllers_in:
            if controller.transport not in ('tcp', 'rdma'):
                # Other transports are passed through untouched.
                resolved.append(controller)
                continue

            host = controller.traddr
            if not host:
                logging.error('Invalid traddr: %s', controller)
                continue

            # When traddr already parses as an IP address there is
            # no need to involve the resolver.
            ip = iputil.get_ipaddress_obj(host)
            if ip is None:
                logging.debug('NameResolver.resolve_ctrl_async() - resolving \'%s\'', host)
                outstanding += 1
                self._resolver.lookup_by_name_async(host, cancellable, addr_resolved, controller)
            elif ip.version in svc_conf.ip_family:
                resolved.append(controller)
            else:
                logging.warning('Excluding configured IP address %s based on "ip-family" setting', host)

        if outstanding == 0:  # No names are pending asynchronous resolution
            callback(resolved)
# ******************************************************************************
class _TaskRunner(GObject.Object):
    '''@brief Helper that executes a function asynchronously in a thread
    by means of a Gio.Task.
    '''

    def __init__(self, user_function, *user_args):
        '''@param user_function: function to run inside a thread
        @param user_args: arguments passed to @user_function
        '''
        super().__init__()
        self._user_args = user_args
        self._user_function = user_function

    def communicate(self, cancellable, cb_function, *cb_args):
        '''@brief Run the user function in a thread and report completion
        through @cb_function.

        @param cancellable: A Gio.Cancellable object that can be used to
                            cancel an in-flight async command.
        @param cb_function: User callback function to call when the async
                            command has completed. The callback function
                            will be passed these arguments:

                                (runner, result, *cb_args)

                            Where:
                                runner:  This _TaskRunner object instance
                                result:  A GObject.Object instance that contains the result
                                cb_args: The cb_args arguments passed to communicate()
        @param cb_args: User arguments to pass to @cb_function
        @return The Gio.Task that was started
        '''

        def in_thread_exec(gtask, self, _task_data, _cancellable):  # pylint: disable=unused-argument
            # Nothing to do when the task was cancelled before it got to run.
            if gtask.return_error_if_cancelled():
                return

            try:
                # The return value travels inside a GObject.Object wrapper
                # under its "result" attribute.
                outcome = GObject.Object()
                outcome.result = self._user_function(*self._user_args)
                gtask.return_value(outcome)
            except Exception as exc:  # pylint: disable=broad-except
                # Any exception is reported as a GLib.Error whose domain is
                # the name of the exception class.
                gtask.return_error(GLib.Error(message=str(exc), domain=type(exc).__name__))

        gtask = Gio.Task.new(self, cancellable, cb_function, *cb_args)
        gtask.set_return_on_cancel(False)
        gtask.run_in_thread(in_thread_exec)
        return gtask

    def communicate_finish(self, result):
        '''@brief Use this function in your callback (see @cb_function) to
                  extract data from the result object.

        @return On success (True, data, None),
                On failure (False, None, err: GLib.Error)
        '''
        try:
            success, value = result.propagate_value()
        except GLib.Error as err:
            return False, None, err

        return success, value.result, None
# ******************************************************************************
class AsyncTask:  # pylint: disable=too-many-instance-attributes
    '''Object used to manage an asynchronous GLib operation. The operation
    can be cancelled or retried.
    '''

    def __init__(self, on_success_callback, on_failure_callback, operation, *op_args):
        '''@param on_success_callback: Callback method invoked when @operation completes successfully
        @param on_failure_callback: Callback method invoked when @operation fails
        @param operation: Operation (i.e. a function) to execute asynchronously
        @param op_args: Arguments passed to operation
        '''
        self._cancellable = Gio.Cancellable()
        self._operation = operation
        self._op_args = op_args
        self._success_cb = on_success_callback
        self._fail_cb = on_failure_callback
        self._retry_tmr = None  # GTimer, created on the first call to retry()
        self._errmsg = None  # Text of the last failure, None after a success
        self._task = None  # Task returned by the last run_async()
        self._fail_cnt = 0  # Consecutive failures since the last success

    def _release_resources(self):
        # Cancel whatever is in flight, then drop every reference so that
        # this object no longer keeps callbacks or timers alive.
        if self._alive():
            self._cancellable.cancel()

        if self._retry_tmr is not None:
            self._retry_tmr.kill()

        self._operation = None
        self._op_args = None
        self._success_cb = None
        self._fail_cb = None
        self._retry_tmr = None
        self._errmsg = None
        self._task = None
        self._fail_cnt = None
        self._cancellable = None

    def __str__(self):
        return str(self.as_dict())

    def as_dict(self):
        '''Return object members as a dictionary'''
        info = {
            'fail count': self._fail_cnt,
            'completed': self._task.get_completed() if self._task else None,
            'alive': self._alive(),
        }

        if self._retry_tmr:
            info['retry timer'] = str(self._retry_tmr)

        if self._errmsg:
            info['error'] = self._errmsg

        return info

    def _alive(self):
        '''@brief Tell whether this task can still run.
        @return True when a cancellable exists and has not been cancelled,
                False otherwise (including after kill()). Always a bool.
        '''
        return self._cancellable is not None and not self._cancellable.is_cancelled()

    def completed(self):
        '''@brief Returns True if the task has completed, False otherwise.'''
        return self._task is not None and self._task.get_completed()

    def cancel(self):
        '''@brief cancel async operation'''
        if self._alive():
            self._cancellable.cancel()

    def kill(self):
        '''@brief kill and clean up this object'''
        self._release_resources()

    def run_async(self, *args):
        '''@brief
        Method used to initiate an asynchronous operation with the
        Controller. When the operation completes (or fails) the
        callback method @_on_operation_complete() will be invoked.

        @param args: Arguments forwarded to the success/failure callbacks
        '''
        runner = _TaskRunner(self._operation, *self._op_args)
        self._task = runner.communicate(self._cancellable, self._on_operation_complete, *args)

    def retry(self, interval_sec, *args):
        '''@brief Tell this object that the async operation is to be retried
        in @interval_sec seconds.

        @param args: Arguments forwarded to run_async() when the timer expires
        '''
        if self._retry_tmr is None:
            self._retry_tmr = GTimer()
        self._retry_tmr.set_callback(self._on_retry_timeout, *args)
        self._retry_tmr.start(interval_sec)

    def _on_retry_timeout(self, *args):
        '''@brief
        When an operation fails, the application has the option to
        retry at a later time by calling the retry() method. The
        retry() method starts a timer at the end of which the operation
        will be executed again. This is the method that is called when
        the timer expires.
        '''
        if self._alive():
            self.run_async(*args)
        return GLib.SOURCE_REMOVE  # One-shot timer

    def _on_operation_complete(self, runner, result, *args):
        '''@brief
        This callback method is invoked when the operation with the
        Controller has completed (be it successful or not).
        '''
        # The operation might have been cancelled.
        # Only proceed if it hasn't been cancelled.
        if self._operation is None or not self._alive():
            return

        success, data, err = runner.communicate_finish(result)

        if success:
            self._errmsg = None
            self._fail_cnt = 0
            self._success_cb(self, data, *args)
        else:
            self._errmsg = str(err)
            self._fail_cnt += 1
            self._fail_cb(self, err, self._fail_cnt, *args)
# ******************************************************************************
class Deferred:
    '''Implement a deferred function call. A deferred is a function that gets
    added to the main loop to be executed during the next idle slot.'''

    def __init__(self, func, *user_data):
        '''@param func: Function to invoke from the main loop
        @param user_data: Positional arguments passed to @func
        '''
        self._source = None  # GLib source of the pending idle call, if any
        self._func = func
        self._user_data = user_data

    def schedule(self):
        '''Schedule the function to be called by the main loop. If the
        function is already scheduled, then do nothing'''
        if not self.is_scheduled():
            srce_id = GLib.idle_add(self._func, *self._user_data)
            # Keep the source object (not just its id) so that its state
            # can be queried and the call cancelled later.
            self._source = GLib.main_context_default().find_source_by_id(srce_id)

    def is_scheduled(self):
        '''Check if deferred is currently scheduled to run.

        @return True when a source exists and has not been destroyed,
                False otherwise. Always a bool.
        '''
        return self._source is not None and not self._source.is_destroyed()

    def cancel(self):
        '''Remove deferred from main loop'''
        if self.is_scheduled():
            self._source.destroy()
        self._source = None
# ******************************************************************************
class TcpChecker:  # pylint: disable=too-many-instance-attributes
    '''@brief Verify that a TCP connection can be established with an endpoint'''

    def __init__(
        self, traddr, trsvcid, host_iface, verbose, user_cback, *user_data
    ):  # pylint: disable=too-many-arguments
        '''@param traddr: Address of the endpoint (converted with iputil.get_ipaddress_obj())
        @param trsvcid: TCP port of the endpoint
        @param host_iface: Name of the interface to bind the socket to (optional)
        @param verbose: When True, connection failures are logged at info level
        @param user_cback: Function called as user_cback(connected, *user_data)
                           when the connection attempt completes
        @param user_data: Extra positional arguments passed to @user_cback
        '''
        self._user_cback = user_cback
        self._host_iface = host_iface
        self._user_data = user_data
        self._trsvcid = trsvcid
        self._traddr = iputil.get_ipaddress_obj(traddr, ipv4_mapped_convert=True)
        self._cancellable = None
        self._gio_sock = None
        self._native_sock = None
        self._verbose = verbose

    def connect(self):
        '''Attempt to connect. Any attempt still in progress is closed first.

        @raise RuntimeError: when the socket cannot be wrapped in a Gio.Socket
        '''
        self.close()

        # Gio has limited setsockopt() capabilities. To set SO_BINDTODEVICE
        # we need to use a generic socket.socket() and then convert to a
        # Gio.Socket() object to perform async connect operation within
        # the GLib context.
        family = socket.AF_INET if self._traddr.version == 4 else socket.AF_INET6
        self._native_sock = socket.socket(family, socket.SOCK_STREAM | socket.SOCK_NONBLOCK, socket.IPPROTO_TCP)
        if self._host_iface and isinstance(self._host_iface, str):
            self._native_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BINDTODEVICE, self._host_iface.encode('utf-8'))

        # Convert socket.socket() to a Gio.Socket() object
        try:
            self._gio_sock = Gio.Socket.new_from_fd(self._native_sock.fileno())  # returns None on error
        except GLib.Error as err:
            logging.error('Cannot create socket: %s', err.message)  # pylint: disable=no-member
            self._gio_sock = None

        if self._gio_sock is None:
            self._native_sock.close()
            raise RuntimeError(f'Unable to connect to {self._traddr}, {self._trsvcid}, {self._host_iface}')

        g_addr = Gio.InetSocketAddress.new_from_string(self._traddr.compressed, int(self._trsvcid))

        self._cancellable = Gio.Cancellable()

        g_sockconn = self._gio_sock.connection_factory_create_connection()
        g_sockconn.connect_async(g_addr, self._cancellable, self._connect_async_cback)

    def close(self):
        '''Terminate/Cancel current connection attempt and free resources'''
        if self._cancellable is not None:
            self._cancellable.cancel()
            self._cancellable = None

        if self._gio_sock is not None:
            try:
                self._gio_sock.close()
            except GLib.Error as err:
                logging.debug('TcpChecker.close() gio_sock.close - %s', err.message)  # pylint: disable=no-member

            self._gio_sock = None

        if self._native_sock is not None:
            try:
                # This is expected to fail because the socket
                # is already closed by self._gio_sock.close() above.
                # This code is just for completeness.
                self._native_sock.close()
            except OSError:
                pass

            self._native_sock = None

    def _connect_async_cback(self, source_object, result):
        '''@brief Completion handler of the asynchronous connect. Releases the
        sockets and reports the outcome to the user callback.

        @param source_object: The Gio.SocketConnection object used to
                              invoke the connect_async() API.
        @param result: Result object to pass to connect_finish()
        '''
        try:
            connected = source_object.connect_finish(result)
        except GLib.Error as err:
            connected = False
            # We don't need to report "cancellation" errors.
            if err.matches(Gio.io_error_quark(), Gio.IOErrorEnum.CANCELLED):
                logging.debug('TcpChecker._connect_async_cback() - %s', err.message)  # pylint: disable=no-member
            else:
                if self._verbose:
                    # host_iface is optional (connect() only binds when it
                    # is a non-empty string), so it must not be assumed to
                    # be a str when building the log message.
                    logging.info(
                        'Unable to verify TCP connectivity - (%-10s %-14s %s): %s',
                        str(self._host_iface or '') + ',',
                        self._traddr.compressed + ',',
                        self._trsvcid,
                        err.message,  # pylint: disable=no-member
                    )

        # NOTE(review): this also runs for a cancelled attempt. If connect()
        # is called again while an attempt is in flight, the cancelled
        # attempt's callback would presumably close the new attempt as
        # well - confirm callers never re-connect before completion.
        self.close()

        if self._user_cback is not None:
            self._user_cback(connected, *self._user_data)