1
0
Fork 0

Adding upstream version 0.8.1.

Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
Daniel Baumann 2025-02-05 13:43:43 +01:00
parent b16cc97368
commit 55e5e7ac79
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
49 changed files with 4592 additions and 0 deletions

View file

@ -0,0 +1,47 @@
#!/usr/bin/python
# coding: utf-8 -*-
"""
EOS Downloader module.
"""
from __future__ import (absolute_import, division,
print_function, unicode_literals, annotations)
import dataclasses
from typing import Any
import json
import importlib.metadata
__author__ = '@titom73'
__email__ = 'tom@inetsix.net'
__date__ = '2022-03-16'
__version__ = importlib.metadata.version("eos-downloader")
# __all__ = ["CvpAuthenticationItem", "CvFeatureManager", "EOSDownloader", "ObjectDownloader", "reverse"]
ARISTA_GET_SESSION = "https://www.arista.com/custom_data/api/cvp/getSessionCode/"
ARISTA_SOFTWARE_FOLDER_TREE = "https://www.arista.com/custom_data/api/cvp/getFolderTree/"
ARISTA_DOWNLOAD_URL = "https://www.arista.com/custom_data/api/cvp/getDownloadLink/"
MSG_TOKEN_EXPIRED = """The API token has expired. Please visit arista.com, click on your profile and
select Regenerate Token then re-run the script with the new token.
"""
MSG_TOKEN_INVALID = """The API token is incorrect. Please visit arista.com, click on your profile and
check the Access Token. Then re-run the script with the correct token.
"""
MSG_INVALID_DATA = """Invalid data returned by server
"""
EVE_QEMU_FOLDER_PATH = '/opt/unetlab/addons/qemu/'
class EnhancedJSONEncoder(json.JSONEncoder):
"""Custom JSon encoder."""
def default(self, o: Any) -> Any:
if dataclasses.is_dataclass(o):
return dataclasses.asdict(o)
return super().default(o)

View file

76
eos_downloader/cli/cli.py Normal file
View file

@ -0,0 +1,76 @@
#!/usr/bin/env python
# coding: utf-8 -*-
# pylint: disable=no-value-for-parameter
# pylint: disable=cyclic-import
# pylint: disable=too-many-arguments
# pylint: disable=unused-argument
"""
ARDL CLI Baseline.
"""
import click
from rich.console import Console
import eos_downloader
from eos_downloader.cli.get import commands as get_commands
from eos_downloader.cli.debug import commands as debug_commands
from eos_downloader.cli.info import commands as info_commands
@click.group()
@click.pass_context
@click.option('--token', show_envvar=True, default=None, help='Arista Token from your customer account')
def ardl(ctx: click.Context, token: str) -> None:
"""Arista Network Download CLI"""
ctx.ensure_object(dict)
ctx.obj['token'] = token
@click.command()
def version() -> None:
"""Display version of ardl"""
console = Console()
console.print(f'ardl is running version {eos_downloader.__version__}')
@ardl.group(no_args_is_help=True)
@click.pass_context
def get(ctx: click.Context) -> None:
# pylint: disable=redefined-builtin
"""Download Arista from Arista website"""
@ardl.group(no_args_is_help=True)
@click.pass_context
def info(ctx: click.Context) -> None:
# pylint: disable=redefined-builtin
"""List information from Arista website"""
@ardl.group(no_args_is_help=True)
@click.pass_context
def debug(ctx: click.Context) -> None:
# pylint: disable=redefined-builtin
"""Debug commands to work with ardl"""
# ANTA CLI Execution
def cli() -> None:
"""Load ANTA CLI"""
# Load group commands
get.add_command(get_commands.eos)
get.add_command(get_commands.cvp)
info.add_command(info_commands.eos_versions)
debug.add_command(debug_commands.xml)
ardl.add_command(version)
# Load CLI
ardl(
obj={},
auto_envvar_prefix='arista'
)
if __name__ == '__main__':
cli()

View file

View file

@ -0,0 +1,53 @@
#!/usr/bin/env python
# coding: utf-8 -*-
# pylint: disable=no-value-for-parameter
# pylint: disable=too-many-arguments
# pylint: disable=line-too-long
# pylint: disable=duplicate-code
# flake8: noqa E501
"""
Commands for ARDL CLI to get data.
"""
import xml.etree.ElementTree as ET
from xml.dom import minidom
import click
from loguru import logger
from rich.console import Console
import eos_downloader.eos
@click.command()
@click.pass_context
@click.option('--output', default=str('arista.xml'), help='Path to save XML file', type=click.Path(), show_default=True)
@click.option('--log-level', '--log', help='Logging level of the command', default=None, type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False))
def xml(ctx: click.Context, output: str, log_level: str) -> None:
# sourcery skip: remove-unnecessary-cast
"""Extract XML directory structure"""
console = Console()
# Get from Context
token = ctx.obj['token']
logger.remove()
if log_level is not None:
logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper())
my_download = eos_downloader.eos.EOSDownloader(
image='unset',
software='EOS',
version='unset',
token=token,
hash_method='sha512sum')
my_download.authenticate()
xml_object: ET.ElementTree = my_download._get_folder_tree() # pylint: disable=protected-access
xml_content = xml_object.getroot()
xmlstr = minidom.parseString(ET.tostring(xml_content)).toprettyxml(indent=" ", newl='')
with open(output, "w", encoding='utf-8') as f:
f.write(str(xmlstr))
console.print(f'XML file saved in: { output }')

View file

View file

