1
0
Fork 0

Merging upstream version 0.9.0.

Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
Daniel Baumann 2025-02-05 13:50:07 +01:00
parent 857951d9ac
commit 161de8690e
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
28 changed files with 1073 additions and 859 deletions

View file

@ -14,11 +14,11 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Docker meta for TAG
id: meta
uses: docker/metadata-action@v4
uses: docker/metadata-action@v5
with:
images: |
${{ secrets.DOCKER_IMAGE }}
@ -27,20 +27,20 @@ jobs:
type=raw,value=${{ inputs.tag }}
- name: Login to DockerHub
uses: docker/login-action@v2
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Login to GitHub Container Registry
uses: docker/login-action@v2
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push
uses: docker/build-push-action@v4
uses: docker/build-push-action@v5
with:
context: .
file: Dockerfile
@ -54,11 +54,11 @@ jobs:
needs: [docker]
steps:
- name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Docker meta for TAG
id: meta
uses: docker/metadata-action@v4
uses: docker/metadata-action@v5
with:
images: |
${{ secrets.DOCKER_IMAGE }}
@ -67,20 +67,20 @@ jobs:
type=raw,value=${{ inputs.tag }}-dind
- name: Login to DockerHub
uses: docker/login-action@v2
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Login to GitHub Container Registry
uses: docker/login-action@v2
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push
uses: docker/build-push-action@v4
uses: docker/build-push-action@v5
with:
context: .
file: Dockerfile.docker

View file

@ -8,15 +8,29 @@ on:
types: [assigned, opened, synchronize, reopened]
jobs:
pre-commit:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.8", "3.9", "3.10"]
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- uses: pre-commit-ci/lite-action@v1.0.1
compiling:
name: Run installation process and code compilation supported Python versions
runs-on: ubuntu-latest
needs: [pre-commit]
strategy:
matrix:
python-version: ["3.8", "3.9", "3.10"]
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
@ -44,7 +58,7 @@ jobs:
python: ["3.8", "3.9", "3.10"]
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v4
@ -67,7 +81,7 @@ jobs:
python: ["3.8", "3.9", "3.10"]
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v4
@ -90,7 +104,7 @@ jobs:
python: ["3.8", "3.9", "3.10"]
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v4

View file

@ -11,7 +11,7 @@ jobs:
# runs-on: ubuntu-latest
# steps:
# - name: Checkout code
# uses: actions/checkout@v3
# uses: actions/checkout@v4
# with:
# fetch-depth: 0
@ -36,7 +36,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Install dependencies
run: |
@ -59,11 +59,11 @@ jobs:
needs: [pypi]
steps:
- name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Docker meta for TAG
id: meta
uses: docker/metadata-action@v4
uses: docker/metadata-action@v5
with:
images: |
${{ secrets.DOCKER_IMAGE }}
@ -73,20 +73,20 @@ jobs:
type=raw,value=latest
- name: Login to DockerHub
uses: docker/login-action@v2
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Login to GitHub Container Registry
uses: docker/login-action@v2
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push
uses: docker/build-push-action@v4
uses: docker/build-push-action@v5
with:
context: .
file: Dockerfile
@ -100,11 +100,11 @@ jobs:
needs: [docker]
steps:
- name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Docker meta for TAG
id: meta
uses: docker/metadata-action@v4
uses: docker/metadata-action@v5
with:
images: |
${{ secrets.DOCKER_IMAGE }}
@ -114,20 +114,20 @@ jobs:
type=raw,value=latest-dind
- name: Login to DockerHub
uses: docker/login-action@v2
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Login to GitHub Container Registry
uses: docker/login-action@v2
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push
uses: docker/build-push-action@v4
uses: docker/build-push-action@v5
with:
context: .
file: Dockerfile.docker

64
.pre-commit-config.yaml Normal file
View file

@ -0,0 +1,64 @@
---
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks
files: ^(eos_downloader)/
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.4.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-added-large-files
- id: check-merge-conflict
# - repo: https://github.com/pycqa/isort
# rev: 5.12.0
# hooks:
# - id: isort
# name: Check for changes when running isort on all python files
- repo: https://github.com/psf/black
rev: 23.7.0
hooks:
- id: black
name: Check for changes when running Black on all python files
- repo: https://github.com/pycqa/flake8
rev: 6.0.0
hooks:
- id: flake8
name: Check for PEP8 error on Python files
args:
- --config=/dev/null
- --max-line-length=165
- repo: local # as per https://pylint.pycqa.org/en/latest/user_guide/installation/pre-commit-integration.html
hooks:
- id: pylint
entry: pylint
language: python
name: Check for Linting error on Python files
description: This hook runs pylint.
types: [python]
args:
- -rn # Only display messages
- -sn # Don't display the score
- --rcfile=pylintrc # Link to config file
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.4.1
hooks:
- id: mypy
args:
- --config-file=pyproject.toml
additional_dependencies:
- "click==8.1.3"
- "click-help-colors==0.9.1"
- "pydantic~=2.0"
- "PyYAML==6.0"
- "requests>=2.27"
- "rich~=13.4"
- types-paramiko
- types-requests
files: eos_downloader

View file

