1
0
Fork 0

Adding upstream version 0.9.0.

Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
Daniel Baumann 2025-02-05 13:48:40 +01:00
parent 6dc7f1a5a2
commit fb90b93350
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
28 changed files with 1073 additions and 859 deletions

View file

@ -14,11 +14,11 @@ jobs:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: Checkout - name: Checkout
uses: actions/checkout@v3 uses: actions/checkout@v4
- name: Docker meta for TAG - name: Docker meta for TAG
id: meta id: meta
uses: docker/metadata-action@v4 uses: docker/metadata-action@v5
with: with:
images: | images: |
${{ secrets.DOCKER_IMAGE }} ${{ secrets.DOCKER_IMAGE }}
@ -27,20 +27,20 @@ jobs:
type=raw,value=${{ inputs.tag }} type=raw,value=${{ inputs.tag }}
- name: Login to DockerHub - name: Login to DockerHub
uses: docker/login-action@v2 uses: docker/login-action@v3
with: with:
username: ${{ secrets.DOCKERHUB_USERNAME }} username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }} password: ${{ secrets.DOCKER_PASSWORD }}
- name: Login to GitHub Container Registry - name: Login to GitHub Container Registry
uses: docker/login-action@v2 uses: docker/login-action@v3
with: with:
registry: ghcr.io registry: ghcr.io
username: ${{ github.repository_owner }} username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }} password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push - name: Build and push
uses: docker/build-push-action@v4 uses: docker/build-push-action@v5
with: with:
context: . context: .
file: Dockerfile file: Dockerfile
@ -54,11 +54,11 @@ jobs:
needs: [docker] needs: [docker]
steps: steps:
- name: Checkout - name: Checkout
uses: actions/checkout@v3 uses: actions/checkout@v4
- name: Docker meta for TAG - name: Docker meta for TAG
id: meta id: meta
uses: docker/metadata-action@v4 uses: docker/metadata-action@v5
with: with:
images: | images: |
${{ secrets.DOCKER_IMAGE }} ${{ secrets.DOCKER_IMAGE }}
@ -67,20 +67,20 @@ jobs:
type=raw,value=${{ inputs.tag }}-dind type=raw,value=${{ inputs.tag }}-dind
- name: Login to DockerHub - name: Login to DockerHub
uses: docker/login-action@v2 uses: docker/login-action@v3
with: with:
username: ${{ secrets.DOCKERHUB_USERNAME }} username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }} password: ${{ secrets.DOCKER_PASSWORD }}
- name: Login to GitHub Container Registry - name: Login to GitHub Container Registry
uses: docker/login-action@v2 uses: docker/login-action@v3
with: with:
registry: ghcr.io registry: ghcr.io
username: ${{ github.repository_owner }} username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }} password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push - name: Build and push
uses: docker/build-push-action@v4 uses: docker/build-push-action@v5
with: with:
context: . context: .
file: Dockerfile.docker file: Dockerfile.docker

View file

@ -8,15 +8,29 @@ on:
types: [assigned, opened, synchronize, reopened] types: [assigned, opened, synchronize, reopened]
jobs: jobs:
pre-commit:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.8", "3.9", "3.10"]
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- uses: pre-commit-ci/lite-action@v1.0.1
compiling: compiling:
name: Run installation process and code compilation supported Python versions name: Run installation process and code compilation supported Python versions
runs-on: ubuntu-latest runs-on: ubuntu-latest
needs: [pre-commit]
strategy: strategy:
matrix: matrix:
python-version: ["3.8", "3.9", "3.10"] python-version: ["3.8", "3.9", "3.10"]
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }} - name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4 uses: actions/setup-python@v4
@ -44,7 +58,7 @@ jobs:
python: ["3.8", "3.9", "3.10"] python: ["3.8", "3.9", "3.10"]
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v4
- name: Setup Python - name: Setup Python
uses: actions/setup-python@v4 uses: actions/setup-python@v4
@ -67,7 +81,7 @@ jobs:
python: ["3.8", "3.9", "3.10"] python: ["3.8", "3.9", "3.10"]
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v4
- name: Setup Python - name: Setup Python
uses: actions/setup-python@v4 uses: actions/setup-python@v4
@ -90,7 +104,7 @@ jobs:
python: ["3.8", "3.9", "3.10"] python: ["3.8", "3.9", "3.10"]
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v4
- name: Setup Python - name: Setup Python
uses: actions/setup-python@v4 uses: actions/setup-python@v4

View file

@ -11,7 +11,7 @@ jobs:
# runs-on: ubuntu-latest # runs-on: ubuntu-latest
# steps: # steps:
# - name: Checkout code # - name: Checkout code
# uses: actions/checkout@v3 # uses: actions/checkout@v4
# with: # with:
# fetch-depth: 0 # fetch-depth: 0
@ -36,7 +36,7 @@ jobs:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: Checkout code - name: Checkout code
uses: actions/checkout@v3 uses: actions/checkout@v4
- name: Install dependencies - name: Install dependencies
run: | run: |
@ -59,11 +59,11 @@ jobs:
needs: [pypi] needs: [pypi]
steps: steps:
- name: Checkout - name: Checkout
uses: actions/checkout@v3 uses: actions/checkout@v4
- name: Docker meta for TAG - name: Docker meta for TAG
id: meta id: meta
uses: docker/metadata-action@v4 uses: docker/metadata-action@v5
with: with:
images: | images: |
${{ secrets.DOCKER_IMAGE }} ${{ secrets.DOCKER_IMAGE }}
@ -73,20 +73,20 @@ jobs:
type=raw,value=latest type=raw,value=latest
- name: Login to DockerHub - name: Login to DockerHub
uses: docker/login-action@v2 uses: docker/login-action@v3
with: with:
username: ${{ secrets.DOCKERHUB_USERNAME }} username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }} password: ${{ secrets.DOCKER_PASSWORD }}
- name: Login to GitHub Container Registry - name: Login to GitHub Container Registry
uses: docker/login-action@v2 uses: docker/login-action@v3
with: with:
registry: ghcr.io registry: ghcr.io
username: ${{ github.repository_owner }} username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }} password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push - name: Build and push
uses: docker/build-push-action@v4 uses: docker/build-push-action@v5
with: with:
context: . context: .
file: Dockerfile file: Dockerfile
@ -100,11 +100,11 @@ jobs:
needs: [docker] needs: [docker]
steps: steps:
- name: Checkout - name: Checkout
uses: actions/checkout@v3 uses: actions/checkout@v4
- name: Docker meta for TAG - name: Docker meta for TAG
id: meta id: meta
uses: docker/metadata-action@v4 uses: docker/metadata-action@v5
with: with:
images: | images: |
${{ secrets.DOCKER_IMAGE }} ${{ secrets.DOCKER_IMAGE }}
@ -114,20 +114,20 @@ jobs:
type=raw,value=latest-dind type=raw,value=latest-dind
- name: Login to DockerHub - name: Login to DockerHub
uses: docker/login-action@v2 uses: docker/login-action@v3
with: with:
username: ${{ secrets.DOCKERHUB_USERNAME }} username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }} password: ${{ secrets.DOCKER_PASSWORD }}
- name: Login to GitHub Container Registry - name: Login to GitHub Container Registry
uses: docker/login-action@v2 uses: docker/login-action@v3
with: with:
registry: ghcr.io registry: ghcr.io
username: ${{ github.repository_owner }} username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }} password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push - name: Build and push
uses: docker/build-push-action@v4 uses: docker/build-push-action@v5
with: with:
context: . context: .
file: Dockerfile.docker file: Dockerfile.docker

64
.pre-commit-config.yaml Normal file
View file

@ -0,0 +1,64 @@
---
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks
files: ^(eos_downloader)/
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.4.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-added-large-files
- id: check-merge-conflict
# - repo: https://github.com/pycqa/isort
# rev: 5.12.0
# hooks:
# - id: isort
# name: Check for changes when running isort on all python files
- repo: https://github.com/psf/black
rev: 23.7.0
hooks:
- id: black
name: Check for changes when running Black on all python files
- repo: https://github.com/pycqa/flake8
rev: 6.0.0
hooks:
- id: flake8
name: Check for PEP8 error on Python files
args:
- --config=/dev/null
- --max-line-length=165
- repo: local # as per https://pylint.pycqa.org/en/latest/user_guide/installation/pre-commit-integration.html
hooks:
- id: pylint
entry: pylint
language: python
name: Check for Linting error on Python files
description: This hook runs pylint.
types: [python]
args:
- -rn # Only display messages
- -sn # Don't display the score
- --rcfile=pylintrc # Link to config file
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.4.1
hooks:
- id: mypy
args:
- --config-file=pyproject.toml
additional_dependencies:
- "click==8.1.3"
- "click-help-colors==0.9.1"
- "pydantic~=2.0"
- "PyYAML==6.0"
- "requests>=2.27"
- "rich~=13.4"
- types-paramiko
- types-requests
files: eos_downloader

View file

