Merging upstream version 0.15.0.

Signed-off-by: Daniel Baumann <daniel@debian.org>
Daniel Baumann 2025-02-05 11:39:50 +01:00
parent bfebc2a0f4
commit 0a0cb7f4fd
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
103 changed files with 79620 additions and 742 deletions

View file

@ -17,7 +17,7 @@ __credits__ = [
"Guillaume Mulocher",
"Thomas Grimonet",
]
__copyright__ = "Copyright 2022, Arista EMEA AS"
__copyright__ = "Copyright 2022-2024, Arista Networks, Inc."
# ANTA Debug Mode environment variable
__DEBUG__ = bool(os.environ.get("ANTA_DEBUG", "").lower() == "true")
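A note on the `__DEBUG__` flag above: it only becomes true when the `ANTA_DEBUG` environment variable is the literal string "true" (any case). A minimal sketch of that behaviour with hypothetical values:
```
import os

def debug_enabled() -> bool:
    # Mirrors the expression above: anything other than "true" (case-insensitive) is treated as off.
    return bool(os.environ.get("ANTA_DEBUG", "").lower() == "true")

os.environ["ANTA_DEBUG"] = "True"
print(debug_enabled())  # True
os.environ["ANTA_DEBUG"] = "1"
print(debug_enabled())  # False: only the string "true" enables debug mode
```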

View file

@ -1,106 +0,0 @@
# Copyright (c) 2023-2024 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the LICENSE file.
"""Patch for aioeapi waiting for https://github.com/jeremyschulman/aio-eapi/pull/13."""
from __future__ import annotations
from typing import Any, AnyStr
import aioeapi
Device = aioeapi.Device
class EapiCommandError(RuntimeError):
"""Exception class for EAPI command errors.
Attributes
----------
failed: str - the failed command
errmsg: str - a description of the failure reason
errors: list[str] - the command failure details
passed: list[dict] - a list of command results of the commands that passed
not_exec: list[str] - a list of commands that were not executed
"""
# pylint: disable=too-many-arguments
def __init__(self, failed: str, errors: list[str], errmsg: str, passed: list[str | dict[str, Any]], not_exec: list[dict[str, Any]]) -> None:
"""Initializer for the EapiCommandError exception."""
self.failed = failed
self.errmsg = errmsg
self.errors = errors
self.passed = passed
self.not_exec = not_exec
super().__init__()
def __str__(self) -> str:
"""Returns the error message associated with the exception."""
return self.errmsg
aioeapi.EapiCommandError = EapiCommandError
async def jsonrpc_exec(self, jsonrpc: dict) -> list[dict | AnyStr]: # type: ignore
"""Execute the JSON-RPC dictionary object.
Parameters
----------
jsonrpc: dict
The JSON-RPC as created by :meth:`jsonrpc_command`.
Raises
------
EapiCommandError
In the event that a command resulted in an error response.
Returns
-------
The list of command results; either dict or text depending on the
JSON-RPC format parameter.
"""
res = await self.post("/command-api", json=jsonrpc)
res.raise_for_status()
body = res.json()
commands = jsonrpc["params"]["cmds"]
ofmt = jsonrpc["params"]["format"]
get_output = (lambda _r: _r["output"]) if ofmt == "text" else (lambda _r: _r)
# if there are no errors then return the list of command results.
if (err_data := body.get("error")) is None:
return [get_output(cmd_res) for cmd_res in body["result"]]
# ---------------------------------------------------------------------
# if we are here, then there were some command errors. Raise a
# EapiCommandError exception with args (commands that failed, passed,
# not-executed).
# ---------------------------------------------------------------------
# -------------------------- eAPI specification ----------------------
# On an error, no result object is present, only an error object, which
# is guaranteed to have the following attributes: code, messages, and
# data. Similar to the result object in the successful response, the
# data object is a list of objects corresponding to the results of all
commands up to, and including, the failed command. If there was an
# error before any commands were executed (e.g. bad credentials), data
# will be empty. The last object in the data array will always
# correspond to the failed command. The command failure details are
# always stored in the errors array.
cmd_data = err_data["data"]
len_data = len(cmd_data)
err_at = len_data - 1
err_msg = err_data["message"]
raise EapiCommandError(
passed=[get_output(cmd_data[cmd_i]) for cmd_i, cmd in enumerate(commands[:err_at])],
failed=commands[err_at]["cmd"],
errors=cmd_data[err_at]["errors"],
errmsg=err_msg,
not_exec=commands[err_at + 1 :],
)
aioeapi.Device.jsonrpc_exec = jsonrpc_exec
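The comments in the removed patch above describe how an eAPI error response is split into passed, failed, and not-executed commands. A self-contained sketch of that bookkeeping, using a made-up error body (the field names follow the eAPI specification quoted above; the values are hypothetical):
```
# Hypothetical eAPI error response and the commands that were sent.
body = {
    "error": {
        "code": 1002,
        "message": "CLI command 2 of 3 'bad command' failed: invalid command",
        # One entry per command up to, and including, the failed command.
        "data": [{"uptime": 42}, {"errors": ["Invalid input"]}],
    }
}
commands = [{"cmd": "show version"}, {"cmd": "bad command"}, {"cmd": "show clock"}]

err_data = body["error"]
cmd_data = err_data["data"]
err_at = len(cmd_data) - 1            # the last data entry is always the failed command
passed = cmd_data[:err_at]            # results of the commands that succeeded
failed = commands[err_at]["cmd"]      # "bad command"
errors = cmd_data[err_at]["errors"]   # ["Invalid input"]
not_exec = commands[err_at + 1:]      # [{"cmd": "show clock"}]
print(failed, errors, passed, not_exec)
```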

View file

@ -7,11 +7,14 @@ from __future__ import annotations
import importlib
import logging
import math
from collections import defaultdict
from inspect import isclass
from pathlib import Path
from typing import TYPE_CHECKING, Any, Optional, Union
from pydantic import BaseModel, ConfigDict, RootModel, ValidationError, ValidationInfo, field_validator, model_validator
import yaml
from pydantic import BaseModel, ConfigDict, RootModel, ValidationError, ValidationInfo, field_validator, model_serializer, model_validator
from pydantic.types import ImportString
from pydantic_core import PydanticCustomError
from yaml import YAMLError, safe_load
@ -43,6 +46,22 @@ class AntaTestDefinition(BaseModel):
test: type[AntaTest]
inputs: AntaTest.Input
@model_serializer()
def serialize_model(self) -> dict[str, AntaTest.Input]:
"""Serialize the AntaTestDefinition model.
The dictionary representing the model will look like:
```
<AntaTest subclass name>:
<AntaTest.Input compliant dictionary>
```
Returns
-------
A dictionary representing the model.
"""
return {self.test.__name__: self.inputs}
def __init__(self, **data: type[AntaTest] | AntaTest.Input | dict[str, Any] | None) -> None:
"""Inject the test in the context to allow instantiating Input in the BeforeValidator.
@ -157,12 +176,12 @@ class AntaCatalogFile(RootModel[dict[ImportString[Any], list[AntaTestDefinition]
if isinstance(tests, dict):
# This is an inner Python module
modules.update(AntaCatalogFile.flatten_modules(data=tests, package=module.__name__))
else:
if not isinstance(tests, list):
msg = f"Syntax error when parsing: {tests}\nIt must be a list of ANTA tests. Check the test catalog."
raise ValueError(msg) # noqa: TRY004 pydantic catches ValueError or AssertionError, no TypeError
elif isinstance(tests, list):
# This is a list of AntaTestDefinition
modules[module] = tests
else:
msg = f"Syntax error when parsing: {tests}\nIt must be a list of ANTA tests. Check the test catalog."
raise ValueError(msg) # noqa: TRY004 pydantic catches ValueError or AssertionError, no TypeError
return modules
# ANN401 - Any ok for this validator as we are validating the received data
@ -177,10 +196,15 @@ class AntaCatalogFile(RootModel[dict[ImportString[Any], list[AntaTestDefinition]
with provided value to validate test inputs.
"""
if isinstance(data, dict):
if not data:
return data
typed_data: dict[ModuleType, list[Any]] = AntaCatalogFile.flatten_modules(data)
for module, tests in typed_data.items():
test_definitions: list[AntaTestDefinition] = []
for test_definition in tests:
if isinstance(test_definition, AntaTestDefinition):
test_definitions.append(test_definition)
continue
if not isinstance(test_definition, dict):
msg = f"Syntax error when parsing: {test_definition}\nIt must be a dictionary. Check the test catalog."
raise ValueError(msg) # noqa: TRY004 pydantic catches ValueError or AssertionError, no TypeError
@ -200,7 +224,21 @@ class AntaCatalogFile(RootModel[dict[ImportString[Any], list[AntaTestDefinition]
raise ValueError(msg)
test_definitions.append(AntaTestDefinition(test=test, inputs=test_inputs))
typed_data[module] = test_definitions
return typed_data
return typed_data
return data
def yaml(self) -> str:
"""Return a YAML representation string of this model.
Returns
-------
The YAML representation string of this model.
"""
# TODO: Pydantic and YAML serialization/deserialization is not supported natively.
# This could be improved.
# https://github.com/pydantic/pydantic/issues/1043
# Explore whether it is worth using this: https://github.com/NowanIlfideme/pydantic-yaml
return yaml.safe_dump(yaml.safe_load(self.model_dump_json(serialize_as_any=True, exclude_unset=True)), indent=2, width=math.inf)
class AntaCatalog:
@ -232,6 +270,12 @@ class AntaCatalog:
else:
self._filename = Path(filename)
# Default indexes for faster access
self.tag_to_tests: defaultdict[str | None, set[AntaTestDefinition]] = defaultdict(set)
self.tests_without_tags: set[AntaTestDefinition] = set()
self.indexes_built: bool = False
self.final_tests_count: int = 0
@property
def filename(self) -> Path | None:
"""Path of the file used to create this AntaCatalog instance."""
@ -297,7 +341,7 @@ class AntaCatalog:
raise TypeError(msg)
try:
catalog_data = AntaCatalogFile(**data) # type: ignore[arg-type]
catalog_data = AntaCatalogFile(data) # type: ignore[arg-type]
except ValidationError as e:
anta_log_exception(
e,
@ -328,40 +372,85 @@ class AntaCatalog:
raise
return AntaCatalog(tests)
def get_tests_by_tags(self, tags: set[str], *, strict: bool = False) -> list[AntaTestDefinition]:
"""Return all the tests that have matching tags in their input filters.
If strict=True, return only tests that match all the tags provided as input.
If strict=False, return all the tests that match at least one tag provided as input.
def merge(self, catalog: AntaCatalog) -> AntaCatalog:
"""Merge two AntaCatalog instances.
Args:
----
tags: Tags of the tests to get.
strict: Specify if the returned tests must match all the tags provided.
catalog: AntaCatalog instance to merge to this instance.
Returns
-------
List of AntaTestDefinition that match the tags
A new AntaCatalog instance containing the tests of the two instances.
"""
result: list[AntaTestDefinition] = []
return AntaCatalog(tests=self.tests + catalog.tests)
def dump(self) -> AntaCatalogFile:
"""Return an AntaCatalogFile instance from this AntaCatalog instance.
Returns
-------
An AntaCatalogFile instance containing tests of this AntaCatalog instance.
"""
root: dict[ImportString[Any], list[AntaTestDefinition]] = {}
for test in self.tests:
if test.inputs.filters and (f := test.inputs.filters.tags):
if strict:
if all(t in tags for t in f):
result.append(test)
elif any(t in tags for t in f):
result.append(test)
return result
# Cannot use AntaTest.module property as the class is not instantiated
root.setdefault(test.test.__module__, []).append(test)
return AntaCatalogFile(root=root)
def get_tests_by_names(self, names: set[str]) -> list[AntaTestDefinition]:
"""Return all the tests whose name matches one of the provided test names.
def build_indexes(self, filtered_tests: set[str] | None = None) -> None:
"""Indexes tests by their tags for quick access during filtering operations.
If a `filtered_tests` set is provided, only the tests in this set will be indexed.
This method populates two attributes:
- tag_to_tests: A dictionary mapping each tag to a set of tests that contain it.
- tests_without_tags: A set of tests that do not have any tags.
Once the indexes are built, the `indexes_built` attribute is set to True.
"""
for test in self.tests:
# Skip tests that are not in the specified filtered_tests set
if filtered_tests and test.test.name not in filtered_tests:
continue
# Indexing by tag
if test.inputs.filters and (test_tags := test.inputs.filters.tags):
for tag in test_tags:
self.tag_to_tests[tag].add(test)
else:
self.tests_without_tags.add(test)
self.tag_to_tests[None] = self.tests_without_tags
self.indexes_built = True
def get_tests_by_tags(self, tags: set[str], *, strict: bool = False) -> set[AntaTestDefinition]:
"""Return all tests that match a given set of tags, according to the specified strictness.
Args:
----
names: Names of the tests to get.
tags: The tags to filter tests by. If empty, return all tests without tags.
strict: If True, returns only tests that contain all specified tags (intersection).
If False, returns tests that contain any of the specified tags (union).
Returns
-------
List of AntaTestDefinition that match the names
set[AntaTestDefinition]: A set of tests that match the given tags.
Raises
------
ValueError: If the indexes have not been built prior to method call.
"""
return [test for test in self.tests if test.test.name in names]
if not self.indexes_built:
msg = "Indexes have not been built yet. Call build_indexes() first."
raise ValueError(msg)
if not tags:
return self.tag_to_tests[None]
filtered_sets = [self.tag_to_tests[tag] for tag in tags if tag in self.tag_to_tests]
if not filtered_sets:
return set()
if strict:
return set.intersection(*filtered_sets)
return set.union(*filtered_sets)
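The new `build_indexes()`/`get_tests_by_tags()` pair above reduces tag filtering to plain set operations: union of the per-tag sets when `strict=False`, intersection when `strict=True`, plus a `None` bucket for untagged tests. A minimal sketch of that logic against a hypothetical, already-built `tag_to_tests` index (test names stand in for `AntaTestDefinition` objects):
```
from __future__ import annotations

# Hypothetical index, as build_indexes() would populate it.
tag_to_tests: dict[str | None, set[str]] = {
    "leaf": {"VerifyUptime", "VerifyMlagStatus"},
    "spine": {"VerifyUptime", "VerifyBGPPeersHealth"},
    None: {"VerifyNTP"},  # tests without any tag
}

def get_tests_by_tags(tags: set[str], *, strict: bool = False) -> set[str]:
    if not tags:
        return tag_to_tests[None]
    filtered_sets = [tag_to_tests[tag] for tag in tags if tag in tag_to_tests]
    if not filtered_sets:
        return set()
    return set.intersection(*filtered_sets) if strict else set.union(*filtered_sets)

print(get_tests_by_tags({"leaf", "spine"}))               # union: all three tagged tests
print(get_tests_by_tags({"leaf", "spine"}, strict=True))  # intersection: {'VerifyUptime'}
print(get_tests_by_tags(set()))                           # untagged tests only: {'VerifyNTP'}
```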

View file

@ -5,70 +5,37 @@
from __future__ import annotations
import logging
import pathlib
import sys
from typing import Callable
import click
from anta import __DEBUG__
from anta import GITHUB_SUGGESTION, __version__
from anta.cli.check import check as check_command
from anta.cli.debug import debug as debug_command
from anta.cli.exec import _exec as exec_command
from anta.cli.get import get as get_command
from anta.cli.nrfu import nrfu as nrfu_command
from anta.cli.utils import AliasedGroup, ExitCode
from anta.logger import Log, LogLevel, anta_log_exception, setup_logging
# Note: need to separate this file from _main to be able to fail on the import.
try:
from ._main import anta, cli
logger = logging.getLogger(__name__)
except ImportError as exc:
def build_cli(exception: Exception) -> Callable[[], None]:
"""Build CLI function using the caught exception."""
@click.group(cls=AliasedGroup)
@click.pass_context
@click.version_option(__version__)
@click.option(
"--log-file",
help="Send the logs to a file. If logging level is DEBUG, only INFO or higher will be sent to stdout.",
show_envvar=True,
type=click.Path(file_okay=True, dir_okay=False, writable=True, path_type=pathlib.Path),
)
@click.option(
"--log-level",
"-l",
help="ANTA logging level",
default=logging.getLevelName(logging.INFO),
show_envvar=True,
show_default=True,
type=click.Choice(
[Log.CRITICAL, Log.ERROR, Log.WARNING, Log.INFO, Log.DEBUG],
case_sensitive=False,
),
)
def anta(ctx: click.Context, log_level: LogLevel, log_file: pathlib.Path) -> None:
"""Arista Network Test Automation (ANTA) CLI."""
ctx.ensure_object(dict)
setup_logging(log_level, log_file)
def wrap() -> None:
"""Error message if any CLI dependency is missing."""
print(
"The ANTA command line client could not run because the required "
"dependencies were not installed.\nMake sure you've installed "
"everything with: pip install 'anta[cli]'"
)
if __DEBUG__:
print(f"The caught exception was: {exception}")
sys.exit(1)
anta.add_command(nrfu_command)
anta.add_command(check_command)
anta.add_command(exec_command)
anta.add_command(get_command)
anta.add_command(debug_command)
return wrap
cli = build_cli(exc)
def cli() -> None:
"""Entrypoint for pyproject.toml."""
try:
anta(obj={}, auto_envvar_prefix="ANTA")
except Exception as exc: # pylint: disable=broad-exception-caught
anta_log_exception(
exc,
f"Uncaught Exception when running ANTA CLI\n{GITHUB_SUGGESTION}",
logger,
)
sys.exit(ExitCode.INTERNAL_ERROR)
__all__ = ["cli", "anta"]
if __name__ == "__main__":
cli()

anta/cli/_main.py (new file, 70 additions)
View file

@ -0,0 +1,70 @@
# Copyright (c) 2023-2024 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the LICENSE file.
"""ANTA CLI."""
from __future__ import annotations
import logging
import pathlib
import sys
import click
from anta import GITHUB_SUGGESTION, __version__
from anta.cli.check import check as check_command
from anta.cli.debug import debug as debug_command
from anta.cli.exec import _exec as exec_command
from anta.cli.get import get as get_command
from anta.cli.nrfu import nrfu as nrfu_command
from anta.cli.utils import AliasedGroup, ExitCode
from anta.logger import Log, LogLevel, anta_log_exception, setup_logging
logger = logging.getLogger(__name__)
@click.group(cls=AliasedGroup)
@click.pass_context
@click.version_option(__version__)
@click.option(
"--log-file",
help="Send the logs to a file. If logging level is DEBUG, only INFO or higher will be sent to stdout.",
show_envvar=True,
type=click.Path(file_okay=True, dir_okay=False, writable=True, path_type=pathlib.Path),
)
@click.option(
"--log-level",
"-l",
help="ANTA logging level",
default=logging.getLevelName(logging.INFO),
show_envvar=True,
show_default=True,
type=click.Choice(
[Log.CRITICAL, Log.ERROR, Log.WARNING, Log.INFO, Log.DEBUG],
case_sensitive=False,
),
)
def anta(ctx: click.Context, log_level: LogLevel, log_file: pathlib.Path) -> None:
"""Arista Network Test Automation (ANTA) CLI."""
ctx.ensure_object(dict)
setup_logging(log_level, log_file)
anta.add_command(nrfu_command)
anta.add_command(check_command)
anta.add_command(exec_command)
anta.add_command(get_command)
anta.add_command(debug_command)
def cli() -> None:
"""Entrypoint for pyproject.toml."""
try:
anta(obj={}, auto_envvar_prefix="ANTA")
except Exception as exc: # pylint: disable=broad-exception-caught
anta_log_exception(
exc,
f"Uncaught Exception when running ANTA CLI\n{GITHUB_SUGGESTION}",
logger,
)
sys.exit(ExitCode.INTERNAL_ERROR)

View file

@ -51,11 +51,8 @@ def debug_options(f: Callable[..., Any]) -> Callable[..., Any]:
# TODO: @gmuloc - tags come from context https://github.com/arista-netdevops-community/anta/issues/584
# pylint: disable=unused-argument
# ruff: noqa: ARG001
try:
d = inventory[device]
except KeyError as e:
message = f"Device {device} does not exist in Inventory"
logger.error(e, message)
if (d := inventory.get(device)) is None:
logger.error("Device '%s' does not exist in Inventory", device)
ctx.exit(ExitCode.USAGE_ERROR)
return f(*args, device=d, **kwargs)

View file

@ -16,7 +16,7 @@ import click
from yaml import safe_load
from anta.cli.console import console
from anta.cli.exec.utils import clear_counters_utils, collect_commands, collect_scheduled_show_tech
from anta.cli.exec import utils
from anta.cli.utils import inventory_options
if TYPE_CHECKING:
@ -29,7 +29,7 @@ logger = logging.getLogger(__name__)
@inventory_options
def clear_counters(inventory: AntaInventory, tags: set[str] | None) -> None:
"""Clear counter statistics on EOS devices."""
asyncio.run(clear_counters_utils(inventory, tags=tags))
asyncio.run(utils.clear_counters(inventory, tags=tags))
@click.command()
@ -62,7 +62,7 @@ def snapshot(inventory: AntaInventory, tags: set[str] | None, commands_list: Pat
except FileNotFoundError:
logger.error("Error reading %s", commands_list)
sys.exit(1)
asyncio.run(collect_commands(inventory, eos_commands, output, tags=tags))
asyncio.run(utils.collect_commands(inventory, eos_commands, output, tags=tags))
@click.command()
@ -98,4 +98,4 @@ def collect_tech_support(
configure: bool,
) -> None:
"""Collect scheduled tech-support from EOS devices."""
asyncio.run(collect_scheduled_show_tech(inventory, output, configure=configure, tags=tags, latest=latest))
asyncio.run(utils.collect_show_tech(inventory, output, configure=configure, tags=tags, latest=latest))

View file

@ -14,12 +14,13 @@ import re
from pathlib import Path
from typing import TYPE_CHECKING, Literal
from aioeapi import EapiCommandError
from click.exceptions import UsageError
from httpx import ConnectError, HTTPError
from anta.custom_types import REGEXP_PATH_MARKERS
from anta.device import AntaDevice, AsyncEOSDevice
from anta.models import AntaCommand
from asynceapi import EapiCommandError
if TYPE_CHECKING:
from anta.inventory import AntaInventory
@ -29,7 +30,7 @@ INVALID_CHAR = "`~!@#$/"
logger = logging.getLogger(__name__)
async def clear_counters_utils(anta_inventory: AntaInventory, tags: set[str] | None = None) -> None:
async def clear_counters(anta_inventory: AntaInventory, tags: set[str] | None = None) -> None:
"""Clear counters."""
async def clear(dev: AntaDevice) -> None:
@ -60,7 +61,7 @@ async def collect_commands(
async def collect(dev: AntaDevice, command: str, outformat: Literal["json", "text"]) -> None:
outdir = Path() / root_dir / dev.name / outformat
outdir.mkdir(parents=True, exist_ok=True)
safe_command = re.sub(r"(/|\|$)", "_", command)
safe_command = re.sub(rf"{REGEXP_PATH_MARKERS}", "_", command)
c = AntaCommand(command=command, ofmt=outformat)
await dev.collect(c)
if not c.collected:
@ -72,6 +73,9 @@ async def collect_commands(
elif c.ofmt == "text":
outfile = outdir / f"{safe_command}.log"
content = c.text_output
else:
logger.error("Command outformat is not in ['json', 'text'] for command '%s'", command)
return
with outfile.open(mode="w", encoding="UTF-8") as f:
f.write(content)
logger.info("Collected command '%s' from device %s (%s)", command, dev.name, dev.hw_model)
@ -91,7 +95,7 @@ async def collect_commands(
logger.error("Error when collecting commands: %s", str(r))
async def collect_scheduled_show_tech(inv: AntaInventory, root_dir: Path, *, configure: bool, tags: set[str] | None = None, latest: int | None = None) -> None:
async def collect_show_tech(inv: AntaInventory, root_dir: Path, *, configure: bool, tags: set[str] | None = None, latest: int | None = None) -> None:
"""Collect scheduled show-tech on devices."""
async def collect(device: AntaDevice) -> None:
@ -103,12 +107,12 @@ async def collect_scheduled_show_tech(inv: AntaInventory, root_dir: Path, *, con
cmd += f" | head -{latest}"
command = AntaCommand(command=cmd, ofmt="text")
await device.collect(command=command)
if command.collected and command.text_output:
filenames = [Path(f"{EOS_SCHEDULED_TECH_SUPPORT}/{f}") for f in command.text_output.splitlines()]
else:
if not (command.collected and command.text_output):
logger.error("Unable to get tech-support filenames on %s: verify that %s is not empty", device.name, EOS_SCHEDULED_TECH_SUPPORT)
return
filenames = [Path(f"{EOS_SCHEDULED_TECH_SUPPORT}/{f}") for f in command.text_output.splitlines()]
# Create directories
outdir = Path() / root_dir / f"{device.name.lower()}"
outdir.mkdir(parents=True, exist_ok=True)
@ -119,31 +123,32 @@ async def collect_scheduled_show_tech(inv: AntaInventory, root_dir: Path, *, con
if command.collected and not command.text_output:
logger.debug("'aaa authorization exec default local' is not configured on device %s", device.name)
if configure:
commands = []
# TODO: @mtache - add `config` field to `AntaCommand` object to handle this use case.
# Otherwise mypy complains about enable as it is only implemented for AsyncEOSDevice
# TODO: Should enable be also included in AntaDevice?
if not isinstance(device, AsyncEOSDevice):
msg = "anta exec collect-tech-support is only supported with AsyncEOSDevice for now."
raise UsageError(msg)
if device.enable and device._enable_password is not None: # pylint: disable=protected-access
commands.append({"cmd": "enable", "input": device._enable_password}) # pylint: disable=protected-access
elif device.enable:
commands.append({"cmd": "enable"})
commands.extend(
[
{"cmd": "configure terminal"},
{"cmd": "aaa authorization exec default local"},
],
)
logger.warning("Configuring 'aaa authorization exec default local' on device %s", device.name)
command = AntaCommand(command="show running-config | include aaa authorization exec default local", ofmt="text")
await device._session.cli(commands=commands) # pylint: disable=protected-access
logger.info("Configured 'aaa authorization exec default local' on device %s", device.name)
else:
if not configure:
logger.error("Unable to collect tech-support on %s: configuration 'aaa authorization exec default local' is not present", device.name)
return
commands = []
# TODO: @mtache - add `config` field to `AntaCommand` object to handle this use case.
# Otherwise mypy complains about enable as it is only implemented for AsyncEOSDevice
# TODO: Should enable be also included in AntaDevice?
if not isinstance(device, AsyncEOSDevice):
msg = "anta exec collect-tech-support is only supported with AsyncEOSDevice for now."
raise UsageError(msg)
if device.enable and device._enable_password is not None: # pylint: disable=protected-access
commands.append({"cmd": "enable", "input": device._enable_password}) # pylint: disable=protected-access
elif device.enable:
commands.append({"cmd": "enable"})
commands.extend(
[
{"cmd": "configure terminal"},
{"cmd": "aaa authorization exec default local"},
],
)
logger.warning("Configuring 'aaa authorization exec default local' on device %s", device.name)
command = AntaCommand(command="show running-config | include aaa authorization exec default local", ofmt="text")
await device._session.cli(commands=commands) # pylint: disable=protected-access
logger.info("Configured 'aaa authorization exec default local' on device %s", device.name)
logger.debug("'aaa authorization exec default local' is already configured on device %s", device.name)
await device.copy(sources=filenames, destination=outdir, direction="from")
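The change above swaps the inline pattern for `REGEXP_PATH_MARKERS` when turning a collected CLI command into a file name. A small illustration of the substitution; the pattern is the one defined in `anta/custom_types.py` later in this diff, and the command is an arbitrary example:
```
import re

REGEXP_PATH_MARKERS = r"[\\\/\s]"  # matches backslashes, forward slashes and whitespace

command = "show ip route vrf MGMT"
safe_command = re.sub(REGEXP_PATH_MARKERS, "_", command)
print(safe_command)  # show_ip_route_vrf_MGMT -> e.g. saved as <root_dir>/<device>/json/show_ip_route_vrf_MGMT.json
```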

View file

@ -76,7 +76,11 @@ def from_cvp(ctx: click.Context, output: Path, host: str, username: str, passwor
required=True,
)
def from_ansible(ctx: click.Context, output: Path, ansible_group: str, ansible_inventory: Path) -> None:
"""Build ANTA inventory from an ansible inventory YAML file."""
"""Build ANTA inventory from an ansible inventory YAML file.
NOTE: This command does not support inline vaulted variables. Make sure to comment them out.
"""
logger.info("Building inventory from ansible file '%s'", ansible_inventory)
try:
create_inventory_from_ansible(

View file

@ -154,6 +154,15 @@ def create_inventory_from_ansible(inventory: Path, output: Path, ansible_group:
try:
with inventory.open(encoding="utf-8") as inv:
ansible_inventory = yaml.safe_load(inv)
except yaml.constructor.ConstructorError as exc:
if exc.problem and "!vault" in exc.problem:
logger.error(
"`anta get from-ansible` does not support inline vaulted variables, comment them out to generate your inventory. "
"If the vaulted variable is necessary to build the inventory (e.g. `ansible_host`), it needs to be unvaulted for "
"`from-ansible` command to work."
)
msg = f"Could not parse {inventory}."
raise ValueError(msg) from exc
except OSError as exc:
msg = f"Could not parse {inventory}."
raise ValueError(msg) from exc
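The new `except yaml.constructor.ConstructorError` branch above exists because `yaml.safe_load` has no constructor for the `!vault` tag used by inline-vaulted variables. A minimal reproduction with a hypothetical Ansible inventory snippet:
```
import yaml

# Hypothetical Ansible inventory with an inline-vaulted variable (dummy ciphertext).
inventory_text = """
all:
  hosts:
    leaf1:
      ansible_host: 10.0.0.1
      ansible_password: !vault |
        $ANSIBLE_VAULT;1.1;AES256
        6638643965323633646262656665306333
"""

try:
    yaml.safe_load(inventory_text)
except yaml.constructor.ConstructorError as exc:
    # exc.problem mentions the unknown '!vault' tag, which is what the code above checks for.
    print(exc.problem)
```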

View file

@ -99,6 +99,14 @@ HIDE_STATUS.remove("unset")
help="Group result by test or device.",
required=False,
)
@click.option(
"--dry-run",
help="Run anta nrfu command but stop before starting to execute the tests. Considers all devices as connected.",
type=str,
show_envvar=True,
is_flag=True,
default=False,
)
# pylint: disable=too-many-arguments
def nrfu(
ctx: click.Context,
@ -111,6 +119,7 @@ def nrfu(
*,
ignore_status: bool,
ignore_error: bool,
dry_run: bool,
) -> None:
"""Run ANTA tests on selected inventory devices."""
# If help is invoked somewhere, skip the command
@ -124,7 +133,19 @@ def nrfu(
ctx.obj["hide"] = set(hide) if hide else None
print_settings(inventory, catalog)
with anta_progress_bar() as AntaTest.progress:
asyncio.run(main(ctx.obj["result_manager"], inventory, catalog, tags=tags, devices=set(device) if device else None, tests=set(test) if test else None))
asyncio.run(
main(
ctx.obj["result_manager"],
inventory,
catalog,
tags=tags,
devices=set(device) if device else None,
tests=set(test) if test else None,
dry_run=dry_run,
)
)
if dry_run:
return
# Invoke `anta nrfu table` if no command is passed
if ctx.invoked_subcommand is None:
ctx.invoke(commands.table)

View file

@ -38,7 +38,7 @@ def print_settings(
catalog: AntaCatalog,
) -> None:
"""Print ANTA settings before running tests."""
message = f"Running ANTA tests:\n- {inventory}\n- Tests catalog contains {len(catalog.tests)} tests"
message = f"- {inventory}\n- Tests catalog contains {len(catalog.tests)} tests"
console.print(Panel.fit(message, style="cyan", title="[green]Settings"))
console.print()

View file

@ -12,7 +12,6 @@ from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable
import click
from pydantic import ValidationError
from yaml import YAMLError
from anta.catalog import AntaCatalog
@ -254,7 +253,7 @@ def inventory_options(f: Callable[..., Any]) -> Callable[..., Any]:
insecure=insecure,
disable_cache=disable_cache,
)
except (ValidationError, TypeError, ValueError, YAMLError, OSError, InventoryIncorrectSchemaError, InventoryRootKeyError):
except (TypeError, ValueError, YAMLError, OSError, InventoryIncorrectSchemaError, InventoryRootKeyError):
ctx.exit(ExitCode.USAGE_ERROR)
return f(*args, inventory=i, tags=tags, **kwargs)
@ -292,7 +291,7 @@ def catalog_options(f: Callable[..., Any]) -> Callable[..., Any]:
return f(*args, catalog=None, **kwargs)
try:
c = AntaCatalog.parse(catalog)
except (ValidationError, TypeError, ValueError, YAMLError, OSError):
except (TypeError, ValueError, YAMLError, OSError):
ctx.exit(ExitCode.USAGE_ERROR)
return f(*args, catalog=c, **kwargs)

View file

@ -9,6 +9,31 @@ from typing import Annotated, Literal
from pydantic import Field
from pydantic.functional_validators import AfterValidator, BeforeValidator
# Regular Expression definition
# TODO: make this configurable - with an env var maybe?
REGEXP_EOS_BLACKLIST_CMDS = [r"^reload.*", r"^conf\w*\s*(terminal|session)*", r"^wr\w*\s*\w+"]
"""List of regular expressions to blacklist from eos commands."""
REGEXP_PATH_MARKERS = r"[\\\/\s]"
"""Match directory path from string."""
REGEXP_INTERFACE_ID = r"\d+(\/\d+)*(\.\d+)?"
"""Match Interface ID like 1/1.1."""
REGEXP_TYPE_EOS_INTERFACE = r"^(Dps|Ethernet|Fabric|Loopback|Management|Port-Channel|Tunnel|Vlan|Vxlan)[0-9]+(\/[0-9]+)*(\.[0-9]+)?$"
"""Match EOS interface types like Ethernet1/1, Vlan1, Loopback1, etc."""
REGEXP_TYPE_VXLAN_SRC_INTERFACE = r"^(Loopback)([0-9]|[1-9][0-9]{1,2}|[1-7][0-9]{3}|8[01][0-9]{2}|819[01])$"
"""Match Vxlan source interface like Loopback10."""
REGEXP_TYPE_HOSTNAME = r"^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])$"
"""Match hostname like `my-hostname`, `my-hostname-1`, `my-hostname-1-2`."""
# Regexp BGP AFI/SAFI
REGEXP_BGP_L2VPN_AFI = r"\b(l2[\s\-]?vpn[\s\-]?evpn)\b"
"""Match L2VPN EVPN AFI."""
REGEXP_BGP_IPV4_MPLS_LABELS = r"\b(ipv4[\s\-]?mpls[\s\-]?label(s)?)\b"
"""Match IPv4 MPLS Labels."""
REGEX_BGP_IPV4_MPLS_VPN = r"\b(ipv4[\s\-]?mpls[\s\-]?vpn)\b"
"""Match IPv4 MPLS VPN."""
REGEX_BGP_IPV4_UNICAST = r"\b(ipv4[\s\-]?uni[\s\-]?cast)\b"
"""Match IPv4 Unicast."""
def aaa_group_prefix(v: str) -> str:
"""Prefix the AAA method with 'group' if it is known."""
@ -24,7 +49,7 @@ def interface_autocomplete(v: str) -> str:
- `po` will be changed to `Port-Channel`
- `lo` will be changed to `Loopback`
"""
intf_id_re = re.compile(r"[0-9]+(\/[0-9]+)*(\.[0-9]+)?")
intf_id_re = re.compile(REGEXP_INTERFACE_ID)
m = intf_id_re.search(v)
if m is None:
msg = f"Could not parse interface ID in interface '{v}'"
@ -33,11 +58,7 @@ def interface_autocomplete(v: str) -> str:
alias_map = {"et": "Ethernet", "eth": "Ethernet", "po": "Port-Channel", "lo": "Loopback"}
for alias, full_name in alias_map.items():
if v.lower().startswith(alias):
return f"{full_name}{intf_id}"
return v
return next((f"{full_name}{intf_id}" for alias, full_name in alias_map.items() if v.lower().startswith(alias)), v)
def interface_case_sensitivity(v: str) -> str:
@ -50,7 +71,7 @@ def interface_case_sensitivity(v: str) -> str:
- loopback -> Loopback
"""
if isinstance(v, str) and len(v) > 0 and not v[0].isupper():
if isinstance(v, str) and v != "" and not v[0].isupper():
return f"{v[0].upper()}{v[1:]}"
return v
@ -67,10 +88,10 @@ def bgp_multiprotocol_capabilities_abbreviations(value: str) -> str:
"""
patterns = {
r"\b(l2[\s\-]?vpn[\s\-]?evpn)\b": "l2VpnEvpn",
r"\bipv4[\s_-]?mpls[\s_-]?label(s)?\b": "ipv4MplsLabels",
r"\bipv4[\s_-]?mpls[\s_-]?vpn\b": "ipv4MplsVpn",
r"\bipv4[\s_-]?uni[\s_-]?cast\b": "ipv4Unicast",
REGEXP_BGP_L2VPN_AFI: "l2VpnEvpn",
REGEXP_BGP_IPV4_MPLS_LABELS: "ipv4MplsLabels",
REGEX_BGP_IPV4_MPLS_VPN: "ipv4MplsVpn",
REGEX_BGP_IPV4_UNICAST: "ipv4Unicast",
}
for pattern, replacement in patterns.items():
@ -81,6 +102,16 @@ def bgp_multiprotocol_capabilities_abbreviations(value: str) -> str:
return value
def validate_regex(value: str) -> str:
"""Validate that the input value is a valid regex format."""
try:
re.compile(value)
except re.error as e:
msg = f"Invalid regex: {e}"
raise ValueError(msg) from e
return value
# ANTA framework
TestStatus = Literal["unset", "success", "failure", "error", "skipped"]
@ -91,13 +122,19 @@ MlagPriority = Annotated[int, Field(ge=1, le=32767)]
Vni = Annotated[int, Field(ge=1, le=16777215)]
Interface = Annotated[
str,
Field(pattern=r"^(Dps|Ethernet|Fabric|Loopback|Management|Port-Channel|Tunnel|Vlan|Vxlan)[0-9]+(\/[0-9]+)*(\.[0-9]+)?$"),
Field(pattern=REGEXP_TYPE_EOS_INTERFACE),
BeforeValidator(interface_autocomplete),
BeforeValidator(interface_case_sensitivity),
]
EthernetInterface = Annotated[
str,
Field(pattern=r"^Ethernet[0-9]+(\/[0-9]+)*$"),
BeforeValidator(interface_autocomplete),
BeforeValidator(interface_case_sensitivity),
]
VxlanSrcIntf = Annotated[
str,
Field(pattern=r"^(Loopback)([0-9]|[1-9][0-9]{1,2}|[1-7][0-9]{3}|8[01][0-9]{2}|819[01])$"),
Field(pattern=REGEXP_TYPE_VXLAN_SRC_INTERFACE),
BeforeValidator(interface_autocomplete),
BeforeValidator(interface_case_sensitivity),
]
@ -105,7 +142,7 @@ Afi = Literal["ipv4", "ipv6", "vpn-ipv4", "vpn-ipv6", "evpn", "rt-membership", "
Safi = Literal["unicast", "multicast", "labeled-unicast", "sr-te"]
EncryptionAlgorithm = Literal["RSA", "ECDSA"]
RsaKeySize = Literal[2048, 3072, 4096]
EcdsaKeySize = Literal[256, 384, 521]
EcdsaKeySize = Literal[256, 384, 512]
MultiProtocolCaps = Annotated[str, BeforeValidator(bgp_multiprotocol_capabilities_abbreviations)]
BfdInterval = Annotated[int, Field(ge=50, le=60000)]
BfdMultiplier = Annotated[int, Field(ge=3, le=50)]
@ -127,5 +164,6 @@ ErrDisableInterval = Annotated[int, Field(ge=30, le=86400)]
Percent = Annotated[float, Field(ge=0.0, le=100.0)]
PositiveInteger = Annotated[int, Field(ge=0)]
Revision = Annotated[int, Field(ge=1, le=99)]
Hostname = Annotated[str, Field(pattern=r"^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])$")]
Hostname = Annotated[str, Field(pattern=REGEXP_TYPE_HOSTNAME)]
Port = Annotated[int, Field(ge=1, le=65535)]
RegexString = Annotated[str, AfterValidator(validate_regex)]
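The normalisation helpers above run as `BeforeValidator`s ahead of the `Field(pattern=...)` checks. A short sketch of `interface_autocomplete` on sample inputs, reusing the alias map and `REGEXP_INTERFACE_ID` pattern shown in this hunk (the standalone function is a simplified copy, not the library API):
```
import re

REGEXP_INTERFACE_ID = r"\d+(\/\d+)*(\.\d+)?"
alias_map = {"et": "Ethernet", "eth": "Ethernet", "po": "Port-Channel", "lo": "Loopback"}

def interface_autocomplete(v: str) -> str:
    m = re.search(REGEXP_INTERFACE_ID, v)
    if m is None:
        msg = f"Could not parse interface ID in interface '{v}'"
        raise ValueError(msg)
    intf_id = m[0]
    return next((f"{full_name}{intf_id}" for alias, full_name in alias_map.items() if v.lower().startswith(alias)), v)

print(interface_autocomplete("eth1/1"))  # Ethernet1/1
print(interface_autocomplete("po500"))   # Port-Channel500
print(interface_autocomplete("Vlan10"))  # Vlan10 (no alias matched, returned unchanged)
```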

View file

@ -18,7 +18,8 @@ from aiocache.plugins import HitMissRatioPlugin
from asyncssh import SSHClientConnection, SSHClientConnectionOptions
from httpx import ConnectError, HTTPError, TimeoutException
from anta import __DEBUG__, aioeapi
import asynceapi
from anta import __DEBUG__
from anta.logger import anta_log_exception, exc_to_str
from anta.models import AntaCommand
@ -116,7 +117,7 @@ class AntaDevice(ABC):
yield "disable_cache", self.cache is None
@abstractmethod
async def _collect(self, command: AntaCommand) -> None:
async def _collect(self, command: AntaCommand, *, collection_id: str | None = None) -> None:
"""Collect device command output.
This abstract coroutine can be used to implement any command collection method
@ -131,11 +132,11 @@ class AntaDevice(ABC):
Args:
----
command: the command to collect
command: The command to collect.
collection_id: An identifier used to build the eAPI request ID.
"""
async def collect(self, command: AntaCommand) -> None:
async def collect(self, command: AntaCommand, *, collection_id: str | None = None) -> None:
"""Collect the output for a specified command.
When caching is activated on both the device and the command,
@ -148,8 +149,8 @@ class AntaDevice(ABC):
Args:
----
command (AntaCommand): The command to process.
command: The command to collect.
collection_id: An identifier used to build the eAPI request ID.
"""
# Need to ignore pylint no-member as Cache is a proxy class and pylint is not smart enough
# https://github.com/pylint-dev/pylint/issues/7258
@ -161,20 +162,20 @@ class AntaDevice(ABC):
logger.debug("Cache hit for %s on %s", command.command, self.name)
command.output = cached_output
else:
await self._collect(command=command)
await self._collect(command=command, collection_id=collection_id)
await self.cache.set(command.uid, command.output) # pylint: disable=no-member
else:
await self._collect(command=command)
await self._collect(command=command, collection_id=collection_id)
async def collect_commands(self, commands: list[AntaCommand]) -> None:
async def collect_commands(self, commands: list[AntaCommand], *, collection_id: str | None = None) -> None:
"""Collect multiple commands.
Args:
----
commands: the commands to collect
commands: The commands to collect.
collection_id: An identifier used to build the eAPI request ID.
"""
await asyncio.gather(*(self.collect(command=command) for command in commands))
await asyncio.gather(*(self.collect(command=command, collection_id=collection_id) for command in commands))
@abstractmethod
async def refresh(self) -> None:
@ -270,7 +271,7 @@ class AsyncEOSDevice(AntaDevice):
raise ValueError(message)
self.enable = enable
self._enable_password = enable_password
self._session: aioeapi.Device = aioeapi.Device(host=host, port=port, username=username, password=password, proto=proto, timeout=timeout)
self._session: asynceapi.Device = asynceapi.Device(host=host, port=port, username=username, password=password, proto=proto, timeout=timeout)
ssh_params: dict[str, Any] = {}
if insecure:
ssh_params["known_hosts"] = None
@ -305,7 +306,7 @@ class AsyncEOSDevice(AntaDevice):
"""
return (self._session.host, self._session.port)
async def _collect(self, command: AntaCommand) -> None: # noqa: C901 function is too complex - because of many required except blocks
async def _collect(self, command: AntaCommand, *, collection_id: str | None = None) -> None: # noqa: C901 function is too complex - because of many required except blocks #pylint: disable=line-too-long
"""Collect device command output from EOS using aio-eapi.
Supports outformat `json` and `text` as output structure.
@ -314,9 +315,10 @@ class AsyncEOSDevice(AntaDevice):
Args:
----
command: the AntaCommand to collect.
command: The command to collect.
collection_id: An identifier used to build the eAPI request ID.
"""
commands: list[dict[str, Any]] = []
commands: list[dict[str, str | int]] = []
if self.enable and self._enable_password is not None:
commands.append(
{
@ -329,14 +331,15 @@ class AsyncEOSDevice(AntaDevice):
commands.append({"cmd": "enable"})
commands += [{"cmd": command.command, "revision": command.revision}] if command.revision else [{"cmd": command.command}]
try:
response: list[dict[str, Any]] = await self._session.cli(
response: list[dict[str, Any] | str] = await self._session.cli(
commands=commands,
ofmt=command.ofmt,
version=command.version,
)
req_id=f"ANTA-{collection_id}-{id(command)}" if collection_id else f"ANTA-{id(command)}",
) # type: ignore[assignment] # multiple commands returns a list
# Do not keep response of 'enable' command
command.output = response[-1]
except aioeapi.EapiCommandError as e:
except asynceapi.EapiCommandError as e:
# This block catches exceptions related to EOS issuing an error.
command.errors = e.errors
if command.requires_privileges:
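The `collection_id` keyword threaded through the hunks above is only used to build a traceable eAPI request ID (`collect()` in `anta/models.py` passes the test name). A tiny illustration of the resulting format; the numeric value is a hypothetical stand-in for `id(command)` at runtime:
```
collection_id = "VerifyUptime"      # typically the AntaTest name
command_obj_id = 140245318029136    # hypothetical stand-in for id(command)

req_id = f"ANTA-{collection_id}-{command_obj_id}" if collection_id else f"ANTA-{command_obj_id}"
print(req_id)  # ANTA-VerifyUptime-140245318029136
```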

View file

@ -7,6 +7,7 @@ from __future__ import annotations
import logging
import traceback
from datetime import timedelta
from enum import Enum
from typing import TYPE_CHECKING, Literal
@ -87,6 +88,12 @@ def setup_logging(level: LogLevel = Log.INFO, file: Path | None = None) -> None:
logger.debug("ANTA Debug Mode enabled")
def format_td(seconds: float, digits: int = 3) -> str:
"""Return a formatted string from a float number representing seconds and a number of digits."""
isec, fsec = divmod(round(seconds * 10**digits), 10**digits)
return f"{timedelta(seconds=isec)}.{fsec:0{digits}.0f}"
def exc_to_str(exception: BaseException) -> str:
"""Return a human readable string from a BaseException object."""
return f"{type(exception).__name__}{f': {exception}' if str(exception) else ''}"
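`format_td` above (moved here from `anta.models`) turns a float duration into an `H:MM:SS.mmm` style string. A quick check of its rounding and padding behaviour:
```
from datetime import timedelta

def format_td(seconds: float, digits: int = 3) -> str:
    """Return a formatted string from a float number representing seconds and a number of digits."""
    isec, fsec = divmod(round(seconds * 10**digits), 10**digits)
    return f"{timedelta(seconds=isec)}.{fsec:0{digits}.0f}"

print(format_td(3.14159))    # 0:00:03.142
print(format_td(125.5, 1))   # 0:02:05.5
print(format_td(0.0004))     # 0:00:00.000 (rounds below the requested precision)
```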

View file

@ -8,10 +8,7 @@ from __future__ import annotations
import hashlib
import logging
import re
import time
from abc import ABC, abstractmethod
from copy import deepcopy
from datetime import timedelta
from functools import wraps
from string import Formatter
from typing import TYPE_CHECKING, Any, Callable, ClassVar, Literal, TypeVar
@ -19,7 +16,7 @@ from typing import TYPE_CHECKING, Any, Callable, ClassVar, Literal, TypeVar
from pydantic import BaseModel, ConfigDict, ValidationError, create_model
from anta import GITHUB_SUGGESTION
from anta.custom_types import Revision
from anta.custom_types import REGEXP_EOS_BLACKLIST_CMDS, Revision
from anta.logger import anta_log_exception, exc_to_str
from anta.result_manager.models import TestResult
@ -35,9 +32,6 @@ F = TypeVar("F", bound=Callable[..., Any])
# This would imply overhead to define classes
# https://stackoverflow.com/questions/74103528/type-hinting-an-instance-of-a-nested-class
# TODO: make this configurable - with an env var maybe?
BLACKLIST_REGEX = [r"^reload.*", r"^conf\w*\s*(terminal|session)*", r"^wr\w*\s*\w+"]
logger = logging.getLogger(__name__)
@ -46,19 +40,8 @@ class AntaParamsBaseModel(BaseModel):
model_config = ConfigDict(extra="forbid")
if not TYPE_CHECKING:
# Following pydantic declaration and keeping __getattr__ only when TYPE_CHECKING is false.
# Disabling 1 Dynamically typed expressions (typing.Any) are disallowed in `__getattr__
# ruff: noqa: ANN401
def __getattr__(self, item: str) -> Any:
"""For AntaParams, if we try to access an attribute that is not present, we want it to be None."""
try:
return super().__getattr__(item)
except AttributeError:
return None
class AntaTemplate(BaseModel):
class AntaTemplate:
"""Class to define a command template as Python f-string.
Can render a command from parameters.
@ -70,14 +53,42 @@ class AntaTemplate(BaseModel):
revision: Revision of the command. Valid values are 1 to 99. Revision has precedence over version.
ofmt: eAPI output - json or text.
use_cache: Enable or disable caching for this AntaTemplate if the AntaDevice supports it.
"""
template: str
version: Literal[1, "latest"] = "latest"
revision: Revision | None = None
ofmt: Literal["json", "text"] = "json"
use_cache: bool = True
# pylint: disable=too-few-public-methods
def __init__( # noqa: PLR0913
self,
template: str,
version: Literal[1, "latest"] = "latest",
revision: Revision | None = None,
ofmt: Literal["json", "text"] = "json",
*,
use_cache: bool = True,
) -> None:
# pylint: disable=too-many-arguments
self.template = template
self.version = version
self.revision = revision
self.ofmt = ofmt
self.use_cache = use_cache
# Create a AntaTemplateParams model to elegantly store AntaTemplate variables
field_names = [fname for _, fname, _, _ in Formatter().parse(self.template) if fname]
# Extracting the type from the params based on the expected field_names from the template
fields: dict[str, Any] = {key: (Any, ...) for key in field_names}
self.params_schema = create_model(
"AntaParams",
__base__=AntaParamsBaseModel,
**fields,
)
def __repr__(self) -> str:
"""Return the representation of the class.
Copying pydantic model style, excluding `params_schema`
"""
return " ".join(f"{a}={v!r}" for a, v in vars(self).items() if a != "params_schema")
def render(self, **params: str | int | bool) -> AntaCommand:
"""Render an AntaCommand from an AntaTemplate instance.
@ -90,34 +101,28 @@ class AntaTemplate(BaseModel):
Returns
-------
command: The rendered AntaCommand.
This AntaCommand instance have a template attribute that references this
AntaTemplate instance.
The rendered AntaCommand.
This AntaCommand instance has a template attribute that references this
AntaTemplate instance.
Raises
------
AntaTemplateRenderError
If a parameter is missing to render the AntaTemplate instance.
"""
# Create params schema on the fly
field_names = [fname for _, fname, _, _ in Formatter().parse(self.template) if fname]
# Extracting the type from the params based on the expected field_names from the template
fields: dict[str, Any] = {key: (type(params.get(key)), ...) for key in field_names}
# Accepting ParamsSchema as non lowercase variable
ParamsSchema = create_model( # noqa: N806
"ParamsSchema",
__base__=AntaParamsBaseModel,
**fields,
)
try:
return AntaCommand(
command=self.template.format(**params),
ofmt=self.ofmt,
version=self.version,
revision=self.revision,
template=self,
params=ParamsSchema(**params),
use_cache=self.use_cache,
)
except KeyError as e:
command = self.template.format(**params)
except (KeyError, SyntaxError) as e:
raise AntaTemplateRenderError(self, e.args[0]) from e
return AntaCommand(
command=command,
ofmt=self.ofmt,
version=self.version,
revision=self.revision,
template=self,
params=self.params_schema(**params),
use_cache=self.use_cache,
)
class AntaCommand(BaseModel):
@ -148,6 +153,8 @@ class AntaCommand(BaseModel):
"""
model_config = ConfigDict(arbitrary_types_allowed=True)
command: str
version: Literal[1, "latest"] = "latest"
revision: Revision | None = None
@ -273,14 +280,13 @@ class AntaTest(ABC):
vrf: str = "default"
def render(self, template: AntaTemplate) -> list[AntaCommand]:
return [template.render({"dst": host.dst, "src": host.src, "vrf": host.vrf}) for host in self.inputs.hosts]
return [template.render(dst=host.dst, src=host.src, vrf=host.vrf) for host in self.inputs.hosts]
@AntaTest.anta_test
def test(self) -> None:
failures = []
for command in self.instance_commands:
if command.params and ("src" and "dst") in command.params:
src, dst = command.params["src"], command.params["dst"]
src, dst = command.params.src, command.params.dst
if "2 received" not in command.json_output["messages"][0]:
failures.append((str(src), str(dst)))
if not failures:
@ -288,13 +294,14 @@ class AntaTest(ABC):
else:
self.result.is_failure(f"Connectivity test failed for the following source-destination pairs: {failures}")
```
Attributes:
Attributes
----------
device: AntaDevice instance on which this test is run
inputs: AntaTest.Input instance carrying the test inputs
instance_commands: List of AntaCommand instances of this test
result: TestResult instance representing the result of this test
logger: Python logger for this test instance
"""
# Mandatory class attributes
@ -322,9 +329,10 @@ class AntaTest(ABC):
description: "Test with overwritten description"
custom_field: "Test run by John Doe"
```
Attributes:
result_overwrite: Define fields to overwrite in the TestResult object
Attributes
----------
result_overwrite: Define fields to overwrite in the TestResult object
"""
model_config = ConfigDict(extra="forbid")
@ -360,7 +368,6 @@ class AntaTest(ABC):
Attributes
----------
tags: Tag of devices on which to run the test.
"""
model_config = ConfigDict(extra="forbid")
@ -380,9 +387,8 @@ class AntaTest(ABC):
inputs: dictionary of attributes used to instantiate the AntaTest.Input instance
eos_data: Populate outputs of the test commands instead of collecting from devices.
This list must have the same length and order than the `instance_commands` instance attribute.
"""
self.logger: logging.Logger = logging.getLogger(f"{self.__module__}.{self.__class__.__name__}")
self.logger: logging.Logger = logging.getLogger(f"{self.module}.{self.__class__.__name__}")
self.device: AntaDevice = device
self.inputs: AntaTest.Input
self.instance_commands: list[AntaCommand] = []
@ -411,7 +417,7 @@ class AntaTest(ABC):
elif isinstance(inputs, dict):
self.inputs = self.Input(**inputs)
except ValidationError as e:
message = f"{self.__module__}.{self.__class__.__name__}: Inputs are not valid\n{e}"
message = f"{self.module}.{self.name}: Inputs are not valid\n{e}"
self.logger.error(message)
self.result.is_error(message=message)
return
@ -434,7 +440,7 @@ class AntaTest(ABC):
if self.__class__.commands:
for cmd in self.__class__.commands:
if isinstance(cmd, AntaCommand):
self.instance_commands.append(deepcopy(cmd))
self.instance_commands.append(cmd.model_copy())
elif isinstance(cmd, AntaTemplate):
try:
self.instance_commands.extend(self.render(cmd))
@ -448,7 +454,7 @@ class AntaTest(ABC):
# render() is user-defined code.
# We need to catch everything if we want the AntaTest object
# to live until the reporting
message = f"Exception in {self.__module__}.{self.__class__.__name__}.render()"
message = f"Exception in {self.module}.{self.__class__.__name__}.render()"
anta_log_exception(e, message, self.logger)
self.result.is_error(message=f"{message}: {exc_to_str(e)}")
return
@ -476,14 +482,19 @@ class AntaTest(ABC):
msg = f"Class {cls.__module__}.{cls.__name__} is missing required class attribute {attr}"
raise NotImplementedError(msg)
@property
def module(self) -> str:
"""Return the Python module in which this AntaTest class is defined."""
return self.__module__
@property
def collected(self) -> bool:
"""Returns True if all commands for this test have been collected."""
"""Return True if all commands for this test have been collected."""
return all(command.collected for command in self.instance_commands)
@property
def failed_commands(self) -> list[AntaCommand]:
"""Returns a list of all the commands that have failed."""
"""Return a list of all the commands that have failed."""
return [command for command in self.instance_commands if command.error]
def render(self, template: AntaTemplate) -> list[AntaCommand]:
@ -493,7 +504,7 @@ class AntaTest(ABC):
no AntaTemplate for this test.
"""
_ = template
msg = f"AntaTemplate are provided but render() method has not been implemented for {self.__module__}.{self.name}"
msg = f"AntaTemplate are provided but render() method has not been implemented for {self.module}.{self.__class__.__name__}"
raise NotImplementedError(msg)
@property
@ -501,12 +512,12 @@ class AntaTest(ABC):
"""Check if CLI commands contain a blocked keyword."""
state = False
for command in self.instance_commands:
for pattern in BLACKLIST_REGEX:
for pattern in REGEXP_EOS_BLACKLIST_CMDS:
if re.match(pattern, command.command):
self.logger.error(
"Command <%s> is blocked for security reason matching %s",
command.command,
BLACKLIST_REGEX,
REGEXP_EOS_BLACKLIST_CMDS,
)
self.result.is_error(f"<{command.command}> is blocked for security reason")
state = True
@ -516,7 +527,7 @@ class AntaTest(ABC):
"""Collect outputs of all commands of this test class from the device of this test instance."""
try:
if self.blocked is False:
await self.device.collect_commands(self.instance_commands)
await self.device.collect_commands(self.instance_commands, collection_id=self.name)
except Exception as e: # pylint: disable=broad-exception-caught
# device._collect() is user-defined code.
# We need to catch everything if we want the AntaTest object
@ -557,12 +568,6 @@ class AntaTest(ABC):
result: TestResult instance attribute populated with error status if any
"""
def format_td(seconds: float, digits: int = 3) -> str:
isec, fsec = divmod(round(seconds * 10**digits), 10**digits)
return f"{timedelta(seconds=isec)}.{fsec:0{digits}.0f}"
start_time = time.time()
if self.result.result != "unset":
return self.result
@ -575,6 +580,7 @@ class AntaTest(ABC):
if not self.collected:
await self.collect()
if self.result.result != "unset":
AntaTest.update_progress()
return self.result
if cmds := self.failed_commands:
@ -583,8 +589,9 @@ class AntaTest(ABC):
msg = f"Test {self.name} has been skipped because it is not supported on {self.device.hw_model}: {GITHUB_SUGGESTION}"
self.logger.warning(msg)
self.result.is_skipped("\n".join(unsupported_commands))
return self.result
self.result.is_error(message="\n".join([f"{c.command} has failed: {', '.join(c.errors)}" for c in cmds]))
else:
self.result.is_error(message="\n".join([f"{c.command} has failed: {', '.join(c.errors)}" for c in cmds]))
AntaTest.update_progress()
return self.result
try:
@ -597,10 +604,7 @@ class AntaTest(ABC):
anta_log_exception(e, message, self.logger)
self.result.is_error(message=exc_to_str(e))
test_duration = time.time() - start_time
msg = f"Executing test {self.name} on device {self.device.name} took {format_td(test_duration)}"
self.logger.debug(msg)
# TODO: find a correct way to time test execution
AntaTest.update_progress()
return self.result
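The reworked `AntaTemplate.__init__` above pre-computes the parameter schema by extracting the placeholder names from the template with `string.Formatter`, and `render()` then simply calls `str.format`. A minimal sketch of just those two steps, using a ping template similar to the `VerifyReachability` example shown in this hunk:
```
from string import Formatter

template = "ping vrf {vrf} {dst} source {src} repeat 2"

# Same extraction as in __init__ above: keep only the named fields.
field_names = [fname for _, fname, _, _ in Formatter().parse(template) if fname]
print(field_names)  # ['vrf', 'dst', 'src']

# render() then formats the template with the provided parameters.
print(template.format(vrf="default", dst="10.0.0.1", src="Loopback0"))
```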

View file

@ -215,12 +215,12 @@ class ReportJinja:
def __init__(self, template_path: pathlib.Path) -> None:
"""Create a ReportJinja instance."""
if template_path.is_file():
self.tempalte_path = template_path
else:
if not template_path.is_file():
msg = f"template file is not found: {template_path}"
raise FileNotFoundError(msg)
self.template_path = template_path
def render(self, data: list[dict[str, Any]], *, trim_blocks: bool = True, lstrip_blocks: bool = True) -> str:
"""Build a report based on a Jinja2 template.
@ -250,7 +250,7 @@ class ReportJinja:
Rendered template
"""
with self.tempalte_path.open(encoding="utf-8") as file_:
with self.template_path.open(encoding="utf-8") as file_:
template = Template(file_.read(), trim_blocks=trim_blocks, lstrip_blocks=lstrip_blocks)
return template.render({"data": data})

View file

@ -48,19 +48,25 @@ class ResultManager:
manager.results
[
TestResult(
host=IPv4Address('192.168.0.10'),
test='VerifyNTP',
result='failure',
message="device is not running NTP correctly"
name="pf1",
test="VerifyZeroTouch",
categories=["configuration"],
description="Verifies ZeroTouch is disabled",
result="success",
messages=[],
custom_field=None,
),
TestResult(
host=IPv4Address('192.168.0.10'),
test='VerifyEOSVersion',
result='success',
message=None
name="pf1",
test='VerifyNTP',
categories=["software"],
categories=['system'],
description='Verifies if NTP is synchronised.',
result='failure',
messages=["The device is not synchronized with the configured NTP server(s): 'NTP is disabled.'"],
custom_field=None,
),
]
"""
def __init__(self) -> None:

View file

@ -1,7 +1,6 @@
# Copyright (c) 2023-2024 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the LICENSE file.
# pylint: disable=too-many-branches
"""ANTA runner function."""
from __future__ import annotations
@ -10,31 +9,51 @@ import asyncio
import logging
import os
import resource
from typing import TYPE_CHECKING
from collections import defaultdict
from typing import TYPE_CHECKING, Any
from anta import GITHUB_SUGGESTION
from anta.catalog import AntaCatalog, AntaTestDefinition
from anta.device import AntaDevice
from anta.logger import anta_log_exception, exc_to_str
from anta.models import AntaTest
from anta.tools import Catchtime, cprofile
if TYPE_CHECKING:
from collections.abc import Coroutine
from anta.catalog import AntaCatalog, AntaTestDefinition
from anta.device import AntaDevice
from anta.inventory import AntaInventory
from anta.result_manager import ResultManager
from anta.result_manager.models import TestResult
logger = logging.getLogger(__name__)
AntaTestRunner = tuple[AntaTestDefinition, AntaDevice]
# Environment variable to set ANTA's maximum number of open file descriptors.
# Maximum number of file descriptor the ANTA process will be able to open.
# This limit is independent from the system's hard limit, the lower will be used.
DEFAULT_NOFILE = 16384
try:
__NOFILE__ = int(os.environ.get("ANTA_NOFILE", DEFAULT_NOFILE))
except ValueError as exception:
logger.warning("The ANTA_NOFILE environment variable value is invalid: %s\nDefault to %s.", exc_to_str(exception), DEFAULT_NOFILE)
__NOFILE__ = DEFAULT_NOFILE
def adjust_rlimit_nofile() -> tuple[int, int]:
"""Adjust the maximum number of open file descriptors for the ANTA process.
The limit is set to the lower of the current hard limit and the value of the ANTA_NOFILE environment variable.
If the `ANTA_NOFILE` environment variable is not set or is invalid, `DEFAULT_NOFILE` is used.
Returns
-------
tuple[int, int]: The new soft and hard limits for open file descriptors.
"""
try:
nofile = int(os.environ.get("ANTA_NOFILE", DEFAULT_NOFILE))
except ValueError as exception:
logger.warning("The ANTA_NOFILE environment variable value is invalid: %s\nDefault to %s.", exc_to_str(exception), DEFAULT_NOFILE)
nofile = DEFAULT_NOFILE
limits = resource.getrlimit(resource.RLIMIT_NOFILE)
logger.debug("Initial limit numbers for open file descriptors for the current ANTA process: Soft Limit: %s | Hard Limit: %s", limits[0], limits[1])
nofile = nofile if limits[1] > nofile else limits[1]
logger.debug("Setting soft limit for open file descriptors for the current ANTA process to %s", nofile)
resource.setrlimit(resource.RLIMIT_NOFILE, (nofile, limits[1]))
return resource.getrlimit(resource.RLIMIT_NOFILE)
def log_cache_statistics(devices: list[AntaDevice]) -> None:
@ -56,7 +75,120 @@ def log_cache_statistics(devices: list[AntaDevice]) -> None:
logger.info("Caching is not enabled on %s", device.name)
async def main( # noqa: PLR0912 PLR0913 too-many-branches too-many-arguments - keep the main method readable
async def setup_inventory(inventory: AntaInventory, tags: set[str] | None, devices: set[str] | None, *, established_only: bool) -> AntaInventory | None:
"""Set up the inventory for the ANTA run.
Args:
----
inventory: AntaInventory object that includes the device(s).
tags: Tags to filter devices from the inventory.
devices: Devices on which to run tests. None means all devices.
Returns
-------
AntaInventory | None: The filtered inventory or None if there are no devices to run tests on.
"""
if len(inventory) == 0:
logger.info("The inventory is empty, exiting")
return None
# Filter the inventory based on the CLI provided tags and devices if any
selected_inventory = inventory.get_inventory(tags=tags, devices=devices) if tags or devices else inventory
with Catchtime(logger=logger, message="Connecting to devices"):
# Connect to the devices
await selected_inventory.connect_inventory()
# Remove devices that are unreachable
selected_inventory = selected_inventory.get_inventory(established_only=established_only)
# If there are no devices in the inventory after filtering, exit
if not selected_inventory.devices:
msg = f'No reachable device {f"matching the tags {tags} " if tags else ""}was found.{f" Selected devices: {devices} " if devices is not None else ""}'
logger.warning(msg)
return None
return selected_inventory
def prepare_tests(
inventory: AntaInventory, catalog: AntaCatalog, tests: set[str] | None, tags: set[str] | None
) -> defaultdict[AntaDevice, set[AntaTestDefinition]] | None:
"""Prepare the tests to run.
Args:
----
inventory: AntaInventory object that includes the device(s).
catalog: AntaCatalog object that includes the list of tests.
tests: Tests to run against devices. None means all tests.
tags: Tags to filter devices from the inventory.
Returns
-------
A mapping of devices to the tests to run or None if there are no tests to run.
"""
# Build indexes for the catalog. If `tests` is set, filter the indexes based on these tests
catalog.build_indexes(filtered_tests=tests)
# Using a set to avoid inserting duplicate tests
device_to_tests: defaultdict[AntaDevice, set[AntaTestDefinition]] = defaultdict(set)
# Create AntaTestRunner tuples from the tags
for device in inventory.devices:
if tags:
# If there are CLI tags, only execute tests with matching tags
device_to_tests[device].update(catalog.get_tests_by_tags(tags))
else:
# If there are no CLI tags, execute all tests that do not have any tags
device_to_tests[device].update(catalog.tag_to_tests[None])
# Then add the tests with matching tags from device tags
device_to_tests[device].update(catalog.get_tests_by_tags(device.tags))
catalog.final_tests_count += len(device_to_tests[device])
if catalog.final_tests_count == 0:
msg = (
f"There are no tests{f' matching the tags {tags} ' if tags else ' '}to run in the current test catalog and device inventory, please verify your inputs."
)
logger.warning(msg)
return None
return device_to_tests
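As a rough illustration of the data structure built by `prepare_tests()`, the sketch below uses plain strings in place of `AntaDevice` and `AntaTestDefinition` objects; the device and test names are made up.

```python
# Illustration only: the shape of the device-to-tests mapping.
from collections import defaultdict

device_to_tests: defaultdict[str, set[str]] = defaultdict(set)
device_to_tests["leaf1"].update({"VerifyUptime", "VerifyReachability"})  # tests matching leaf1 tags
device_to_tests["spine1"].update({"VerifyUptime"})                       # untagged tests only
total = sum(len(tests) for tests in device_to_tests.values())            # analogous to catalog.final_tests_count
```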
def get_coroutines(selected_tests: defaultdict[AntaDevice, set[AntaTestDefinition]]) -> list[Coroutine[Any, Any, TestResult]]:
"""Get the coroutines for the ANTA run.
Args:
----
selected_tests: A mapping of devices to the tests to run. The selected tests are generated by the `prepare_tests` function.
Returns
-------
The list of coroutines to run.
"""
coros = []
for device, test_definitions in selected_tests.items():
for test in test_definitions:
try:
test_instance = test.test(device=device, inputs=test.inputs)
coros.append(test_instance.test())
except Exception as e: # noqa: PERF203, pylint: disable=broad-exception-caught
# An AntaTest instance is potentially user-defined code.
# We need to catch everything and exit gracefully with an error message.
message = "\n".join(
[
f"There is an error when creating test {test.test.module}.{test.test.__name__}.",
f"If this is not a custom test implementation: {GITHUB_SUGGESTION}",
],
)
anta_log_exception(e, message, logger)
return coros
@cprofile()
async def main( # noqa: PLR0913
manager: ResultManager,
inventory: AntaInventory,
catalog: AntaCatalog,
@ -65,6 +197,7 @@ async def main( # noqa: PLR0912 PLR0913 too-many-branches too-many-arguments -
tags: set[str] | None = None,
*,
established_only: bool = True,
dry_run: bool = False,
) -> None:
# pylint: disable=too-many-arguments
"""Run ANTA.
@ -77,103 +210,61 @@ async def main( # noqa: PLR0912 PLR0913 too-many-branches too-many-arguments -
manager: ResultManager object to populate with the test results.
inventory: AntaInventory object that includes the device(s).
catalog: AntaCatalog object that includes the list of tests.
devices: devices on which to run tests. None means all devices.
tests: tests to run against devices. None means all tests.
tags: Tags to filter devices from the inventory.
devices: Devices on which to run tests. None means all devices. These may come from the `--device / -d` CLI option in NRFU.
tests: Tests to run against devices. None means all tests. These may come from the `--test / -t` CLI option in NRFU.
tags: Tags to filter devices from the inventory. These may come from the `--tags` CLI option in NRFU.
established_only: Include only established device(s).
dry_run: Build the list of coroutines to run and stop before test execution.
"""
limits = resource.getrlimit(resource.RLIMIT_NOFILE)
logger.debug("Initial limit numbers for open file descriptors for the current ANTA process: Soft Limit: %s | Hard Limit: %s", limits[0], limits[1])
nofile = __NOFILE__ if limits[1] > __NOFILE__ else limits[1]
logger.debug("Setting soft limit for open file descriptors for the current ANTA process to %s", nofile)
resource.setrlimit(resource.RLIMIT_NOFILE, (nofile, limits[1]))
limits = resource.getrlimit(resource.RLIMIT_NOFILE)
# Adjust the maximum number of open file descriptors for the ANTA process
limits = adjust_rlimit_nofile()
if not catalog.tests:
logger.info("The list of tests is empty, exiting")
return
if len(inventory) == 0:
logger.info("The inventory is empty, exiting")
return
# Filter the inventory based on tags and devices parameters
selected_inventory = inventory.get_inventory(
tags=tags,
devices=devices,
)
await selected_inventory.connect_inventory()
with Catchtime(logger=logger, message="Preparing ANTA NRFU Run"):
# Setup the inventory
selected_inventory = inventory if dry_run else await setup_inventory(inventory, tags, devices, established_only=established_only)
if selected_inventory is None:
return
# Remove devices that are unreachable
inventory = selected_inventory.get_inventory(established_only=established_only)
with Catchtime(logger=logger, message="Preparing the tests"):
selected_tests = prepare_tests(selected_inventory, catalog, tests, tags)
if selected_tests is None:
return
if not inventory.devices:
msg = f'No reachable device {f"matching the tags {tags} " if tags else ""}was found.{f" Selected devices: {devices} " if devices is not None else ""}'
logger.warning(msg)
return
coros = []
# Select the tests from the catalog
if tests:
catalog = AntaCatalog(catalog.get_tests_by_names(tests))
# Using a set to avoid inserting duplicate tests
selected_tests: set[AntaTestRunner] = set()
# Create AntaTestRunner tuples from the tags
for device in inventory.devices:
if tags:
# If there are CLI tags, only execute tests with matching tags
selected_tests.update((test, device) for test in catalog.get_tests_by_tags(tags))
else:
# If there are no CLI tags, execute all tests that do not have any filters
selected_tests.update((t, device) for t in catalog.tests if t.inputs.filters is None or t.inputs.filters.tags is None)
# Then add the tests with matching tags from device tags
selected_tests.update((t, device) for t in catalog.get_tests_by_tags(device.tags))
if not selected_tests:
msg = f"There is no tests{f' matching the tags {tags} ' if tags else ' '}to run in the current test catalog and device inventory, please verify your inputs."
logger.warning(msg)
return
run_info = (
"--- ANTA NRFU Run Information ---\n"
f"Number of devices: {len(selected_inventory)} ({len(inventory)} established)\n"
f"Total number of selected tests: {len(selected_tests)}\n"
f"Maximum number of open file descriptors for the current ANTA process: {limits[0]}\n"
"---------------------------------"
)
logger.info(run_info)
if len(selected_tests) > limits[0]:
logger.warning(
"The number of concurrent tests is higher than the open file descriptors limit for this ANTA process.\n"
"Errors may occur while running the tests.\n"
"Please consult the ANTA FAQ."
run_info = (
"--- ANTA NRFU Run Information ---\n"
f"Number of devices: {len(inventory)} ({len(selected_inventory)} established)\n"
f"Total number of selected tests: {catalog.final_tests_count}\n"
f"Maximum number of open file descriptors for the current ANTA process: {limits[0]}\n"
"---------------------------------"
)
for test_definition, device in selected_tests:
try:
test_instance = test_definition.test(device=device, inputs=test_definition.inputs)
logger.info(run_info)
coros.append(test_instance.test())
except Exception as e: # pylint: disable=broad-exception-caught
# An AntaTest instance is potentially user-defined code.
# We need to catch everything and exit gracefully with an
# error message
message = "\n".join(
[
f"There is an error when creating test {test_definition.test.__module__}.{test_definition.test.__name__}.",
f"If this is not a custom test implementation: {GITHUB_SUGGESTION}",
],
if catalog.final_tests_count > limits[0]:
logger.warning(
"The number of concurrent tests is higher than the open file descriptors limit for this ANTA process.\n"
"Errors may occur while running the tests.\n"
"Please consult the ANTA FAQ."
)
anta_log_exception(e, message, logger)
coroutines = get_coroutines(selected_tests)
if dry_run:
logger.info("Dry-run mode, exiting before running the tests.")
for coro in coroutines:
coro.close()
return
if AntaTest.progress is not None:
AntaTest.nrfu_task = AntaTest.progress.add_task("Running NRFU Tests...", total=len(coros))
AntaTest.nrfu_task = AntaTest.progress.add_task("Running NRFU Tests...", total=len(coroutines))
logger.info("Running ANTA tests...")
test_results = await asyncio.gather(*coros)
for r in test_results:
manager.add(r)
with Catchtime(logger=logger, message="Running ANTA tests"):
test_results = await asyncio.gather(*coroutines)
for r in test_results:
manager.add(r)
log_cache_statistics(inventory.devices)
log_cache_statistics(selected_inventory.devices)

234
anta/tests/avt.py Normal file
View file

@ -0,0 +1,234 @@
# Copyright (c) 2023-2024 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the LICENSE file.
"""Module related to Adaptive virtual topology tests."""
# Mypy does not understand AntaTest.Input typing
# mypy: disable-error-code=attr-defined
from __future__ import annotations
from ipaddress import IPv4Address
from typing import ClassVar
from pydantic import BaseModel
from anta.decorators import skip_on_platforms
from anta.models import AntaCommand, AntaTemplate, AntaTest
from anta.tools import get_value
class VerifyAVTPathHealth(AntaTest):
"""
Verifies the status of all Adaptive Virtual Topology (AVT) paths for all VRFs.
Expected Results
----------------
* Success: The test will pass if all AVT paths for all VRFs are active and valid.
* Failure: The test will fail if the AVT path is not configured or if any AVT path under any VRF is either inactive or invalid.
Examples
--------
```yaml
anta.tests.avt:
- VerifyAVTPathHealth:
```
"""
name = "VerifyAVTPathHealth"
description = "Verifies the status of all AVT paths for all VRFs."
categories: ClassVar[list[str]] = ["avt"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show adaptive-virtual-topology path")]
@skip_on_platforms(["cEOSLab", "vEOS-lab"])
@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyAVTPathHealth."""
# Initialize the test result as success
self.result.is_success()
# Get the command output
command_output = self.instance_commands[0].json_output.get("vrfs", {})
# Check if AVT is configured
if not command_output:
self.result.is_failure("Adaptive virtual topology paths are not configured.")
return
# Iterate over each VRF
for vrf, vrf_data in command_output.items():
# Iterate over each AVT path
for profile, avt_path in vrf_data.get("avts", {}).items():
for path, flags in avt_path.get("avtPaths", {}).items():
# Get the status of the AVT path
valid = flags["flags"]["valid"]
active = flags["flags"]["active"]
# Check the status of the AVT path
if not valid and not active:
self.result.is_failure(f"AVT path {path} for profile {profile} in VRF {vrf} is invalid and not active.")
elif not valid:
self.result.is_failure(f"AVT path {path} for profile {profile} in VRF {vrf} is invalid.")
elif not active:
self.result.is_failure(f"AVT path {path} for profile {profile} in VRF {vrf} is not active.")
class VerifyAVTSpecificPath(AntaTest):
"""
Verifies the status and type of an Adaptive Virtual Topology (AVT) path for a specified VRF.
Expected Results
----------------
* Success: The test will pass if all AVT paths for the specified VRF are active, valid, and match the specified type (direct/multihop) if provided.
If multiple paths are configured, the test will pass only if all the paths are valid and active.
* Failure: The test will fail if no AVT paths are configured for the specified VRF, or if any configured path is not active, valid,
or does not match the specified type.
Examples
--------
```yaml
anta.tests.avt:
- VerifyAVTSpecificPath:
avt_paths:
- avt_name: CONTROL-PLANE-PROFILE
vrf: default
destination: 10.101.255.2
next_hop: 10.101.255.1
path_type: direct
```
"""
name = "VerifyAVTSpecificPath"
description = "Verifies the status and type of an AVT path for a specified VRF."
categories: ClassVar[list[str]] = ["avt"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [
AntaTemplate(template="show adaptive-virtual-topology path vrf {vrf} avt {avt_name} destination {destination}")
]
class Input(AntaTest.Input):
"""Input model for the VerifyAVTSpecificPath test."""
avt_paths: list[AVTPaths]
"""List of AVT paths to verify."""
class AVTPaths(BaseModel):
"""Model for the details of AVT paths."""
vrf: str = "default"
"""The VRF for the AVT path. Defaults to 'default' if not provided."""
avt_name: str
"""Name of the adaptive virtual topology."""
destination: IPv4Address
"""The IPv4 address of the AVT peer."""
next_hop: IPv4Address
"""The IPv4 address of the next hop for the AVT peer."""
path_type: str | None = None
"""The type of the AVT path. If not provided, both 'direct' and 'multihop' paths are considered."""
def render(self, template: AntaTemplate) -> list[AntaCommand]:
"""Render the template for each input AVT path/peer."""
return [template.render(vrf=path.vrf, avt_name=path.avt_name, destination=path.destination) for path in self.inputs.avt_paths]
@skip_on_platforms(["cEOSLab", "vEOS-lab"])
@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyAVTSpecificPath."""
# Assume the test is successful until a failure is detected
self.result.is_success()
# Process each command in the instance
for command, input_avt in zip(self.instance_commands, self.inputs.avt_paths):
# Extract the command output and parameters
vrf = command.params.vrf
avt_name = command.params.avt_name
peer = str(command.params.destination)
command_output = command.json_output.get("vrfs", {})
# If no AVT is configured, mark the test as failed and skip to the next command
if not command_output:
self.result.is_failure(f"AVT configuration for peer '{peer}' under topology '{avt_name}' in VRF '{vrf}' is not found.")
continue
# Extract the AVT paths
avt_paths = get_value(command_output, f"{vrf}.avts.{avt_name}.avtPaths")
next_hop, input_path_type = str(input_avt.next_hop), input_avt.path_type
nexthop_path_found = path_type_found = False
# Check each AVT path
for path, path_data in avt_paths.items():
# If the path does not match the expected next hop, skip to the next path
if path_data.get("nexthopAddr") != next_hop:
continue
nexthop_path_found = True
path_type = "direct" if get_value(path_data, "flags.directPath") else "multihop"
# If the path type does not match the expected path type, skip to the next path
if input_path_type and path_type != input_path_type:
continue
path_type_found = True
valid = get_value(path_data, "flags.valid")
active = get_value(path_data, "flags.active")
# Check the path status and type against the expected values
if not all([valid, active]):
failure_reasons = []
if not get_value(path_data, "flags.active"):
failure_reasons.append("inactive")
if not get_value(path_data, "flags.valid"):
failure_reasons.append("invalid")
# Construct the failure message prefix
failed_log = f"AVT path '{path}' for topology '{avt_name}' in VRF '{vrf}'"
self.result.is_failure(f"{failed_log} is {', '.join(failure_reasons)}.")
# If no matching next hop or path type was found, mark the test as failed
if not nexthop_path_found or not path_type_found:
self.result.is_failure(
f"No '{input_path_type}' path found with next-hop address '{next_hop}' for AVT peer '{peer}' under topology '{avt_name}' in VRF '{vrf}'."
)
class VerifyAVTRole(AntaTest):
"""
Verifies the Adaptive Virtual Topology (AVT) role of a device.
Expected Results
----------------
* Success: The test will pass if the AVT role of the device matches the expected role.
* Failure: The test will fail if the AVT is not configured or if the AVT role does not match the expected role.
Examples
--------
```yaml
anta.tests.avt:
- VerifyAVTRole:
role: edge
```
"""
name = "VerifyAVTRole"
description = "Verifies the AVT role of a device."
categories: ClassVar[list[str]] = ["avt"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show adaptive-virtual-topology path")]
class Input(AntaTest.Input):
"""Input model for the VerifyAVTRole test."""
role: str
"""Expected AVT role of the device."""
@skip_on_platforms(["cEOSLab", "vEOS-lab"])
@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyAVTRole."""
# Initialize the test result as success
self.result.is_success()
# Get the command output
command_output = self.instance_commands[0].json_output
# Check if the AVT role matches the expected role
if self.inputs.role != command_output.get("role"):
self.result.is_failure(f"Expected AVT role as `{self.inputs.role}`, but found `{command_output.get('role')}` instead.")

View file

@ -7,8 +7,10 @@
# mypy: disable-error-code=attr-defined
from __future__ import annotations
import re
from typing import TYPE_CHECKING, ClassVar
from anta.custom_types import RegexString
from anta.models import AntaCommand, AntaTest
if TYPE_CHECKING:
@ -75,3 +77,57 @@ class VerifyRunningConfigDiffs(AntaTest):
self.result.is_success()
else:
self.result.is_failure(command_output)
class VerifyRunningConfigLines(AntaTest):
"""Verifies the given regular expression patterns are present in the running-config.
!!! warning
Since this uses regular expression searches on the whole running-config, it can
drastically impact performance and should only be used if no other test is available.
If possible, try using another ANTA test that is more specific.
Expected Results
----------------
* Success: The test will pass if all the patterns are found in the running-config.
* Failure: The test will fail if any of the patterns are NOT found in the running-config.
Examples
--------
```yaml
anta.tests.configuration:
- VerifyRunningConfigLines:
regex_patterns:
- "^enable password.*$"
- "bla bla"
```
"""
name = "VerifyRunningConfigLines"
description = "Search the Running-Config for the given RegEx patterns."
categories: ClassVar[list[str]] = ["configuration"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show running-config", ofmt="text")]
class Input(AntaTest.Input):
"""Input model for the VerifyRunningConfigLines test."""
regex_patterns: list[RegexString]
"""List of regular expressions."""
@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyRunningConfigLines."""
failure_msgs = []
command_output = self.instance_commands[0].text_output
for pattern in self.inputs.regex_patterns:
re_search = re.compile(pattern, flags=re.MULTILINE)
if not re_search.search(command_output):
failure_msgs.append(f"'{pattern}'")
if not failure_msgs:
self.result.is_success()
else:
self.result.is_failure("Following patterns were not found: " + ",".join(failure_msgs))

View file

@ -103,6 +103,11 @@ class VerifyFieldNotice44Resolution(AntaTest):
for component in command_output["details"]["components"]:
if component["name"] == "Aboot":
aboot_version = component["version"].split("-")[2]
break
else:
self.result.is_failure("Aboot component not found")
return
self.result.is_success()
incorrect_aboot_version = (
aboot_version.startswith("4.0.")
@ -192,4 +197,3 @@ class VerifyFieldNotice72Resolution(AntaTest):
return
# We should never hit this point
self.result.is_error("Error in running test - FixedSystemvrm1 not found")
return

View file

@ -14,10 +14,13 @@ from typing import Any, ClassVar, Literal
from pydantic import BaseModel, Field
from pydantic_extra_types.mac_address import MacAddress
from anta.custom_types import Interface, Percent, PositiveInteger
from anta import GITHUB_SUGGESTION
from anta.custom_types import EthernetInterface, Interface, Percent, PositiveInteger
from anta.decorators import skip_on_platforms
from anta.models import AntaCommand, AntaTemplate, AntaTest
from anta.tools import get_item, get_value
from anta.tools import custom_division, get_failed_logs, get_item, get_value
BPS_GBPS_CONVERSIONS = 1000000000
class VerifyInterfaceUtilization(AntaTest):
@ -427,7 +430,7 @@ class VerifyLoopbackCount(AntaTest):
self.result.is_failure()
if loopback_count != self.inputs.number:
self.result.is_failure(f"Found {loopback_count} Loopbacks when expecting {self.inputs.number}")
elif len(down_loopback_interfaces) != 0:
elif len(down_loopback_interfaces) != 0: # pragma: no branch
self.result.is_failure(f"The following Loopbacks are not up: {down_loopback_interfaces}")
@ -700,6 +703,11 @@ class VerifyInterfaceIPv4(AntaTest):
for interface in self.inputs.interfaces:
if interface.name == intf:
input_interface_detail = interface
break
else:
self.result.is_error(f"Could not find `{intf}` in the input interfaces. {GITHUB_SUGGESTION}")
continue
input_primary_ip = str(input_interface_detail.primary_ip)
failed_messages = []
@ -778,3 +786,100 @@ class VerifyIpVirtualRouterMac(AntaTest):
self.result.is_failure(f"IP virtual router MAC address `{self.inputs.mac_address}` is not configured.")
else:
self.result.is_success()
class VerifyInterfacesSpeed(AntaTest):
"""Verifies the speed, lanes, auto-negotiation status, and mode as full duplex for interfaces.
- If the auto-negotiation status is set to True, verifies that auto-negotiation is successful, the mode is full duplex and the speed/lanes match the input.
- If the auto-negotiation status is set to False, verifies that the mode is full duplex and the speed/lanes match the input.
Expected Results
----------------
* Success: The test will pass if an interface is configured correctly with the specified speed, lanes, auto-negotiation status, and mode as full duplex.
* Failure: The test will fail if an interface is not found, if the speed, lanes, and auto-negotiation status do not match the input, or mode is not full duplex.
Examples
--------
```yaml
anta.tests.interfaces:
- VerifyInterfacesSpeed:
interfaces:
- name: Ethernet2
auto: False
speed: 10
- name: Eth3
auto: True
speed: 100
lanes: 1
- name: Eth2
auto: False
speed: 2.5
```
"""
name = "VerifyInterfacesSpeed"
description = "Verifies the speed, lanes, auto-negotiation status, and mode as full duplex for interfaces."
categories: ClassVar[list[str]] = ["interfaces"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show interfaces")]
class Input(AntaTest.Input):
"""Inputs for the VerifyInterfacesSpeed test."""
interfaces: list[InterfaceDetail]
"""List of interfaces to be tested"""
class InterfaceDetail(BaseModel):
"""Detail of an interface."""
name: EthernetInterface
"""The name of the interface."""
auto: bool
"""The auto-negotiation status of the interface."""
speed: float = Field(ge=1, le=1000)
"""The speed of the interface in Gigabits per second. Valid range is 1 to 1000."""
lanes: None | int = Field(None, ge=1, le=8)
"""The number of lanes in the interface. Valid range is 1 to 8. This field is optional."""
@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyInterfacesSpeed."""
self.result.is_success()
command_output = self.instance_commands[0].json_output
# Iterate over all the interfaces
for interface in self.inputs.interfaces:
intf = interface.name
# Check if interface exists
if not (interface_output := get_value(command_output, f"interfaces.{intf}")):
self.result.is_failure(f"Interface `{intf}` is not found.")
continue
auto_negotiation = interface_output.get("autoNegotiate")
actual_lanes = interface_output.get("lanes")
# Collecting actual interface details
actual_interface_output = {
"auto negotiation": auto_negotiation if interface.auto is True else None,
"duplex mode": interface_output.get("duplex"),
"speed": interface_output.get("bandwidth"),
"lanes": actual_lanes if interface.lanes is not None else None,
}
# Forming expected interface details
expected_interface_output = {
"auto negotiation": "success" if interface.auto is True else None,
"duplex mode": "duplexFull",
"speed": interface.speed * BPS_GBPS_CONVERSIONS,
"lanes": interface.lanes,
}
# Forming failure message
if actual_interface_output != expected_interface_output:
for output in [actual_interface_output, expected_interface_output]:
# Convert speed to Gbps for readability
if output["speed"] is not None:
output["speed"] = f"{custom_division(output['speed'], BPS_GBPS_CONVERSIONS)}Gbps"
failed_log = get_failed_logs(expected_interface_output, actual_interface_output)
self.result.is_failure(f"For interface {intf}:{failed_log}\n")

View file

@ -0,0 +1,165 @@
# Copyright (c) 2023-2024 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the LICENSE file.
"""Test functions related to various router path-selection settings."""
# Mypy does not understand AntaTest.Input typing
# mypy: disable-error-code=attr-defined
from __future__ import annotations
from ipaddress import IPv4Address
from typing import ClassVar
from pydantic import BaseModel
from anta.decorators import skip_on_platforms
from anta.models import AntaCommand, AntaTemplate, AntaTest
from anta.tools import get_value
class VerifyPathsHealth(AntaTest):
"""
Verifies the path and telemetry state of all paths under router path-selection.
The expected states are 'IPsec established', 'Resolved' for path and 'active' for telemetry.
Expected Results
----------------
* Success: The test will pass if all path states under router path-selection are either 'IPsec established' or 'Resolved'
and their telemetry state as 'active'.
* Failure: The test will fail if router path-selection is not configured or if any path state is not 'IPsec established' or 'Resolved',
or the telemetry state is 'inactive'.
Examples
--------
```yaml
anta.tests.path_selection:
- VerifyPathsHealth:
```
"""
name = "VerifyPathsHealth"
description = "Verifies the path and telemetry state of all paths under router path-selection."
categories: ClassVar[list[str]] = ["path-selection"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show path-selection paths", revision=1)]
@skip_on_platforms(["cEOSLab", "vEOS-lab"])
@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyPathsHealth."""
self.result.is_success()
command_output = self.instance_commands[0].json_output["dpsPeers"]
# If no paths are configured for router path-selection, the test fails
if not command_output:
self.result.is_failure("No path configured for router path-selection.")
return
# Check the state of each path
for peer, peer_data in command_output.items():
for group, group_data in peer_data["dpsGroups"].items():
for path_data in group_data["dpsPaths"].values():
path_state = path_data["state"]
session = path_data["dpsSessions"]["0"]["active"]
# If the path state of any path is not 'ipsecEstablished' or 'routeResolved', the test fails
if path_state not in ["ipsecEstablished", "routeResolved"]:
self.result.is_failure(f"Path state for peer {peer} in path-group {group} is `{path_state}`.")
# If the telemetry state of any path is inactive, the test fails
elif not session:
self.result.is_failure(f"Telemetry state for peer {peer} in path-group {group} is `inactive`.")
class VerifySpecificPath(AntaTest):
"""
Verifies the path and telemetry state of a specific path for an IPv4 peer under router path-selection.
The expected states are 'IPsec established', 'Resolved' for path and 'active' for telemetry.
Expected Results
----------------
* Success: The test will pass if the path state under router path-selection is either 'IPsec established' or 'Resolved'
and telemetry state as 'active'.
* Failure: The test will fail if router path-selection is not configured or if the path state is not 'IPsec established' or 'Resolved',
or if the telemetry state is 'inactive'.
Examples
--------
```yaml
anta.tests.path_selection:
- VerifySpecificPath:
paths:
- peer: 10.255.0.1
path_group: internet
source_address: 100.64.3.2
destination_address: 100.64.1.2
```
"""
name = "VerifySpecificPath"
description = "Verifies the path and telemetry state of a specific path under router path-selection."
categories: ClassVar[list[str]] = ["path-selection"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [
AntaTemplate(template="show path-selection paths peer {peer} path-group {group} source {source} destination {destination}", revision=1)
]
class Input(AntaTest.Input):
"""Input model for the VerifySpecificPath test."""
paths: list[RouterPath]
"""List of router paths to verify."""
class RouterPath(BaseModel):
"""Detail of a router path."""
peer: IPv4Address
"""Static peer IPv4 address."""
path_group: str
"""Router path group name."""
source_address: IPv4Address
"""Source IPv4 address of path."""
destination_address: IPv4Address
"""Destination IPv4 address of path."""
def render(self, template: AntaTemplate) -> list[AntaCommand]:
"""Render the template for each router path."""
return [
template.render(peer=path.peer, group=path.path_group, source=path.source_address, destination=path.destination_address) for path in self.inputs.paths
]
@skip_on_platforms(["cEOSLab", "vEOS-lab"])
@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifySpecificPath."""
self.result.is_success()
# Check the state of each path
for command in self.instance_commands:
peer = command.params.peer
path_group = command.params.group
source = command.params.source
destination = command.params.destination
command_output = command.json_output.get("dpsPeers", [])
# If the peer is not configured for the path group, the test fails
if not command_output:
self.result.is_failure(f"Path `peer: {peer} source: {source} destination: {destination}` is not configured for path-group `{path_group}`.")
continue
# Extract the state of the path
path_output = get_value(command_output, f"{peer}..dpsGroups..{path_group}..dpsPaths", separator="..")
path_state = next(iter(path_output.values())).get("state")
session = get_value(next(iter(path_output.values())), "dpsSessions.0.active")
# If the state of the path is not 'ipsecEstablished' or 'routeResolved', or the telemetry state is 'inactive', the test fails
if path_state not in ["ipsecEstablished", "routeResolved"]:
self.result.is_failure(f"Path state for `peer: {peer} source: {source} destination: {destination}` in path-group {path_group} is `{path_state}`.")
elif not session:
self.result.is_failure(
f"Telemetry state for path `peer: {peer} source: {source} destination: {destination}` in path-group {path_group} is `inactive`."
)

View file

@ -23,7 +23,7 @@ class VerifyPtpModeStatus(AntaTest):
----------------
* Success: The test will pass if the device is a BC.
* Failure: The test will fail if the device is not a BC.
* Error: The test will error if the 'ptpMode' variable is not present in the command output.
* Skipped: The test will be skipped if PTP is not configured on the device.
Examples
--------
@ -45,7 +45,7 @@ class VerifyPtpModeStatus(AntaTest):
command_output = self.instance_commands[0].json_output
if (ptp_mode := command_output.get("ptpMode")) is None:
self.result.is_error("'ptpMode' variable is not present in the command output")
self.result.is_skipped("PTP is not configured")
return
if ptp_mode != "ptpBoundaryClock":
@ -63,7 +63,7 @@ class VerifyPtpGMStatus(AntaTest):
----------------
* Success: The test will pass if the device is locked to the provided Grandmaster.
* Failure: The test will fail if the device is not locked to the provided Grandmaster.
* Error: The test will error if the 'gmClockIdentity' variable is not present in the command output.
* Skipped: The test will be skipped if PTP is not configured on the device.
Examples
--------
@ -92,7 +92,7 @@ class VerifyPtpGMStatus(AntaTest):
command_output = self.instance_commands[0].json_output
if (ptp_clock_summary := command_output.get("ptpClockSummary")) is None:
self.result.is_error("'ptpClockSummary' variable is not present in the command output")
self.result.is_skipped("PTP is not configured")
return
if ptp_clock_summary["gmClockIdentity"] != self.inputs.gmid:
@ -110,7 +110,7 @@ class VerifyPtpLockStatus(AntaTest):
----------------
* Success: The test will pass if the device was locked to the upstream GM in the last minute.
* Failure: The test will fail if the device was not locked to the upstream GM in the last minute.
* Error: The test will error if the 'lastSyncTime' variable is not present in the command output.
* Skipped: The test will be skipped if PTP is not configured on the device.
Examples
--------
@ -133,7 +133,7 @@ class VerifyPtpLockStatus(AntaTest):
command_output = self.instance_commands[0].json_output
if (ptp_clock_summary := command_output.get("ptpClockSummary")) is None:
self.result.is_error("'ptpClockSummary' variable is not present in the command output")
self.result.is_skipped("PTP is not configured")
return
time_difference = ptp_clock_summary["currentPtpSystemTime"] - ptp_clock_summary["lastSyncTime"]
@ -151,7 +151,7 @@ class VerifyPtpOffset(AntaTest):
----------------
* Success: The test will pass if the PTP timing offset is within +/- 1000ns from the master clock.
* Failure: The test will fail if the PTP timing offset is greater than +/- 1000ns from the master clock.
* Skipped: The test will be skipped if PTP is not configured.
* Skipped: The test will be skipped if PTP is not configured on the device.
Examples
--------

View file

@ -262,8 +262,8 @@ class VerifyBGPPeerCount(AntaTest):
command_output = command.json_output
afi = command.params.afi
safi = command.params.safi
afi_vrf = command.params.vrf or "default"
safi = command.params.safi if hasattr(command.params, "safi") else None
afi_vrf = command.params.vrf if hasattr(command.params, "vrf") else "default"
# Swapping AFI and SAFI in case of SR-TE
if afi == "sr-te":
@ -400,12 +400,12 @@ class VerifyBGPPeersHealth(AntaTest):
command_output = command.json_output
afi = command.params.afi
safi = command.params.safi
safi = command.params.safi if hasattr(command.params, "safi") else None
afi_vrf = command.params.vrf if hasattr(command.params, "vrf") else "default"
# Swapping AFI and SAFI in case of SR-TE
if afi == "sr-te":
afi, safi = safi, afi
afi_vrf = command.params.vrf or "default"
if not (vrfs := command_output.get("vrfs")):
_add_bgp_failures(failures=failures, afi=afi, safi=safi, vrf=afi_vrf, issue="Not Configured")
@ -551,8 +551,8 @@ class VerifyBGPSpecificPeers(AntaTest):
command_output = command.json_output
afi = command.params.afi
safi = command.params.safi
afi_vrf = command.params.vrf or "default"
safi = command.params.safi if hasattr(command.params, "safi") else None
afi_vrf = command.params.vrf if hasattr(command.params, "vrf") else "default"
# Swapping AFI and SAFI in case of SR-TE
if afi == "sr-te":

308
anta/tests/routing/isis.py Normal file
View file

@ -0,0 +1,308 @@
# Copyright (c) 2023-2024 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the LICENSE file.
"""Module related to IS-IS tests."""
# Mypy does not understand AntaTest.Input typing
# mypy: disable-error-code=attr-defined
from __future__ import annotations
from typing import Any, ClassVar, Literal
from pydantic import BaseModel
from anta.custom_types import Interface
from anta.models import AntaCommand, AntaTemplate, AntaTest
from anta.tools import get_value
def _count_isis_neighbor(isis_neighbor_json: dict[str, Any]) -> int:
"""Count the number of isis neighbors.
Args
----
isis_neighbor_json: The JSON output of the `show isis neighbors` command.
Returns
-------
int: The number of isis neighbors.
"""
count = 0
for vrf_data in isis_neighbor_json["vrfs"].values():
for instance_data in vrf_data["isisInstances"].values():
count += len(instance_data.get("neighbors", {}))
return count
def _get_not_full_isis_neighbors(isis_neighbor_json: dict[str, Any]) -> list[dict[str, Any]]:
"""Return the isis neighbors whose adjacency state is not `up`.
Args
----
isis_neighbor_json: The JSON output of the `show isis neighbors` command.
Returns
-------
list[dict[str, Any]]: A list of isis neighbors whose adjacency state is not `UP`.
"""
return [
{
"vrf": vrf,
"instance": instance,
"neighbor": adjacency["hostname"],
"state": state,
}
for vrf, vrf_data in isis_neighbor_json["vrfs"].items()
for instance, instance_data in vrf_data.get("isisInstances").items()
for neighbor, neighbor_data in instance_data.get("neighbors").items()
for adjacency in neighbor_data.get("adjacencies")
if (state := adjacency["state"]) != "up"
]
def _get_full_isis_neighbors(isis_neighbor_json: dict[str, Any], neighbor_state: Literal["up", "down"] = "up") -> list[dict[str, Any]]:
"""Return the isis neighbors whose adjacency state is `up`.
Args
----
isis_neighbor_json: The JSON output of the `show isis neighbors` command.
neighbor_state: Value of the neihbor state we are looking for. Default up
Returns
-------
list[dict[str, Any]]: A list of isis neighbors whose adjacency state is not `UP`.
"""
return [
{
"vrf": vrf,
"instance": instance,
"neighbor": adjacency["hostname"],
"neighbor_address": adjacency["routerIdV4"],
"interface": adjacency["interfaceName"],
"state": state,
}
for vrf, vrf_data in isis_neighbor_json["vrfs"].items()
for instance, instance_data in vrf_data.get("isisInstances").items()
for neighbor, neighbor_data in instance_data.get("neighbors").items()
for adjacency in neighbor_data.get("adjacencies")
if (state := adjacency["state"]) == neighbor_state
]
def _get_isis_neighbors_count(isis_neighbor_json: dict[str, Any]) -> list[dict[str, Any]]:
"""Count number of IS-IS neighbor of the device."""
return [
{"vrf": vrf, "interface": interface, "mode": mode, "count": int(level_data["numAdjacencies"]), "level": int(level)}
for vrf, vrf_data in isis_neighbor_json["vrfs"].items()
for instance, instance_data in vrf_data.get("isisInstances").items()
for interface, interface_data in instance_data.get("interfaces").items()
for level, level_data in interface_data.get("intfLevels").items()
if (mode := level_data["passive"]) is not True
]
def _get_interface_data(interface: str, vrf: str, command_output: dict[str, Any]) -> dict[str, Any] | None:
"""Extract data related to an IS-IS interface for testing."""
if (vrf_data := get_value(command_output, f"vrfs.{vrf}")) is None:
return None
for instance_data in vrf_data.get("isisInstances").values():
if (intf_dict := get_value(dictionary=instance_data, key="interfaces")) is not None:
try:
return next(ifl_data for ifl, ifl_data in intf_dict.items() if ifl == interface)
except StopIteration:
return None
return None
class VerifyISISNeighborState(AntaTest):
"""Verifies all IS-IS neighbors are in UP state.
Expected Results
----------------
* Success: The test will pass if all IS-IS neighbors are in UP state.
* Failure: The test will fail if some IS-IS neighbors are not in UP state.
* Skipped: The test will be skipped if no IS-IS neighbor is found.
Examples
--------
```yaml
anta.tests.routing:
isis:
- VerifyISISNeighborState:
```
"""
name = "VerifyISISNeighborState"
description = "Verifies all IS-IS neighbors are in UP state."
categories: ClassVar[list[str]] = ["isis"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show isis neighbors", revision=1)]
@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyISISNeighborState."""
command_output = self.instance_commands[0].json_output
if _count_isis_neighbor(command_output) == 0:
self.result.is_skipped("No IS-IS neighbor detected")
return
self.result.is_success()
not_full_neighbors = _get_not_full_isis_neighbors(command_output)
if not_full_neighbors:
self.result.is_failure(f"Some neighbors are not in the correct state (UP): {not_full_neighbors}.")
class VerifyISISNeighborCount(AntaTest):
"""Verifies number of IS-IS neighbors per level and per interface.
Expected Results
----------------
* Success: The test will pass if the number of neighbors is correct.
* Failure: The test will fail if the number of neighbors is incorrect.
* Skipped: The test will be skipped if no IS-IS neighbor is found.
Examples
--------
```yaml
anta.tests.routing:
isis:
- VerifyISISNeighborCount:
interfaces:
- name: Ethernet1
level: 1
count: 2
- name: Ethernet2
level: 2
count: 1
- name: Ethernet3
count: 2
# level is set to 2 by default
```
"""
name = "VerifyISISNeighborCount"
description = "Verifies count of IS-IS interface per level"
categories: ClassVar[list[str]] = ["isis"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show isis interface brief", revision=1)]
class Input(AntaTest.Input):
"""Input model for the VerifyISISNeighborCount test."""
interfaces: list[InterfaceCount]
"""list of interfaces with their information."""
class InterfaceCount(BaseModel):
"""Input model for the VerifyISISNeighborCount test."""
name: Interface
"""Interface name to check."""
level: int = 2
"""IS-IS level to check."""
count: int
"""Number of IS-IS neighbors."""
@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyISISNeighborCount."""
command_output = self.instance_commands[0].json_output
self.result.is_success()
isis_neighbor_count = _get_isis_neighbors_count(command_output)
if len(isis_neighbor_count) == 0:
self.result.is_skipped("No IS-IS neighbor detected")
for interface in self.inputs.interfaces:
eos_data = [ifl_data for ifl_data in isis_neighbor_count if ifl_data["interface"] == interface.name and ifl_data["level"] == interface.level]
if not eos_data:
self.result.is_failure(f"No neighbor detected for interface {interface.name}")
return
if eos_data[0]["count"] != interface.count:
self.result.is_failure(
f"Interface {interface.name}:"
f"expected Level {interface.level}: count {interface.count}, "
f"got Level {eos_data[0]['level']}: count {eos_data[0]['count']}"
)
class VerifyISISInterfaceMode(AntaTest):
"""Verifies ISIS Interfaces are running in correct mode.
Expected Results
----------------
* Success: The test will pass if all listed interfaces are running in the correct mode.
* Failure: The test will fail if any of the listed interfaces is not running in the correct mode.
* Skipped: The test will be skipped if no IS-IS neighbor is found.
Examples
--------
```yaml
anta.tests.routing:
isis:
- VerifyISISInterfaceMode:
interfaces:
- name: Loopback0
mode: passive
# vrf is set to default by default
- name: Ethernet2
mode: passive
level: 2
# vrf is set to default by default
- name: Ethernet1
mode: point-to-point
vrf: default
# level is set to 2 by default
```
"""
name = "VerifyISISInterfaceMode"
description = "Verifies interface mode for IS-IS"
categories: ClassVar[list[str]] = ["isis"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show isis interface brief", revision=1)]
class Input(AntaTest.Input):
"""Input model for the VerifyISISNeighborCount test."""
interfaces: list[InterfaceState]
"""list of interfaces with their information."""
class InterfaceState(BaseModel):
"""Input model for the VerifyISISNeighborCount test."""
name: Interface
"""Interface name to check."""
level: Literal[1, 2] = 2
"""ISIS level configured for interface. Default is 2."""
mode: Literal["point-to-point", "broadcast", "passive"]
"""Number of IS-IS neighbors."""
vrf: str = "default"
"""VRF where the interface should be configured"""
@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyISISInterfaceMode."""
command_output = self.instance_commands[0].json_output
self.result.is_success()
if len(command_output["vrfs"]) == 0:
self.result.is_failure("IS-IS is not configured on device")
# Check for p2p interfaces
for interface in self.inputs.interfaces:
interface_data = _get_interface_data(
interface=interface.name,
vrf=interface.vrf,
command_output=command_output,
)
# Check for correct VRF
if interface_data is not None:
interface_type = get_value(dictionary=interface_data, key="interfaceType", default="unset")
# Check for interfaceType
if interface.mode == "point-to-point" and interface.mode != interface_type:
self.result.is_failure(f"Interface {interface.name} in VRF {interface.vrf} is not running in {interface.mode} reporting {interface_type}")
# Check for passive
elif interface.mode == "passive":
json_path = f"intfLevels.{interface.level}.passive"
if interface_data is None or get_value(dictionary=interface_data, key=json_path, default=False) is False:
self.result.is_failure(f"Interface {interface.name} in VRF {interface.vrf} is not running in passive mode")
else:
self.result.is_failure(f"Interface {interface.name} not found in VRF {interface.vrf}")

View file

@ -44,7 +44,11 @@ class VerifySSHStatus(AntaTest):
"""Main test function for VerifySSHStatus."""
command_output = self.instance_commands[0].text_output
line = next(line for line in command_output.split("\n") if line.startswith("SSHD status"))
try:
line = next(line for line in command_output.split("\n") if line.startswith("SSHD status"))
except StopIteration:
self.result.is_error("Could not find SSH status in returned output.")
return
status = line.split("is ")[1]
if status == "disabled":

View file

@ -188,6 +188,7 @@ class VerifySTPForwardingPorts(AntaTest):
if not (topologies := get_value(command.json_output, "topologies")):
not_configured.append(vlan_id)
else:
interfaces_not_forwarding = []
for value in topologies.values():
if vlan_id and int(vlan_id) in value["vlans"]:
interfaces_not_forwarding = [interface for interface, state in value["interfaces"].items() if state["state"] != "forwarding"]

View file

@ -5,7 +5,26 @@
from __future__ import annotations
from typing import Any
import cProfile
import os
import pstats
from functools import wraps
from time import perf_counter
from typing import TYPE_CHECKING, Any, Callable, TypeVar, cast
from anta.logger import format_td
if TYPE_CHECKING:
import sys
from logging import Logger
from types import TracebackType
if sys.version_info >= (3, 11):
from typing import Self
else:
from typing_extensions import Self
F = TypeVar("F", bound=Callable[..., Any])
def get_failed_logs(expected_output: dict[Any, Any], actual_output: dict[Any, Any]) -> str:
@ -28,14 +47,35 @@ def get_failed_logs(expected_output: dict[Any, Any], actual_output: dict[Any, An
for element, expected_data in expected_output.items():
actual_data = actual_output.get(element)
if actual_data == expected_data:
continue
if actual_data is None:
failed_logs.append(f"\nExpected `{expected_data}` as the {element}, but it was not found in the actual output.")
elif actual_data != expected_data:
failed_logs.append(f"\nExpected `{expected_data}` as the {element}, but found `{actual_data}` instead.")
continue
# actual_data is not None and actual_data != expected_data
failed_logs.append(f"\nExpected `{expected_data}` as the {element}, but found `{actual_data}` instead.")
return "".join(failed_logs)
def custom_division(numerator: float, denominator: float) -> int | float:
"""Get the custom division of numbers.
Custom division that returns an integer if the result is an integer, otherwise a float.
Parameters
----------
numerator: The numerator.
denominator: The denominator.
Returns
-------
Union[int, float]: The result of the division.
"""
result = numerator / denominator
return int(result) if result.is_integer() else result
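For example, the interfaces tests use this helper to render speeds in Gbps without a trailing `.0` when the value is whole; a quick check of the behaviour:

```python
from anta.tools import custom_division

print(custom_division(10_000_000_000, 1_000_000_000))  # 10   (int, rendered as "10Gbps")
print(custom_division(2_500_000_000, 1_000_000_000))   # 2.5  (float, rendered as "2.5Gbps")
```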
# pylint: disable=too-many-arguments
def get_dict_superset(
list_of_dicts: list[dict[Any, Any]],
@ -228,3 +268,81 @@ def get_item(
if required is True:
raise ValueError(custom_error_msg or var_name)
return default
class Catchtime:
"""A class working as a context to capture time differences."""
start: float
raw_time: float
time: str
def __init__(self, logger: Logger | None = None, message: str | None = None) -> None:
self.logger = logger
self.message = message
def __enter__(self) -> Self:
"""__enter__ method."""
self.start = perf_counter()
if self.logger and self.message:
self.logger.info("%s ...", self.message)
return self
def __exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None) -> None:
"""__exit__ method."""
self.raw_time = perf_counter() - self.start
self.time = format_td(self.raw_time, 3)
if self.logger and self.message:
self.logger.info("%s completed in: %s.", self.message, self.time)
def cprofile(sort_by: str = "cumtime") -> Callable[[F], F]:
"""Profile a function with cProfile.
Profiling is conditionally enabled based on the presence of the ANTA_CPROFILE environment variable.
This decorator is expected to wrap an async function.
Args:
----
sort_by (str): The criterion to sort the profiling results. Default is 'cumtime'.
Returns
-------
Callable: The decorated function with conditional profiling.
"""
def decorator(func: F) -> F:
@wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> Any:
"""Enable cProfile or not.
If `ANTA_CPROFILE` is set, cProfile is enabled and dumps the stats to the file.
Args:
----
*args: Arbitrary positional arguments.
**kwargs: Arbitrary keyword arguments.
Returns
-------
The result of the function call.
"""
cprofile_file = os.environ.get("ANTA_CPROFILE")
if cprofile_file is not None:
profiler = cProfile.Profile()
profiler.enable()
try:
result = await func(*args, **kwargs)
finally:
if cprofile_file is not None:
profiler.disable()
stats = pstats.Stats(profiler).sort_stats(sort_by)
stats.dump_stats(cprofile_file)
return result
return cast(F, wrapper)
return decorator
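To close, a hedged usage sketch for the decorator above; the coroutine and the output file name are invented for the example, and nothing is profiled unless `ANTA_CPROFILE` is set.

```python
import asyncio
import os

from anta.tools import cprofile


@cprofile()
async def demo() -> int:
    await asyncio.sleep(0)
    return 42


os.environ["ANTA_CPROFILE"] = "demo.prof"  # hypothetical stats file
print(asyncio.run(demo()))                 # 42; profiling stats dumped to demo.prof
```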