1
0
Fork 0
oarc-dsc-datatool/dsc_datatool/generator/client_subnet_authority.py
Daniel Baumann 1ea3e103a7
Adding upstream version 1.4.2.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-04-22 13:23:23 +02:00

181 lines
6.1 KiB
Python

"""dsc_datatool.generator.client_subnet_authority
See `man dsc-datatool-generator client_subnet_authority`.
Part of dsc_datatool.
:copyright: 2024 OARC, Inc.
"""
import csv
import ipaddress
import logging
from urllib.request import Request, urlopen
from io import StringIO
from dsc_datatool import Generator, Dataset, Dimension, args, encoding
# Authority name keyed by WHOIS server host name.  `_read()` lower-cases the
# registry's WHOIS column before looking it up here.
_whois2rir = {
    'whois.apnic.net': 'APNIC',
    'whois.arin.net': 'ARIN',
    'whois.ripe.net': 'RIPE NCC',
    'whois.lacnic.net': 'LACNIC',
    'whois.afrinic.net': 'AFRINIC',
}

# Authority name keyed by lower-cased designation text.  `_read()` tries an
# exact match first and then a substring match in insertion order, so the
# order of the entries below is significant.
_desig2rir = {
    'apnic': 'APNIC',
    'arin': 'ARIN',
    'ripe ncc': 'RIPE NCC',
    'lacnic': 'LACNIC',
    'afrinic': 'AFRINIC',
    'iana': 'IANA',
    '6to4': 'IANA',
}
class client_subnet_authority(Generator):
    """Generate ``client_subnet_authority`` datasets from ``client_subnet``.

    Every key of a ``client_subnet`` dataset is parsed as an IP address and
    attributed to the authority (an RIR or IANA) whose registry entry
    contains it.  The registry is loaded from IANA-style CSV files, either
    local (``csv=``) or downloaded (``fetch=yes``), when the generator is
    constructed.
    """

    # Bucket network address -> list of {'net': ip_network, 'auth': str}.
    # Filled by _read(); buckets are /8 for IPv4 and /24 for IPv6.
    auth = None
    # When true, keys that are not valid IP addresses are skipped by
    # process() instead of raising.
    nonstrict = False
    # Prefix length of the lookup buckets, per IP version.
    _bucket_prefixlen = {4: 8, 6: 24}

    @classmethod
    def _bucket(cls, address):
        """Return the network address of the lookup bucket holding `address`."""
        prefixlen = cls._bucket_prefixlen[address.version]
        bucket = ipaddress.ip_network('%s/%d' % (address, prefixlen), strict=False)
        return bucket.network_address

    @staticmethod
    def _authority_name(designation, whois, status):
        """Map one registry row to an authority name.

        The WHOIS server is tried first, then the designation (exact match,
        then substring match), then the special cases that belong to IANA.

        :raises Exception: if the row cannot be attributed to any authority.
        """
        whois = whois.lower()
        if whois in _whois2rir:
            return _whois2rir[whois]

        rir = designation.replace('Administered by ', '').lower()
        if rir in _desig2rir:
            return _desig2rir[rir]
        for key, name in _desig2rir.items():
            if key in rir:
                return name

        if status == 'RESERVED':
            return 'IANA'
        if designation == 'Segment Routing (SRv6) SIDs':
            # TODO: How to better handle this weird allocation?
            return 'IANA'
        raise Exception('Unknown whois/designation: %r/%r' % (whois, designation))

    @staticmethod
    def _parse_prefix(prefix):
        """Parse a registry prefix into an ``ipaddress`` network object.

        Prefixes that ``ipaddress`` rejects are retried as a bare first
        octet plus length (``N/len`` becomes ``N.0.0.0/len``).
        """
        try:
            return ipaddress.ip_network(prefix)
        except ValueError:
            octet, length = prefix.split('/')
            return ipaddress.ip_network('%s.0.0.0/%s' % (int(octet), length))

    def _read(self, input):
        """Load registry rows from the CSV text stream `input` into `self.auth`.

        Rows must have seven columns (prefix, designation, date, whois,
        rdap, status, note); the header row and blank lines are skipped.

        :raises Exception: for a row whose authority cannot be determined.
        """
        for row in csv.reader(input):
            if not row:
                # csv.reader yields an empty list for a blank line, which
                # would otherwise break the seven-way unpacking below.
                continue
            prefix, designation, _date, whois, _rdap, status, _note = row
            if prefix == 'Prefix':
                continue
            rir = self._authority_name(designation, whois, status)
            net = self._parse_prefix(prefix)
            bucket = self._bucket(net.network_address)
            self.auth.setdefault(bucket, []).append({'net': net, 'auth': rir})

    def _wide_networks(self):
        """Return the registry entries that are wider than one lookup bucket.

        Such a network is only filed under its first bucket, so addresses
        that fall in any of its other buckets need a separate scan.
        """
        return [
            entry
            for entries in self.auth.values()
            for entry in entries
            if entry['net'].prefixlen < self._bucket_prefixlen[entry['net'].version]
        ]

    def _lookup(self, ip, wide):
        """Return the authority name for address `ip`, or ``'??'`` if unknown.

        `wide` is the list produced by `_wide_networks()`.
        """
        for entry in self.auth.get(self._bucket(ip), ()):
            if ip in entry['net']:
                return entry['auth']
        # Not found via the bucket: the address may still belong to a
        # network spanning several buckets.  Containment is always false
        # across IP versions, so no explicit version check is needed.
        for entry in wide:
            if ip in entry['net']:
                return entry['auth']
        return '??'

    def __init__(self, opts):
        """Bootstrap the authority table from the generator options.

        Options read from `opts`: ``csv`` (one path or a list of paths),
        ``fetch`` (``yes`` to download), ``url`` (one URL or a list,
        defaulting to ``urlv4`` and ``urlv6``), ``urlv4``, ``urlv6`` and
        ``nonstrict``.

        :raises Exception: if neither ``csv`` nor ``fetch=yes`` is given.
        """
        Generator.__init__(self, opts)
        self.auth = {}
        csvs = opts.get('csv', None)
        urlv4 = opts.get('urlv4', 'https://www.iana.org/assignments/ipv4-address-space/ipv4-address-space.csv')
        urlv6 = opts.get('urlv6', 'https://www.iana.org/assignments/ipv6-unicast-address-assignments/ipv6-unicast-address-assignments.csv')

        if opts.get('nonstrict', False):
            self.nonstrict = True

        if csvs:
            if not isinstance(csvs, list):
                csvs = [csvs]
            for file in csvs:
                with open(file, newline='', encoding=encoding) as csvfile:
                    self._read(csvfile)
        elif opts.get('fetch', 'no').lower() == 'yes':
            urls = opts.get('url', [urlv4, urlv6])
            if urls and not isinstance(urls, list):
                urls = [urls]
            logging.info('bootstrapping client subnet authority using URLs')
            for url in urls:
                logging.info('fetching %s', url)
                # Close the HTTP response deterministically instead of
                # leaving the connection to the garbage collector.
                with urlopen(Request(url)) as response:
                    payload = response.read().decode('utf-8')
                self._read(StringIO(payload))
        else:
            raise Exception('No authorities bootstrapped, please specify csv= or fetch=yes')

    def process(self, datasets):
        """Build one ``client_subnet_authority`` dataset per ``client_subnet``.

        Counters of all second-level dimensions are summed per key (the
        skipped keys are ignored), then re-aggregated per authority.
        Addresses that match no registry entry are counted under ``'??'``.

        :returns: list of generated datasets; input datasets that yield no
            counters produce no output dataset.
        :raises ValueError: for a key that is not an IP address, unless the
            generator is in non-strict mode.
        """
        gen_datasets = []
        wide = self._wide_networks()

        for dataset in datasets:
            if dataset.name != 'client_subnet':
                continue

            subnets = {}
            for d1 in dataset.dimensions:
                for d2 in d1.dimensions:
                    for k, v in d2.values.items():
                        if k == args.skipped_key or k == args.skipped_sum_key:
                            continue
                        if k in subnets:
                            subnets[k] += v
                        else:
                            subnets[k] = v

            auth = {}
            for subnet, count in subnets.items():
                try:
                    ip = ipaddress.ip_address(subnet)
                except ValueError:
                    if not self.nonstrict:
                        raise
                    continue

                name = self._lookup(ip, wide)
                if name in auth:
                    auth[name] += count
                else:
                    auth[name] = count

            if auth:
                authd = Dataset()
                authd.name = 'client_subnet_authority'
                authd.start_time = dataset.start_time
                authd.stop_time = dataset.stop_time
                gen_datasets.append(authd)

                authd1 = Dimension('ClientAuthority')
                authd1.values = auth
                authd.dimensions.append(authd1)

        return gen_datasets
import sys

# Python 3.5 only: invoke the base class hook by hand.  Presumably this is
# needed because __init_subclass__ is not called automatically before
# Python 3.6 -- confirm against the Generator base class.
if sys.version_info[:2] == (3, 5):  # pragma: no cover
    Generator.__init_subclass__(client_subnet_authority)