@ -1,4 +1,13 @@
[![code-testing](https://github.com/titom73/eos-downloader/actions/workflows/pr-management.yml/badge.svg?event=push)](https://github.com/titom73/eos-downloader/actions/workflows/pr-management.yml) ![PyPI - Python Version](https://img.shields.io/pypi/pyversions/eos-downloader) ![GitHub release (latest SemVer)](https://img.shields.io/github/v/release/titom73/arista-downloader) ![PyPI - Downloads/month](https://img.shields.io/pypi/dm/eos-downloader) [![tests](https://github.com/titom73/eos-downloader/actions/workflows/pr-management.yml/badge.svg?event=push)](https://github.com/titom73/eos-downloader/actions/workflows/pr-management.yml)
![PyPI - Python Version](https://img.shields.io/pypi/pyversions/eos-downloader)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
![Checked with mypy](http://www.mypy-lang.org/static/mypy_badge.svg)
![GitHub release](https://img.shields.io/github/v/release/titom73/arista-downloader)
![PyPI - Downloads/month](https://img.shields.io/pypi/dm/eos-downloader)
<!--
[![pre-commit](https://img.shields.io/badge/pre--commit-enabled-brightgreen?logo=pre-commit&logoColor=white)](https://github.com/pre-commit/pre-commit)
!-->
# Arista Software Downloader # Arista Software Downloader
@ -19,6 +28,7 @@ Usage: ardl [OPTIONS] COMMAND [ARGS]...
Arista Network Download CLI Arista Network Download CLI
Options: Options:
--version Show the version and exit.
--token TEXT Arista Token from your customer account [env var: --token TEXT Arista Token from your customer account [env var:
ARISTA_TOKEN] ARISTA_TOKEN]
--help Show this message and exit. --help Show this message and exit.
@ -26,7 +36,6 @@ Options:
Commands: Commands:
debug Debug commands to work with ardl debug Debug commands to work with ardl
get Download Arista from Arista website get Download Arista from Arista website
version Display version of ardl
``` ```
> **Warning** > **Warning**
@ -35,10 +44,22 @@ Commands:
### Download EOS Package ### Download EOS Package
> **Note**
> Supported packages are: EOS, cEOS, vEOS-lab, cEOS64 > Supported packages are: EOS, cEOS, vEOS-lab, cEOS64
You can download EOS packages with following commands: CLI gives an option to get latest version available. By default it takes latest `F` release
```bash
ardl get eos --image-type cEOS --latest
```
If you want to get latest M release, you can use `--release-type`:
```bash
ardl get eos --image-type cEOS --release-type M --latest
```
You can download a specific EOS packages with following commands:
```bash ```bash
# Example for a cEOS package # Example for a cEOS package
@ -164,7 +185,7 @@ tqdm
On EVE-NG, you may have to install/upgrade __pyOpenSSL__ in version `23.0.0`: On EVE-NG, you may have to install/upgrade __pyOpenSSL__ in version `23.0.0`:
``` ```bash
# Error when running ardl: AttributeError: module 'lib' has no attribute 'X509_V_FLAG_CB_ISSUER_CHECK' # Error when running ardl: AttributeError: module 'lib' has no attribute 'X509_V_FLAG_CB_ISSUER_CHECK'
$ pip install pyopenssl --upgrade $ pip install pyopenssl --upgrade

View file

@ -1,111 +0,0 @@
## scripts
These scripts are deprecated and will be removed in a futur version. Please prefer the use of the CLI implemented in the package.
### eos-download
```bash
usage: eos-download [-h]
--version VERSION
[--token TOKEN]
[--image IMAGE]
[--destination DESTINATION]
[--eve]
[--noztp]
[--import_docker]
[--docker_name DOCKER_NAME]
[--verbose VERBOSE]
[--log]
EOS downloader script.
optional arguments:
-h, --help show this help message and exit
--token TOKEN arista.com user API key - can use ENV:ARISTA_TOKEN
--image IMAGE Type of EOS image required
--version VERSION EOS version to download from website
--destination DESTINATION
Path where to save EOS package downloaded
--eve Option to install EOS package to EVE-NG
--noztp Option to deactivate ZTP when used with EVE-NG
--import_docker Option to import cEOS image to docker
--docker_name DOCKER_NAME
Docker image name to use
--verbose VERBOSE Script verbosity
--log Option to activate logging to eos-downloader.log file
```
- Token are read from `ENV:ARISTA_TOKEN` unless you specify a specific token with CLI.
- Supported platforms:
- `INT`: International version
- `64`: 64 bits version
- `2GB` for 2GB flash platform
- `2GB-INT`: for 2GB running International
- `vEOS`: Virtual EOS image
- `vEOS-lab`: Virtual Lab EOS
- `vEOS64-lab`: Virtual Lab EOS running 64B
- `cEOS`: Docker version of EOS
- `cEOS64`: Docker version of EOS running in 64 bits
#### Examples
- Download vEOS-lab image and install in EVE-NG
```bash
$ eos-download --image vEOS-lab --version 4.25.7M --eve --noztp
```
- Download Docker image
```bash
$ eos-download --image cEOS --version 4.27.1F
🪐 eos-downloader is starting...
- Image Type: cEOS
- Version: 4.27.2F
✅ Authenticated on arista.com
🔎 Searching file cEOS-lab-4.27.2F.tar.xz
-> Found file at /support/download/EOS-USA/Active Releases/4.27/EOS-4.27.2F/cEOS-lab/cEOS-lab-4.27.2F.tar.xz
💾 Downloading cEOS-lab-4.27.2F.tar.xz ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 17.1 MB/s • 451.6/451.6 MB • 0:00:19 •
🚀 Running checksum validation
🔎 Searching file cEOS-lab-4.27.2F.tar.xz.sha512sum
-> Found file at /support/download/EOS-USA/Active
Releases/4.27/EOS-4.27.2F/cEOS-lab/cEOS-lab-4.27.2F.tar.xz.sha512sum
💾 Downloading cEOS-lab-4.27.2F.tar.xz.sha512sum ━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • ? • 154/154 bytes • 0:00:00 •
✅ Downloaded file is correct.
```
__Note:__ `ARISTA_TOKEN` should be set in your .profile and not set for each command. If not set, you can use `--token` knob.
```bash
# Export Token
export ARISTA_TOKEN="xxxxxxx"
```
### Cloudvision Image uploader
Create an image bundle on Cloudvision.
```bash
cvp-upload -h
usage: cvp-upload [-h]
[--token TOKEN]
[--image IMAGE]
--cloudvision CLOUDVISION
[--create_bundle]
[--timeout TIMEOUT]
[--verbose VERBOSE]
Cloudvision Image uploader script.
optional arguments:
-h, --help show this help message and exit
--token TOKEN CVP Authentication token - can use ENV:ARISTA_AVD_CV_TOKEN
--image IMAGE Type of EOS image required
--cloudvision CLOUDVISION
Cloudvision instance where to upload image
--create_bundle Option to create image bundle with new uploaded image
--timeout TIMEOUT Timeout connection. Default is set to 1200sec
--verbose VERBOSE Script verbosity
```

View file

@ -1,56 +0,0 @@
#!/usr/bin/python
import sys
import os
import argparse
from eos_downloader.cvp import CvFeatureManager, CvpAuthenticationItem
from loguru import logger
ARISTA_AVD_CV_TOKEN = os.getenv('ARISTA_AVD_CV_TOKEN', '')
def read_cli():
parser = argparse.ArgumentParser(description='Cloudvision Image uploader script.')
parser.add_argument('--token', required=False,
default=ARISTA_AVD_CV_TOKEN,
help='CVP Authentication token - can use ENV:ARISTA_AVD_CV_TOKEN')
parser.add_argument('--image', required=False,
default='EOS', help='Type of EOS image required')
parser.add_argument('--cloudvision', required=True,
help='Cloudvision instance where to upload image')
parser.add_argument('--create_bundle', required=False, action='store_true',
help="Option to create image bundle with new uploaded image")
parser.add_argument('--timeout', required=False,
default=1200,
help='Timeout connection. Default is set to 1200sec')
parser.add_argument('--verbose', required=False,
default='info', help='Script verbosity')
return parser.parse_args()
if __name__ == '__main__':
cli_options = read_cli()
logger.remove()
logger.add(sys.stderr, level=str(cli_options.verbose).upper())
cv_authentication = CvpAuthenticationItem(
server=cli_options.cloudvision,
token=cli_options.token,
port=443,
timeout=cli_options.timeout,
validate_cert=False
)
my_cvp_uploader = CvFeatureManager(authentication=cv_authentication)
result_upload = my_cvp_uploader.upload_image(cli_options.image)
if result_upload and cli_options.create_bundle:
bundle_name = os.path.basename(cli_options.image)
logger.info('Creating image bundle {}'.format(bundle_name))
my_cvp_uploader.create_bundle(
name=bundle_name,
images_name=[bundle_name]
)
sys.exit(0)

View file

@ -1,86 +0,0 @@
#!/usr/bin/python
import sys
import os
import argparse
import eos_downloader.eos
from loguru import logger
from rich.console import Console
ARISTA_TOKEN = os.getenv('ARISTA_TOKEN', '')
def read_cli():
parser = argparse.ArgumentParser(description='EOS downloader script.')
parser.add_argument('--token', required=False,
default=ARISTA_TOKEN,
help='arista.com user API key - can use ENV:ARISTA_TOKEN')
parser.add_argument('--image', required=False,
default='EOS', help='Type of EOS image required')
parser.add_argument('--version', required=True,
default='', help='EOS version to download from website')
parser.add_argument('--destination', required=False,
default=str(os.getcwd()),
help='Path where to save EOS package downloaded')
parser.add_argument('--eve', required=False, action='store_true',
help="Option to install EOS package to EVE-NG")
parser.add_argument('--noztp', required=False, action='store_true',
help="Option to deactivate ZTP when used with EVE-NG")
parser.add_argument('--import_docker', required=False, action='store_true',
help="Option to import cEOS image to docker")
parser.add_argument('--docker_name', required=False,
default='arista/ceos',
help='Docker image name to use')
parser.add_argument('--verbose', required=False,
default='info', help='Script verbosity')
parser.add_argument('--log', required=False, action='store_true',
help="Option to activate logging to eos-downloader.log file")
return parser.parse_args()
if __name__ == '__main__':
cli_options = read_cli()
console = Console()
console.print('\n[red]WARNING: This script is now deprecated. Please use ardl cli instead[/red]\n\n')
if cli_options.token is None or cli_options.token == '':
console.print('\n❗ Token is unset ! Please configure ARISTA_TOKEN or use --token option', style="bold red")
sys.exit(1)
logger.remove()
if cli_options.log:
logger.add("eos-downloader.log", rotation="10 MB", level=str(cli_options.verbose).upper())
console.print("🪐 [bold blue]eos-downloader[/bold blue] is starting...", )
console.print(f' - Image Type: {cli_options.image}')
console.print(f' - Version: {cli_options.version}')
my_download = eos_downloader.eos.EOSDownloader(
image=cli_options.image,
software='EOS',
version=cli_options.version,
token=cli_options.token,
hash_method='sha512sum')
my_download.authenticate()
if cli_options.eve:
my_download.provision_eve(noztp=cli_options.noztp, checksum=True)
else:
my_download.download_local(file_path=cli_options.destination, checksum=True)
if cli_options.import_docker:
my_download.docker_import(
image_name=cli_options.docker_name
)
console.print('✅ processing done !')
sys.exit(0)

View file

@ -5,23 +5,31 @@
EOS Downloader module. EOS Downloader module.
""" """
from __future__ import (absolute_import, division, from __future__ import (
print_function, unicode_literals, annotations) absolute_import,
import dataclasses annotations,
from typing import Any division,
import json print_function,
import importlib.metadata unicode_literals,
)
__author__ = '@titom73' import dataclasses
__email__ = 'tom@inetsix.net' import importlib.metadata
__date__ = '2022-03-16' import json
from typing import Any
__author__ = "@titom73"
__email__ = "tom@inetsix.net"
__date__ = "2022-03-16"
__version__ = importlib.metadata.version("eos-downloader") __version__ = importlib.metadata.version("eos-downloader")
# __all__ = ["CvpAuthenticationItem", "CvFeatureManager", "EOSDownloader", "ObjectDownloader", "reverse"] # __all__ = ["CvpAuthenticationItem", "CvFeatureManager", "EOSDownloader", "ObjectDownloader", "reverse"]
ARISTA_GET_SESSION = "https://www.arista.com/custom_data/api/cvp/getSessionCode/" ARISTA_GET_SESSION = "https://www.arista.com/custom_data/api/cvp/getSessionCode/"
ARISTA_SOFTWARE_FOLDER_TREE = "https://www.arista.com/custom_data/api/cvp/getFolderTree/" ARISTA_SOFTWARE_FOLDER_TREE = (
"https://www.arista.com/custom_data/api/cvp/getFolderTree/"
)
ARISTA_DOWNLOAD_URL = "https://www.arista.com/custom_data/api/cvp/getDownloadLink/" ARISTA_DOWNLOAD_URL = "https://www.arista.com/custom_data/api/cvp/getDownloadLink/"
@ -36,11 +44,12 @@ check the Access Token. Then re-run the script with the correct token.
MSG_INVALID_DATA = """Invalid data returned by server MSG_INVALID_DATA = """Invalid data returned by server
""" """
EVE_QEMU_FOLDER_PATH = '/opt/unetlab/addons/qemu/' EVE_QEMU_FOLDER_PATH = "/opt/unetlab/addons/qemu/"
class EnhancedJSONEncoder(json.JSONEncoder): class EnhancedJSONEncoder(json.JSONEncoder):
"""Custom JSon encoder.""" """Custom JSon encoder."""
def default(self, o: Any) -> Any: def default(self, o: Any) -> Any:
if dataclasses.is_dataclass(o): if dataclasses.is_dataclass(o):
return dataclasses.asdict(o) return dataclasses.asdict(o)

View file

@ -11,49 +11,51 @@ ARDL CLI Baseline.
""" """
import click import click
from rich.console import Console
import eos_downloader from eos_downloader import __version__
from eos_downloader.cli.get import commands as get_commands
from eos_downloader.cli.debug import commands as debug_commands from eos_downloader.cli.debug import commands as debug_commands
from eos_downloader.cli.get import commands as get_commands
from eos_downloader.cli.info import commands as info_commands from eos_downloader.cli.info import commands as info_commands
from eos_downloader.cli.utils import AliasedGroup
@click.group()
@click.group(cls=AliasedGroup)
@click.version_option(__version__)
@click.pass_context @click.pass_context
@click.option('--token', show_envvar=True, default=None, help='Arista Token from your customer account') @click.option(
"--token",
show_envvar=True,
default=None,
help="Arista Token from your customer account",
)
def ardl(ctx: click.Context, token: str) -> None: def ardl(ctx: click.Context, token: str) -> None:
"""Arista Network Download CLI""" """Arista Network Download CLI"""
ctx.ensure_object(dict) ctx.ensure_object(dict)
ctx.obj['token'] = token ctx.obj["token"] = token
@click.command() @ardl.group(cls=AliasedGroup, no_args_is_help=True)
def version() -> None:
"""Display version of ardl"""
console = Console()
console.print(f'ardl is running version {eos_downloader.__version__}')
@ardl.group(no_args_is_help=True)
@click.pass_context @click.pass_context
def get(ctx: click.Context) -> None: def get(ctx: click.Context, cls: click.Group = AliasedGroup) -> None:
# pylint: disable=redefined-builtin # pylint: disable=redefined-builtin
"""Download Arista from Arista website""" """Download Arista from Arista website"""
@ardl.group(no_args_is_help=True) @ardl.group(cls=AliasedGroup, no_args_is_help=True)
@click.pass_context @click.pass_context
def info(ctx: click.Context) -> None: def info(ctx: click.Context, cls: click.Group = AliasedGroup) -> None:
# pylint: disable=redefined-builtin # pylint: disable=redefined-builtin
"""List information from Arista website""" """List information from Arista website"""
@ardl.group(no_args_is_help=True) @ardl.group(cls=AliasedGroup, no_args_is_help=True)
@click.pass_context @click.pass_context
def debug(ctx: click.Context) -> None: def debug(ctx: click.Context, cls: click.Group = AliasedGroup) -> None:
# pylint: disable=redefined-builtin # pylint: disable=redefined-builtin
"""Debug commands to work with ardl""" """Debug commands to work with ardl"""
# ANTA CLI Execution # ANTA CLI Execution
@ -64,13 +66,9 @@ def cli() -> None:
get.add_command(get_commands.cvp) get.add_command(get_commands.cvp)
info.add_command(info_commands.eos_versions) info.add_command(info_commands.eos_versions)
debug.add_command(debug_commands.xml) debug.add_command(debug_commands.xml)
ardl.add_command(version)
# Load CLI # Load CLI
ardl( ardl(obj={}, auto_envvar_prefix="arista")
obj={},
auto_envvar_prefix='arista'
)
if __name__ == '__main__': if __name__ == "__main__":
cli() cli()

View file

@ -22,32 +22,51 @@ import eos_downloader.eos
@click.command() @click.command()
@click.pass_context @click.pass_context
@click.option('--output', default=str('arista.xml'), help='Path to save XML file', type=click.Path(), show_default=True) @click.option(
@click.option('--log-level', '--log', help='Logging level of the command', default=None, type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False)) "--output",
default=str("arista.xml"),
help="Path to save XML file",
type=click.Path(),
show_default=True,
)
@click.option(
"--log-level",
"--log",
help="Logging level of the command",
default=None,
type=click.Choice(
["debug", "info", "warning", "error", "critical"], case_sensitive=False
),
)
def xml(ctx: click.Context, output: str, log_level: str) -> None: def xml(ctx: click.Context, output: str, log_level: str) -> None:
# sourcery skip: remove-unnecessary-cast # sourcery skip: remove-unnecessary-cast
"""Extract XML directory structure""" """Extract XML directory structure"""
console = Console() console = Console()
# Get from Context # Get from Context
token = ctx.obj['token'] token = ctx.obj["token"]
logger.remove() logger.remove()
if log_level is not None: if log_level is not None:
logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper()) logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper())
my_download = eos_downloader.eos.EOSDownloader( my_download = eos_downloader.eos.EOSDownloader(
image='unset', image="unset",
software='EOS', software="EOS",
version='unset', version="unset",
token=token, token=token,
hash_method='sha512sum') hash_method="sha512sum",
)
my_download.authenticate() my_download.authenticate()
xml_object: ET.ElementTree = my_download._get_folder_tree() # pylint: disable=protected-access xml_object: ET.ElementTree = (
my_download.get_folder_tree()
) # pylint: disable=protected-access
xml_content = xml_object.getroot() xml_content = xml_object.getroot()
xmlstr = minidom.parseString(ET.tostring(xml_content)).toprettyxml(indent=" ", newl='') xmlstr = minidom.parseString(ET.tostring(xml_content)).toprettyxml(
with open(output, "w", encoding='utf-8') as f: indent=" ", newl=""
)
with open(output, "w", encoding="utf-8") as f:
f.write(str(xmlstr)) f.write(str(xmlstr))
console.print(f'XML file saved in: { output }') console.print(f"XML file saved in: { output }")

View file

@ -21,68 +21,156 @@ from rich.console import Console
import eos_downloader.eos import eos_downloader.eos
from eos_downloader.models.version import BASE_VERSION_STR, RTYPE_FEATURE, RTYPES from eos_downloader.models.version import BASE_VERSION_STR, RTYPE_FEATURE, RTYPES
EOS_IMAGE_TYPE = ['64', 'INT', '2GB-INT', 'cEOS', 'cEOS64', 'vEOS', 'vEOS-lab', 'EOS-2GB', 'default'] EOS_IMAGE_TYPE = [
CVP_IMAGE_TYPE = ['ova', 'rpm', 'kvm', 'upgrade'] "64",
"INT",
"2GB-INT",
"cEOS",
"cEOS64",
"vEOS",
"vEOS-lab",
"EOS-2GB",
"default",
]
CVP_IMAGE_TYPE = ["ova", "rpm", "kvm", "upgrade"]
@click.command(no_args_is_help=True) @click.command(no_args_is_help=True)
@click.pass_context @click.pass_context
@click.option('--image-type', default='default', help='EOS Image type', type=click.Choice(EOS_IMAGE_TYPE), required=True) @click.option(
@click.option('--version', default=None, help='EOS version', type=str, required=False) "--image-type",
@click.option('--latest', '-l', is_flag=True, type=click.BOOL, default=False, help='Get latest version in given branch. If --branch is not use, get the latest branch with specific release type') default="default",
@click.option('--release-type', '-rtype', type=click.Choice(RTYPES, case_sensitive=False), default=RTYPE_FEATURE, help='EOS release type to search') help="EOS Image type",
@click.option('--branch', '-b', type=click.STRING, default=None, help='EOS Branch to list releases') type=click.Choice(EOS_IMAGE_TYPE),
@click.option('--docker-name', default='arista/ceos', help='Docker image name (default: arista/ceos)', type=str, show_default=True) required=True,
@click.option('--output', default=str(os.path.relpath(os.getcwd(), start=os.curdir)), help='Path to save image', type=click.Path(),show_default=True) )
@click.option("--version", default=None, help="EOS version", type=str, required=False)
@click.option(
"--latest",
"-l",
is_flag=True,
type=click.BOOL,
default=False,
help="Get latest version in given branch. If --branch is not use, get the latest branch with specific release type",
)
@click.option(
"--release-type",
"-rtype",
type=click.Choice(RTYPES, case_sensitive=False),
default=RTYPE_FEATURE,
help="EOS release type to search",
)
@click.option(
"--branch",
"-b",
type=click.STRING,
default=None,
help="EOS Branch to list releases",
)
@click.option(
"--docker-name",
default="arista/ceos",
help="Docker image name (default: arista/ceos)",
type=str,
show_default=True,
)
@click.option(
"--output",
default=str(os.path.relpath(os.getcwd(), start=os.curdir)),
help="Path to save image",
type=click.Path(),
show_default=True,
)
# Debugging # Debugging
@click.option('--log-level', '--log', help='Logging level of the command', default=None, type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False)) @click.option(
"--log-level",
"--log",
help="Logging level of the command",
default=None,
type=click.Choice(
["debug", "info", "warning", "error", "critical"], case_sensitive=False
),
)
# Boolean triggers # Boolean triggers
@click.option('--eve-ng', is_flag=True, help='Run EVE-NG vEOS provisioning (only if CLI runs on an EVE-NG server)', default=False) @click.option(
@click.option('--disable-ztp', is_flag=True, help='Disable ZTP process in vEOS image (only available with --eve-ng)', default=False) "--eve-ng",
@click.option('--import-docker', is_flag=True, help='Import docker image (only available with --image_type cEOSlab)', default=False) is_flag=True,
help="Run EVE-NG vEOS provisioning (only if CLI runs on an EVE-NG server)",
default=False,
)
@click.option(
"--disable-ztp",
is_flag=True,
help="Disable ZTP process in vEOS image (only available with --eve-ng)",
default=False,
)
@click.option(
"--import-docker",
is_flag=True,
help="Import docker image (only available with --image_type cEOSlab)",
default=False,
)
def eos( def eos(
ctx: click.Context, image_type: str, output: str, log_level: str, eve_ng: bool, disable_ztp: bool, ctx: click.Context,
import_docker: bool, docker_name: str, version: Union[str, None] = None, release_type: str = RTYPE_FEATURE, image_type: str,
latest: bool = False, branch: Union[str,None] = None output: str,
) -> int: log_level: str,
eve_ng: bool,
disable_ztp: bool,
import_docker: bool,
docker_name: str,
version: Union[str, None] = None,
release_type: str = RTYPE_FEATURE,
latest: bool = False,
branch: Union[str, None] = None,
) -> int:
"""Download EOS image from Arista website""" """Download EOS image from Arista website"""
console = Console() console = Console()
# Get from Context # Get from Context
token = ctx.obj['token'] token = ctx.obj["token"]
if token is None or token == '': if token is None or token == "":
console.print('❗ Token is unset ! Please configure ARISTA_TOKEN or use --token option', style="bold red") console.print(
"❗ Token is unset ! Please configure ARISTA_TOKEN or use --token option",
style="bold red",
)
sys.exit(1) sys.exit(1)
logger.remove() logger.remove()
if log_level is not None: if log_level is not None:
logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper()) logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper())
console.print("🪐 [bold blue]eos-downloader[/bold blue] is starting...", ) console.print(
console.print(f' - Image Type: {image_type}') "🪐 [bold blue]eos-downloader[/bold blue] is starting...",
console.print(f' - Version: {version}') )
console.print(f" - Image Type: {image_type}")
console.print(f" - Version: {version}")
if version is not None: if version is not None:
my_download = eos_downloader.eos.EOSDownloader( my_download = eos_downloader.eos.EOSDownloader(
image=image_type, image=image_type,
software='EOS', software="EOS",
version=version, version=version,
token=token, token=token,
hash_method='sha512sum') hash_method="sha512sum",
)
my_download.authenticate() my_download.authenticate()
elif latest: elif latest:
my_download = eos_downloader.eos.EOSDownloader( my_download = eos_downloader.eos.EOSDownloader(
image=image_type, image=image_type,
software='EOS', software="EOS",
version='unset', version="unset",
token=token, token=token,
hash_method='sha512sum') hash_method="sha512sum",
)
my_download.authenticate() my_download.authenticate()
if branch is None: if branch is None:
branch = str(my_download.latest_branch(rtype=release_type).branch) branch = str(my_download.latest_branch(rtype=release_type).branch)
latest_version = my_download.latest_eos(branch, rtype=release_type) latest_version = my_download.latest_eos(branch, rtype=release_type)
if str(latest_version) == BASE_VERSION_STR: if str(latest_version) == BASE_VERSION_STR:
console.print(f'[red]Error[/red], cannot find any version in {branch} for {release_type} release type') console.print(
f"[red]Error[/red], cannot find any version in {branch} for {release_type} release type"
)
sys.exit(1) sys.exit(1)
my_download.version = str(latest_version) my_download.version = str(latest_version)
@ -92,46 +180,71 @@ def eos(
my_download.download_local(file_path=output, checksum=True) my_download.download_local(file_path=output, checksum=True)
if import_docker: if import_docker:
my_download.docker_import( my_download.docker_import(image_name=docker_name)
image_name=docker_name console.print("✅ processing done !")
)
console.print('✅ processing done !')
sys.exit(0) sys.exit(0)
@click.command(no_args_is_help=True) @click.command(no_args_is_help=True)
@click.pass_context @click.pass_context
@click.option('--format', default='upgrade', help='CVP Image type', type=click.Choice(CVP_IMAGE_TYPE), required=True) @click.option(
@click.option('--version', default=None, help='CVP version', type=str, required=True) "--format",
@click.option('--output', default=str(os.path.relpath(os.getcwd(), start=os.curdir)), help='Path to save image', type=click.Path(),show_default=True) default="upgrade",
@click.option('--log-level', '--log', help='Logging level of the command', default=None, type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False)) help="CVP Image type",
def cvp(ctx: click.Context, version: str, format: str, output: str, log_level: str) -> int: type=click.Choice(CVP_IMAGE_TYPE),
required=True,
)
@click.option("--version", default=None, help="CVP version", type=str, required=True)
@click.option(
"--output",
default=str(os.path.relpath(os.getcwd(), start=os.curdir)),
help="Path to save image",
type=click.Path(),
show_default=True,
)
@click.option(
"--log-level",
"--log",
help="Logging level of the command",
default=None,
type=click.Choice(
["debug", "info", "warning", "error", "critical"], case_sensitive=False
),
)
def cvp(
ctx: click.Context, version: str, format: str, output: str, log_level: str
) -> int:
"""Download CVP image from Arista website""" """Download CVP image from Arista website"""
console = Console() console = Console()
# Get from Context # Get from Context
token = ctx.obj['token'] token = ctx.obj["token"]
if token is None or token == '': if token is None or token == "":
console.print('❗ Token is unset ! Please configure ARISTA_TOKEN or use --token option', style="bold red") console.print(
"❗ Token is unset ! Please configure ARISTA_TOKEN or use --token option",
style="bold red",
)
sys.exit(1) sys.exit(1)
logger.remove() logger.remove()
if log_level is not None: if log_level is not None:
logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper()) logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper())
console.print("🪐 [bold blue]eos-downloader[/bold blue] is starting...", ) console.print(
console.print(f' - Image Type: {format}') "🪐 [bold blue]eos-downloader[/bold blue] is starting...",
console.print(f' - Version: {version}') )
console.print(f" - Image Type: {format}")
console.print(f" - Version: {version}")
my_download = eos_downloader.eos.EOSDownloader( my_download = eos_downloader.eos.EOSDownloader(
image=format, image=format,
software='CloudVision', software="CloudVision",
version=version, version=version,
token=token, token=token,
hash_method='md5sum') hash_method="md5sum",
)
my_download.authenticate() my_download.authenticate()
my_download.download_local(file_path=output, checksum=False) my_download.download_local(file_path=output, checksum=False)
console.print('✅ processing done !') console.print("✅ processing done !")
sys.exit(0) sys.exit(0)

