1
0
Fork 0

Merging upstream version 0.12.0.

Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
Daniel Baumann 2025-02-10 06:39:52 +01:00
parent f45bc3d463
commit 8d2f70e3c7
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
77 changed files with 23610 additions and 2331 deletions

View file

@ -0,0 +1 @@
"""eos_downloader logics"""

View file

@ -0,0 +1,272 @@
#!/usr/bin/python
# coding: utf-8 -*-
# pylint: disable=too-many-positional-arguments
# pylint: disable=dangerous-default-value
"""Server module for handling interactions with Arista software download portal.
This module provides the AristaServer class which manages authentication and
file retrieval operations with the Arista software download portal. It handles
session management, XML data retrieval, and download URL generation.
Classes
-------
AristaServer
Main class for interacting with the Arista software portal.
Dependencies
-----------
- base64: For encoding authentication tokens
- json: For handling JSON data in requests
- xml.etree.ElementTree: For parsing XML responses
- loguru: For logging
- requests: For making HTTP requests
Example
-------
>>> from eos_downloader.logics.server import AristaServer
>>> server = AristaServer(token='my_auth_token')
>>> server.authenticate()
>>> xml_data = server.get_xml_data()
>>> download_url = server.get_url('/path/to/file')
Notes
-----
The module requires valid authentication credentials to interact with the Arista portal.
All server interactions are performed over HTTPS and follow Arista's API specifications.
"""
from __future__ import annotations
import base64
import logging
import json
from typing import Dict, Union, Any
import xml.etree.ElementTree as ET
from loguru import logger
import requests
import eos_downloader.exceptions
import eos_downloader.defaults
class AristaServer:
"""AristaServer class to handle authentication and interactions with Arista software download portal.
This class provides methods to authenticate with the Arista software portal,
retrieve XML data containing available software packages, and generate download URLs
for specific files.
Attributes
----------
token : str, optional
Authentication token for Arista portal access
timeout : int, default=5
Timeout in seconds for HTTP requests
session_server : str
URL of the authentication server
headers : Dict[str, any]
HTTP headers to use in requests
xml_url : str
URL to retrieve software package XML data
download_server : str
Base URL for file downloads
_session_id : str
Session ID obtained after authentication
Methods
-------
authenticate(token: Union[bool, None] = None) -> bool
Authenticates with the Arista portal using provided or stored token
get_xml_data() -> ET.ElementTree
Retrieves XML data containing available software packages
get_url(remote_file_path: str) -> Union[str, None]
Generates download URL for a specific file path
Raises
------
eos_downloader.exceptions.AuthenticationError
When authentication fails due to invalid or expired token
"""
def __init__(
self,
token: Union[str, None] = None,
timeout: int = 5,
session_server: str = eos_downloader.defaults.DEFAULT_SERVER_SESSION,
headers: Dict[str, Any] = eos_downloader.defaults.DEFAULT_REQUEST_HEADERS,
xml_url: str = eos_downloader.defaults.DEFAULT_SOFTWARE_FOLDER_TREE,
download_server: str = eos_downloader.defaults.DEFAULT_DOWNLOAD_URL,
) -> None:
"""Initialize the Server class with optional parameters.
Parameters
----------
token : Union[str, None], optional
Authentication token. Defaults to None.
timeout : int, optional
Request timeout in seconds. Defaults to 5.
session_server : str, optional
URL of the session server. Defaults to DEFAULT_SERVER_SESSION.
headers : Dict[str, any], optional
HTTP headers for requests. Defaults to DEFAULT_REQUEST_HEADERS.
xml_url : str, optional
URL of the software folder tree XML. Defaults to DEFAULT_SOFTWARE_FOLDER_TREE.
download_server : str, optional
Base URL for downloads. Defaults to DEFAULT_DOWNLOAD_URL.
Returns
-------
None
"""
self.token: Union[str, None] = token
self._session_server = session_server
self._headers = headers
self._timeout = timeout
self._xml_url = xml_url
self._download_server = download_server
self._session_id = None
logging.info(f"Initialized AristaServer with headers: {self._headers}")
def authenticate(self, token: Union[str, None] = None) -> bool:
"""Authenticate to the API server using access token.
The token is encoded in base64 and sent to the server for authentication.
A session ID is retrieved from the server response if authentication is successful.
Parameters
----------
token : Union[str, None], optional
Access token for authentication. If None, uses existing token stored in instance. Defaults to None.
Returns
-------
bool
True if authentication successful, False otherwise
Raises
------
eos_downloader.exceptions.AuthenticationError
If access token is invalid or expired
"""
if token is not None:
self.token = token
if self.token is None:
logger.error("No token provided for authentication")
return False
credentials = (base64.b64encode(self.token.encode())).decode("utf-8")
jsonpost = {"accessToken": credentials}
result = requests.post(
self._session_server,
data=json.dumps(jsonpost),
timeout=self._timeout,
headers=self._headers,
)
if result.json()["status"]["message"] in [
"Access token expired",
"Invalid access token",
]:
logging.critical(
f"Authentication failed: {result.json()['status']['message']}"
)
raise eos_downloader.exceptions.AuthenticationError
# return False
try:
if "data" in result.json():
self._session_id = result.json()["data"]["session_code"]
logging.info(f"Authenticated with session ID: {self._session_id}")
return True
except KeyError as error:
logger.error(
f"Key Error in parsing server response ({result.json()}): {error}"
)
return False
return False
def get_xml_data(self) -> Union[ET.ElementTree, None]:
"""Retrieves XML data from the server.
This method fetches XML data by making a POST request to the server's XML endpoint.
If not already authenticated, it will initiate the authentication process first.
Returns
-------
ET.ElementTree
An ElementTree object containing the parsed XML data from the server response.
Raises
------
KeyError
If the server response doesn't contain the expected data structure.
Notes
-----
The method requires a valid session ID which is obtained through authentication.
The XML data is expected to be in the response JSON under data.xml path.
"""
logging.info(f"Getting XML data from server {self._session_server}")
if self._session_id is None:
logging.debug("Not authenticated to server, start authentication process")
self.authenticate()
jsonpost = {"sessionCode": self._session_id}
result = requests.post(
self._xml_url,
data=json.dumps(jsonpost),
timeout=self._timeout,
headers=self._headers,
)
try:
folder_tree = result.json()["data"]["xml"]
logging.debug("XML data received from Arista server")
return ET.ElementTree(ET.fromstring(folder_tree))
except KeyError as error:
logger.error(f"Unkown key in server response: {error}")
return None
def get_url(self, remote_file_path: str) -> Union[str, None]:
"""Get download URL for a remote file from server.
This method retrieves the download URL for a specified remote file by making a POST request
to the server. If not authenticated, it will first authenticate before making the request.
Parameters
----------
remote_file_path : str
Path to the remote file on server to get download URL for
Returns
-------
Union[str, None]
The download URL if successful, None if request fails or URL not found in response
Raises
------
requests.exceptions.RequestException
If the request to server fails
json.JSONDecodeError
If server response is not valid JSON
requests.exceptions.Timeout
If server request times out
"""
logging.info(f"Getting download URL for {remote_file_path}")
if self._session_id is None:
logging.debug("Not authenticated to server, start authentication process")
self.authenticate()
jsonpost = {"sessionCode": self._session_id, "filePath": remote_file_path}
result = requests.post(
self._download_server,
data=json.dumps(jsonpost),
timeout=self._timeout,
headers=self._headers,
)
if "data" in result.json() and "url" in result.json()["data"]:
# logger.debug('URL to download file is: {}', result.json())
logging.info("Download URL received from server")
logging.debug(f'URL to download file is: {result.json()["data"]["url"]}')
return result.json()["data"]["url"]
return None

