640 lines
20 KiB
Python
640 lines
20 KiB
Python
#!/usr/bin/env python
|
|
# SPDX-License-Identifier: ISC
|
|
|
|
#
|
|
# Copyright (c) 2021 by VMware, Inc. ("VMware")
|
|
# Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
|
|
# ("NetDEF") in this file.
|
|
#
|
|
|
|
|
|
"""
|
|
The following tests are covered to test ECMP functionality with BGP graceful-shutdown (GSHUT):
|
|
1. Verify graceful-shutdown functionality with iBGP peers
|
|
2. Verify graceful-shutdown functionality after
|
|
deleting/re-adding route-map with iBGP peers
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
import pytest
|
|
|
|
# Save the Current Working Directory to find configuration files.
|
|
CWD = os.path.dirname(os.path.realpath(__file__))
|
|
sys.path.append(os.path.join(CWD, "../"))
|
|
sys.path.append(os.path.join(CWD, "../../"))
|
|
|
|
# pylint: disable=C0413
|
|
# Import topogen and topotest helpers
|
|
from lib.topogen import Topogen, get_topogen
|
|
|
|
from lib.common_config import (
|
|
start_topology,
|
|
write_test_header,
|
|
write_test_footer,
|
|
verify_rib,
|
|
check_address_types,
|
|
reset_config_on_routers,
|
|
step,
|
|
get_frr_ipv6_linklocal,
|
|
create_route_maps,
|
|
create_bgp_community_lists,
|
|
required_linux_kernel_version,
|
|
)
|
|
from lib.topolog import logger
|
|
from lib.bgp import (
|
|
verify_bgp_convergence,
|
|
create_router_bgp,
|
|
verify_bgp_rib,
|
|
verify_bgp_attributes,
|
|
)
|
|
from lib.topojson import build_config_from_json
|
|
|
|
# Module-level pytest markers: every test in this file is tagged with the
# bgpd and staticd markers.
pytestmark = [pytest.mark.bgpd, pytest.mark.staticd]
|
|
|
|
|
|
# Global variables
|
|
# Prefix per address family whose BGP attributes are verified on the DUT.
NETWORK = {"ipv4": "100.0.10.1/32", "ipv6": "1::1/128"}
|
|
# Next hop per address family passed when the peer is r2.
NEXT_HOP_IP_1 = {"ipv4": "10.0.3.1", "ipv6": "fd00:0:0:3::1"}
|
|
# Next hop per address family passed when the peer is r3.
NEXT_HOP_IP_2 = {"ipv4": "10.0.4.1", "ipv6": "fd00:0:0:2::1"}
|
|
# Default for next_hop_per_address_family(): with "link_local" the ipv6 next
# hop is looked up from the peer instead of being taken from the dicts above.
PREFERRED_NEXT_HOP = "link_local"
|
|
# Initial value of the convergence flag; the name is assigned in setup_module.
BGP_CONVERGENCE = False
|
|
|
|
|
|
def setup_module(mod):
    """
    Sets up the pytest environment

    Builds the topology described by ibgp_gshut_topo1.json, starts the
    routers, loads the JSON-derived configuration and checks that BGP has
    converged before any test case runs.

    * `mod`: test module object; its `__name__` is handed to Topogen
    """

    # Declared up front so that every assignment below updates the
    # module-level names. Without BGP_CONVERGENCE in this list the result of
    # the convergence check would only live in a function-local variable.
    global ADDR_TYPES, BGP_CONVERGENCE, topo

    # Required linux kernel version for this suite to run.
    result = required_linux_kernel_version("4.16")
    if result is not True:
        pytest.skip("Kernel requirements are not met, kernel version should be >=4.16")

    testsuite_run_time = time.asctime(time.localtime(time.time()))
    logger.info("Testsuite start time: {}".format(testsuite_run_time))
    logger.info("=" * 40)

    logger.info("Running setup_module to create topology")

    # This function initiates the topology build with Topogen...
    json_file = "{}/ibgp_gshut_topo1.json".format(CWD)
    tgen = Topogen(json_file, mod.__name__)
    topo = tgen.json_topo
    # ... and here it calls Mininet initialization functions.

    # Starting topology, create tmp files which are loaded to routers
    # to start daemons and then start routers
    start_topology(tgen)

    # Creating configuration from JSON
    build_config_from_json(tgen, topo)

    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    # Address families the test cases iterate over.
    ADDR_TYPES = check_address_types()

    # Api call verify whether BGP is converged
    BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo)
    assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error: {}".format(
        BGP_CONVERGENCE
    )

    logger.info("Running setup_module() done")
|
|
|
|
|
|
def teardown_module():
    """
    Tear down the pytest environment once all test cases have run.

    Takes no arguments; the running topology is fetched via get_topogen().
    """

    logger.info("Running teardown_module to delete topology")

    # Stop topology and remove tmp files
    get_topogen().stop_topology()
|
|
|
|
|
|
###########################
|
|
# Local APIs
|
|
###########################
|
|
|
|
|
|
def next_hop_per_address_family(
    tgen, dut, peer, addr_type, next_hop_dict, preferred_next_hop=PREFERRED_NEXT_HOP
):
    """
    Return the next hop to expect for `addr_type`.

    For ipv6 with "link_local" in `preferred_next_hop`, the link-local address
    of `peer` on its link towards `dut` is returned; in every other case the
    entry of `next_hop_dict` for `addr_type` is returned.
    """

    # The interface is looked up unconditionally, so a peer without a link
    # entry for the dut raises KeyError for every address family.
    peer_links = topo["routers"][peer]["links"]
    peer_interface = peer_links["{}".format(dut)]["interface"]

    use_link_local = addr_type == "ipv6" and "link_local" in preferred_next_hop
    if not use_link_local:
        return next_hop_dict[addr_type]

    return get_frr_ipv6_linklocal(tgen, peer, intf=peer_interface)
|
|
|
|
|
|
###########################
|
|
# TESTCASES
|
|
###########################
|
|
|
|
|
|
# Router on which received routes and selected paths are verified. Naming it
# once avoids relying on a `dut` variable leaked out of an earlier for-loop.
_DUT = "r4"


def _assert_success(tc_name, result):
    """Fail test case `tc_name` unless `result` is exactly True.

    Any other value is reported verbatim as the error in the assertion message.
    """
    assert result is True, "Test case {} : Failed \n Error: {}".format(tc_name, result)


def _next_hops_on_r4(tgen, addr_type):
    """Return the (via r2, via r3) next hops of the DUT for `addr_type`."""
    via_r2 = next_hop_per_address_family(tgen, _DUT, "r2", addr_type, NEXT_HOP_IP_1)
    via_r3 = next_hop_per_address_family(tgen, _DUT, "r3", addr_type, NEXT_HOP_IP_2)
    return via_r2, via_r3


def _verify_r1_routes_on_r4(tgen, tc_name, addr_type, bgp_next_hop, rib_next_hop):
    """Check r1's routes on the DUT in the BGP RIB first and then in the RIB."""
    r1_topo = {"r1": topo["routers"]["r1"]}

    result = verify_bgp_rib(tgen, addr_type, _DUT, r1_topo, next_hop=bgp_next_hop)
    _assert_success(tc_name, result)

    result = verify_rib(tgen, addr_type, _DUT, r1_topo, next_hop=rib_next_hop)
    _assert_success(tc_name, result)