@ -0,0 +1,137 @@
#!/usr/bin/env python
# coding: utf-8 -*-
# pylint: disable=no-value-for-parameter
# pylint: disable=too-many-arguments
# pylint: disable=line-too-long
# pylint: disable=redefined-builtin
# flake8: noqa E501
"""
Commands for ARDL CLI to get data.
"""
import os
import sys
from typing import Union
import click
from loguru import logger
from rich.console import Console
import eos_downloader.eos
from eos_downloader.models.version import BASE_VERSION_STR, RTYPE_FEATURE, RTYPES
EOS_IMAGE_TYPE = ['64', 'INT', '2GB-INT', 'cEOS', 'cEOS64', 'vEOS', 'vEOS-lab', 'EOS-2GB', 'default']
CVP_IMAGE_TYPE = ['ova', 'rpm', 'kvm', 'upgrade']
@click.command(no_args_is_help=True)
@click.pass_context
@click.option('--image-type', default='default', help='EOS Image type', type=click.Choice(EOS_IMAGE_TYPE), required=True)
@click.option('--version', default=None, help='EOS version', type=str, required=False)
@click.option('--latest', '-l', is_flag=True, type=click.BOOL, default=False, help='Get latest version in given branch. If --branch is not use, get the latest branch with specific release type')
@click.option('--release-type', '-rtype', type=click.Choice(RTYPES, case_sensitive=False), default=RTYPE_FEATURE, help='EOS release type to search')
@click.option('--branch', '-b', type=click.STRING, default=None, help='EOS Branch to list releases')
@click.option('--docker-name', default='arista/ceos', help='Docker image name (default: arista/ceos)', type=str, show_default=True)
@click.option('--output', default=str(os.path.relpath(os.getcwd(), start=os.curdir)), help='Path to save image', type=click.Path(),show_default=True)
# Debugging
@click.option('--log-level', '--log', help='Logging level of the command', default=None, type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False))
# Boolean triggers
@click.option('--eve-ng', is_flag=True, help='Run EVE-NG vEOS provisioning (only if CLI runs on an EVE-NG server)', default=False)
@click.option('--disable-ztp', is_flag=True, help='Disable ZTP process in vEOS image (only available with --eve-ng)', default=False)
@click.option('--import-docker', is_flag=True, help='Import docker image (only available with --image_type cEOSlab)', default=False)
def eos(
ctx: click.Context, image_type: str, output: str, log_level: str, eve_ng: bool, disable_ztp: bool,
import_docker: bool, docker_name: str, version: Union[str, None] = None, release_type: str = RTYPE_FEATURE,
latest: bool = False, branch: Union[str,None] = None
) -> int:
"""Download EOS image from Arista website"""
console = Console()
# Get from Context
token = ctx.obj['token']
if token is None or token == '':
console.print('❗ Token is unset ! Please configure ARISTA_TOKEN or use --token option', style="bold red")
sys.exit(1)
logger.remove()
if log_level is not None:
logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper())
console.print("🪐 [bold blue]eos-downloader[/bold blue] is starting...", )
console.print(f' - Image Type: {image_type}')
console.print(f' - Version: {version}')
if version is not None:
my_download = eos_downloader.eos.EOSDownloader(
image=image_type,
software='EOS',
version=version,
token=token,
hash_method='sha512sum')
my_download.authenticate()
elif latest:
my_download = eos_downloader.eos.EOSDownloader(
image=image_type,
software='EOS',
version='unset',
token=token,
hash_method='sha512sum')
my_download.authenticate()
if branch is None:
branch = str(my_download.latest_branch(rtype=release_type).branch)
latest_version = my_download.latest_eos(branch, rtype=release_type)
if str(latest_version) == BASE_VERSION_STR:
console.print(f'[red]Error[/red], cannot find any version in {branch} for {release_type} release type')
sys.exit(1)
my_download.version = str(latest_version)
if eve_ng:
my_download.provision_eve(noztp=disable_ztp, checksum=True)
else:
my_download.download_local(file_path=output, checksum=True)
if import_docker:
my_download.docker_import(
image_name=docker_name
)
console.print('✅ processing done !')
sys.exit(0)
@click.command(no_args_is_help=True)
@click.pass_context
@click.option('--format', default='upgrade', help='CVP Image type', type=click.Choice(CVP_IMAGE_TYPE), required=True)
@click.option('--version', default=None, help='CVP version', type=str, required=True)
@click.option('--output', default=str(os.path.relpath(os.getcwd(), start=os.curdir)), help='Path to save image', type=click.Path(),show_default=True)
@click.option('--log-level', '--log', help='Logging level of the command', default=None, type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False))
def cvp(ctx: click.Context, version: str, format: str, output: str, log_level: str) -> int:
"""Download CVP image from Arista website"""
console = Console()
# Get from Context
token = ctx.obj['token']
if token is None or token == '':
console.print('❗ Token is unset ! Please configure ARISTA_TOKEN or use --token option', style="bold red")
sys.exit(1)
logger.remove()
if log_level is not None:
logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper())
console.print("🪐 [bold blue]eos-downloader[/bold blue] is starting...", )
console.print(f' - Image Type: {format}')
console.print(f' - Version: {version}')
my_download = eos_downloader.eos.EOSDownloader(
image=format,
software='CloudVision',
version=version,
token=token,
hash_method='md5sum')
my_download.authenticate()
my_download.download_local(file_path=output, checksum=False)
console.print('✅ processing done !')
sys.exit(0)

View file

View file

@ -0,0 +1,87 @@
#!/usr/bin/env python
# coding: utf-8 -*-
# pylint: disable=no-value-for-parameter
# pylint: disable=too-many-arguments
# pylint: disable=line-too-long
# pylint: disable=redefined-builtin
# flake8: noqa E501
"""
Commands for ARDL CLI to list data.
"""
import sys
from typing import Union
import click
from loguru import logger
from rich.console import Console
from rich.pretty import pprint
import eos_downloader.eos
from eos_downloader.models.version import BASE_VERSION_STR, RTYPE_FEATURE, RTYPES
@click.command(no_args_is_help=True)
@click.pass_context
@click.option('--latest', '-l', is_flag=True, type=click.BOOL, default=False, help='Get latest version in given branch. If --branch is not use, get the latest branch with specific release type')
@click.option('--release-type', '-rtype', type=click.Choice(RTYPES, case_sensitive=False), default=RTYPE_FEATURE, help='EOS release type to search')
@click.option('--branch', '-b', type=click.STRING, default=None, help='EOS Branch to list releases')
@click.option('--verbose', '-v', is_flag=True, type=click.BOOL, default=False, help='Human readable output. Default is none to use output in script)')
@click.option('--log-level', '--log', help='Logging level of the command', default='warning', type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False))
def eos_versions(ctx: click.Context, log_level: str, branch: Union[str,None] = None, release_type: str = RTYPE_FEATURE, latest: bool = False, verbose: bool = False) -> None:
# pylint: disable = too-many-branches
"""
List Available EOS version on Arista.com website.
Comes with some filters to get latest release (F or M) as well as branch filtering
- To get latest M release available (without any branch): ardl info eos-versions --latest -rtype m
- To get latest F release available: ardl info eos-versions --latest -rtype F
"""
console = Console()
# Get from Context
token = ctx.obj['token']
logger.remove()
if log_level is not None:
logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper())
my_download = eos_downloader.eos.EOSDownloader(
image='unset',
software='EOS',
version='unset',
token=token,
hash_method='sha512sum')
auth = my_download.authenticate()
if verbose and auth:
console.print('✅ Authenticated on arista.com')
if release_type is not None:
release_type = release_type.upper()
if latest:
if branch is None:
branch = str(my_download.latest_branch(rtype=release_type).branch)
latest_version = my_download.latest_eos(branch, rtype=release_type)
if str(latest_version) == BASE_VERSION_STR:
console.print(f'[red]Error[/red], cannot find any version in {branch} for {release_type} release type')
sys.exit(1)
if verbose:
console.print(f'Branch {branch} has been selected with release type {release_type}')
if branch is not None:
console.print(f'Latest release for {branch}: {latest_version}')
else:
console.print(f'Latest EOS release: {latest_version}')
else:
console.print(f'{ latest_version }')
else:
versions = my_download.get_eos_versions(branch=branch, rtype=release_type)
if verbose:
console.print(f'List of available versions for {branch if branch is not None else "all branches"}')
for version in versions:
console.print(f'{str(version)}')
else:
pprint([str(version) for version in versions])