View file

@ -0,0 +1,536 @@
# coding: utf-8 -*-
"""This module provides classes for managing and querying Arista XML data.
Classes:
AristaXmlBase: Base class for Arista XML data management.
AristaXmlQuerier: Class to query Arista XML data for Software versions.
AristaXmlObject: Base class for Arista XML data management with specific software and version.
EosXmlObject: Class to query Arista XML data for EOS versions.
""" # noqa: E501
from __future__ import annotations
import logging
import xml.etree.ElementTree as ET
from typing import ClassVar, Union, List, Dict
import eos_downloader.logics.arista_server
import eos_downloader.models.version
import eos_downloader.models.data
from eos_downloader.models.types import AristaPackage, AristaVersions, AristaMapping
class AristaXmlBase:
# pylint: disable=too-few-public-methods
"""Base class for Arista XML data management."""
# File extensions supported to be downloaded from arista server.
# Should cover: image file (image) and has files (md5sum and/or sha512sum)
supported_role_types: ClassVar[List[str]] = ["image", "md5sum", "sha512sum"]
def __init__(
self, token: Union[str, None] = None, xml_path: Union[str, None] = None
) -> None:
"""
Initialize the AristaXmlBase class.
Parameters
----------
token : Union[str, None], optional
Authentication token. Defaults to None.
xml_path : Union[str, None], optional
Path to the XML file. Defaults to None.
Returns
-------
None
"""
logging.info("Initializing AristXmlBase.")
self.server = eos_downloader.logics.arista_server.AristaServer(token=token)
if xml_path is not None:
try:
self.xml_data = ET.parse(xml_path)
except ET.ParseError as error:
logging.error(f"Error while parsing XML data: {error}")
else:
if self.server.authenticate():
data = self._get_xml_root()
if data is None:
logging.error("Unable to get XML data from Arista server")
raise ValueError("Unable to get XML data from Arista server")
self.xml_data = data
else:
logging.error("Unable to authenticate to Arista server")
raise ValueError("Unable to authenticate to Arista server")
def _get_xml_root(self) -> Union[ET.ElementTree, None]:
"""
Retrieves the XML root from the Arista server.
Returns
-------
Union[ET.ElementTree, None]
The XML root element tree if successful, None otherwise.
"""
logging.info("Getting XML root from Arista server.")
try:
return self.server.get_xml_data()
except Exception as error: # pylint: disable=broad-except
logging.error(f"Error while getting XML data from Arista server: {error}")
return None
class AristaXmlQuerier(AristaXmlBase):
"""Class to query Arista XML data for Software versions."""
def available_public_versions(
self,
branch: Union[str, None] = None,
rtype: Union[str, None] = None,
package: AristaPackage = "eos",
) -> List[AristaVersions]:
"""
Get list of available public EOS versions from Arista website.
This method parses XML data to extract available EOS or CVP versions based on specified criteria.
Parameters
----------
branch : Union[str, None], optional
Branch number to filter versions (e.g. "4.29"). Defaults to None.
rtype : Union[str, None], optional
Release type to filter versions. Must be one of the valid release types defined in RTYPES. Defaults to None.
package : AristaPackage, optional
Type of package to look for - either 'eos' or 'cvp'. Defaults to 'eos'.
Returns
-------
List[AristaVersions]
List of version objects (EosVersion or CvpVersion) matching the criteria.
Raises
------
ValueError
If provided rtype is not in the list of valid release types.
Examples
--------
>>> server.available_public_eos_version(branch="4.29", rtype="INT", package="eos")
[EosVersion('4.29.0F-INT'), EosVersion('4.29.1F-INT'), ...]
"""
logging.info(f"Getting available versions for {package} package")
xpath_query = './/dir[@label="Active Releases"]//dir[@label]'
regexp = eos_downloader.models.version.EosVersion.regex_version
if package == "cvp":
xpath_query = './/dir[@label="Active Releases"]//dir[@label]'
regexp = eos_downloader.models.version.CvpVersion.regex_version
package_versions = []
if rtype is not None and rtype not in eos_downloader.models.data.RTYPES:
raise ValueError(
f"Invalid release type: {rtype}. Expected one of {eos_downloader.models.data.RTYPES}"
)
nodes = self.xml_data.findall(xpath_query)
for node in nodes:
if "label" in node.attrib and node.get("label") is not None:
label = node.get("label")
if label is not None and regexp.match(label):
package_version = None
if package == "eos":
package_version = (
eos_downloader.models.version.EosVersion.from_str(label)
)
elif package == "cvp":
package_version = (
eos_downloader.models.version.CvpVersion.from_str(label)
)
package_versions.append(package_version)
if rtype is not None or branch is not None:
package_versions = [
version
for version in package_versions
if version is not None
and (rtype is None or version.rtype == rtype)
and (branch is None or str(version.branch) == branch)
]
return package_versions
def latest(
self,
package: eos_downloader.models.types.AristaPackage = "eos",
branch: Union[str, None] = None,
rtype: Union[eos_downloader.models.types.ReleaseType, None] = None,
) -> AristaVersions:
"""
Get latest branch from semver standpoint.
Parameters
----------
package : eos_downloader.models.types.AristaPackage, optional
Type of package to look for - either 'eos' or 'cvp'. Defaults to 'eos'.
branch : Union[str, None], optional
Branch to search for. Defaults to None.
rtype : Union[eos_downloader.models.types.ReleaseType, None], optional
Release type to search for. Defaults to None.
Returns
-------
AristaVersions
Latest version found.
Raises
------
ValueError
If no versions are found to run the max() function.
"""
if package == "eos":
if rtype is not None and rtype not in eos_downloader.models.data.RTYPES:
raise ValueError(
f"Invalid release type: {rtype}. Expected {eos_downloader.models.data.RTYPES}"
)
versions = self.available_public_versions(
package=package, branch=branch, rtype=rtype
)
if len(versions) == 0:
raise ValueError("No versions found to run the max() function")
return max(versions)
def branches(
self,
package: eos_downloader.models.types.AristaPackage = "eos",
latest: bool = False,
) -> List[str]:
"""
Returns a list of valid EOS version branches.
The branches are determined based on the available public EOS versions.
When latest=True, only the most recent branch is returned.
Parameters
----------
package : eos_downloader.models.types.AristaPackage, optional
Type of package to look for - either 'eos' or 'cvp'. Defaults to 'eos'.
latest : bool, optional
If True, returns only the latest branch version. Defaults to False.
Returns
-------
List[str]
A list of branch version strings. Contains single latest version if latest=True,
otherwise all available versions sorted descendingly.
"""
if latest:
latest_branch = max(
self._get_branches(self.available_public_versions(package=package))
)
return [str(latest_branch)]
return sorted(
self._get_branches(self.available_public_versions(package=package)),
reverse=True,
)
def _get_branches(
self,
versions: Union[
List[eos_downloader.models.version.EosVersion],
List[eos_downloader.models.version.CvpVersion],
],
) -> List[str]:
"""
Extracts unique branch names from a list of version objects.
Parameters
----------
versions : Union[List[eos_downloader.models.version.EosVersion], List[eos_downloader.models.version.CvpVersion]]
A list of version objects, either EosVersion or CvpVersion types.
Returns
-------
List[str]
A list of unique branch names.
"""
branch = [version.branch for version in versions]
return list(set(branch))
class AristaXmlObject(AristaXmlBase):
"""Base class for Arista XML data management."""
software: ClassVar[AristaMapping]
base_xpath_active_version: ClassVar[str]
base_xpath_filepath: ClassVar[str]
checksum_file_extension: ClassVar[str] = "sha512sum"
def __init__(
self,
searched_version: str,
image_type: str,
token: Union[str, None] = None,
xml_path: Union[str, None] = None,
) -> None:
"""
Initialize the AristaXmlObject class.
Parameters
----------
searched_version : str
The version of the software to search for.
image_type : str
The type of image to download.
token : Union[str, None], optional
Authentication token. Defaults to None.
xml_path : Union[str, None], optional
Path to the XML file. Defaults to None.
Returns
-------
None
"""
self.search_version = searched_version
self.image_type = image_type
super().__init__(token=token, xml_path=xml_path)
@property
def filename(self) -> Union[str, None]:
"""
Helper to build filename to search on arista.com.
Returns
-------
Union[str, None]
Filename to search for on Arista.com.
"""
logging.info(
f"Building filename for {self.image_type} package: {self.search_version}."
)
try:
filename = eos_downloader.models.data.software_mapping.filename(
self.software, self.image_type, self.search_version
)
return filename
except ValueError as e:
logging.error(f"Error: {e}")
return None
def hash_filename(self) -> Union[str, None]:
"""
Helper to build filename for checksum to search on arista.com.
Returns
-------
Union[str, None]
Filename to search for on Arista.com.
"""
logging.info(f"Building hash filename for {self.software} package.")
if self.filename is not None:
return f"{self.filename}.{self.checksum_file_extension}"
return None
def path_from_xml(self, search_file: str) -> Union[str, None]:
"""
Parse XML to find path for a given file.
Parameters
----------
search_file : str
File to search for.
Returns
-------
Union[str, None]
Path from XML if found, None otherwise.
"""
logging.info(f"Building path from XML for {search_file}.")
# Build xpath with provided file
xpath_query = self.base_xpath_filepath.format(search_file)
# Find the element using XPath
path_element = self.xml_data.find(xpath_query)
if path_element is not None:
logging.debug(f'found path: {path_element.get("path")} for {search_file}')
# Return the path if found, otherwise return None
return path_element.get("path") if path_element is not None else None
def _url(self, xml_path: str) -> Union[str, None]:
"""
Get URL to download a file from Arista server.
Parameters
----------
xml_path : str
Path to the file in the XML.
Returns
-------
Union[str, None]
URL to download the file.
"""
logging.info(f"Getting URL for {xml_path}.")
return self.server.get_url(xml_path)
@property
def urls(self) -> Dict[str, Union[str, None]]:
"""
Get URLs to download files from Arista server for given software and version.
This method will return a dictionary with file type as key and URL as value.
It returns URL for the following items: 'image', 'md5sum', and 'sha512sum'.
Returns
-------
Dict[str, Union[str, None]]
Dictionary with file type as key and URL as value.
Raises
------
ValueError
If filename or hash file is not found.
"""
logging.info(f"Getting URLs for {self.software} package.")
urls = {}
if self.filename is None:
raise ValueError("Filename not found")
for role in self.supported_role_types:
file_path = None
logging.debug(f"working on {role}")
hash_filename = self.hash_filename()
if hash_filename is None:
raise ValueError("Hash file not found")
if role == "image":
file_path = self.path_from_xml(self.filename)
elif role == self.checksum_file_extension:
file_path = self.path_from_xml(hash_filename)
if file_path is not None:
logging.info(f"Adding {role} with {file_path} to urls dict")
urls[role] = self._url(file_path)
logging.debug(f"URLs dict contains: {urls}")
return urls
class EosXmlObject(AristaXmlObject):
"""Class to query Arista XML data for EOS versions."""
software: ClassVar[AristaMapping] = "EOS"
base_xpath_active_version: ClassVar[
str
] = './/dir[@label="Active Releases"]/dir/dir/[@label]'
base_xpath_filepath: ClassVar[str] = './/file[.="{}"]'
# File extensions supported to be downloaded from arista server.
# Should cover: image file (image) and has files (md5sum and/or sha512sum)
supported_role_types: ClassVar[List[str]] = ["image", "md5sum", "sha512sum"]
checksum_file_extension: ClassVar[str] = "sha512sum"
def __init__(
self,
searched_version: str,
image_type: str,
token: Union[str, None] = None,
xml_path: Union[str, None] = None,
) -> None:
"""
Initialize an instance of the EosXmlObject class.
Parameters
----------
searched_version : str
The version of the software to search for.
image_type : str
The type of image to download.
token : Union[str, None], optional
The authentication token. Defaults to None.
xml_path : Union[str, None], optional
The path to the XML file. Defaults to None.
Returns
-------
None
"""
self.search_version = searched_version
self.image_type = image_type
self.version = eos_downloader.models.version.EosVersion().from_str(
searched_version
)
super().__init__(
searched_version=searched_version,
image_type=image_type,
token=token,
xml_path=xml_path,
)
class CvpXmlObject(AristaXmlObject):
"""Class to query Arista XML data for CVP versions."""
software: ClassVar[AristaMapping] = "CloudVision"
base_xpath_active_version: ClassVar[
str
] = './/dir[@label="Active Releases"]/dir/dir/[@label]'
base_xpath_filepath: ClassVar[str] = './/file[.="{}"]'
# File extensions supported to be downloaded from arista server.
# Should cover: image file (image) and has files (md5sum and/or sha512sum)
supported_role_types: ClassVar[List[str]] = ["image", "md5"]
checksum_file_extension: ClassVar[str] = "md5"
def __init__(
self,
searched_version: str,
image_type: str,
token: Union[str, None] = None,
xml_path: Union[str, None] = None,
) -> None:
"""
Initialize an instance of the CvpXmlObject class.
Parameters
----------
searched_version : str
The version of the software to search for.
image_type : str
The type of image to download.
token : Union[str, None], optional
The authentication token. Defaults to None.
xml_path : Union[str, None], optional
The path to the XML file. Defaults to None.
Returns
-------
None
"""
self.search_version = searched_version
self.image_type = image_type
self.version = eos_downloader.models.version.CvpVersion().from_str(
searched_version
)
super().__init__(
searched_version=searched_version,
image_type=image_type,
token=token,
xml_path=xml_path,
)
# Create the custom type
AristaXmlObjects = Union[CvpXmlObject, EosXmlObject]