View file

@ -24,12 +24,53 @@ from eos_downloader.models.version import BASE_VERSION_STR, RTYPE_FEATURE, RTYPE
@click.command(no_args_is_help=True) @click.command(no_args_is_help=True)
@click.pass_context @click.pass_context
@click.option('--latest', '-l', is_flag=True, type=click.BOOL, default=False, help='Get latest version in given branch. If --branch is not use, get the latest branch with specific release type') @click.option(
@click.option('--release-type', '-rtype', type=click.Choice(RTYPES, case_sensitive=False), default=RTYPE_FEATURE, help='EOS release type to search') "--latest",
@click.option('--branch', '-b', type=click.STRING, default=None, help='EOS Branch to list releases') "-l",
@click.option('--verbose', '-v', is_flag=True, type=click.BOOL, default=False, help='Human readable output. Default is none to use output in script)') is_flag=True,
@click.option('--log-level', '--log', help='Logging level of the command', default='warning', type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False)) type=click.BOOL,
def eos_versions(ctx: click.Context, log_level: str, branch: Union[str,None] = None, release_type: str = RTYPE_FEATURE, latest: bool = False, verbose: bool = False) -> None: default=False,
help="Get latest version in given branch. If --branch is not use, get the latest branch with specific release type",
)
@click.option(
"--release-type",
"-rtype",
type=click.Choice(RTYPES, case_sensitive=False),
default=RTYPE_FEATURE,
help="EOS release type to search",
)
@click.option(
"--branch",
"-b",
type=click.STRING,
default=None,
help="EOS Branch to list releases",
)
@click.option(
"--verbose",
"-v",
is_flag=True,
type=click.BOOL,
default=False,
help="Human readable output. Default is none to use output in script)",
)
@click.option(
"--log-level",
"--log",
help="Logging level of the command",
default="warning",
type=click.Choice(
["debug", "info", "warning", "error", "critical"], case_sensitive=False
),
)
def eos_versions(
ctx: click.Context,
log_level: str,
branch: Union[str, None] = None,
release_type: str = RTYPE_FEATURE,
latest: bool = False,
verbose: bool = False,
) -> None:
# pylint: disable = too-many-branches # pylint: disable = too-many-branches
""" """
List Available EOS version on Arista.com website. List Available EOS version on Arista.com website.
@ -42,22 +83,23 @@ def eos_versions(ctx: click.Context, log_level: str, branch: Union[str,None] = N
""" """
console = Console() console = Console()
# Get from Context # Get from Context
token = ctx.obj['token'] token = ctx.obj["token"]
logger.remove() logger.remove()
if log_level is not None: if log_level is not None:
logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper()) logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper())
my_download = eos_downloader.eos.EOSDownloader( my_download = eos_downloader.eos.EOSDownloader(
image='unset', image="unset",
software='EOS', software="EOS",
version='unset', version="unset",
token=token, token=token,
hash_method='sha512sum') hash_method="sha512sum",
)
auth = my_download.authenticate() auth = my_download.authenticate()
if verbose and auth: if verbose and auth:
console.print('✅ Authenticated on arista.com') console.print("✅ Authenticated on arista.com")
if release_type is not None: if release_type is not None:
release_type = release_type.upper() release_type = release_type.upper()
@ -67,21 +109,27 @@ def eos_versions(ctx: click.Context, log_level: str, branch: Union[str,None] = N
branch = str(my_download.latest_branch(rtype=release_type).branch) branch = str(my_download.latest_branch(rtype=release_type).branch)
latest_version = my_download.latest_eos(branch, rtype=release_type) latest_version = my_download.latest_eos(branch, rtype=release_type)
if str(latest_version) == BASE_VERSION_STR: if str(latest_version) == BASE_VERSION_STR:
console.print(f'[red]Error[/red], cannot find any version in {branch} for {release_type} release type') console.print(
f"[red]Error[/red], cannot find any version in {branch} for {release_type} release type"
)
sys.exit(1) sys.exit(1)
if verbose: if verbose:
console.print(f'Branch {branch} has been selected with release type {release_type}') console.print(
f"Branch {branch} has been selected with release type {release_type}"
)
if branch is not None: if branch is not None:
console.print(f'Latest release for {branch}: {latest_version}') console.print(f"Latest release for {branch}: {latest_version}")
else: else:
console.print(f'Latest EOS release: {latest_version}') console.print(f"Latest EOS release: {latest_version}")
else: else:
console.print(f'{ latest_version }') console.print(f"{ latest_version }")
else: else:
versions = my_download.get_eos_versions(branch=branch, rtype=release_type) versions = my_download.get_eos_versions(branch=branch, rtype=release_type)
if verbose: if verbose:
console.print(f'List of available versions for {branch if branch is not None else "all branches"}') console.print(
f'List of available versions for {branch if branch is not None else "all branches"}'
)
for version in versions: for version in versions:
console.print(f'{str(version)}') console.print(f"{str(version)}")
else: else:
pprint([str(version) for version in versions]) pprint([str(version) for version in versions])

View file

@ -0,0 +1,38 @@
#!/usr/bin/python
# coding: utf-8 -*-
# pylint: disable=inconsistent-return-statements
"""
Extension for the python ``click`` module
to provide a group or command with aliases.
"""
from typing import Any
import click
class AliasedGroup(click.Group):
"""
Implements a subclass of Group that accepts a prefix for a command.
If there were a command called push, it would accept pus as an alias (so long as it was unique)
"""
def get_command(self, ctx: click.Context, cmd_name: str) -> Any:
"""Documentation to build"""
rv = click.Group.get_command(self, ctx, cmd_name)
if rv is not None:
return rv
matches = [x for x in self.list_commands(ctx)
if x.startswith(cmd_name)]
if not matches:
return None
if len(matches) == 1:
return click.Group.get_command(self, ctx, matches[0])
ctx.fail(f"Too many matches: {', '.join(sorted(matches))}")
def resolve_command(self, ctx: click.Context, args: Any) -> Any:
"""Documentation to build"""
# always return the full command name
_, cmd, args = super().resolve_command(ctx, args)
return cmd.name, cmd, args

View file

@ -6,11 +6,12 @@ CVP Uploader content
""" """
import os import os
from typing import List, Optional, Any
from dataclasses import dataclass from dataclasses import dataclass
from loguru import logger from typing import Any, List, Optional
from cvprac.cvp_client import CvpClient from cvprac.cvp_client import CvpClient
from cvprac.cvp_client_errors import CvpLoginError from cvprac.cvp_client_errors import CvpLoginError
from loguru import logger
# from eos_downloader.tools import exc_to_str # from eos_downloader.tools import exc_to_str
@ -20,8 +21,9 @@ from cvprac.cvp_client_errors import CvpLoginError
@dataclass @dataclass
class CvpAuthenticationItem: class CvpAuthenticationItem:
""" """
Data structure to represent Cloudvision Authentication Data structure to represent Cloudvision Authentication
""" """
server: str server: str
port: int = 443 port: int = 443
token: Optional[str] = None token: Optional[str] = None
@ -29,15 +31,16 @@ class CvpAuthenticationItem:
validate_cert: bool = False validate_cert: bool = False
class Filer(): class Filer:
# pylint: disable=too-few-public-methods # pylint: disable=too-few-public-methods
""" """
Filer Helper for file management Filer Helper for file management
""" """
def __init__(self, path: str) -> None: def __init__(self, path: str) -> None:
self.file_exist = False self.file_exist = False
self.filename = '' self.filename = ""
self.absolute_path = '' self.absolute_path = ""
self.relative_path = path self.relative_path = path
if os.path.exists(path): if os.path.exists(path):
self.file_exist = True self.file_exist = True
@ -45,13 +48,14 @@ class Filer():
self.absolute_path = os.path.realpath(path) self.absolute_path = os.path.realpath(path)
def __repr__(self) -> str: def __repr__(self) -> str:
return self.absolute_path if self.file_exist else '' return self.absolute_path if self.file_exist else ""
class CvFeatureManager(): class CvFeatureManager:
""" """
CvFeatureManager Object to interect with Cloudvision CvFeatureManager Object to interect with Cloudvision
""" """
def __init__(self, authentication: CvpAuthenticationItem) -> None: def __init__(self, authentication: CvpAuthenticationItem) -> None:
""" """
__init__ Class Creator __init__ Class Creator
@ -86,19 +90,21 @@ class CvFeatureManager():
try: try:
client.connect( client.connect(
nodes=[authentication.server], nodes=[authentication.server],
username='', username="",
password='', password="",
api_token=authentication.token, api_token=authentication.token,
is_cvaas=True, is_cvaas=True,
port=authentication.port, port=authentication.port,
cert=authentication.validate_cert, cert=authentication.validate_cert,
request_timeout=authentication.timeout request_timeout=authentication.timeout,
) )
except CvpLoginError as error_data: except CvpLoginError as error_data:
logger.error(f'Cannot connect to Cloudvision server {authentication.server}') logger.error(
logger.debug(f'Error message: {error_data}') f"Cannot connect to Cloudvision server {authentication.server}"
logger.info('connected to Cloudvision server') )
logger.debug(f'Connection info: {authentication}') logger.debug(f"Error message: {error_data}")
logger.info("connected to Cloudvision server")
logger.debug(f"Connection info: {authentication}")
return client return client
def __get_images(self) -> List[Any]: def __get_images(self) -> List[Any]:
@ -111,8 +117,8 @@ class CvFeatureManager():
Fact returned by Cloudvision Fact returned by Cloudvision
""" """
images = [] images = []
logger.debug(' -> Collecting images') logger.debug(" -> Collecting images")
images = self._cv_instance.api.get_images()['data'] images = self._cv_instance.api.get_images()["data"]
return images if self.__check_api_result(images) else [] return images if self.__check_api_result(images) else []
# def __get_bundles(self): # def __get_bundles(self):
@ -161,7 +167,11 @@ class CvFeatureManager():
bool bool
True if present True if present
""" """
return any(image_name == image['name'] for image in self._cv_images) if isinstance(self._cv_images, list) else False return (
any(image_name == image["name"] for image in self._cv_images)
if isinstance(self._cv_images, list)
else False
)
def _does_bundle_exist(self, bundle_name: str) -> bool: def _does_bundle_exist(self, bundle_name: str) -> bool:
# pylint: disable=unused-argument # pylint: disable=unused-argument
@ -192,19 +202,23 @@ class CvFeatureManager():
""" """
image_item = Filer(path=image_path) image_item = Filer(path=image_path)
if image_item.file_exist is False: if image_item.file_exist is False:
logger.error(f'File not found: {image_item.relative_path}') logger.error(f"File not found: {image_item.relative_path}")
return False return False
logger.info(f'File path for image: {image_item}') logger.info(f"File path for image: {image_item}")
if self._does_image_exist(image_name=image_item.filename): if self._does_image_exist(image_name=image_item.filename):
logger.error("Image found in Cloudvision , Please delete it before running this script") logger.error(
"Image found in Cloudvision , Please delete it before running this script"
)
return False return False
try: try:
upload_result = self._cv_instance.api.add_image(filepath=image_item.absolute_path) upload_result = self._cv_instance.api.add_image(
filepath=image_item.absolute_path
)
except Exception as e: # pylint: disable=broad-exception-caught except Exception as e: # pylint: disable=broad-exception-caught
logger.error('An error occurred during upload, check CV connection') logger.error("An error occurred during upload, check CV connection")
logger.error(f'Exception message is: {e}') logger.error(f"Exception message is: {e}")
return False return False
logger.debug(f'Upload Result is : {upload_result}') logger.debug(f"Upload Result is : {upload_result}")
return True return True
def build_image_list(self, image_list: List[str]) -> List[Any]: def build_image_list(self, image_list: List[str]) -> List[Any]:
@ -252,25 +266,30 @@ class CvFeatureManager():
bool bool
True if succeeds True if succeeds
""" """
logger.debug(f'Init creation of an image bundle {name} with following images {images_name}') logger.debug(
f"Init creation of an image bundle {name} with following images {images_name}"
)
all_images_present: List[bool] = [] all_images_present: List[bool] = []
self._cv_images = self.__get_images() self._cv_images = self.__get_images()
all_images_present.extend( all_images_present.extend(
self._does_image_exist(image_name=image_name) self._does_image_exist(image_name=image_name) for image_name in images_name
for image_name in images_name
) )
# Bundle Create # Bundle Create
if self._does_bundle_exist(bundle_name=name) is False: if self._does_bundle_exist(bundle_name=name) is False:
logger.debug(f'Creating image bundle {name} with following images {images_name}') logger.debug(
f"Creating image bundle {name} with following images {images_name}"
)
images_data = self.build_image_list(image_list=images_name) images_data = self.build_image_list(image_list=images_name)
if images_data is not None: if images_data is not None:
logger.debug('Images information: {images_data}') logger.debug("Images information: {images_data}")
try: try:
data = self._cv_instance.api.save_image_bundle(name=name, images=images_data) data = self._cv_instance.api.save_image_bundle(
name=name, images=images_data
)
except Exception as e: # pylint: disable=broad-exception-caught except Exception as e: # pylint: disable=broad-exception-caught
logger.critical(f'{e}') logger.critical(f"{e}")
else: else:
logger.debug(data) logger.debug(data)
return True return True
logger.critical('No data found for images') logger.critical("No data found for images")
return False return False

View file

@ -12,82 +12,22 @@ Data are built from content of Arista XML file
# [platform][image][version] # [platform][image][version]
DATA_MAPPING = { DATA_MAPPING = {
"CloudVision": { "CloudVision": {
"ova": { "ova": {"extension": ".ova", "prepend": "cvp", "folder_level": 0},
"extension": ".ova", "rpm": {"extension": "", "prepend": "cvp-rpm-installer", "folder_level": 0},
"prepend": "cvp", "kvm": {"extension": "-kvm.tgz", "prepend": "cvp", "folder_level": 0},
"folder_level": 0 "upgrade": {"extension": ".tgz", "prepend": "cvp-upgrade", "folder_level": 0},
},
"rpm": {
"extension": "",
"prepend": "cvp-rpm-installer",
"folder_level": 0
},
"kvm": {
"extension": "-kvm.tgz",
"prepend": "cvp",
"folder_level": 0
},
"upgrade": {
"extension": ".tgz",
"prepend": "cvp-upgrade",
"folder_level": 0
},
}, },
"EOS": { "EOS": {
"64": { "64": {"extension": ".swi", "prepend": "EOS64", "folder_level": 0},
"extension": ".swi", "INT": {"extension": "-INT.swi", "prepend": "EOS", "folder_level": 1},
"prepend": "EOS64", "2GB-INT": {"extension": "-INT.swi", "prepend": "EOS-2GB", "folder_level": 1},
"folder_level": 0 "cEOS": {"extension": ".tar.xz", "prepend": "cEOS-lab", "folder_level": 0},
}, "cEOS64": {"extension": ".tar.xz", "prepend": "cEOS64-lab", "folder_level": 0},
"INT": { "vEOS": {"extension": ".vmdk", "prepend": "vEOS", "folder_level": 0},
"extension": "-INT.swi", "vEOS-lab": {"extension": ".vmdk", "prepend": "vEOS-lab", "folder_level": 0},
"prepend": "EOS", "EOS-2GB": {"extension": ".swi", "prepend": "EOS-2GB", "folder_level": 0},
"folder_level": 1 "RN": {"extension": "-", "prepend": "RN", "folder_level": 0},
}, "SOURCE": {"extension": "-source.tar", "prepend": "EOS", "folder_level": 0},
"2GB-INT": { "default": {"extension": ".swi", "prepend": "EOS", "folder_level": 0},
"extension": "-INT.swi", },
"prepend": "EOS-2GB",
"folder_level": 1
},
"cEOS": {
"extension": ".tar.xz",
"prepend": "cEOS-lab",
"folder_level": 0
},
"cEOS64": {
"extension": ".tar.xz",
"prepend": "cEOS64-lab",
"folder_level": 0
},
"vEOS": {
"extension": ".vmdk",
"prepend": "vEOS",
"folder_level": 0
},
"vEOS-lab": {
"extension": ".vmdk",
"prepend": "vEOS-lab",
"folder_level": 0
},
"EOS-2GB": {
"extension": ".swi",
"prepend": "EOS-2GB",
"folder_level": 0
},
"RN": {
"extension": "-",
"prepend": "RN",
"folder_level": 0
},
"SOURCE": {
"extension": "-source.tar",
"prepend": "EOS",
"folder_level": 0
},
"default": {
"extension": ".swi",
"prepend": "EOS",
"folder_level": 0
}
}
} }

View file

@ -8,13 +8,20 @@ import os.path
import signal import signal
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from threading import Event from threading import Event
from typing import Iterable, Any from typing import Any, Iterable
import requests import requests
import rich import rich
from rich import console from rich import console
from rich.progress import (BarColumn, DownloadColumn, Progress, TaskID, from rich.progress import (
TextColumn, TimeElapsedColumn, TransferSpeedColumn) BarColumn,
DownloadColumn,
Progress,
TaskID,
TextColumn,
TimeElapsedColumn,
TransferSpeedColumn,
)
console = rich.get_console() console = rich.get_console()
done_event = Event() done_event = Event()
@ -28,7 +35,7 @@ def handle_sigint(signum: Any, frame: Any) -> None:
signal.signal(signal.SIGINT, handle_sigint) signal.signal(signal.SIGINT, handle_sigint)
class DownloadProgressBar(): class DownloadProgressBar:
""" """
Object to manage Download process with Progress Bar from Rich Object to manage Download process with Progress Bar from Rich
""" """
@ -38,7 +45,9 @@ class DownloadProgressBar():
Class Constructor Class Constructor
""" """
self.progress = Progress( self.progress = Progress(
TextColumn("💾 Downloading [bold blue]{task.fields[filename]}", justify="right"), TextColumn(
"💾 Downloading [bold blue]{task.fields[filename]}", justify="right"
),
BarColumn(bar_width=None), BarColumn(bar_width=None),
"[progress.percentage]{task.percentage:>3.1f}%", "[progress.percentage]{task.percentage:>3.1f}%",
"", "",
@ -48,14 +57,16 @@ class DownloadProgressBar():
"", "",
TimeElapsedColumn(), TimeElapsedColumn(),
"", "",
console=console console=console,
) )
def _copy_url(self, task_id: TaskID, url: str, path: str, block_size: int = 1024) -> bool: def _copy_url(
self, task_id: TaskID, url: str, path: str, block_size: int = 1024
) -> bool:
"""Copy data from a url to a local file.""" """Copy data from a url to a local file."""
response = requests.get(url, stream=True, timeout=5) response = requests.get(url, stream=True, timeout=5)
# This will break if the response doesn't contain content length # This will break if the response doesn't contain content length
self.progress.update(task_id, total=int(response.headers['Content-Length'])) self.progress.update(task_id, total=int(response.headers["Content-Length"]))
with open(path, "wb") as dest_file: with open(path, "wb") as dest_file:
self.progress.start_task(task_id) self.progress.start_task(task_id)
for data in response.iter_content(chunk_size=block_size): for data in response.iter_content(chunk_size=block_size):
@ -71,7 +82,9 @@ class DownloadProgressBar():
with self.progress: with self.progress:
with ThreadPoolExecutor(max_workers=4) as pool: with ThreadPoolExecutor(max_workers=4) as pool:
for url in urls: for url in urls:
filename = url.split("/")[-1].split('?')[0] filename = url.split("/")[-1].split("?")[0]
dest_path = os.path.join(dest_dir, filename) dest_path = os.path.join(dest_dir, filename)
task_id = self.progress.add_task("download", filename=filename, start=False) task_id = self.progress.add_task(
"download", filename=filename, start=False
)
pool.submit(self._copy_url, task_id, url, dest_path) pool.submit(self._copy_url, task_id, url, dest_path)

View file

@ -14,13 +14,20 @@ import rich
from loguru import logger from loguru import logger
from rich import console from rich import console
from eos_downloader.models.version import BASE_BRANCH_STR, BASE_VERSION_STR, REGEX_EOS_VERSION, RTYPE_FEATURE, EosVersion from eos_downloader.models.version import (
BASE_BRANCH_STR,
BASE_VERSION_STR,
REGEX_EOS_VERSION,
RTYPE_FEATURE,
EosVersion,
)
from eos_downloader.object_downloader import ObjectDownloader from eos_downloader.object_downloader import ObjectDownloader
# logger = logging.getLogger(__name__) # logger = logging.getLogger(__name__)
console = rich.get_console() console = rich.get_console()
class EOSDownloader(ObjectDownloader): class EOSDownloader(ObjectDownloader):
""" """
EOSDownloader Object to download EOS images from Arista.com website EOSDownloader Object to download EOS images from Arista.com website
@ -47,22 +54,27 @@ class EOSDownloader(ObjectDownloader):
file_path : str file_path : str
Path where EOS image is located Path where EOS image is located
""" """
logger.info('Mounting volume to disable ZTP') logger.info("Mounting volume to disable ZTP")
console.print('🚀 Mounting volume to disable ZTP') console.print("🚀 Mounting volume to disable ZTP")
raw_folder = os.path.join(file_path, "raw") raw_folder = os.path.join(file_path, "raw")
os.system(f"rm -rf {raw_folder}") os.system(f"rm -rf {raw_folder}")
os.system(f"mkdir -p {raw_folder}") os.system(f"mkdir -p {raw_folder}")
os.system( os.system(
f'guestmount -a {os.path.join(file_path, "hda.qcow2")} -m /dev/sda2 {os.path.join(file_path, "raw")}') f'guestmount -a {os.path.join(file_path, "hda.qcow2")} -m /dev/sda2 {os.path.join(file_path, "raw")}'
ztp_file = os.path.join(file_path, 'raw/zerotouch-config') )
with open(ztp_file, 'w', encoding='ascii') as zfile: ztp_file = os.path.join(file_path, "raw/zerotouch-config")
zfile.write('DISABLE=True') with open(ztp_file, "w", encoding="ascii") as zfile:
logger.info(f'Unmounting volume in {file_path}') zfile.write("DISABLE=True")
logger.info(f"Unmounting volume in {file_path}")
os.system(f"guestunmount {os.path.join(file_path, 'raw')}") os.system(f"guestunmount {os.path.join(file_path, 'raw')}")
os.system(f"rm -rf {os.path.join(file_path, 'raw')}") os.system(f"rm -rf {os.path.join(file_path, 'raw')}")
logger.info(f"Volume has been successfully unmounted at {file_path}") logger.info(f"Volume has been successfully unmounted at {file_path}")
def _parse_xml_for_version(self,root_xml: ET.ElementTree, xpath: str = './/dir[@label="Active Releases"]/dir/dir/[@label]') -> List[EosVersion]: def _parse_xml_for_version(
self,
root_xml: ET.ElementTree,
xpath: str = './/dir[@label="Active Releases"]/dir/dir/[@label]',
) -> List[EosVersion]:
""" """
Extract list of available EOS versions from Arista.com website Extract list of available EOS versions from Arista.com website
@ -77,19 +89,21 @@ class EOSDownloader(ObjectDownloader):
""" """
# XPATH: .//dir[@label="Active Releases"]/dir/dir/[@label] # XPATH: .//dir[@label="Active Releases"]/dir/dir/[@label]
if self.eos_versions is None: if self.eos_versions is None:
logger.debug(f'Using xpath {xpath}') logger.debug(f"Using xpath {xpath}")
eos_versions = [] eos_versions = []
for node in root_xml.findall(xpath): for node in root_xml.findall(xpath):
if 'label' in node.attrib and node.get('label') is not None: if "label" in node.attrib and node.get("label") is not None:
label = node.get('label') label = node.get("label")
if label is not None and REGEX_EOS_VERSION.match(label): if label is not None and REGEX_EOS_VERSION.match(label):
eos_version = EosVersion.from_str(label) eos_version = EosVersion.from_str(label)
eos_versions.append(eos_version) eos_versions.append(eos_version)
logger.debug(f"Found {label} - {eos_version}") logger.debug(f"Found {label} - {eos_version}")
logger.debug(f'List of versions found on arista.com is: {eos_versions}') logger.debug(f"List of versions found on arista.com is: {eos_versions}")
self.eos_versions = eos_versions self.eos_versions = eos_versions
else: else:
logger.debug('receiving instruction to download versions, but already available') logger.debug(
"receiving instruction to download versions, but already available"
)
return self.eos_versions return self.eos_versions
def _get_branches(self, with_rtype: str = RTYPE_FEATURE) -> List[str]: def _get_branches(self, with_rtype: str = RTYPE_FEATURE) -> List[str]:
@ -104,9 +118,11 @@ class EOSDownloader(ObjectDownloader):
Returns: Returns:
List[str]: A lsit of string that represent all availables EOS branches List[str]: A lsit of string that represent all availables EOS branches
""" """
root = self._get_folder_tree() root = self.get_folder_tree()
versions = self._parse_xml_for_version(root_xml=root) versions = self._parse_xml_for_version(root_xml=root)
return list({version.branch for version in versions if version.rtype == with_rtype}) return list(
{version.branch for version in versions if version.rtype == with_rtype}
)
def latest_branch(self, rtype: str = RTYPE_FEATURE) -> EosVersion: def latest_branch(self, rtype: str = RTYPE_FEATURE) -> EosVersion:
""" """
@ -125,7 +141,9 @@ class EOSDownloader(ObjectDownloader):
selected_branch = branch selected_branch = branch
return selected_branch return selected_branch
def get_eos_versions(self, branch: Union[str,None] = None, rtype: Union[str,None] = None) -> List[EosVersion]: def get_eos_versions(
self, branch: Union[str, None] = None, rtype: Union[str, None] = None
) -> List[EosVersion]:
""" """
Get a list of available EOS version available on arista.com Get a list of available EOS version available on arista.com
@ -139,16 +157,22 @@ class EOSDownloader(ObjectDownloader):
Returns: Returns:
List[EosVersion]: A list of versions available List[EosVersion]: A list of versions available
""" """
root = self._get_folder_tree() root = self.get_folder_tree()
result = [] result = []
for version in self._parse_xml_for_version(root_xml=root): for version in self._parse_xml_for_version(root_xml=root):
if branch is None and (version.rtype == rtype or rtype is None): if branch is None and (version.rtype == rtype or rtype is None):
result.append(version) result.append(version)
elif branch is not None and version.is_in_branch(branch) and version.rtype == rtype: elif (
branch is not None
and version.is_in_branch(branch)
and version.rtype == rtype
):
result.append(version) result.append(version)
return result return result
def latest_eos(self, branch: Union[str,None] = None, rtype: str = RTYPE_FEATURE) -> EosVersion: def latest_eos(
self, branch: Union[str, None] = None, rtype: str = RTYPE_FEATURE
) -> EosVersion:
""" """
Get latest version of EOS Get latest version of EOS
@ -168,7 +192,9 @@ class EOSDownloader(ObjectDownloader):
latest_branch = self.latest_branch(rtype=rtype) latest_branch = self.latest_branch(rtype=rtype)
else: else:
latest_branch = EosVersion.from_str(branch) latest_branch = EosVersion.from_str(branch)
for version in self.get_eos_versions(branch=str(latest_branch.branch), rtype=rtype): for version in self.get_eos_versions(
branch=str(latest_branch.branch), rtype=rtype
):
if version > selected_version: if version > selected_version:
if rtype is not None and version.rtype == rtype: if rtype is not None and version.rtype == rtype:
selected_version = version selected_version = version

View file

@ -16,11 +16,11 @@ from eos_downloader.tools import exc_to_str
# logger = logging.getLogger(__name__) # logger = logging.getLogger(__name__)
BASE_VERSION_STR = '4.0.0F' BASE_VERSION_STR = "4.0.0F"
BASE_BRANCH_STR = '4.0' BASE_BRANCH_STR = "4.0"
RTYPE_FEATURE = 'F' RTYPE_FEATURE = "F"
RTYPE_MAINTENANCE = 'M' RTYPE_MAINTENANCE = "M"
RTYPES = [RTYPE_FEATURE, RTYPE_MAINTENANCE] RTYPES = [RTYPE_FEATURE, RTYPE_MAINTENANCE]
# Regular Expression to capture multiple EOS version format # Regular Expression to capture multiple EOS version format
@ -29,8 +29,12 @@ RTYPES = [RTYPE_FEATURE, RTYPE_MAINTENANCE]
# 4.21.1M # 4.21.1M
# 4.28.10.F # 4.28.10.F
# 4.28.6.1M # 4.28.6.1M
REGEX_EOS_VERSION = re.compile(r"^.*(?P<major>4)\.(?P<minor>\d{1,2})\.(?P<patch>\d{1,2})(?P<other>\.\d*)*(?P<rtype>[M,F])*$") REGEX_EOS_VERSION = re.compile(
REGEX_EOS_BRANCH = re.compile(r"^.*(?P<major>4)\.(?P<minor>\d{1,2})(\.?P<patch>\d)*(\.\d)*(?P<rtype>[M,F])*$") r"^.*(?P<major>4)\.(?P<minor>\d{1,2})\.(?P<patch>\d{1,2})(?P<other>\.\d*)*(?P<rtype>[M,F])*$"
)
REGEX_EOS_BRANCH = re.compile(
r"^.*(?P<major>4)\.(?P<minor>\d{1,2})(\.?P<patch>\d)*(\.\d)*(?P<rtype>[M,F])*$"
)
class EosVersion(BaseModel): class EosVersion(BaseModel):
@ -59,10 +63,11 @@ class EosVersion(BaseModel):
Args: Args:
BaseModel (Pydantic): Pydantic Base Model BaseModel (Pydantic): Pydantic Base Model
""" """
major: int = 4 major: int = 4
minor: int = 0 minor: int = 0
patch: int = 0 patch: int = 0
rtype: Optional[str] = 'F' rtype: Optional[str] = "F"
other: Any = None other: Any = None
@classmethod @classmethod
@ -84,7 +89,7 @@ class EosVersion(BaseModel):
Returns: Returns:
EosVersion object EosVersion object
""" """
logger.debug(f'receiving version: {eos_version}') logger.debug(f"receiving version: {eos_version}")
if REGEX_EOS_VERSION.match(eos_version): if REGEX_EOS_VERSION.match(eos_version):
matches = REGEX_EOS_VERSION.match(eos_version) matches = REGEX_EOS_VERSION.match(eos_version)
# assert matches is not None # assert matches is not None
@ -95,7 +100,7 @@ class EosVersion(BaseModel):
# assert matches is not None # assert matches is not None
assert matches is not None assert matches is not None
return cls(**matches.groupdict()) return cls(**matches.groupdict())
logger.error(f'Error occured with {eos_version}') logger.error(f"Error occured with {eos_version}")
return EosVersion() return EosVersion()
@property @property
@ -106,7 +111,7 @@ class EosVersion(BaseModel):
Returns: Returns:
str: branch from version str: branch from version
""" """
return f'{self.major}.{self.minor}' return f"{self.major}.{self.minor}"
def __str__(self) -> str: def __str__(self) -> str:
""" """
@ -118,8 +123,8 @@ class EosVersion(BaseModel):
str: A standard EOS version string representing <MAJOR>.<MINOR>.<PATCH><RTYPE> str: A standard EOS version string representing <MAJOR>.<MINOR>.<PATCH><RTYPE>
""" """
if self.other is None: if self.other is None:
return f'{self.major}.{self.minor}.{self.patch}{self.rtype}' return f"{self.major}.{self.minor}.{self.patch}{self.rtype}"
return f'{self.major}.{self.minor}.{self.patch}{self.other}{self.rtype}' return f"{self.major}.{self.minor}.{self.patch}{self.other}{self.rtype}"
def _compare(self, other: EosVersion) -> float: def _compare(self, other: EosVersion) -> float:
""" """
@ -141,58 +146,68 @@ class EosVersion(BaseModel):
float: -1 if ver1 < ver2, 0 if ver1 == ver2, 1 if ver1 > ver2 float: -1 if ver1 < ver2, 0 if ver1 == ver2, 1 if ver1 > ver2
""" """
if not isinstance(other, EosVersion): if not isinstance(other, EosVersion):
raise ValueError(f'could not compare {other} as it is not an EosVersion object') raise ValueError(
f"could not compare {other} as it is not an EosVersion object"
)
comparison_flag: float = 0 comparison_flag: float = 0
logger.warning(f'current version {self.__str__()} - other {str(other)}') # pylint: disable = unnecessary-dunder-call logger.warning(
f"current version {self.__str__()} - other {str(other)}" # pylint: disable = unnecessary-dunder-call
)
for key, _ in self.dict().items(): for key, _ in self.dict().items():
if comparison_flag == 0 and self.dict()[key] is None or other.dict()[key] is None: if (
logger.debug(f'{key}: local None - remote None') comparison_flag == 0
logger.debug(f'{key}: local {self.dict()} - remote {other.dict()}') and self.dict()[key] is None
or other.dict()[key] is None
):
logger.debug(f"{key}: local None - remote None")
logger.debug(f"{key}: local {self.dict()} - remote {other.dict()}")
return comparison_flag return comparison_flag
logger.debug(f'{key}: local {self.dict()[key]} - remote {other.dict()[key]}') logger.debug(
f"{key}: local {self.dict()[key]} - remote {other.dict()[key]}"
)
if comparison_flag == 0 and self.dict()[key] < other.dict()[key]: if comparison_flag == 0 and self.dict()[key] < other.dict()[key]:
comparison_flag = -1 comparison_flag = -1
if comparison_flag == 0 and self.dict()[key] > other.dict()[key]: if comparison_flag == 0 and self.dict()[key] > other.dict()[key]:
comparison_flag = 1 comparison_flag = 1
if comparison_flag != 0: if comparison_flag != 0:
logger.info(f'comparison result is {comparison_flag}') logger.info(f"comparison result is {comparison_flag}")
return comparison_flag return comparison_flag
logger.info(f'comparison result is {comparison_flag}') logger.info(f"comparison result is {comparison_flag}")
return comparison_flag return comparison_flag
@typing.no_type_check @typing.no_type_check
def __eq__(self, other): def __eq__(self, other):
""" Implement __eq__ function (==) """ """Implement __eq__ function (==)"""
return self._compare(other) == 0 return self._compare(other) == 0
@typing.no_type_check @typing.no_type_check
def __ne__(self, other): def __ne__(self, other):
# type: ignore # type: ignore
""" Implement __nw__ function (!=) """ """Implement __nw__ function (!=)"""
return self._compare(other) != 0 return self._compare(other) != 0
@typing.no_type_check @typing.no_type_check
def __lt__(self, other): def __lt__(self, other):
# type: ignore # type: ignore
""" Implement __lt__ function (<) """ """Implement __lt__ function (<)"""
return self._compare(other) < 0 return self._compare(other) < 0
@typing.no_type_check @typing.no_type_check
def __le__(self, other): def __le__(self, other):
# type: ignore # type: ignore
""" Implement __le__ function (<=) """ """Implement __le__ function (<=)"""
return self._compare(other) <= 0 return self._compare(other) <= 0
@typing.no_type_check @typing.no_type_check
def __gt__(self, other): def __gt__(self, other):
# type: ignore # type: ignore
""" Implement __gt__ function (>) """ """Implement __gt__ function (>)"""
return self._compare(other) > 0 return self._compare(other) > 0
@typing.no_type_check @typing.no_type_check
def __ge__(self, other): def __ge__(self, other):
# type: ignore # type: ignore
""" Implement __ge__ function (>=) """ """Implement __ge__ function (>=)"""
return self._compare(other) >= 0 return self._compare(other) >= 0
def match(self, match_expr: str) -> bool: def match(self, match_expr: str) -> bool:
@ -236,7 +251,7 @@ class EosVersion(BaseModel):
"['<', '>', '==', '<=', '>=', '!=']. " "['<', '>', '==', '<=', '>=', '!=']. "
f"You provided: {match_expr}" f"You provided: {match_expr}"
) )
logger.debug(f'work on comparison {prefix} with base release {match_version}') logger.debug(f"work on comparison {prefix} with base release {match_version}")
possibilities_dict = { possibilities_dict = {
">": (1,), ">": (1,),
"<": (-1,), "<": (-1,),
@ -263,7 +278,7 @@ class EosVersion(BaseModel):
bool: True if current version is in provided branch, otherwise False bool: True if current version is in provided branch, otherwise False
""" """
try: try:
logger.debug(f'reading branch str:{branch_str}') logger.debug(f"reading branch str:{branch_str}")
branch = EosVersion.from_str(branch_str) branch = EosVersion.from_str(branch_str)
except Exception as error: # pylint: disable = broad-exception-caught except Exception as error: # pylint: disable = broad-exception-caught
logger.error(exc_to_str(error)) logger.error(exc_to_str(error))

View file

@ -8,8 +8,13 @@
eos_downloader class definition eos_downloader class definition
""" """
from __future__ import (absolute_import, division, print_function, from __future__ import (
unicode_literals, annotations) absolute_import,
annotations,
division,
print_function,
unicode_literals,
)
import base64 import base64
import glob import glob
@ -26,9 +31,14 @@ from loguru import logger
from rich import console from rich import console
from tqdm import tqdm from tqdm import tqdm
from eos_downloader import (ARISTA_DOWNLOAD_URL, ARISTA_GET_SESSION, from eos_downloader import (
ARISTA_SOFTWARE_FOLDER_TREE, EVE_QEMU_FOLDER_PATH, ARISTA_DOWNLOAD_URL,
MSG_INVALID_DATA, MSG_TOKEN_EXPIRED) ARISTA_GET_SESSION,
ARISTA_SOFTWARE_FOLDER_TREE,
EVE_QEMU_FOLDER_PATH,
MSG_INVALID_DATA,
MSG_TOKEN_EXPIRED,
)
from eos_downloader.data import DATA_MAPPING from eos_downloader.data import DATA_MAPPING
from eos_downloader.download import DownloadProgressBar from eos_downloader.download import DownloadProgressBar
@ -37,11 +47,19 @@ from eos_downloader.download import DownloadProgressBar
console = rich.get_console() console = rich.get_console()
class ObjectDownloader(): class ObjectDownloader:
""" """
ObjectDownloader Generic Object to download from Arista.com ObjectDownloader Generic Object to download from Arista.com
""" """
def __init__(self, image: str, version: str, token: str, software: str = 'EOS', hash_method: str = 'md5sum'):
def __init__(
self,
image: str,
version: str,
token: str,
software: str = "EOS",
hash_method: str = "md5sum",
):
""" """
__init__ Class constructor __init__ Class constructor
@ -70,10 +88,10 @@ class ObjectDownloader():
self.hash_method = hash_method self.hash_method = hash_method
self.timeout = 5 self.timeout = 5
# Logging # Logging
logger.debug(f'Filename built by _build_filename is {self.filename}') logger.debug(f"Filename built by _build_filename is {self.filename}")
def __str__(self) -> str: def __str__(self) -> str:
return f'{self.software} - {self.image} - {self.version}' return f"{self.software} - {self.image} - {self.version}"
# def __repr__(self): # def __repr__(self):
# return str(self.__dict__) # return str(self.__dict__)
@ -102,16 +120,18 @@ class ObjectDownloader():
str: str:
Filename to search for on Arista.com Filename to search for on Arista.com
""" """
logger.info('start build') logger.info("start build")
if self.software in DATA_MAPPING: if self.software in DATA_MAPPING:
logger.info(f'software in data mapping: {self.software}') logger.info(f"software in data mapping: {self.software}")
if self.image in DATA_MAPPING[self.software]: if self.image in DATA_MAPPING[self.software]:
logger.info(f'image in data mapping: {self.image}') logger.info(f"image in data mapping: {self.image}")
return f"{DATA_MAPPING[self.software][self.image]['prepend']}-{self.version}{DATA_MAPPING[self.software][self.image]['extension']}" return f"{DATA_MAPPING[self.software][self.image]['prepend']}-{self.version}{DATA_MAPPING[self.software][self.image]['extension']}"
return f"{DATA_MAPPING[self.software]['default']['prepend']}-{self.version}{DATA_MAPPING[self.software]['default']['extension']}" return f"{DATA_MAPPING[self.software]['default']['prepend']}-{self.version}{DATA_MAPPING[self.software]['default']['extension']}"
raise ValueError(f'Incorrect value for software {self.software}') raise ValueError(f"Incorrect value for software {self.software}")
def _parse_xml_for_path(self, root_xml: ET.ElementTree, xpath: str, search_file: str) -> str: def _parse_xml_for_path(
self, root_xml: ET.ElementTree, xpath: str, search_file: str
) -> str:
# sourcery skip: remove-unnecessary-cast # sourcery skip: remove-unnecessary-cast
""" """
_parse_xml Read and extract data from XML using XPATH _parse_xml Read and extract data from XML using XPATH
@ -132,18 +152,18 @@ class ObjectDownloader():
str str
File Path on Arista server side File Path on Arista server side
""" """
logger.debug(f'Using xpath {xpath}') logger.debug(f"Using xpath {xpath}")
logger.debug(f'Search for file {search_file}') logger.debug(f"Search for file {search_file}")
console.print(f'🔎 Searching file {search_file}') console.print(f"🔎 Searching file {search_file}")
for node in root_xml.findall(xpath): for node in root_xml.findall(xpath):
# logger.debug('Found {}', node.text) # logger.debug('Found {}', node.text)
if str(node.text).lower() == search_file.lower(): if str(node.text).lower() == search_file.lower():
path = node.get('path') path = node.get("path")
console.print(f' -> Found file at {path}') console.print(f" -> Found file at {path}")
logger.info(f'Found {node.text} at {node.get("path")}') logger.info(f'Found {node.text} at {node.get("path")}')
return str(node.get('path')) if node.get('path') is not None else '' return str(node.get("path")) if node.get("path") is not None else ""
logger.error(f'Requested file ({self.filename}) not found !') logger.error(f"Requested file ({self.filename}) not found !")
return '' return ""
def _get_hash(self, file_path: str) -> str: def _get_hash(self, file_path: str) -> str:
""" """
@ -165,10 +185,10 @@ class ObjectDownloader():
dl_rich_progress_bar = DownloadProgressBar() dl_rich_progress_bar = DownloadProgressBar()
dl_rich_progress_bar.download(urls=[hash_url], dest_dir=file_path) dl_rich_progress_bar.download(urls=[hash_url], dest_dir=file_path)
hash_downloaded = f"{file_path}/{os.path.basename(remote_hash_file)}" hash_downloaded = f"{file_path}/{os.path.basename(remote_hash_file)}"
hash_content = 'unset' hash_content = "unset"
with open(hash_downloaded, 'r', encoding='utf-8') as f: with open(hash_downloaded, "r", encoding="utf-8") as f:
hash_content = f.read() hash_content = f.read()
return hash_content.split(' ')[0] return hash_content.split(" ")[0]
@staticmethod @staticmethod
def _compute_hash_md5sum(file: str, hash_expected: str) -> bool: def _compute_hash_md5sum(file: str, hash_expected: str) -> bool:
@ -195,7 +215,9 @@ class ObjectDownloader():
hash_md5.update(chunk) hash_md5.update(chunk)
if hash_md5.hexdigest() == hash_expected: if hash_md5.hexdigest() == hash_expected:
return True return True
logger.warning(f'Downloaded file is corrupt: local md5 ({hash_md5.hexdigest()}) is different to md5 from arista ({hash_expected})') logger.warning(
f"Downloaded file is corrupt: local md5 ({hash_md5.hexdigest()}) is different to md5 from arista ({hash_expected})"
)
return False return False
@staticmethod @staticmethod
@ -223,10 +245,12 @@ class ObjectDownloader():
hash_sha512.update(chunk) hash_sha512.update(chunk)
if hash_sha512.hexdigest() == hash_expected: if hash_sha512.hexdigest() == hash_expected:
return True return True
logger.warning(f'Downloaded file is corrupt: local sha512 ({hash_sha512.hexdigest()}) is different to sha512 from arista ({hash_expected})') logger.warning(
f"Downloaded file is corrupt: local sha512 ({hash_sha512.hexdigest()}) is different to sha512 from arista ({hash_expected})"
)
return False return False
def _get_folder_tree(self) -> ET.ElementTree: def get_folder_tree(self) -> ET.ElementTree:
""" """
_get_folder_tree Download XML tree from Arista server _get_folder_tree Download XML tree from Arista server
@ -237,15 +261,17 @@ class ObjectDownloader():
""" """
if self.session_id is None: if self.session_id is None:
self.authenticate() self.authenticate()
jsonpost = {'sessionCode': self.session_id} jsonpost = {"sessionCode": self.session_id}
result = requests.post(ARISTA_SOFTWARE_FOLDER_TREE, data=json.dumps(jsonpost), timeout=self.timeout) result = requests.post(
ARISTA_SOFTWARE_FOLDER_TREE, data=json.dumps(jsonpost), timeout=self.timeout
)
try: try:
folder_tree = result.json()["data"]["xml"] folder_tree = result.json()["data"]["xml"]
return ET.ElementTree(ET.fromstring(folder_tree)) return ET.ElementTree(ET.fromstring(folder_tree))
except KeyError as error: except KeyError as error:
logger.error(MSG_INVALID_DATA) logger.error(MSG_INVALID_DATA)
logger.error(f'Server returned: {error}') logger.error(f"Server returned: {error}")
console.print(f'{MSG_INVALID_DATA}', style="bold red") console.print(f"{MSG_INVALID_DATA}", style="bold red")
sys.exit(1) sys.exit(1)
def _get_remote_filepath(self) -> str: def _get_remote_filepath(self) -> str:
@ -259,12 +285,14 @@ class ObjectDownloader():
str str
Remote path of the file to download Remote path of the file to download
""" """
root = self._get_folder_tree() root = self.get_folder_tree()
logger.debug("GET XML content from ARISTA.com") logger.debug("GET XML content from ARISTA.com")
xpath = f'.//dir[@label="{self.software}"]//file' xpath = f'.//dir[@label="{self.software}"]//file'
return self._parse_xml_for_path(root_xml=root, xpath=xpath, search_file=self.filename) return self._parse_xml_for_path(
root_xml=root, xpath=xpath, search_file=self.filename
)
def _get_remote_hashpath(self, hash_method: str = 'md5sum') -> str: def _get_remote_hashpath(self, hash_method: str = "md5sum") -> str:
""" """
_get_remote_hashpath Helper to get path of the hash's file to download _get_remote_hashpath Helper to get path of the hash's file to download
@ -275,16 +303,16 @@ class ObjectDownloader():
str str
Remote path of the hash's file to download Remote path of the hash's file to download
""" """
root = self._get_folder_tree() root = self.get_folder_tree()
logger.debug("GET XML content from ARISTA.com") logger.debug("GET XML content from ARISTA.com")
xpath = f'.//dir[@label="{self.software}"]//file' xpath = f'.//dir[@label="{self.software}"]//file'
return self._parse_xml_for_path( return self._parse_xml_for_path(
root_xml=root, root_xml=root,
xpath=xpath, xpath=xpath,
search_file=f'{self.filename}.{hash_method}', search_file=f"{self.filename}.{hash_method}",
) )
def _get_url(self, remote_file_path: str) -> str: def _get_url(self, remote_file_path: str) -> str:
""" """
_get_url Get URL to use for downloading file from Arista server _get_url Get URL to use for downloading file from Arista server
@ -302,13 +330,15 @@ class ObjectDownloader():
""" """
if self.session_id is None: if self.session_id is None:
self.authenticate() self.authenticate()
jsonpost = {'sessionCode': self.session_id, 'filePath': remote_file_path} jsonpost = {"sessionCode": self.session_id, "filePath": remote_file_path}
result = requests.post(ARISTA_DOWNLOAD_URL, data=json.dumps(jsonpost), timeout=self.timeout) result = requests.post(
if 'data' in result.json() and 'url' in result.json()['data']: ARISTA_DOWNLOAD_URL, data=json.dumps(jsonpost), timeout=self.timeout
)
if "data" in result.json() and "url" in result.json()["data"]:
# logger.debug('URL to download file is: {}', result.json()) # logger.debug('URL to download file is: {}', result.json())
return result.json()["data"]["url"] return result.json()["data"]["url"]
logger.critical(f'Server returns following message: {result.json()}') logger.critical(f"Server returns following message: {result.json()}")
return '' return ""
@staticmethod @staticmethod
def _download_file_raw(url: str, file_path: str) -> str: def _download_file_raw(url: str, file_path: str) -> str:
@ -331,31 +361,40 @@ class ObjectDownloader():
""" """
chunkSize = 1024 chunkSize = 1024
r = requests.get(url, stream=True, timeout=5) r = requests.get(url, stream=True, timeout=5)
with open(file_path, 'wb') as f: with open(file_path, "wb") as f:
pbar = tqdm(unit="B", total=int(r.headers['Content-Length']), unit_scale=True, unit_divisor=1024) pbar = tqdm(
unit="B",
total=int(r.headers["Content-Length"]),
unit_scale=True,
unit_divisor=1024,
)
for chunk in r.iter_content(chunk_size=chunkSize): for chunk in r.iter_content(chunk_size=chunkSize):
if chunk: if chunk:
pbar.update(len(chunk)) pbar.update(len(chunk))
f.write(chunk) f.write(chunk)
return file_path return file_path
def _download_file(self, file_path: str, filename: str, rich_interface: bool = True) -> Union[None, str]: def _download_file(
self, file_path: str, filename: str, rich_interface: bool = True
) -> Union[None, str]:
remote_file_path = self._get_remote_filepath() remote_file_path = self._get_remote_filepath()
logger.info(f'File found on arista server: {remote_file_path}') logger.info(f"File found on arista server: {remote_file_path}")
file_url = self._get_url(remote_file_path=remote_file_path) file_url = self._get_url(remote_file_path=remote_file_path)
if file_url is not False: if file_url is not False:
if not rich_interface: if not rich_interface:
return self._download_file_raw(url=file_url, file_path=os.path.join(file_path, filename)) return self._download_file_raw(
url=file_url, file_path=os.path.join(file_path, filename)
)
rich_downloader = DownloadProgressBar() rich_downloader = DownloadProgressBar()
rich_downloader.download(urls=[file_url], dest_dir=file_path) rich_downloader.download(urls=[file_url], dest_dir=file_path)
return os.path.join(file_path, filename) return os.path.join(file_path, filename)
logger.error(f'Cannot download file {file_path}') logger.error(f"Cannot download file {file_path}")
return None return None
@staticmethod @staticmethod
def _create_destination_folder(path: str) -> None: def _create_destination_folder(path: str) -> None:
# os.makedirs(path, mode, exist_ok=True) # os.makedirs(path, mode, exist_ok=True)
os.system(f'mkdir -p {path}') os.system(f"mkdir -p {path}")
@staticmethod @staticmethod
def _disable_ztp(file_path: str) -> None: def _disable_ztp(file_path: str) -> None:
@ -379,24 +418,29 @@ class ObjectDownloader():
""" """
credentials = (base64.b64encode(self.token.encode())).decode("utf-8") credentials = (base64.b64encode(self.token.encode())).decode("utf-8")
session_code_url = ARISTA_GET_SESSION session_code_url = ARISTA_GET_SESSION
jsonpost = {'accessToken': credentials} jsonpost = {"accessToken": credentials}
result = requests.post(session_code_url, data=json.dumps(jsonpost), timeout=self.timeout) result = requests.post(
session_code_url, data=json.dumps(jsonpost), timeout=self.timeout
)
if result.json()["status"]["message"] in[ 'Access token expired', 'Invalid access token']: if result.json()["status"]["message"] in [
console.print(f'{MSG_TOKEN_EXPIRED}', style="bold red") "Access token expired",
"Invalid access token",
]:
console.print(f"{MSG_TOKEN_EXPIRED}", style="bold red")
logger.error(MSG_TOKEN_EXPIRED) logger.error(MSG_TOKEN_EXPIRED)
return False return False
try: try:
if 'data' in result.json(): if "data" in result.json():
self.session_id = result.json()["data"]["session_code"] self.session_id = result.json()["data"]["session_code"]
logger.info('Authenticated on arista.com') logger.info("Authenticated on arista.com")
return True return True
logger.debug(f'{result.json()}') logger.debug(f"{result.json()}")
return False return False
except KeyError as error_arista: except KeyError as error_arista:
logger.error(f'Error: {error_arista}') logger.error(f"Error: {error_arista}")
sys.exit(1) sys.exit(1)
def download_local(self, file_path: str, checksum: bool = False) -> bool: def download_local(self, file_path: str, checksum: bool = False) -> bool:
@ -422,25 +466,33 @@ class ObjectDownloader():
bool bool
True if everything went well, False if any problem appears True if everything went well, False if any problem appears
""" """
file_downloaded = str(self._download_file(file_path=file_path, filename=self.filename)) file_downloaded = str(
self._download_file(file_path=file_path, filename=self.filename)
)
# Check file HASH # Check file HASH
hash_result = False hash_result = False
if checksum: if checksum:
logger.info('🚀 Running checksum validation') logger.info("🚀 Running checksum validation")
console.print('🚀 Running checksum validation') console.print("🚀 Running checksum validation")
if self.hash_method == 'md5sum': if self.hash_method == "md5sum":
hash_expected = self._get_hash(file_path=file_path) hash_expected = self._get_hash(file_path=file_path)
hash_result = self._compute_hash_md5sum(file=file_downloaded, hash_expected=hash_expected) hash_result = self._compute_hash_md5sum(
elif self.hash_method == 'sha512sum': file=file_downloaded, hash_expected=hash_expected
)
elif self.hash_method == "sha512sum":
hash_expected = self._get_hash(file_path=file_path) hash_expected = self._get_hash(file_path=file_path)
hash_result = self._compute_hash_sh512sum(file=file_downloaded, hash_expected=hash_expected) hash_result = self._compute_hash_sh512sum(
file=file_downloaded, hash_expected=hash_expected
)
if not hash_result: if not hash_result:
logger.error('Downloaded file is corrupted, please check your connection') logger.error("Downloaded file is corrupted, please check your connection")
console.print('❌ Downloaded file is corrupted, please check your connection') console.print(
"❌ Downloaded file is corrupted, please check your connection"
)
return False return False
logger.info('Downloaded file is correct.') logger.info("Downloaded file is correct.")
console.print('✅ Downloaded file is correct.') console.print("✅ Downloaded file is correct.")
return True return True
def provision_eve(self, noztp: bool = False, checksum: bool = True) -> None: def provision_eve(self, noztp: bool = False, checksum: bool = True) -> None:
@ -466,7 +518,7 @@ class ObjectDownloader():
# Build image name to use in folder path # Build image name to use in folder path
eos_image_name = self.filename.rstrip(".vmdk").lower() eos_image_name = self.filename.rstrip(".vmdk").lower()
if noztp: if noztp:
eos_image_name = f'{eos_image_name}-noztp' eos_image_name = f"{eos_image_name}-noztp"
# Create full path for EVE-NG # Create full path for EVE-NG
file_path = os.path.join(EVE_QEMU_FOLDER_PATH, eos_image_name.rstrip()) file_path = os.path.join(EVE_QEMU_FOLDER_PATH, eos_image_name.rstrip())
# Create folders in filesystem # Create folders in filesystem
@ -474,20 +526,23 @@ class ObjectDownloader():
# Download file to local destination # Download file to local destination
file_downloaded = self._download_file( file_downloaded = self._download_file(
file_path=file_path, filename=self.filename) file_path=file_path, filename=self.filename
)
# Convert to QCOW2 format # Convert to QCOW2 format
file_qcow2 = os.path.join(file_path, "hda.qcow2") file_qcow2 = os.path.join(file_path, "hda.qcow2")
logger.info('Converting VMDK to QCOW2 format') logger.info("Converting VMDK to QCOW2 format")
console.print('🚀 Converting VMDK to QCOW2 format...') console.print("🚀 Converting VMDK to QCOW2 format...")
os.system(f'$(which qemu-img) convert -f vmdk -O qcow2 {file_downloaded} {file_qcow2}') os.system(
f"$(which qemu-img) convert -f vmdk -O qcow2 {file_downloaded} {file_qcow2}"
)
logger.info('Applying unl_wrapper to fix permissions') logger.info("Applying unl_wrapper to fix permissions")
console.print('Applying unl_wrapper to fix permissions') console.print("Applying unl_wrapper to fix permissions")
os.system('/opt/unetlab/wrappers/unl_wrapper -a fixpermissions') os.system("/opt/unetlab/wrappers/unl_wrapper -a fixpermissions")
os.system(f'rm -f {file_downloaded}') os.system(f"rm -f {file_downloaded}")
if noztp: if noztp:
self._disable_ztp(file_path=file_path) self._disable_ztp(file_path=file_path)
@ -502,12 +557,12 @@ class ObjectDownloader():
version (str): version (str):
image_name (str, optional): Image name to use. Defaults to "arista/ceos". image_name (str, optional): Image name to use. Defaults to "arista/ceos".
""" """
docker_image = f'{image_name}:{self.version}' docker_image = f"{image_name}:{self.version}"
logger.info(f'Importing image {self.filename} to {docker_image}') logger.info(f"Importing image {self.filename} to {docker_image}")
console.print(f'🚀 Importing image {self.filename} to {docker_image}') console.print(f"🚀 Importing image {self.filename} to {docker_image}")
os.system(f'$(which docker) import {self.filename} {docker_image}') os.system(f"$(which docker) import {self.filename} {docker_image}")
for filename in glob.glob(f'{self.filename}*'): for filename in glob.glob(f"{self.filename}*"):
try: try:
os.remove(filename) os.remove(filename)
except FileNotFoundError: except FileNotFoundError:
console.print(f'File not found: {filename}') console.print(f"File not found: {filename}")

View file

@ -2,7 +2,8 @@
disable= disable=
invalid-name, invalid-name,
logging-fstring-interpolation, logging-fstring-interpolation,
fixme fixme,
line-too-long
[BASIC] [BASIC]
good-names=runCmds, i, y, t, c, x, e, fd, ip, v good-names=runCmds, i, y, t, c, x, e, fd, ip, v

View file

@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "eos_downloader" name = "eos_downloader"
version = "v0.8.2" version = "v0.9.0"
readme = "README.md" readme = "README.md"
authors = [{ name = "Thomas Grimonet", email = "thomas.grimonet@gmail.com" }] authors = [{ name = "Thomas Grimonet", email = "thomas.grimonet@gmail.com" }]
maintainers = [ maintainers = [
@ -22,7 +22,7 @@ dependencies = [
"scp", "scp",
"tqdm", "tqdm",
"loguru", "loguru",
"rich~=13.5.2", "rich>=13.5.2,<13.7.0",
"cvprac>=1.0.7", "cvprac>=1.0.7",
"click~=8.1.6", "click~=8.1.6",
"click-help-colors~=0.9", "click-help-colors~=0.9",
@ -62,7 +62,7 @@ dev = [
"pytest-html>=3.1.1", "pytest-html>=3.1.1",
"pytest-metadata>=1.11.0", "pytest-metadata>=1.11.0",
"pylint-pydantic>=0.2.4", "pylint-pydantic>=0.2.4",
"tox==4.10.0", "tox~=4.11",
"types-PyYAML", "types-PyYAML",
"types-paramiko", "types-paramiko",
"types-requests", "types-requests",
@ -94,7 +94,7 @@ namespaces = false
# Version # Version
################################ ################################
[tool.bumpver] [tool.bumpver]
current_version = "0.8.2" current_version = "0.9.0"
version_pattern = "MAJOR.MINOR.PATCH" version_pattern = "MAJOR.MINOR.PATCH"
commit_message = "bump: Version {old_version} -> {new_version}" commit_message = "bump: Version {old_version} -> {new_version}"
commit = true commit = true

View file

@ -5,12 +5,13 @@
# flake8: noqa: W503 # flake8: noqa: W503
# flake8: noqa: W1202 # flake8: noqa: W1202
from __future__ import (absolute_import, division, print_function) from __future__ import absolute_import, division, print_function
import os
import eos_downloader
from eos_downloader.eos import EOSDownloader
from eos_downloader.data import DATA_MAPPING
import os
import eos_downloader
from eos_downloader.data import DATA_MAPPING
from eos_downloader.eos import EOSDownloader
# --------------------------------------------------------------- # # --------------------------------------------------------------- #
# MOOCK data to use for testing # MOOCK data to use for testing
@ -18,99 +19,99 @@ from eos_downloader.data import DATA_MAPPING
# Get Auth token # Get Auth token
# eos_token = os.getenv('ARISTA_TOKEN') # eos_token = os.getenv('ARISTA_TOKEN')
eos_token = os.getenv('ARISTA_TOKEN', 'invalid_token') eos_token = os.getenv("ARISTA_TOKEN", "invalid_token")
eos_token_invalid = 'invalid_token' eos_token_invalid = "invalid_token"
eos_dataset_valid = [ eos_dataset_valid = [
{ {
'image': 'EOS', "image": "EOS",
'version': '4.26.3M', "version": "4.26.3M",
'software': 'EOS', "software": "EOS",
'filename': 'EOS-4.26.3M.swi', "filename": "EOS-4.26.3M.swi",
'expected_hash': 'sha512sum', "expected_hash": "sha512sum",
'remote_path': '/support/download/EOS-USA/Active Releases/4.26/EOS-4.26.3M/EOS-4.26.3M.swi', "remote_path": "/support/download/EOS-USA/Active Releases/4.26/EOS-4.26.3M/EOS-4.26.3M.swi",
'compute_checksum': True "compute_checksum": True,
}, },
{ {
'image': 'EOS', "image": "EOS",
'version': '4.25.6M', "version": "4.25.6M",
'software': 'EOS', "software": "EOS",
'filename': 'EOS-4.25.6M.swi', "filename": "EOS-4.25.6M.swi",
'expected_hash': 'md5sum', "expected_hash": "md5sum",
'remote_path': '/support/download/EOS-USA/Active Releases/4.25/EOS-4.25.6M/EOS-4.25.6M.swi', "remote_path": "/support/download/EOS-USA/Active Releases/4.25/EOS-4.25.6M/EOS-4.25.6M.swi",
'compute_checksum': True "compute_checksum": True,
}, },
{ {
'image': 'vEOS-lab', "image": "vEOS-lab",
'version': '4.25.6M', "version": "4.25.6M",
'software': 'EOS', "software": "EOS",
'filename': 'vEOS-lab-4.25.6M.vmdk', "filename": "vEOS-lab-4.25.6M.vmdk",
'expected_hash': 'md5sum', "expected_hash": "md5sum",
'remote_path': '/support/download/EOS-USA/Active Releases/4.25/EOS-4.25.6M/vEOS-lab/vEOS-lab-4.25.6M.vmdk', "remote_path": "/support/download/EOS-USA/Active Releases/4.25/EOS-4.25.6M/vEOS-lab/vEOS-lab-4.25.6M.vmdk",
'compute_checksum': False "compute_checksum": False,
} },
] ]
eos_dataset_invalid = [ eos_dataset_invalid = [
{ {
'image': 'default', "image": "default",
'version': '4.26.3M', "version": "4.26.3M",
'software': 'EOS', "software": "EOS",
'filename': 'EOS-4.26.3M.swi', "filename": "EOS-4.26.3M.swi",
'expected_hash': 'sha512sum', "expected_hash": "sha512sum",
'remote_path': '/support/download/EOS-USA/Active Releases/4.26/EOS-4.26.3M/EOS-4.26.3M.swi', "remote_path": "/support/download/EOS-USA/Active Releases/4.26/EOS-4.26.3M/EOS-4.26.3M.swi",
'compute_checksum': True "compute_checksum": True,
} }
] ]
eos_version = [ eos_version = [
{ {
'version': 'EOS-4.23.1F', "version": "EOS-4.23.1F",
'is_valid': True, "is_valid": True,
'major': 4, "major": 4,
'minor': 23, "minor": 23,
'patch': 1, "patch": 1,
'rtype': 'F' "rtype": "F",
}, },
{ {
'version': 'EOS-4.23.0', "version": "EOS-4.23.0",
'is_valid': True, "is_valid": True,
'major': 4, "major": 4,
'minor': 23, "minor": 23,
'patch': 0, "patch": 0,
'rtype': None "rtype": None,
}, },
{ {
'version': 'EOS-4.23', "version": "EOS-4.23",
'is_valid': True, "is_valid": True,
'major': 4, "major": 4,
'minor': 23, "minor": 23,
'patch': 0, "patch": 0,
'rtype': None "rtype": None,
}, },
{ {
'version': 'EOS-4.23.1M', "version": "EOS-4.23.1M",
'is_valid': True, "is_valid": True,
'major': 4, "major": 4,
'minor': 23, "minor": 23,
'patch': 1, "patch": 1,
'rtype': 'M' "rtype": "M",
}, },
{ {
'version': 'EOS-4.23.1.F', "version": "EOS-4.23.1.F",
'is_valid': True, "is_valid": True,
'major': 4, "major": 4,
'minor': 23, "minor": 23,
'patch': 1, "patch": 1,
'rtype': 'F' "rtype": "F",
}, },
{ {
'version': 'EOS-5.23.1F', "version": "EOS-5.23.1F",
'is_valid': False, "is_valid": False,
'major': 4, "major": 4,
'minor': 23, "minor": 23,
'patch': 1, "patch": 1,
'rtype': 'F' "rtype": "F",
}, },
] ]

View file

@ -5,13 +5,20 @@
# flake8: noqa: W503 # flake8: noqa: W503
# flake8: noqa: W1202 # flake8: noqa: W1202
from __future__ import (absolute_import, division, print_function) from __future__ import absolute_import, division, print_function
import os
import pytest
import eos_downloader
from typing import Dict, Any, List
from tests.lib.dataset import eos_dataset_valid, eos_dataset_invalid, eos_token, eos_token_invalid
import os
from typing import Any, Dict, List
import pytest
import eos_downloader
from tests.lib.dataset import (
eos_dataset_invalid,
eos_dataset_valid,
eos_token,
eos_token_invalid,
)
@pytest.fixture @pytest.fixture
@ -19,17 +26,18 @@ from tests.lib.dataset import eos_dataset_valid, eos_dataset_invalid, eos_token,
def create_download_instance(request, DOWNLOAD_INFO): def create_download_instance(request, DOWNLOAD_INFO):
# logger.info("Execute fixture to create class elements") # logger.info("Execute fixture to create class elements")
request.cls.eos_downloader = eos_downloader.eos.EOSDownloader( request.cls.eos_downloader = eos_downloader.eos.EOSDownloader(
image=DOWNLOAD_INFO['image'], image=DOWNLOAD_INFO["image"],
software=DOWNLOAD_INFO['software'], software=DOWNLOAD_INFO["software"],
version=DOWNLOAD_INFO['version'], version=DOWNLOAD_INFO["version"],
token=eos_token, token=eos_token,
hash_method='sha512sum') hash_method="sha512sum",
)
yield yield
# logger.info('Cleanup test environment') # logger.info('Cleanup test environment')
os.system('rm -f {}*'.format(DOWNLOAD_INFO['filename'])) os.system("rm -f {}*".format(DOWNLOAD_INFO["filename"]))
def generate_test_ids_dict(val: Dict[str, Any], key: str = 'name') -> str: def generate_test_ids_dict(val: Dict[str, Any], key: str = "name") -> str:
""" """
generate_test_ids Helper to generate test ID for parametrize generate_test_ids Helper to generate test ID for parametrize
@ -50,7 +58,8 @@ def generate_test_ids_dict(val: Dict[str, Any], key: str = 'name') -> str:
return val[key] return val[key]
return "undefined_test" return "undefined_test"
def generate_test_ids_list(val: List[Dict[str, Any]], key: str = 'name') -> str:
def generate_test_ids_list(val: List[Dict[str, Any]], key: str = "name") -> str:
""" """
generate_test_ids Helper to generate test ID for parametrize generate_test_ids Helper to generate test ID for parametrize
@ -66,4 +75,4 @@ def generate_test_ids_list(val: List[Dict[str, Any]], key: str = 'name') -> str:
str str
Name of the configlet Name of the configlet
""" """
return [ entry[key] if key in entry.keys() else 'unset_entry' for entry in val ] return [entry[key] if key in entry.keys() else "unset_entry" for entry in val]

View file

@ -5,14 +5,13 @@
# flake8: noqa: W503 # flake8: noqa: W503
# flake8: noqa: W1202 # flake8: noqa: W1202
from __future__ import (absolute_import, division, print_function) from __future__ import absolute_import, division, print_function
import os import os
from eos_downloader.data import DATA_MAPPING from eos_downloader.data import DATA_MAPPING
def default_filename(version: str, info): def default_filename(version: str, info):
""" """
default_filename Helper to build default filename default_filename Helper to build default filename
@ -31,10 +30,14 @@ def default_filename(version: str, info):
""" """
if version is None or info is None: if version is None or info is None:
return None return None
return DATA_MAPPING[info['software']]['default']['prepend'] + '-' + version + '.swi' return DATA_MAPPING[info["software"]]["default"]["prepend"] + "-" + version + ".swi"
def is_on_github_actions(): def is_on_github_actions():
"""Check if code is running on a CI runner""" """Check if code is running on a CI runner"""
if "CI" not in os.environ or not os.environ["CI"] or "GITHUB_RUN_ID" not in os.environ: if (
return False "CI" not in os.environ
or not os.environ["CI"]
or "GITHUB_RUN_ID" not in os.environ
):
return False

View file

@ -45,4 +45,3 @@ class TestEosDownload_valid():
@pytest.mark.eos_download @pytest.mark.eos_download
def test_download_local(self, DOWNLOAD_INFO): def test_download_local(self, DOWNLOAD_INFO):
self.eos_downloader.download_local(file_path='.', checksum=DOWNLOAD_INFO['compute_checksum']) self.eos_downloader.download_local(file_path='.', checksum=DOWNLOAD_INFO['compute_checksum'])

View file

@ -5,126 +5,166 @@
# flake8: noqa: W503 # flake8: noqa: W503
# flake8: noqa: W1202 # flake8: noqa: W1202
from __future__ import (absolute_import, division, print_function) from __future__ import absolute_import, division, print_function
import sys import sys
from loguru import logger
import pytest import pytest
from eos_downloader.models.version import EosVersion, BASE_VERSION_STR from loguru import logger
from eos_downloader.models.version import BASE_VERSION_STR, EosVersion
from tests.lib.dataset import eos_version from tests.lib.dataset import eos_version
from tests.lib.fixtures import generate_test_ids_list from tests.lib.fixtures import generate_test_ids_list
logger.remove() logger.remove()
logger.add(sys.stderr, level="DEBUG") logger.add(sys.stderr, level="DEBUG")
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_from_str(EOS_VERSION): def test_eos_version_from_str(EOS_VERSION):
version = EosVersion.from_str(EOS_VERSION['version']) version = EosVersion.from_str(EOS_VERSION["version"])
if EOS_VERSION['is_valid']: if EOS_VERSION["is_valid"]:
assert version.major == EOS_VERSION['major'] assert version.major == EOS_VERSION["major"]
assert version.minor == EOS_VERSION['minor'] assert version.minor == EOS_VERSION["minor"]
assert version.patch == EOS_VERSION['patch'] assert version.patch == EOS_VERSION["patch"]
assert version.rtype == EOS_VERSION['rtype'] assert version.rtype == EOS_VERSION["rtype"]
else: else:
assert str(version) == BASE_VERSION_STR assert str(version) == BASE_VERSION_STR
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version')) @pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_to_str(EOS_VERSION): def test_eos_version_to_str(EOS_VERSION):
version = EosVersion(**EOS_VERSION) version = EosVersion(**EOS_VERSION)
if EOS_VERSION['is_valid']: if EOS_VERSION["is_valid"]:
assert version.major == EOS_VERSION['major'] assert version.major == EOS_VERSION["major"]
assert version.minor == EOS_VERSION['minor'] assert version.minor == EOS_VERSION["minor"]
assert version.patch == EOS_VERSION['patch'] assert version.patch == EOS_VERSION["patch"]
assert version.rtype == EOS_VERSION['rtype'] assert version.rtype == EOS_VERSION["rtype"]
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_branch(EOS_VERSION): def test_eos_version_branch(EOS_VERSION):
if EOS_VERSION['is_valid']: if EOS_VERSION["is_valid"]:
version = EosVersion(**EOS_VERSION) version = EosVersion(**EOS_VERSION)
assert version.branch == f'{EOS_VERSION["major"]}.{EOS_VERSION["minor"]}' assert version.branch == f'{EOS_VERSION["major"]}.{EOS_VERSION["minor"]}'
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_eq_operator(EOS_VERSION): def test_eos_version_eq_operator(EOS_VERSION):
if not EOS_VERSION['is_valid']: if not EOS_VERSION["is_valid"]:
pytest.skip('not a valid version to test') pytest.skip("not a valid version to test")
version = EosVersion(**EOS_VERSION) version = EosVersion(**EOS_VERSION)
logger.warning(f'version is: {version.dict()}') logger.warning(f"version is: {version.dict()}")
assert version == version assert version == version
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_ge_operator(EOS_VERSION): def test_eos_version_ge_operator(EOS_VERSION):
if not EOS_VERSION['is_valid']: if not EOS_VERSION["is_valid"]:
pytest.skip('not a valid version to test') pytest.skip("not a valid version to test")
version = EosVersion(**EOS_VERSION) version = EosVersion(**EOS_VERSION)
version_b = EosVersion.from_str(BASE_VERSION_STR) version_b = EosVersion.from_str(BASE_VERSION_STR)
assert version >= version_b assert version >= version_b
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_gs_operator(EOS_VERSION): def test_eos_version_gs_operator(EOS_VERSION):
if not EOS_VERSION['is_valid']: if not EOS_VERSION["is_valid"]:
pytest.skip('not a valid version to test') pytest.skip("not a valid version to test")
version = EosVersion(**EOS_VERSION) version = EosVersion(**EOS_VERSION)
version_b = EosVersion.from_str(BASE_VERSION_STR) version_b = EosVersion.from_str(BASE_VERSION_STR)
assert version > version_b assert version > version_b
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_le_operator(EOS_VERSION): def test_eos_version_le_operator(EOS_VERSION):
if not EOS_VERSION['is_valid']: if not EOS_VERSION["is_valid"]:
pytest.skip('not a valid version to test') pytest.skip("not a valid version to test")
version = EosVersion(**EOS_VERSION) version = EosVersion(**EOS_VERSION)
version_b = EosVersion.from_str(BASE_VERSION_STR) version_b = EosVersion.from_str(BASE_VERSION_STR)
assert version_b <= version assert version_b <= version
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_ls_operator(EOS_VERSION): def test_eos_version_ls_operator(EOS_VERSION):
if not EOS_VERSION['is_valid']: if not EOS_VERSION["is_valid"]:
pytest.skip('not a valid version to test') pytest.skip("not a valid version to test")
version = EosVersion(**EOS_VERSION) version = EosVersion(**EOS_VERSION)
version_b = EosVersion.from_str(BASE_VERSION_STR) version_b = EosVersion.from_str(BASE_VERSION_STR)
assert version_b < version assert version_b < version
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_ne_operator(EOS_VERSION): def test_eos_version_ne_operator(EOS_VERSION):
if not EOS_VERSION['is_valid']: if not EOS_VERSION["is_valid"]:
pytest.skip('not a valid version to test') pytest.skip("not a valid version to test")
version = EosVersion(**EOS_VERSION) version = EosVersion(**EOS_VERSION)
version_b = EosVersion.from_str(BASE_VERSION_STR) version_b = EosVersion.from_str(BASE_VERSION_STR)
assert version_b != version assert version_b != version
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_match(EOS_VERSION): def test_eos_version_match(EOS_VERSION):
if not EOS_VERSION['is_valid']: if not EOS_VERSION["is_valid"]:
pytest.skip('not a valid version to test') pytest.skip("not a valid version to test")
version = EosVersion(**EOS_VERSION) version = EosVersion(**EOS_VERSION)
assert version.match(f'=={EOS_VERSION["version"]}') assert version.match(f'=={EOS_VERSION["version"]}')
assert version.match(f'!={BASE_VERSION_STR}') assert version.match(f"!={BASE_VERSION_STR}")
assert version.match(f'>={BASE_VERSION_STR}') assert version.match(f">={BASE_VERSION_STR}")
assert version.match(f'>{BASE_VERSION_STR}') assert version.match(f">{BASE_VERSION_STR}")
assert version.match('<=4.99.0F') assert version.match("<=4.99.0F")
assert version.match('<4.99.0F') assert version.match("<4.99.0F")
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_is_in_branch(EOS_VERSION): def test_eos_version_is_in_branch(EOS_VERSION):
if not EOS_VERSION['is_valid']: if not EOS_VERSION["is_valid"]:
pytest.skip('not a valid version to test') pytest.skip("not a valid version to test")
version = EosVersion(**EOS_VERSION) version = EosVersion(**EOS_VERSION)
assert version.is_in_branch(f"{EOS_VERSION['major']}.{EOS_VERSION['minor']}") assert version.is_in_branch(f"{EOS_VERSION['major']}.{EOS_VERSION['minor']}")
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_match_exception(EOS_VERSION): def test_eos_version_match_exception(EOS_VERSION):
if not EOS_VERSION['is_valid']: if not EOS_VERSION["is_valid"]:
pytest.skip('not a valid version to test') pytest.skip("not a valid version to test")
with pytest.raises(Exception) as e_info: with pytest.raises(Exception) as e_info:
version = EosVersion(**EOS_VERSION) version = EosVersion(**EOS_VERSION)
assert version.match(f'+={EOS_VERSION["version"]}') assert version.match(f'+={EOS_VERSION["version"]}')
logger.info(f'receive exception: {e_info}') logger.info(f"receive exception: {e_info}")
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_compare_exception(EOS_VERSION): def test_eos_version_compare_exception(EOS_VERSION):
if not EOS_VERSION['is_valid']: if not EOS_VERSION["is_valid"]:
pytest.skip('not a valid version to test') pytest.skip("not a valid version to test")
with pytest.raises(Exception) as e_info: with pytest.raises(Exception) as e_info:
version = EosVersion(**EOS_VERSION) version = EosVersion(**EOS_VERSION)
version._compare(BASE_VERSION_STR) version._compare(BASE_VERSION_STR)
logger.info(f'receive exception: {e_info}') logger.info(f"receive exception: {e_info}")

View file

@ -14,30 +14,41 @@ from loguru import logger
import eos_downloader import eos_downloader
from eos_downloader.data import DATA_MAPPING from eos_downloader.data import DATA_MAPPING
from eos_downloader.eos import EOSDownloader from eos_downloader.eos import EOSDownloader
from tests.lib.dataset import eos_dataset_invalid, eos_dataset_valid, eos_token, eos_token_invalid from tests.lib.dataset import (
eos_dataset_invalid,
eos_dataset_valid,
eos_token,
eos_token_invalid,
)
from tests.lib.fixtures import create_download_instance from tests.lib.fixtures import create_download_instance
from tests.lib.helpers import default_filename, is_on_github_actions from tests.lib.helpers import default_filename, is_on_github_actions
logger.remove() logger.remove()
logger.add(sys.stderr, level="DEBUG") logger.add(sys.stderr, level="DEBUG")
@pytest.mark.usefixtures("create_download_instance") @pytest.mark.usefixtures("create_download_instance")
@pytest.mark.parametrize("DOWNLOAD_INFO", eos_dataset_valid, ids=['EOS-sha512', 'EOS-md5' ,'vEOS-lab-no-hash']) @pytest.mark.parametrize(
"DOWNLOAD_INFO",
eos_dataset_valid,
ids=["EOS-sha512", "EOS-md5", "vEOS-lab-no-hash"],
)
@pytest.mark.eos_download @pytest.mark.eos_download
class TestEosDownload_valid(): class TestEosDownload_valid:
def test_data(self, DOWNLOAD_INFO): def test_data(self, DOWNLOAD_INFO):
logger.info(f'test input: {DOWNLOAD_INFO}') logger.info(f"test input: {DOWNLOAD_INFO}")
logger.info(f'test build: {self.eos_downloader.__dict__}') logger.info(f"test build: {self.eos_downloader.__dict__}")
def test_eos_download_create(self, DOWNLOAD_INFO): def test_eos_download_create(self, DOWNLOAD_INFO):
my_download = eos_downloader.eos.EOSDownloader( my_download = eos_downloader.eos.EOSDownloader(
image=DOWNLOAD_INFO['image'], image=DOWNLOAD_INFO["image"],
software=DOWNLOAD_INFO['software'], software=DOWNLOAD_INFO["software"],
version=DOWNLOAD_INFO['version'], version=DOWNLOAD_INFO["version"],
token=eos_token, token=eos_token,
hash_method='sha512sum') hash_method="sha512sum",
)
logger.info(my_download) logger.info(my_download)
assert isinstance(my_download, eos_downloader.eos.EOSDownloader) assert isinstance(my_download, eos_downloader.eos.EOSDownloader)
def test_eos_download_repr_string(self, DOWNLOAD_INFO): def test_eos_download_repr_string(self, DOWNLOAD_INFO):
expected = f"{DOWNLOAD_INFO['software']} - {DOWNLOAD_INFO['image']} - {DOWNLOAD_INFO['version']}" expected = f"{DOWNLOAD_INFO['software']} - {DOWNLOAD_INFO['image']} - {DOWNLOAD_INFO['version']}"
@ -45,47 +56,56 @@ class TestEosDownload_valid():
assert str(self.eos_downloader) == expected assert str(self.eos_downloader) == expected
def test_eos_download_build_filename(self, DOWNLOAD_INFO): def test_eos_download_build_filename(self, DOWNLOAD_INFO):
assert self.eos_downloader._build_filename() == DOWNLOAD_INFO['filename'] assert self.eos_downloader._build_filename() == DOWNLOAD_INFO["filename"]
@pytest.mark.dependency(name='authentication') @pytest.mark.dependency(name="authentication")
@pytest.mark.skipif(eos_token == eos_token_invalid, reason="Token is not set correctly") @pytest.mark.skipif(
eos_token == eos_token_invalid, reason="Token is not set correctly"
)
@pytest.mark.skipif(is_on_github_actions(), reason="Running on Github Runner") @pytest.mark.skipif(is_on_github_actions(), reason="Running on Github Runner")
# @pytest.mark.xfail(reason="Deliberate - CI not set for testing AUTH") # @pytest.mark.xfail(reason="Deliberate - CI not set for testing AUTH")
@pytest.mark.webtest @pytest.mark.webtest
def test_eos_download_authenticate(self): def test_eos_download_authenticate(self):
assert self.eos_downloader.authenticate() is True assert self.eos_downloader.authenticate() is True
@pytest.mark.dependency(depends=["authentication"], scope='class') @pytest.mark.dependency(depends=["authentication"], scope="class")
@pytest.mark.webtest @pytest.mark.webtest
def test_eos_download_get_remote_file_path(self, DOWNLOAD_INFO): def test_eos_download_get_remote_file_path(self, DOWNLOAD_INFO):
assert self.eos_downloader._get_remote_filepath() == DOWNLOAD_INFO['remote_path'] assert (
self.eos_downloader._get_remote_filepath() == DOWNLOAD_INFO["remote_path"]
)
@pytest.mark.dependency(depends=["authentication"], scope='class') @pytest.mark.dependency(depends=["authentication"], scope="class")
@pytest.mark.webtest @pytest.mark.webtest
def test_eos_download_get_file_url(self, DOWNLOAD_INFO): def test_eos_download_get_file_url(self, DOWNLOAD_INFO):
url = self.eos_downloader._get_url(remote_file_path = DOWNLOAD_INFO['remote_path']) url = self.eos_downloader._get_url(
remote_file_path=DOWNLOAD_INFO["remote_path"]
)
logger.info(url) logger.info(url)
assert 'https://downloads.arista.com/EOS-USA/Active%20Releases/' in url assert "https://downloads.arista.com/EOS-USA/Active%20Releases/" in url
@pytest.mark.usefixtures("create_download_instance") @pytest.mark.usefixtures("create_download_instance")
@pytest.mark.parametrize("DOWNLOAD_INFO", eos_dataset_invalid, ids=['EOS-FAKE']) @pytest.mark.parametrize("DOWNLOAD_INFO", eos_dataset_invalid, ids=["EOS-FAKE"])
class TestEosDownload_invalid(): class TestEosDownload_invalid:
def test_data(self, DOWNLOAD_INFO): def test_data(self, DOWNLOAD_INFO):
logger.info(f'test input: {dict(DOWNLOAD_INFO)}') logger.info(f"test input: {dict(DOWNLOAD_INFO)}")
logger.info(f'test build: {self.eos_downloader.__dict__}') logger.info(f"test build: {self.eos_downloader.__dict__}")
def test_eos_download_login_error(self, DOWNLOAD_INFO): def test_eos_download_login_error(self, DOWNLOAD_INFO):
my_download = eos_downloader.eos.EOSDownloader( my_download = eos_downloader.eos.EOSDownloader(
image=DOWNLOAD_INFO['image'], image=DOWNLOAD_INFO["image"],
software=DOWNLOAD_INFO['software'], software=DOWNLOAD_INFO["software"],
version=DOWNLOAD_INFO['version'], version=DOWNLOAD_INFO["version"],
token=eos_token_invalid, token=eos_token_invalid,
hash_method=DOWNLOAD_INFO['expected_hash']) hash_method=DOWNLOAD_INFO["expected_hash"],
)
assert my_download.authenticate() is False assert my_download.authenticate() is False
@pytest.mark.dependency(name='authentication') @pytest.mark.dependency(name="authentication")
@pytest.mark.skipif(eos_token == eos_token_invalid, reason="Token is not set correctly") @pytest.mark.skipif(
eos_token == eos_token_invalid, reason="Token is not set correctly"
)
@pytest.mark.skipif(is_on_github_actions(), reason="Running on Github Runner") @pytest.mark.skipif(is_on_github_actions(), reason="Running on Github Runner")
# @pytest.mark.xfail(reason="Deliberate - CI not set for testing AUTH") # @pytest.mark.xfail(reason="Deliberate - CI not set for testing AUTH")
@pytest.mark.webtest @pytest.mark.webtest
@ -96,46 +116,48 @@ class TestEosDownload_invalid():
# @pytest.mark.skip(reason="Not yet implemented in lib") # @pytest.mark.skip(reason="Not yet implemented in lib")
def test_eos_file_name_with_incorrect_software(self, DOWNLOAD_INFO): def test_eos_file_name_with_incorrect_software(self, DOWNLOAD_INFO):
self.eos_downloader.software = 'FAKE' self.eos_downloader.software = "FAKE"
logger.info(f'test build: {self.eos_downloader.__dict__}') logger.info(f"test build: {self.eos_downloader.__dict__}")
with pytest.raises(ValueError) as e_info: with pytest.raises(ValueError) as e_info:
result = self.eos_downloader._build_filename() result = self.eos_downloader._build_filename()
logger.info(f'receive exception: {e_info}') logger.info(f"receive exception: {e_info}")
self.eos_downloader.software = DOWNLOAD_INFO['software'] self.eos_downloader.software = DOWNLOAD_INFO["software"]
@pytest.mark.webtest @pytest.mark.webtest
@pytest.mark.dependency(depends=["authentication"], scope='class') @pytest.mark.dependency(depends=["authentication"], scope="class")
def test_eos_download_get_remote_file_path_for_invlaid_software(self, DOWNLOAD_INFO): def test_eos_download_get_remote_file_path_for_invlaid_software(
self.eos_downloader.software = 'FAKE' self, DOWNLOAD_INFO
logger.info(f'Platform set to: {self.eos_downloader.software}') ):
logger.info(f'test build: {self.eos_downloader.__dict__}') self.eos_downloader.software = "FAKE"
logger.info(f"Platform set to: {self.eos_downloader.software}")
logger.info(f"test build: {self.eos_downloader.__dict__}")
with pytest.raises(ValueError) as e_info: with pytest.raises(ValueError) as e_info:
result = self.eos_downloader._build_filename() result = self.eos_downloader._build_filename()
logger.info(f'receive exception: {e_info}') logger.info(f"receive exception: {e_info}")
self.eos_downloader.software = DOWNLOAD_INFO['software'] self.eos_downloader.software = DOWNLOAD_INFO["software"]
# IMAGE TESTING # IMAGE TESTING
def test_eos_file_name_with_incorrect_image(self, DOWNLOAD_INFO): def test_eos_file_name_with_incorrect_image(self, DOWNLOAD_INFO):
self.eos_downloader.image = 'FAKE' self.eos_downloader.image = "FAKE"
logger.info(f'Image set to: {self.eos_downloader.image}') logger.info(f"Image set to: {self.eos_downloader.image}")
assert DOWNLOAD_INFO['filename'] == self.eos_downloader._build_filename() assert DOWNLOAD_INFO["filename"] == self.eos_downloader._build_filename()
self.eos_downloader.software == DOWNLOAD_INFO['image'] self.eos_downloader.software == DOWNLOAD_INFO["image"]
@pytest.mark.webtest @pytest.mark.webtest
@pytest.mark.dependency(depends=["authentication"], scope='class') @pytest.mark.dependency(depends=["authentication"], scope="class")
def test_eos_download_get_remote_file_path_for_invlaid_image(self, DOWNLOAD_INFO): def test_eos_download_get_remote_file_path_for_invlaid_image(self, DOWNLOAD_INFO):
self.eos_downloader.image = 'FAKE' self.eos_downloader.image = "FAKE"
logger.info(f'Image set to: {self.eos_downloader.image}') logger.info(f"Image set to: {self.eos_downloader.image}")
assert self.eos_downloader.authenticate() is True assert self.eos_downloader.authenticate() is True
assert DOWNLOAD_INFO['filename'] == self.eos_downloader._build_filename() assert DOWNLOAD_INFO["filename"] == self.eos_downloader._build_filename()
self.eos_downloader.image = DOWNLOAD_INFO['image'] self.eos_downloader.image = DOWNLOAD_INFO["image"]
# VERSION TESTING # VERSION TESTING
@pytest.mark.webtest @pytest.mark.webtest
@pytest.mark.dependency(depends=["authentication"], scope='class') @pytest.mark.dependency(depends=["authentication"], scope="class")
def test_eos_download_get_remote_file_path_for_invlaid_version(self, DOWNLOAD_INFO): def test_eos_download_get_remote_file_path_for_invlaid_version(self, DOWNLOAD_INFO):
self.eos_downloader.version = 'FAKE' self.eos_downloader.version = "FAKE"
logger.info(f'Version set to: {self.eos_downloader.version}') logger.info(f"Version set to: {self.eos_downloader.version}")
assert self.eos_downloader._get_remote_filepath() == '' assert self.eos_downloader._get_remote_filepath() == ""