276
eos_downloader/cvp.py Normal file
View file

@ -0,0 +1,276 @@
#!/usr/bin/python
# coding: utf-8 -*-
"""
CVP Uploader content
"""
import os
from typing import List, Optional, Any
from dataclasses import dataclass
from loguru import logger
from cvprac.cvp_client import CvpClient
from cvprac.cvp_client_errors import CvpLoginError
# from eos_downloader.tools import exc_to_str
# logger = logging.getLogger(__name__)
@dataclass
class CvpAuthenticationItem:
"""
Data structure to represent Cloudvision Authentication
"""
server: str
port: int = 443
token: Optional[str] = None
timeout: int = 1200
validate_cert: bool = False
class Filer():
# pylint: disable=too-few-public-methods
"""
Filer Helper for file management
"""
def __init__(self, path: str) -> None:
self.file_exist = False
self.filename = ''
self.absolute_path = ''
self.relative_path = path
if os.path.exists(path):
self.file_exist = True
self.filename = os.path.basename(path)
self.absolute_path = os.path.realpath(path)
def __repr__(self) -> str:
return self.absolute_path if self.file_exist else ''
class CvFeatureManager():
"""
CvFeatureManager Object to interect with Cloudvision
"""
def __init__(self, authentication: CvpAuthenticationItem) -> None:
"""
__init__ Class Creator
Parameters
----------
authentication : CvpAuthenticationItem
Authentication information to use to connect to Cloudvision
"""
self._authentication = authentication
# self._cv_instance = CvpClient()
self._cv_instance = self._connect(authentication=authentication)
self._cv_images = self.__get_images()
# self._cv_bundles = self.__get_bundles()
def _connect(self, authentication: CvpAuthenticationItem) -> CvpClient:
"""
_connect Connection management
Parameters
----------
authentication : CvpAuthenticationItem
Authentication information to use to connect to Cloudvision
Returns
-------
CvpClient
cvprac session to cloudvision
"""
client = CvpClient()
if authentication.token is not None:
try:
client.connect(
nodes=[authentication.server],
username='',
password='',
api_token=authentication.token,
is_cvaas=True,
port=authentication.port,
cert=authentication.validate_cert,
request_timeout=authentication.timeout
)
except CvpLoginError as error_data:
logger.error(f'Cannot connect to Cloudvision server {authentication.server}')
logger.debug(f'Error message: {error_data}')
logger.info('connected to Cloudvision server')
logger.debug(f'Connection info: {authentication}')
return client
def __get_images(self) -> List[Any]:
"""
__get_images Collect information about images on Cloudvision
Returns
-------
dict
Fact returned by Cloudvision
"""
images = []
logger.debug(' -> Collecting images')
images = self._cv_instance.api.get_images()['data']
return images if self.__check_api_result(images) else []
# def __get_bundles(self):
# """
# __get_bundles [Not In use] Collect information about bundles on Cloudvision
# Returns
# -------
# dict
# Fact returned by Cloudvision
# """
# bundles = []
# logger.debug(' -> Collecting images bundles')
# bundles = self._cv_instance.api.get_image_bundles()['data']
# # bundles = self._cv_instance.post(url='/cvpservice/image/getImageBundles.do?queryparam=&startIndex=0&endIndex=0')['data']
# return bundles if self.__check_api_result(bundles) else None
def __check_api_result(self, arg0: Any) -> bool:
"""
__check_api_result Check API calls return content
Parameters
----------
arg0 : any
Element to test
Returns
-------
bool
True if data are correct False in other cases
"""
logger.debug(arg0)
return len(arg0) > 0
def _does_image_exist(self, image_name: str) -> bool:
"""
_does_image_exist Check if an image is referenced in Cloudvision facts
Parameters
----------
image_name : str
Name of the image to search for
Returns
-------
bool
True if present
"""
return any(image_name == image['name'] for image in self._cv_images) if isinstance(self._cv_images, list) else False
def _does_bundle_exist(self, bundle_name: str) -> bool:
# pylint: disable=unused-argument
"""
_does_bundle_exist Check if an image is referenced in Cloudvision facts
Returns
-------
bool
True if present
"""
# return any(bundle_name == bundle['name'] for bundle in self._cv_bundles)
return False
def upload_image(self, image_path: str) -> bool:
"""
upload_image Upload an image to Cloudvision server
Parameters
----------
image_path : str
Path to the local file to upload
Returns
-------
bool
True if succeeds
"""
image_item = Filer(path=image_path)
if image_item.file_exist is False:
logger.error(f'File not found: {image_item.relative_path}')
return False
logger.info(f'File path for image: {image_item}')
if self._does_image_exist(image_name=image_item.filename):
logger.error("Image found in Cloudvision , Please delete it before running this script")
return False
try:
upload_result = self._cv_instance.api.add_image(filepath=image_item.absolute_path)
except Exception as e: # pylint: disable=broad-exception-caught
logger.error('An error occurred during upload, check CV connection')
logger.error(f'Exception message is: {e}')
return False
logger.debug(f'Upload Result is : {upload_result}')
return True
def build_image_list(self, image_list: List[str]) -> List[Any]:
"""
Builds a list of the image data structures, for a given list of image names.
Parameters
----------
image_list : list
List of software image names
Returns
-------
List:
Returns a list of images, with complete data or None in the event of failure
"""
internal_image_list = []
image_data = None
success = True
for entry in image_list:
for image in self._cv_images:
if image["imageFileName"] == entry:
image_data = image
if image_data is not None:
internal_image_list.append(image_data)
image_data = None
else:
success = False
return internal_image_list if success else []
def create_bundle(self, name: str, images_name: List[str]) -> bool:
"""
create_bundle Create a bundle with a list of images.
Parameters
----------
name : str
Name of the bundle
images_name : List[str]
List of images available on Cloudvision
Returns
-------
bool
True if succeeds
"""
logger.debug(f'Init creation of an image bundle {name} with following images {images_name}')
all_images_present: List[bool] = []
self._cv_images = self.__get_images()
all_images_present.extend(
self._does_image_exist(image_name=image_name)
for image_name in images_name
)
# Bundle Create
if self._does_bundle_exist(bundle_name=name) is False:
logger.debug(f'Creating image bundle {name} with following images {images_name}')
images_data = self.build_image_list(image_list=images_name)
if images_data is not None:
logger.debug('Images information: {images_data}')
try:
data = self._cv_instance.api.save_image_bundle(name=name, images=images_data)
except Exception as e: # pylint: disable=broad-exception-caught
logger.critical(f'{e}')
else:
logger.debug(data)
return True
logger.critical('No data found for images')
return False