View file

@ -0,0 +1,494 @@
# coding: utf-8 -*-
"""ObjectDownloader class to manage file downloads with an option to use rich interface.
This class provides methods to download files from URLs with progress tracking using either
tqdm or rich interface. It supports both raw downloads and enhanced visual feedback during
the download process.
Methods
--------
download_file(url: str, file_path: str, filename: str, rich_interface: bool = True) -> Union[None, str]
Downloads a file from the given URL to the specified path with optional rich interface.
_download_file_raw(url: str, file_path: str) -> str
Static method that performs the actual file download with tqdm progress bar.
Attributes
--------
None
Example
--------
>>> downloader = ObjectDownloader()
>>> result = downloader.download_file(
... url='http://example.com/file.zip',
... file_path='/downloads',
... filename='file.zip',
... rich_interface=True
... )
"""
import os
import shutil
import hashlib
from typing import Union, Literal, Dict
import logging
import requests
from tqdm import tqdm
import eos_downloader.models.types
import eos_downloader.defaults
import eos_downloader.helpers
import eos_downloader.logics
import eos_downloader.logics.arista_xml_server
import eos_downloader.models.version
class SoftManager:
"""SoftManager helps to download files from a remote location.
This class provides methods to download files using either a simple progress bar
or a rich interface with enhanced visual feedback.
Examples
--------
>>> downloader = SoftManager()
>>> downloader.download_file(
... url="http://example.com/file.txt",
... file_path="/tmp",
... filename="file.txt"
... )
'/tmp/file.txt'
"""
def __init__(self, dry_run: bool = False) -> None:
self.file: Dict[str, Union[str, None]] = {}
self.file["name"] = None
self.file["md5sum"] = None
self.file["sha512sum"] = None
self.dry_run = dry_run
logging.info("SoftManager initialized%s", " in dry-run mode" if dry_run else "")
@staticmethod
def _download_file_raw(url: str, file_path: str) -> str:
"""Downloads a file from a URL and saves it to a local file.
Parameters
----------
url : str
The URL of the file to download.
file_path : str
The local path where the file will be saved.
Returns
-------
str
The path to the downloaded file.
Notes
-----
- Uses requests library to stream download in chunks of 1024 bytes
- Shows download progress using tqdm progress bar
- Sets timeout of 5 seconds for initial connection
"""
chunkSize = 1024
r = requests.get(url, stream=True, timeout=5)
with open(file_path, "wb") as f:
pbar = tqdm(
unit="B",
total=int(r.headers["Content-Length"]),
unit_scale=True,
unit_divisor=1024,
)
for chunk in r.iter_content(chunk_size=chunkSize):
if chunk:
pbar.update(len(chunk))
f.write(chunk)
return file_path
@staticmethod
def _create_destination_folder(path: str) -> None:
"""Creates a directory path if it doesn't already exist.
Parameters
----------
path : str
The directory path to create.
Returns
-------
None
"""
try:
os.makedirs(path, exist_ok=True)
except OSError as e:
logging.critical(f"Error creating folder: {e}")
def _compute_hash_md5sum(self, file: str, hash_expected: str) -> bool:
"""
Compare MD5 sum.
Do comparison between local md5 of the file and value provided by arista.com.
Parameters
----------
file : str
Local file to use for MD5 sum.
hash_expected : str
MD5 from arista.com.
Returns
-------
bool
True if both are equal, False if not.
"""
hash_md5 = hashlib.md5()
with open(file, "rb") as f:
while True:
chunk = f.read(4096)
if not chunk:
break
hash_md5.update(chunk)
if hash_md5.hexdigest() == hash_expected:
return True
logging.warning(
f"Downloaded file is corrupt: local md5 ({hash_md5.hexdigest()}) is different to md5 from arista ({hash_expected})"
)
return False
def checksum(self, check_type: Literal["md5sum", "sha512sum", "md5"]) -> bool:
"""
Verifies the integrity of a downloaded file using a specified checksum algorithm.
Parameters
----------
check_type : Literal['md5sum', 'sha512sum', 'md5']
The type of checksum to perform. Currently supports 'md5sum' or 'sha512sum'.
Returns
-------
bool
True if the checksum verification passes.
Raises
------
ValueError
If the calculated checksum does not match the expected checksum.
FileNotFoundError
If either the checksum file or the target file cannot be found.
Examples
--------
>>> client.checksum('sha512sum') # Returns True if checksum matches
"""
logging.info(f"Checking checksum for {self.file['name']} using {check_type}")
if self.dry_run:
logging.debug("Dry-run mode enabled, skipping checksum verification")
return True
if check_type == "sha512sum":
hash_sha512 = hashlib.sha512()
hash512sum = self.file["sha512sum"]
file_name = self.file["name"]
logging.debug(f"checksum sha512sum file is: {hash512sum}")
if file_name is None or hash512sum is None:
logging.error("File or checksum not found")
raise ValueError("File or checksum not found")
with open(hash512sum, "r", encoding="utf-8") as f:
hash_expected = f.read().split()[0]
with open(file_name, "rb") as f:
while True:
chunk = f.read(4096)
if not chunk:
break
hash_sha512.update(chunk)
if hash_sha512.hexdigest() != hash_expected:
logging.error(
f"Checksum failed for {self.file['name']}: computed {hash_sha512.hexdigest()} - expected {hash_expected}"
)
raise ValueError("Incorrect checksum")
return True
if check_type in ["md5sum", "md5"]:
md5sum_file = self.file["md5sum"]
file_name = self.file["name"]
if md5sum_file is None:
raise ValueError(f"md5sum is not found: {md5sum_file}")
with open(md5sum_file, "r", encoding="utf-8") as f:
hash_expected = f.read().split()[0]
if hash_expected is None:
raise ValueError("MD5Sum is empty, cannot compute file.")
if file_name is None:
raise ValueError("Filename is None. Please fix it")
if not self._compute_hash_md5sum(file_name, hash_expected=hash_expected):
logging.error(
f"Checksum failed for {self.file['name']}: expected {hash_expected}"
)
raise ValueError("Incorrect checksum")
return True
logging.error(f"Checksum type {check_type} not yet supported")
raise ValueError(f"Checksum type {check_type} not yet supported")
def download_file(
self, url: str, file_path: str, filename: str, rich_interface: bool = True
) -> Union[None, str]:
"""
Downloads a file from a given URL to a specified location.
Parameters
----------
url : str
The URL from which to download the file.
file_path : str
The directory path where the file should be saved.
filename : str
The name to be given to the downloaded file.
rich_interface : bool, optional
Whether to use rich progress bar interface. Defaults to True.
Returns
-------
Union[None, str]
The full path to the downloaded file if successful, None if download fails.
"""
logging.info(
f"{'[DRY-RUN] Would download' if self.dry_run else 'Downloading'} {filename} from {url}"
)
if self.dry_run:
return os.path.join(file_path, filename)
if url is not False:
if not rich_interface:
return self._download_file_raw(
url=url, file_path=os.path.join(file_path, filename)
)
rich_downloader = eos_downloader.helpers.DownloadProgressBar()
rich_downloader.download(urls=[url], dest_dir=file_path)
return os.path.join(file_path, filename)
logging.error(f"Cannot download file {file_path}")
return None
def downloads(
self,
object_arista: eos_downloader.logics.arista_xml_server.AristaXmlObjects,
file_path: str,
rich_interface: bool = True,
) -> Union[None, str]:
"""
Downloads files from Arista EOS server.
Downloads the EOS image and optional md5/sha512 files based on the provided EOS XML object.
Each file is downloaded to the specified path with appropriate filenames.
Parameters
----------
object_arista : eos_downloader.logics.arista_xml_server.AristaXmlObjects
Object containing EOS image and hash file URLs.
file_path : str
Directory path where files should be downloaded.
rich_interface : bool, optional
Whether to use rich console output. Defaults to True.
Returns
-------
Union[None, str]
The file path where files were downloaded, or None if download failed.
Examples
--------
>>> client.downloads(eos_obj, "/tmp/downloads")
'/tmp/downloads'
"""
logging.info(f"Downloading files from {object_arista.version}")
if len(object_arista.urls) == 0:
logging.error("No URLs found for download")
raise ValueError("No URLs found for download")
for file_type, url in sorted(object_arista.urls.items(), reverse=True):
logging.debug(f"Downloading {file_type} from {url}")
if file_type == "image":
filename = object_arista.filename
self.file["name"] = filename
else:
filename = object_arista.hash_filename()
self.file[file_type] = filename
if url is None:
logging.error(f"URL not found for {file_type}")
raise ValueError(f"URL not found for {file_type}")
if filename is None:
logging.error(f"Filename not found for {file_type}")
raise ValueError(f"Filename not found for {file_type}")
if not self.dry_run:
logging.info(
f"downloading file {filename} for version {object_arista.version}"
)
self.download_file(url, file_path, filename, rich_interface)
else:
logging.info(
f"[DRY-RUN] - downloading file {filename} for version {object_arista.version}"
)
return file_path
def import_docker(
self,
local_file_path: str,
docker_name: str = "arista/ceos",
docker_tag: str = "latest",
) -> None:
"""
Import a local file into a Docker image.
This method imports a local file into Docker with a specified image name and tag.
It checks for the existence of both the local file and docker binary before proceeding.
Parameters
----------
local_file_path : str
Path to the local file to import.
docker_name : str, optional
Name for the Docker image. Defaults to 'arista/ceos'.
docker_tag : str, optional
Tag for the Docker image. Defaults to 'latest'.
Raises
------
FileNotFoundError
If the local file doesn't exist or docker binary is not found.
Exception
If the docker import operation fails.
Returns
-------
None
"""
logging.info(
f"Importing {local_file_path} to {docker_name}:{docker_tag} in local docker enginge"
)
if os.path.exists(local_file_path) is False:
raise FileNotFoundError(f"File {local_file_path} not found")
if not shutil.which("docker"):
raise FileNotFoundError(f"File {local_file_path} not found")
try:
cmd = f"$(which docker) import {local_file_path} {docker_name}:{docker_tag}"
if self.dry_run:
logging.info(f"[DRY-RUN] Would execute: {cmd}")
else:
logging.debug("running docker import process")
os.system(cmd)
except Exception as e:
logging.error(f"Error importing docker image: {e}")
raise e
# pylint: disable=too-many-branches
def provision_eve(
self,
object_arista: eos_downloader.logics.arista_xml_server.EosXmlObject,
noztp: bool = False,
) -> None:
"""
Provisions EVE-NG with the specified Arista EOS object.
Parameters
----------
object_arista : eos_downloader.logics.arista_xml_server.EosXmlObject
The Arista EOS object containing version, filename, and URLs.
noztp : bool, optional
If True, disables ZTP (Zero Touch Provisioning). Defaults to False.
Raises
------
ValueError
If no URLs are found for download or if a URL or filename is None.
Returns
-------
None
"""
# EVE-NG provisioning page for vEOS
# https://www.eve-ng.net/index.php/documentation/howtos/howto-add-arista-veos/
logging.info(
f"Provisioning EVE-NG with {object_arista.version} / {object_arista.filename}"
)
file_path = f"{eos_downloader.defaults.EVE_QEMU_FOLDER_PATH}/veos-{object_arista.version}"
filename: Union[str, None] = None
eos_filename = object_arista.filename
if len(object_arista.urls) == 0:
logging.error("No URLs found for download")
raise ValueError("No URLs found for download")
for file_type, url in sorted(object_arista.urls.items(), reverse=True):
logging.debug(f"Downloading {file_type} from {url}")
if file_type == "image":
fname = object_arista.filename
if fname is not None:
filename = fname
if noztp:
filename = f"{os.path.splitext(fname)[0]}-noztp{os.path.splitext(fname)[1]}"
eos_filename = filename
logging.debug(f"filename is {filename}")
self.file["name"] = filename
else:
filename = object_arista.hash_filename()
if filename is not None:
self.file[file_type] = filename
if url is None:
logging.error(f"URL not found for {file_type}")
raise ValueError(f"URL not found for {file_type}")
if filename is None:
logging.error(f"Filename not found for {file_type}")
raise ValueError(f"Filename not found for {file_type}")
if not os.path.exists(file_path):
logging.warning(f"creating folder on eve-ng server : {file_path}")
self._create_destination_folder(path=file_path)
logging.debug(
f"downloading file {filename} for version {object_arista.version}"
)
self.download_file(url, file_path, filename, rich_interface=True)
# Convert to QCOW2 format
file_qcow2 = os.path.join(file_path, "hda.qcow2")
if not self.dry_run:
os.system(
f"$(which qemu-img) convert -f vmdk -O qcow2 {file_path}/{eos_filename} {file_path}/{file_qcow2}"
)
else:
logging.info(
f"{'[DRY-RUN] Would convert' if self.dry_run else 'Converting'} VMDK to QCOW2 format: {file_path}/{eos_filename} to {file_qcow2} "
)
logging.info("Applying unl_wrapper to fix permissions")
if not self.dry_run:
os.system("/opt/unetlab/wrappers/unl_wrapper -a fixpermissions")
else:
logging.info("[DRY-RUN] Would execute unl_wrapper to fix permissions")
# os.system(f"rm -f {file_downloaded}")
# if noztp:
# self._disable_ztp(file_path=file_path)