1
0
Fork 0

Merging upstream version 2.3~rc4.

Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
Daniel Baumann 2025-02-16 12:55:12 +01:00
parent 0bb8b81ada
commit 3cb8664662
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
16 changed files with 708 additions and 276 deletions

View file

@ -131,66 +131,22 @@ def mac2iface(mac: str): # pylint: disable=too-many-locals
# ******************************************************************************
def _data_matches_ip(data_family, data, ip):
if data_family == socket.AF_INET:
try:
other_ip = ipaddress.IPv4Address(data)
except ValueError:
return False
if ip.version == 6:
ip = ip.ipv4_mapped
elif data_family == socket.AF_INET6:
try:
other_ip = ipaddress.IPv6Address(data)
except ValueError:
return False
if ip.version == 4:
other_ip = other_ip.ipv4_mapped
else:
def ip_equal(ip1, ip2):
'''Check whther two IP addresses are equal.
@param ip1: IPv4Address or IPv6Address object
@param ip2: IPv4Address or IPv6Address object
'''
if not isinstance(ip1, ipaddress._BaseAddress): # pylint: disable=protected-access
return False
if not isinstance(ip2, ipaddress._BaseAddress): # pylint: disable=protected-access
return False
return other_ip == ip
if ip1.version == 4 and ip2.version == 6:
ip2 = ip2.ipv4_mapped
elif ip1.version == 6 and ip2.version == 4:
ip1 = ip1.ipv4_mapped
def _iface_of(src_addr): # pylint: disable=too-many-locals
'''@brief Find the interface that has src_addr as one of its assigned IP addresses.
@param src_addr: The IP address to match
@type src_addr: Instance of ipaddress.IPv4Address or ipaddress.IPv6Address
'''
with socket.socket(socket.AF_NETLINK, socket.SOCK_RAW) as sock:
sock.sendall(GETADDRCMD)
nlmsg = sock.recv(8192)
nlmsg_idx = 0
while True:
if nlmsg_idx >= len(nlmsg):
nlmsg += sock.recv(8192)
nlmsghdr = nlmsg[nlmsg_idx : nlmsg_idx + NLMSG_HDRLEN]
nlmsg_len, nlmsg_type, _, _, _ = struct.unpack('<LHHLL', nlmsghdr)
if nlmsg_type == NLMSG_DONE:
break
if nlmsg_type == RTM_NEWADDR:
msg_indx = nlmsg_idx + NLMSG_HDRLEN
msg = nlmsg[msg_indx : msg_indx + IFADDRMSG_SZ] # ifaddrmsg
ifa_family, _, _, _, ifa_index = struct.unpack('<BBBBL', msg)
rtattr_indx = msg_indx + IFADDRMSG_SZ
while rtattr_indx < (nlmsg_idx + nlmsg_len):
rtattr = nlmsg[rtattr_indx : rtattr_indx + RTATTR_SZ]
rta_len, rta_type = struct.unpack('<HH', rtattr)
if rta_type == IFLA_ADDRESS:
data = nlmsg[rtattr_indx + RTATTR_SZ : rtattr_indx + rta_len]
if _data_matches_ip(ifa_family, data, src_addr):
return socket.if_indextoname(ifa_index)
rta_len = RTA_ALIGN(rta_len) # Round up to multiple of 4
rtattr_indx += rta_len # Move to next rtattr
nlmsg_idx += nlmsg_len # Move to next Netlink message
return ''
return ip1 == ip2
# ******************************************************************************
@ -221,21 +177,21 @@ def net_if_addrs(): # pylint: disable=too-many-locals
source address.
@example: {
'wlp0s20f3': {
4: ['10.0.0.28'],
4: [IPv4Address('10.0.0.28')],
6: [
'fd5e:9a9e:c5bd:0:5509:890c:1848:3843',
'fd5e:9a9e:c5bd:0:1fd5:e527:8df7:7912',
'2605:59c8:6128:fb00:c083:1b8:c467:81d2',
'2605:59c8:6128:fb00:e99d:1a02:38e0:ad52',
'fe80::d71b:e807:d5ee:7614'
IPv6Address('fd5e:9a9e:c5bd:0:5509:890c:1848:3843'),
IPv6Address('fd5e:9a9e:c5bd:0:1fd5:e527:8df7:7912'),
IPv6Address('2605:59c8:6128:fb00:c083:1b8:c467:81d2'),
IPv6Address('2605:59c8:6128:fb00:e99d:1a02:38e0:ad52'),
IPv6Address('fe80::d71b:e807:d5ee:7614'),
],
},
'lo': {
4: ['127.0.0.1'],
6: ['::1'],
4: [IPv4Address('127.0.0.1')],
6: [IPv6Address('::1')],
},
'docker0': {
4: ['172.17.0.1'],
4: [IPv4Address('172.17.0.1')],
6: []
},
}
@ -295,14 +251,18 @@ def net_if_addrs(): # pylint: disable=too-many-locals
# ******************************************************************************
def get_interface(src_addr):
def get_interface(ifaces: dict, src_addr):
'''Get interface for given source address
@param src_addr: The source address
@type src_addr: str
@param ifaces: Interface info previously returned by @net_if_addrs()
@param src_addr: IPv4Address or IPv6Address object
'''
if not src_addr:
if not isinstance(src_addr, ipaddress._BaseAddress): # pylint: disable=protected-access
return ''
src_addr = src_addr.split('%')[0] # remove scope-id (if any)
src_addr = get_ipaddress_obj(src_addr)
return '' if src_addr is None else _iface_of(src_addr)
for iface, addr_map in ifaces.items():
for addrs in addr_map.values():
for addr in addrs:
if ip_equal(src_addr, addr):
return iface
return ''