93
eos_downloader/data.py Normal file
View file

@ -0,0 +1,93 @@
#!/usr/bin/python
# coding: utf-8 -*-
"""
EOS Downloader Information to use in
eos_downloader.object_downloader.ObjectDownloader._build_filename.
Data are built from content of Arista XML file
"""
# [platform][image][version]
DATA_MAPPING = {
"CloudVision": {
"ova": {
"extension": ".ova",
"prepend": "cvp",
"folder_level": 0
},
"rpm": {
"extension": "",
"prepend": "cvp-rpm-installer",
"folder_level": 0
},
"kvm": {
"extension": "-kvm.tgz",
"prepend": "cvp",
"folder_level": 0
},
"upgrade": {
"extension": ".tgz",
"prepend": "cvp-upgrade",
"folder_level": 0
},
},
"EOS": {
"64": {
"extension": ".swi",
"prepend": "EOS64",
"folder_level": 0
},
"INT": {
"extension": "-INT.swi",
"prepend": "EOS",
"folder_level": 1
},
"2GB-INT": {
"extension": "-INT.swi",
"prepend": "EOS-2GB",
"folder_level": 1
},
"cEOS": {
"extension": ".tar.xz",
"prepend": "cEOS-lab",
"folder_level": 0
},
"cEOS64": {
"extension": ".tar.xz",
"prepend": "cEOS64-lab",
"folder_level": 0
},
"vEOS": {
"extension": ".vmdk",
"prepend": "vEOS",
"folder_level": 0
},
"vEOS-lab": {
"extension": ".vmdk",
"prepend": "vEOS-lab",
"folder_level": 0
},
"EOS-2GB": {
"extension": ".swi",
"prepend": "EOS-2GB",
"folder_level": 0
},
"RN": {
"extension": "-",
"prepend": "RN",
"folder_level": 0
},
"SOURCE": {
"extension": "-source.tar",
"prepend": "EOS",
"folder_level": 0
},
"default": {
"extension": ".swi",
"prepend": "EOS",
"folder_level": 0
}
}
}

View file

@ -0,0 +1,77 @@
# flake8: noqa: F811
# pylint: disable=unused-argument
# pylint: disable=too-few-public-methods
"""download module"""
import os.path
import signal
from concurrent.futures import ThreadPoolExecutor
from threading import Event
from typing import Iterable, Any
import requests
import rich
from rich import console
from rich.progress import (BarColumn, DownloadColumn, Progress, TaskID,
TextColumn, TimeElapsedColumn, TransferSpeedColumn)
console = rich.get_console()
done_event = Event()
def handle_sigint(signum: Any, frame: Any) -> None:
"""Progress bar handler"""
done_event.set()
signal.signal(signal.SIGINT, handle_sigint)
class DownloadProgressBar():
"""
Object to manage Download process with Progress Bar from Rich
"""
def __init__(self) -> None:
"""
Class Constructor
"""
self.progress = Progress(
TextColumn("💾 Downloading [bold blue]{task.fields[filename]}", justify="right"),
BarColumn(bar_width=None),
"[progress.percentage]{task.percentage:>3.1f}%",
"",
TransferSpeedColumn(),
"",
DownloadColumn(),
"",
TimeElapsedColumn(),
"",
console=console
)
def _copy_url(self, task_id: TaskID, url: str, path: str, block_size: int = 1024) -> bool:
"""Copy data from a url to a local file."""
response = requests.get(url, stream=True, timeout=5)
# This will break if the response doesn't contain content length
self.progress.update(task_id, total=int(response.headers['Content-Length']))
with open(path, "wb") as dest_file:
self.progress.start_task(task_id)
for data in response.iter_content(chunk_size=block_size):
dest_file.write(data)
self.progress.update(task_id, advance=len(data))
if done_event.is_set():
return True
# console.print(f"Downloaded {path}")
return False
def download(self, urls: Iterable[str], dest_dir: str) -> None:
"""Download multuple files to the given directory."""
with self.progress:
with ThreadPoolExecutor(max_workers=4) as pool:
for url in urls:
filename = url.split("/")[-1].split('?')[0]
dest_path = os.path.join(dest_dir, filename)
task_id = self.progress.add_task("download", filename=filename, start=False)
pool.submit(self._copy_url, task_id, url, dest_path)

177
eos_downloader/eos.py Normal file
View file

