Merging upstream version 1.0.0.

Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
Daniel Baumann 2025-02-05 11:54:06 +01:00
parent 256a120fdd
commit 3ccac88507
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
36 changed files with 2108 additions and 153 deletions

View file

@ -45,4 +45,4 @@ RICH_COLOR_THEME = {
"unset": RICH_COLOR_PALETTE.UNSET,
}
GITHUB_SUGGESTION = "Please reach out to the maintainer team or open an issue on Github: https://github.com/arista-netdevops-community/anta."
GITHUB_SUGGESTION = "Please reach out to the maintainer team or open an issue on Github: https://github.com/aristanetworks/anta."

View file

@ -48,7 +48,7 @@ def debug_options(f: Callable[..., Any]) -> Callable[..., Any]:
device: str,
**kwargs: Any,
) -> Any:
# TODO: @gmuloc - tags come from context https://github.com/arista-netdevops-community/anta/issues/584
# TODO: @gmuloc - tags come from context https://github.com/aristanetworks/anta/issues/584
# pylint: disable=unused-argument
# ruff: noqa: ARG001
if (d := inventory.get(device)) is None:

View file

@ -13,6 +13,7 @@ from pathlib import Path
from typing import TYPE_CHECKING, Any
import click
import requests
from cvprac.cvp_client import CvpClient
from cvprac.cvp_client_errors import CvpApiError
from rich.pretty import pretty_repr
@ -36,14 +37,27 @@ logger = logging.getLogger(__name__)
@click.option("--username", "-u", help="CloudVision username", type=str, required=True)
@click.option("--password", "-p", help="CloudVision password", type=str, required=True)
@click.option("--container", "-c", help="CloudVision container where devices are configured", type=str)
def from_cvp(ctx: click.Context, output: Path, host: str, username: str, password: str, container: str | None) -> None:
@click.option(
"--ignore-cert",
help="Ignore verifying the SSL certificate when connecting to CloudVision",
show_envvar=True,
is_flag=True,
default=False,
)
def from_cvp(ctx: click.Context, output: Path, host: str, username: str, password: str, container: str | None, *, ignore_cert: bool) -> None:
# pylint: disable=too-many-arguments
"""Build ANTA inventory from Cloudvision.
"""Build ANTA inventory from CloudVision.
TODO - handle get_inventory and get_devices_in_container failure
NOTE: Only username/password authentication is supported for on-premises CloudVision instances.
Token authentication for both on-premises and CloudVision as a Service (CVaaS) is not supported.
"""
# TODO: - Handle get_cv_token, get_inventory and get_devices_in_container failures.
logger.info("Getting authentication token for user '%s' from CloudVision instance '%s'", username, host)
token = get_cv_token(cvp_ip=host, cvp_username=username, cvp_password=password)
try:
token = get_cv_token(cvp_ip=host, cvp_username=username, cvp_password=password, verify_cert=not ignore_cert)
except requests.exceptions.SSLError as error:
logger.error("Authentication to CloudVison failed: %s.", error)
ctx.exit(ExitCode.USAGE_ERROR)
clnt = CvpClient()
try:

View file

@ -77,16 +77,33 @@ def inventory_output_options(f: Callable[..., Any]) -> Callable[..., Any]:
return wrapper
def get_cv_token(cvp_ip: str, cvp_username: str, cvp_password: str) -> str:
"""Generate AUTH token from CVP using password."""
# TODO: need to handle requests error
def get_cv_token(cvp_ip: str, cvp_username: str, cvp_password: str, *, verify_cert: bool) -> str:
"""Generate the authentication token from CloudVision using username and password.
TODO: need to handle requests error
Args:
----
cvp_ip: IP address of CloudVision.
cvp_username: Username to connect to CloudVision.
cvp_password: Password to connect to CloudVision.
verify_cert: Enable or disable certificate verification when connecting to CloudVision.
Returns
-------
token(str): The token to use in further API calls to CloudVision.
Raises
------
requests.ssl.SSLError: If the certificate verification fails
"""
# use CVP REST API to generate a token
url = f"https://{cvp_ip}/cvpservice/login/authenticate.do"
payload = json.dumps({"userId": cvp_username, "password": cvp_password})
headers = {"Content-Type": "application/json", "Accept": "application/json"}
response = requests.request("POST", url, headers=headers, data=payload, verify=False, timeout=10)
response = requests.request("POST", url, headers=headers, data=payload, verify=verify_cert, timeout=10)
return response.json()["sessionId"]
@ -94,7 +111,7 @@ def write_inventory_to_file(hosts: list[AntaInventoryHost], output: Path) -> Non
"""Write a file inventory from pydantic models."""
i = AntaInventoryInput(hosts=hosts)
with output.open(mode="w", encoding="UTF-8") as out_fd:
out_fd.write(yaml.dump({AntaInventory.INVENTORY_ROOT_KEY: i.model_dump(exclude_unset=True)}))
out_fd.write(yaml.dump({AntaInventory.INVENTORY_ROOT_KEY: yaml.safe_load(i.yaml())}))
logger.info("ANTA inventory file has been created: '%s'", output)

View file

@ -6,7 +6,9 @@
from __future__ import annotations
import logging
import math
import yaml
from pydantic import BaseModel, ConfigDict, IPvAnyAddress, IPvAnyNetwork
from anta.custom_types import Hostname, Port
@ -82,3 +84,16 @@ class AntaInventoryInput(BaseModel):
networks: list[AntaInventoryNetwork] | None = None
hosts: list[AntaInventoryHost] | None = None
ranges: list[AntaInventoryRange] | None = None
def yaml(self) -> str:
"""Return a YAML representation string of this model.
Returns
-------
The YAML representation string of this model.
"""
# TODO: Pydantic and YAML serialization/deserialization is not supported natively.
# This could be improved.
# https://github.com/pydantic/pydantic/issues/1043
# Explore if this worth using this: https://github.com/NowanIlfideme/pydantic-yaml
return yaml.safe_dump(yaml.safe_load(self.model_dump_json(serialize_as_any=True, exclude_unset=True)), indent=2, width=math.inf)

View file

@ -7,6 +7,7 @@
# mypy: disable-error-code=attr-defined
from __future__ import annotations
from ipaddress import IPv4Address, IPv4Network
from typing import Any, ClassVar, Literal
from pydantic import BaseModel
@ -118,6 +119,20 @@ def _get_interface_data(interface: str, vrf: str, command_output: dict[str, Any]
return None
def _get_adjacency_segment_data_by_neighbor(neighbor: str, instance: str, vrf: str, command_output: dict[str, Any]) -> dict[str, Any] | None:
"""Extract data related to an IS-IS interface for testing."""
search_path = f"vrfs.{vrf}.isisInstances.{instance}.adjacencySegments"
if get_value(dictionary=command_output, key=search_path, default=None) is None:
return None
isis_instance = get_value(dictionary=command_output, key=search_path, default=None)
return next(
(segment_data for segment_data in isis_instance if neighbor == segment_data["ipAddress"]),
None,
)
class VerifyISISNeighborState(AntaTest):
"""Verifies all IS-IS neighbors are in UP state.
@ -211,14 +226,15 @@ class VerifyISISNeighborCount(AntaTest):
isis_neighbor_count = _get_isis_neighbors_count(command_output)
if len(isis_neighbor_count) == 0:
self.result.is_skipped("No IS-IS neighbor detected")
return
for interface in self.inputs.interfaces:
eos_data = [ifl_data for ifl_data in isis_neighbor_count if ifl_data["interface"] == interface.name and ifl_data["level"] == interface.level]
if not eos_data:
self.result.is_failure(f"No neighbor detected for interface {interface.name}")
return
continue
if eos_data[0]["count"] != interface.count:
self.result.is_failure(
f"Interface {interface.name}:"
f"Interface {interface.name}: "
f"expected Level {interface.level}: count {interface.count}, "
f"got Level {eos_data[0]['level']}: count {eos_data[0]['count']}"
)
@ -284,7 +300,8 @@ class VerifyISISInterfaceMode(AntaTest):
self.result.is_success()
if len(command_output["vrfs"]) == 0:
self.result.is_failure("IS-IS is not configured on device")
self.result.is_skipped("IS-IS is not configured on device")
return
# Check for p2p interfaces
for interface in self.inputs.interfaces:
@ -306,3 +323,409 @@ class VerifyISISInterfaceMode(AntaTest):
self.result.is_failure(f"Interface {interface.name} in VRF {interface.vrf} is not running in passive mode")
else:
self.result.is_failure(f"Interface {interface.name} not found in VRF {interface.vrf}")
class VerifyISISSegmentRoutingAdjacencySegments(AntaTest):
"""Verifies ISIS Segment Routing Adjacency Segments.
Verify that all expected Adjacency segments are correctly visible for each interface.
Expected Results
----------------
* Success: The test will pass if all listed interfaces have correct adjacencies.
* Failure: The test will fail if any of the listed interfaces has not expected list of adjacencies.
* Skipped: The test will be skipped if no ISIS SR Adjacency is found.
Examples
--------
```yaml
anta.tests.routing:
isis:
- VerifyISISSegmentRoutingAdjacencySegments:
instances:
- name: CORE-ISIS
vrf: default
segments:
- interface: Ethernet2
address: 10.0.1.3
sid_origin: dynamic
```
"""
name = "VerifyISISSegmentRoutingAdjacencySegments"
description = "Verify expected Adjacency segments are correctly visible for each interface."
categories: ClassVar[list[str]] = ["isis", "segment-routing"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show isis segment-routing adjacency-segments", ofmt="json")]
class Input(AntaTest.Input):
"""Input model for the VerifyISISSegmentRoutingAdjacencySegments test."""
instances: list[IsisInstance]
class IsisInstance(BaseModel):
"""ISIS Instance model definition."""
name: str
"""ISIS instance name."""
vrf: str = "default"
"""VRF name where ISIS instance is configured."""
segments: list[Segment]
"""List of Adjacency segments configured in this instance."""
class Segment(BaseModel):
"""Segment model definition."""
interface: Interface
"""Interface name to check."""
level: Literal[1, 2] = 2
"""ISIS level configured for interface. Default is 2."""
sid_origin: Literal["dynamic"] = "dynamic"
"""Adjacency type"""
address: IPv4Address
"""IP address of remote end of segment."""
@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyISISSegmentRoutingAdjacencySegments."""
command_output = self.instance_commands[0].json_output
self.result.is_success()
if len(command_output["vrfs"]) == 0:
self.result.is_skipped("IS-IS is not configured on device")
return
# initiate defaults
failure_message = []
skip_vrfs = []
skip_instances = []
# Check if VRFs and instances are present in output.
for instance in self.inputs.instances:
vrf_data = get_value(
dictionary=command_output,
key=f"vrfs.{instance.vrf}",
default=None,
)
if vrf_data is None:
skip_vrfs.append(instance.vrf)
failure_message.append(f"VRF {instance.vrf} is not configured to run segment routging.")
elif get_value(dictionary=vrf_data, key=f"isisInstances.{instance.name}", default=None) is None:
skip_instances.append(instance.name)
failure_message.append(f"Instance {instance.name} is not found in vrf {instance.vrf}.")
# Check Adjacency segments
for instance in self.inputs.instances:
if instance.vrf not in skip_vrfs and instance.name not in skip_instances:
for input_segment in instance.segments:
eos_segment = _get_adjacency_segment_data_by_neighbor(
neighbor=str(input_segment.address),
instance=instance.name,
vrf=instance.vrf,
command_output=command_output,
)
if eos_segment is None:
failure_message.append(f"Your segment has not been found: {input_segment}.")
elif (
eos_segment["localIntf"] != input_segment.interface
or eos_segment["level"] != input_segment.level
or eos_segment["sidOrigin"] != input_segment.sid_origin
):
failure_message.append(f"Your segment is not correct: Expected: {input_segment} - Found: {eos_segment}.")
if failure_message:
self.result.is_failure("\n".join(failure_message))
class VerifyISISSegmentRoutingDataplane(AntaTest):
"""
Verify dataplane of a list of ISIS-SR instances.
Expected Results
----------------
* Success: The test will pass if all instances have correct dataplane configured
* Failure: The test will fail if one of the instances has incorrect dataplane configured
* Skipped: The test will be skipped if ISIS is not running
Examples
--------
```yaml
anta.tests.routing:
isis:
- VerifyISISSegmentRoutingDataplane:
instances:
- name: CORE-ISIS
vrf: default
dataplane: MPLS
```
"""
name = "VerifyISISSegmentRoutingDataplane"
description = "Verify dataplane of a list of ISIS-SR instances"
categories: ClassVar[list[str]] = ["isis", "segment-routing"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show isis segment-routing", ofmt="json")]
class Input(AntaTest.Input):
"""Input model for the VerifyISISSegmentRoutingDataplane test."""
instances: list[IsisInstance]
class IsisInstance(BaseModel):
"""ISIS Instance model definition."""
name: str
"""ISIS instance name."""
vrf: str = "default"
"""VRF name where ISIS instance is configured."""
dataplane: Literal["MPLS", "mpls", "unset"] = "MPLS"
"""Configured dataplane for the instance."""
@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyISISSegmentRoutingDataplane."""
command_output = self.instance_commands[0].json_output
self.result.is_success()
if len(command_output["vrfs"]) == 0:
self.result.is_skipped("IS-IS-SR is not running on device.")
return
# initiate defaults
failure_message = []
skip_vrfs = []
skip_instances = []
# Check if VRFs and instances are present in output.
for instance in self.inputs.instances:
vrf_data = get_value(
dictionary=command_output,
key=f"vrfs.{instance.vrf}",
default=None,
)
if vrf_data is None:
skip_vrfs.append(instance.vrf)
failure_message.append(f"VRF {instance.vrf} is not configured to run segment routing.")
elif get_value(dictionary=vrf_data, key=f"isisInstances.{instance.name}", default=None) is None:
skip_instances.append(instance.name)
failure_message.append(f"Instance {instance.name} is not found in vrf {instance.vrf}.")
# Check Adjacency segments
for instance in self.inputs.instances:
if instance.vrf not in skip_vrfs and instance.name not in skip_instances:
eos_dataplane = get_value(dictionary=command_output, key=f"vrfs.{instance.vrf}.isisInstances.{instance.name}.dataPlane", default=None)
if instance.dataplane.upper() != eos_dataplane:
failure_message.append(f"ISIS instance {instance.name} is not running dataplane {instance.dataplane} ({eos_dataplane})")
if failure_message:
self.result.is_failure("\n".join(failure_message))
class VerifyISISSegmentRoutingTunnels(AntaTest):
"""
Verify ISIS-SR tunnels computed by device.
Expected Results
----------------
* Success: The test will pass if all listed tunnels are computed on device.
* Failure: The test will fail if one of the listed tunnels is missing.
* Skipped: The test will be skipped if ISIS-SR is not configured.
Examples
--------
```yaml
anta.tests.routing:
isis:
- VerifyISISSegmentRoutingTunnels:
entries:
# Check only endpoint
- endpoint: 1.0.0.122/32
# Check endpoint and via TI-LFA
- endpoint: 1.0.0.13/32
vias:
- type: tunnel
tunnel_id: ti-lfa
# Check endpoint and via IP routers
- endpoint: 1.0.0.14/32
vias:
- type: ip
nexthop: 1.1.1.1
```
"""
name = "VerifyISISSegmentRoutingTunnels"
description = "Verify ISIS-SR tunnels computed by device"
categories: ClassVar[list[str]] = ["isis", "segment-routing"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show isis segment-routing tunnel", ofmt="json")]
class Input(AntaTest.Input):
"""Input model for the VerifyISISSegmentRoutingTunnels test."""
entries: list[Entry]
"""List of tunnels to check on device."""
class Entry(BaseModel):
"""Definition of a tunnel entry."""
endpoint: IPv4Network
"""Endpoint IP of the tunnel."""
vias: list[Vias] | None = None
"""Optional list of path to reach endpoint."""
class Vias(BaseModel):
"""Definition of a tunnel path."""
nexthop: IPv4Address | None = None
"""Nexthop of the tunnel. If None, then it is not tested. Default: None"""
type: Literal["ip", "tunnel"] | None = None
"""Type of the tunnel. If None, then it is not tested. Default: None"""
interface: Interface | None = None
"""Interface of the tunnel. If None, then it is not tested. Default: None"""
tunnel_id: Literal["TI-LFA", "ti-lfa", "unset"] | None = None
"""Computation method of the tunnel. If None, then it is not tested. Default: None"""
def _eos_entry_lookup(self, search_value: IPv4Network, entries: dict[str, Any], search_key: str = "endpoint") -> dict[str, Any] | None:
return next(
(entry_value for entry_id, entry_value in entries.items() if str(entry_value[search_key]) == str(search_value)),
None,
)
@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyISISSegmentRoutingTunnels.
This method performs the main test logic for verifying ISIS Segment Routing tunnels.
It checks the command output, initiates defaults, and performs various checks on the tunnels.
Returns
-------
None
"""
command_output = self.instance_commands[0].json_output
self.result.is_success()
# initiate defaults
failure_message = []
if len(command_output["entries"]) == 0:
self.result.is_skipped("IS-IS-SR is not running on device.")
return
for input_entry in self.inputs.entries:
eos_entry = self._eos_entry_lookup(search_value=input_entry.endpoint, entries=command_output["entries"])
if eos_entry is None:
failure_message.append(f"Tunnel to {input_entry} is not found.")
elif input_entry.vias is not None:
failure_src = []
for via_input in input_entry.vias:
if not self._check_tunnel_type(via_input, eos_entry):
failure_src.append("incorrect tunnel type")
if not self._check_tunnel_nexthop(via_input, eos_entry):
failure_src.append("incorrect nexthop")
if not self._check_tunnel_interface(via_input, eos_entry):
failure_src.append("incorrect interface")
if not self._check_tunnel_id(via_input, eos_entry):
failure_src.append("incorrect tunnel ID")
if failure_src:
failure_message.append(f"Tunnel to {input_entry.endpoint!s} is incorrect: {', '.join(failure_src)}")
if failure_message:
self.result.is_failure("\n".join(failure_message))
def _check_tunnel_type(self, via_input: VerifyISISSegmentRoutingTunnels.Input.Entry.Vias, eos_entry: dict[str, Any]) -> bool:
"""
Check if the tunnel type specified in `via_input` matches any of the tunnel types in `eos_entry`.
Args:
via_input (VerifyISISSegmentRoutingTunnels.Input.Entry.Vias): The input tunnel type to check.
eos_entry (dict[str, Any]): The EOS entry containing the tunnel types.
Returns
-------
bool: True if the tunnel type matches any of the tunnel types in `eos_entry`, False otherwise.
"""
if via_input.type is not None:
return any(
via_input.type
== get_value(
dictionary=eos_via,
key="type",
default="undefined",
)
for eos_via in eos_entry["vias"]
)
return True
def _check_tunnel_nexthop(self, via_input: VerifyISISSegmentRoutingTunnels.Input.Entry.Vias, eos_entry: dict[str, Any]) -> bool:
"""
Check if the tunnel nexthop matches the given input.
Args:
via_input (VerifyISISSegmentRoutingTunnels.Input.Entry.Vias): The input via object.
eos_entry (dict[str, Any]): The EOS entry dictionary.
Returns
-------
bool: True if the tunnel nexthop matches, False otherwise.
"""
if via_input.nexthop is not None:
return any(
str(via_input.nexthop)
== get_value(
dictionary=eos_via,
key="nexthop",
default="undefined",
)
for eos_via in eos_entry["vias"]
)
return True
def _check_tunnel_interface(self, via_input: VerifyISISSegmentRoutingTunnels.Input.Entry.Vias, eos_entry: dict[str, Any]) -> bool:
"""
Check if the tunnel interface exists in the given EOS entry.
Args:
via_input (VerifyISISSegmentRoutingTunnels.Input.Entry.Vias): The input via object.
eos_entry (dict[str, Any]): The EOS entry dictionary.
Returns
-------
bool: True if the tunnel interface exists, False otherwise.
"""
if via_input.interface is not None:
return any(
via_input.interface
== get_value(
dictionary=eos_via,
key="interface",
default="undefined",
)
for eos_via in eos_entry["vias"]
)
return True
def _check_tunnel_id(self, via_input: VerifyISISSegmentRoutingTunnels.Input.Entry.Vias, eos_entry: dict[str, Any]) -> bool:
"""
Check if the tunnel ID matches any of the tunnel IDs in the EOS entry's vias.
Args:
via_input (VerifyISISSegmentRoutingTunnels.Input.Entry.Vias): The input vias to check.
eos_entry (dict[str, Any]): The EOS entry to compare against.
Returns
-------
bool: True if the tunnel ID matches any of the tunnel IDs in the EOS entry's vias, False otherwise.
"""
if via_input.tunnel_id is not None:
return any(
via_input.tunnel_id.upper()
== get_value(
dictionary=eos_via,
key="tunnelId.type",
default="undefined",
).upper()
for eos_via in eos_entry["vias"]
)
return True