Adding upstream version 1.4.0.

Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
Daniel Baumann 2025-05-15 09:34:27 +02:00
parent dc7df702ea
commit 7996c81031
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
166 changed files with 13787 additions and 11959 deletions

View file

@ -172,11 +172,11 @@ class TestResultManager:
assert "results_by_status" not in result_manager.__dict__
# Access the cache
assert result_manager.get_total_results() == 30
assert result_manager.get_total_results() == 181
# Check the cache is filled with the correct results count
assert "results_by_status" in result_manager.__dict__
assert sum(len(v) for v in result_manager.__dict__["results_by_status"].values()) == 30
assert sum(len(v) for v in result_manager.__dict__["results_by_status"].values()) == 181
# Add a new test
result_manager.add(result=test_result_factory())
@ -185,46 +185,51 @@ class TestResultManager:
assert "results_by_status" not in result_manager.__dict__
# Access the cache again
assert result_manager.get_total_results() == 31
assert result_manager.get_total_results() == 182
# Check the cache is filled again with the correct results count
assert "results_by_status" in result_manager.__dict__
assert sum(len(v) for v in result_manager.__dict__["results_by_status"].values()) == 31
assert sum(len(v) for v in result_manager.__dict__["results_by_status"].values()) == 182
def test_get_results(self, result_manager: ResultManager) -> None:
"""Test ResultManager.get_results."""
# Check for single status
success_results = result_manager.get_results(status={AntaTestStatus.SUCCESS})
assert len(success_results) == 4
assert len(success_results) == 43
assert all(r.result == "success" for r in success_results)
# Check for multiple statuses
failure_results = result_manager.get_results(status={AntaTestStatus.FAILURE, AntaTestStatus.ERROR})
assert len(failure_results) == 17
assert len(failure_results) == 104
assert all(r.result in {"failure", "error"} for r in failure_results)
# Check all results
all_results = result_manager.get_results()
assert len(all_results) == 30
assert len(all_results) == 181
def test_get_results_sort_by(self, result_manager: ResultManager) -> None:
"""Test ResultManager.get_results with sort_by."""
# Check all results with sort_by result
all_results = result_manager.get_results(sort_by=["result"])
assert len(all_results) == 30
assert [r.result for r in all_results] == ["error"] * 2 + ["failure"] * 15 + ["skipped"] * 9 + ["success"] * 4
assert len(all_results) == 181
assert [r.result for r in all_results] == ["error"] * 1 + ["failure"] * 103 + ["skipped"] * 34 + ["success"] * 43
# Check all results with sort_by device (name)
all_results = result_manager.get_results(sort_by=["name"])
assert len(all_results) == 30
assert len(all_results) == 181
assert all_results[0].name == "s1-spine1"
# Check multiple statuses with sort_by categories
success_skipped_results = result_manager.get_results(status={AntaTestStatus.SUCCESS, AntaTestStatus.SKIPPED}, sort_by=["categories"])
assert len(success_skipped_results) == 13
assert len(success_skipped_results) == 77
assert success_skipped_results[0].categories == ["avt"]
assert success_skipped_results[-1].categories == ["vxlan"]
# Check multiple statuses with sort_by custom_field
success_skipped_results = result_manager.get_results(status={AntaTestStatus.SUCCESS, AntaTestStatus.SKIPPED}, sort_by=["custom_field"])
assert len(success_skipped_results) == 77
assert success_skipped_results[-1].test == "VerifyReloadCause"
# Check all results with bad sort_by
with pytest.raises(
ValueError,
@ -237,18 +242,18 @@ class TestResultManager:
def test_get_total_results(self, result_manager: ResultManager) -> None:
"""Test ResultManager.get_total_results."""
# Test all results
assert result_manager.get_total_results() == 30
assert result_manager.get_total_results() == 181
# Test single status
assert result_manager.get_total_results(status={AntaTestStatus.SUCCESS}) == 4
assert result_manager.get_total_results(status={AntaTestStatus.FAILURE}) == 15
assert result_manager.get_total_results(status={AntaTestStatus.ERROR}) == 2
assert result_manager.get_total_results(status={AntaTestStatus.SKIPPED}) == 9
assert result_manager.get_total_results(status={AntaTestStatus.SUCCESS}) == 43
assert result_manager.get_total_results(status={AntaTestStatus.FAILURE}) == 103
assert result_manager.get_total_results(status={AntaTestStatus.ERROR}) == 1
assert result_manager.get_total_results(status={AntaTestStatus.SKIPPED}) == 34
# Test multiple statuses
assert result_manager.get_total_results(status={AntaTestStatus.SUCCESS, AntaTestStatus.FAILURE}) == 19
assert result_manager.get_total_results(status={AntaTestStatus.SUCCESS, AntaTestStatus.FAILURE, AntaTestStatus.ERROR}) == 21
assert result_manager.get_total_results(status={AntaTestStatus.SUCCESS, AntaTestStatus.FAILURE, AntaTestStatus.ERROR, AntaTestStatus.SKIPPED}) == 30
assert result_manager.get_total_results(status={AntaTestStatus.SUCCESS, AntaTestStatus.FAILURE}) == 146
assert result_manager.get_total_results(status={AntaTestStatus.SUCCESS, AntaTestStatus.FAILURE, AntaTestStatus.ERROR}) == 147
assert result_manager.get_total_results(status={AntaTestStatus.SUCCESS, AntaTestStatus.FAILURE, AntaTestStatus.ERROR, AntaTestStatus.SKIPPED}) == 181
@pytest.mark.parametrize(
("status", "error_status", "ignore_error", "expected_status"),
@ -546,6 +551,32 @@ class TestResultManager:
assert results[2].result == "failure"
assert results[2].test == "Test2"
def test_sort_fields_as_none(self, test_result_factory: Callable[[], TestResult]) -> None:
"""Test sorting by multiple fields."""
result_manager = ResultManager()
test1 = test_result_factory()
test1.result = AntaTestStatus.ERROR
test1.test = "Test3"
test1.custom_field = "custom"
test2 = test_result_factory()
test2.result = AntaTestStatus.ERROR
test2.test = "Test1"
test3 = test_result_factory()
test3.result = AntaTestStatus.FAILURE
test3.test = "Test2"
result_manager.results = [test1, test2, test3]
sorted_manager = result_manager.sort(["custom_field"])
results = sorted_manager.results
assert results[0].result == "error"
assert results[0].test == "Test1"
assert results[1].result == "failure"
assert results[1].test == "Test2"
assert results[2].result == "error"
assert results[2].test == "Test3"
assert results[2].custom_field == "custom"
def test_sort_invalid_field(self) -> None:
"""Test that sort method raises ValueError for invalid sort_by fields."""
result_manager = ResultManager()
@ -561,3 +592,34 @@ class TestResultManager:
"""Test that the sort method is chainable."""
result_manager = ResultManager()
assert isinstance(result_manager.sort(["name"]), ResultManager)
def test_merge_result_manager(self) -> None:
"""Test the merge_results function."""
result = ResultManager()
final_merged_results = ResultManager.merge_results([result])
assert isinstance(final_merged_results, ResultManager)
def test_merge_two_result_managers(self, test_result_factory: Callable[[], TestResult]) -> None:
"""Test merging two non-empty ResultManager instances."""
rm1 = ResultManager()
test1_rm1 = test_result_factory()
test1_rm1.name = "device1"
rm1.add(test1_rm1)
test2_rm1 = test_result_factory()
test2_rm1.name = "device2"
rm1.add(test2_rm1)
rm2 = ResultManager()
test1_rm2 = test_result_factory()
test1_rm2.name = "device3"
rm2.add(test1_rm2)
merged_rm = ResultManager.merge_results([rm1, rm2])
assert len(merged_rm) == 3
assert {r.name for r in merged_rm.results} == {"device1", "device2", "device3"}
def test_merge_empty_list(self) -> None:
"""Test merging an empty list of ResultManager instances."""
merged_rm = ResultManager.merge_results([])
assert isinstance(merged_rm, ResultManager)
assert len(merged_rm) == 0