@ -1,4 +1,13 @@
[![code-testing](https://github.com/titom73/eos-downloader/actions/workflows/pr-management.yml/badge.svg?event=push)](https://github.com/titom73/eos-downloader/actions/workflows/pr-management.yml) ![PyPI - Python Version](https://img.shields.io/pypi/pyversions/eos-downloader) ![GitHub release (latest SemVer)](https://img.shields.io/github/v/release/titom73/arista-downloader) ![PyPI - Downloads/month](https://img.shields.io/pypi/dm/eos-downloader)
[![tests](https://github.com/titom73/eos-downloader/actions/workflows/pr-management.yml/badge.svg?event=push)](https://github.com/titom73/eos-downloader/actions/workflows/pr-management.yml)
![PyPI - Python Version](https://img.shields.io/pypi/pyversions/eos-downloader)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
![Checked with mypy](http://www.mypy-lang.org/static/mypy_badge.svg)
![GitHub release](https://img.shields.io/github/v/release/titom73/arista-downloader)
![PyPI - Downloads/month](https://img.shields.io/pypi/dm/eos-downloader)
<!--
[![pre-commit](https://img.shields.io/badge/pre--commit-enabled-brightgreen?logo=pre-commit&logoColor=white)](https://github.com/pre-commit/pre-commit)
!-->
# Arista Software Downloader
@ -19,6 +28,7 @@ Usage: ardl [OPTIONS] COMMAND [ARGS]...
Arista Network Download CLI
Options:
--version Show the version and exit.
--token TEXT Arista Token from your customer account [env var:
ARISTA_TOKEN]
--help Show this message and exit.
@ -26,7 +36,6 @@ Options:
Commands:
debug Debug commands to work with ardl
get Download Arista from Arista website
version Display version of ardl
```
> **Warning**
@ -35,10 +44,22 @@ Commands:
### Download EOS Package
> **Note**
> Supported packages are: EOS, cEOS, vEOS-lab, cEOS64
You can download EOS packages with following commands:
CLI gives an option to get latest version available. By default it takes latest `F` release
```bash
ardl get eos --image-type cEOS --latest
```
If you want to get latest M release, you can use `--release-type`:
```bash
ardl get eos --image-type cEOS --release-type M --latest
```
You can download a specific EOS packages with following commands:
```bash
# Example for a cEOS package
@ -164,7 +185,7 @@ tqdm
On EVE-NG, you may have to install/upgrade __pyOpenSSL__ in version `23.0.0`:
```
```bash
# Error when running ardl: AttributeError: module 'lib' has no attribute 'X509_V_FLAG_CB_ISSUER_CHECK'
$ pip install pyopenssl --upgrade

View file

@ -1,111 +0,0 @@
## scripts
These scripts are deprecated and will be removed in a futur version. Please prefer the use of the CLI implemented in the package.
### eos-download
```bash
usage: eos-download [-h]
--version VERSION
[--token TOKEN]
[--image IMAGE]
[--destination DESTINATION]
[--eve]
[--noztp]
[--import_docker]
[--docker_name DOCKER_NAME]
[--verbose VERBOSE]
[--log]
EOS downloader script.
optional arguments:
-h, --help show this help message and exit
--token TOKEN arista.com user API key - can use ENV:ARISTA_TOKEN
--image IMAGE Type of EOS image required
--version VERSION EOS version to download from website
--destination DESTINATION
Path where to save EOS package downloaded
--eve Option to install EOS package to EVE-NG
--noztp Option to deactivate ZTP when used with EVE-NG
--import_docker Option to import cEOS image to docker
--docker_name DOCKER_NAME
Docker image name to use
--verbose VERBOSE Script verbosity
--log Option to activate logging to eos-downloader.log file
```
- Token are read from `ENV:ARISTA_TOKEN` unless you specify a specific token with CLI.
- Supported platforms:
- `INT`: International version
- `64`: 64 bits version
- `2GB` for 2GB flash platform
- `2GB-INT`: for 2GB running International
- `vEOS`: Virtual EOS image
- `vEOS-lab`: Virtual Lab EOS
- `vEOS64-lab`: Virtual Lab EOS running 64B
- `cEOS`: Docker version of EOS
- `cEOS64`: Docker version of EOS running in 64 bits
#### Examples
- Download vEOS-lab image and install in EVE-NG
```bash
$ eos-download --image vEOS-lab --version 4.25.7M --eve --noztp
```
- Download Docker image
```bash
$ eos-download --image cEOS --version 4.27.1F
🪐 eos-downloader is starting...
- Image Type: cEOS
- Version: 4.27.2F
✅ Authenticated on arista.com
🔎 Searching file cEOS-lab-4.27.2F.tar.xz
-> Found file at /support/download/EOS-USA/Active Releases/4.27/EOS-4.27.2F/cEOS-lab/cEOS-lab-4.27.2F.tar.xz
💾 Downloading cEOS-lab-4.27.2F.tar.xz ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 17.1 MB/s • 451.6/451.6 MB • 0:00:19 •
🚀 Running checksum validation
🔎 Searching file cEOS-lab-4.27.2F.tar.xz.sha512sum
-> Found file at /support/download/EOS-USA/Active
Releases/4.27/EOS-4.27.2F/cEOS-lab/cEOS-lab-4.27.2F.tar.xz.sha512sum
💾 Downloading cEOS-lab-4.27.2F.tar.xz.sha512sum ━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • ? • 154/154 bytes • 0:00:00 •
✅ Downloaded file is correct.
```
__Note:__ `ARISTA_TOKEN` should be set in your .profile and not set for each command. If not set, you can use `--token` knob.
```bash
# Export Token
export ARISTA_TOKEN="xxxxxxx"
```
### Cloudvision Image uploader
Create an image bundle on Cloudvision.
```bash
cvp-upload -h
usage: cvp-upload [-h]
[--token TOKEN]
[--image IMAGE]
--cloudvision CLOUDVISION
[--create_bundle]
[--timeout TIMEOUT]
[--verbose VERBOSE]
Cloudvision Image uploader script.
optional arguments:
-h, --help show this help message and exit
--token TOKEN CVP Authentication token - can use ENV:ARISTA_AVD_CV_TOKEN
--image IMAGE Type of EOS image required
--cloudvision CLOUDVISION
Cloudvision instance where to upload image
--create_bundle Option to create image bundle with new uploaded image
--timeout TIMEOUT Timeout connection. Default is set to 1200sec
--verbose VERBOSE Script verbosity
```

View file

@ -1,56 +0,0 @@
#!/usr/bin/python
import sys
import os
import argparse
from eos_downloader.cvp import CvFeatureManager, CvpAuthenticationItem
from loguru import logger
ARISTA_AVD_CV_TOKEN = os.getenv('ARISTA_AVD_CV_TOKEN', '')
def read_cli():
parser = argparse.ArgumentParser(description='Cloudvision Image uploader script.')
parser.add_argument('--token', required=False,
default=ARISTA_AVD_CV_TOKEN,
help='CVP Authentication token - can use ENV:ARISTA_AVD_CV_TOKEN')
parser.add_argument('--image', required=False,
default='EOS', help='Type of EOS image required')
parser.add_argument('--cloudvision', required=True,
help='Cloudvision instance where to upload image')
parser.add_argument('--create_bundle', required=False, action='store_true',
help="Option to create image bundle with new uploaded image")
parser.add_argument('--timeout', required=False,
default=1200,
help='Timeout connection. Default is set to 1200sec')
parser.add_argument('--verbose', required=False,
default='info', help='Script verbosity')
return parser.parse_args()
if __name__ == '__main__':
cli_options = read_cli()
logger.remove()
logger.add(sys.stderr, level=str(cli_options.verbose).upper())
cv_authentication = CvpAuthenticationItem(
server=cli_options.cloudvision,
token=cli_options.token,
port=443,
timeout=cli_options.timeout,
validate_cert=False
)
my_cvp_uploader = CvFeatureManager(authentication=cv_authentication)
result_upload = my_cvp_uploader.upload_image(cli_options.image)
if result_upload and cli_options.create_bundle:
bundle_name = os.path.basename(cli_options.image)
logger.info('Creating image bundle {}'.format(bundle_name))
my_cvp_uploader.create_bundle(
name=bundle_name,
images_name=[bundle_name]
)
sys.exit(0)

View file

@ -1,86 +0,0 @@
#!/usr/bin/python
import sys
import os
import argparse
import eos_downloader.eos
from loguru import logger
from rich.console import Console
ARISTA_TOKEN = os.getenv('ARISTA_TOKEN', '')
def read_cli():
parser = argparse.ArgumentParser(description='EOS downloader script.')
parser.add_argument('--token', required=False,
default=ARISTA_TOKEN,
help='arista.com user API key - can use ENV:ARISTA_TOKEN')
parser.add_argument('--image', required=False,
default='EOS', help='Type of EOS image required')
parser.add_argument('--version', required=True,
default='', help='EOS version to download from website')
parser.add_argument('--destination', required=False,
default=str(os.getcwd()),
help='Path where to save EOS package downloaded')
parser.add_argument('--eve', required=False, action='store_true',
help="Option to install EOS package to EVE-NG")
parser.add_argument('--noztp', required=False, action='store_true',
help="Option to deactivate ZTP when used with EVE-NG")
parser.add_argument('--import_docker', required=False, action='store_true',
help="Option to import cEOS image to docker")
parser.add_argument('--docker_name', required=False,
default='arista/ceos',
help='Docker image name to use')
parser.add_argument('--verbose', required=False,
default='info', help='Script verbosity')
parser.add_argument('--log', required=False, action='store_true',
help="Option to activate logging to eos-downloader.log file")
return parser.parse_args()
if __name__ == '__main__':
cli_options = read_cli()
console = Console()
console.print('\n[red]WARNING: This script is now deprecated. Please use ardl cli instead[/red]\n\n')
if cli_options.token is None or cli_options.token == '':
console.print('\n❗ Token is unset ! Please configure ARISTA_TOKEN or use --token option', style="bold red")
sys.exit(1)
logger.remove()
if cli_options.log:
logger.add("eos-downloader.log", rotation="10 MB", level=str(cli_options.verbose).upper())
console.print("🪐 [bold blue]eos-downloader[/bold blue] is starting...", )
console.print(f' - Image Type: {cli_options.image}')
console.print(f' - Version: {cli_options.version}')
my_download = eos_downloader.eos.EOSDownloader(
image=cli_options.image,
software='EOS',
version=cli_options.version,
token=cli_options.token,
hash_method='sha512sum')
my_download.authenticate()
if cli_options.eve:
my_download.provision_eve(noztp=cli_options.noztp, checksum=True)
else:
my_download.download_local(file_path=cli_options.destination, checksum=True)
if cli_options.import_docker:
my_download.docker_import(
image_name=cli_options.docker_name
)
console.print('✅ processing done !')
sys.exit(0)

View file

@ -5,23 +5,31 @@
EOS Downloader module.
"""
from __future__ import (absolute_import, division,
print_function, unicode_literals, annotations)
import dataclasses
from typing import Any
import json
import importlib.metadata
from __future__ import (
absolute_import,
annotations,
division,
print_function,
unicode_literals,
)
__author__ = '@titom73'
__email__ = 'tom@inetsix.net'
__date__ = '2022-03-16'
import dataclasses
import importlib.metadata
import json
from typing import Any
__author__ = "@titom73"
__email__ = "tom@inetsix.net"
__date__ = "2022-03-16"
__version__ = importlib.metadata.version("eos-downloader")
# __all__ = ["CvpAuthenticationItem", "CvFeatureManager", "EOSDownloader", "ObjectDownloader", "reverse"]
ARISTA_GET_SESSION = "https://www.arista.com/custom_data/api/cvp/getSessionCode/"
ARISTA_SOFTWARE_FOLDER_TREE = "https://www.arista.com/custom_data/api/cvp/getFolderTree/"
ARISTA_SOFTWARE_FOLDER_TREE = (
"https://www.arista.com/custom_data/api/cvp/getFolderTree/"
)
ARISTA_DOWNLOAD_URL = "https://www.arista.com/custom_data/api/cvp/getDownloadLink/"
@ -36,11 +44,12 @@ check the Access Token. Then re-run the script with the correct token.
MSG_INVALID_DATA = """Invalid data returned by server
"""
EVE_QEMU_FOLDER_PATH = '/opt/unetlab/addons/qemu/'
EVE_QEMU_FOLDER_PATH = "/opt/unetlab/addons/qemu/"
class EnhancedJSONEncoder(json.JSONEncoder):
"""Custom JSon encoder."""
def default(self, o: Any) -> Any:
if dataclasses.is_dataclass(o):
return dataclasses.asdict(o)

View file

@ -11,49 +11,51 @@ ARDL CLI Baseline.
"""
import click
from rich.console import Console
import eos_downloader
from eos_downloader.cli.get import commands as get_commands
from eos_downloader import __version__
from eos_downloader.cli.debug import commands as debug_commands
from eos_downloader.cli.get import commands as get_commands
from eos_downloader.cli.info import commands as info_commands
from eos_downloader.cli.utils import AliasedGroup
@click.group()
@click.group(cls=AliasedGroup)
@click.version_option(__version__)
@click.pass_context
@click.option('--token', show_envvar=True, default=None, help='Arista Token from your customer account')
@click.option(
"--token",
show_envvar=True,
default=None,
help="Arista Token from your customer account",
)
def ardl(ctx: click.Context, token: str) -> None:
"""Arista Network Download CLI"""
ctx.ensure_object(dict)
ctx.obj['token'] = token
ctx.obj["token"] = token
@click.command()
def version() -> None:
"""Display version of ardl"""
console = Console()
console.print(f'ardl is running version {eos_downloader.__version__}')
@ardl.group(no_args_is_help=True)
@ardl.group(cls=AliasedGroup, no_args_is_help=True)
@click.pass_context
def get(ctx: click.Context) -> None:
def get(ctx: click.Context, cls: click.Group = AliasedGroup) -> None:
# pylint: disable=redefined-builtin
"""Download Arista from Arista website"""
@ardl.group(no_args_is_help=True)
@ardl.group(cls=AliasedGroup, no_args_is_help=True)
@click.pass_context
def info(ctx: click.Context) -> None:
def info(ctx: click.Context, cls: click.Group = AliasedGroup) -> None:
# pylint: disable=redefined-builtin
"""List information from Arista website"""
@ardl.group(no_args_is_help=True)
@ardl.group(cls=AliasedGroup, no_args_is_help=True)
@click.pass_context
def debug(ctx: click.Context) -> None:
def debug(ctx: click.Context, cls: click.Group = AliasedGroup) -> None:
# pylint: disable=redefined-builtin
"""Debug commands to work with ardl"""
# ANTA CLI Execution
@ -64,13 +66,9 @@ def cli() -> None:
get.add_command(get_commands.cvp)
info.add_command(info_commands.eos_versions)
debug.add_command(debug_commands.xml)
ardl.add_command(version)
# Load CLI
ardl(
obj={},
auto_envvar_prefix='arista'
)
ardl(obj={}, auto_envvar_prefix="arista")
if __name__ == '__main__':
if __name__ == "__main__":
cli()

View file

@ -22,32 +22,51 @@ import eos_downloader.eos
@click.command()
@click.pass_context
@click.option('--output', default=str('arista.xml'), help='Path to save XML file', type=click.Path(), show_default=True)
@click.option('--log-level', '--log', help='Logging level of the command', default=None, type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False))
@click.option(
"--output",
default=str("arista.xml"),
help="Path to save XML file",
type=click.Path(),
show_default=True,
)
@click.option(
"--log-level",
"--log",
help="Logging level of the command",
default=None,
type=click.Choice(
["debug", "info", "warning", "error", "critical"], case_sensitive=False
),
)
def xml(ctx: click.Context, output: str, log_level: str) -> None:
# sourcery skip: remove-unnecessary-cast
"""Extract XML directory structure"""
console = Console()
# Get from Context
token = ctx.obj['token']
token = ctx.obj["token"]
logger.remove()
if log_level is not None:
logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper())
my_download = eos_downloader.eos.EOSDownloader(
image='unset',
software='EOS',
version='unset',
image="unset",
software="EOS",
version="unset",
token=token,
hash_method='sha512sum')
hash_method="sha512sum",
)
my_download.authenticate()
xml_object: ET.ElementTree = my_download._get_folder_tree() # pylint: disable=protected-access
xml_object: ET.ElementTree = (
my_download.get_folder_tree()
) # pylint: disable=protected-access
xml_content = xml_object.getroot()
xmlstr = minidom.parseString(ET.tostring(xml_content)).toprettyxml(indent=" ", newl='')
with open(output, "w", encoding='utf-8') as f:
xmlstr = minidom.parseString(ET.tostring(xml_content)).toprettyxml(
indent=" ", newl=""
)
with open(output, "w", encoding="utf-8") as f:
f.write(str(xmlstr))
console.print(f'XML file saved in: { output }')
console.print(f"XML file saved in: { output }")

View file

@ -21,68 +21,156 @@ from rich.console import Console
import eos_downloader.eos
from eos_downloader.models.version import BASE_VERSION_STR, RTYPE_FEATURE, RTYPES
EOS_IMAGE_TYPE = ['64', 'INT', '2GB-INT', 'cEOS', 'cEOS64', 'vEOS', 'vEOS-lab', 'EOS-2GB', 'default']
CVP_IMAGE_TYPE = ['ova', 'rpm', 'kvm', 'upgrade']
EOS_IMAGE_TYPE = [
"64",
"INT",
"2GB-INT",
"cEOS",
"cEOS64",
"vEOS",
"vEOS-lab",
"EOS-2GB",
"default",
]
CVP_IMAGE_TYPE = ["ova", "rpm", "kvm", "upgrade"]
@click.command(no_args_is_help=True)
@click.pass_context
@click.option('--image-type', default='default', help='EOS Image type', type=click.Choice(EOS_IMAGE_TYPE), required=True)
@click.option('--version', default=None, help='EOS version', type=str, required=False)
@click.option('--latest', '-l', is_flag=True, type=click.BOOL, default=False, help='Get latest version in given branch. If --branch is not use, get the latest branch with specific release type')
@click.option('--release-type', '-rtype', type=click.Choice(RTYPES, case_sensitive=False), default=RTYPE_FEATURE, help='EOS release type to search')
@click.option('--branch', '-b', type=click.STRING, default=None, help='EOS Branch to list releases')
@click.option('--docker-name', default='arista/ceos', help='Docker image name (default: arista/ceos)', type=str, show_default=True)
@click.option('--output', default=str(os.path.relpath(os.getcwd(), start=os.curdir)), help='Path to save image', type=click.Path(),show_default=True)
@click.option(
"--image-type",
default="default",
help="EOS Image type",
type=click.Choice(EOS_IMAGE_TYPE),
required=True,
)
@click.option("--version", default=None, help="EOS version", type=str, required=False)
@click.option(
"--latest",
"-l",
is_flag=True,
type=click.BOOL,
default=False,
help="Get latest version in given branch. If --branch is not use, get the latest branch with specific release type",
)
@click.option(
"--release-type",
"-rtype",
type=click.Choice(RTYPES, case_sensitive=False),
default=RTYPE_FEATURE,
help="EOS release type to search",
)
@click.option(
"--branch",
"-b",
type=click.STRING,
default=None,
help="EOS Branch to list releases",
)
@click.option(
"--docker-name",
default="arista/ceos",
help="Docker image name (default: arista/ceos)",
type=str,
show_default=True,
)
@click.option(
"--output",
default=str(os.path.relpath(os.getcwd(), start=os.curdir)),
help="Path to save image",
type=click.Path(),
show_default=True,
)
# Debugging
@click.option('--log-level', '--log', help='Logging level of the command', default=None, type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False))
@click.option(
"--log-level",
"--log",
help="Logging level of the command",
default=None,
type=click.Choice(
["debug", "info", "warning", "error", "critical"], case_sensitive=False
),
)
# Boolean triggers
@click.option('--eve-ng', is_flag=True, help='Run EVE-NG vEOS provisioning (only if CLI runs on an EVE-NG server)', default=False)
@click.option('--disable-ztp', is_flag=True, help='Disable ZTP process in vEOS image (only available with --eve-ng)', default=False)
@click.option('--import-docker', is_flag=True, help='Import docker image (only available with --image_type cEOSlab)', default=False)
@click.option(
"--eve-ng",
is_flag=True,
help="Run EVE-NG vEOS provisioning (only if CLI runs on an EVE-NG server)",
default=False,
)
@click.option(
"--disable-ztp",
is_flag=True,
help="Disable ZTP process in vEOS image (only available with --eve-ng)",
default=False,
)
@click.option(
"--import-docker",
is_flag=True,
help="Import docker image (only available with --image_type cEOSlab)",
default=False,
)
def eos(
ctx: click.Context, image_type: str, output: str, log_level: str, eve_ng: bool, disable_ztp: bool,
import_docker: bool, docker_name: str, version: Union[str, None] = None, release_type: str = RTYPE_FEATURE,
latest: bool = False, branch: Union[str,None] = None
ctx: click.Context,
image_type: str,
output: str,
log_level: str,
eve_ng: bool,
disable_ztp: bool,
import_docker: bool,
docker_name: str,
version: Union[str, None] = None,
release_type: str = RTYPE_FEATURE,
latest: bool = False,
branch: Union[str, None] = None,
) -> int:
"""Download EOS image from Arista website"""
console = Console()
# Get from Context
token = ctx.obj['token']
if token is None or token == '':
console.print('❗ Token is unset ! Please configure ARISTA_TOKEN or use --token option', style="bold red")
token = ctx.obj["token"]
if token is None or token == "":
console.print(
"❗ Token is unset ! Please configure ARISTA_TOKEN or use --token option",
style="bold red",
)
sys.exit(1)
logger.remove()
if log_level is not None:
logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper())
console.print("🪐 [bold blue]eos-downloader[/bold blue] is starting...", )
console.print(f' - Image Type: {image_type}')
console.print(f' - Version: {version}')
console.print(
"🪐 [bold blue]eos-downloader[/bold blue] is starting...",
)
console.print(f" - Image Type: {image_type}")
console.print(f" - Version: {version}")
if version is not None:
my_download = eos_downloader.eos.EOSDownloader(
image=image_type,
software='EOS',
software="EOS",
version=version,
token=token,
hash_method='sha512sum')
hash_method="sha512sum",
)
my_download.authenticate()
elif latest:
my_download = eos_downloader.eos.EOSDownloader(
image=image_type,
software='EOS',
version='unset',
software="EOS",
version="unset",
token=token,
hash_method='sha512sum')
hash_method="sha512sum",
)
my_download.authenticate()
if branch is None:
branch = str(my_download.latest_branch(rtype=release_type).branch)
latest_version = my_download.latest_eos(branch, rtype=release_type)
if str(latest_version) == BASE_VERSION_STR:
console.print(f'[red]Error[/red], cannot find any version in {branch} for {release_type} release type')
console.print(
f"[red]Error[/red], cannot find any version in {branch} for {release_type} release type"
)
sys.exit(1)
my_download.version = str(latest_version)
@ -92,46 +180,71 @@ def eos(
my_download.download_local(file_path=output, checksum=True)
if import_docker:
my_download.docker_import(
image_name=docker_name
)
console.print('✅ processing done !')
my_download.docker_import(image_name=docker_name)
console.print("✅ processing done !")
sys.exit(0)
@click.command(no_args_is_help=True)
@click.pass_context
@click.option('--format', default='upgrade', help='CVP Image type', type=click.Choice(CVP_IMAGE_TYPE), required=True)
@click.option('--version', default=None, help='CVP version', type=str, required=True)
@click.option('--output', default=str(os.path.relpath(os.getcwd(), start=os.curdir)), help='Path to save image', type=click.Path(),show_default=True)
@click.option('--log-level', '--log', help='Logging level of the command', default=None, type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False))
def cvp(ctx: click.Context, version: str, format: str, output: str, log_level: str) -> int:
@click.option(
"--format",
default="upgrade",
help="CVP Image type",
type=click.Choice(CVP_IMAGE_TYPE),
required=True,
)
@click.option("--version", default=None, help="CVP version", type=str, required=True)
@click.option(
"--output",
default=str(os.path.relpath(os.getcwd(), start=os.curdir)),
help="Path to save image",
type=click.Path(),
show_default=True,
)
@click.option(
"--log-level",
"--log",
help="Logging level of the command",
default=None,
type=click.Choice(
["debug", "info", "warning", "error", "critical"], case_sensitive=False
),
)
def cvp(
ctx: click.Context, version: str, format: str, output: str, log_level: str
) -> int:
"""Download CVP image from Arista website"""
console = Console()
# Get from Context
token = ctx.obj['token']
if token is None or token == '':
console.print('❗ Token is unset ! Please configure ARISTA_TOKEN or use --token option', style="bold red")
token = ctx.obj["token"]
if token is None or token == "":
console.print(
"❗ Token is unset ! Please configure ARISTA_TOKEN or use --token option",
style="bold red",
)
sys.exit(1)
logger.remove()
if log_level is not None:
logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper())
console.print("🪐 [bold blue]eos-downloader[/bold blue] is starting...", )
console.print(f' - Image Type: {format}')
console.print(f' - Version: {version}')
console.print(
"🪐 [bold blue]eos-downloader[/bold blue] is starting...",
)
console.print(f" - Image Type: {format}")
console.print(f" - Version: {version}")
my_download = eos_downloader.eos.EOSDownloader(
image=format,
software='CloudVision',
software="CloudVision",
version=version,
token=token,
hash_method='md5sum')
hash_method="md5sum",
)
my_download.authenticate()
my_download.download_local(file_path=output, checksum=False)
console.print('✅ processing done !')
console.print("✅ processing done !")
sys.exit(0)

View file

@ -24,12 +24,53 @@ from eos_downloader.models.version import BASE_VERSION_STR, RTYPE_FEATURE, RTYPE
@click.command(no_args_is_help=True)
@click.pass_context
@click.option('--latest', '-l', is_flag=True, type=click.BOOL, default=False, help='Get latest version in given branch. If --branch is not use, get the latest branch with specific release type')
@click.option('--release-type', '-rtype', type=click.Choice(RTYPES, case_sensitive=False), default=RTYPE_FEATURE, help='EOS release type to search')
@click.option('--branch', '-b', type=click.STRING, default=None, help='EOS Branch to list releases')
@click.option('--verbose', '-v', is_flag=True, type=click.BOOL, default=False, help='Human readable output. Default is none to use output in script)')
@click.option('--log-level', '--log', help='Logging level of the command', default='warning', type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False))
def eos_versions(ctx: click.Context, log_level: str, branch: Union[str,None] = None, release_type: str = RTYPE_FEATURE, latest: bool = False, verbose: bool = False) -> None:
@click.option(
"--latest",
"-l",
is_flag=True,
type=click.BOOL,
default=False,
help="Get latest version in given branch. If --branch is not use, get the latest branch with specific release type",
)
@click.option(
"--release-type",
"-rtype",
type=click.Choice(RTYPES, case_sensitive=False),
default=RTYPE_FEATURE,
help="EOS release type to search",
)
@click.option(
"--branch",
"-b",
type=click.STRING,
default=None,
help="EOS Branch to list releases",
)
@click.option(
"--verbose",
"-v",
is_flag=True,
type=click.BOOL,
default=False,
help="Human readable output. Default is none to use output in script)",
)
@click.option(
"--log-level",
"--log",
help="Logging level of the command",
default="warning",
type=click.Choice(
["debug", "info", "warning", "error", "critical"], case_sensitive=False
),
)
def eos_versions(
ctx: click.Context,
log_level: str,
branch: Union[str, None] = None,
release_type: str = RTYPE_FEATURE,
latest: bool = False,
verbose: bool = False,
) -> None:
# pylint: disable = too-many-branches
"""
List Available EOS version on Arista.com website.
@ -42,22 +83,23 @@ def eos_versions(ctx: click.Context, log_level: str, branch: Union[str,None] = N
"""
console = Console()
# Get from Context
token = ctx.obj['token']
token = ctx.obj["token"]
logger.remove()
if log_level is not None:
logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper())
my_download = eos_downloader.eos.EOSDownloader(
image='unset',
software='EOS',
version='unset',
image="unset",
software="EOS",
version="unset",
token=token,
hash_method='sha512sum')
hash_method="sha512sum",
)
auth = my_download.authenticate()
if verbose and auth:
console.print('✅ Authenticated on arista.com')
console.print("✅ Authenticated on arista.com")
if release_type is not None:
release_type = release_type.upper()
@ -67,21 +109,27 @@ def eos_versions(ctx: click.Context, log_level: str, branch: Union[str,None] = N
branch = str(my_download.latest_branch(rtype=release_type).branch)
latest_version = my_download.latest_eos(branch, rtype=release_type)
if str(latest_version) == BASE_VERSION_STR:
console.print(f'[red]Error[/red], cannot find any version in {branch} for {release_type} release type')
console.print(
f"[red]Error[/red], cannot find any version in {branch} for {release_type} release type"
)
sys.exit(1)
if verbose:
console.print(f'Branch {branch} has been selected with release type {release_type}')
console.print(
f"Branch {branch} has been selected with release type {release_type}"
)
if branch is not None:
console.print(f'Latest release for {branch}: {latest_version}')
console.print(f"Latest release for {branch}: {latest_version}")
else:
console.print(f'Latest EOS release: {latest_version}')
console.print(f"Latest EOS release: {latest_version}")
else:
console.print(f'{ latest_version }')
console.print(f"{ latest_version }")
else:
versions = my_download.get_eos_versions(branch=branch, rtype=release_type)
if verbose:
console.print(f'List of available versions for {branch if branch is not None else "all branches"}')
console.print(
f'List of available versions for {branch if branch is not None else "all branches"}'
)
for version in versions:
console.print(f'{str(version)}')
console.print(f"{str(version)}")
else:
pprint([str(version) for version in versions])

View file

@ -0,0 +1,38 @@
#!/usr/bin/python
# coding: utf-8 -*-
# pylint: disable=inconsistent-return-statements
"""
Extension for the python ``click`` module
to provide a group or command with aliases.
"""
from typing import Any
import click
class AliasedGroup(click.Group):
"""
Implements a subclass of Group that accepts a prefix for a command.
If there were a command called push, it would accept pus as an alias (so long as it was unique)
"""
def get_command(self, ctx: click.Context, cmd_name: str) -> Any:
"""Documentation to build"""
rv = click.Group.get_command(self, ctx, cmd_name)
if rv is not None:
return rv
matches = [x for x in self.list_commands(ctx)
if x.startswith(cmd_name)]
if not matches:
return None
if len(matches) == 1:
return click.Group.get_command(self, ctx, matches[0])
ctx.fail(f"Too many matches: {', '.join(sorted(matches))}")
def resolve_command(self, ctx: click.Context, args: Any) -> Any:
"""Documentation to build"""
# always return the full command name
_, cmd, args = super().resolve_command(ctx, args)
return cmd.name, cmd, args

View file

@ -6,11 +6,12 @@ CVP Uploader content
"""
import os
from typing import List, Optional, Any
from dataclasses import dataclass
from loguru import logger
from typing import Any, List, Optional
from cvprac.cvp_client import CvpClient
from cvprac.cvp_client_errors import CvpLoginError
from loguru import logger
# from eos_downloader.tools import exc_to_str
@ -22,6 +23,7 @@ class CvpAuthenticationItem:
"""
Data structure to represent Cloudvision Authentication
"""
server: str
port: int = 443
token: Optional[str] = None
@ -29,15 +31,16 @@ class CvpAuthenticationItem:
validate_cert: bool = False
class Filer():
class Filer:
# pylint: disable=too-few-public-methods
"""
Filer Helper for file management
"""
def __init__(self, path: str) -> None:
self.file_exist = False
self.filename = ''
self.absolute_path = ''
self.filename = ""
self.absolute_path = ""
self.relative_path = path
if os.path.exists(path):
self.file_exist = True
@ -45,13 +48,14 @@ class Filer():
self.absolute_path = os.path.realpath(path)
def __repr__(self) -> str:
return self.absolute_path if self.file_exist else ''
return self.absolute_path if self.file_exist else ""
class CvFeatureManager():
class CvFeatureManager:
"""
CvFeatureManager Object to interect with Cloudvision
"""
def __init__(self, authentication: CvpAuthenticationItem) -> None:
"""
__init__ Class Creator
@ -86,19 +90,21 @@ class CvFeatureManager():
try:
client.connect(
nodes=[authentication.server],
username='',
password='',
username="",
password="",
api_token=authentication.token,
is_cvaas=True,
port=authentication.port,
cert=authentication.validate_cert,
request_timeout=authentication.timeout
request_timeout=authentication.timeout,
)
except CvpLoginError as error_data:
logger.error(f'Cannot connect to Cloudvision server {authentication.server}')
logger.debug(f'Error message: {error_data}')
logger.info('connected to Cloudvision server')
logger.debug(f'Connection info: {authentication}')
logger.error(
f"Cannot connect to Cloudvision server {authentication.server}"
)
logger.debug(f"Error message: {error_data}")
logger.info("connected to Cloudvision server")
logger.debug(f"Connection info: {authentication}")
return client
def __get_images(self) -> List[Any]:
@ -111,8 +117,8 @@ class CvFeatureManager():
Fact returned by Cloudvision
"""
images = []
logger.debug(' -> Collecting images')
images = self._cv_instance.api.get_images()['data']
logger.debug(" -> Collecting images")
images = self._cv_instance.api.get_images()["data"]
return images if self.__check_api_result(images) else []
# def __get_bundles(self):
@ -161,7 +167,11 @@ class CvFeatureManager():
bool
True if present
"""
return any(image_name == image['name'] for image in self._cv_images) if isinstance(self._cv_images, list) else False
return (
any(image_name == image["name"] for image in self._cv_images)
if isinstance(self._cv_images, list)
else False
)
def _does_bundle_exist(self, bundle_name: str) -> bool:
# pylint: disable=unused-argument
@ -192,19 +202,23 @@ class CvFeatureManager():
"""
image_item = Filer(path=image_path)
if image_item.file_exist is False:
logger.error(f'File not found: {image_item.relative_path}')
logger.error(f"File not found: {image_item.relative_path}")
return False
logger.info(f'File path for image: {image_item}')
logger.info(f"File path for image: {image_item}")
if self._does_image_exist(image_name=image_item.filename):
logger.error("Image found in Cloudvision , Please delete it before running this script")
logger.error(
"Image found in Cloudvision , Please delete it before running this script"
)
return False
try:
upload_result = self._cv_instance.api.add_image(filepath=image_item.absolute_path)
upload_result = self._cv_instance.api.add_image(
filepath=image_item.absolute_path
)
except Exception as e: # pylint: disable=broad-exception-caught
logger.error('An error occurred during upload, check CV connection')
logger.error(f'Exception message is: {e}')
logger.error("An error occurred during upload, check CV connection")
logger.error(f"Exception message is: {e}")
return False
logger.debug(f'Upload Result is : {upload_result}')
logger.debug(f"Upload Result is : {upload_result}")
return True
def build_image_list(self, image_list: List[str]) -> List[Any]:
@ -252,25 +266,30 @@ class CvFeatureManager():
bool
True if succeeds
"""
logger.debug(f'Init creation of an image bundle {name} with following images {images_name}')
logger.debug(
f"Init creation of an image bundle {name} with following images {images_name}"
)
all_images_present: List[bool] = []
self._cv_images = self.__get_images()
all_images_present.extend(
self._does_image_exist(image_name=image_name)
for image_name in images_name
self._does_image_exist(image_name=image_name) for image_name in images_name
)
# Bundle Create
if self._does_bundle_exist(bundle_name=name) is False:
logger.debug(f'Creating image bundle {name} with following images {images_name}')
logger.debug(
f"Creating image bundle {name} with following images {images_name}"
)
images_data = self.build_image_list(image_list=images_name)
if images_data is not None:
logger.debug('Images information: {images_data}')
logger.debug("Images information: {images_data}")
try:
data = self._cv_instance.api.save_image_bundle(name=name, images=images_data)
data = self._cv_instance.api.save_image_bundle(
name=name, images=images_data
)
except Exception as e: # pylint: disable=broad-exception-caught
logger.critical(f'{e}')
logger.critical(f"{e}")
else:
logger.debug(data)
return True
logger.critical('No data found for images')
logger.critical("No data found for images")
return False

View file

@ -12,82 +12,22 @@ Data are built from content of Arista XML file
# [platform][image][version]
DATA_MAPPING = {
"CloudVision": {
"ova": {
"extension": ".ova",
"prepend": "cvp",
"folder_level": 0
},
"rpm": {
"extension": "",
"prepend": "cvp-rpm-installer",
"folder_level": 0
},
"kvm": {
"extension": "-kvm.tgz",
"prepend": "cvp",
"folder_level": 0
},
"upgrade": {
"extension": ".tgz",
"prepend": "cvp-upgrade",
"folder_level": 0
},
"ova": {"extension": ".ova", "prepend": "cvp", "folder_level": 0},
"rpm": {"extension": "", "prepend": "cvp-rpm-installer", "folder_level": 0},
"kvm": {"extension": "-kvm.tgz", "prepend": "cvp", "folder_level": 0},
"upgrade": {"extension": ".tgz", "prepend": "cvp-upgrade", "folder_level": 0},
},
"EOS": {
"64": {
"extension": ".swi",
"prepend": "EOS64",
"folder_level": 0
"64": {"extension": ".swi", "prepend": "EOS64", "folder_level": 0},
"INT": {"extension": "-INT.swi", "prepend": "EOS", "folder_level": 1},
"2GB-INT": {"extension": "-INT.swi", "prepend": "EOS-2GB", "folder_level": 1},
"cEOS": {"extension": ".tar.xz", "prepend": "cEOS-lab", "folder_level": 0},
"cEOS64": {"extension": ".tar.xz", "prepend": "cEOS64-lab", "folder_level": 0},
"vEOS": {"extension": ".vmdk", "prepend": "vEOS", "folder_level": 0},
"vEOS-lab": {"extension": ".vmdk", "prepend": "vEOS-lab", "folder_level": 0},
"EOS-2GB": {"extension": ".swi", "prepend": "EOS-2GB", "folder_level": 0},
"RN": {"extension": "-", "prepend": "RN", "folder_level": 0},
"SOURCE": {"extension": "-source.tar", "prepend": "EOS", "folder_level": 0},
"default": {"extension": ".swi", "prepend": "EOS", "folder_level": 0},
},
"INT": {
"extension": "-INT.swi",
"prepend": "EOS",
"folder_level": 1
},
"2GB-INT": {
"extension": "-INT.swi",
"prepend": "EOS-2GB",
"folder_level": 1
},
"cEOS": {
"extension": ".tar.xz",
"prepend": "cEOS-lab",
"folder_level": 0
},
"cEOS64": {
"extension": ".tar.xz",
"prepend": "cEOS64-lab",
"folder_level": 0
},
"vEOS": {
"extension": ".vmdk",
"prepend": "vEOS",
"folder_level": 0
},
"vEOS-lab": {
"extension": ".vmdk",
"prepend": "vEOS-lab",
"folder_level": 0
},
"EOS-2GB": {
"extension": ".swi",
"prepend": "EOS-2GB",
"folder_level": 0
},
"RN": {
"extension": "-",
"prepend": "RN",
"folder_level": 0
},
"SOURCE": {
"extension": "-source.tar",
"prepend": "EOS",
"folder_level": 0
},
"default": {
"extension": ".swi",
"prepend": "EOS",
"folder_level": 0
}
}
}

View file

@ -8,13 +8,20 @@ import os.path
import signal
from concurrent.futures import ThreadPoolExecutor
from threading import Event
from typing import Iterable, Any
from typing import Any, Iterable
import requests
import rich
from rich import console
from rich.progress import (BarColumn, DownloadColumn, Progress, TaskID,
TextColumn, TimeElapsedColumn, TransferSpeedColumn)
from rich.progress import (
BarColumn,
DownloadColumn,
Progress,
TaskID,
TextColumn,
TimeElapsedColumn,
TransferSpeedColumn,
)
console = rich.get_console()
done_event = Event()
@ -28,7 +35,7 @@ def handle_sigint(signum: Any, frame: Any) -> None:
signal.signal(signal.SIGINT, handle_sigint)
class DownloadProgressBar():
class DownloadProgressBar:
"""
Object to manage Download process with Progress Bar from Rich
"""
@ -38,7 +45,9 @@ class DownloadProgressBar():
Class Constructor
"""
self.progress = Progress(
TextColumn("💾 Downloading [bold blue]{task.fields[filename]}", justify="right"),
TextColumn(
"💾 Downloading [bold blue]{task.fields[filename]}", justify="right"
),
BarColumn(bar_width=None),
"[progress.percentage]{task.percentage:>3.1f}%",
"",
@ -48,14 +57,16 @@ class DownloadProgressBar():
"",
TimeElapsedColumn(),
"",
console=console
console=console,
)
def _copy_url(self, task_id: TaskID, url: str, path: str, block_size: int = 1024) -> bool:
def _copy_url(
self, task_id: TaskID, url: str, path: str, block_size: int = 1024
) -> bool:
"""Copy data from a url to a local file."""
response = requests.get(url, stream=True, timeout=5)
# This will break if the response doesn't contain content length
self.progress.update(task_id, total=int(response.headers['Content-Length']))
self.progress.update(task_id, total=int(response.headers["Content-Length"]))
with open(path, "wb") as dest_file:
self.progress.start_task(task_id)
for data in response.iter_content(chunk_size=block_size):
@ -71,7 +82,9 @@ class DownloadProgressBar():
with self.progress:
with ThreadPoolExecutor(max_workers=4) as pool:
for url in urls:
filename = url.split("/")[-1].split('?')[0]
filename = url.split("/")[-1].split("?")[0]
dest_path = os.path.join(dest_dir, filename)
task_id = self.progress.add_task("download", filename=filename, start=False)
task_id = self.progress.add_task(
"download", filename=filename, start=False
)
pool.submit(self._copy_url, task_id, url, dest_path)

View file

@ -14,13 +14,20 @@ import rich
from loguru import logger
from rich import console
from eos_downloader.models.version import BASE_BRANCH_STR, BASE_VERSION_STR, REGEX_EOS_VERSION, RTYPE_FEATURE, EosVersion
from eos_downloader.models.version import (
BASE_BRANCH_STR,
BASE_VERSION_STR,
REGEX_EOS_VERSION,
RTYPE_FEATURE,
EosVersion,
)
from eos_downloader.object_downloader import ObjectDownloader
# logger = logging.getLogger(__name__)
console = rich.get_console()
class EOSDownloader(ObjectDownloader):
"""
EOSDownloader Object to download EOS images from Arista.com website
@ -47,22 +54,27 @@ class EOSDownloader(ObjectDownloader):
file_path : str
Path where EOS image is located
"""
logger.info('Mounting volume to disable ZTP')
console.print('🚀 Mounting volume to disable ZTP')
logger.info("Mounting volume to disable ZTP")
console.print("🚀 Mounting volume to disable ZTP")
raw_folder = os.path.join(file_path, "raw")
os.system(f"rm -rf {raw_folder}")
os.system(f"mkdir -p {raw_folder}")
os.system(
f'guestmount -a {os.path.join(file_path, "hda.qcow2")} -m /dev/sda2 {os.path.join(file_path, "raw")}')
ztp_file = os.path.join(file_path, 'raw/zerotouch-config')
with open(ztp_file, 'w', encoding='ascii') as zfile:
zfile.write('DISABLE=True')
logger.info(f'Unmounting volume in {file_path}')
f'guestmount -a {os.path.join(file_path, "hda.qcow2")} -m /dev/sda2 {os.path.join(file_path, "raw")}'
)
ztp_file = os.path.join(file_path, "raw/zerotouch-config")
with open(ztp_file, "w", encoding="ascii") as zfile:
zfile.write("DISABLE=True")
logger.info(f"Unmounting volume in {file_path}")
os.system(f"guestunmount {os.path.join(file_path, 'raw')}")
os.system(f"rm -rf {os.path.join(file_path, 'raw')}")
logger.info(f"Volume has been successfully unmounted at {file_path}")
def _parse_xml_for_version(self,root_xml: ET.ElementTree, xpath: str = './/dir[@label="Active Releases"]/dir/dir/[@label]') -> List[EosVersion]:
def _parse_xml_for_version(
self,
root_xml: ET.ElementTree,
xpath: str = './/dir[@label="Active Releases"]/dir/dir/[@label]',
) -> List[EosVersion]:
"""
Extract list of available EOS versions from Arista.com website
@ -77,19 +89,21 @@ class EOSDownloader(ObjectDownloader):
"""
# XPATH: .//dir[@label="Active Releases"]/dir/dir/[@label]
if self.eos_versions is None:
logger.debug(f'Using xpath {xpath}')
logger.debug(f"Using xpath {xpath}")
eos_versions = []
for node in root_xml.findall(xpath):
if 'label' in node.attrib and node.get('label') is not None:
label = node.get('label')
if "label" in node.attrib and node.get("label") is not None:
label = node.get("label")
if label is not None and REGEX_EOS_VERSION.match(label):
eos_version = EosVersion.from_str(label)
eos_versions.append(eos_version)
logger.debug(f"Found {label} - {eos_version}")
logger.debug(f'List of versions found on arista.com is: {eos_versions}')
logger.debug(f"List of versions found on arista.com is: {eos_versions}")
self.eos_versions = eos_versions
else:
logger.debug('receiving instruction to download versions, but already available')
logger.debug(
"receiving instruction to download versions, but already available"
)
return self.eos_versions
def _get_branches(self, with_rtype: str = RTYPE_FEATURE) -> List[str]:
@ -104,9 +118,11 @@ class EOSDownloader(ObjectDownloader):
Returns:
List[str]: A lsit of string that represent all availables EOS branches
"""
root = self._get_folder_tree()
root = self.get_folder_tree()
versions = self._parse_xml_for_version(root_xml=root)
return list({version.branch for version in versions if version.rtype == with_rtype})
return list(
{version.branch for version in versions if version.rtype == with_rtype}
)
def latest_branch(self, rtype: str = RTYPE_FEATURE) -> EosVersion:
"""
@ -125,7 +141,9 @@ class EOSDownloader(ObjectDownloader):
selected_branch = branch
return selected_branch
def get_eos_versions(self, branch: Union[str,None] = None, rtype: Union[str,None] = None) -> List[EosVersion]:
def get_eos_versions(
self, branch: Union[str, None] = None, rtype: Union[str, None] = None
) -> List[EosVersion]:
"""
Get a list of available EOS version available on arista.com
@ -139,16 +157,22 @@ class EOSDownloader(ObjectDownloader):
Returns:
List[EosVersion]: A list of versions available
"""
root = self._get_folder_tree()
root = self.get_folder_tree()
result = []
for version in self._parse_xml_for_version(root_xml=root):
if branch is None and (version.rtype == rtype or rtype is None):
result.append(version)
elif branch is not None and version.is_in_branch(branch) and version.rtype == rtype:
elif (
branch is not None
and version.is_in_branch(branch)
and version.rtype == rtype
):
result.append(version)
return result
def latest_eos(self, branch: Union[str,None] = None, rtype: str = RTYPE_FEATURE) -> EosVersion:
def latest_eos(
self, branch: Union[str, None] = None, rtype: str = RTYPE_FEATURE
) -> EosVersion:
"""
Get latest version of EOS
@ -168,7 +192,9 @@ class EOSDownloader(ObjectDownloader):
latest_branch = self.latest_branch(rtype=rtype)
else:
latest_branch = EosVersion.from_str(branch)
for version in self.get_eos_versions(branch=str(latest_branch.branch), rtype=rtype):
for version in self.get_eos_versions(
branch=str(latest_branch.branch), rtype=rtype
):
if version > selected_version:
if rtype is not None and version.rtype == rtype:
selected_version = version

View file

@ -16,11 +16,11 @@ from eos_downloader.tools import exc_to_str
# logger = logging.getLogger(__name__)
BASE_VERSION_STR = '4.0.0F'
BASE_BRANCH_STR = '4.0'
BASE_VERSION_STR = "4.0.0F"
BASE_BRANCH_STR = "4.0"
RTYPE_FEATURE = 'F'
RTYPE_MAINTENANCE = 'M'
RTYPE_FEATURE = "F"
RTYPE_MAINTENANCE = "M"
RTYPES = [RTYPE_FEATURE, RTYPE_MAINTENANCE]
# Regular Expression to capture multiple EOS version format
@ -29,8 +29,12 @@ RTYPES = [RTYPE_FEATURE, RTYPE_MAINTENANCE]
# 4.21.1M
# 4.28.10.F
# 4.28.6.1M
REGEX_EOS_VERSION = re.compile(r"^.*(?P<major>4)\.(?P<minor>\d{1,2})\.(?P<patch>\d{1,2})(?P<other>\.\d*)*(?P<rtype>[M,F])*$")
REGEX_EOS_BRANCH = re.compile(r"^.*(?P<major>4)\.(?P<minor>\d{1,2})(\.?P<patch>\d)*(\.\d)*(?P<rtype>[M,F])*$")
REGEX_EOS_VERSION = re.compile(
r"^.*(?P<major>4)\.(?P<minor>\d{1,2})\.(?P<patch>\d{1,2})(?P<other>\.\d*)*(?P<rtype>[M,F])*$"
)
REGEX_EOS_BRANCH = re.compile(
r"^.*(?P<major>4)\.(?P<minor>\d{1,2})(\.?P<patch>\d)*(\.\d)*(?P<rtype>[M,F])*$"
)
class EosVersion(BaseModel):
@ -59,10 +63,11 @@ class EosVersion(BaseModel):
Args:
BaseModel (Pydantic): Pydantic Base Model
"""
major: int = 4
minor: int = 0
patch: int = 0
rtype: Optional[str] = 'F'
rtype: Optional[str] = "F"
other: Any = None
@classmethod
@ -84,7 +89,7 @@ class EosVersion(BaseModel):
Returns:
EosVersion object
"""
logger.debug(f'receiving version: {eos_version}')
logger.debug(f"receiving version: {eos_version}")
if REGEX_EOS_VERSION.match(eos_version):
matches = REGEX_EOS_VERSION.match(eos_version)
# assert matches is not None
@ -95,7 +100,7 @@ class EosVersion(BaseModel):
# assert matches is not None
assert matches is not None
return cls(**matches.groupdict())
logger.error(f'Error occured with {eos_version}')
logger.error(f"Error occured with {eos_version}")
return EosVersion()
@property
@ -106,7 +111,7 @@ class EosVersion(BaseModel):
Returns:
str: branch from version
"""
return f'{self.major}.{self.minor}'
return f"{self.major}.{self.minor}"
def __str__(self) -> str:
"""
@ -118,8 +123,8 @@ class EosVersion(BaseModel):
str: A standard EOS version string representing <MAJOR>.<MINOR>.<PATCH><RTYPE>
"""
if self.other is None:
return f'{self.major}.{self.minor}.{self.patch}{self.rtype}'
return f'{self.major}.{self.minor}.{self.patch}{self.other}{self.rtype}'
return f"{self.major}.{self.minor}.{self.patch}{self.rtype}"
return f"{self.major}.{self.minor}.{self.patch}{self.other}{self.rtype}"
def _compare(self, other: EosVersion) -> float:
"""
@ -141,23 +146,33 @@ class EosVersion(BaseModel):
float: -1 if ver1 < ver2, 0 if ver1 == ver2, 1 if ver1 > ver2
"""
if not isinstance(other, EosVersion):
raise ValueError(f'could not compare {other} as it is not an EosVersion object')
raise ValueError(
f"could not compare {other} as it is not an EosVersion object"
)
comparison_flag: float = 0
logger.warning(f'current version {self.__str__()} - other {str(other)}') # pylint: disable = unnecessary-dunder-call
logger.warning(
f"current version {self.__str__()} - other {str(other)}" # pylint: disable = unnecessary-dunder-call
)
for key, _ in self.dict().items():
if comparison_flag == 0 and self.dict()[key] is None or other.dict()[key] is None:
logger.debug(f'{key}: local None - remote None')
logger.debug(f'{key}: local {self.dict()} - remote {other.dict()}')
if (
comparison_flag == 0
and self.dict()[key] is None
or other.dict()[key] is None
):
logger.debug(f"{key}: local None - remote None")
logger.debug(f"{key}: local {self.dict()} - remote {other.dict()}")
return comparison_flag
logger.debug(f'{key}: local {self.dict()[key]} - remote {other.dict()[key]}')
logger.debug(
f"{key}: local {self.dict()[key]} - remote {other.dict()[key]}"
)
if comparison_flag == 0 and self.dict()[key] < other.dict()[key]:
comparison_flag = -1
if comparison_flag == 0 and self.dict()[key] > other.dict()[key]:
comparison_flag = 1
if comparison_flag != 0:
logger.info(f'comparison result is {comparison_flag}')
logger.info(f"comparison result is {comparison_flag}")
return comparison_flag
logger.info(f'comparison result is {comparison_flag}')
logger.info(f"comparison result is {comparison_flag}")
return comparison_flag
@typing.no_type_check
@ -236,7 +251,7 @@ class EosVersion(BaseModel):
"['<', '>', '==', '<=', '>=', '!=']. "
f"You provided: {match_expr}"
)
logger.debug(f'work on comparison {prefix} with base release {match_version}')
logger.debug(f"work on comparison {prefix} with base release {match_version}")
possibilities_dict = {
">": (1,),
"<": (-1,),
@ -263,7 +278,7 @@ class EosVersion(BaseModel):
bool: True if current version is in provided branch, otherwise False
"""
try:
logger.debug(f'reading branch str:{branch_str}')
logger.debug(f"reading branch str:{branch_str}")
branch = EosVersion.from_str(branch_str)
except Exception as error: # pylint: disable = broad-exception-caught
logger.error(exc_to_str(error))

View file

@ -8,8 +8,13 @@
eos_downloader class definition
"""
from __future__ import (absolute_import, division, print_function,
unicode_literals, annotations)
from __future__ import (
absolute_import,
annotations,
division,
print_function,
unicode_literals,
)
import base64
import glob
@ -26,9 +31,14 @@ from loguru import logger
from rich import console
from tqdm import tqdm
from eos_downloader import (ARISTA_DOWNLOAD_URL, ARISTA_GET_SESSION,
ARISTA_SOFTWARE_FOLDER_TREE, EVE_QEMU_FOLDER_PATH,
MSG_INVALID_DATA, MSG_TOKEN_EXPIRED)
from eos_downloader import (
ARISTA_DOWNLOAD_URL,
ARISTA_GET_SESSION,
ARISTA_SOFTWARE_FOLDER_TREE,
EVE_QEMU_FOLDER_PATH,
MSG_INVALID_DATA,
MSG_TOKEN_EXPIRED,
)
from eos_downloader.data import DATA_MAPPING
from eos_downloader.download import DownloadProgressBar
@ -37,11 +47,19 @@ from eos_downloader.download import DownloadProgressBar
console = rich.get_console()
class ObjectDownloader():
class ObjectDownloader:
"""
ObjectDownloader Generic Object to download from Arista.com
"""
def __init__(self, image: str, version: str, token: str, software: str = 'EOS', hash_method: str = 'md5sum'):
def __init__(
self,
image: str,
version: str,
token: str,
software: str = "EOS",
hash_method: str = "md5sum",
):
"""
__init__ Class constructor
@ -70,10 +88,10 @@ class ObjectDownloader():
self.hash_method = hash_method
self.timeout = 5
# Logging
logger.debug(f'Filename built by _build_filename is {self.filename}')
logger.debug(f"Filename built by _build_filename is {self.filename}")
def __str__(self) -> str:
return f'{self.software} - {self.image} - {self.version}'
return f"{self.software} - {self.image} - {self.version}"
# def __repr__(self):
# return str(self.__dict__)
@ -102,16 +120,18 @@ class ObjectDownloader():
str:
Filename to search for on Arista.com
"""
logger.info('start build')
logger.info("start build")
if self.software in DATA_MAPPING:
logger.info(f'software in data mapping: {self.software}')
logger.info(f"software in data mapping: {self.software}")
if self.image in DATA_MAPPING[self.software]:
logger.info(f'image in data mapping: {self.image}')
logger.info(f"image in data mapping: {self.image}")
return f"{DATA_MAPPING[self.software][self.image]['prepend']}-{self.version}{DATA_MAPPING[self.software][self.image]['extension']}"
return f"{DATA_MAPPING[self.software]['default']['prepend']}-{self.version}{DATA_MAPPING[self.software]['default']['extension']}"
raise ValueError(f'Incorrect value for software {self.software}')
raise ValueError(f"Incorrect value for software {self.software}")
def _parse_xml_for_path(self, root_xml: ET.ElementTree, xpath: str, search_file: str) -> str:
def _parse_xml_for_path(
self, root_xml: ET.ElementTree, xpath: str, search_file: str
) -> str:
# sourcery skip: remove-unnecessary-cast
"""
_parse_xml Read and extract data from XML using XPATH
@ -132,18 +152,18 @@ class ObjectDownloader():
str
File Path on Arista server side
"""
logger.debug(f'Using xpath {xpath}')
logger.debug(f'Search for file {search_file}')
console.print(f'🔎 Searching file {search_file}')
logger.debug(f"Using xpath {xpath}")
logger.debug(f"Search for file {search_file}")
console.print(f"🔎 Searching file {search_file}")
for node in root_xml.findall(xpath):
# logger.debug('Found {}', node.text)
if str(node.text).lower() == search_file.lower():
path = node.get('path')
console.print(f' -> Found file at {path}')
path = node.get("path")
console.print(f" -> Found file at {path}")
logger.info(f'Found {node.text} at {node.get("path")}')
return str(node.get('path')) if node.get('path') is not None else ''
logger.error(f'Requested file ({self.filename}) not found !')
return ''
return str(node.get("path")) if node.get("path") is not None else ""
logger.error(f"Requested file ({self.filename}) not found !")
return ""
def _get_hash(self, file_path: str) -> str:
"""
@ -165,10 +185,10 @@ class ObjectDownloader():
dl_rich_progress_bar = DownloadProgressBar()
dl_rich_progress_bar.download(urls=[hash_url], dest_dir=file_path)
hash_downloaded = f"{file_path}/{os.path.basename(remote_hash_file)}"
hash_content = 'unset'
with open(hash_downloaded, 'r', encoding='utf-8') as f:
hash_content = "unset"
with open(hash_downloaded, "r", encoding="utf-8") as f:
hash_content = f.read()
return hash_content.split(' ')[0]
return hash_content.split(" ")[0]
@staticmethod
def _compute_hash_md5sum(file: str, hash_expected: str) -> bool:
@ -195,7 +215,9 @@ class ObjectDownloader():
hash_md5.update(chunk)
if hash_md5.hexdigest() == hash_expected:
return True
logger.warning(f'Downloaded file is corrupt: local md5 ({hash_md5.hexdigest()}) is different to md5 from arista ({hash_expected})')
logger.warning(
f"Downloaded file is corrupt: local md5 ({hash_md5.hexdigest()}) is different to md5 from arista ({hash_expected})"
)
return False
@staticmethod
@ -223,10 +245,12 @@ class ObjectDownloader():
hash_sha512.update(chunk)
if hash_sha512.hexdigest() == hash_expected:
return True
logger.warning(f'Downloaded file is corrupt: local sha512 ({hash_sha512.hexdigest()}) is different to sha512 from arista ({hash_expected})')
logger.warning(
f"Downloaded file is corrupt: local sha512 ({hash_sha512.hexdigest()}) is different to sha512 from arista ({hash_expected})"
)
return False
def _get_folder_tree(self) -> ET.ElementTree:
def get_folder_tree(self) -> ET.ElementTree:
"""
_get_folder_tree Download XML tree from Arista server
@ -237,15 +261,17 @@ class ObjectDownloader():
"""
if self.session_id is None:
self.authenticate()
jsonpost = {'sessionCode': self.session_id}
result = requests.post(ARISTA_SOFTWARE_FOLDER_TREE, data=json.dumps(jsonpost), timeout=self.timeout)
jsonpost = {"sessionCode": self.session_id}
result = requests.post(
ARISTA_SOFTWARE_FOLDER_TREE, data=json.dumps(jsonpost), timeout=self.timeout
)
try:
folder_tree = result.json()["data"]["xml"]
return ET.ElementTree(ET.fromstring(folder_tree))
except KeyError as error:
logger.error(MSG_INVALID_DATA)
logger.error(f'Server returned: {error}')
console.print(f'{MSG_INVALID_DATA}', style="bold red")
logger.error(f"Server returned: {error}")
console.print(f"{MSG_INVALID_DATA}", style="bold red")
sys.exit(1)
def _get_remote_filepath(self) -> str:
@ -259,12 +285,14 @@ class ObjectDownloader():
str
Remote path of the file to download
"""
root = self._get_folder_tree()
root = self.get_folder_tree()
logger.debug("GET XML content from ARISTA.com")
xpath = f'.//dir[@label="{self.software}"]//file'
return self._parse_xml_for_path(root_xml=root, xpath=xpath, search_file=self.filename)
return self._parse_xml_for_path(
root_xml=root, xpath=xpath, search_file=self.filename
)
def _get_remote_hashpath(self, hash_method: str = 'md5sum') -> str:
def _get_remote_hashpath(self, hash_method: str = "md5sum") -> str:
"""
_get_remote_hashpath Helper to get path of the hash's file to download
@ -275,13 +303,13 @@ class ObjectDownloader():
str
Remote path of the hash's file to download
"""
root = self._get_folder_tree()
root = self.get_folder_tree()
logger.debug("GET XML content from ARISTA.com")
xpath = f'.//dir[@label="{self.software}"]//file'
return self._parse_xml_for_path(
root_xml=root,
xpath=xpath,
search_file=f'{self.filename}.{hash_method}',
search_file=f"{self.filename}.{hash_method}",
)
def _get_url(self, remote_file_path: str) -> str:
@ -302,13 +330,15 @@ class ObjectDownloader():
"""
if self.session_id is None:
self.authenticate()
jsonpost = {'sessionCode': self.session_id, 'filePath': remote_file_path}
result = requests.post(ARISTA_DOWNLOAD_URL, data=json.dumps(jsonpost), timeout=self.timeout)
if 'data' in result.json() and 'url' in result.json()['data']:
jsonpost = {"sessionCode": self.session_id, "filePath": remote_file_path}
result = requests.post(
ARISTA_DOWNLOAD_URL, data=json.dumps(jsonpost), timeout=self.timeout
)
if "data" in result.json() and "url" in result.json()["data"]:
# logger.debug('URL to download file is: {}', result.json())
return result.json()["data"]["url"]
logger.critical(f'Server returns following message: {result.json()}')
return ''
logger.critical(f"Server returns following message: {result.json()}")
return ""
@staticmethod
def _download_file_raw(url: str, file_path: str) -> str:
@ -331,31 +361,40 @@ class ObjectDownloader():
"""
chunkSize = 1024
r = requests.get(url, stream=True, timeout=5)
with open(file_path, 'wb') as f:
pbar = tqdm(unit="B", total=int(r.headers['Content-Length']), unit_scale=True, unit_divisor=1024)
with open(file_path, "wb") as f:
pbar = tqdm(
unit="B",
total=int(r.headers["Content-Length"]),
unit_scale=True,
unit_divisor=1024,
)
for chunk in r.iter_content(chunk_size=chunkSize):
if chunk:
pbar.update(len(chunk))
f.write(chunk)
return file_path
def _download_file(self, file_path: str, filename: str, rich_interface: bool = True) -> Union[None, str]:
def _download_file(
self, file_path: str, filename: str, rich_interface: bool = True
) -> Union[None, str]:
remote_file_path = self._get_remote_filepath()
logger.info(f'File found on arista server: {remote_file_path}')
logger.info(f"File found on arista server: {remote_file_path}")
file_url = self._get_url(remote_file_path=remote_file_path)
if file_url is not False:
if not rich_interface:
return self._download_file_raw(url=file_url, file_path=os.path.join(file_path, filename))
return self._download_file_raw(
url=file_url, file_path=os.path.join(file_path, filename)
)
rich_downloader = DownloadProgressBar()
rich_downloader.download(urls=[file_url], dest_dir=file_path)
return os.path.join(file_path, filename)
logger.error(f'Cannot download file {file_path}')
logger.error(f"Cannot download file {file_path}")
return None
@staticmethod
def _create_destination_folder(path: str) -> None:
# os.makedirs(path, mode, exist_ok=True)
os.system(f'mkdir -p {path}')
os.system(f"mkdir -p {path}")
@staticmethod
def _disable_ztp(file_path: str) -> None:
@ -379,24 +418,29 @@ class ObjectDownloader():
"""
credentials = (base64.b64encode(self.token.encode())).decode("utf-8")
session_code_url = ARISTA_GET_SESSION
jsonpost = {'accessToken': credentials}
jsonpost = {"accessToken": credentials}
result = requests.post(session_code_url, data=json.dumps(jsonpost), timeout=self.timeout)
result = requests.post(
session_code_url, data=json.dumps(jsonpost), timeout=self.timeout
)
if result.json()["status"]["message"] in[ 'Access token expired', 'Invalid access token']:
console.print(f'{MSG_TOKEN_EXPIRED}', style="bold red")
if result.json()["status"]["message"] in [
"Access token expired",
"Invalid access token",
]:
console.print(f"{MSG_TOKEN_EXPIRED}", style="bold red")
logger.error(MSG_TOKEN_EXPIRED)
return False
try:
if 'data' in result.json():
if "data" in result.json():
self.session_id = result.json()["data"]["session_code"]
logger.info('Authenticated on arista.com')
logger.info("Authenticated on arista.com")
return True
logger.debug(f'{result.json()}')
logger.debug(f"{result.json()}")
return False
except KeyError as error_arista:
logger.error(f'Error: {error_arista}')
logger.error(f"Error: {error_arista}")
sys.exit(1)
def download_local(self, file_path: str, checksum: bool = False) -> bool:
@ -422,25 +466,33 @@ class ObjectDownloader():
bool
True if everything went well, False if any problem appears
"""
file_downloaded = str(self._download_file(file_path=file_path, filename=self.filename))
file_downloaded = str(
self._download_file(file_path=file_path, filename=self.filename)
)
# Check file HASH
hash_result = False
if checksum:
logger.info('🚀 Running checksum validation')
console.print('🚀 Running checksum validation')
if self.hash_method == 'md5sum':
logger.info("🚀 Running checksum validation")
console.print("🚀 Running checksum validation")
if self.hash_method == "md5sum":
hash_expected = self._get_hash(file_path=file_path)
hash_result = self._compute_hash_md5sum(file=file_downloaded, hash_expected=hash_expected)
elif self.hash_method == 'sha512sum':
hash_result = self._compute_hash_md5sum(
file=file_downloaded, hash_expected=hash_expected
)
elif self.hash_method == "sha512sum":
hash_expected = self._get_hash(file_path=file_path)
hash_result = self._compute_hash_sh512sum(file=file_downloaded, hash_expected=hash_expected)
hash_result = self._compute_hash_sh512sum(
file=file_downloaded, hash_expected=hash_expected
)
if not hash_result:
logger.error('Downloaded file is corrupted, please check your connection')
console.print('❌ Downloaded file is corrupted, please check your connection')
logger.error("Downloaded file is corrupted, please check your connection")
console.print(
"❌ Downloaded file is corrupted, please check your connection"
)
return False
logger.info('Downloaded file is correct.')
console.print('✅ Downloaded file is correct.')
logger.info("Downloaded file is correct.")
console.print("✅ Downloaded file is correct.")
return True
def provision_eve(self, noztp: bool = False, checksum: bool = True) -> None:
@ -466,7 +518,7 @@ class ObjectDownloader():
# Build image name to use in folder path
eos_image_name = self.filename.rstrip(".vmdk").lower()
if noztp:
eos_image_name = f'{eos_image_name}-noztp'
eos_image_name = f"{eos_image_name}-noztp"
# Create full path for EVE-NG
file_path = os.path.join(EVE_QEMU_FOLDER_PATH, eos_image_name.rstrip())
# Create folders in filesystem
@ -474,20 +526,23 @@ class ObjectDownloader():
# Download file to local destination
file_downloaded = self._download_file(
file_path=file_path, filename=self.filename)
file_path=file_path, filename=self.filename
)
# Convert to QCOW2 format
file_qcow2 = os.path.join(file_path, "hda.qcow2")
logger.info('Converting VMDK to QCOW2 format')
console.print('🚀 Converting VMDK to QCOW2 format...')
logger.info("Converting VMDK to QCOW2 format")
console.print("🚀 Converting VMDK to QCOW2 format...")
os.system(f'$(which qemu-img) convert -f vmdk -O qcow2 {file_downloaded} {file_qcow2}')
os.system(
f"$(which qemu-img) convert -f vmdk -O qcow2 {file_downloaded} {file_qcow2}"
)
logger.info('Applying unl_wrapper to fix permissions')
console.print('Applying unl_wrapper to fix permissions')
logger.info("Applying unl_wrapper to fix permissions")
console.print("Applying unl_wrapper to fix permissions")
os.system('/opt/unetlab/wrappers/unl_wrapper -a fixpermissions')
os.system(f'rm -f {file_downloaded}')
os.system("/opt/unetlab/wrappers/unl_wrapper -a fixpermissions")
os.system(f"rm -f {file_downloaded}")
if noztp:
self._disable_ztp(file_path=file_path)
@ -502,12 +557,12 @@ class ObjectDownloader():
version (str):
image_name (str, optional): Image name to use. Defaults to "arista/ceos".
"""
docker_image = f'{image_name}:{self.version}'
logger.info(f'Importing image {self.filename} to {docker_image}')
console.print(f'🚀 Importing image {self.filename} to {docker_image}')
os.system(f'$(which docker) import {self.filename} {docker_image}')
for filename in glob.glob(f'{self.filename}*'):
docker_image = f"{image_name}:{self.version}"
logger.info(f"Importing image {self.filename} to {docker_image}")
console.print(f"🚀 Importing image {self.filename} to {docker_image}")
os.system(f"$(which docker) import {self.filename} {docker_image}")
for filename in glob.glob(f"{self.filename}*"):
try:
os.remove(filename)
except FileNotFoundError:
console.print(f'File not found: {filename}')
console.print(f"File not found: {filename}")

View file

@ -2,7 +2,8 @@
disable=
invalid-name,
logging-fstring-interpolation,
fixme
fixme,
line-too-long
[BASIC]
good-names=runCmds, i, y, t, c, x, e, fd, ip, v

View file

@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "eos_downloader"
version = "v0.8.2"
version = "v0.9.0"
readme = "README.md"
authors = [{ name = "Thomas Grimonet", email = "thomas.grimonet@gmail.com" }]
maintainers = [
@ -22,7 +22,7 @@ dependencies = [
"scp",
"tqdm",
"loguru",
"rich~=13.5.2",
"rich>=13.5.2,<13.7.0",
"cvprac>=1.0.7",
"click~=8.1.6",
"click-help-colors~=0.9",
@ -62,7 +62,7 @@ dev = [
"pytest-html>=3.1.1",
"pytest-metadata>=1.11.0",
"pylint-pydantic>=0.2.4",
"tox==4.10.0",
"tox~=4.11",
"types-PyYAML",
"types-paramiko",
"types-requests",
@ -94,7 +94,7 @@ namespaces = false
# Version
################################
[tool.bumpver]
current_version = "0.8.2"
current_version = "0.9.0"
version_pattern = "MAJOR.MINOR.PATCH"
commit_message = "bump: Version {old_version} -> {new_version}"
commit = true

View file

@ -5,12 +5,13 @@
# flake8: noqa: W503
# flake8: noqa: W1202
from __future__ import (absolute_import, division, print_function)
import os
import eos_downloader
from eos_downloader.eos import EOSDownloader
from eos_downloader.data import DATA_MAPPING
from __future__ import absolute_import, division, print_function
import os
import eos_downloader
from eos_downloader.data import DATA_MAPPING
from eos_downloader.eos import EOSDownloader
# --------------------------------------------------------------- #
# MOOCK data to use for testing
@ -18,99 +19,99 @@ from eos_downloader.data import DATA_MAPPING
# Get Auth token
# eos_token = os.getenv('ARISTA_TOKEN')
eos_token = os.getenv('ARISTA_TOKEN', 'invalid_token')
eos_token_invalid = 'invalid_token'
eos_token = os.getenv("ARISTA_TOKEN", "invalid_token")
eos_token_invalid = "invalid_token"
eos_dataset_valid = [
{
'image': 'EOS',
'version': '4.26.3M',
'software': 'EOS',
'filename': 'EOS-4.26.3M.swi',
'expected_hash': 'sha512sum',
'remote_path': '/support/download/EOS-USA/Active Releases/4.26/EOS-4.26.3M/EOS-4.26.3M.swi',
'compute_checksum': True
"image": "EOS",
"version": "4.26.3M",
"software": "EOS",
"filename": "EOS-4.26.3M.swi",
"expected_hash": "sha512sum",
"remote_path": "/support/download/EOS-USA/Active Releases/4.26/EOS-4.26.3M/EOS-4.26.3M.swi",
"compute_checksum": True,
},
{
'image': 'EOS',
'version': '4.25.6M',
'software': 'EOS',
'filename': 'EOS-4.25.6M.swi',
'expected_hash': 'md5sum',
'remote_path': '/support/download/EOS-USA/Active Releases/4.25/EOS-4.25.6M/EOS-4.25.6M.swi',
'compute_checksum': True
"image": "EOS",
"version": "4.25.6M",
"software": "EOS",
"filename": "EOS-4.25.6M.swi",
"expected_hash": "md5sum",
"remote_path": "/support/download/EOS-USA/Active Releases/4.25/EOS-4.25.6M/EOS-4.25.6M.swi",
"compute_checksum": True,
},
{
'image': 'vEOS-lab',
'version': '4.25.6M',
'software': 'EOS',
'filename': 'vEOS-lab-4.25.6M.vmdk',
'expected_hash': 'md5sum',
'remote_path': '/support/download/EOS-USA/Active Releases/4.25/EOS-4.25.6M/vEOS-lab/vEOS-lab-4.25.6M.vmdk',
'compute_checksum': False
}
"image": "vEOS-lab",
"version": "4.25.6M",
"software": "EOS",
"filename": "vEOS-lab-4.25.6M.vmdk",
"expected_hash": "md5sum",
"remote_path": "/support/download/EOS-USA/Active Releases/4.25/EOS-4.25.6M/vEOS-lab/vEOS-lab-4.25.6M.vmdk",
"compute_checksum": False,
},
]
eos_dataset_invalid = [
{
'image': 'default',
'version': '4.26.3M',
'software': 'EOS',
'filename': 'EOS-4.26.3M.swi',
'expected_hash': 'sha512sum',
'remote_path': '/support/download/EOS-USA/Active Releases/4.26/EOS-4.26.3M/EOS-4.26.3M.swi',
'compute_checksum': True
"image": "default",
"version": "4.26.3M",
"software": "EOS",
"filename": "EOS-4.26.3M.swi",
"expected_hash": "sha512sum",
"remote_path": "/support/download/EOS-USA/Active Releases/4.26/EOS-4.26.3M/EOS-4.26.3M.swi",
"compute_checksum": True,
}
]
eos_version = [
{
'version': 'EOS-4.23.1F',
'is_valid': True,
'major': 4,
'minor': 23,
'patch': 1,
'rtype': 'F'
"version": "EOS-4.23.1F",
"is_valid": True,
"major": 4,
"minor": 23,
"patch": 1,
"rtype": "F",
},
{
'version': 'EOS-4.23.0',
'is_valid': True,
'major': 4,
'minor': 23,
'patch': 0,
'rtype': None
"version": "EOS-4.23.0",
"is_valid": True,
"major": 4,
"minor": 23,
"patch": 0,
"rtype": None,
},
{
'version': 'EOS-4.23',
'is_valid': True,
'major': 4,
'minor': 23,
'patch': 0,
'rtype': None
"version": "EOS-4.23",
"is_valid": True,
"major": 4,
"minor": 23,
"patch": 0,
"rtype": None,
},
{
'version': 'EOS-4.23.1M',
'is_valid': True,
'major': 4,
'minor': 23,
'patch': 1,
'rtype': 'M'
"version": "EOS-4.23.1M",
"is_valid": True,
"major": 4,
"minor": 23,
"patch": 1,
"rtype": "M",
},
{
'version': 'EOS-4.23.1.F',
'is_valid': True,
'major': 4,
'minor': 23,
'patch': 1,
'rtype': 'F'
"version": "EOS-4.23.1.F",
"is_valid": True,
"major": 4,
"minor": 23,
"patch": 1,
"rtype": "F",
},
{
'version': 'EOS-5.23.1F',
'is_valid': False,
'major': 4,
'minor': 23,
'patch': 1,
'rtype': 'F'
"version": "EOS-5.23.1F",
"is_valid": False,
"major": 4,
"minor": 23,
"patch": 1,
"rtype": "F",
},
]

View file

@ -5,13 +5,20 @@
# flake8: noqa: W503
# flake8: noqa: W1202
from __future__ import (absolute_import, division, print_function)
import os
import pytest
import eos_downloader
from typing import Dict, Any, List
from tests.lib.dataset import eos_dataset_valid, eos_dataset_invalid, eos_token, eos_token_invalid
from __future__ import absolute_import, division, print_function
import os
from typing import Any, Dict, List
import pytest
import eos_downloader
from tests.lib.dataset import (
eos_dataset_invalid,
eos_dataset_valid,
eos_token,
eos_token_invalid,
)
@pytest.fixture
@ -19,17 +26,18 @@ from tests.lib.dataset import eos_dataset_valid, eos_dataset_invalid, eos_token,
def create_download_instance(request, DOWNLOAD_INFO):
# logger.info("Execute fixture to create class elements")
request.cls.eos_downloader = eos_downloader.eos.EOSDownloader(
image=DOWNLOAD_INFO['image'],
software=DOWNLOAD_INFO['software'],
version=DOWNLOAD_INFO['version'],
image=DOWNLOAD_INFO["image"],
software=DOWNLOAD_INFO["software"],
version=DOWNLOAD_INFO["version"],
token=eos_token,
hash_method='sha512sum')
hash_method="sha512sum",
)
yield
# logger.info('Cleanup test environment')
os.system('rm -f {}*'.format(DOWNLOAD_INFO['filename']))
os.system("rm -f {}*".format(DOWNLOAD_INFO["filename"]))
def generate_test_ids_dict(val: Dict[str, Any], key: str = 'name') -> str:
def generate_test_ids_dict(val: Dict[str, Any], key: str = "name") -> str:
"""
generate_test_ids Helper to generate test ID for parametrize
@ -50,7 +58,8 @@ def generate_test_ids_dict(val: Dict[str, Any], key: str = 'name') -> str:
return val[key]
return "undefined_test"
def generate_test_ids_list(val: List[Dict[str, Any]], key: str = 'name') -> str:
def generate_test_ids_list(val: List[Dict[str, Any]], key: str = "name") -> str:
"""
generate_test_ids Helper to generate test ID for parametrize
@ -66,4 +75,4 @@ def generate_test_ids_list(val: List[Dict[str, Any]], key: str = 'name') -> str:
str
Name of the configlet
"""
return [ entry[key] if key in entry.keys() else 'unset_entry' for entry in val ]
return [entry[key] if key in entry.keys() else "unset_entry" for entry in val]

View file

@ -5,14 +5,13 @@
# flake8: noqa: W503
# flake8: noqa: W1202
from __future__ import (absolute_import, division, print_function)
from __future__ import absolute_import, division, print_function
import os
from eos_downloader.data import DATA_MAPPING
def default_filename(version: str, info):
"""
default_filename Helper to build default filename
@ -31,10 +30,14 @@ def default_filename(version: str, info):
"""
if version is None or info is None:
return None
return DATA_MAPPING[info['software']]['default']['prepend'] + '-' + version + '.swi'
return DATA_MAPPING[info["software"]]["default"]["prepend"] + "-" + version + ".swi"
def is_on_github_actions():
"""Check if code is running on a CI runner"""
if "CI" not in os.environ or not os.environ["CI"] or "GITHUB_RUN_ID" not in os.environ:
if (
"CI" not in os.environ
or not os.environ["CI"]
or "GITHUB_RUN_ID" not in os.environ
):
return False

View file

@ -45,4 +45,3 @@ class TestEosDownload_valid():
@pytest.mark.eos_download
def test_download_local(self, DOWNLOAD_INFO):
self.eos_downloader.download_local(file_path='.', checksum=DOWNLOAD_INFO['compute_checksum'])

View file

@ -5,126 +5,166 @@
# flake8: noqa: W503
# flake8: noqa: W1202
from __future__ import (absolute_import, division, print_function)
from __future__ import absolute_import, division, print_function
import sys
from loguru import logger
import pytest
from eos_downloader.models.version import EosVersion, BASE_VERSION_STR
from loguru import logger
from eos_downloader.models.version import BASE_VERSION_STR, EosVersion
from tests.lib.dataset import eos_version
from tests.lib.fixtures import generate_test_ids_list
logger.remove()
logger.add(sys.stderr, level="DEBUG")
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_from_str(EOS_VERSION):
version = EosVersion.from_str(EOS_VERSION['version'])
if EOS_VERSION['is_valid']:
assert version.major == EOS_VERSION['major']
assert version.minor == EOS_VERSION['minor']
assert version.patch == EOS_VERSION['patch']
assert version.rtype == EOS_VERSION['rtype']
version = EosVersion.from_str(EOS_VERSION["version"])
if EOS_VERSION["is_valid"]:
assert version.major == EOS_VERSION["major"]
assert version.minor == EOS_VERSION["minor"]
assert version.patch == EOS_VERSION["patch"]
assert version.rtype == EOS_VERSION["rtype"]
else:
assert str(version) == BASE_VERSION_STR
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_to_str(EOS_VERSION):
version = EosVersion(**EOS_VERSION)
if EOS_VERSION['is_valid']:
assert version.major == EOS_VERSION['major']
assert version.minor == EOS_VERSION['minor']
assert version.patch == EOS_VERSION['patch']
assert version.rtype == EOS_VERSION['rtype']
if EOS_VERSION["is_valid"]:
assert version.major == EOS_VERSION["major"]
assert version.minor == EOS_VERSION["minor"]
assert version.patch == EOS_VERSION["patch"]
assert version.rtype == EOS_VERSION["rtype"]
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_branch(EOS_VERSION):
if EOS_VERSION['is_valid']:
if EOS_VERSION["is_valid"]:
version = EosVersion(**EOS_VERSION)
assert version.branch == f'{EOS_VERSION["major"]}.{EOS_VERSION["minor"]}'
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_eq_operator(EOS_VERSION):
if not EOS_VERSION['is_valid']:
pytest.skip('not a valid version to test')
if not EOS_VERSION["is_valid"]:
pytest.skip("not a valid version to test")
version = EosVersion(**EOS_VERSION)
logger.warning(f'version is: {version.dict()}')
logger.warning(f"version is: {version.dict()}")
assert version == version
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_ge_operator(EOS_VERSION):
if not EOS_VERSION['is_valid']:
pytest.skip('not a valid version to test')
if not EOS_VERSION["is_valid"]:
pytest.skip("not a valid version to test")
version = EosVersion(**EOS_VERSION)
version_b = EosVersion.from_str(BASE_VERSION_STR)
assert version >= version_b
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_gs_operator(EOS_VERSION):
if not EOS_VERSION['is_valid']:
pytest.skip('not a valid version to test')
if not EOS_VERSION["is_valid"]:
pytest.skip("not a valid version to test")
version = EosVersion(**EOS_VERSION)
version_b = EosVersion.from_str(BASE_VERSION_STR)
assert version > version_b
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_le_operator(EOS_VERSION):
if not EOS_VERSION['is_valid']:
pytest.skip('not a valid version to test')
if not EOS_VERSION["is_valid"]:
pytest.skip("not a valid version to test")
version = EosVersion(**EOS_VERSION)
version_b = EosVersion.from_str(BASE_VERSION_STR)
assert version_b <= version
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_ls_operator(EOS_VERSION):
if not EOS_VERSION['is_valid']:
pytest.skip('not a valid version to test')
if not EOS_VERSION["is_valid"]:
pytest.skip("not a valid version to test")
version = EosVersion(**EOS_VERSION)
version_b = EosVersion.from_str(BASE_VERSION_STR)
assert version_b < version
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_ne_operator(EOS_VERSION):
if not EOS_VERSION['is_valid']:
pytest.skip('not a valid version to test')
if not EOS_VERSION["is_valid"]:
pytest.skip("not a valid version to test")
version = EosVersion(**EOS_VERSION)
version_b = EosVersion.from_str(BASE_VERSION_STR)
assert version_b != version
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_match(EOS_VERSION):
if not EOS_VERSION['is_valid']:
pytest.skip('not a valid version to test')
if not EOS_VERSION["is_valid"]:
pytest.skip("not a valid version to test")
version = EosVersion(**EOS_VERSION)
assert version.match(f'=={EOS_VERSION["version"]}')
assert version.match(f'!={BASE_VERSION_STR}')
assert version.match(f'>={BASE_VERSION_STR}')
assert version.match(f'>{BASE_VERSION_STR}')
assert version.match('<=4.99.0F')
assert version.match('<4.99.0F')
assert version.match(f"!={BASE_VERSION_STR}")
assert version.match(f">={BASE_VERSION_STR}")
assert version.match(f">{BASE_VERSION_STR}")
assert version.match("<=4.99.0F")
assert version.match("<4.99.0F")
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_is_in_branch(EOS_VERSION):
if not EOS_VERSION['is_valid']:
pytest.skip('not a valid version to test')
if not EOS_VERSION["is_valid"]:
pytest.skip("not a valid version to test")
version = EosVersion(**EOS_VERSION)
assert version.is_in_branch(f"{EOS_VERSION['major']}.{EOS_VERSION['minor']}")
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_match_exception(EOS_VERSION):
if not EOS_VERSION['is_valid']:
pytest.skip('not a valid version to test')
if not EOS_VERSION["is_valid"]:
pytest.skip("not a valid version to test")
with pytest.raises(Exception) as e_info:
version = EosVersion(**EOS_VERSION)
assert version.match(f'+={EOS_VERSION["version"]}')
logger.info(f'receive exception: {e_info}')
logger.info(f"receive exception: {e_info}")
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_compare_exception(EOS_VERSION):
if not EOS_VERSION['is_valid']:
pytest.skip('not a valid version to test')
if not EOS_VERSION["is_valid"]:
pytest.skip("not a valid version to test")
with pytest.raises(Exception) as e_info:
version = EosVersion(**EOS_VERSION)
version._compare(BASE_VERSION_STR)
logger.info(f'receive exception: {e_info}')
logger.info(f"receive exception: {e_info}")

View file

@ -14,28 +14,39 @@ from loguru import logger
import eos_downloader
from eos_downloader.data import DATA_MAPPING
from eos_downloader.eos import EOSDownloader
from tests.lib.dataset import eos_dataset_invalid, eos_dataset_valid, eos_token, eos_token_invalid
from tests.lib.dataset import (
eos_dataset_invalid,
eos_dataset_valid,
eos_token,
eos_token_invalid,
)
from tests.lib.fixtures import create_download_instance
from tests.lib.helpers import default_filename, is_on_github_actions
logger.remove()
logger.add(sys.stderr, level="DEBUG")
@pytest.mark.usefixtures("create_download_instance")
@pytest.mark.parametrize("DOWNLOAD_INFO", eos_dataset_valid, ids=['EOS-sha512', 'EOS-md5' ,'vEOS-lab-no-hash'])
@pytest.mark.parametrize(
"DOWNLOAD_INFO",
eos_dataset_valid,
ids=["EOS-sha512", "EOS-md5", "vEOS-lab-no-hash"],
)
@pytest.mark.eos_download
class TestEosDownload_valid():
class TestEosDownload_valid:
def test_data(self, DOWNLOAD_INFO):
logger.info(f'test input: {DOWNLOAD_INFO}')
logger.info(f'test build: {self.eos_downloader.__dict__}')
logger.info(f"test input: {DOWNLOAD_INFO}")
logger.info(f"test build: {self.eos_downloader.__dict__}")
def test_eos_download_create(self, DOWNLOAD_INFO):
my_download = eos_downloader.eos.EOSDownloader(
image=DOWNLOAD_INFO['image'],
software=DOWNLOAD_INFO['software'],
version=DOWNLOAD_INFO['version'],
image=DOWNLOAD_INFO["image"],
software=DOWNLOAD_INFO["software"],
version=DOWNLOAD_INFO["version"],
token=eos_token,
hash_method='sha512sum')
hash_method="sha512sum",
)
logger.info(my_download)
assert isinstance(my_download, eos_downloader.eos.EOSDownloader)
@ -45,47 +56,56 @@ class TestEosDownload_valid():
assert str(self.eos_downloader) == expected
def test_eos_download_build_filename(self, DOWNLOAD_INFO):
assert self.eos_downloader._build_filename() == DOWNLOAD_INFO['filename']
assert self.eos_downloader._build_filename() == DOWNLOAD_INFO["filename"]
@pytest.mark.dependency(name='authentication')
@pytest.mark.skipif(eos_token == eos_token_invalid, reason="Token is not set correctly")
@pytest.mark.dependency(name="authentication")
@pytest.mark.skipif(
eos_token == eos_token_invalid, reason="Token is not set correctly"
)
@pytest.mark.skipif(is_on_github_actions(), reason="Running on Github Runner")
# @pytest.mark.xfail(reason="Deliberate - CI not set for testing AUTH")
@pytest.mark.webtest
def test_eos_download_authenticate(self):
assert self.eos_downloader.authenticate() is True
@pytest.mark.dependency(depends=["authentication"], scope='class')
@pytest.mark.dependency(depends=["authentication"], scope="class")
@pytest.mark.webtest
def test_eos_download_get_remote_file_path(self, DOWNLOAD_INFO):
assert self.eos_downloader._get_remote_filepath() == DOWNLOAD_INFO['remote_path']
assert (
self.eos_downloader._get_remote_filepath() == DOWNLOAD_INFO["remote_path"]
)
@pytest.mark.dependency(depends=["authentication"], scope='class')
@pytest.mark.dependency(depends=["authentication"], scope="class")
@pytest.mark.webtest
def test_eos_download_get_file_url(self, DOWNLOAD_INFO):
url = self.eos_downloader._get_url(remote_file_path = DOWNLOAD_INFO['remote_path'])
url = self.eos_downloader._get_url(
remote_file_path=DOWNLOAD_INFO["remote_path"]
)
logger.info(url)
assert 'https://downloads.arista.com/EOS-USA/Active%20Releases/' in url
assert "https://downloads.arista.com/EOS-USA/Active%20Releases/" in url
@pytest.mark.usefixtures("create_download_instance")
@pytest.mark.parametrize("DOWNLOAD_INFO", eos_dataset_invalid, ids=['EOS-FAKE'])
class TestEosDownload_invalid():
@pytest.mark.parametrize("DOWNLOAD_INFO", eos_dataset_invalid, ids=["EOS-FAKE"])
class TestEosDownload_invalid:
def test_data(self, DOWNLOAD_INFO):
logger.info(f'test input: {dict(DOWNLOAD_INFO)}')
logger.info(f'test build: {self.eos_downloader.__dict__}')
logger.info(f"test input: {dict(DOWNLOAD_INFO)}")
logger.info(f"test build: {self.eos_downloader.__dict__}")
def test_eos_download_login_error(self, DOWNLOAD_INFO):
my_download = eos_downloader.eos.EOSDownloader(
image=DOWNLOAD_INFO['image'],
software=DOWNLOAD_INFO['software'],
version=DOWNLOAD_INFO['version'],
image=DOWNLOAD_INFO["image"],
software=DOWNLOAD_INFO["software"],
version=DOWNLOAD_INFO["version"],
token=eos_token_invalid,
hash_method=DOWNLOAD_INFO['expected_hash'])
hash_method=DOWNLOAD_INFO["expected_hash"],
)
assert my_download.authenticate() is False
@pytest.mark.dependency(name='authentication')
@pytest.mark.skipif(eos_token == eos_token_invalid, reason="Token is not set correctly")
@pytest.mark.dependency(name="authentication")
@pytest.mark.skipif(
eos_token == eos_token_invalid, reason="Token is not set correctly"
)
@pytest.mark.skipif(is_on_github_actions(), reason="Running on Github Runner")
# @pytest.mark.xfail(reason="Deliberate - CI not set for testing AUTH")
@pytest.mark.webtest
@ -96,46 +116,48 @@ class TestEosDownload_invalid():
# @pytest.mark.skip(reason="Not yet implemented in lib")
def test_eos_file_name_with_incorrect_software(self, DOWNLOAD_INFO):
self.eos_downloader.software = 'FAKE'
logger.info(f'test build: {self.eos_downloader.__dict__}')
self.eos_downloader.software = "FAKE"
logger.info(f"test build: {self.eos_downloader.__dict__}")
with pytest.raises(ValueError) as e_info:
result = self.eos_downloader._build_filename()
logger.info(f'receive exception: {e_info}')
self.eos_downloader.software = DOWNLOAD_INFO['software']
logger.info(f"receive exception: {e_info}")
self.eos_downloader.software = DOWNLOAD_INFO["software"]
@pytest.mark.webtest
@pytest.mark.dependency(depends=["authentication"], scope='class')
def test_eos_download_get_remote_file_path_for_invlaid_software(self, DOWNLOAD_INFO):
self.eos_downloader.software = 'FAKE'
logger.info(f'Platform set to: {self.eos_downloader.software}')
logger.info(f'test build: {self.eos_downloader.__dict__}')
@pytest.mark.dependency(depends=["authentication"], scope="class")
def test_eos_download_get_remote_file_path_for_invlaid_software(
self, DOWNLOAD_INFO
):
self.eos_downloader.software = "FAKE"
logger.info(f"Platform set to: {self.eos_downloader.software}")
logger.info(f"test build: {self.eos_downloader.__dict__}")
with pytest.raises(ValueError) as e_info:
result = self.eos_downloader._build_filename()
logger.info(f'receive exception: {e_info}')
self.eos_downloader.software = DOWNLOAD_INFO['software']
logger.info(f"receive exception: {e_info}")
self.eos_downloader.software = DOWNLOAD_INFO["software"]
# IMAGE TESTING
def test_eos_file_name_with_incorrect_image(self, DOWNLOAD_INFO):
self.eos_downloader.image = 'FAKE'
logger.info(f'Image set to: {self.eos_downloader.image}')
assert DOWNLOAD_INFO['filename'] == self.eos_downloader._build_filename()
self.eos_downloader.software == DOWNLOAD_INFO['image']
self.eos_downloader.image = "FAKE"
logger.info(f"Image set to: {self.eos_downloader.image}")
assert DOWNLOAD_INFO["filename"] == self.eos_downloader._build_filename()
self.eos_downloader.software == DOWNLOAD_INFO["image"]
@pytest.mark.webtest
@pytest.mark.dependency(depends=["authentication"], scope='class')
@pytest.mark.dependency(depends=["authentication"], scope="class")
def test_eos_download_get_remote_file_path_for_invlaid_image(self, DOWNLOAD_INFO):
self.eos_downloader.image = 'FAKE'
logger.info(f'Image set to: {self.eos_downloader.image}')
self.eos_downloader.image = "FAKE"
logger.info(f"Image set to: {self.eos_downloader.image}")
assert self.eos_downloader.authenticate() is True
assert DOWNLOAD_INFO['filename'] == self.eos_downloader._build_filename()
self.eos_downloader.image = DOWNLOAD_INFO['image']
assert DOWNLOAD_INFO["filename"] == self.eos_downloader._build_filename()
self.eos_downloader.image = DOWNLOAD_INFO["image"]
# VERSION TESTING
@pytest.mark.webtest
@pytest.mark.dependency(depends=["authentication"], scope='class')
@pytest.mark.dependency(depends=["authentication"], scope="class")
def test_eos_download_get_remote_file_path_for_invlaid_version(self, DOWNLOAD_INFO):
self.eos_downloader.version = 'FAKE'
logger.info(f'Version set to: {self.eos_downloader.version}')
assert self.eos_downloader._get_remote_filepath() == ''
self.eos_downloader.version = "FAKE"
logger.info(f"Version set to: {self.eos_downloader.version}")
assert self.eos_downloader._get_remote_filepath() == ""