anta/anta/cli/exec/utils.py
Daniel Baumann ecf5ca3300
Adding upstream version 0.13.0.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-02-05 11:32:40 +01:00

161 lines
7.2 KiB
Python

# Copyright (c) 2023-2024 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the LICENSE file.
"""
Exec CLI helpers
"""
from __future__ import annotations
import asyncio
import itertools
import json
import logging
import re
from pathlib import Path
from typing import Literal
from aioeapi import EapiCommandError
from httpx import ConnectError, HTTPError
from anta.device import AntaDevice, AsyncEOSDevice
from anta.inventory import AntaInventory
from anta.models import AntaCommand
EOS_SCHEDULED_TECH_SUPPORT = "/mnt/flash/schedule/tech-support"
INVALID_CHAR = "`~!@#$/"
logger = logging.getLogger(__name__)
async def clear_counters_utils(anta_inventory: AntaInventory, tags: list[str] | None = None) -> None:
"""
Clear counters
"""
async def clear(dev: AntaDevice) -> None:
commands = [AntaCommand(command="clear counters")]
if dev.hw_model not in ["cEOSLab", "vEOS-lab"]:
commands.append(AntaCommand(command="clear hardware counter drop"))
await dev.collect_commands(commands=commands)
for command in commands:
if not command.collected:
logger.error(f"Could not clear counters on device {dev.name}: {command.errors}")
logger.info(f"Cleared counters on {dev.name} ({dev.hw_model})")
logger.info("Connecting to devices...")
await anta_inventory.connect_inventory()
devices = anta_inventory.get_inventory(established_only=True, tags=tags).values()
logger.info("Clearing counters on remote devices...")
await asyncio.gather(*(clear(device) for device in devices))
async def collect_commands(
inv: AntaInventory,
commands: dict[str, str],
root_dir: Path,
tags: list[str] | None = None,
) -> None:
"""
Collect EOS commands
"""
async def collect(dev: AntaDevice, command: str, outformat: Literal["json", "text"]) -> None:
outdir = Path() / root_dir / dev.name / outformat
outdir.mkdir(parents=True, exist_ok=True)
safe_command = re.sub(r"(/|\|$)", "_", command)
c = AntaCommand(command=command, ofmt=outformat)
await dev.collect(c)
if not c.collected:
logger.error(f"Could not collect commands on device {dev.name}: {c.errors}")
return
if c.ofmt == "json":
outfile = outdir / f"{safe_command}.json"
content = json.dumps(c.json_output, indent=2)
elif c.ofmt == "text":
outfile = outdir / f"{safe_command}.log"
content = c.text_output
with outfile.open(mode="w", encoding="UTF-8") as f:
f.write(content)
logger.info(f"Collected command '{command}' from device {dev.name} ({dev.hw_model})")
logger.info("Connecting to devices...")
await inv.connect_inventory()
devices = inv.get_inventory(established_only=True, tags=tags).values()
logger.info("Collecting commands from remote devices")
coros = []
if "json_format" in commands:
coros += [collect(device, command, "json") for device, command in itertools.product(devices, commands["json_format"])]
if "text_format" in commands:
coros += [collect(device, command, "text") for device, command in itertools.product(devices, commands["text_format"])]
res = await asyncio.gather(*coros, return_exceptions=True)
for r in res:
if isinstance(r, Exception):
logger.error(f"Error when collecting commands: {str(r)}")
async def collect_scheduled_show_tech(inv: AntaInventory, root_dir: Path, configure: bool, tags: list[str] | None = None, latest: int | None = None) -> None:
"""
Collect scheduled show-tech on devices
"""
async def collect(device: AntaDevice) -> None:
"""
Collect all the tech-support files stored on Arista switches flash and copy them locally
"""
try:
# Get the tech-support filename to retrieve
cmd = f"bash timeout 10 ls -1t {EOS_SCHEDULED_TECH_SUPPORT}"
if latest:
cmd += f" | head -{latest}"
command = AntaCommand(command=cmd, ofmt="text")
await device.collect(command=command)
if command.collected and command.text_output:
filenames = list(map(lambda f: Path(f"{EOS_SCHEDULED_TECH_SUPPORT}/{f}"), command.text_output.splitlines()))
else:
logger.error(f"Unable to get tech-support filenames on {device.name}: verify that {EOS_SCHEDULED_TECH_SUPPORT} is not empty")
return
# Create directories
outdir = Path() / root_dir / f"{device.name.lower()}"
outdir.mkdir(parents=True, exist_ok=True)
# Check if 'aaa authorization exec default local' is present in the running-config
command = AntaCommand(command="show running-config | include aaa authorization exec default", ofmt="text")
await device.collect(command=command)
if command.collected and not command.text_output:
logger.debug(f"'aaa authorization exec default local' is not configured on device {device.name}")
if configure:
# Otherwise mypy complains about enable
assert isinstance(device, AsyncEOSDevice)
# TODO - @mtache - add `config` field to `AntaCommand` object to handle this use case.
commands = []
if device.enable and device._enable_password is not None: # pylint: disable=protected-access
commands.append({"cmd": "enable", "input": device._enable_password}) # pylint: disable=protected-access
elif device.enable:
commands.append({"cmd": "enable"})
commands.extend(
[
{"cmd": "configure terminal"},
{"cmd": "aaa authorization exec default local"},
]
)
logger.warning(f"Configuring 'aaa authorization exec default local' on device {device.name}")
command = AntaCommand(command="show running-config | include aaa authorization exec default local", ofmt="text")
await device._session.cli(commands=commands) # pylint: disable=protected-access
logger.info(f"Configured 'aaa authorization exec default local' on device {device.name}")
else:
logger.error(f"Unable to collect tech-support on {device.name}: configuration 'aaa authorization exec default local' is not present")
return
logger.debug(f"'aaa authorization exec default local' is already configured on device {device.name}")
await device.copy(sources=filenames, destination=outdir, direction="from")
logger.info(f"Collected {len(filenames)} scheduled tech-support from {device.name}")
except (EapiCommandError, HTTPError, ConnectError) as e:
logger.error(f"Unable to collect tech-support on {device.name}: {str(e)}")
logger.info("Connecting to devices...")
await inv.connect_inventory()
devices = inv.get_inventory(established_only=True, tags=tags).values()
await asyncio.gather(*(collect(device) for device in devices))