def _verify_both_paths_on_r4(tgen, tc_name):
    """Expect both next hops (via r2 and via r3) in BGP RIB and RIB."""
    for addr_type in ADDR_TYPES:
        via_r2, via_r3 = _next_hops_on_r4(tgen, addr_type)
        _verify_r1_routes_on_r4(
            tgen, tc_name, addr_type, [via_r2, via_r3], [via_r2, via_r3]
        )


def _verify_gshut_effect_on_r4(tgen, tc_name):
    """Check local-pref 0 on r1's prefixes, then the resulting path selection."""
    step("Verify BGP routes on R4:")
    step("local pref for routes coming from R1 is set to 0.")

    for addr_type in ADDR_TYPES:
        rmap_dict = {"r1": {"route_maps": {"GSHUT-OUT": [{"set": {"locPrf": 0}}]}}}
        result = verify_bgp_attributes(
            tgen, addr_type, _DUT, [NETWORK[addr_type]], "GSHUT-OUT", rmap_dict
        )
        _assert_success(tc_name, result)

    step("Ensure that best path is selected from R4 to R3.")

    for addr_type in ADDR_TYPES:
        via_r2, via_r3 = _next_hops_on_r4(tgen, addr_type)
        # BGP RIB is checked against both next hops, the RIB against the
        # next hop via r2 only (passed as a plain value, not a list).
        _verify_r1_routes_on_r4(tgen, tc_name, addr_type, [via_r2, via_r3], via_r2)


