Merging upstream version 1.1.0.

Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
Daniel Baumann 2025-02-05 11:54:55 +01:00
parent 50f8dbf7e8
commit 2044ea6182
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
196 changed files with 10121 additions and 3780 deletions

View file

@ -6,15 +6,16 @@
from __future__ import annotations
import json
import re
from contextlib import AbstractContextManager, nullcontext
from typing import TYPE_CHECKING, Callable
import pytest
from anta.result_manager import ResultManager, models
from anta.result_manager.models import AntaTestStatus
if TYPE_CHECKING:
from anta.custom_types import TestStatus
from anta.result_manager.models import TestResult
@ -55,7 +56,7 @@ class TestResultManager:
success_list = list_result_factory(3)
for test in success_list:
test.result = "success"
test.result = AntaTestStatus.SUCCESS
result_manager.results = success_list
json_res = result_manager.json
@ -71,6 +72,27 @@ class TestResultManager:
assert test.get("custom_field") is None
assert test.get("result") == "success"
def test_sorted_category_stats(self, list_result_factory: Callable[[int], list[TestResult]]) -> None:
"""Test ResultManager.sorted_category_stats."""
result_manager = ResultManager()
results = list_result_factory(4)
# Modify the categories to have a mix of different acronym categories
results[0].categories = ["ospf"]
results[1].categories = ["bgp"]
results[2].categories = ["vxlan"]
results[3].categories = ["system"]
result_manager.results = results
# Check the current categories order
expected_order = ["ospf", "bgp", "vxlan", "system"]
assert list(result_manager.category_stats.keys()) == expected_order
# Check the sorted categories order
expected_order = ["bgp", "ospf", "system", "vxlan"]
assert list(result_manager.sorted_category_stats.keys()) == expected_order
@pytest.mark.parametrize(
("starting_status", "test_status", "expected_status", "expected_raise"),
[
@ -119,29 +141,26 @@ class TestResultManager:
nullcontext(),
id="failure, add success",
),
pytest.param(
"unset", "unknown", None, pytest.raises(ValueError, match="Input should be 'unset', 'success', 'failure', 'error' or 'skipped'"), id="wrong status"
),
pytest.param("unset", "unknown", None, pytest.raises(ValueError, match="'unknown' is not a valid AntaTestStatus"), id="wrong status"),
],
)
def test_add(
self,
test_result_factory: Callable[[], TestResult],
starting_status: TestStatus,
test_status: TestStatus,
starting_status: str,
test_status: str,
expected_status: str,
expected_raise: AbstractContextManager[Exception],
) -> None:
# pylint: disable=too-many-arguments
"""Test ResultManager_update_status."""
result_manager = ResultManager()
result_manager.status = starting_status
result_manager.status = AntaTestStatus(starting_status)
assert result_manager.error_status is False
assert len(result_manager) == 0
test = test_result_factory()
test.result = test_status
with expected_raise:
test.result = AntaTestStatus(test_status)
result_manager.add(test)
if test_status == "error":
assert result_manager.error_status is True
@ -149,6 +168,91 @@ class TestResultManager:
assert result_manager.status == expected_status
assert len(result_manager) == 1
def test_add_clear_cache(self, result_manager: ResultManager, test_result_factory: Callable[[], TestResult]) -> None:
"""Test ResultManager.add and make sure the cache is reset after adding a new test."""
# Check the cache is empty
assert "results_by_status" not in result_manager.__dict__
# Access the cache
assert result_manager.get_total_results() == 30
# Check the cache is filled with the correct results count
assert "results_by_status" in result_manager.__dict__
assert sum(len(v) for v in result_manager.__dict__["results_by_status"].values()) == 30
# Add a new test
result_manager.add(result=test_result_factory())
# Check the cache has been reset
assert "results_by_status" not in result_manager.__dict__
# Access the cache again
assert result_manager.get_total_results() == 31
# Check the cache is filled again with the correct results count
assert "results_by_status" in result_manager.__dict__
assert sum(len(v) for v in result_manager.__dict__["results_by_status"].values()) == 31
def test_get_results(self, result_manager: ResultManager) -> None:
"""Test ResultManager.get_results."""
# Check for single status
success_results = result_manager.get_results(status={AntaTestStatus.SUCCESS})
assert len(success_results) == 7
assert all(r.result == "success" for r in success_results)
# Check for multiple statuses
failure_results = result_manager.get_results(status={AntaTestStatus.FAILURE, AntaTestStatus.ERROR})
assert len(failure_results) == 21
assert all(r.result in {"failure", "error"} for r in failure_results)
# Check all results
all_results = result_manager.get_results()
assert len(all_results) == 30
def test_get_results_sort_by(self, result_manager: ResultManager) -> None:
"""Test ResultManager.get_results with sort_by."""
# Check all results with sort_by result
all_results = result_manager.get_results(sort_by=["result"])
assert len(all_results) == 30
assert [r.result for r in all_results] == ["error"] * 2 + ["failure"] * 19 + ["skipped"] * 2 + ["success"] * 7
# Check all results with sort_by device (name)
all_results = result_manager.get_results(sort_by=["name"])
assert len(all_results) == 30
assert all_results[0].name == "DC1-LEAF1A"
assert all_results[-1].name == "DC1-SPINE1"
# Check multiple statuses with sort_by categories
success_skipped_results = result_manager.get_results(status={AntaTestStatus.SUCCESS, AntaTestStatus.SKIPPED}, sort_by=["categories"])
assert len(success_skipped_results) == 9
assert success_skipped_results[0].categories == ["Interfaces"]
assert success_skipped_results[-1].categories == ["VXLAN"]
# Check all results with bad sort_by
with pytest.raises(
ValueError,
match=re.escape(
"Invalid sort_by fields: ['bad_field']. Accepted fields are: ['name', 'test', 'categories', 'description', 'result', 'messages', 'custom_field']",
),
):
all_results = result_manager.get_results(sort_by=["bad_field"])
def test_get_total_results(self, result_manager: ResultManager) -> None:
"""Test ResultManager.get_total_results."""
# Test all results
assert result_manager.get_total_results() == 30
# Test single status
assert result_manager.get_total_results(status={AntaTestStatus.SUCCESS}) == 7
assert result_manager.get_total_results(status={AntaTestStatus.FAILURE}) == 19
assert result_manager.get_total_results(status={AntaTestStatus.ERROR}) == 2
assert result_manager.get_total_results(status={AntaTestStatus.SKIPPED}) == 2
# Test multiple statuses
assert result_manager.get_total_results(status={AntaTestStatus.SUCCESS, AntaTestStatus.FAILURE}) == 26
assert result_manager.get_total_results(status={AntaTestStatus.SUCCESS, AntaTestStatus.FAILURE, AntaTestStatus.ERROR}) == 28
assert result_manager.get_total_results(status={AntaTestStatus.SUCCESS, AntaTestStatus.FAILURE, AntaTestStatus.ERROR, AntaTestStatus.SKIPPED}) == 30
@pytest.mark.parametrize(
("status", "error_status", "ignore_error", "expected_status"),
[
@ -159,7 +263,7 @@ class TestResultManager:
)
def test_get_status(
self,
status: TestStatus,
status: AntaTestStatus,
error_status: bool,
ignore_error: bool,
expected_status: str,
@ -177,28 +281,28 @@ class TestResultManager:
success_list = list_result_factory(3)
for test in success_list:
test.result = "success"
test.result = AntaTestStatus.SUCCESS
result_manager.results = success_list
test = test_result_factory()
test.result = "failure"
test.result = AntaTestStatus.FAILURE
result_manager.add(test)
test = test_result_factory()
test.result = "error"
test.result = AntaTestStatus.ERROR
result_manager.add(test)
test = test_result_factory()
test.result = "skipped"
test.result = AntaTestStatus.SKIPPED
result_manager.add(test)
assert len(result_manager) == 6
assert len(result_manager.filter({"failure"})) == 5
assert len(result_manager.filter({"error"})) == 5
assert len(result_manager.filter({"skipped"})) == 5
assert len(result_manager.filter({"failure", "error"})) == 4
assert len(result_manager.filter({"failure", "error", "skipped"})) == 3
assert len(result_manager.filter({"success", "failure", "error", "skipped"})) == 0
assert len(result_manager.filter({AntaTestStatus.FAILURE})) == 5
assert len(result_manager.filter({AntaTestStatus.ERROR})) == 5
assert len(result_manager.filter({AntaTestStatus.SKIPPED})) == 5
assert len(result_manager.filter({AntaTestStatus.FAILURE, AntaTestStatus.ERROR})) == 4
assert len(result_manager.filter({AntaTestStatus.FAILURE, AntaTestStatus.ERROR, AntaTestStatus.SKIPPED})) == 3
assert len(result_manager.filter({AntaTestStatus.SUCCESS, AntaTestStatus.FAILURE, AntaTestStatus.ERROR, AntaTestStatus.SKIPPED})) == 0
def test_get_by_tests(self, test_result_factory: Callable[[], TestResult], result_manager_factory: Callable[[int], ResultManager]) -> None:
"""Test ResultManager.get_by_tests."""