@ -0,0 +1,177 @@
#!/usr/bin/python
# coding: utf-8 -*-
# flake8: noqa: F811
"""
Specific EOS inheritance from object_download
"""
import os
import xml.etree.ElementTree as ET
from typing import List, Union
import rich
from loguru import logger
from rich import console
from eos_downloader.models.version import BASE_BRANCH_STR, BASE_VERSION_STR, REGEX_EOS_VERSION, RTYPE_FEATURE, EosVersion
from eos_downloader.object_downloader import ObjectDownloader
# logger = logging.getLogger(__name__)
console = rich.get_console()
class EOSDownloader(ObjectDownloader):
"""
EOSDownloader Object to download EOS images from Arista.com website
Supercharge ObjectDownloader to support EOS specific actions
Parameters
----------
ObjectDownloader : ObjectDownloader
Base object
"""
eos_versions: Union[List[EosVersion], None] = None
@staticmethod
def _disable_ztp(file_path: str) -> None:
"""
_disable_ztp Method to disable ZTP in EOS image
Create a file in the EOS image to disable ZTP process during initial boot
Parameters
----------
file_path : str
Path where EOS image is located
"""
logger.info('Mounting volume to disable ZTP')
console.print('🚀 Mounting volume to disable ZTP')
raw_folder = os.path.join(file_path, "raw")
os.system(f"rm -rf {raw_folder}")
os.system(f"mkdir -p {raw_folder}")
os.system(
f'guestmount -a {os.path.join(file_path, "hda.qcow2")} -m /dev/sda2 {os.path.join(file_path, "raw")}')
ztp_file = os.path.join(file_path, 'raw/zerotouch-config')
with open(ztp_file, 'w', encoding='ascii') as zfile:
zfile.write('DISABLE=True')
logger.info(f'Unmounting volume in {file_path}')
os.system(f"guestunmount {os.path.join(file_path, 'raw')}")
os.system(f"rm -rf {os.path.join(file_path, 'raw')}")
logger.info(f"Volume has been successfully unmounted at {file_path}")
def _parse_xml_for_version(self,root_xml: ET.ElementTree, xpath: str = './/dir[@label="Active Releases"]/dir/dir/[@label]') -> List[EosVersion]:
"""
Extract list of available EOS versions from Arista.com website
Create a list of EosVersion object for all versions available on Arista.com
Args:
root_xml (ET.ElementTree): XML file with all versions available
xpath (str, optional): XPATH to use to extract EOS version. Defaults to './/dir[@label="Active Releases"]/dir/dir/[@label]'.
Returns:
List[EosVersion]: List of EosVersion representing all available EOS versions
"""
# XPATH: .//dir[@label="Active Releases"]/dir/dir/[@label]
if self.eos_versions is None:
logger.debug(f'Using xpath {xpath}')
eos_versions = []
for node in root_xml.findall(xpath):
if 'label' in node.attrib and node.get('label') is not None:
label = node.get('label')
if label is not None and REGEX_EOS_VERSION.match(label):
eos_version = EosVersion.from_str(label)
eos_versions.append(eos_version)
logger.debug(f"Found {label} - {eos_version}")
logger.debug(f'List of versions found on arista.com is: {eos_versions}')
self.eos_versions = eos_versions
else:
logger.debug('receiving instruction to download versions, but already available')
return self.eos_versions
def _get_branches(self, with_rtype: str = RTYPE_FEATURE) -> List[str]:
"""
Extract all EOS branches available from arista.com
Call self._parse_xml_for_version and then build list of available branches
Args:
rtype (str, optional): Release type to find. Can be M or F, default to F
Returns:
List[str]: A lsit of string that represent all availables EOS branches
"""
root = self._get_folder_tree()
versions = self._parse_xml_for_version(root_xml=root)
return list({version.branch for version in versions if version.rtype == with_rtype})
def latest_branch(self, rtype: str = RTYPE_FEATURE) -> EosVersion:
"""
Get latest branch from semver standpoint
Args:
rtype (str, optional): Release type to find. Can be M or F, default to F
Returns:
EosVersion: Latest Branch object
"""
selected_branch = EosVersion.from_str(BASE_BRANCH_STR)
for branch in self._get_branches(with_rtype=rtype):
branch = EosVersion.from_str(branch)
if branch > selected_branch:
selected_branch = branch
return selected_branch
def get_eos_versions(self, branch: Union[str,None] = None, rtype: Union[str,None] = None) -> List[EosVersion]:
"""
Get a list of available EOS version available on arista.com
If a branch is provided, only version in this branch are listed.
Otherwise, all versions are provided.
Args:
branch (str, optional): An EOS branch to filter. Defaults to None.
rtype (str, optional): Release type to find. Can be M or F, default to F
Returns:
List[EosVersion]: A list of versions available
"""
root = self._get_folder_tree()
result = []
for version in self._parse_xml_for_version(root_xml=root):
if branch is None and (version.rtype == rtype or rtype is None):
result.append(version)
elif branch is not None and version.is_in_branch(branch) and version.rtype == rtype:
result.append(version)
return result
def latest_eos(self, branch: Union[str,None] = None, rtype: str = RTYPE_FEATURE) -> EosVersion:
"""
Get latest version of EOS
If a branch is provided, only version in this branch are listed.
Otherwise, all versions are provided.
You can select what type of version to consider: M or F
Args:
branch (str, optional): An EOS branch to filter. Defaults to None.
rtype (str, optional): An EOS version type to filter, Can be M or F. Defaults to None.
Returns:
EosVersion: latest version selected
"""
selected_version = EosVersion.from_str(BASE_VERSION_STR)
if branch is None:
latest_branch = self.latest_branch(rtype=rtype)
else:
latest_branch = EosVersion.from_str(branch)
for version in self.get_eos_versions(branch=str(latest_branch.branch), rtype=rtype):
if version > selected_version:
if rtype is not None and version.rtype == rtype:
selected_version = version
if rtype is None:
selected_version = version
return selected_version

View file

View file