def _apply_gshut_on_r1(tgen, tc_name):
    """Configure the GSHUT community-list and the outbound route-map on r1.

    Returns the route-map input dict so a caller can re-apply exactly the
    same configuration later.
    """
    step("On R1 configure:")
    step("Create standard bgp community-list to permit graceful-shutdown:")
    community_lists = {
        "r1": {
            "bgp_community_lists": [
                {
                    "community_type": "standard",
                    "action": "permit",
                    "name": "GSHUT",
                    "value": "graceful-shutdown",
                }
            ]
        }
    }
    _assert_success(tc_name, create_bgp_community_lists(tgen, community_lists))

    step("Create route-map to set community GSHUT in OUT direction")
    gshut_rmap = {
        "r1": {
            "route_maps": {
                "GSHUT-OUT": [
                    {
                        "action": "permit",
                        "seq_id": "10",
                        "set": {"community": {"num": "graceful-shutdown"}},
                    }
                ]
            }
        }
    }
    _assert_success(tc_name, create_route_maps(tgen, gshut_rmap))

    # Attach GSHUT-OUT towards neighbor r3 for both address families. The
    # nested literal is evaluated per family, so no sub-dict is shared.
    address_family = {
        addr_family: {
            "unicast": {
                "neighbor": {
                    "r3": {
                        "dest_link": {
                            "r1": {
                                "route_maps": [
                                    {"name": "GSHUT-OUT", "direction": "out"}
                                ]
                            }
                        }
                    }
                }
            }
        }
        for addr_family in ("ipv4", "ipv6")
    }
    bgp_config = {"r1": {"bgp": {"address_family": address_family}}}
    _assert_success(tc_name, create_router_bgp(tgen, topo, bgp_config))

    return gshut_rmap


def _run_gshut_scenario(tgen, tc_name):
    """Run the steps shared by both test cases.

    Verifies the base state, applies graceful-shutdown on r1 and verifies its
    effect on the DUT. Returns the route-map input dict that was applied.
    """
    step("Done in base config: Configure base config as per the topology")
    step("Base config should be up, verify using BGP convergence")
    _assert_success(tc_name, verify_bgp_convergence(tgen, topo))

    step("Done in base config: Advertise prefixes from R1")
    step("Verify BGP routes are received at R4 with best path from R3 to R1")
    _verify_both_paths_on_r4(tgen, tc_name)

    gshut_rmap = _apply_gshut_on_r1(tgen, tc_name)

    step(
        "FRR is setting local-pref to 0 by-default on receiver GSHUT community, "
        "below step is not needed, but keeping for reference"
    )
    step(
        "On R3, apply route-map IN direction to match GSHUT community "
        "and set local-preference to 0."
    )

    step(
        "Verify BGP convergence on R4 and ensure all the neighbours state "
        "is established"
    )
    _assert_success(tc_name, verify_bgp_convergence(tgen, topo))

    _verify_gshut_effect_on_r4(tgen, tc_name)

    return gshut_rmap


def test_verify_graceful_shutdown_functionality_with_iBGP_peers_p0(request):
    """
    Verify graceful-shutdown functionality with iBGP peers
    """

    tc_name = request.node.name
    write_test_header(tc_name)
    tgen = get_topogen()
    reset_config_on_routers(tgen)

    _run_gshut_scenario(tgen, tc_name)

    write_test_footer(tc_name)


def test_verify_deleting_re_adding_route_map_with_iBGP_peers_p0(request):
    """
    Verify graceful-shutdown functionality after deleting/re-adding route-map
    with iBGP peers
    """

    tc_name = request.node.name
    write_test_header(tc_name)
    tgen = get_topogen()
    reset_config_on_routers(tgen)

    gshut_rmap = _run_gshut_scenario(tgen, tc_name)

    step("Delete route-map from R1")
    del_rmap_dict = {
        "r1": {
            "route_maps": {
                "GSHUT-OUT": [
                    {
                        "action": "permit",
                        "seq_id": "10",
                        "set": {
                            "community": {"num": "graceful-shutdown", "delete": True}
                        },
                    }
                ]
            }
        }
    }
    _assert_success(tc_name, create_route_maps(tgen, del_rmap_dict))

    step(
        "Verify BGP convergence on R3 and ensure that all neighbor state "
        "is established"
    )
    _assert_success(tc_name, verify_bgp_convergence(tgen, topo))

    step("Verify BGP routes on R4:")
    step("Ensure that best path is selected from R1->R3")

    for addr_type in ADDR_TYPES:
        # Only the next hop via r2 is expected here; the lookup via r3 is
        # still performed, as before, but its result is not used.
        via_r2, _ = _next_hops_on_r4(tgen, addr_type)
        _verify_r1_routes_on_r4(tgen, tc_name, addr_type, [via_r2], [via_r2])

    step("Re-add route-map in R1")
    _assert_success(tc_name, create_route_maps(tgen, gshut_rmap))

    step(
        "Verify BGP convergence on R3 and ensure all the neighbours state "
        "is established"
    )
    _assert_success(tc_name, verify_bgp_convergence(tgen, topo))

    _verify_gshut_effect_on_r4(tgen, tc_name)

    write_test_footer(tc_name)
|
|
|
|
|
|
if __name__ == "__main__":
    # Direct execution: hand this invocation's arguments to pytest with "-s"
    # prepended, and use pytest's result as the process exit status.
    sys.exit(pytest.main(["-s"] + sys.argv[1:]))
|