342 lines
9.8 KiB
Python
342 lines
9.8 KiB
Python
|
#!/usr/bin/env python
|
||
|
# SPDX-License-Identifier: ISC
|
||
|
|
||
|
#
|
||
|
# test_pim_autorp.py
|
||
|
#
|
||
|
# Copyright (c) 2024 ATCorp
|
||
|
# Nathan Bahr
|
||
|
#
|
||
|
|
||
|
import os
|
||
|
import sys
|
||
|
import pytest
|
||
|
from functools import partial
|
||
|
|
||
|
# pylint: disable=C0413
|
||
|
# Import topogen and topotest helpers
|
||
|
from lib import topotest
|
||
|
from lib.topogen import Topogen, get_topogen
|
||
|
from lib.topolog import logger
|
||
|
from lib.pim import (
|
||
|
scapy_send_autorp_raw_packet,
|
||
|
verify_pim_rp_info,
|
||
|
verify_pim_rp_info_is_empty,
|
||
|
)
|
||
|
from lib.common_config import step, write_test_header
|
||
|
|
||
|
from time import sleep
|
||
|
|
||
|
"""
|
||
|
test_pim_autorp.py: Test general PIM AutoRP functionality
|
||
|
"""
|
||
|
|
||
|
TOPOLOGY = """
|
||
|
Basic AutoRP functionality
|
||
|
|
||
|
+---+---+ +---+---+
|
||
|
| | 10.10.76.0/24 | |
|
||
|
+ R1 + <------------------> + R2 |
|
||
|
| | .1 .2 | |
|
||
|
+---+---+ +---+---+
|
||
|
"""
|
||
|
|
||
|
# Save the Current Working Directory to find configuration files.
|
||
|
CWD = os.path.dirname(os.path.realpath(__file__))
|
||
|
sys.path.append(os.path.join(CWD, "../"))
|
||
|
|
||
|
# Required to instantiate the topology builder class.
|
||
|
pytestmark = [pytest.mark.pimd]
|
||
|
|
||
|
|
||
|
def build_topo(tgen):
|
||
|
"Build function"
|
||
|
|
||
|
# Create routers
|
||
|
tgen.add_router("r1")
|
||
|
tgen.add_router("r2")
|
||
|
|
||
|
# Create link between router 1 and 2
|
||
|
switch = tgen.add_switch("s1-2")
|
||
|
switch.add_link(tgen.gears["r1"])
|
||
|
switch.add_link(tgen.gears["r2"])
|
||
|
|
||
|
|
||
|
def setup_module(mod):
|
||
|
logger.info("PIM AutoRP basic functionality:\n {}".format(TOPOLOGY))
|
||
|
|
||
|
tgen = Topogen(build_topo, mod.__name__)
|
||
|
tgen.start_topology()
|
||
|
|
||
|
# Router 1 will be the router configured with "fake" autorp configuration, so give it a default route
|
||
|
# to router 2 so that routing to the RP address is not an issue
|
||
|
# r1_defrt_setup_cmds = [
|
||
|
# "ip route add default via 10.10.76.1 dev r1-eth0",
|
||
|
# ]
|
||
|
# for cmd in r1_defrt_setup_cmds:
|
||
|
# tgen.net["r1"].cmd(cmd)
|
||
|
|
||
|
logger.info("Testing PIM AutoRP support")
|
||
|
router_list = tgen.routers()
|
||
|
for rname, router in router_list.items():
|
||
|
logger.info("Loading router %s" % rname)
|
||
|
router.load_frr_config(os.path.join(CWD, "{}/frr.conf".format(rname)))
|
||
|
|
||
|
# Initialize all routers.
|
||
|
tgen.start_router()
|
||
|
for router in router_list.values():
|
||
|
if router.has_version("<", "4.0"):
|
||
|
tgen.set_error("unsupported version")
|
||
|
|
||
|
|
||
|
def teardown_module(mod):
|
||
|
"Teardown the pytest environment"
|
||
|
tgen = get_topogen()
|
||
|
tgen.stop_topology()
|
||
|
|
||
|
|
||
|
def test_pim_autorp_discovery_single_rp(request):
|
||
|
"Test PIM AutoRP Discovery with single RP"
|
||
|
tgen = get_topogen()
|
||
|
tc_name = request.node.name
|
||
|
write_test_header(tc_name)
|
||
|
|
||
|
if tgen.routers_have_failure():
|
||
|
pytest.skip(tgen.errors)
|
||
|
|
||
|
step("Start with no RP configuration")
|
||
|
result = verify_pim_rp_info_is_empty(tgen, "r1")
|
||
|
assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
|
||
|
|
||
|
step("Send AutoRP packet from r1 to r2")
|
||
|
# 1 RP(s), hold time 5 secs, 10.10.76.1, group(s) 224.0.0.0/4
|
||
|
data = "01005e00012800127f55cfb1080045c00030700c000008110abe0a0a4c01e000012801f001f0001c798b12010005000000000a0a4c0103010004e0000000"
|
||
|
scapy_send_autorp_raw_packet(tgen, "r1", "r1-eth0", data)
|
||
|
|
||
|
step("Verify rp-info from AutoRP packet")
|
||
|
result = verify_pim_rp_info(
|
||
|
tgen,
|
||
|
None,
|
||
|
"r2",
|
||
|
"224.0.0.0/4",
|
||
|
"r2-eth0",
|
||
|
"10.10.76.1",
|
||
|
"AutoRP",
|
||
|
False,
|
||
|
"ipv4",
|
||
|
True,
|
||
|
)
|
||
|
assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
|
||
|
|
||
|
step("Verify AutoRP configuration times out")
|
||
|
result = verify_pim_rp_info_is_empty(tgen, "r2")
|
||
|
assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
|
||
|
|
||
|
|
||
|
def test_pim_autorp_discovery_multiple_rp(request):
|
||
|
"Test PIM AutoRP Discovery with multiple RP's"
|
||
|
tgen = get_topogen()
|
||
|
tc_name = request.node.name
|
||
|
write_test_header(tc_name)
|
||
|
|
||
|
if tgen.routers_have_failure():
|
||
|
pytest.skip("skipped because of router(s) failure")
|
||
|
|
||
|
step("Start with no RP configuration")
|
||
|
result = verify_pim_rp_info_is_empty(tgen, "r2")
|
||
|
assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
|
||
|
|
||
|
step("Send AutoRP packet from r1 to r2")
|
||
|
# 2 RP(s), hold time 5 secs, 10.10.76.1, group(s) 224.0.0.0/8, 10.10.76.3, group(s) 225.0.0.0/8
|
||
|
data = "01005e00012800127f55cfb1080045c0003c700c000008110ab20a0a4c01e000012801f001f000283f5712020005000000000a0a4c0103010008e00000000a0a4c0303010008e1000000"
|
||
|
scapy_send_autorp_raw_packet(tgen, "r1", "r1-eth0", data)
|
||
|
|
||
|
step("Verify rp-info from AutoRP packet")
|
||
|
result = verify_pim_rp_info(
|
||
|
tgen,
|
||
|
None,
|
||
|
"r2",
|
||
|
"224.0.0.0/8",
|
||
|
"r2-eth0",
|
||
|
"10.10.76.1",
|
||
|
"AutoRP",
|
||
|
False,
|
||
|
"ipv4",
|
||
|
True,
|
||
|
)
|
||
|
assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
|
||
|
result = verify_pim_rp_info(
|
||
|
tgen,
|
||
|
None,
|
||
|
"r2",
|
||
|
"225.0.0.0/8",
|
||
|
"r2-eth0",
|
||
|
"10.10.76.3",
|
||
|
"AutoRP",
|
||
|
False,
|
||
|
"ipv4",
|
||
|
True,
|
||
|
)
|
||
|
assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
|
||
|
|
||
|
|
||
|
def test_pim_autorp_discovery_static(request):
|
||
|
"Test PIM AutoRP Discovery with Static RP"
|
||
|
tgen = get_topogen()
|
||
|
tc_name = request.node.name
|
||
|
write_test_header(tc_name)
|
||
|
|
||
|
if tgen.routers_have_failure():
|
||
|
pytest.skip("skipped because of router(s) failure")
|
||
|
|
||
|
step("Start with no RP configuration")
|
||
|
result = verify_pim_rp_info_is_empty(tgen, "r2")
|
||
|
assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
|
||
|
|
||
|
step("Add static RP configuration to r2")
|
||
|
rnode = tgen.routers()["r2"]
|
||
|
rnode.cmd("vtysh -c 'conf t' -c 'router pim' -c 'rp 10.10.76.3 224.0.0.0/4'")
|
||
|
|
||
|
step("Verify static rp-info from r2")
|
||
|
result = verify_pim_rp_info(
|
||
|
tgen,
|
||
|
None,
|
||
|
"r2",
|
||
|
"224.0.0.0/4",
|
||
|
"r2-eth0",
|
||
|
"10.10.76.3",
|
||
|
"Static",
|
||
|
False,
|
||
|
"ipv4",
|
||
|
True,
|
||
|
)
|
||
|
assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
|
||
|
|
||
|
step("Send AutoRP packet from r1 to r2")
|
||
|
# 1 RP(s), hold time 5 secs, 10.10.76.1, group(s) 224.0.0.0/4
|
||
|
data = "01005e00012800127f55cfb1080045c00030700c000008110abe0a0a4c01e000012801f001f0001c798b12010005000000000a0a4c0103010004e0000000"
|
||
|
scapy_send_autorp_raw_packet(tgen, "r1", "r1-eth0", data)
|
||
|
|
||
|
step("Verify rp-info from AutoRP packet")
|
||
|
result = verify_pim_rp_info(
|
||
|
tgen,
|
||
|
None,
|
||
|
"r2",
|
||
|
"224.0.0.0/4",
|
||
|
"r2-eth0",
|
||
|
"10.10.76.1",
|
||
|
"AutoRP",
|
||
|
False,
|
||
|
"ipv4",
|
||
|
True,
|
||
|
)
|
||
|
assert result is True, "Testcase {} :Failed \n Error: {}".format(tc_name, result)
|
||
|
|
||
|
|
||
|
def test_pim_autorp_announce_cli(request):
|
||
|
"Test PIM AutoRP Announcement CLI commands"
|
||
|
tgen = get_topogen()
|
||
|
tc_name = request.node.name
|
||
|
write_test_header(tc_name)
|
||
|
|
||
|
if tgen.routers_have_failure():
|
||
|
pytest.skip("skipped because of router(s) failure")
|
||
|
|
||
|
step("Add AutoRP announcement configuration to r1")
|
||
|
r1 = tgen.routers()["r1"]
|
||
|
r1.vtysh_cmd(
|
||
|
"""
|
||
|
conf
|
||
|
router pim
|
||
|
autorp announce holdtime 90
|
||
|
autorp announce interval 120
|
||
|
autorp announce scope 5
|
||
|
autorp announce 10.2.3.4 225.0.0.0/24
|
||
|
"""
|
||
|
)
|
||
|
|
||
|
expected = {
|
||
|
"discoveryEnabled": True,
|
||
|
"announce": {
|
||
|
"scope": 5,
|
||
|
"interval": 120,
|
||
|
"holdtime": 90,
|
||
|
"rpList": [
|
||
|
{"rpAddress": "10.2.3.4", "group": "225.0.0.0/24", "prefixList": ""}
|
||
|
],
|
||
|
},
|
||
|
}
|
||
|
|
||
|
test_func = partial(
|
||
|
topotest.router_json_cmp, r1, "show ip pim autorp json", expected
|
||
|
)
|
||
|
_, result = topotest.run_and_expect(test_func, None, count=30, wait=1)
|
||
|
assertmsg = '"{}" JSON output mismatches'.format(r1.name)
|
||
|
assert result is None, assertmsg
|
||
|
|
||
|
r1.vtysh_cmd(
|
||
|
"""
|
||
|
conf
|
||
|
router pim
|
||
|
autorp announce 10.2.3.4 group-list ListA
|
||
|
"""
|
||
|
)
|
||
|
expected = {
|
||
|
"discoveryEnabled": True,
|
||
|
"announce": {
|
||
|
"scope": 5,
|
||
|
"interval": 120,
|
||
|
"holdtime": 90,
|
||
|
"rpList": [{"rpAddress": "10.2.3.4", "group": "", "prefixList": "ListA"}],
|
||
|
},
|
||
|
}
|
||
|
|
||
|
test_func = partial(
|
||
|
topotest.router_json_cmp, r1, "show ip pim autorp json", expected
|
||
|
)
|
||
|
_, result = topotest.run_and_expect(test_func, None, count=30, wait=1)
|
||
|
assertmsg = '"{}" JSON output mismatches'.format(r1.name)
|
||
|
assert result is None, assertmsg
|
||
|
|
||
|
|
||
|
def test_pim_autorp_announce_group(request):
|
||
|
"Test PIM AutoRP Announcement with a single group"
|
||
|
tgen = get_topogen()
|
||
|
tc_name = request.node.name
|
||
|
write_test_header(tc_name)
|
||
|
|
||
|
if tgen.routers_have_failure():
|
||
|
pytest.skip("skipped because of router(s) failure")
|
||
|
|
||
|
step("Add candidate RP configuration to r1")
|
||
|
rnode = tgen.routers()["r1"]
|
||
|
rnode.cmd(
|
||
|
"vtysh -c 'conf t' -c 'router pim' -c 'send-rp-announce 10.10.76.1 224.0.0.0/4'"
|
||
|
)
|
||
|
step("Verify Announcement sent data")
|
||
|
# TODO: Verify AutoRP mapping agent receives candidate RP announcement
|
||
|
# Mapping agent is not yet implemented
|
||
|
# sleep(10)
|
||
|
step("Change AutoRP Announcement packet parameters")
|
||
|
rnode.cmd(
|
||
|
"vtysh -c 'conf t' -c 'router pim' -c 'send-rp-announce scope 8 interval 10 holdtime 60'"
|
||
|
)
|
||
|
step("Verify Announcement sent data")
|
||
|
# TODO: Verify AutoRP mapping agent receives updated candidate RP announcement
|
||
|
# Mapping agent is not yet implemented
|
||
|
# sleep(10)
|
||
|
|
||
|
|
||
|
def test_memory_leak():
|
||
|
"Run the memory leak test and report results."
|
||
|
tgen = get_topogen()
|
||
|
if not tgen.is_memleak_enabled():
|
||
|
pytest.skip("Memory leak test/report is disabled")
|
||
|
|
||
|
tgen.report_memory_leaks()
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
args = ["-s"] + sys.argv[1:]
|
||
|
sys.exit(pytest.main(args))
|