@ -0,0 +1,272 @@
#!/usr/bin/python
# coding: utf-8 -*-
"""Module for EOS version management"""
from __future__ import annotations
import re
import typing
from typing import Any, Optional
from loguru import logger
from pydantic import BaseModel
from eos_downloader.tools import exc_to_str
# logger = logging.getLogger(__name__)
BASE_VERSION_STR = '4.0.0F'
BASE_BRANCH_STR = '4.0'
RTYPE_FEATURE = 'F'
RTYPE_MAINTENANCE = 'M'
RTYPES = [RTYPE_FEATURE, RTYPE_MAINTENANCE]
# Regular Expression to capture multiple EOS version format
# 4.24
# 4.23.0
# 4.21.1M
# 4.28.10.F
# 4.28.6.1M
REGEX_EOS_VERSION = re.compile(r"^.*(?P<major>4)\.(?P<minor>\d{1,2})\.(?P<patch>\d{1,2})(?P<other>\.\d*)*(?P<rtype>[M,F])*$")
REGEX_EOS_BRANCH = re.compile(r"^.*(?P<major>4)\.(?P<minor>\d{1,2})(\.?P<patch>\d)*(\.\d)*(?P<rtype>[M,F])*$")
class EosVersion(BaseModel):
"""
EosVersion object to play with version management in code
Since EOS is not using strictly semver approach, this class mimic some functions from semver lib for Arista EOS versions
It is based on Pydantic and provides helpers for comparison:
Examples:
>>> eos_version_str = '4.23.2F'
>>> eos_version = EosVersion.from_str(eos_version_str)
>>> print(f'str representation is: {str(eos_version)}')
str representation is: 4.23.2F
>>> other_version = EosVersion.from_str(other_version_str)
>>> print(f'eos_version < other_version: {eos_version < other_version}')
eos_version < other_version: True
>>> print(f'Is eos_version match("<=4.23.3M"): {eos_version.match("<=4.23.3M")}')
Is eos_version match("<=4.23.3M"): True
>>> print(f'Is eos_version in branch 4.23: {eos_version.is_in_branch("4.23.0")}')
Is eos_version in branch 4.23: True
Args:
BaseModel (Pydantic): Pydantic Base Model
"""
major: int = 4
minor: int = 0
patch: int = 0
rtype: Optional[str] = 'F'
other: Any
@classmethod
def from_str(cls, eos_version: str) -> EosVersion:
"""
Class constructor from a string representing EOS version
Use regular expresion to extract fields from string.
It supports following formats:
- 4.24
- 4.23.0
- 4.21.1M
- 4.28.10.F
- 4.28.6.1M
Args:
eos_version (str): EOS version in str format
Returns:
EosVersion object
"""
logger.debug(f'receiving version: {eos_version}')
if REGEX_EOS_VERSION.match(eos_version):
matches = REGEX_EOS_VERSION.match(eos_version)
# assert matches is not None
assert matches is not None
return cls(**matches.groupdict())
if REGEX_EOS_BRANCH.match(eos_version):
matches = REGEX_EOS_BRANCH.match(eos_version)
# assert matches is not None
assert matches is not None
return cls(**matches.groupdict())
logger.error(f'Error occured with {eos_version}')
return EosVersion()
@property
def branch(self) -> str:
"""
Extract branch of version
Returns:
str: branch from version
"""
return f'{self.major}.{self.minor}'
def __str__(self) -> str:
"""
Standard str representation
Return string for EOS version like 4.23.3M
Returns:
str: A standard EOS version string representing <MAJOR>.<MINOR>.<PATCH><RTYPE>
"""
if self.other is None:
return f'{self.major}.{self.minor}.{self.patch}{self.rtype}'
return f'{self.major}.{self.minor}.{self.patch}{self.other}{self.rtype}'
def _compare(self, other: EosVersion) -> float:
"""
An internal comparison function to compare 2 EosVersion objects
Do a deep comparison from Major to Release Type
The return value is
- negative if ver1 < ver2,
- zero if ver1 == ver2
- strictly positive if ver1 > ver2
Args:
other (EosVersion): An EosVersion to compare with this object
Raises:
ValueError: Raise ValueError if input is incorrect type
Returns:
float: -1 if ver1 < ver2, 0 if ver1 == ver2, 1 if ver1 > ver2
"""
if not isinstance(other, EosVersion):
raise ValueError(f'could not compare {other} as it is not an EosVersion object')
comparison_flag: float = 0
logger.warning(f'current version {self.__str__()} - other {str(other)}') # pylint: disable = unnecessary-dunder-call
for key, _ in self.dict().items():
if comparison_flag == 0 and self.dict()[key] is None or other.dict()[key] is None:
logger.debug(f'{key}: local None - remote None')
logger.debug(f'{key}: local {self.dict()} - remote {other.dict()}')
return comparison_flag
logger.debug(f'{key}: local {self.dict()[key]} - remote {other.dict()[key]}')
if comparison_flag == 0 and self.dict()[key] < other.dict()[key]:
comparison_flag = -1
if comparison_flag == 0 and self.dict()[key] > other.dict()[key]:
comparison_flag = 1
if comparison_flag != 0:
logger.info(f'comparison result is {comparison_flag}')
return comparison_flag
logger.info(f'comparison result is {comparison_flag}')
return comparison_flag
@typing.no_type_check
def __eq__(self, other):
""" Implement __eq__ function (==) """
return self._compare(other) == 0
@typing.no_type_check
def __ne__(self, other):
# type: ignore
""" Implement __nw__ function (!=) """
return self._compare(other) != 0
@typing.no_type_check
def __lt__(self, other):
# type: ignore
""" Implement __lt__ function (<) """
return self._compare(other) < 0
@typing.no_type_check
def __le__(self, other):
# type: ignore
""" Implement __le__ function (<=) """
return self._compare(other) <= 0
@typing.no_type_check
def __gt__(self, other):
# type: ignore
""" Implement __gt__ function (>) """
return self._compare(other) > 0
@typing.no_type_check
def __ge__(self, other):
# type: ignore
""" Implement __ge__ function (>=) """
return self._compare(other) >= 0
def match(self, match_expr: str) -> bool:
"""
Compare self to match a match expression.
Example:
>>> eos_version.match("<=4.23.3M")
True
>>> eos_version.match("==4.23.3M")
False
Args:
match_expr (str): optional operator and version; valid operators are
``<`` smaller than
``>`` greater than
``>=`` greator or equal than
``<=`` smaller or equal than
``==`` equal
``!=`` not equal
Raises:
ValueError: If input has no match_expr nor match_ver
Returns:
bool: True if the expression matches the version, otherwise False
"""
prefix = match_expr[:2]
if prefix in (">=", "<=", "==", "!="):
match_version = match_expr[2:]
elif prefix and prefix[0] in (">", "<"):
prefix = prefix[0]
match_version = match_expr[1:]
elif match_expr and match_expr[0] in "0123456789":
prefix = "=="
match_version = match_expr
else:
raise ValueError(
"match_expr parameter should be in format <op><ver>, "
"where <op> is one of "
"['<', '>', '==', '<=', '>=', '!=']. "
f"You provided: {match_expr}"
)
logger.debug(f'work on comparison {prefix} with base release {match_version}')
possibilities_dict = {
">": (1,),
"<": (-1,),
"==": (0,),
"!=": (-1, 1),
">=": (0, 1),
"<=": (-1, 0),
}
possibilities = possibilities_dict[prefix]
cmp_res = self._compare(EosVersion.from_str(match_version))
return cmp_res in possibilities
def is_in_branch(self, branch_str: str) -> bool:
"""
Check if current version is part of a branch version
Comparison is done across MAJOR and MINOR
Args:
branch_str (str): a string for EOS branch. It supports following formats 4.23 or 4.23.0
Returns:
bool: True if current version is in provided branch, otherwise False
"""
try:
logger.debug(f'reading branch str:{branch_str}')
branch = EosVersion.from_str(branch_str)
except Exception as error: # pylint: disable = broad-exception-caught
logger.error(exc_to_str(error))
else:
return self.major == branch.major and self.minor == branch.minor
return False

View file

