frr/tests/topotests/lib/ospf.py
Daniel Baumann 3124f89aed
Adding upstream version 10.1.1.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-02-05 10:03:58 +01:00

3028 lines
115 KiB
Python

# SPDX-License-Identifier: ISC
#
# Copyright (c) 2020 by VMware, Inc. ("VMware")
# Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
# ("NetDEF") in this file.
#
import ipaddress
import sys
from copy import deepcopy
from time import sleep
# Import common_config to use commomnly used APIs
from lib.common_config import (
create_common_configurations,
InvalidCLIError,
generate_ips,
retry,
run_frr_cmd,
validate_ip_address,
)
from lib.topolog import logger
from lib.topotest import frr_unicode
################################
# Configure procs
################################
def create_router_ospf(tgen, topo=None, input_dict=None, build=False, load_config=True):
"""
API to configure ospf on router.
Parameters
----------
* `tgen` : Topogen object
* `topo` : json file data
* `input_dict` : Input dict data, required when configuring from testcase
* `build` : Only for initial setup phase this is set as True.
* `load_config` : Loading the config to router this is set as True.
Usage
-----
input_dict = {
"r1": {
"ospf": {
"router_id": "22.22.22.22",
"area": [{ "id": "0.0.0.0", "type": "nssa"}]
}
}
result = create_router_ospf(tgen, topo, input_dict)
Returns
-------
True or False
"""
logger.debug("Entering lib API: create_router_ospf()")
result = False
if topo is None:
topo = tgen.json_topo
if not input_dict:
input_dict = deepcopy(topo)
else:
topo = topo["routers"]
input_dict = deepcopy(input_dict)
for ospf in ["ospf", "ospf6"]:
config_data_dict = {}
for router in input_dict.keys():
if ospf not in input_dict[router]:
logger.debug("Router %s: %s not present in input_dict", router, ospf)
continue
config_data = __create_ospf_global(
tgen, input_dict, router, build, load_config, ospf
)
if config_data:
if router not in config_data_dict:
config_data_dict[router] = config_data
else:
config_data_dict[router].extend(config_data)
try:
result = create_common_configurations(
tgen, config_data_dict, ospf, build, load_config
)
except InvalidCLIError:
logger.error("create_router_ospf (ipv4)", exc_info=True)
result = False
logger.debug("Exiting lib API: create_router_ospf()")
return result
def __create_ospf_global(tgen, input_dict, router, build, load_config, ospf):
"""
Helper API to create ospf global configuration.
Parameters
----------
* `tgen` : Topogen object
* `input_dict` : Input dict data, required when configuring from testcase
* `router` : router to be configured.
* `build` : Only for initial setup phase this is set as True.
* `load_config` : Loading the config to router this is set as True.
* `ospf` : either 'ospf' or 'ospf6'
Usage
-----
input_dict = {
"routers": {
"r1": {
"links": {
"r3": {
"ipv6": "2013:13::1/64",
"ospf6": {
"hello_interval": 1,
"dead_interval": 4,
"network": "point-to-point"
}
}
},
"ospf6": {
"router_id": "1.1.1.1",
"neighbors": {
"r3": {
"area": "1.1.1.1"
}
}
}
}
}
Returns
-------
list of configuration commands
"""
config_data = []
if ospf not in input_dict[router]:
return config_data
logger.debug("Entering lib API: __create_ospf_global()")
ospf_data = input_dict[router][ospf]
del_ospf_action = ospf_data.setdefault("delete", False)
if del_ospf_action:
config_data = ["no router {}".format(ospf)]
return config_data
cmd = "router {}".format(ospf)
config_data.append(cmd)
# router id
router_id = ospf_data.setdefault("router_id", None)
del_router_id = ospf_data.setdefault("del_router_id", False)
if del_router_id:
config_data.append("no {} router-id".format(ospf))
if router_id:
config_data.append("{} router-id {}".format(ospf, router_id))
# log-adjacency-changes
log_adj_changes = ospf_data.setdefault("log_adj_changes", None)
del_log_adj_changes = ospf_data.setdefault("del_log_adj_changes", False)
if del_log_adj_changes:
config_data.append("no log-adjacency-changes detail")
if log_adj_changes:
config_data.append("log-adjacency-changes {}".format(log_adj_changes))
# aggregation timer
aggr_timer = ospf_data.setdefault("aggr_timer", None)
del_aggr_timer = ospf_data.setdefault("del_aggr_timer", False)
if del_aggr_timer:
config_data.append("no aggregation timer")
if aggr_timer:
config_data.append("aggregation timer {}".format(aggr_timer))
# maximum path information
ecmp_data = ospf_data.setdefault("maximum-paths", {})
if ecmp_data:
cmd = "maximum-paths {}".format(ecmp_data)
del_action = ospf_data.setdefault("del_max_path", False)
if del_action:
cmd = "no maximum-paths"
config_data.append(cmd)
# Flood reduction.
flood_data = ospf_data.setdefault("flood-reduction", {})
if flood_data:
cmd = "flood-reduction"
del_action = ospf_data.setdefault("del_flood_reduction", False)
if del_action:
cmd = "no flood-reduction"
config_data.append(cmd)
# LSA refresh timer - A hidden command.
refresh_data = ospf_data.setdefault("lsa-refresh", {})
if refresh_data:
cmd = "ospf lsa-refresh {}".format(refresh_data)
del_action = ospf_data.setdefault("del_lsa_refresh", False)
if del_action:
cmd = "no ospf lsa-refresh"
config_data.append(cmd)
# redistribute command
redistribute_data = ospf_data.setdefault("redistribute", {})
if redistribute_data:
for redistribute in redistribute_data:
if "redist_type" not in redistribute:
logger.debug(
"Router %s: 'redist_type' not present in " "input_dict", router
)
else:
cmd = "redistribute {}".format(redistribute["redist_type"])
for red_type in redistribute_data:
if "route_map" in red_type:
cmd = cmd + " route-map {}".format(red_type["route_map"])
del_action = redistribute.setdefault("delete", False)
if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
# area information
area_data = ospf_data.setdefault("area", {})
if area_data:
for area in area_data:
if "id" not in area:
logger.debug(
"Router %s: 'area id' not present in " "input_dict", router
)
else:
cmd = "area {}".format(area["id"])
if "type" in area:
cmd = cmd + " {}".format(area["type"])
if "flood-reduction" in area:
cmd = cmd + " flood-reduction"
del_action = area.setdefault("delete", False)
if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
# def route information
def_rte_data = ospf_data.setdefault("default-information", {})
if def_rte_data:
if "originate" not in def_rte_data:
logger.debug(
"Router %s: 'originate key' not present in " "input_dict", router
)
else:
cmd = "default-information originate"
if "always" in def_rte_data:
cmd = cmd + " always"
if "metric" in def_rte_data:
cmd = cmd + " metric {}".format(def_rte_data["metric"])
if "metric-type" in def_rte_data:
cmd = cmd + " metric-type {}".format(def_rte_data["metric-type"])
if "route-map" in def_rte_data:
cmd = cmd + " route-map {}".format(def_rte_data["route-map"])
del_action = def_rte_data.setdefault("delete", False)
if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
# summary information
summary_data = ospf_data.setdefault("summary-address", {})
if summary_data:
for summary in summary_data:
if "prefix" not in summary:
logger.debug(
"Router %s: 'summary-address' not present in " "input_dict",
router,
)
else:
cmd = "summary {}/{}".format(summary["prefix"], summary["mask"])
_tag = summary.setdefault("tag", None)
if _tag:
cmd = "{} tag {}".format(cmd, _tag)
_advertise = summary.setdefault("advertise", True)
if not _advertise:
cmd = "{} no-advertise".format(cmd)
del_action = summary.setdefault("delete", False)
if del_action:
cmd = "no {}".format(cmd)
config_data.append(cmd)
# ospf gr information
gr_data = ospf_data.setdefault("graceful-restart", {})
if gr_data:
if "opaque" in gr_data and gr_data["opaque"]:
cmd = "capability opaque"
if gr_data.setdefault("delete", False):
cmd = "no {}".format(cmd)
config_data.append(cmd)
if "helper enable" in gr_data and not gr_data["helper enable"]:
cmd = "graceful-restart helper enable"
if gr_data.setdefault("delete", False):
cmd = "no {}".format(cmd)
config_data.append(cmd)
elif "helper enable" in gr_data and type(gr_data["helper enable"]) is list:
for rtrs in gr_data["helper enable"]:
cmd = "graceful-restart helper enable {}".format(rtrs)
if gr_data.setdefault("delete", False):
cmd = "no {}".format(cmd)
config_data.append(cmd)
if "helper" in gr_data:
if type(gr_data["helper"]) is not list:
gr_data["helper"] = list(gr_data["helper"])
for helper_role in gr_data["helper"]:
cmd = "graceful-restart helper {}".format(helper_role)
if gr_data.setdefault("delete", False):
cmd = "no {}".format(cmd)
config_data.append(cmd)
if "supported-grace-time" in gr_data:
cmd = "graceful-restart helper supported-grace-time {}".format(
gr_data["supported-grace-time"]
)
if gr_data.setdefault("delete", False):
cmd = "no {}".format(cmd)
config_data.append(cmd)
config_data.append("exit")
logger.debug("Exiting lib API: create_ospf_global()")
return config_data
def config_ospf_interface(
tgen, topo=None, input_dict=None, build=False, load_config=True
):
"""
API to configure ospf on router.
Parameters
----------
* `tgen` : Topogen object
* `topo` : json file data
* `input_dict` : Input dict data, required when configuring from testcase
* `build` : Only for initial setup phase this is set as True.
* `load_config` : Loading the config to router this is set as True.
Usage
-----
r1_ospf_auth = {
"r1": {
"links": {
"r2": {
"ospf": {
"authentication": "message-digest",
"authentication-key": "ospf",
"message-digest-key": "10"
}
}
}
}
}
result = config_ospf_interface(tgen, topo, r1_ospf_auth)
Returns
-------
True or False
"""
logger.debug("Enter lib config_ospf_interface")
result = False
if topo is None:
topo = tgen.json_topo
if not input_dict:
input_dict = deepcopy(topo)
else:
input_dict = deepcopy(input_dict)
config_data_dict = {}
for router in input_dict.keys():
config_data = []
for lnk in input_dict[router]["links"].keys():
if "ospf" not in input_dict[router]["links"][lnk]:
logger.debug(
"Router %s: ospf config is not present in" "input_dict", router
)
continue
ospf_data = input_dict[router]["links"][lnk]["ospf"]
data_ospf_area = ospf_data.setdefault("area", None)
data_ospf_auth = ospf_data.setdefault("authentication", None)
data_ospf_dr_priority = ospf_data.setdefault("priority", None)
data_ospf_cost = ospf_data.setdefault("cost", None)
data_ospf_mtu = ospf_data.setdefault("mtu_ignore", None)
try:
intf = topo["routers"][router]["links"][lnk]["interface"]
except KeyError:
intf = topo["switches"][router]["links"][lnk]["interface"]
# interface
cmd = "interface {}".format(intf)
config_data.append(cmd)
# interface area config
if data_ospf_area:
cmd = "ip ospf area {}".format(data_ospf_area)
config_data.append(cmd)
# interface ospf auth
if data_ospf_auth:
if data_ospf_auth == "null":
cmd = "ip ospf authentication null"
elif data_ospf_auth == "message-digest":
cmd = "ip ospf authentication message-digest"
elif data_ospf_auth == "key-chain":
cmd = "ip ospf authentication key-chain {}".format(
ospf_data["keychain"]
)
else:
cmd = "ip ospf authentication"
if "del_action" in ospf_data:
cmd = "no {}".format(cmd)
config_data.append(cmd)
if "message-digest-key" in ospf_data:
cmd = "ip ospf message-digest-key {} md5 {}".format(
ospf_data["message-digest-key"], ospf_data["authentication-key"]
)
if "del_action" in ospf_data:
cmd = "no {}".format(cmd)
config_data.append(cmd)
if (
"authentication-key" in ospf_data
and "message-digest-key" not in ospf_data
):
cmd = "ip ospf authentication-key {}".format(
ospf_data["authentication-key"]
)
if "del_action" in ospf_data:
cmd = "no {}".format(cmd)
config_data.append(cmd)
# interface ospf dr priority
if data_ospf_dr_priority:
cmd = "ip ospf priority {}".format(ospf_data["priority"])
if "del_action" in ospf_data:
cmd = "no {}".format(cmd)
config_data.append(cmd)
# interface ospf cost
if data_ospf_cost:
cmd = "ip ospf cost {}".format(ospf_data["cost"])
if "del_action" in ospf_data:
cmd = "no {}".format(cmd)
config_data.append(cmd)
# interface ospf mtu
if data_ospf_mtu:
cmd = "ip ospf mtu-ignore"
if "del_action" in ospf_data:
cmd = "no {}".format(cmd)
config_data.append(cmd)
if build:
return config_data
if config_data:
config_data_dict[router] = config_data
result = create_common_configurations(
tgen, config_data_dict, "interface_config", build=build
)
logger.debug("Exiting lib API: config_ospf_interface()")
return result
def clear_ospf(tgen, router, ospf=None):
"""
This API is to clear ospf neighborship by running
clear ip ospf interface * command,
Parameters
----------
* `tgen`: topogen object
* `router`: device under test
Usage
-----
clear_ospf(tgen, "r1")
"""
logger.debug("Entering lib API: clear_ospf()")
if router not in tgen.routers():
return False
rnode = tgen.routers()[router]
# Clearing OSPF
if ospf:
version = "ipv6"
else:
version = "ip"
cmd = "clear {} ospf interface".format(version)
logger.info("Clearing ospf process on router %s.. using command '%s'", router, cmd)
run_frr_cmd(rnode, cmd)
logger.debug("Exiting lib API: clear_ospf()")
def redistribute_ospf(tgen, topo, dut, route_type, **kwargs):
"""
Redstribution of routes inside ospf.
Parameters
----------
* `tgen`: Topogen object
* `topo` : json file data
* `dut`: device under test
* `route_type`: "static" or "connected" or ....
* `kwargs`: pass extra information (see below)
Usage
-----
redistribute_ospf(tgen, topo, "r0", "static", delete=True)
redistribute_ospf(tgen, topo, "r0", "static", route_map="rmap_ipv4")
"""
ospf_red = {dut: {"ospf": {"redistribute": [{"redist_type": route_type}]}}}
for k, v in kwargs.items():
ospf_red[dut]["ospf"]["redistribute"][0][k] = v
result = create_router_ospf(tgen, topo, ospf_red)
assert result is True, "Testcase : Failed \n Error: {}".format(result)
################################
# Verification procs
################################
@retry(retry_timeout=80)
def verify_ospf_neighbor(
tgen, topo=None, dut=None, input_dict=None, lan=False, expected=True
):
"""
This API is to verify ospf neighborship by running
show ip ospf neighbour command,
Parameters
----------
* `tgen` : Topogen object
* `topo` : json file data
* `dut`: device under test
* `input_dict` : Input dict data, required when configuring from testcase
* `lan` : verify neighbors in lan topology
* `expected` : expected results from API, by-default True
Usage
-----
1. To check FULL neighbors.
verify_ospf_neighbor(tgen, topo, dut=dut)
2. To check neighbors with their roles.
input_dict = {
"r0": {
"ospf": {
"neighbors": {
"r1": {
"nbrState": "Full",
"role": "DR"
},
"r2": {
"nbrState": "Full",
"role": "DROther"
},
"r3": {
"nbrState": "Full",
"role": "DROther"
}
}
}
}
}
result = verify_ospf_neighbor(tgen, topo, dut, input_dict, lan=True)
Returns
-------
True or False (Error Message)
"""
logger.debug("Entering lib API: verify_ospf_neighbor()")
result = False
if topo is None:
topo = tgen.json_topo
if input_dict:
for router, rnode in tgen.routers().items():
if "ospf" not in topo["routers"][router]:
continue
if dut is not None and dut != router:
continue
logger.info("Verifying OSPF neighborship on router %s:", router)
show_ospf_json = run_frr_cmd(
rnode, "show ip ospf neighbor all json", isjson=True
)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF is not running"
return errormsg
ospf_data_list = input_dict[router]["ospf"]
ospf_nbr_list = ospf_data_list["neighbors"]
for ospf_nbr, nbr_data in ospf_nbr_list.items():
data_ip = topo["routers"][ospf_nbr]["links"]
data_rid = topo["routers"][ospf_nbr]["ospf"]["router_id"]
if ospf_nbr in data_ip:
nbr_details = nbr_data[ospf_nbr]
elif lan:
for switch in topo["switches"]:
if "ospf" in topo["switches"][switch]["links"][router]:
neighbor_ip = data_ip[switch]["ipv4"].split("/")[0]
else:
continue
else:
neighbor_ip = data_ip[router]["ipv4"].split("/")[0]
nh_state = None
neighbor_ip = neighbor_ip.lower()
nbr_rid = data_rid
try:
nh_state = show_ospf_json[nbr_rid][0]["nbrState"].split("/")[0]
intf_state = show_ospf_json[nbr_rid][0]["nbrState"].split("/")[1]
except KeyError:
errormsg = "[DUT: {}] OSPF peer {} missing".format(router, nbr_rid)
return errormsg
nbr_state = nbr_data.setdefault("nbrState", None)
nbr_role = nbr_data.setdefault("role", None)
if nbr_state:
if nbr_state == nh_state:
logger.info(
"[DUT: {}] OSPF Nbr is {}:{} State {}".format(
router, ospf_nbr, nbr_rid, nh_state
)
)
result = True
else:
errormsg = (
"[DUT: {}] OSPF is not Converged, neighbor"
" state is {}".format(router, nh_state)
)
return errormsg
if nbr_role:
if nbr_role == intf_state:
logger.info(
"[DUT: {}] OSPF Nbr is {}: {} Role {}".format(
router, ospf_nbr, nbr_rid, nbr_role
)
)
else:
errormsg = (
"[DUT: {}] OSPF is not Converged with rid"
"{}, role is {}".format(router, nbr_rid, intf_state)
)
return errormsg
continue
else:
for router, rnode in tgen.routers().items():
if "ospf" not in topo["routers"][router]:
continue
if dut is not None and dut != router:
continue
logger.info("Verifying OSPF neighborship on router %s:", router)
show_ospf_json = run_frr_cmd(
rnode, "show ip ospf neighbor all json", isjson=True
)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF is not running"
return errormsg
ospf_data_list = topo["routers"][router]["ospf"]
ospf_neighbors = ospf_data_list["neighbors"]
total_peer = 0
total_peer = len(ospf_neighbors.keys())
no_of_ospf_nbr = 0
ospf_nbr_list = ospf_data_list["neighbors"]
no_of_peer = 0
for ospf_nbr, nbr_data in ospf_nbr_list.items():
if nbr_data:
data_ip = topo["routers"][nbr_data["nbr"]]["links"]
data_rid = topo["routers"][nbr_data["nbr"]]["ospf"]["router_id"]
else:
data_ip = topo["routers"][ospf_nbr]["links"]
data_rid = topo["routers"][ospf_nbr]["ospf"]["router_id"]
logger.info("ospf neighbor %s: router-id: %s", router, data_rid)
if ospf_nbr in data_ip:
nbr_details = nbr_data[ospf_nbr]
elif lan:
for switch in topo["switches"]:
if "ospf" in topo["switches"][switch]["links"][router]:
neighbor_ip = data_ip[switch]["ipv4"].split("/")[0]
else:
continue
else:
neighbor_ip = data_ip[router]["ipv4"].split("/")[0]
nh_state = None
neighbor_ip = neighbor_ip.lower()
nbr_rid = data_rid
try:
nh_state = show_ospf_json[nbr_rid][0]["nbrState"].split("/")[0]
except KeyError:
errormsg = (
"[DUT: {}] missing OSPF neighbor {} with router-id {}".format(
router, ospf_nbr, nbr_rid
)
)
return errormsg
if nh_state == "Full":
no_of_peer += 1
if no_of_peer == total_peer:
logger.info("[DUT: {}] OSPF is Converged".format(router))
result = True
else:
errormsg = "[DUT: {}] OSPF is not Converged".format(router)
return errormsg
logger.debug("Exiting API: verify_ospf_neighbor()")
return result
@retry(retry_timeout=50)
def verify_ospf6_neighbor(tgen, topo=None, dut=None, input_dict=None, lan=False):
"""
This API is to verify ospf neighborship by running
show ipv6 ospf neighbour command,
Parameters
----------
* `tgen` : Topogen object
* `topo` : json file data
* `dut`: device under test
* `input_dict` : Input dict data, required when configuring from testcase
* `lan` : verify neighbors in lan topology
Usage
-----
1. To check FULL neighbors.
verify_ospf_neighbor(tgen, topo, dut=dut)
2. To check neighbors with their roles.
input_dict = {
"r0": {
"ospf6": {
"neighbors": {
"r1": {
"state": "Full",
"role": "DR"
},
"r2": {
"state": "Full",
"role": "DROther"
},
"r3": {
"state": "Full",
"role": "DROther"
}
}
}
}
}
result = verify_ospf6_neighbor(tgen, topo, dut, input_dict, lan=True)
3. To check there are no neighbors.
input_dict = {
"r0": {
"ospf6": {
"neighbors": []
}
}
}
result = verify_ospf6_neighbor(tgen, topo, dut, input_dict)
Returns
-------
True or False (Error Message)
"""
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
result = False
if topo is None:
topo = tgen.json_topo
if input_dict:
for router, rnode in tgen.routers().items():
if "ospf6" not in topo["routers"][router]:
continue
if dut is not None and dut != router:
continue
logger.info("Verifying OSPF neighborship on router %s:", router)
show_ospf_json = run_frr_cmd(
rnode, "show ipv6 ospf neighbor json", isjson=True
)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF6 is not running"
return errormsg
ospf_data_list = input_dict[router]["ospf6"]
ospf_nbr_list = ospf_data_list["neighbors"]
# Check if looking for no neighbors
if ospf_nbr_list == []:
if show_ospf_json["neighbors"] == []:
logger.info("[DUT: {}] OSPF6 no neighbors found".format(router))
return True
else:
errormsg = (
"[DUT: {}] OSPF6 active neighbors found, expected None".format(
router
)
)
return errormsg
for ospf_nbr, nbr_data in ospf_nbr_list.items():
try:
data_ip = data_rid = topo["routers"][ospf_nbr]["ospf6"]["router_id"]
except KeyError:
data_ip = data_rid = topo["routers"][nbr_data["nbr"]]["ospf6"][
"router_id"
]
if ospf_nbr in data_ip:
nbr_details = nbr_data[ospf_nbr]
elif lan:
for switch in topo["switches"]:
if "ospf6" in topo["switches"][switch]["links"][router]:
neighbor_ip = data_ip
else:
continue
else:
neighbor_ip = data_ip[router]["ipv6"].split("/")[0]
nh_state = None
neighbor_ip = neighbor_ip.lower()
nbr_rid = data_rid
get_index_val = dict(
(d["neighborId"], dict(d, index=index))
for (index, d) in enumerate(show_ospf_json["neighbors"])
)
try:
nh_state = get_index_val.get(neighbor_ip)["state"]
intf_state = get_index_val.get(neighbor_ip)["ifState"]
except TypeError:
errormsg = "[DUT: {}] OSPF peer {} missing,from " "{} ".format(
router, nbr_rid, ospf_nbr
)
return errormsg
nbr_state = nbr_data.setdefault("state", None)
nbr_role = nbr_data.setdefault("role", None)
if nbr_state:
if nbr_state == nh_state:
logger.info(
"[DUT: {}] OSPF6 Nbr is {}:{} State {}".format(
router, ospf_nbr, nbr_rid, nh_state
)
)
result = True
else:
errormsg = (
"[DUT: {}] OSPF6 is not Converged, neighbor"
" state is {} , Expected state is {}".format(
router, nh_state, nbr_state
)
)
return errormsg
if nbr_role:
if nbr_role == intf_state:
logger.info(
"[DUT: {}] OSPF6 Nbr is {}: {} Role {}".format(
router, ospf_nbr, nbr_rid, nbr_role
)
)
else:
errormsg = (
"[DUT: {}] OSPF6 is not Converged with rid"
"{}, role is {}, Expected role is {}".format(
router, nbr_rid, intf_state, nbr_role
)
)
return errormsg
continue
else:
for router, rnode in tgen.routers().items():
if "ospf6" not in topo["routers"][router]:
continue
if dut is not None and dut != router:
continue
logger.info("Verifying OSPF6 neighborship on router %s:", router)
show_ospf_json = run_frr_cmd(
rnode, "show ipv6 ospf neighbor json", isjson=True
)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF6 is not running"
return errormsg
ospf_data_list = topo["routers"][router]["ospf6"]
ospf_neighbors = ospf_data_list["neighbors"]
total_peer = 0
total_peer = len(ospf_neighbors.keys())
no_of_ospf_nbr = 0
ospf_nbr_list = ospf_data_list["neighbors"]
no_of_peer = 0
for ospf_nbr, nbr_data in ospf_nbr_list.items():
try:
data_ip = data_rid = topo["routers"][ospf_nbr]["ospf6"]["router_id"]
except KeyError:
data_ip = data_rid = topo["routers"][nbr_data["nbr"]]["ospf6"][
"router_id"
]
logger.info("ospf neighbor %s: router-id: %s", ospf_nbr, data_rid)
if ospf_nbr in data_ip:
nbr_details = nbr_data[ospf_nbr]
elif lan:
for switch in topo["switches"]:
if "ospf6" in topo["switches"][switch]["links"][router]:
neighbor_ip = data_ip
else:
continue
else:
neighbor_ip = data_ip
nh_state = None
neighbor_ip = neighbor_ip.lower()
nbr_rid = data_rid
get_index_val = dict(
(d["neighborId"], dict(d, index=index))
for (index, d) in enumerate(show_ospf_json["neighbors"])
)
try:
nh_state = get_index_val.get(neighbor_ip)["state"]
intf_state = get_index_val.get(neighbor_ip)["ifState"]
except TypeError:
errormsg = (
"[DUT: {}] missing OSPF neighbor {} with router-id {}".format(
router, ospf_nbr, nbr_rid
)
)
return errormsg
if nh_state == "Full":
no_of_peer += 1
if no_of_peer == total_peer:
logger.info("[DUT: {}] OSPF6 is Converged".format(router))
result = True
else:
errormsg = "[DUT: {}] OSPF6 is not Converged".format(router)
return errormsg
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result
@retry(retry_timeout=40)
def verify_ospf_rib(
tgen, dut, input_dict, next_hop=None, tag=None, metric=None, fib=None, expected=True
):
"""
This API is to verify ospf routes by running
show ip ospf route command.
Parameters
----------
* `tgen` : Topogen object
* `dut`: device under test
* `input_dict` : Input dict data, required when configuring from testcase
* `next_hop` : next to be verified
* `tag` : tag to be verified
* `metric` : metric to be verified
* `fib` : True if the route is installed in FIB.
* `expected` : expected results from API, by-default True
Usage
-----
input_dict = {
"r1": {
"static_routes": [
{
"network": ip_net,
"no_of_ip": 1,
"routeType": "N"
}
]
}
}
result = verify_ospf_rib(tgen, dut, input_dict,next_hop=nh)
Returns
-------
True or False (Error Message)
"""
logger.info("Entering lib API: verify_ospf_rib()")
result = False
router_list = tgen.routers()
additional_nexthops_in_required_nhs = []
found_hops = []
for routerInput in input_dict.keys():
for router, rnode in router_list.items():
if router != dut:
continue
logger.info("Checking router %s RIB:", router)
# Verifying RIB routes
command = "show ip ospf route"
found_routes = []
missing_routes = []
if (
"static_routes" in input_dict[routerInput]
or "prefix" in input_dict[routerInput]
):
if "prefix" in input_dict[routerInput]:
static_routes = input_dict[routerInput]["prefix"]
else:
static_routes = input_dict[routerInput]["static_routes"]
for static_route in static_routes:
cmd = "{}".format(command)
cmd = "{} json".format(cmd)
ospf_rib_json = run_frr_cmd(rnode, cmd, isjson=True)
# Verifying output dictionary ospf_rib_json is not empty
if bool(ospf_rib_json) is False:
errormsg = (
"[DUT: {}] No routes found in OSPF route "
"table".format(router)
)
return errormsg
network = static_route["network"]
no_of_ip = static_route.setdefault("no_of_ip", 1)
_tag = static_route.setdefault("tag", None)
_rtype = static_route.setdefault("routeType", None)
# Generating IPs for verification
ip_list = generate_ips(network, no_of_ip)
st_found = False
nh_found = False
for st_rt in ip_list:
st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))
_addr_type = validate_ip_address(st_rt)
if _addr_type != "ipv4":
continue
if st_rt in ospf_rib_json:
st_found = True
found_routes.append(st_rt)
if fib and next_hop:
if type(next_hop) is not list:
next_hop = [next_hop]
for mnh in range(0, len(ospf_rib_json[st_rt])):
if (
"fib"
in ospf_rib_json[st_rt][mnh]["nexthops"][0]
):
found_hops.append(
[
rib_r["ip"]
for rib_r in ospf_rib_json[st_rt][mnh][
"nexthops"
]
]
)
if found_hops[0]:
missing_list_of_nexthops = set(
found_hops[0]
).difference(next_hop)
additional_nexthops_in_required_nhs = set(
next_hop
).difference(found_hops[0])
if additional_nexthops_in_required_nhs:
logger.info(
"Nexthop "
"%s is not active for route %s in "
"RIB of router %s\n",
additional_nexthops_in_required_nhs,
st_rt,
dut,
)
errormsg = (
"Nexthop {} is not active"
" for route {} in RIB of router"
" {}\n".format(
additional_nexthops_in_required_nhs,
st_rt,
dut,
)
)
return errormsg
else:
nh_found = True
elif next_hop and fib is None:
if type(next_hop) is not list:
next_hop = [next_hop]
found_hops = [
rib_r["ip"]
for rib_r in ospf_rib_json[st_rt]["nexthops"]
]
if found_hops:
missing_list_of_nexthops = set(
found_hops
).difference(next_hop)
additional_nexthops_in_required_nhs = set(
next_hop
).difference(found_hops)
if additional_nexthops_in_required_nhs:
logger.info(
"Missing nexthop %s for route"
" %s in RIB of router %s\n",
additional_nexthops_in_required_nhs,
st_rt,
dut,
)
errormsg = (
"Nexthop {} is Missing for "
"route {} in RIB of router {}\n".format(
additional_nexthops_in_required_nhs,
st_rt,
dut,
)
)
return errormsg
else:
nh_found = True
if _rtype:
if "routeType" not in ospf_rib_json[st_rt]:
errormsg = (
"[DUT: {}]: routeType missing"
" for route {} in OSPF RIB \n".format(
dut, st_rt
)
)
return errormsg
elif _rtype != ospf_rib_json[st_rt]["routeType"]:
errormsg = (
"[DUT: {}]: routeType mismatch"
" for route {} in OSPF RIB \n".format(
dut, st_rt
)
)
return errormsg
else:
logger.info(
"[DUT: {}]: Found routeType {}"
" for route {}".format(dut, _rtype, st_rt)
)
if tag:
if "tag" not in ospf_rib_json[st_rt]:
errormsg = (
"[DUT: {}]: tag is not"
" present for"
" route {} in RIB \n".format(dut, st_rt)
)
return errormsg
if _tag != ospf_rib_json[st_rt]["tag"]:
errormsg = (
"[DUT: {}]: tag value {}"
" is not matched for"
" route {} in RIB \n".format(
dut,
_tag,
st_rt,
)
)
return errormsg
if metric is not None:
if "type2cost" not in ospf_rib_json[st_rt]:
errormsg = (
"[DUT: {}]: metric is"
" not present for"
" route {} in RIB \n".format(dut, st_rt)
)
return errormsg
if metric != ospf_rib_json[st_rt]["type2cost"]:
errormsg = (
"[DUT: {}]: metric value "
"{} is not matched for "
"route {} in RIB \n".format(
dut,
metric,
st_rt,
)
)
return errormsg
else:
missing_routes.append(st_rt)
if nh_found:
logger.info(
"[DUT: {}]: Found next_hop {} for all OSPF"
" routes in RIB".format(router, next_hop)
)
if len(missing_routes) > 0:
errormsg = "[DUT: {}]: Missing route in RIB, " "routes: {}".format(
dut, missing_routes
)
return errormsg
if found_routes:
logger.info(
"[DUT: %s]: Verified routes in RIB, found" " routes are: %s\n",
dut,
found_routes,
)
result = True
logger.info("Exiting lib API: verify_ospf_rib()")
return result
@retry(retry_timeout=20)
def verify_ospf_interface(
tgen, topo=None, dut=None, lan=False, input_dict=None, expected=True
):
"""
This API is to verify ospf routes by running
show ip ospf interface command.
Parameters
----------
* `tgen` : Topogen object
* `topo` : topology descriptions
* `dut`: device under test
* `lan`: if set to true this interface belongs to LAN.
* `input_dict` : Input dict data, required when configuring from testcase
* `expected` : expected results from API, by-default True
Usage
-----
input_dict= {
'r0': {
'links':{
's1': {
'ospf':{
'priority':98,
'timerDeadSecs': 4,
'area': '0.0.0.3',
'mcastMemberOspfDesignatedRouters': True,
'mcastMemberOspfAllRouters': True,
'ospfEnabled': True,
}
}
}
}
}
result = verify_ospf_interface(tgen, topo, dut=dut, input_dict=input_dict)
Returns
-------
True or False (Error Message)
"""
logger.debug("Entering lib API: verify_ospf_interface()")
result = False
if topo is None:
topo = tgen.json_topo
for router, rnode in tgen.routers().items():
if "ospf" not in topo["routers"][router]:
continue
if dut is not None and dut != router:
continue
logger.info("Verifying OSPF interface on router %s:", router)
show_ospf_json = run_frr_cmd(rnode, "show ip ospf interface json", isjson=True)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF is not running"
return errormsg
# To find neighbor ip type
ospf_intf_data = input_dict[router]["links"]
for ospf_intf, intf_data in ospf_intf_data.items():
intf = topo["routers"][router]["links"][ospf_intf]["interface"]
if intf in show_ospf_json["interfaces"]:
for intf_attribute in intf_data["ospf"]:
if (
intf_data["ospf"][intf_attribute]
== show_ospf_json["interfaces"][intf][intf_attribute]
):
logger.info(
"[DUT: %s] OSPF interface %s: %s is %s",
router,
intf,
intf_attribute,
intf_data["ospf"][intf_attribute],
)
else:
errormsg = "[DUT: {}] OSPF interface {}: {} is {}, \
Expected is {}".format(
router,
intf,
intf_attribute,
intf_data["ospf"][intf_attribute],
show_ospf_json["interfaces"][intf][intf_attribute],
)
return errormsg
result = True
logger.debug("Exiting API: verify_ospf_interface()")
return result
@retry(retry_timeout=40)
def verify_ospf_database(
tgen, topo, dut, input_dict, vrf=None, lsatype=None, rid=None, expected=True
):
"""
This API is to verify ospf lsa's by running
show ip ospf database command.
Parameters
----------
* `tgen` : Topogen object
* `dut`: device under test
* `input_dict` : Input dict data, required when configuring from testcase
* `topo` : next to be verified
* `expected` : expected results from API, by-default True
Usage
-----
input_dict = {
"areas": {
"0.0.0.0": {
"Router Link States": {
"100.1.1.0-100.1.1.0": {
"LSID": "100.1.1.0",
"Advertised router": "100.1.1.0",
"LSA Age": 130,
"Sequence Number": "80000006",
"Checksum": "a703",
"Router links": 3
}
},
"Net Link States": {
"10.0.0.2-100.1.1.1": {
"LSID": "10.0.0.2",
"Advertised router": "100.1.1.1",
"LSA Age": 137,
"Sequence Number": "80000001",
"Checksum": "9583"
}
},
},
}
}
result = verify_ospf_database(tgen, topo, dut, input_dict)
Returns
-------
True or False (Error Message)
"""
result = False
router = dut
logger.debug("Entering lib API: verify_ospf_database()")
if "ospf" not in topo["routers"][dut]:
errormsg = "[DUT: {}] OSPF is not configured on the router.".format(dut)
return errormsg
rnode = tgen.routers()[dut]
logger.info("Verifying OSPF interface on router %s:", dut)
if not rid:
rid = "self-originate"
if lsatype:
if vrf is None:
command = "show ip ospf database {} {} json".format(lsatype, rid)
else:
command = "show ip ospf database {} {} vrf {} json".format(
lsatype, rid, vrf
)
else:
if vrf is None:
command = "show ip ospf database json"
else:
command = "show ip ospf database vrf {} json".format(vrf)
show_ospf_json = run_frr_cmd(rnode, command, isjson=True)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF is not running"
return errormsg
# for inter and inter lsa's
ospf_db_data = input_dict.setdefault("areas", None)
ospf_external_lsa = input_dict.setdefault("AS External Link States", None)
# import pdb; pdb.set_trace()
if ospf_db_data:
for ospf_area, area_lsa in ospf_db_data.items():
if ospf_area in show_ospf_json["routerLinkStates"]["areas"]:
if "routerLinkStates" in area_lsa:
for lsa in area_lsa["routerLinkStates"]:
_advrtr = lsa.setdefault("advertisedRouter", None)
_options = lsa.setdefault("options", None)
if (
_options
and lsa["lsaId"]
== show_ospf_json["routerLinkStates"]["areas"][ospf_area][
0
]["linkStateId"]
and lsa["options"]
== show_ospf_json["routerLinkStates"]["areas"][ospf_area][
0
]["options"]
):
result = True
break
else:
errormsg = '[DUT: {}] OSPF LSA options: expected {}, Received Options are {} lsa["options"] {} OSPF LSAID: expected lsaid {}, Received lsaid {}'.format(
dut,
show_ospf_json["routerLinkStates"]["areas"][ospf_area][
0
]["options"],
_options,
lsa["options"],
show_ospf_json["routerLinkStates"]["areas"][ospf_area][
0
]["linkStateId"],
lsa["lsaId"],
)
return errormsg
if "Net Link States" in area_lsa:
for lsa in area_lsa["Net Link States"]:
if lsa in show_ospf_json["areas"][ospf_area]["Net Link States"]:
logger.info(
"[DUT: %s] OSPF LSDB area %s:Network " "LSA %s",
router,
ospf_area,
lsa,
)
result = True
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" Network LSA is {}".format(router, ospf_area, lsa)
)
return errormsg
if "Summary Link States" in area_lsa:
for lsa in area_lsa["Summary Link States"]:
if (
lsa
in show_ospf_json["areas"][ospf_area]["Summary Link States"]
):
logger.info(
"[DUT: %s] OSPF LSDB area %s:Summary " "LSA %s",
router,
ospf_area,
lsa,
)
result = True
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" Summary LSA is {}".format(router, ospf_area, lsa)
)
return errormsg
if "ASBR-Summary Link States" in area_lsa:
for lsa in area_lsa["ASBR-Summary Link States"]:
if (
lsa
in show_ospf_json["areas"][ospf_area][
"ASBR-Summary Link States"
]
):
logger.info(
"[DUT: %s] OSPF LSDB area %s:ASBR Summary " "LSA %s",
router,
ospf_area,
lsa,
)
result = True
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" ASBR Summary LSA is {}".format(router, ospf_area, lsa)
)
return errormsg
if ospf_external_lsa:
for ospf_ext_lsa, ext_lsa_data in ospf_external_lsa.items():
if ospf_ext_lsa in show_ospf_json["AS External Link States"]:
logger.info(
"[DUT: %s] OSPF LSDB:External LSA %s", router, ospf_ext_lsa
)
result = True
else:
errormsg = (
"[DUT: {}] OSPF LSDB : expected"
" External LSA is {}".format(router, ospf_ext_lsa)
)
return errormsg
logger.debug("Exiting API: verify_ospf_database()")
return result
@retry(retry_timeout=20)
def verify_ospf_summary(tgen, topo, dut, input_dict, ospf=None, expected=True):
"""
This API is to verify ospf routes by running
show ip ospf interface command.
Parameters
----------
* `tgen` : Topogen object
* `topo` : topology descriptions
* `dut`: device under test
* `input_dict` : Input dict data, required when configuring from testcase
Usage
-----
input_dict = {
"11.0.0.0/8": {
"summaryAddress": "11.0.0.0/8",
"metricType": "E2",
"metric": 20,
"tag": 0,
"externalRouteCount": 5
}
}
result = verify_ospf_summary(tgen, topo, dut, input_dict)
Returns
-------
True or False (Error Message)
"""
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
result = False
router = dut
logger.info("Verifying OSPF summary on router %s:", router)
rnode = tgen.routers()[dut]
if ospf:
if "ospf6" not in topo["routers"][dut]:
errormsg = "[DUT: {}] OSPF6 is not configured on the router.".format(router)
return errormsg
show_ospf_json = run_frr_cmd(
rnode, "show ipv6 ospf summary detail json", isjson=True
)
else:
if "ospf" not in topo["routers"][dut]:
errormsg = "[DUT: {}] OSPF is not configured on the router.".format(router)
return errormsg
show_ospf_json = run_frr_cmd(
rnode, "show ip ospf summary detail json", isjson=True
)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF is not running"
return errormsg
# To find neighbor ip type
ospf_summary_data = input_dict
if ospf:
show_ospf_json = show_ospf_json["default"]
for ospf_summ, summ_data in ospf_summary_data.items():
if ospf_summ not in show_ospf_json:
continue
summary = ospf_summary_data[ospf_summ]["summaryAddress"]
if summary in show_ospf_json:
for summ in summ_data:
if summ_data[summ] == show_ospf_json[summary][summ]:
logger.info(
"[DUT: %s] OSPF summary %s:%s is %s",
router,
summary,
summ,
summ_data[summ],
)
result = True
else:
errormsg = (
"[DUT: {}] OSPF summary {} : {} is {}, "
"Expected is {}".format(
router,
summary,
summ,
show_ospf_json[summary][summ],
summ_data[summ],
)
)
return errormsg
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result
@retry(retry_timeout=30)
def verify_ospf6_rib(
tgen, dut, input_dict, next_hop=None, tag=None, metric=None, fib=None
):
"""
This API is to verify ospf routes by running
show ip ospf route command.
Parameters
----------
* `tgen` : Topogen object
* `dut`: device under test
* `input_dict` : Input dict data, required when configuring from testcase
* `next_hop` : next to be verified
* `tag` : tag to be verified
* `metric` : metric to be verified
* `fib` : True if the route is installed in FIB.
Usage
-----
input_dict = {
"r1": {
"static_routes": [
{
"network": ip_net,
"no_of_ip": 1,
"routeType": "N"
}
]
}
}
result = verify_ospf6_rib(tgen, dut, input_dict,next_hop=nh)
Returns
-------
True or False (Error Message)
"""
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
result = False
router_list = tgen.routers()
additional_nexthops_in_required_nhs = []
found_hops = []
for routerInput in input_dict.keys():
for router, rnode in router_list.items():
if router != dut:
continue
logger.info("Checking router %s RIB:", router)
# Verifying RIB routes
command = "show ipv6 ospf route detail"
found_routes = []
missing_routes = []
if (
"static_routes" in input_dict[routerInput]
or "prefix" in input_dict[routerInput]
):
if "prefix" in input_dict[routerInput]:
static_routes = input_dict[routerInput]["prefix"]
else:
static_routes = input_dict[routerInput]["static_routes"]
for static_route in static_routes:
cmd = "{}".format(command)
cmd = "{} json".format(cmd)
ospf_rib_json = run_frr_cmd(rnode, cmd, isjson=True)
# Fix for PR 2644182
try:
ospf_rib_json = ospf_rib_json["routes"]
except KeyError:
pass
# Verifying output dictionary ospf_rib_json is not empty
if bool(ospf_rib_json) is False:
errormsg = (
"[DUT: {}] No routes found in OSPF6 route "
"table".format(router)
)
return errormsg
network = static_route["network"]
no_of_ip = static_route.setdefault("no_of_ip", 1)
_tag = static_route.setdefault("tag", None)
_rtype = static_route.setdefault("routeType", None)
# Generating IPs for verification
ip_list = generate_ips(network, no_of_ip)
if len(ip_list) == 1:
ip_list = [network]
st_found = False
nh_found = False
for st_rt in ip_list:
st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))
_addr_type = validate_ip_address(st_rt)
if _addr_type != "ipv6":
continue
if st_rt in ospf_rib_json:
st_found = True
found_routes.append(st_rt)
if fib and next_hop:
if type(next_hop) is not list:
next_hop = [next_hop]
for mnh in range(0, len(ospf_rib_json[st_rt])):
if (
"fib"
in ospf_rib_json[st_rt][mnh]["nextHops"][0]
):
found_hops.append(
[
rib_r["ip"]
for rib_r in ospf_rib_json[st_rt][mnh][
"nextHops"
]
]
)
if found_hops[0]:
missing_list_of_nexthops = set(
found_hops[0]
).difference(next_hop)
additional_nexthops_in_required_nhs = set(
next_hop
).difference(found_hops[0])
if additional_nexthops_in_required_nhs:
logger.info(
"Nexthop "
"%s is not active for route %s in "
"RIB of router %s\n",
additional_nexthops_in_required_nhs,
st_rt,
dut,
)
errormsg = (
"Nexthop {} is not active"
" for route {} in RIB of router"
" {}\n".format(
additional_nexthops_in_required_nhs,
st_rt,
dut,
)
)
return errormsg
else:
nh_found = True
elif next_hop and fib is None:
if type(next_hop) is not list:
next_hop = [next_hop]
found_hops = [
rib_r["nextHop"]
for rib_r in ospf_rib_json[st_rt]["nextHops"]
]
if found_hops:
missing_list_of_nexthops = set(
found_hops
).difference(next_hop)
additional_nexthops_in_required_nhs = set(
next_hop
).difference(found_hops)
if additional_nexthops_in_required_nhs:
logger.info(
"Missing nexthop %s for route"
" %s in RIB of router %s\n",
additional_nexthops_in_required_nhs,
st_rt,
dut,
)
errormsg = (
"Nexthop {} is Missing for "
"route {} in RIB of router {}\n".format(
additional_nexthops_in_required_nhs,
st_rt,
dut,
)
)
return errormsg
else:
nh_found = True
if _rtype:
if "destinationType" not in ospf_rib_json[st_rt]:
errormsg = (
"[DUT: {}]: destinationType missing"
"for route {} in OSPF RIB \n".format(dut, st_rt)
)
return errormsg
elif _rtype != ospf_rib_json[st_rt]["destinationType"]:
errormsg = (
"[DUT: {}]: destinationType mismatch"
"for route {} in OSPF RIB \n".format(dut, st_rt)
)
return errormsg
else:
logger.info(
"DUT: {}]: Found destinationType {}"
"for route {}".format(dut, _rtype, st_rt)
)
if tag:
if "tag" not in ospf_rib_json[st_rt]:
errormsg = (
"[DUT: {}]: tag is not"
" present for"
" route {} in RIB \n".format(dut, st_rt)
)
return errormsg
if _tag != ospf_rib_json[st_rt]["tag"]:
errormsg = (
"[DUT: {}]: tag value {}"
" is not matched for"
" route {} in RIB \n".format(
dut,
_tag,
st_rt,
)
)
return errormsg
if metric is not None:
if "metricCostE2" not in ospf_rib_json[st_rt]:
errormsg = (
"[DUT: {}]: metric is"
" not present for"
" route {} in RIB \n".format(dut, st_rt)
)
return errormsg
if metric != ospf_rib_json[st_rt]["metricCostE2"]:
errormsg = (
"[DUT: {}]: metric value "
"{} is not matched for "
"route {} in RIB \n".format(
dut,
metric,
st_rt,
)
)
return errormsg
else:
missing_routes.append(st_rt)
if nh_found:
logger.info(
"[DUT: {}]: Found next_hop {} for all OSPF"
" routes in RIB".format(router, next_hop)
)
if len(missing_routes) > 0:
errormsg = "[DUT: {}]: Missing route in RIB, " "routes: {}".format(
dut, missing_routes
)
return errormsg
if found_routes:
logger.info(
"[DUT: %s]: Verified routes in RIB, found" " routes are: %s\n",
dut,
found_routes,
)
result = True
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result
@retry(retry_timeout=6)
def verify_ospf6_interface(tgen, topo=None, dut=None, lan=False, input_dict=None):
"""
This API is to verify ospf routes by running
show ip ospf interface command.
Parameters
----------
* `tgen` : Topogen object
* `topo` : topology descriptions
* `dut`: device under test
* `lan`: if set to true this interface belongs to LAN.
* `input_dict` : Input dict data, required when configuring from testcase
Usage
-----
input_dict= {
'r0': {
'links':{
's1': {
'ospf6':{
'priority':98,
'timerDeadSecs': 4,
'area': '0.0.0.3',
'mcastMemberOspfDesignatedRouters': True,
'mcastMemberOspfAllRouters': True,
'ospfEnabled': True,
}
}
}
}
}
result = verify_ospf_interface(tgen, topo, dut=dut, input_dict=input_dict)
Returns
-------
True or False (Error Message)
"""
logger.debug("Entering lib API: verify_ospf6_interface")
result = False
if topo is None:
topo = tgen.json_topo
for router, rnode in tgen.routers().items():
if "ospf6" not in topo["routers"][router]:
continue
if dut is not None and dut != router:
continue
logger.info("Verifying OSPF interface on router %s:", router)
show_ospf_json = run_frr_cmd(
rnode, "show ipv6 ospf interface json", isjson=True
)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF6 is not running"
return errormsg
# To find neighbor ip type
ospf_intf_data = input_dict[router]["links"]
for ospf_intf, intf_data in ospf_intf_data.items():
intf = topo["routers"][router]["links"][ospf_intf]["interface"]
if intf in show_ospf_json:
for intf_attribute in intf_data["ospf6"]:
if intf_data["ospf6"][intf_attribute] is not list:
if (
intf_data["ospf6"][intf_attribute]
== show_ospf_json[intf][intf_attribute]
):
logger.info(
"[DUT: %s] OSPF6 interface %s: %s is %s",
router,
intf,
intf_attribute,
intf_data["ospf6"][intf_attribute],
)
elif intf_data["ospf6"][intf_attribute] is list:
for addr_list in len(show_ospf_json[intf][intf_attribute]):
if (
show_ospf_json[intf][intf_attribute][addr_list][
"address"
].split("/")[0]
== intf_data["ospf6"]["internetAddress"][0]["address"]
):
break
else:
errormsg = "[DUT: {}] OSPF6 interface {}: {} is {}, \
Expected is {}".format(
router,
intf,
intf_attribute,
intf_data["ospf6"][intf_attribute],
intf_data["ospf6"][intf_attribute],
)
return errormsg
else:
errormsg = "[DUT: {}] OSPF6 interface {}: {} is {}, \
Expected is {}".format(
router,
intf,
intf_attribute,
intf_data["ospf6"][intf_attribute],
intf_data["ospf6"][intf_attribute],
)
return errormsg
result = True
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result
@retry(retry_timeout=20)
def verify_ospf6_database(tgen, topo, dut, input_dict):
"""
This API is to verify ospf lsa's by running
show ip ospf database command.
Parameters
----------
* `tgen` : Topogen object
* `dut`: device under test
* `input_dict` : Input dict data, required when configuring from testcase
* `topo` : next to be verified
Usage
-----
input_dict = {
"areas": {
"0.0.0.0": {
"routerLinkStates": {
"100.1.1.0-100.1.1.0": {
"LSID": "100.1.1.0",
"Advertised router": "100.1.1.0",
"LSA Age": 130,
"Sequence Number": "80000006",
"Checksum": "a703",
"Router links": 3
}
},
"networkLinkStates": {
"10.0.0.2-100.1.1.1": {
"LSID": "10.0.0.2",
"Advertised router": "100.1.1.1",
"LSA Age": 137,
"Sequence Number": "80000001",
"Checksum": "9583"
}
},
},
}
}
result = verify_ospf_database(tgen, topo, dut, input_dict)
Returns
-------
True or False (Error Message)
"""
result = False
router = dut
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
if "ospf" not in topo["routers"][dut]:
errormsg = "[DUT: {}] OSPF is not configured on the router.".format(dut)
return errormsg
rnode = tgen.routers()[dut]
logger.info("Verifying OSPF interface on router %s:", dut)
show_ospf_json = run_frr_cmd(rnode, "show ip ospf database json", isjson=True)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF is not running"
return errormsg
# for inter and inter lsa's
ospf_db_data = input_dict.setdefault("areas", None)
ospf_external_lsa = input_dict.setdefault("asExternalLinkStates", None)
if ospf_db_data:
for ospf_area, area_lsa in ospf_db_data.items():
if ospf_area in show_ospf_json["areas"]:
if "routerLinkStates" in area_lsa:
for lsa in area_lsa["routerLinkStates"]:
for rtrlsa in show_ospf_json["areas"][ospf_area][
"routerLinkStates"
]:
if (
lsa["lsaId"] == rtrlsa["lsaId"]
and lsa["advertisedRouter"]
== rtrlsa["advertisedRouter"]
):
result = True
break
if result:
logger.info(
"[DUT: %s] OSPF LSDB area %s:Router " "LSA %s",
router,
ospf_area,
lsa,
)
break
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" Router LSA is {}".format(router, ospf_area, lsa)
)
return errormsg
if "networkLinkStates" in area_lsa:
for lsa in area_lsa["networkLinkStates"]:
for netlsa in show_ospf_json["areas"][ospf_area][
"networkLinkStates"
]:
if (
lsa
in show_ospf_json["areas"][ospf_area][
"networkLinkStates"
]
):
if (
lsa["lsaId"] == netlsa["lsaId"]
and lsa["advertisedRouter"]
== netlsa["advertisedRouter"]
):
result = True
break
if result:
logger.info(
"[DUT: %s] OSPF LSDB area %s:Network " "LSA %s",
router,
ospf_area,
lsa,
)
break
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" Network LSA is {}".format(router, ospf_area, lsa)
)
return errormsg
if "summaryLinkStates" in area_lsa:
for lsa in area_lsa["summaryLinkStates"]:
for t3lsa in show_ospf_json["areas"][ospf_area][
"summaryLinkStates"
]:
if (
lsa["lsaId"] == t3lsa["lsaId"]
and lsa["advertisedRouter"] == t3lsa["advertisedRouter"]
):
result = True
break
if result:
logger.info(
"[DUT: %s] OSPF LSDB area %s:Summary " "LSA %s",
router,
ospf_area,
lsa,
)
break
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" Summary LSA is {}".format(router, ospf_area, lsa)
)
return errormsg
if "nssaExternalLinkStates" in area_lsa:
for lsa in area_lsa["nssaExternalLinkStates"]:
for t7lsa in show_ospf_json["areas"][ospf_area][
"nssaExternalLinkStates"
]:
if (
lsa["lsaId"] == t7lsa["lsaId"]
and lsa["advertisedRouter"] == t7lsa["advertisedRouter"]
):
result = True
break
if result:
logger.info(
"[DUT: %s] OSPF LSDB area %s:Type7 " "LSA %s",
router,
ospf_area,
lsa,
)
break
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" Type7 LSA is {}".format(router, ospf_area, lsa)
)
return errormsg
if "asbrSummaryLinkStates" in area_lsa:
for lsa in area_lsa["asbrSummaryLinkStates"]:
for t4lsa in show_ospf_json["areas"][ospf_area][
"asbrSummaryLinkStates"
]:
if (
lsa["lsaId"] == t4lsa["lsaId"]
and lsa["advertisedRouter"] == t4lsa["advertisedRouter"]
):
result = True
break
if result:
logger.info(
"[DUT: %s] OSPF LSDB area %s:ASBR Summary " "LSA %s",
router,
ospf_area,
lsa,
)
result = True
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" ASBR Summary LSA is {}".format(router, ospf_area, lsa)
)
return errormsg
if "linkLocalOpaqueLsa" in area_lsa:
for lsa in area_lsa["linkLocalOpaqueLsa"]:
try:
for lnklsa in show_ospf_json["areas"][ospf_area][
"linkLocalOpaqueLsa"
]:
if (
lsa["lsaId"] in lnklsa["lsaId"]
and "linkLocalOpaqueLsa"
in show_ospf_json["areas"][ospf_area]
):
logger.info(
(
"[DUT: FRR] OSPF LSDB area %s:Opaque-LSA"
"%s",
ospf_area,
lsa,
)
)
result = True
else:
errormsg = (
"[DUT: FRR] OSPF LSDB area: {} "
"expected Opaque-LSA is {}, Found is {}".format(
ospf_area, lsa, show_ospf_json
)
)
raise ValueError(errormsg)
return errormsg
except KeyError:
errormsg = "[DUT: FRR] linkLocalOpaqueLsa Not " "present"
return errormsg
if ospf_external_lsa:
for lsa in ospf_external_lsa:
try:
for t5lsa in show_ospf_json["asExternalLinkStates"]:
if (
lsa["lsaId"] == t5lsa["lsaId"]
and lsa["advertisedRouter"] == t5lsa["advertisedRouter"]
):
result = True
break
except KeyError:
result = False
if result:
logger.info("[DUT: %s] OSPF LSDB:External LSA %s", router, lsa)
result = True
else:
errormsg = (
"[DUT: {}] OSPF LSDB : expected"
" External LSA is {}".format(router, lsa)
)
return errormsg
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result
def config_ospf6_interface(
tgen, topo=None, input_dict=None, build=False, load_config=True
):
"""
API to configure ospf on router.
Parameters
----------
* `tgen` : Topogen object
* `topo` : json file data
* `input_dict` : Input dict data, required when configuring from testcase
* `build` : Only for initial setup phase this is set as True.
* `load_config` : Loading the config to router this is set as True.
Usage
-----
r1_ospf_auth = {
"r1": {
"links": {
"r2": {
"ospf": {
"authentication": 'message-digest',
"authentication-key": "ospf",
"message-digest-key": "10"
}
}
}
}
}
result = config_ospf6_interface(tgen, topo, r1_ospf_auth)
Returns
-------
True or False
"""
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
result = False
if topo is None:
topo = tgen.json_topo
if not input_dict:
input_dict = deepcopy(topo)
else:
input_dict = deepcopy(input_dict)
config_data_dict = {}
for router in input_dict.keys():
config_data = []
for lnk in input_dict[router]["links"].keys():
if "ospf6" not in input_dict[router]["links"][lnk]:
logger.debug(
"Router %s: ospf6 config is not present in"
"input_dict, passed input_dict %s",
router,
str(input_dict),
)
continue
ospf_data = input_dict[router]["links"][lnk]["ospf6"]
data_ospf_area = ospf_data.setdefault("area", None)
data_ospf_auth = ospf_data.setdefault("hash-algo", None)
data_ospf_keychain = ospf_data.setdefault("keychain", None)
data_ospf_dr_priority = ospf_data.setdefault("priority", None)
data_ospf_cost = ospf_data.setdefault("cost", None)
data_ospf_mtu = ospf_data.setdefault("mtu_ignore", None)
try:
intf = topo["routers"][router]["links"][lnk]["interface"]
except KeyError:
intf = topo["switches"][router]["links"][lnk]["interface"]
# interface
cmd = "interface {}".format(intf)
config_data.append(cmd)
# interface area config
if data_ospf_area:
cmd = "ipv6 ospf area {}".format(data_ospf_area)
config_data.append(cmd)
# interface ospf auth
if data_ospf_auth:
cmd = "ipv6 ospf6 authentication"
if "del_action" in ospf_data:
cmd = "no {}".format(cmd)
if "hash-algo" in ospf_data:
cmd = "{} key-id {} hash-algo {} key {}".format(
cmd,
ospf_data["key-id"],
ospf_data["hash-algo"],
ospf_data["key"],
)
config_data.append(cmd)
# interface ospf auth with keychain
if data_ospf_keychain:
cmd = "ipv6 ospf6 authentication"
if "del_action" in ospf_data:
cmd = "no {}".format(cmd)
if "keychain" in ospf_data:
cmd = "{} keychain {}".format(cmd, ospf_data["keychain"])
config_data.append(cmd)
# interface ospf dr priority
if data_ospf_dr_priority:
cmd = "ipv6 ospf priority {}".format(ospf_data["priority"])
if "del_action" in ospf_data:
cmd = "no {}".format(cmd)
config_data.append(cmd)
# interface ospf cost
if data_ospf_cost:
cmd = "ipv6 ospf cost {}".format(ospf_data["cost"])
if "del_action" in ospf_data:
cmd = "no {}".format(cmd)
config_data.append(cmd)
# interface ospf mtu
if data_ospf_mtu:
cmd = "ipv6 ospf mtu-ignore"
if "del_action" in ospf_data:
cmd = "no {}".format(cmd)
config_data.append(cmd)
if build:
return config_data
if config_data:
config_data_dict[router] = config_data
result = create_common_configurations(
tgen, config_data_dict, "interface_config", build=build
)
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result
@retry(retry_timeout=20)
def verify_ospf_gr_helper(tgen, topo, dut, input_dict=None):
"""
This API is used to vreify gr helper using command
show ip ospf graceful-restart helper
Parameters
----------
* `tgen` : Topogen object
* `topo` : topology descriptions
* 'dut' : router
* 'input_dict' - values to be verified
Usage:
-------
input_dict = {
"helperSupport":"Disabled",
"strictLsaCheck":"Enabled",
"restartSupport":"Planned and Unplanned Restarts",
"supportedGracePeriod":1800
}
result = verify_ospf_gr_helper(tgen, topo, dut, input_dict)
"""
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
result = False
if "ospf" not in topo["routers"][dut]:
errormsg = "[DUT: {}] OSPF is not configured on the router.".format(dut)
return errormsg
rnode = tgen.routers()[dut]
logger.info("Verifying OSPF GR details on router %s:", dut)
show_ospf_json = run_frr_cmd(
rnode, "show ip ospf graceful-restart helper json", isjson=True
)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF is not running"
raise ValueError(errormsg)
return errormsg
for ospf_gr, gr_data in input_dict.items():
try:
if input_dict[ospf_gr] == show_ospf_json[ospf_gr]:
logger.info(
"[DUT: FRR] OSPF GR Helper: %s is %s",
ospf_gr,
show_ospf_json[ospf_gr],
)
result = True
else:
errormsg = (
"[DUT: FRR] OSPF GR Helper: {} expected is {}, Found "
"is {}".format(
ospf_gr, input_dict[ospf_gr], show_ospf_json[ospf_gr]
)
)
raise ValueError(errormsg)
return errormsg
except KeyError:
errormsg = "[DUT: FRR] OSPF GR Helper: {}".format(ospf_gr)
return errormsg
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result
def get_ospf_database(tgen, topo, dut, input_dict, vrf=None, lsatype=None, rid=None):
"""
This API is to return ospf lsa's by running
show ip ospf database command.
Parameters
----------
* `tgen` : Topogen object
* `dut`: device under test
* `input_dict` : Input dict data, required when configuring from testcase
* `topo` : next to be verified
* `vrf` : vrf to be checked
* `lsatype` : type of lsa to be checked
* `rid` : router id for lsa to be checked
Usage
-----
input_dict = {
"areas": {
"0.0.0.0": {
"routerLinkStates": {
"100.1.1.0-100.1.1.0": {
"LSID": "100.1.1.0",
"Advertised router": "100.1.1.0",
"LSA Age": 130,
"Sequence Number": "80000006",
"Checksum": "a703",
"Router links": 3
}
},
"networkLinkStates": {
"10.0.0.2-100.1.1.1": {
"LSID": "10.0.0.2",
"Advertised router": "100.1.1.1",
"LSA Age": 137,
"Sequence Number": "80000001",
"Checksum": "9583"
}
},
},
}
}
result = get_ospf_database(tgen, topo, dut, input_dict)
Returns
-------
True or False (Error Message)
"""
result = False
router = dut
logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
sleep(10)
if "ospf" not in topo["routers"][dut]:
errormsg = "[DUT: {}] OSPF is not configured on the router.".format(dut)
return errormsg
rnode = tgen.routers()[dut]
logger.info("Verifying OSPF interface on router %s:", dut)
if not rid:
rid = "self-originate"
if lsatype:
if vrf is None:
command = "show ip ospf database {} {} json".format(lsatype, rid)
else:
command = "show ip ospf database {} {} vrf {} json".format(
lsatype, rid, vrf
)
else:
if vrf is None:
command = "show ip ospf database json"
else:
command = "show ip ospf database vrf {} json".format(vrf)
show_ospf_json = run_frr_cmd(rnode, command, isjson=True)
# Verifying output dictionary show_ospf_json is empty or not
if not bool(show_ospf_json):
errormsg = "OSPF is not running"
return errormsg
# for inter and inter lsa's
ospf_db_data = input_dict.setdefault("areas", None)
ospf_external_lsa = input_dict.setdefault("asExternalLinkStates", None)
if ospf_db_data:
for ospf_area, area_lsa in ospf_db_data.items():
if "areas" in show_ospf_json and ospf_area in show_ospf_json["areas"]:
if "routerLinkStates" in area_lsa:
for lsa in area_lsa["routerLinkStates"]:
for rtrlsa in show_ospf_json["areas"][ospf_area][
"routerLinkStates"
]:
_advrtr = lsa.setdefault("advertisedRouter", None)
_options = lsa.setdefault("options", None)
if (
_advrtr
and lsa["lsaId"] == rtrlsa["lsaId"]
and lsa["advertisedRouter"]
== rtrlsa["advertisedRouter"]
):
result = True
break
if (
_options
and lsa["lsaId"] == rtrlsa["lsaId"]
and lsa["options"] == rtrlsa["options"]
):
result = True
break
if result:
logger.info(
"[DUT: %s] OSPF LSDB area %s:Router " "LSA %s",
router,
ospf_area,
lsa,
)
break
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" Router LSA is {}\n found Router LSA: {}".format(
router, ospf_area, lsa, rtrlsa
)
)
return errormsg
if "networkLinkStates" in area_lsa:
for lsa in area_lsa["networkLinkStates"]:
for netlsa in show_ospf_json["areas"][ospf_area][
"networkLinkStates"
]:
if (
lsa
in show_ospf_json["areas"][ospf_area][
"networkLinkStates"
]
):
if (
lsa["lsaId"] == netlsa["lsaId"]
and lsa["advertisedRouter"]
== netlsa["advertisedRouter"]
):
result = True
break
if result:
logger.info(
"[DUT: %s] OSPF LSDB area %s:Network " "LSA %s",
router,
ospf_area,
lsa,
)
break
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" Network LSA is {}".format(router, ospf_area, lsa)
)
return errormsg
if "summaryLinkStates" in area_lsa:
for lsa in area_lsa["summaryLinkStates"]:
for t3lsa in show_ospf_json["areas"][ospf_area][
"summaryLinkStates"
]:
if (
lsa["lsaId"] == t3lsa["lsaId"]
and lsa["advertisedRouter"] == t3lsa["advertisedRouter"]
):
result = True
break
if result:
logger.info(
"[DUT: %s] OSPF LSDB area %s:Summary " "LSA %s",
router,
ospf_area,
lsa,
)
break
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" Summary LSA is {}".format(router, ospf_area, lsa)
)
return errormsg
if "nssaExternalLinkStates" in area_lsa:
for lsa in area_lsa["nssaExternalLinkStates"]:
for t7lsa in show_ospf_json["areas"][ospf_area][
"nssaExternalLinkStates"
]:
if (
lsa["lsaId"] == t7lsa["lsaId"]
and lsa["advertisedRouter"] == t7lsa["advertisedRouter"]
):
result = True
break
if result:
logger.info(
"[DUT: %s] OSPF LSDB area %s:Type7 " "LSA %s",
router,
ospf_area,
lsa,
)
break
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" Type7 LSA is {}".format(router, ospf_area, lsa)
)
return errormsg
if "asbrSummaryLinkStates" in area_lsa:
for lsa in area_lsa["asbrSummaryLinkStates"]:
for t4lsa in show_ospf_json["areas"][ospf_area][
"asbrSummaryLinkStates"
]:
if (
lsa["lsaId"] == t4lsa["lsaId"]
and lsa["advertisedRouter"] == t4lsa["advertisedRouter"]
):
result = True
break
if result:
logger.info(
"[DUT: %s] OSPF LSDB area %s:ASBR Summary " "LSA %s",
router,
ospf_area,
lsa,
)
result = True
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" ASBR Summary LSA is {}".format(router, ospf_area, lsa)
)
return errormsg
if "linkLocalOpaqueLsa" in area_lsa:
for lsa in area_lsa["linkLocalOpaqueLsa"]:
try:
for lnklsa in show_ospf_json["areas"][ospf_area][
"linkLocalOpaqueLsa"
]:
if (
lsa["lsaId"] in lnklsa["lsaId"]
and "linkLocalOpaqueLsa"
in show_ospf_json["areas"][ospf_area]
):
logger.info(
(
"[DUT: FRR] OSPF LSDB area %s:Opaque-LSA"
"%s",
ospf_area,
lsa,
)
)
result = True
else:
errormsg = (
"[DUT: FRR] OSPF LSDB area: {} "
"expected Opaque-LSA is {}, Found is {}".format(
ospf_area, lsa, show_ospf_json
)
)
raise ValueError(errormsg)
return errormsg
except KeyError:
errormsg = "[DUT: FRR] linkLocalOpaqueLsa Not " "present"
return errormsg
else:
if "routerLinkStates" in area_lsa:
for lsa in area_lsa["routerLinkStates"]:
for rtrlsa in show_ospf_json["routerLinkStates"]:
_advrtr = lsa.setdefault("advertisedRouter", None)
_options = lsa.setdefault("options", None)
_age = lsa.setdefault("lsaAge", None)
if (
_options
and lsa["options"]
== show_ospf_json["routerLinkStates"][rtrlsa][
ospf_area
][0]["options"]
):
result = True
break
if (
_age != "get"
and lsa["lsaAge"]
== show_ospf_json["routerLinkStates"][rtrlsa][
ospf_area
][0]["lsaAge"]
):
result = True
break
if _age == "get":
return "{}".format(
show_ospf_json["routerLinkStates"][rtrlsa][
ospf_area
][0]["lsaAge"]
)
if result:
logger.info(
"[DUT: %s] OSPF LSDB area %s:Router " "LSA %s",
router,
ospf_area,
lsa,
)
break
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" Router LSA is {}\n found Router LSA: {}".format(
router,
ospf_area,
lsa,
show_ospf_json["routerLinkStates"],
)
)
return errormsg
if "networkLinkStates" in area_lsa:
for lsa in area_lsa["networkLinkStates"]:
for netlsa in show_ospf_json["areas"][ospf_area][
"networkLinkStates"
]:
if (
lsa
in show_ospf_json["areas"][ospf_area][
"networkLinkStates"
]
):
if (
lsa["lsaId"] == netlsa["lsaId"]
and lsa["advertisedRouter"]
== netlsa["advertisedRouter"]
):
result = True
break
if result:
logger.info(
"[DUT: %s] OSPF LSDB area %s:Network " "LSA %s",
router,
ospf_area,
lsa,
)
break
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" Network LSA is {}".format(router, ospf_area, lsa)
)
return errormsg
if "summaryLinkStates" in area_lsa:
for lsa in area_lsa["summaryLinkStates"]:
for t3lsa in show_ospf_json["areas"][ospf_area][
"summaryLinkStates"
]:
if (
lsa["lsaId"] == t3lsa["lsaId"]
and lsa["advertisedRouter"] == t3lsa["advertisedRouter"]
):
result = True
break
if result:
logger.info(
"[DUT: %s] OSPF LSDB area %s:Summary " "LSA %s",
router,
ospf_area,
lsa,
)
break
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" Summary LSA is {}".format(router, ospf_area, lsa)
)
return errormsg
if "nssaExternalLinkStates" in area_lsa:
for lsa in area_lsa["nssaExternalLinkStates"]:
for t7lsa in show_ospf_json["areas"][ospf_area][
"nssaExternalLinkStates"
]:
if (
lsa["lsaId"] == t7lsa["lsaId"]
and lsa["advertisedRouter"] == t7lsa["advertisedRouter"]
):
result = True
break
if result:
logger.info(
"[DUT: %s] OSPF LSDB area %s:Type7 " "LSA %s",
router,
ospf_area,
lsa,
)
break
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" Type7 LSA is {}".format(router, ospf_area, lsa)
)
return errormsg
if "asbrSummaryLinkStates" in area_lsa:
for lsa in area_lsa["asbrSummaryLinkStates"]:
for t4lsa in show_ospf_json["areas"][ospf_area][
"asbrSummaryLinkStates"
]:
if (
lsa["lsaId"] == t4lsa["lsaId"]
and lsa["advertisedRouter"] == t4lsa["advertisedRouter"]
):
result = True
break
if result:
logger.info(
"[DUT: %s] OSPF LSDB area %s:ASBR Summary " "LSA %s",
router,
ospf_area,
lsa,
)
result = True
else:
errormsg = (
"[DUT: {}] OSPF LSDB area {}: expected"
" ASBR Summary LSA is {}".format(router, ospf_area, lsa)
)
return errormsg
if "linkLocalOpaqueLsa" in area_lsa:
for lsa in area_lsa["linkLocalOpaqueLsa"]:
try:
for lnklsa in show_ospf_json["areas"][ospf_area][
"linkLocalOpaqueLsa"
]:
if (
lsa["lsaId"] in lnklsa["lsaId"]
and "linkLocalOpaqueLsa"
in show_ospf_json["areas"][ospf_area]
):
logger.info(
(
"[DUT: FRR] OSPF LSDB area %s:Opaque-LSA"
"%s",
ospf_area,
lsa,
)
)
result = True
else:
errormsg = (
"[DUT: FRR] OSPF LSDB area: {} "
"expected Opaque-LSA is {}, Found is {}".format(
ospf_area, lsa, show_ospf_json
)
)
raise ValueError(errormsg)
return errormsg
except KeyError:
errormsg = "[DUT: FRR] linkLocalOpaqueLsa Not " "present"
return errormsg
if ospf_external_lsa:
for lsa in ospf_external_lsa:
try:
for t5lsa in show_ospf_json["asExternalLinkStates"]:
if (
lsa["lsaId"] == t5lsa["lsaId"]
and lsa["advertisedRouter"] == t5lsa["advertisedRouter"]
):
result = True
break
except KeyError:
result = False
if result:
logger.info("[DUT: %s] OSPF LSDB:External LSA %s", router, lsa)
result = True
else:
errormsg = (
"[DUT: {}] OSPF LSDB : expected"
" External LSA is {}".format(router, lsa)
)
return errormsg
logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
return result