@ -0,0 +1,513 @@
#!/usr/bin/python
# coding: utf-8 -*-
# flake8: noqa: F811
# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-arguments
"""
eos_downloader class definition
"""
from __future__ import (absolute_import, division, print_function,
unicode_literals, annotations)
import base64
import glob
import hashlib
import json
import os
import sys
import xml.etree.ElementTree as ET
from typing import Union
import requests
import rich
from loguru import logger
from rich import console
from tqdm import tqdm
from eos_downloader import (ARISTA_DOWNLOAD_URL, ARISTA_GET_SESSION,
ARISTA_SOFTWARE_FOLDER_TREE, EVE_QEMU_FOLDER_PATH,
MSG_INVALID_DATA, MSG_TOKEN_EXPIRED)
from eos_downloader.data import DATA_MAPPING
from eos_downloader.download import DownloadProgressBar
# logger = logging.getLogger(__name__)
console = rich.get_console()
class ObjectDownloader():
"""
ObjectDownloader Generic Object to download from Arista.com
"""
def __init__(self, image: str, version: str, token: str, software: str = 'EOS', hash_method: str = 'md5sum'):
"""
__init__ Class constructor
generic class constructor
Parameters
----------
image : str
Type of image to download
version : str
Version of the package to download
token : str
Arista API token
software : str, optional
Package name to download (vEOS-lab, cEOS, EOS, ...), by default 'EOS'
hash_method : str, optional
Hash protocol to use to check download, by default 'md5sum'
"""
self.software = software
self.image = image
self._version = version
self.token = token
self.folder_level = 0
self.session_id = None
self.filename = self._build_filename()
self.hash_method = hash_method
self.timeout = 5
# Logging
logger.debug(f'Filename built by _build_filename is {self.filename}')
def __str__(self) -> str:
return f'{self.software} - {self.image} - {self.version}'
# def __repr__(self):
# return str(self.__dict__)
@property
def version(self) -> str:
"""Get version."""
return self._version
@version.setter
def version(self, value: str) -> None:
"""Set version."""
self._version = value
self.filename = self._build_filename()
# ------------------------------------------------------------------------ #
# Internal METHODS
# ------------------------------------------------------------------------ #
def _build_filename(self) -> str:
"""
_build_filename Helper to build filename to search on arista.com
Returns
-------
str:
Filename to search for on Arista.com
"""
logger.info('start build')
if self.software in DATA_MAPPING:
logger.info(f'software in data mapping: {self.software}')
if self.image in DATA_MAPPING[self.software]:
logger.info(f'image in data mapping: {self.image}')
return f"{DATA_MAPPING[self.software][self.image]['prepend']}-{self.version}{DATA_MAPPING[self.software][self.image]['extension']}"
return f"{DATA_MAPPING[self.software]['default']['prepend']}-{self.version}{DATA_MAPPING[self.software]['default']['extension']}"
raise ValueError(f'Incorrect value for software {self.software}')
def _parse_xml_for_path(self, root_xml: ET.ElementTree, xpath: str, search_file: str) -> str:
# sourcery skip: remove-unnecessary-cast
"""
_parse_xml Read and extract data from XML using XPATH
Get all interested nodes using XPATH and then get node that match search_file
Parameters
----------
root_xml : ET.ElementTree
XML document
xpath : str
XPATH expression to filter XML
search_file : str
Filename to search for
Returns
-------
str
File Path on Arista server side
"""
logger.debug(f'Using xpath {xpath}')
logger.debug(f'Search for file {search_file}')
console.print(f'🔎 Searching file {search_file}')
for node in root_xml.findall(xpath):
# logger.debug('Found {}', node.text)
if str(node.text).lower() == search_file.lower():
path = node.get('path')
console.print(f' -> Found file at {path}')
logger.info(f'Found {node.text} at {node.get("path")}')
return str(node.get('path')) if node.get('path') is not None else ''
logger.error(f'Requested file ({self.filename}) not found !')
return ''
def _get_hash(self, file_path: str) -> str:
"""
_get_hash Download HASH file from Arista server
Parameters
----------
file_path : str
Path of the HASH file
Returns
-------
str
Hash string read from HASH file downloaded from Arista.com
"""
remote_hash_file = self._get_remote_hashpath(hash_method=self.hash_method)
hash_url = self._get_url(remote_file_path=remote_hash_file)
# hash_downloaded = self._download_file_raw(url=hash_url, file_path=file_path + "/" + os.path.basename(remote_hash_file))
dl_rich_progress_bar = DownloadProgressBar()
dl_rich_progress_bar.download(urls=[hash_url], dest_dir=file_path)
hash_downloaded = f"{file_path}/{os.path.basename(remote_hash_file)}"
hash_content = 'unset'
with open(hash_downloaded, 'r', encoding='utf-8') as f:
hash_content = f.read()
return hash_content.split(' ')[0]
@staticmethod
def _compute_hash_md5sum(file: str, hash_expected: str) -> bool:
"""
_compute_hash_md5sum Compare MD5 sum
Do comparison between local md5 of the file and value provided by arista.com
Parameters
----------
file : str
Local file to use for MD5 sum
hash_expected : str
MD5 from arista.com
Returns
-------
bool
True if both are equal, False if not
"""
hash_md5 = hashlib.md5()
with open(file, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
if hash_md5.hexdigest() == hash_expected:
return True
logger.warning(f'Downloaded file is corrupt: local md5 ({hash_md5.hexdigest()}) is different to md5 from arista ({hash_expected})')
return False
@staticmethod
def _compute_hash_sh512sum(file: str, hash_expected: str) -> bool:
"""
_compute_hash_sh512sum Compare SHA512 sum
Do comparison between local sha512 of the file and value provided by arista.com
Parameters
----------
file : str
Local file to use for MD5 sum
hash_expected : str
SHA512 from arista.com
Returns
-------
bool
True if both are equal, False if not
"""
hash_sha512 = hashlib.sha512()
with open(file, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_sha512.update(chunk)
if hash_sha512.hexdigest() == hash_expected:
return True
logger.warning(f'Downloaded file is corrupt: local sha512 ({hash_sha512.hexdigest()}) is different to sha512 from arista ({hash_expected})')
return False
def _get_folder_tree(self) -> ET.ElementTree:
"""
_get_folder_tree Download XML tree from Arista server
Returns
-------
ET.ElementTree
XML document
"""
if self.session_id is None:
self.authenticate()
jsonpost = {'sessionCode': self.session_id}
result = requests.post(ARISTA_SOFTWARE_FOLDER_TREE, data=json.dumps(jsonpost), timeout=self.timeout)
try:
folder_tree = result.json()["data"]["xml"]
return ET.ElementTree(ET.fromstring(folder_tree))
except KeyError as error:
logger.error(MSG_INVALID_DATA)
logger.error(f'Server returned: {error}')
console.print(f'{MSG_INVALID_DATA}', style="bold red")
sys.exit(1)
def _get_remote_filepath(self) -> str:
"""
_get_remote_filepath Helper to get path of the file to download
Set XPATH and return result of _parse_xml for the file to download
Returns
-------
str
Remote path of the file to download
"""
root = self._get_folder_tree()
logger.debug("GET XML content from ARISTA.com")
xpath = f'.//dir[@label="{self.software}"]//file'
return self._parse_xml_for_path(root_xml=root, xpath=xpath, search_file=self.filename)
def _get_remote_hashpath(self, hash_method: str = 'md5sum') -> str:
"""
_get_remote_hashpath Helper to get path of the hash's file to download
Set XPATH and return result of _parse_xml for the file to download
Returns
-------
str
Remote path of the hash's file to download
"""
root = self._get_folder_tree()
logger.debug("GET XML content from ARISTA.com")
xpath = f'.//dir[@label="{self.software}"]//file'
return self._parse_xml_for_path(
root_xml=root,
xpath=xpath,
search_file=f'{self.filename}.{hash_method}',
)
def _get_url(self, remote_file_path: str) -> str:
"""
_get_url Get URL to use for downloading file from Arista server
Send remote_file_path to get correct URL to use for download
Parameters
----------
remote_file_path : str
Filepath from XML to use to get correct download link
Returns
-------
str
URL link to use for download
"""
if self.session_id is None:
self.authenticate()
jsonpost = {'sessionCode': self.session_id, 'filePath': remote_file_path}
result = requests.post(ARISTA_DOWNLOAD_URL, data=json.dumps(jsonpost), timeout=self.timeout)
if 'data' in result.json() and 'url' in result.json()['data']:
# logger.debug('URL to download file is: {}', result.json())
return result.json()["data"]["url"]
logger.critical(f'Server returns following message: {result.json()}')
return ''
@staticmethod
def _download_file_raw(url: str, file_path: str) -> str:
"""
_download_file Helper to download file from Arista.com
[extended_summary]
Parameters
----------
url : str
URL provided by server for remote_file_path
file_path : str
Location where to save local file
Returns
-------
str
File path
"""
chunkSize = 1024
r = requests.get(url, stream=True, timeout=5)
with open(file_path, 'wb') as f:
pbar = tqdm(unit="B", total=int(r.headers['Content-Length']), unit_scale=True, unit_divisor=1024)
for chunk in r.iter_content(chunk_size=chunkSize):
if chunk:
pbar.update(len(chunk))
f.write(chunk)
return file_path
def _download_file(self, file_path: str, filename: str, rich_interface: bool = True) -> Union[None, str]:
remote_file_path = self._get_remote_filepath()
logger.info(f'File found on arista server: {remote_file_path}')
file_url = self._get_url(remote_file_path=remote_file_path)
if file_url is not False:
if not rich_interface:
return self._download_file_raw(url=file_url, file_path=os.path.join(file_path, filename))
rich_downloader = DownloadProgressBar()
rich_downloader.download(urls=[file_url], dest_dir=file_path)
return os.path.join(file_path, filename)
logger.error(f'Cannot download file {file_path}')
return None
@staticmethod
def _create_destination_folder(path: str) -> None:
# os.makedirs(path, mode, exist_ok=True)
os.system(f'mkdir -p {path}')
@staticmethod
def _disable_ztp(file_path: str) -> None:
pass
# ------------------------------------------------------------------------ #
# Public METHODS
# ------------------------------------------------------------------------ #
def authenticate(self) -> bool:
"""
authenticate Authenticate user on Arista.com server
Send API token and get a session-id from remote server.
Session-id will be used by all other functions.
Returns
-------
bool
True if authentication succeeds=, False in all other situations.
"""
credentials = (base64.b64encode(self.token.encode())).decode("utf-8")
session_code_url = ARISTA_GET_SESSION
jsonpost = {'accessToken': credentials}
result = requests.post(session_code_url, data=json.dumps(jsonpost), timeout=self.timeout)
if result.json()["status"]["message"] in[ 'Access token expired', 'Invalid access token']:
console.print(f'{MSG_TOKEN_EXPIRED}', style="bold red")
logger.error(MSG_TOKEN_EXPIRED)
return False
try:
if 'data' in result.json():
self.session_id = result.json()["data"]["session_code"]
logger.info('Authenticated on arista.com')
return True
logger.debug(f'{result.json()}')
return False
except KeyError as error_arista:
logger.error(f'Error: {error_arista}')
sys.exit(1)
def download_local(self, file_path: str, checksum: bool = False) -> bool:
# sourcery skip: move-assign
"""
download_local Entrypoint for local download feature
Do local downnload feature:
- Get remote file path
- Get URL from Arista.com
- Download file
- Do HASH comparison (optional)
Parameters
----------
file_path : str
Local path to save downloaded file
checksum : bool, optional
Execute checksum or not, by default False
Returns
-------
bool
True if everything went well, False if any problem appears
"""
file_downloaded = str(self._download_file(file_path=file_path, filename=self.filename))
# Check file HASH
hash_result = False
if checksum:
logger.info('🚀 Running checksum validation')
console.print('🚀 Running checksum validation')
if self.hash_method == 'md5sum':
hash_expected = self._get_hash(file_path=file_path)
hash_result = self._compute_hash_md5sum(file=file_downloaded, hash_expected=hash_expected)
elif self.hash_method == 'sha512sum':
hash_expected = self._get_hash(file_path=file_path)
hash_result = self._compute_hash_sh512sum(file=file_downloaded, hash_expected=hash_expected)
if not hash_result:
logger.error('Downloaded file is corrupted, please check your connection')
console.print('❌ Downloaded file is corrupted, please check your connection')
return False
logger.info('Downloaded file is correct.')
console.print('✅ Downloaded file is correct.')
return True
def provision_eve(self, noztp: bool = False, checksum: bool = True) -> None:
# pylint: disable=unused-argument
"""
provision_eve Entrypoint for EVE-NG download and provisioning
Do following actions:
- Get remote file path
- Get URL from file path
- Download file
- Convert file to qcow2 format
- Create new version to EVE-NG
- Disable ZTP (optional)
Parameters
----------
noztp : bool, optional
Flag to deactivate ZTP in EOS image, by default False
checksum : bool, optional
Flag to ask for hash validation, by default True
"""
# Build image name to use in folder path
eos_image_name = self.filename.rstrip(".vmdk").lower()
if noztp:
eos_image_name = f'{eos_image_name}-noztp'
# Create full path for EVE-NG
file_path = os.path.join(EVE_QEMU_FOLDER_PATH, eos_image_name.rstrip())
# Create folders in filesystem
self._create_destination_folder(path=file_path)
# Download file to local destination
file_downloaded = self._download_file(
file_path=file_path, filename=self.filename)
# Convert to QCOW2 format
file_qcow2 = os.path.join(file_path, "hda.qcow2")
logger.info('Converting VMDK to QCOW2 format')
console.print('🚀 Converting VMDK to QCOW2 format...')
os.system(f'$(which qemu-img) convert -f vmdk -O qcow2 {file_downloaded} {file_qcow2}')
logger.info('Applying unl_wrapper to fix permissions')
console.print('Applying unl_wrapper to fix permissions')
os.system('/opt/unetlab/wrappers/unl_wrapper -a fixpermissions')
os.system(f'rm -f {file_downloaded}')
if noztp:
self._disable_ztp(file_path=file_path)
def docker_import(self, image_name: str = "arista/ceos") -> None:
"""
Import docker container to your docker server.
Import downloaded container to your local docker engine.
Args:
version (str):
image_name (str, optional): Image name to use. Defaults to "arista/ceos".
"""
docker_image = f'{image_name}:{self.version}'
logger.info(f'Importing image {self.filename} to {docker_image}')
console.print(f'🚀 Importing image {self.filename} to {docker_image}')
os.system(f'$(which docker) import {self.filename} {docker_image}')
for filename in glob.glob(f'{self.filename}*'):
try:
os.remove(filename)
except FileNotFoundError:
console.print(f'File not found: {filename}')

13
eos_downloader/tools.py Normal file
View file

@ -0,0 +1,13 @@
#!/usr/bin/python
# coding: utf-8 -*-
"""Module for tools related to ardl"""
def exc_to_str(exception: Exception) -> str:
"""
Helper function to parse Exceptions
"""
return (
f"{type(exception).__name__}{f' ({str(exception)})' if str(exception) else ''}"
)