1
0
Fork 0

Compare commits

...

6 commits

Author SHA1 Message Date
28e863dd91
Adding upstream version 0.10.3.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-02-05 13:52:31 +01:00
5a4c1edf77
Adding upstream version 0.10.2.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-02-05 13:51:57 +01:00
160b3093ed
Adding upstream version 0.10.1.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-02-05 13:51:25 +01:00
ac8cf99bdd
Adding upstream version 0.10.0.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-02-05 13:50:26 +01:00
fb90b93350
Adding upstream version 0.9.0.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-02-05 13:48:40 +01:00
6dc7f1a5a2
Adding upstream version 0.8.2.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-02-05 13:47:22 +01:00
35 changed files with 1551 additions and 1001 deletions

View file

@ -1,24 +0,0 @@
[bumpversion]
commit = True
tag = False
tag_message = Bump version: {current_version} → {new_version}
tag_name = v{new_version}
current_version = 0.7.0
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)([-](?P<release>(dev|rc))+(?P<build>\d+))?
serialize =
{major}.{minor}.{patch}-{release}{build}
{major}.{minor}.{patch}
[bumpversion:part:release]
first_value = dev
optional_value = prod
values =
dev
prod
[bumpversion:part:build]
first_value = 1
[bumpversion:file:./eos_downloader/__init__.py]
search = __version__ = '{current_version}'
replace = __version__ = '{new_version}'

View file

@ -0,0 +1,41 @@
name: "rn-pr-labeler"
author: "@gmuloc"
description: "Parse a conventional commit compliant PR title and add it as a label to the PR with the prefix 'rn: '"
inputs:
auto_create_label:
description: "Boolean to indicate if the label should be auto created"
required: false
default: false
runs:
using: "composite"
steps:
- name: 'Looking up existing "rn:" label'
run: |
echo "OLD_LABEL=$(gh pr view ${{ github.event.pull_request.number }} --json labels -q .labels[].name | grep 'rn: ')" >> $GITHUB_ENV
shell: bash
- name: 'Delete existing "rn:" label if found'
run: gh pr edit ${{ github.event.pull_request.number }} --remove-label "${{ env.OLD_LABEL }}"
shell: bash
if: ${{ env.OLD_LABEL }}
- name: Set Label
# Using toJSON to support ' and " in commit messages
# https://stackoverflow.com/questions/73363167/github-actions-how-to-escape-characters-in-commit-message
run: echo "LABEL=$(echo ${{ toJSON(github.event.pull_request.title) }} | cut -d ':' -f 1 | tr -d ' ')" >> $GITHUB_ENV
shell: bash
# an alternative to verifying if the target label already exist is to
# create the label with --force in the next step, it will keep on changing
# the color of the label though so it may not be desirable.
- name: Check if label exist
run: |
EXIST=$(gh label list -L 100 --search "rn:" --json name -q '.[] | select(.name=="rn: ${{ env.LABEL }}").name')
echo "EXIST=$EXIST" >> $GITHUB_ENV
shell: bash
- name: Create Label if auto-create and label does not exist already
run: |
gh label create "rn: ${{ env.LABEL }}"
shell: bash
if: ${{ inputs.auto_create_label && ! env.EXIST }}
- name: Labelling PR
run: |
gh pr edit ${{ github.event.pull_request.number }} --add-label "rn: ${{ env.LABEL }}"
shell: bash

View file

@ -3,26 +3,33 @@
# Please see the documentation for all configuration options: # Please see the documentation for all configuration options:
# https://docs.github.com/github/administering-a-repository/configuration-options-for-dependency-updates # https://docs.github.com/github/administering-a-repository/configuration-options-for-dependency-updates
# Basic set up for three package managers
version: 2 version: 2
updates: updates:
- package-ecosystem: "github-actions" # Maintain dependencies for Python
directory: "/" # Dependabot supports updates to pyproject.toml files
labels: # if they follow the PEP 621 standard.
- dependabot
schedule:
interval: "weekly"
commit-message:
prefix: "bump"
include: "ci"
open-pull-requests-limit: 10
- package-ecosystem: "pip" - package-ecosystem: "pip"
directory: "/" directory: "/"
schedule:
interval: "daily"
reviewers:
- "titom73"
labels: labels:
- 'dependencies'
pull-request-branch-name:
separator: "/"
commit-message:
prefix: "bump: "
# Maintain dependencies for GitHub Actions
- package-ecosystem: "github-actions"
directory: "/"
schedule: schedule:
interval: "weekly" interval: "weekly"
reviewers:
- "titom73"
labels: labels:
- dependabot - 'CI'
commit-message: commit-message:
prefix: "bump" prefix: "ci: "
include: "requirements"
open-pull-requests-limit: 10

102
.github/release.md vendored Normal file
View file

@ -0,0 +1,102 @@
# Notes
Notes regarding how to release eos-downloader package
## Package requirements
- `bumpver`
- `build`
- `twine`
Also, [Github CLI](https://cli.github.com/) can be helpful and is recommended
## Bumping version
In a branch specific for this, use the `bumpver` tool.
It is configured to update:
* pyproject.toml
For instance to bump a patch version:
```
bumpver update --patch
```
and for a minor version
```
bumpver update --minor
```
Tip: It is possible to check what the changes would be using `--dry`
```
bumpver update --minor --dry
```
## Creating release on Github
Create the release on Github with the appropriate tag `vx.x.x`
## Release version `x.x.x`
> [!IMPORTANT]
> TODO - make this a github workflow
`x.x.x` is the version to be released
This is to be executed at the top of the repo
1. Checkout the latest version of `main` with the correct tag for the release
2. Create a new branch for release
```bash
git switch -c rel/vx.x.x
```
3. [Optional] Clean dist if required
4. Build the package locally
```bash
python -m build
```
5. Check the package with `twine` (replace with your vesion)
```bash
twine check dist/*
```
6. Upload the package to test.pypi
```bash
twine upload -r testpypi dist/eos-downloader-x.x.x.*
```
7. Verify the package by installing it in a local venv and checking it installs
and run correctly (run the tests)
```bash
# In a brand new venv
pip install -i https://test.pypi.org/simple/ --extra-index-url https://pypi.org/simple --no-cache eos-downloader
```
8. Push to eos-downloader repository and create a Pull Request
```bash
git push origin HEAD
gh pr create --title 'bump: eos-downloader vx.x.x'
```
9. Merge PR after review and wait for [workflow](https://github.com/titom73/eos-downloader/blob/main/.github/workflows/release.yml) to be executed.
```bash
gh pr merge --squash
```
10. Like 7 but for normal pypi
```bash
# In a brand new venv
pip install eos-downloader
```
11. Test installed version
```bash
eos-downloader --version
```

40
.github/release.yml vendored Normal file
View file

@ -0,0 +1,40 @@
changelog:
exclude:
labels:
- 'rn: test'
- 'rn: ci'
categories:
- title: Breaking Changes
labels:
- 'rn: feat!'
- 'rn: feat(cli)!'
- 'rn: fix!'
- 'rn: fix(cli)!'
- 'rn: cut!'
- 'rn: cut(cli)!'
- 'rn: revert!'
- 'rn: revert(cli)!'
- 'rn: refactor!'
- 'rn: refactor(cli)!'
- 'rn: bump!'
- 'rn: bump(cli)!'
- 'rn: feat!'
- 'rn: fix!'
- 'rn: cut!'
- 'rn: revert!'
- 'rn: refactor!'
- 'rn: bump!'
- title: New features and enhancements
labels:
- 'rn: feat'
- 'rn: feat(cli)'
- title: Fixed issues
labels:
- 'rn: fix'
- 'rn: fix(cli)'
- title: Documentation
labels:
- 'rn: doc!'
- title: Other Changes
labels:
- '*'

View file

@ -14,11 +14,11 @@ jobs:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: Checkout - name: Checkout
uses: actions/checkout@v3 uses: actions/checkout@v4
- name: Docker meta for TAG - name: Docker meta for TAG
id: meta id: meta
uses: docker/metadata-action@v4 uses: docker/metadata-action@v5
with: with:
images: | images: |
${{ secrets.DOCKER_IMAGE }} ${{ secrets.DOCKER_IMAGE }}
@ -27,20 +27,20 @@ jobs:
type=raw,value=${{ inputs.tag }} type=raw,value=${{ inputs.tag }}
- name: Login to DockerHub - name: Login to DockerHub
uses: docker/login-action@v2 uses: docker/login-action@v3
with: with:
username: ${{ secrets.DOCKERHUB_USERNAME }} username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }} password: ${{ secrets.DOCKER_PASSWORD }}
- name: Login to GitHub Container Registry - name: Login to GitHub Container Registry
uses: docker/login-action@v2 uses: docker/login-action@v3
with: with:
registry: ghcr.io registry: ghcr.io
username: ${{ github.repository_owner }} username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }} password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push - name: Build and push
uses: docker/build-push-action@v4 uses: docker/build-push-action@v6
with: with:
context: . context: .
file: Dockerfile file: Dockerfile
@ -54,11 +54,11 @@ jobs:
needs: [docker] needs: [docker]
steps: steps:
- name: Checkout - name: Checkout
uses: actions/checkout@v3 uses: actions/checkout@v4
- name: Docker meta for TAG - name: Docker meta for TAG
id: meta id: meta
uses: docker/metadata-action@v4 uses: docker/metadata-action@v5
with: with:
images: | images: |
${{ secrets.DOCKER_IMAGE }} ${{ secrets.DOCKER_IMAGE }}
@ -67,20 +67,20 @@ jobs:
type=raw,value=${{ inputs.tag }}-dind type=raw,value=${{ inputs.tag }}-dind
- name: Login to DockerHub - name: Login to DockerHub
uses: docker/login-action@v2 uses: docker/login-action@v3
with: with:
username: ${{ secrets.DOCKERHUB_USERNAME }} username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }} password: ${{ secrets.DOCKER_PASSWORD }}
- name: Login to GitHub Container Registry - name: Login to GitHub Container Registry
uses: docker/login-action@v2 uses: docker/login-action@v3
with: with:
registry: ghcr.io registry: ghcr.io
username: ${{ github.repository_owner }} username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }} password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push - name: Build and push
uses: docker/build-push-action@v4 uses: docker/build-push-action@v6
with: with:
context: . context: .
file: Dockerfile.docker file: Dockerfile.docker

View file

@ -4,22 +4,36 @@ on:
push: push:
branches: branches:
- main - main
pull_request_target: pull_request:
types: [assigned, opened, synchronize, reopened] types: [assigned, opened, synchronize, reopened]
jobs: jobs:
compiling:
name: Run installation process and code compilation supported Python versions pre-commit:
runs-on: ubuntu-latest runs-on: ubuntu-latest
strategy: strategy:
matrix: matrix:
python-version: ["3.8", "3.9", "3.10"] python-version: ["3.8", "3.9", "3.10", "3.11"]
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- uses: pre-commit-ci/lite-action@v1.1.0
compiling:
name: Run installation process and code compilation supported Python versions
runs-on: ubuntu-latest
needs: [pre-commit]
strategy:
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11"]
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }} - name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4 uses: actions/setup-python@v5
with: with:
python-version: ${{ matrix.python-version }} python-version: ${{ matrix.python-version }}
@ -41,13 +55,13 @@ jobs:
strategy: strategy:
matrix: matrix:
python: ["3.8", "3.9", "3.10"] python: ["3.8", "3.9", "3.10", "3.11"]
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v4
- name: Setup Python - name: Setup Python
uses: actions/setup-python@v3 uses: actions/setup-python@v5
with: with:
python-version: ${{ matrix.python }} python-version: ${{ matrix.python }}
@ -64,13 +78,13 @@ jobs:
strategy: strategy:
matrix: matrix:
python: ["3.8", "3.9", "3.10"] python: ["3.8", "3.9", "3.10", "3.11"]
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v4
- name: Setup Python - name: Setup Python
uses: actions/setup-python@v3 uses: actions/setup-python@v5
with: with:
python-version: ${{ matrix.python }} python-version: ${{ matrix.python }}
@ -87,13 +101,13 @@ jobs:
strategy: strategy:
matrix: matrix:
python: ["3.8", "3.9", "3.10"] python: ["3.8", "3.9", "3.10", "3.11"]
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v4
- name: Setup Python - name: Setup Python
uses: actions/setup-python@v3 uses: actions/setup-python@v5
with: with:
python-version: ${{ matrix.python }} python-version: ${{ matrix.python }}
@ -101,4 +115,5 @@ jobs:
run: pip install tox tox-gh-actions run: pip install tox tox-gh-actions
- name: "Run tox for ${{ matrix.python }}" - name: "Run tox for ${{ matrix.python }}"
run: tox -e testenv # run: tox -e testenv
run: tox

View file

@ -2,41 +2,20 @@
name: "Tag & Release management" name: "Tag & Release management"
on: on:
push: push:
# Sequence of patterns matched against refs/tags
tags: tags:
- 'v[0-9]+.[0-9]+.[0-9]+' # Push events to matching v*, i.e. v1.0, v20.15.10 - 'v[0-9]+.[0-9]+.[0-9]+' # Push events to matching v*, i.e. v1.0, v20.15.10
jobs: jobs:
# release:
# name: Create Github Release
# runs-on: ubuntu-latest
# steps:
# - name: Checkout code
# uses: actions/checkout@v3
# with:
# fetch-depth: 0
# - name: Generate Changelog
# run: |
# sudo apt update && sudo apt install zsh
# export TAG_CURRENT=$(git describe --abbrev=0 --tags)
# export TAG_PREVIOUS=$(git describe --abbrev=0 --tags `git rev-list --tags --skip=1 --max-count=1`)
# echo "Previous tag is: ${TAG_PREVIOUS}"
# echo "Current tag is: ${TAG_CURRENT}"
# zsh .github/changelog.sh ${TAG_CURRENT} ${TAG_PREVIOUS} md > CHANGELOG.md
# cat CHANGELOG.md
# - name: Release on Github
# uses: softprops/action-gh-release@v1
# with:
# draft: false
# body_path: CHANGELOG.md
pypi: pypi:
name: Publish version to Pypi servers name: Publish version to Pypi servers
runs-on: ubuntu-latest runs-on: ubuntu-latest
environment:
name: pypi
url: https://pypi.org/p/eos-downloader
permissions:
id-token: write
steps: steps:
- name: Checkout code - name: Checkout code
uses: actions/checkout@v3 uses: actions/checkout@v4
- name: Install dependencies - name: Install dependencies
run: | run: |
@ -47,7 +26,7 @@ jobs:
run: | run: |
python -m build python -m build
- name: Publish package to TestPyPI - name: Publish package to Pypi server
uses: pypa/gh-action-pypi-publish@release/v1 uses: pypa/gh-action-pypi-publish@release/v1
with: with:
user: __token__ user: __token__
@ -59,13 +38,13 @@ jobs:
needs: [pypi] needs: [pypi]
steps: steps:
- name: Checkout - name: Checkout
uses: actions/checkout@v3 uses: actions/checkout@v4
- name: Docker meta for TAG - name: Docker meta for TAG
id: meta id: meta
uses: docker/metadata-action@v4 uses: docker/metadata-action@v5
with: with:
images: images: |
${{ secrets.DOCKER_IMAGE }} ${{ secrets.DOCKER_IMAGE }}
ghcr.io/${{ secrets.DOCKER_IMAGE }} ghcr.io/${{ secrets.DOCKER_IMAGE }}
tags: | tags: |
@ -73,20 +52,20 @@ jobs:
type=raw,value=latest type=raw,value=latest
- name: Login to DockerHub - name: Login to DockerHub
uses: docker/login-action@v2 uses: docker/login-action@v3
with: with:
username: ${{ secrets.DOCKERHUB_USERNAME }} username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }} password: ${{ secrets.DOCKER_PASSWORD }}
- name: Login to GitHub Container Registry - name: Login to GitHub Container Registry
uses: docker/login-action@v2 uses: docker/login-action@v3
with: with:
registry: ghcr.io registry: ghcr.io
username: ${{ github.repository_owner }} username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }} password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push - name: Build and push
uses: docker/build-push-action@v4 uses: docker/build-push-action@v6
with: with:
context: . context: .
file: Dockerfile file: Dockerfile
@ -100,13 +79,13 @@ jobs:
needs: [docker] needs: [docker]
steps: steps:
- name: Checkout - name: Checkout
uses: actions/checkout@v3 uses: actions/checkout@v4
- name: Docker meta for TAG - name: Docker meta for TAG
id: meta id: meta
uses: docker/metadata-action@v4 uses: docker/metadata-action@v5
with: with:
images: images: |
${{ secrets.DOCKER_IMAGE }} ${{ secrets.DOCKER_IMAGE }}
ghcr.io/${{ secrets.DOCKER_IMAGE }} ghcr.io/${{ secrets.DOCKER_IMAGE }}
tags: | tags: |
@ -114,23 +93,24 @@ jobs:
type=raw,value=latest-dind type=raw,value=latest-dind
- name: Login to DockerHub - name: Login to DockerHub
uses: docker/login-action@v2 uses: docker/login-action@v3
with: with:
username: ${{ secrets.DOCKERHUB_USERNAME }} username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }} password: ${{ secrets.DOCKER_PASSWORD }}
- name: Login to GitHub Container Registry - name: Login to GitHub Container Registry
uses: docker/login-action@v2 uses: docker/login-action@v3
with: with:
registry: ghcr.io registry: ghcr.io
username: ${{ github.repository_owner }} username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }} password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push - name: Build and push
uses: docker/build-push-action@v4 uses: docker/build-push-action@v6
with: with:
context: . context: .
file: Dockerfile.docker file: Dockerfile.docker
push: true push: true
tags: ${{ steps.meta.outputs.tags }} tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }} labels: ${{ steps.meta.outputs.labels }}

1
.gitignore vendored
View file

@ -346,3 +346,4 @@ report.html
*.swp *.swp
arista.xml arista.xml
tester.py tester.py
*.tgz

64
.pre-commit-config.yaml Normal file
View file

@ -0,0 +1,64 @@
---
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks
files: ^(eos_downloader)/
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.4.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-added-large-files
- id: check-merge-conflict
# - repo: https://github.com/pycqa/isort
# rev: 5.12.0
# hooks:
# - id: isort
# name: Check for changes when running isort on all python files
- repo: https://github.com/psf/black
rev: 23.7.0
hooks:
- id: black
name: Check for changes when running Black on all python files
- repo: https://github.com/pycqa/flake8
rev: 6.0.0
hooks:
- id: flake8
name: Check for PEP8 error on Python files
args:
- --config=/dev/null
- --max-line-length=165
- repo: local # as per https://pylint.pycqa.org/en/latest/user_guide/installation/pre-commit-integration.html
hooks:
- id: pylint
entry: pylint
language: python
name: Check for Linting error on Python files
description: This hook runs pylint.
types: [python]
args:
- -rn # Only display messages
- -sn # Don't display the score
- --rcfile=pylintrc # Link to config file
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.4.1
hooks:
- id: mypy
args:
- --config-file=pyproject.toml
additional_dependencies:
- "click==8.1.3"
- "click-help-colors==0.9.1"
- "pydantic~=2.0"
- "PyYAML==6.0"
- "requests>=2.27"
- "rich~=13.4"
- types-paramiko
- types-requests
files: eos_downloader

128
CODE_OF_CONDUCT.md Normal file
View file

@ -0,0 +1,128 @@
# Contributor Covenant Code of Conduct
## Our Pledge
We as members, contributors, and leaders pledge to make participation in our
community a harassment-free experience for everyone, regardless of age, body
size, visible or invisible disability, ethnicity, sex characteristics, gender
identity and expression, level of experience, education, socio-economic status,
nationality, personal appearance, race, religion, or sexual identity
and orientation.
We pledge to act and interact in ways that contribute to an open, welcoming,
diverse, inclusive, and healthy community.
## Our Standards
Examples of behavior that contributes to a positive environment for our
community include:
* Demonstrating empathy and kindness toward other people
* Being respectful of differing opinions, viewpoints, and experiences
* Giving and gracefully accepting constructive feedback
* Accepting responsibility and apologizing to those affected by our mistakes,
and learning from the experience
* Focusing on what is best not just for us as individuals, but for the
overall community
Examples of unacceptable behavior include:
* The use of sexualized language or imagery, and sexual attention or
advances of any kind
* Trolling, insulting or derogatory comments, and personal or political attacks
* Public or private harassment
* Publishing others' private information, such as a physical or email
address, without their explicit permission
* Other conduct which could reasonably be considered inappropriate in a
professional setting
## Enforcement Responsibilities
Community leaders are responsible for clarifying and enforcing our standards of
acceptable behavior and will take appropriate and fair corrective action in
response to any behavior that they deem inappropriate, threatening, offensive,
or harmful.
Community leaders have the right and responsibility to remove, edit, or reject
comments, commits, code, wiki edits, issues, and other contributions that are
not aligned to this Code of Conduct, and will communicate reasons for moderation
decisions when appropriate.
## Scope
This Code of Conduct applies within all community spaces, and also applies when
an individual is officially representing the community in public spaces.
Examples of representing our community include using an official e-mail address,
posting via an official social media account, or acting as an appointed
representative at an online or offline event.
## Enforcement
Instances of abusive, harassing, or otherwise unacceptable behavior may be
reported to the community leaders responsible for enforcement at
contact@inetsix.net.
All complaints will be reviewed and investigated promptly and fairly.
All community leaders are obligated to respect the privacy and security of the
reporter of any incident.
## Enforcement Guidelines
Community leaders will follow these Community Impact Guidelines in determining
the consequences for any action they deem in violation of this Code of Conduct:
### 1. Correction
**Community Impact**: Use of inappropriate language or other behavior deemed
unprofessional or unwelcome in the community.
**Consequence**: A private, written warning from community leaders, providing
clarity around the nature of the violation and an explanation of why the
behavior was inappropriate. A public apology may be requested.
### 2. Warning
**Community Impact**: A violation through a single incident or series
of actions.
**Consequence**: A warning with consequences for continued behavior. No
interaction with the people involved, including unsolicited interaction with
those enforcing the Code of Conduct, for a specified period of time. This
includes avoiding interactions in community spaces as well as external channels
like social media. Violating these terms may lead to a temporary or
permanent ban.
### 3. Temporary Ban
**Community Impact**: A serious violation of community standards, including
sustained inappropriate behavior.
**Consequence**: A temporary ban from any sort of interaction or public
communication with the community for a specified period of time. No public or
private interaction with the people involved, including unsolicited interaction
with those enforcing the Code of Conduct, is allowed during this period.
Violating these terms may lead to a permanent ban.
### 4. Permanent Ban
**Community Impact**: Demonstrating a pattern of violation of community
standards, including sustained inappropriate behavior, harassment of an
individual, or aggression toward or disparagement of classes of individuals.
**Consequence**: A permanent ban from any sort of public interaction within
the community.
## Attribution
This Code of Conduct is adapted from the [Contributor Covenant][homepage],
version 2.0, available at
https://www.contributor-covenant.org/version/2/0/code_of_conduct.html.
Community Impact Guidelines were inspired by [Mozilla's code of conduct
enforcement ladder](https://github.com/mozilla/diversity).
[homepage]: https://www.contributor-covenant.org
For answers to common questions about this code of conduct, see the FAQ at
https://www.contributor-covenant.org/faq. Translations are available at
https://www.contributor-covenant.org/translations.

View file

@ -1,9 +1,22 @@
[![code-testing](https://github.com/titom73/eos-downloader/actions/workflows/pr-management.yml/badge.svg?event=push)](https://github.com/titom73/eos-downloader/actions/workflows/pr-management.yml) ![PyPI - Python Version](https://img.shields.io/pypi/pyversions/eos-downloader) ![GitHub release (latest SemVer)](https://img.shields.io/github/v/release/titom73/arista-downloader) ![PyPI - Downloads/month](https://img.shields.io/pypi/dm/eos-downloader) ![Docker Image Size (tag)](https://img.shields.io/docker/image-size/titom73/eos-downloader/edge) [![tests](https://github.com/titom73/eos-downloader/actions/workflows/pr-management.yml/badge.svg?event=push)](https://github.com/titom73/eos-downloader/actions/workflows/pr-management.yml)
![PyPI - Python Version](https://img.shields.io/pypi/pyversions/eos-downloader)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
![Checked with mypy](http://www.mypy-lang.org/static/mypy_badge.svg)
![GitHub release](https://img.shields.io/github/v/release/titom73/arista-downloader)
![PyPI - Downloads/month](https://img.shields.io/pypi/dm/eos-downloader)
<!--
[![pre-commit](https://img.shields.io/badge/pre--commit-enabled-brightgreen?logo=pre-commit&logoColor=white)](https://github.com/pre-commit/pre-commit)
!-->
# Arista Software Downloader # Arista Software Downloader
Script to download Arista softwares to local folder, Cloudvision or EVE-NG. Script to download Arista softwares to local folder, Cloudvision or EVE-NG.
> [!CAUTION]
> This script should not be deployed on EOS device. If you do that, there is no support to expect from Arista TAC team.
```bash ```bash
pip install eos-downloader pip install eos-downloader
``` ```
@ -19,6 +32,7 @@ Usage: ardl [OPTIONS] COMMAND [ARGS]...
Arista Network Download CLI Arista Network Download CLI
Options: Options:
--version Show the version and exit.
--token TEXT Arista Token from your customer account [env var: --token TEXT Arista Token from your customer account [env var:
ARISTA_TOKEN] ARISTA_TOKEN]
--help Show this message and exit. --help Show this message and exit.
@ -26,7 +40,6 @@ Options:
Commands: Commands:
debug Debug commands to work with ardl debug Debug commands to work with ardl
get Download Arista from Arista website get Download Arista from Arista website
version Display version of ardl
``` ```
> **Warning** > **Warning**
@ -35,10 +48,22 @@ Commands:
### Download EOS Package ### Download EOS Package
> **Note**
> Supported packages are: EOS, cEOS, vEOS-lab, cEOS64 > Supported packages are: EOS, cEOS, vEOS-lab, cEOS64
You can download EOS packages with following commands: CLI gives an option to get latest version available. By default it takes latest `F` release
```bash
ardl get eos --image-type cEOS --latest
```
If you want to get latest M release, you can use `--release-type`:
```bash
ardl get eos --image-type cEOS --release-type M --latest
```
You can download a specific EOS packages with following commands:
```bash ```bash
# Example for a cEOS package # Example for a cEOS package
@ -164,7 +189,7 @@ tqdm
On EVE-NG, you may have to install/upgrade __pyOpenSSL__ in version `23.0.0`: On EVE-NG, you may have to install/upgrade __pyOpenSSL__ in version `23.0.0`:
``` ```bash
# Error when running ardl: AttributeError: module 'lib' has no attribute 'X509_V_FLAG_CB_ISSUER_CHECK' # Error when running ardl: AttributeError: module 'lib' has no attribute 'X509_V_FLAG_CB_ISSUER_CHECK'
$ pip install pyopenssl --upgrade $ pip install pyopenssl --upgrade

View file

@ -1,111 +0,0 @@
## scripts
These scripts are deprecated and will be removed in a futur version. Please prefer the use of the CLI implemented in the package.
### eos-download
```bash
usage: eos-download [-h]
--version VERSION
[--token TOKEN]
[--image IMAGE]
[--destination DESTINATION]
[--eve]
[--noztp]
[--import_docker]
[--docker_name DOCKER_NAME]
[--verbose VERBOSE]
[--log]
EOS downloader script.
optional arguments:
-h, --help show this help message and exit
--token TOKEN arista.com user API key - can use ENV:ARISTA_TOKEN
--image IMAGE Type of EOS image required
--version VERSION EOS version to download from website
--destination DESTINATION
Path where to save EOS package downloaded
--eve Option to install EOS package to EVE-NG
--noztp Option to deactivate ZTP when used with EVE-NG
--import_docker Option to import cEOS image to docker
--docker_name DOCKER_NAME
Docker image name to use
--verbose VERBOSE Script verbosity
--log Option to activate logging to eos-downloader.log file
```
- Token are read from `ENV:ARISTA_TOKEN` unless you specify a specific token with CLI.
- Supported platforms:
- `INT`: International version
- `64`: 64 bits version
- `2GB` for 2GB flash platform
- `2GB-INT`: for 2GB running International
- `vEOS`: Virtual EOS image
- `vEOS-lab`: Virtual Lab EOS
- `vEOS64-lab`: Virtual Lab EOS running 64B
- `cEOS`: Docker version of EOS
- `cEOS64`: Docker version of EOS running in 64 bits
#### Examples
- Download vEOS-lab image and install in EVE-NG
```bash
$ eos-download --image vEOS-lab --version 4.25.7M --eve --noztp
```
- Download Docker image
```bash
$ eos-download --image cEOS --version 4.27.1F
🪐 eos-downloader is starting...
- Image Type: cEOS
- Version: 4.27.2F
✅ Authenticated on arista.com
🔎 Searching file cEOS-lab-4.27.2F.tar.xz
-> Found file at /support/download/EOS-USA/Active Releases/4.27/EOS-4.27.2F/cEOS-lab/cEOS-lab-4.27.2F.tar.xz
💾 Downloading cEOS-lab-4.27.2F.tar.xz ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 17.1 MB/s • 451.6/451.6 MB • 0:00:19 •
🚀 Running checksum validation
🔎 Searching file cEOS-lab-4.27.2F.tar.xz.sha512sum
-> Found file at /support/download/EOS-USA/Active
Releases/4.27/EOS-4.27.2F/cEOS-lab/cEOS-lab-4.27.2F.tar.xz.sha512sum
💾 Downloading cEOS-lab-4.27.2F.tar.xz.sha512sum ━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • ? • 154/154 bytes • 0:00:00 •
✅ Downloaded file is correct.
```
__Note:__ `ARISTA_TOKEN` should be set in your .profile and not set for each command. If not set, you can use `--token` knob.
```bash
# Export Token
export ARISTA_TOKEN="xxxxxxx"
```
### Cloudvision Image uploader
Create an image bundle on Cloudvision.
```bash
cvp-upload -h
usage: cvp-upload [-h]
[--token TOKEN]
[--image IMAGE]
--cloudvision CLOUDVISION
[--create_bundle]
[--timeout TIMEOUT]
[--verbose VERBOSE]
Cloudvision Image uploader script.
optional arguments:
-h, --help show this help message and exit
--token TOKEN CVP Authentication token - can use ENV:ARISTA_AVD_CV_TOKEN
--image IMAGE Type of EOS image required
--cloudvision CLOUDVISION
Cloudvision instance where to upload image
--create_bundle Option to create image bundle with new uploaded image
--timeout TIMEOUT Timeout connection. Default is set to 1200sec
--verbose VERBOSE Script verbosity
```

View file

@ -1,56 +0,0 @@
#!/usr/bin/python
import sys
import os
import argparse
from eos_downloader.cvp import CvFeatureManager, CvpAuthenticationItem
from loguru import logger
ARISTA_AVD_CV_TOKEN = os.getenv('ARISTA_AVD_CV_TOKEN', '')
def read_cli():
parser = argparse.ArgumentParser(description='Cloudvision Image uploader script.')
parser.add_argument('--token', required=False,
default=ARISTA_AVD_CV_TOKEN,
help='CVP Authentication token - can use ENV:ARISTA_AVD_CV_TOKEN')
parser.add_argument('--image', required=False,
default='EOS', help='Type of EOS image required')
parser.add_argument('--cloudvision', required=True,
help='Cloudvision instance where to upload image')
parser.add_argument('--create_bundle', required=False, action='store_true',
help="Option to create image bundle with new uploaded image")
parser.add_argument('--timeout', required=False,
default=1200,
help='Timeout connection. Default is set to 1200sec')
parser.add_argument('--verbose', required=False,
default='info', help='Script verbosity')
return parser.parse_args()
if __name__ == '__main__':
cli_options = read_cli()
logger.remove()
logger.add(sys.stderr, level=str(cli_options.verbose).upper())
cv_authentication = CvpAuthenticationItem(
server=cli_options.cloudvision,
token=cli_options.token,
port=443,
timeout=cli_options.timeout,
validate_cert=False
)
my_cvp_uploader = CvFeatureManager(authentication=cv_authentication)
result_upload = my_cvp_uploader.upload_image(cli_options.image)
if result_upload and cli_options.create_bundle:
bundle_name = os.path.basename(cli_options.image)
logger.info('Creating image bundle {}'.format(bundle_name))
my_cvp_uploader.create_bundle(
name=bundle_name,
images_name=[bundle_name]
)
sys.exit(0)

View file

@ -1,86 +0,0 @@
#!/usr/bin/python
import sys
import os
import argparse
import eos_downloader.eos
from loguru import logger
from rich.console import Console
ARISTA_TOKEN = os.getenv('ARISTA_TOKEN', '')
def read_cli():
parser = argparse.ArgumentParser(description='EOS downloader script.')
parser.add_argument('--token', required=False,
default=ARISTA_TOKEN,
help='arista.com user API key - can use ENV:ARISTA_TOKEN')
parser.add_argument('--image', required=False,
default='EOS', help='Type of EOS image required')
parser.add_argument('--version', required=True,
default='', help='EOS version to download from website')
parser.add_argument('--destination', required=False,
default=str(os.getcwd()),
help='Path where to save EOS package downloaded')
parser.add_argument('--eve', required=False, action='store_true',
help="Option to install EOS package to EVE-NG")
parser.add_argument('--noztp', required=False, action='store_true',
help="Option to deactivate ZTP when used with EVE-NG")
parser.add_argument('--import_docker', required=False, action='store_true',
help="Option to import cEOS image to docker")
parser.add_argument('--docker_name', required=False,
default='arista/ceos',
help='Docker image name to use')
parser.add_argument('--verbose', required=False,
default='info', help='Script verbosity')
parser.add_argument('--log', required=False, action='store_true',
help="Option to activate logging to eos-downloader.log file")
return parser.parse_args()
if __name__ == '__main__':
cli_options = read_cli()
console = Console()
console.print('\n[red]WARNING: This script is now deprecated. Please use ardl cli instead[/red]\n\n')
if cli_options.token is None or cli_options.token == '':
console.print('\n❗ Token is unset ! Please configure ARISTA_TOKEN or use --token option', style="bold red")
sys.exit(1)
logger.remove()
if cli_options.log:
logger.add("eos-downloader.log", rotation="10 MB", level=str(cli_options.verbose).upper())
console.print("🪐 [bold blue]eos-downloader[/bold blue] is starting...", )
console.print(f' - Image Type: {cli_options.image}')
console.print(f' - Version: {cli_options.version}')
my_download = eos_downloader.eos.EOSDownloader(
image=cli_options.image,
software='EOS',
version=cli_options.version,
token=cli_options.token,
hash_method='sha512sum')
my_download.authenticate()
if cli_options.eve:
my_download.provision_eve(noztp=cli_options.noztp, checksum=True)
else:
my_download.download_local(file_path=cli_options.destination, checksum=True)
if cli_options.import_docker:
my_download.docker_import(
image_name=cli_options.docker_name
)
console.print('✅ processing done !')
sys.exit(0)

View file

@ -5,23 +5,36 @@
EOS Downloader module. EOS Downloader module.
""" """
from __future__ import (absolute_import, division, from __future__ import (
print_function, unicode_literals, annotations) absolute_import,
import dataclasses annotations,
from typing import Any division,
import json print_function,
import importlib.metadata unicode_literals,
)
__author__ = '@titom73' import dataclasses
__email__ = 'tom@inetsix.net' import importlib.metadata
__date__ = '2022-03-16' import json
from typing import Any
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from _typeshed import DataclassInstance # noqa: F401
__author__ = "@titom73"
__email__ = "tom@inetsix.net"
__date__ = "2022-03-16"
__version__ = importlib.metadata.version("eos-downloader") __version__ = importlib.metadata.version("eos-downloader")
# __all__ = ["CvpAuthenticationItem", "CvFeatureManager", "EOSDownloader", "ObjectDownloader", "reverse"] # __all__ = ["CvpAuthenticationItem", "CvFeatureManager", "EOSDownloader", "ObjectDownloader", "reverse"]
ARISTA_GET_SESSION = "https://www.arista.com/custom_data/api/cvp/getSessionCode/" ARISTA_GET_SESSION = "https://www.arista.com/custom_data/api/cvp/getSessionCode/"
ARISTA_SOFTWARE_FOLDER_TREE = "https://www.arista.com/custom_data/api/cvp/getFolderTree/" ARISTA_SOFTWARE_FOLDER_TREE = (
"https://www.arista.com/custom_data/api/cvp/getFolderTree/"
)
ARISTA_DOWNLOAD_URL = "https://www.arista.com/custom_data/api/cvp/getDownloadLink/" ARISTA_DOWNLOAD_URL = "https://www.arista.com/custom_data/api/cvp/getDownloadLink/"
@ -36,12 +49,13 @@ check the Access Token. Then re-run the script with the correct token.
MSG_INVALID_DATA = """Invalid data returned by server MSG_INVALID_DATA = """Invalid data returned by server
""" """
EVE_QEMU_FOLDER_PATH = '/opt/unetlab/addons/qemu/' EVE_QEMU_FOLDER_PATH = "/opt/unetlab/addons/qemu/"
class EnhancedJSONEncoder(json.JSONEncoder): class EnhancedJSONEncoder(json.JSONEncoder):
"""Custom JSon encoder.""" """Custom JSon encoder."""
def default(self, o: Any) -> Any: def default(self, o: Any) -> Any:
if dataclasses.is_dataclass(o): if dataclasses.is_dataclass(o):
return dataclasses.asdict(o) return dataclasses.asdict(o) # type: ignore
return super().default(o) return super().default(o)

View file

@ -11,49 +11,51 @@ ARDL CLI Baseline.
""" """
import click import click
from rich.console import Console
import eos_downloader from eos_downloader import __version__
from eos_downloader.cli.get import commands as get_commands
from eos_downloader.cli.debug import commands as debug_commands from eos_downloader.cli.debug import commands as debug_commands
from eos_downloader.cli.get import commands as get_commands
from eos_downloader.cli.info import commands as info_commands from eos_downloader.cli.info import commands as info_commands
from eos_downloader.cli.utils import AliasedGroup
@click.group()
@click.group(cls=AliasedGroup)
@click.version_option(__version__)
@click.pass_context @click.pass_context
@click.option('--token', show_envvar=True, default=None, help='Arista Token from your customer account') @click.option(
"--token",
show_envvar=True,
default=None,
help="Arista Token from your customer account",
)
def ardl(ctx: click.Context, token: str) -> None: def ardl(ctx: click.Context, token: str) -> None:
"""Arista Network Download CLI""" """Arista Network Download CLI"""
ctx.ensure_object(dict) ctx.ensure_object(dict)
ctx.obj['token'] = token ctx.obj["token"] = token
@click.command() @ardl.group(cls=AliasedGroup, no_args_is_help=True)
def version() -> None:
"""Display version of ardl"""
console = Console()
console.print(f'ardl is running version {eos_downloader.__version__}')
@ardl.group(no_args_is_help=True)
@click.pass_context @click.pass_context
def get(ctx: click.Context) -> None: def get(ctx: click.Context, cls: click.Group = AliasedGroup) -> None:
# pylint: disable=redefined-builtin # pylint: disable=redefined-builtin
"""Download Arista from Arista website""" """Download Arista from Arista website"""
@ardl.group(no_args_is_help=True) @ardl.group(cls=AliasedGroup, no_args_is_help=True)
@click.pass_context @click.pass_context
def info(ctx: click.Context) -> None: def info(ctx: click.Context, cls: click.Group = AliasedGroup) -> None:
# pylint: disable=redefined-builtin # pylint: disable=redefined-builtin
"""List information from Arista website""" """List information from Arista website"""
@ardl.group(no_args_is_help=True) @ardl.group(cls=AliasedGroup, no_args_is_help=True)
@click.pass_context @click.pass_context
def debug(ctx: click.Context) -> None: def debug(ctx: click.Context, cls: click.Group = AliasedGroup) -> None:
# pylint: disable=redefined-builtin # pylint: disable=redefined-builtin
"""Debug commands to work with ardl""" """Debug commands to work with ardl"""
# ANTA CLI Execution # ANTA CLI Execution
@ -64,13 +66,9 @@ def cli() -> None:
get.add_command(get_commands.cvp) get.add_command(get_commands.cvp)
info.add_command(info_commands.eos_versions) info.add_command(info_commands.eos_versions)
debug.add_command(debug_commands.xml) debug.add_command(debug_commands.xml)
ardl.add_command(version)
# Load CLI # Load CLI
ardl( ardl(obj={}, auto_envvar_prefix="arista")
obj={},
auto_envvar_prefix='arista'
)
if __name__ == '__main__': if __name__ == "__main__":
cli() cli()

View file

@ -22,32 +22,51 @@ import eos_downloader.eos
@click.command() @click.command()
@click.pass_context @click.pass_context
@click.option('--output', default=str('arista.xml'), help='Path to save XML file', type=click.Path(), show_default=True) @click.option(
@click.option('--log-level', '--log', help='Logging level of the command', default=None, type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False)) "--output",
default=str("arista.xml"),
help="Path to save XML file",
type=click.Path(),
show_default=True,
)
@click.option(
"--log-level",
"--log",
help="Logging level of the command",
default=None,
type=click.Choice(
["debug", "info", "warning", "error", "critical"], case_sensitive=False
),
)
def xml(ctx: click.Context, output: str, log_level: str) -> None: def xml(ctx: click.Context, output: str, log_level: str) -> None:
# sourcery skip: remove-unnecessary-cast # sourcery skip: remove-unnecessary-cast
"""Extract XML directory structure""" """Extract XML directory structure"""
console = Console() console = Console()
# Get from Context # Get from Context
token = ctx.obj['token'] token = ctx.obj["token"]
logger.remove() logger.remove()
if log_level is not None: if log_level is not None:
logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper()) logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper())
my_download = eos_downloader.eos.EOSDownloader( my_download = eos_downloader.eos.EOSDownloader(
image='unset', image="unset",
software='EOS', software="EOS",
version='unset', version="unset",
token=token, token=token,
hash_method='sha512sum') hash_method="sha512sum",
)
my_download.authenticate() my_download.authenticate()
xml_object: ET.ElementTree = my_download._get_folder_tree() # pylint: disable=protected-access xml_object: ET.ElementTree = (
my_download.get_folder_tree()
) # pylint: disable=protected-access
xml_content = xml_object.getroot() xml_content = xml_object.getroot()
xmlstr = minidom.parseString(ET.tostring(xml_content)).toprettyxml(indent=" ", newl='') xmlstr = minidom.parseString(ET.tostring(xml_content)).toprettyxml(
with open(output, "w", encoding='utf-8') as f: indent=" ", newl=""
)
with open(output, "w", encoding="utf-8") as f:
f.write(str(xmlstr)) f.write(str(xmlstr))
console.print(f'XML file saved in: { output }') console.print(f"XML file saved in: { output }")

View file

@ -21,68 +21,159 @@ from rich.console import Console
import eos_downloader.eos import eos_downloader.eos
from eos_downloader.models.version import BASE_VERSION_STR, RTYPE_FEATURE, RTYPES from eos_downloader.models.version import BASE_VERSION_STR, RTYPE_FEATURE, RTYPES
EOS_IMAGE_TYPE = ['64', 'INT', '2GB-INT', 'cEOS', 'cEOS64', 'vEOS', 'vEOS-lab', 'EOS-2GB', 'default'] EOS_IMAGE_TYPE = [
CVP_IMAGE_TYPE = ['ova', 'rpm', 'kvm', 'upgrade'] "64",
"INT",
"2GB-INT",
"cEOS",
"cEOS64",
"vEOS",
"vEOS-lab",
"EOS-2GB",
"default",
]
CVP_IMAGE_TYPE = ["ova", "rpm", "kvm", "upgrade"]
@click.command(no_args_is_help=True) @click.command(no_args_is_help=True)
@click.pass_context @click.pass_context
@click.option('--image-type', default='default', help='EOS Image type', type=click.Choice(EOS_IMAGE_TYPE), required=True) @click.option(
@click.option('--version', default=None, help='EOS version', type=str, required=False) "--image-type",
@click.option('--latest', '-l', is_flag=True, type=click.BOOL, default=False, help='Get latest version in given branch. If --branch is not use, get the latest branch with specific release type') default="default",
@click.option('--release-type', '-rtype', type=click.Choice(RTYPES, case_sensitive=False), default=RTYPE_FEATURE, help='EOS release type to search') help="EOS Image type",
@click.option('--branch', '-b', type=click.STRING, default=None, help='EOS Branch to list releases') type=click.Choice(EOS_IMAGE_TYPE),
@click.option('--docker-name', default='arista/ceos', help='Docker image name (default: arista/ceos)', type=str, show_default=True) required=True,
@click.option('--output', default=str(os.path.relpath(os.getcwd(), start=os.curdir)), help='Path to save image', type=click.Path(),show_default=True) )
@click.option("--version", default=None, help="EOS version", type=str, required=False)
@click.option(
"--latest",
"-l",
is_flag=True,
type=click.BOOL,
default=False,
help="Get latest version in given branch. If --branch is not use, get the latest branch with specific release type",
)
@click.option(
"--release-type",
"-rtype",
type=click.Choice(RTYPES, case_sensitive=False),
default=RTYPE_FEATURE,
help="EOS release type to search",
)
@click.option(
"--branch",
"-b",
type=click.STRING,
default=None,
help="EOS Branch to list releases",
)
@click.option(
"--docker-name",
default="arista/ceos",
help="Docker image name (default: arista/ceos)",
type=str,
show_default=True,
)
@click.option(
"--output",
default=str(os.path.relpath(os.getcwd(), start=os.curdir)),
help="Path to save image",
type=click.Path(),
show_default=True,
)
# Debugging # Debugging
@click.option('--log-level', '--log', help='Logging level of the command', default=None, type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False)) @click.option(
"--log-level",
"--log",
help="Logging level of the command",
default=None,
type=click.Choice(
["debug", "info", "warning", "error", "critical"], case_sensitive=False
),
)
# Boolean triggers # Boolean triggers
@click.option('--eve-ng', is_flag=True, help='Run EVE-NG vEOS provisioning (only if CLI runs on an EVE-NG server)', default=False) @click.option(
@click.option('--disable-ztp', is_flag=True, help='Disable ZTP process in vEOS image (only available with --eve-ng)', default=False) "--eve-ng",
@click.option('--import-docker', is_flag=True, help='Import docker image (only available with --image_type cEOSlab)', default=False) is_flag=True,
help="Run EVE-NG vEOS provisioning (only if CLI runs on an EVE-NG server)",
default=False,
)
@click.option(
"--disable-ztp",
is_flag=True,
help="Disable ZTP process in vEOS image (only available with --eve-ng)",
default=False,
)
@click.option(
"--import-docker",
is_flag=True,
help="Import docker image (only available with --image_type cEOSlab)",
default=False,
)
def eos( def eos(
ctx: click.Context, image_type: str, output: str, log_level: str, eve_ng: bool, disable_ztp: bool, ctx: click.Context,
import_docker: bool, docker_name: str, version: Union[str, None] = None, release_type: str = RTYPE_FEATURE, image_type: str,
latest: bool = False, branch: Union[str,None] = None output: str,
) -> int: log_level: str,
eve_ng: bool,
disable_ztp: bool,
import_docker: bool,
docker_name: str,
version: Union[str, None] = None,
release_type: str = RTYPE_FEATURE,
latest: bool = False,
branch: Union[str, None] = None,
) -> int:
# pylint: disable=R0917
"""Download EOS image from Arista website""" """Download EOS image from Arista website"""
console = Console() console = Console()
# Get from Context # Get from Context
token = ctx.obj['token'] token = ctx.obj["token"]
if token is None or token == '': is_latest: bool = False
console.print('❗ Token is unset ! Please configure ARISTA_TOKEN or use --token option', style="bold red") if token is None or token == "":
console.print(
"❗ Token is unset ! Please configure ARISTA_TOKEN or use --token option",
style="bold red",
)
sys.exit(1) sys.exit(1)
logger.remove() logger.remove()
if log_level is not None: if log_level is not None:
logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper()) logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper())
console.print("🪐 [bold blue]eos-downloader[/bold blue] is starting...", ) console.print(
console.print(f' - Image Type: {image_type}') "🪐 [bold blue]eos-downloader[/bold blue] is starting...",
console.print(f' - Version: {version}') )
console.print(f" - Image Type: {image_type}")
console.print(f" - Version: {version}")
if version is not None: if version is not None:
my_download = eos_downloader.eos.EOSDownloader( my_download = eos_downloader.eos.EOSDownloader(
image=image_type, image=image_type,
software='EOS', software="EOS",
version=version, version=version,
token=token, token=token,
hash_method='sha512sum') hash_method="sha512sum",
)
my_download.authenticate() my_download.authenticate()
elif latest: elif latest:
is_latest = True
my_download = eos_downloader.eos.EOSDownloader( my_download = eos_downloader.eos.EOSDownloader(
image=image_type, image=image_type,
software='EOS', software="EOS",
version='unset', version="unset",
token=token, token=token,
hash_method='sha512sum') hash_method="sha512sum",
)
my_download.authenticate() my_download.authenticate()
if branch is None: if branch is None:
branch = str(my_download.latest_branch(rtype=release_type).branch) branch = str(my_download.latest_branch(rtype=release_type).branch)
latest_version = my_download.latest_eos(branch, rtype=release_type) latest_version = my_download.latest_eos(branch, rtype=release_type)
if str(latest_version) == BASE_VERSION_STR: if str(latest_version) == BASE_VERSION_STR:
console.print(f'[red]Error[/red], cannot find any version in {branch} for {release_type} release type') console.print(
f"[red]Error[/red], cannot find any version in {branch} for {release_type} release type"
)
sys.exit(1) sys.exit(1)
my_download.version = str(latest_version) my_download.version = str(latest_version)
@ -92,46 +183,71 @@ def eos(
my_download.download_local(file_path=output, checksum=True) my_download.download_local(file_path=output, checksum=True)
if import_docker: if import_docker:
my_download.docker_import( my_download.docker_import(image_name=docker_name, is_latest=is_latest)
image_name=docker_name console.print("✅ processing done !")
)
console.print('✅ processing done !')
sys.exit(0) sys.exit(0)
@click.command(no_args_is_help=True) @click.command(no_args_is_help=True)
@click.pass_context @click.pass_context
@click.option('--format', default='upgrade', help='CVP Image type', type=click.Choice(CVP_IMAGE_TYPE), required=True) @click.option(
@click.option('--version', default=None, help='CVP version', type=str, required=True) "--format",
@click.option('--output', default=str(os.path.relpath(os.getcwd(), start=os.curdir)), help='Path to save image', type=click.Path(),show_default=True) default="upgrade",
@click.option('--log-level', '--log', help='Logging level of the command', default=None, type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False)) help="CVP Image type",
def cvp(ctx: click.Context, version: str, format: str, output: str, log_level: str) -> int: type=click.Choice(CVP_IMAGE_TYPE),
required=True,
)
@click.option("--version", default=None, help="CVP version", type=str, required=True)
@click.option(
"--output",
default=str(os.path.relpath(os.getcwd(), start=os.curdir)),
help="Path to save image",
type=click.Path(),
show_default=True,
)
@click.option(
"--log-level",
"--log",
help="Logging level of the command",
default=None,
type=click.Choice(
["debug", "info", "warning", "error", "critical"], case_sensitive=False
),
)
def cvp(
ctx: click.Context, version: str, format: str, output: str, log_level: str
) -> int:
"""Download CVP image from Arista website""" """Download CVP image from Arista website"""
console = Console() console = Console()
# Get from Context # Get from Context
token = ctx.obj['token'] token = ctx.obj["token"]
if token is None or token == '': if token is None or token == "":
console.print('❗ Token is unset ! Please configure ARISTA_TOKEN or use --token option', style="bold red") console.print(
"❗ Token is unset ! Please configure ARISTA_TOKEN or use --token option",
style="bold red",
)
sys.exit(1) sys.exit(1)
logger.remove() logger.remove()
if log_level is not None: if log_level is not None:
logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper()) logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper())
console.print("🪐 [bold blue]eos-downloader[/bold blue] is starting...", ) console.print(
console.print(f' - Image Type: {format}') "🪐 [bold blue]eos-downloader[/bold blue] is starting...",
console.print(f' - Version: {version}') )
console.print(f" - Image Type: {format}")
console.print(f" - Version: {version}")
my_download = eos_downloader.eos.EOSDownloader( my_download = eos_downloader.eos.EOSDownloader(
image=format, image=format,
software='CloudVision', software="CloudVision",
version=version, version=version,
token=token, token=token,
hash_method='md5sum') hash_method="md5sum",
)
my_download.authenticate() my_download.authenticate()
my_download.download_local(file_path=output, checksum=False) my_download.download_local(file_path=output, checksum=False)
console.print('✅ processing done !') console.print("✅ processing done !")
sys.exit(0) sys.exit(0)

View file

@ -24,13 +24,54 @@ from eos_downloader.models.version import BASE_VERSION_STR, RTYPE_FEATURE, RTYPE
@click.command(no_args_is_help=True) @click.command(no_args_is_help=True)
@click.pass_context @click.pass_context
@click.option('--latest', '-l', is_flag=True, type=click.BOOL, default=False, help='Get latest version in given branch. If --branch is not use, get the latest branch with specific release type') @click.option(
@click.option('--release-type', '-rtype', type=click.Choice(RTYPES, case_sensitive=False), default=RTYPE_FEATURE, help='EOS release type to search') "--latest",
@click.option('--branch', '-b', type=click.STRING, default=None, help='EOS Branch to list releases') "-l",
@click.option('--verbose', '-v', is_flag=True, type=click.BOOL, default=False, help='Human readable output. Default is none to use output in script)') is_flag=True,
@click.option('--log-level', '--log', help='Logging level of the command', default='warning', type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False)) type=click.BOOL,
def eos_versions(ctx: click.Context, log_level: str, branch: Union[str,None] = None, release_type: str = RTYPE_FEATURE, latest: bool = False, verbose: bool = False) -> None: default=False,
# pylint: disable = too-many-branches help="Get latest version in given branch. If --branch is not use, get the latest branch with specific release type",
)
@click.option(
"--release-type",
"-rtype",
type=click.Choice(RTYPES, case_sensitive=False),
default=RTYPE_FEATURE,
help="EOS release type to search",
)
@click.option(
"--branch",
"-b",
type=click.STRING,
default=None,
help="EOS Branch to list releases",
)
@click.option(
"--verbose",
"-v",
is_flag=True,
type=click.BOOL,
default=False,
help="Human readable output. Default is none to use output in script)",
)
@click.option(
"--log-level",
"--log",
help="Logging level of the command",
default="warning",
type=click.Choice(
["debug", "info", "warning", "error", "critical"], case_sensitive=False
),
)
def eos_versions(
ctx: click.Context,
log_level: str,
branch: Union[str, None] = None,
release_type: str = RTYPE_FEATURE,
latest: bool = False,
verbose: bool = False,
) -> None:
# pylint: disable = too-many-branches, R0917
""" """
List Available EOS version on Arista.com website. List Available EOS version on Arista.com website.
@ -42,22 +83,23 @@ def eos_versions(ctx: click.Context, log_level: str, branch: Union[str,None] = N
""" """
console = Console() console = Console()
# Get from Context # Get from Context
token = ctx.obj['token'] token = ctx.obj["token"]
logger.remove() logger.remove()
if log_level is not None: if log_level is not None:
logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper()) logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper())
my_download = eos_downloader.eos.EOSDownloader( my_download = eos_downloader.eos.EOSDownloader(
image='unset', image="unset",
software='EOS', software="EOS",
version='unset', version="unset",
token=token, token=token,
hash_method='sha512sum') hash_method="sha512sum",
)
auth = my_download.authenticate() auth = my_download.authenticate()
if verbose and auth: if verbose and auth:
console.print('✅ Authenticated on arista.com') console.print("✅ Authenticated on arista.com")
if release_type is not None: if release_type is not None:
release_type = release_type.upper() release_type = release_type.upper()
@ -67,21 +109,27 @@ def eos_versions(ctx: click.Context, log_level: str, branch: Union[str,None] = N
branch = str(my_download.latest_branch(rtype=release_type).branch) branch = str(my_download.latest_branch(rtype=release_type).branch)
latest_version = my_download.latest_eos(branch, rtype=release_type) latest_version = my_download.latest_eos(branch, rtype=release_type)
if str(latest_version) == BASE_VERSION_STR: if str(latest_version) == BASE_VERSION_STR:
console.print(f'[red]Error[/red], cannot find any version in {branch} for {release_type} release type') console.print(
f"[red]Error[/red], cannot find any version in {branch} for {release_type} release type"
)
sys.exit(1) sys.exit(1)
if verbose: if verbose:
console.print(f'Branch {branch} has been selected with release type {release_type}') console.print(
f"Branch {branch} has been selected with release type {release_type}"
)
if branch is not None: if branch is not None:
console.print(f'Latest release for {branch}: {latest_version}') console.print(f"Latest release for {branch}: {latest_version}")
else: else:
console.print(f'Latest EOS release: {latest_version}') console.print(f"Latest EOS release: {latest_version}")
else: else:
console.print(f'{ latest_version }') console.print(f"{ latest_version }")
else: else:
versions = my_download.get_eos_versions(branch=branch, rtype=release_type) versions = my_download.get_eos_versions(branch=branch, rtype=release_type)
if verbose: if verbose:
console.print(f'List of available versions for {branch if branch is not None else "all branches"}') console.print(
f'List of available versions for {branch if branch is not None else "all branches"}'
)
for version in versions: for version in versions:
console.print(f'{str(version)}') console.print(f"{str(version)}")
else: else:
pprint([str(version) for version in versions]) pprint([str(version) for version in versions])

View file

@ -0,0 +1,38 @@
#!/usr/bin/python
# coding: utf-8 -*-
# pylint: disable=inconsistent-return-statements
"""
Extension for the python ``click`` module
to provide a group or command with aliases.
"""
from typing import Any
import click
class AliasedGroup(click.Group):
"""
Implements a subclass of Group that accepts a prefix for a command.
If there were a command called push, it would accept pus as an alias (so long as it was unique)
"""
def get_command(self, ctx: click.Context, cmd_name: str) -> Any:
"""Documentation to build"""
rv = click.Group.get_command(self, ctx, cmd_name)
if rv is not None:
return rv
matches = [x for x in self.list_commands(ctx)
if x.startswith(cmd_name)]
if not matches:
return None
if len(matches) == 1:
return click.Group.get_command(self, ctx, matches[0])
ctx.fail(f"Too many matches: {', '.join(sorted(matches))}")
def resolve_command(self, ctx: click.Context, args: Any) -> Any:
"""Documentation to build"""
# always return the full command name
_, cmd, args = super().resolve_command(ctx, args)
return cmd.name, cmd, args

View file

@ -6,11 +6,12 @@ CVP Uploader content
""" """
import os import os
from typing import List, Optional, Any
from dataclasses import dataclass from dataclasses import dataclass
from loguru import logger from typing import Any, List, Optional
from cvprac.cvp_client import CvpClient from cvprac.cvp_client import CvpClient
from cvprac.cvp_client_errors import CvpLoginError from cvprac.cvp_client_errors import CvpLoginError
from loguru import logger
# from eos_downloader.tools import exc_to_str # from eos_downloader.tools import exc_to_str
@ -20,8 +21,9 @@ from cvprac.cvp_client_errors import CvpLoginError
@dataclass @dataclass
class CvpAuthenticationItem: class CvpAuthenticationItem:
""" """
Data structure to represent Cloudvision Authentication Data structure to represent Cloudvision Authentication
""" """
server: str server: str
port: int = 443 port: int = 443
token: Optional[str] = None token: Optional[str] = None
@ -29,15 +31,16 @@ class CvpAuthenticationItem:
validate_cert: bool = False validate_cert: bool = False
class Filer(): class Filer:
# pylint: disable=too-few-public-methods # pylint: disable=too-few-public-methods
""" """
Filer Helper for file management Filer Helper for file management
""" """
def __init__(self, path: str) -> None: def __init__(self, path: str) -> None:
self.file_exist = False self.file_exist = False
self.filename = '' self.filename = ""
self.absolute_path = '' self.absolute_path = ""
self.relative_path = path self.relative_path = path
if os.path.exists(path): if os.path.exists(path):
self.file_exist = True self.file_exist = True
@ -45,13 +48,14 @@ class Filer():
self.absolute_path = os.path.realpath(path) self.absolute_path = os.path.realpath(path)
def __repr__(self) -> str: def __repr__(self) -> str:
return self.absolute_path if self.file_exist else '' return self.absolute_path if self.file_exist else ""
class CvFeatureManager(): class CvFeatureManager:
""" """
CvFeatureManager Object to interect with Cloudvision CvFeatureManager Object to interect with Cloudvision
""" """
def __init__(self, authentication: CvpAuthenticationItem) -> None: def __init__(self, authentication: CvpAuthenticationItem) -> None:
""" """
__init__ Class Creator __init__ Class Creator
@ -86,19 +90,21 @@ class CvFeatureManager():
try: try:
client.connect( client.connect(
nodes=[authentication.server], nodes=[authentication.server],
username='', username="",
password='', password="",
api_token=authentication.token, api_token=authentication.token,
is_cvaas=True, is_cvaas=True,
port=authentication.port, port=authentication.port,
cert=authentication.validate_cert, cert=authentication.validate_cert,
request_timeout=authentication.timeout request_timeout=authentication.timeout,
) )
except CvpLoginError as error_data: except CvpLoginError as error_data:
logger.error(f'Cannot connect to Cloudvision server {authentication.server}') logger.error(
logger.debug(f'Error message: {error_data}') f"Cannot connect to Cloudvision server {authentication.server}"
logger.info('connected to Cloudvision server') )
logger.debug(f'Connection info: {authentication}') logger.debug(f"Error message: {error_data}")
logger.info("connected to Cloudvision server")
logger.debug(f"Connection info: {authentication}")
return client return client
def __get_images(self) -> List[Any]: def __get_images(self) -> List[Any]:
@ -111,8 +117,8 @@ class CvFeatureManager():
Fact returned by Cloudvision Fact returned by Cloudvision
""" """
images = [] images = []
logger.debug(' -> Collecting images') logger.debug(" -> Collecting images")
images = self._cv_instance.api.get_images()['data'] images = self._cv_instance.api.get_images()["data"]
return images if self.__check_api_result(images) else [] return images if self.__check_api_result(images) else []
# def __get_bundles(self): # def __get_bundles(self):
@ -161,7 +167,11 @@ class CvFeatureManager():
bool bool
True if present True if present
""" """
return any(image_name == image['name'] for image in self._cv_images) if isinstance(self._cv_images, list) else False return (
any(image_name == image["name"] for image in self._cv_images)
if isinstance(self._cv_images, list)
else False
)
def _does_bundle_exist(self, bundle_name: str) -> bool: def _does_bundle_exist(self, bundle_name: str) -> bool:
# pylint: disable=unused-argument # pylint: disable=unused-argument
@ -192,19 +202,23 @@ class CvFeatureManager():
""" """
image_item = Filer(path=image_path) image_item = Filer(path=image_path)
if image_item.file_exist is False: if image_item.file_exist is False:
logger.error(f'File not found: {image_item.relative_path}') logger.error(f"File not found: {image_item.relative_path}")
return False return False
logger.info(f'File path for image: {image_item}') logger.info(f"File path for image: {image_item}")
if self._does_image_exist(image_name=image_item.filename): if self._does_image_exist(image_name=image_item.filename):
logger.error("Image found in Cloudvision , Please delete it before running this script") logger.error(
"Image found in Cloudvision , Please delete it before running this script"
)
return False return False
try: try:
upload_result = self._cv_instance.api.add_image(filepath=image_item.absolute_path) upload_result = self._cv_instance.api.add_image(
filepath=image_item.absolute_path
)
except Exception as e: # pylint: disable=broad-exception-caught except Exception as e: # pylint: disable=broad-exception-caught
logger.error('An error occurred during upload, check CV connection') logger.error("An error occurred during upload, check CV connection")
logger.error(f'Exception message is: {e}') logger.error(f"Exception message is: {e}")
return False return False
logger.debug(f'Upload Result is : {upload_result}') logger.debug(f"Upload Result is : {upload_result}")
return True return True
def build_image_list(self, image_list: List[str]) -> List[Any]: def build_image_list(self, image_list: List[str]) -> List[Any]:
@ -252,25 +266,30 @@ class CvFeatureManager():
bool bool
True if succeeds True if succeeds
""" """
logger.debug(f'Init creation of an image bundle {name} with following images {images_name}') logger.debug(
f"Init creation of an image bundle {name} with following images {images_name}"
)
all_images_present: List[bool] = [] all_images_present: List[bool] = []
self._cv_images = self.__get_images() self._cv_images = self.__get_images()
all_images_present.extend( all_images_present.extend(
self._does_image_exist(image_name=image_name) self._does_image_exist(image_name=image_name) for image_name in images_name
for image_name in images_name
) )
# Bundle Create # Bundle Create
if self._does_bundle_exist(bundle_name=name) is False: if self._does_bundle_exist(bundle_name=name) is False:
logger.debug(f'Creating image bundle {name} with following images {images_name}') logger.debug(
f"Creating image bundle {name} with following images {images_name}"
)
images_data = self.build_image_list(image_list=images_name) images_data = self.build_image_list(image_list=images_name)
if images_data is not None: if images_data is not None:
logger.debug('Images information: {images_data}') logger.debug("Images information: {images_data}")
try: try:
data = self._cv_instance.api.save_image_bundle(name=name, images=images_data) data = self._cv_instance.api.save_image_bundle(
name=name, images=images_data
)
except Exception as e: # pylint: disable=broad-exception-caught except Exception as e: # pylint: disable=broad-exception-caught
logger.critical(f'{e}') logger.critical(f"{e}")
else: else:
logger.debug(data) logger.debug(data)
return True return True
logger.critical('No data found for images') logger.critical("No data found for images")
return False return False

View file

@ -12,82 +12,22 @@ Data are built from content of Arista XML file
# [platform][image][version] # [platform][image][version]
DATA_MAPPING = { DATA_MAPPING = {
"CloudVision": { "CloudVision": {
"ova": { "ova": {"extension": ".ova", "prepend": "cvp", "folder_level": 0},
"extension": ".ova", "rpm": {"extension": "", "prepend": "cvp-rpm-installer", "folder_level": 0},
"prepend": "cvp", "kvm": {"extension": "-kvm.tgz", "prepend": "cvp", "folder_level": 0},
"folder_level": 0 "upgrade": {"extension": ".tgz", "prepend": "cvp-upgrade", "folder_level": 0},
},
"rpm": {
"extension": "",
"prepend": "cvp-rpm-installer",
"folder_level": 0
},
"kvm": {
"extension": "-kvm.tgz",
"prepend": "cvp",
"folder_level": 0
},
"upgrade": {
"extension": ".tgz",
"prepend": "cvp-upgrade",
"folder_level": 0
},
}, },
"EOS": { "EOS": {
"64": { "64": {"extension": ".swi", "prepend": "EOS64", "folder_level": 0},
"extension": ".swi", "INT": {"extension": "-INT.swi", "prepend": "EOS", "folder_level": 1},
"prepend": "EOS64", "2GB-INT": {"extension": "-INT.swi", "prepend": "EOS-2GB", "folder_level": 1},
"folder_level": 0 "cEOS": {"extension": ".tar.xz", "prepend": "cEOS-lab", "folder_level": 0},
}, "cEOS64": {"extension": ".tar.xz", "prepend": "cEOS64-lab", "folder_level": 0},
"INT": { "vEOS": {"extension": ".vmdk", "prepend": "vEOS", "folder_level": 0},
"extension": "-INT.swi", "vEOS-lab": {"extension": ".vmdk", "prepend": "vEOS-lab", "folder_level": 0},
"prepend": "EOS", "EOS-2GB": {"extension": ".swi", "prepend": "EOS-2GB", "folder_level": 0},
"folder_level": 1 "RN": {"extension": "-", "prepend": "RN", "folder_level": 0},
}, "SOURCE": {"extension": "-source.tar", "prepend": "EOS", "folder_level": 0},
"2GB-INT": { "default": {"extension": ".swi", "prepend": "EOS", "folder_level": 0},
"extension": "-INT.swi", },
"prepend": "EOS-2GB",
"folder_level": 1
},
"cEOS": {
"extension": ".tar.xz",
"prepend": "cEOS-lab",
"folder_level": 0
},
"cEOS64": {
"extension": ".tar.xz",
"prepend": "cEOS64-lab",
"folder_level": 0
},
"vEOS": {
"extension": ".vmdk",
"prepend": "vEOS",
"folder_level": 0
},
"vEOS-lab": {
"extension": ".vmdk",
"prepend": "vEOS-lab",
"folder_level": 0
},
"EOS-2GB": {
"extension": ".swi",
"prepend": "EOS-2GB",
"folder_level": 0
},
"RN": {
"extension": "-",
"prepend": "RN",
"folder_level": 0
},
"SOURCE": {
"extension": "-source.tar",
"prepend": "EOS",
"folder_level": 0
},
"default": {
"extension": ".swi",
"prepend": "EOS",
"folder_level": 0
}
}
} }

View file

@ -8,17 +8,29 @@ import os.path
import signal import signal
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from threading import Event from threading import Event
from typing import Iterable, Any from typing import Any, Iterable
import requests import requests
import rich import rich
from rich import console from rich import console
from rich.progress import (BarColumn, DownloadColumn, Progress, TaskID, from rich.progress import (
TextColumn, TimeElapsedColumn, TransferSpeedColumn) BarColumn,
DownloadColumn,
Progress,
TaskID,
TextColumn,
TimeElapsedColumn,
TransferSpeedColumn,
)
console = rich.get_console() console = rich.get_console()
done_event = Event() done_event = Event()
REQUEST_HEADERS = {
"Content-Type": "application/json",
"User-Agent": "Chrome/123.0.0.0",
}
def handle_sigint(signum: Any, frame: Any) -> None: def handle_sigint(signum: Any, frame: Any) -> None:
"""Progress bar handler""" """Progress bar handler"""
@ -28,7 +40,7 @@ def handle_sigint(signum: Any, frame: Any) -> None:
signal.signal(signal.SIGINT, handle_sigint) signal.signal(signal.SIGINT, handle_sigint)
class DownloadProgressBar(): class DownloadProgressBar:
""" """
Object to manage Download process with Progress Bar from Rich Object to manage Download process with Progress Bar from Rich
""" """
@ -38,7 +50,9 @@ class DownloadProgressBar():
Class Constructor Class Constructor
""" """
self.progress = Progress( self.progress = Progress(
TextColumn("💾 Downloading [bold blue]{task.fields[filename]}", justify="right"), TextColumn(
"💾 Downloading [bold blue]{task.fields[filename]}", justify="right"
),
BarColumn(bar_width=None), BarColumn(bar_width=None),
"[progress.percentage]{task.percentage:>3.1f}%", "[progress.percentage]{task.percentage:>3.1f}%",
"", "",
@ -48,14 +62,16 @@ class DownloadProgressBar():
"", "",
TimeElapsedColumn(), TimeElapsedColumn(),
"", "",
console=console console=console,
) )
def _copy_url(self, task_id: TaskID, url: str, path: str, block_size: int = 1024) -> bool: def _copy_url(
self, task_id: TaskID, url: str, path: str, block_size: int = 1024
) -> bool:
"""Copy data from a url to a local file.""" """Copy data from a url to a local file."""
response = requests.get(url, stream=True, timeout=5) response = requests.get(url, stream=True, timeout=5, headers=REQUEST_HEADERS)
# This will break if the response doesn't contain content length # This will break if the response doesn't contain content length
self.progress.update(task_id, total=int(response.headers['Content-Length'])) self.progress.update(task_id, total=int(response.headers["Content-Length"]))
with open(path, "wb") as dest_file: with open(path, "wb") as dest_file:
self.progress.start_task(task_id) self.progress.start_task(task_id)
for data in response.iter_content(chunk_size=block_size): for data in response.iter_content(chunk_size=block_size):
@ -71,7 +87,9 @@ class DownloadProgressBar():
with self.progress: with self.progress:
with ThreadPoolExecutor(max_workers=4) as pool: with ThreadPoolExecutor(max_workers=4) as pool:
for url in urls: for url in urls:
filename = url.split("/")[-1].split('?')[0] filename = url.split("/")[-1].split("?")[0]
dest_path = os.path.join(dest_dir, filename) dest_path = os.path.join(dest_dir, filename)
task_id = self.progress.add_task("download", filename=filename, start=False) task_id = self.progress.add_task(
"download", filename=filename, start=False
)
pool.submit(self._copy_url, task_id, url, dest_path) pool.submit(self._copy_url, task_id, url, dest_path)

View file

@ -14,13 +14,20 @@ import rich
from loguru import logger from loguru import logger
from rich import console from rich import console
from eos_downloader.models.version import BASE_BRANCH_STR, BASE_VERSION_STR, REGEX_EOS_VERSION, RTYPE_FEATURE, EosVersion from eos_downloader.models.version import (
BASE_BRANCH_STR,
BASE_VERSION_STR,
REGEX_EOS_VERSION,
RTYPE_FEATURE,
EosVersion,
)
from eos_downloader.object_downloader import ObjectDownloader from eos_downloader.object_downloader import ObjectDownloader
# logger = logging.getLogger(__name__) # logger = logging.getLogger(__name__)
console = rich.get_console() console = rich.get_console()
class EOSDownloader(ObjectDownloader): class EOSDownloader(ObjectDownloader):
""" """
EOSDownloader Object to download EOS images from Arista.com website EOSDownloader Object to download EOS images from Arista.com website
@ -47,22 +54,27 @@ class EOSDownloader(ObjectDownloader):
file_path : str file_path : str
Path where EOS image is located Path where EOS image is located
""" """
logger.info('Mounting volume to disable ZTP') logger.info("Mounting volume to disable ZTP")
console.print('🚀 Mounting volume to disable ZTP') console.print("🚀 Mounting volume to disable ZTP")
raw_folder = os.path.join(file_path, "raw") raw_folder = os.path.join(file_path, "raw")
os.system(f"rm -rf {raw_folder}") os.system(f"rm -rf {raw_folder}")
os.system(f"mkdir -p {raw_folder}") os.system(f"mkdir -p {raw_folder}")
os.system( os.system(
f'guestmount -a {os.path.join(file_path, "hda.qcow2")} -m /dev/sda2 {os.path.join(file_path, "raw")}') f'guestmount -a {os.path.join(file_path, "hda.qcow2")} -m /dev/sda2 {os.path.join(file_path, "raw")}'
ztp_file = os.path.join(file_path, 'raw/zerotouch-config') )
with open(ztp_file, 'w', encoding='ascii') as zfile: ztp_file = os.path.join(file_path, "raw/zerotouch-config")
zfile.write('DISABLE=True') with open(ztp_file, "w", encoding="ascii") as zfile:
logger.info(f'Unmounting volume in {file_path}') zfile.write("DISABLE=True")
logger.info(f"Unmounting volume in {file_path}")
os.system(f"guestunmount {os.path.join(file_path, 'raw')}") os.system(f"guestunmount {os.path.join(file_path, 'raw')}")
os.system(f"rm -rf {os.path.join(file_path, 'raw')}") os.system(f"rm -rf {os.path.join(file_path, 'raw')}")
logger.info(f"Volume has been successfully unmounted at {file_path}") logger.info(f"Volume has been successfully unmounted at {file_path}")
def _parse_xml_for_version(self,root_xml: ET.ElementTree, xpath: str = './/dir[@label="Active Releases"]/dir/dir/[@label]') -> List[EosVersion]: def _parse_xml_for_version(
self,
root_xml: ET.ElementTree,
xpath: str = './/dir[@label="Active Releases"]/dir/dir/[@label]',
) -> List[EosVersion]:
""" """
Extract list of available EOS versions from Arista.com website Extract list of available EOS versions from Arista.com website
@ -77,19 +89,21 @@ class EOSDownloader(ObjectDownloader):
""" """
# XPATH: .//dir[@label="Active Releases"]/dir/dir/[@label] # XPATH: .//dir[@label="Active Releases"]/dir/dir/[@label]
if self.eos_versions is None: if self.eos_versions is None:
logger.debug(f'Using xpath {xpath}') logger.debug(f"Using xpath {xpath}")
eos_versions = [] eos_versions = []
for node in root_xml.findall(xpath): for node in root_xml.findall(xpath):
if 'label' in node.attrib and node.get('label') is not None: if "label" in node.attrib and node.get("label") is not None:
label = node.get('label') label = node.get("label")
if label is not None and REGEX_EOS_VERSION.match(label): if label is not None and REGEX_EOS_VERSION.match(label):
eos_version = EosVersion.from_str(label) eos_version = EosVersion.from_str(label)
eos_versions.append(eos_version) eos_versions.append(eos_version)
logger.debug(f"Found {label} - {eos_version}") logger.debug(f"Found {label} - {eos_version}")
logger.debug(f'List of versions found on arista.com is: {eos_versions}') logger.debug(f"List of versions found on arista.com is: {eos_versions}")
self.eos_versions = eos_versions self.eos_versions = eos_versions
else: else:
logger.debug('receiving instruction to download versions, but already available') logger.debug(
"receiving instruction to download versions, but already available"
)
return self.eos_versions return self.eos_versions
def _get_branches(self, with_rtype: str = RTYPE_FEATURE) -> List[str]: def _get_branches(self, with_rtype: str = RTYPE_FEATURE) -> List[str]:
@ -104,9 +118,11 @@ class EOSDownloader(ObjectDownloader):
Returns: Returns:
List[str]: A lsit of string that represent all availables EOS branches List[str]: A lsit of string that represent all availables EOS branches
""" """
root = self._get_folder_tree() root = self.get_folder_tree()
versions = self._parse_xml_for_version(root_xml=root) versions = self._parse_xml_for_version(root_xml=root)
return list({version.branch for version in versions if version.rtype == with_rtype}) return list(
{version.branch for version in versions if version.rtype == with_rtype}
)
def latest_branch(self, rtype: str = RTYPE_FEATURE) -> EosVersion: def latest_branch(self, rtype: str = RTYPE_FEATURE) -> EosVersion:
""" """
@ -121,11 +137,12 @@ class EOSDownloader(ObjectDownloader):
selected_branch = EosVersion.from_str(BASE_BRANCH_STR) selected_branch = EosVersion.from_str(BASE_BRANCH_STR)
for branch in self._get_branches(with_rtype=rtype): for branch in self._get_branches(with_rtype=rtype):
branch = EosVersion.from_str(branch) branch = EosVersion.from_str(branch)
if branch > selected_branch: selected_branch = max(selected_branch, branch)
selected_branch = branch
return selected_branch return selected_branch
def get_eos_versions(self, branch: Union[str,None] = None, rtype: Union[str,None] = None) -> List[EosVersion]: def get_eos_versions(
self, branch: Union[str, None] = None, rtype: Union[str, None] = None
) -> List[EosVersion]:
""" """
Get a list of available EOS version available on arista.com Get a list of available EOS version available on arista.com
@ -139,16 +156,22 @@ class EOSDownloader(ObjectDownloader):
Returns: Returns:
List[EosVersion]: A list of versions available List[EosVersion]: A list of versions available
""" """
root = self._get_folder_tree() root = self.get_folder_tree()
result = [] result = []
for version in self._parse_xml_for_version(root_xml=root): for version in self._parse_xml_for_version(root_xml=root):
if branch is None and (version.rtype == rtype or rtype is None): if branch is None and (version.rtype == rtype or rtype is None):
result.append(version) result.append(version)
elif branch is not None and version.is_in_branch(branch) and version.rtype == rtype: elif (
branch is not None
and version.is_in_branch(branch)
and version.rtype == rtype
):
result.append(version) result.append(version)
return result return result
def latest_eos(self, branch: Union[str,None] = None, rtype: str = RTYPE_FEATURE) -> EosVersion: def latest_eos(
self, branch: Union[str, None] = None, rtype: str = RTYPE_FEATURE
) -> EosVersion:
""" """
Get latest version of EOS Get latest version of EOS
@ -168,7 +191,9 @@ class EOSDownloader(ObjectDownloader):
latest_branch = self.latest_branch(rtype=rtype) latest_branch = self.latest_branch(rtype=rtype)
else: else:
latest_branch = EosVersion.from_str(branch) latest_branch = EosVersion.from_str(branch)
for version in self.get_eos_versions(branch=str(latest_branch.branch), rtype=rtype): for version in self.get_eos_versions(
branch=str(latest_branch.branch), rtype=rtype
):
if version > selected_version: if version > selected_version:
if rtype is not None and version.rtype == rtype: if rtype is not None and version.rtype == rtype:
selected_version = version selected_version = version

View file

@ -16,11 +16,11 @@ from eos_downloader.tools import exc_to_str
# logger = logging.getLogger(__name__) # logger = logging.getLogger(__name__)
BASE_VERSION_STR = '4.0.0F' BASE_VERSION_STR = "4.0.0F"
BASE_BRANCH_STR = '4.0' BASE_BRANCH_STR = "4.0"
RTYPE_FEATURE = 'F' RTYPE_FEATURE = "F"
RTYPE_MAINTENANCE = 'M' RTYPE_MAINTENANCE = "M"
RTYPES = [RTYPE_FEATURE, RTYPE_MAINTENANCE] RTYPES = [RTYPE_FEATURE, RTYPE_MAINTENANCE]
# Regular Expression to capture multiple EOS version format # Regular Expression to capture multiple EOS version format
@ -29,8 +29,12 @@ RTYPES = [RTYPE_FEATURE, RTYPE_MAINTENANCE]
# 4.21.1M # 4.21.1M
# 4.28.10.F # 4.28.10.F
# 4.28.6.1M # 4.28.6.1M
REGEX_EOS_VERSION = re.compile(r"^.*(?P<major>4)\.(?P<minor>\d{1,2})\.(?P<patch>\d{1,2})(?P<other>\.\d*)*(?P<rtype>[M,F])*$") REGEX_EOS_VERSION = re.compile(
REGEX_EOS_BRANCH = re.compile(r"^.*(?P<major>4)\.(?P<minor>\d{1,2})(\.?P<patch>\d)*(\.\d)*(?P<rtype>[M,F])*$") r"^.*(?P<major>4)\.(?P<minor>\d{1,2})\.(?P<patch>\d{1,2})(?P<other>\.\d*)*(?P<rtype>[M,F])*$"
)
REGEX_EOS_BRANCH = re.compile(
r"^.*(?P<major>4)\.(?P<minor>\d{1,2})(\.?P<patch>\d)*(\.\d)*(?P<rtype>[M,F])*$"
)
class EosVersion(BaseModel): class EosVersion(BaseModel):
@ -59,11 +63,12 @@ class EosVersion(BaseModel):
Args: Args:
BaseModel (Pydantic): Pydantic Base Model BaseModel (Pydantic): Pydantic Base Model
""" """
major: int = 4 major: int = 4
minor: int = 0 minor: int = 0
patch: int = 0 patch: int = 0
rtype: Optional[str] = 'F' rtype: Optional[str] = "F"
other: Any other: Any = None
@classmethod @classmethod
def from_str(cls, eos_version: str) -> EosVersion: def from_str(cls, eos_version: str) -> EosVersion:
@ -84,7 +89,7 @@ class EosVersion(BaseModel):
Returns: Returns:
EosVersion object EosVersion object
""" """
logger.debug(f'receiving version: {eos_version}') logger.debug(f"receiving version: {eos_version}")
if REGEX_EOS_VERSION.match(eos_version): if REGEX_EOS_VERSION.match(eos_version):
matches = REGEX_EOS_VERSION.match(eos_version) matches = REGEX_EOS_VERSION.match(eos_version)
# assert matches is not None # assert matches is not None
@ -95,7 +100,7 @@ class EosVersion(BaseModel):
# assert matches is not None # assert matches is not None
assert matches is not None assert matches is not None
return cls(**matches.groupdict()) return cls(**matches.groupdict())
logger.error(f'Error occured with {eos_version}') logger.error(f"Error occured with {eos_version}")
return EosVersion() return EosVersion()
@property @property
@ -106,7 +111,7 @@ class EosVersion(BaseModel):
Returns: Returns:
str: branch from version str: branch from version
""" """
return f'{self.major}.{self.minor}' return f"{self.major}.{self.minor}"
def __str__(self) -> str: def __str__(self) -> str:
""" """
@ -118,8 +123,8 @@ class EosVersion(BaseModel):
str: A standard EOS version string representing <MAJOR>.<MINOR>.<PATCH><RTYPE> str: A standard EOS version string representing <MAJOR>.<MINOR>.<PATCH><RTYPE>
""" """
if self.other is None: if self.other is None:
return f'{self.major}.{self.minor}.{self.patch}{self.rtype}' return f"{self.major}.{self.minor}.{self.patch}{self.rtype}"
return f'{self.major}.{self.minor}.{self.patch}{self.other}{self.rtype}' return f"{self.major}.{self.minor}.{self.patch}{self.other}{self.rtype}"
def _compare(self, other: EosVersion) -> float: def _compare(self, other: EosVersion) -> float:
""" """
@ -141,58 +146,68 @@ class EosVersion(BaseModel):
float: -1 if ver1 < ver2, 0 if ver1 == ver2, 1 if ver1 > ver2 float: -1 if ver1 < ver2, 0 if ver1 == ver2, 1 if ver1 > ver2
""" """
if not isinstance(other, EosVersion): if not isinstance(other, EosVersion):
raise ValueError(f'could not compare {other} as it is not an EosVersion object') raise ValueError(
f"could not compare {other} as it is not an EosVersion object"
)
comparison_flag: float = 0 comparison_flag: float = 0
logger.warning(f'current version {self.__str__()} - other {str(other)}') # pylint: disable = unnecessary-dunder-call logger.warning(
f"current version {self.__str__()} - other {str(other)}" # pylint: disable = unnecessary-dunder-call
)
for key, _ in self.dict().items(): for key, _ in self.dict().items():
if comparison_flag == 0 and self.dict()[key] is None or other.dict()[key] is None: if (
logger.debug(f'{key}: local None - remote None') comparison_flag == 0
logger.debug(f'{key}: local {self.dict()} - remote {other.dict()}') and self.dict()[key] is None
or other.dict()[key] is None
):
logger.debug(f"{key}: local None - remote None")
logger.debug(f"{key}: local {self.dict()} - remote {other.dict()}")
return comparison_flag return comparison_flag
logger.debug(f'{key}: local {self.dict()[key]} - remote {other.dict()[key]}') logger.debug(
f"{key}: local {self.dict()[key]} - remote {other.dict()[key]}"
)
if comparison_flag == 0 and self.dict()[key] < other.dict()[key]: if comparison_flag == 0 and self.dict()[key] < other.dict()[key]:
comparison_flag = -1 comparison_flag = -1
if comparison_flag == 0 and self.dict()[key] > other.dict()[key]: if comparison_flag == 0 and self.dict()[key] > other.dict()[key]:
comparison_flag = 1 comparison_flag = 1
if comparison_flag != 0: if comparison_flag != 0:
logger.info(f'comparison result is {comparison_flag}') logger.info(f"comparison result is {comparison_flag}")
return comparison_flag return comparison_flag
logger.info(f'comparison result is {comparison_flag}') logger.info(f"comparison result is {comparison_flag}")
return comparison_flag return comparison_flag
@typing.no_type_check @typing.no_type_check
def __eq__(self, other): def __eq__(self, other):
""" Implement __eq__ function (==) """ """Implement __eq__ function (==)"""
return self._compare(other) == 0 return self._compare(other) == 0
@typing.no_type_check @typing.no_type_check
def __ne__(self, other): def __ne__(self, other):
# type: ignore # type: ignore
""" Implement __nw__ function (!=) """ """Implement __nw__ function (!=)"""
return self._compare(other) != 0 return self._compare(other) != 0
@typing.no_type_check @typing.no_type_check
def __lt__(self, other): def __lt__(self, other):
# type: ignore # type: ignore
""" Implement __lt__ function (<) """ """Implement __lt__ function (<)"""
return self._compare(other) < 0 return self._compare(other) < 0
@typing.no_type_check @typing.no_type_check
def __le__(self, other): def __le__(self, other):
# type: ignore # type: ignore
""" Implement __le__ function (<=) """ """Implement __le__ function (<=)"""
return self._compare(other) <= 0 return self._compare(other) <= 0
@typing.no_type_check @typing.no_type_check
def __gt__(self, other): def __gt__(self, other):
# type: ignore # type: ignore
""" Implement __gt__ function (>) """ """Implement __gt__ function (>)"""
return self._compare(other) > 0 return self._compare(other) > 0
@typing.no_type_check @typing.no_type_check
def __ge__(self, other): def __ge__(self, other):
# type: ignore # type: ignore
""" Implement __ge__ function (>=) """ """Implement __ge__ function (>=)"""
return self._compare(other) >= 0 return self._compare(other) >= 0
def match(self, match_expr: str) -> bool: def match(self, match_expr: str) -> bool:
@ -236,7 +251,7 @@ class EosVersion(BaseModel):
"['<', '>', '==', '<=', '>=', '!=']. " "['<', '>', '==', '<=', '>=', '!=']. "
f"You provided: {match_expr}" f"You provided: {match_expr}"
) )
logger.debug(f'work on comparison {prefix} with base release {match_version}') logger.debug(f"work on comparison {prefix} with base release {match_version}")
possibilities_dict = { possibilities_dict = {
">": (1,), ">": (1,),
"<": (-1,), "<": (-1,),
@ -263,7 +278,7 @@ class EosVersion(BaseModel):
bool: True if current version is in provided branch, otherwise False bool: True if current version is in provided branch, otherwise False
""" """
try: try:
logger.debug(f'reading branch str:{branch_str}') logger.debug(f"reading branch str:{branch_str}")
branch = EosVersion.from_str(branch_str) branch = EosVersion.from_str(branch_str)
except Exception as error: # pylint: disable = broad-exception-caught except Exception as error: # pylint: disable = broad-exception-caught
logger.error(exc_to_str(error)) logger.error(exc_to_str(error))

View file

@ -8,8 +8,13 @@
eos_downloader class definition eos_downloader class definition
""" """
from __future__ import (absolute_import, division, print_function, from __future__ import (
unicode_literals, annotations) absolute_import,
annotations,
division,
print_function,
unicode_literals,
)
import base64 import base64
import glob import glob
@ -26,22 +31,35 @@ from loguru import logger
from rich import console from rich import console
from tqdm import tqdm from tqdm import tqdm
from eos_downloader import (ARISTA_DOWNLOAD_URL, ARISTA_GET_SESSION, from eos_downloader import (
ARISTA_SOFTWARE_FOLDER_TREE, EVE_QEMU_FOLDER_PATH, ARISTA_DOWNLOAD_URL,
MSG_INVALID_DATA, MSG_TOKEN_EXPIRED) ARISTA_GET_SESSION,
ARISTA_SOFTWARE_FOLDER_TREE,
EVE_QEMU_FOLDER_PATH,
MSG_INVALID_DATA,
MSG_TOKEN_EXPIRED,
)
from eos_downloader.data import DATA_MAPPING from eos_downloader.data import DATA_MAPPING
from eos_downloader.download import DownloadProgressBar from eos_downloader.download import DownloadProgressBar, REQUEST_HEADERS
# logger = logging.getLogger(__name__) # logger = logging.getLogger(__name__)
console = rich.get_console() console = rich.get_console()
class ObjectDownloader(): class ObjectDownloader:
""" """
ObjectDownloader Generic Object to download from Arista.com ObjectDownloader Generic Object to download from Arista.com
""" """
def __init__(self, image: str, version: str, token: str, software: str = 'EOS', hash_method: str = 'md5sum'):
def __init__(
self,
image: str,
version: str,
token: str,
software: str = "EOS",
hash_method: str = "md5sum",
): # pylint: disable=R0917
""" """
__init__ Class constructor __init__ Class constructor
@ -70,10 +88,10 @@ class ObjectDownloader():
self.hash_method = hash_method self.hash_method = hash_method
self.timeout = 5 self.timeout = 5
# Logging # Logging
logger.debug(f'Filename built by _build_filename is {self.filename}') logger.debug(f"Filename built by _build_filename is {self.filename}")
def __str__(self) -> str: def __str__(self) -> str:
return f'{self.software} - {self.image} - {self.version}' return f"{self.software} - {self.image} - {self.version}"
# def __repr__(self): # def __repr__(self):
# return str(self.__dict__) # return str(self.__dict__)
@ -102,16 +120,18 @@ class ObjectDownloader():
str: str:
Filename to search for on Arista.com Filename to search for on Arista.com
""" """
logger.info('start build') logger.info("start build")
if self.software in DATA_MAPPING: if self.software in DATA_MAPPING:
logger.info(f'software in data mapping: {self.software}') logger.info(f"software in data mapping: {self.software}")
if self.image in DATA_MAPPING[self.software]: if self.image in DATA_MAPPING[self.software]:
logger.info(f'image in data mapping: {self.image}') logger.info(f"image in data mapping: {self.image}")
return f"{DATA_MAPPING[self.software][self.image]['prepend']}-{self.version}{DATA_MAPPING[self.software][self.image]['extension']}" return f"{DATA_MAPPING[self.software][self.image]['prepend']}-{self.version}{DATA_MAPPING[self.software][self.image]['extension']}"
return f"{DATA_MAPPING[self.software]['default']['prepend']}-{self.version}{DATA_MAPPING[self.software]['default']['extension']}" return f"{DATA_MAPPING[self.software]['default']['prepend']}-{self.version}{DATA_MAPPING[self.software]['default']['extension']}"
raise ValueError(f'Incorrect value for software {self.software}') raise ValueError(f"Incorrect value for software {self.software}")
def _parse_xml_for_path(self, root_xml: ET.ElementTree, xpath: str, search_file: str) -> str: def _parse_xml_for_path(
self, root_xml: ET.ElementTree, xpath: str, search_file: str
) -> str:
# sourcery skip: remove-unnecessary-cast # sourcery skip: remove-unnecessary-cast
""" """
_parse_xml Read and extract data from XML using XPATH _parse_xml Read and extract data from XML using XPATH
@ -132,18 +152,18 @@ class ObjectDownloader():
str str
File Path on Arista server side File Path on Arista server side
""" """
logger.debug(f'Using xpath {xpath}') logger.debug(f"Using xpath {xpath}")
logger.debug(f'Search for file {search_file}') logger.debug(f"Search for file {search_file}")
console.print(f'🔎 Searching file {search_file}') console.print(f"🔎 Searching file {search_file}")
for node in root_xml.findall(xpath): for node in root_xml.findall(xpath):
# logger.debug('Found {}', node.text) # logger.debug('Found {}', node.text)
if str(node.text).lower() == search_file.lower(): if str(node.text).lower() == search_file.lower():
path = node.get('path') path = node.get("path")
console.print(f' -> Found file at {path}') console.print(f" -> Found file at {path}")
logger.info(f'Found {node.text} at {node.get("path")}') logger.info(f'Found {node.text} at {node.get("path")}')
return str(node.get('path')) if node.get('path') is not None else '' return str(node.get("path")) if node.get("path") is not None else ""
logger.error(f'Requested file ({self.filename}) not found !') logger.error(f"Requested file ({self.filename}) not found !")
return '' return ""
def _get_hash(self, file_path: str) -> str: def _get_hash(self, file_path: str) -> str:
""" """
@ -165,10 +185,10 @@ class ObjectDownloader():
dl_rich_progress_bar = DownloadProgressBar() dl_rich_progress_bar = DownloadProgressBar()
dl_rich_progress_bar.download(urls=[hash_url], dest_dir=file_path) dl_rich_progress_bar.download(urls=[hash_url], dest_dir=file_path)
hash_downloaded = f"{file_path}/{os.path.basename(remote_hash_file)}" hash_downloaded = f"{file_path}/{os.path.basename(remote_hash_file)}"
hash_content = 'unset' hash_content = "unset"
with open(hash_downloaded, 'r', encoding='utf-8') as f: with open(hash_downloaded, "r", encoding="utf-8") as f:
hash_content = f.read() hash_content = f.read()
return hash_content.split(' ')[0] return hash_content.split(" ")[0]
@staticmethod @staticmethod
def _compute_hash_md5sum(file: str, hash_expected: str) -> bool: def _compute_hash_md5sum(file: str, hash_expected: str) -> bool:
@ -195,7 +215,9 @@ class ObjectDownloader():
hash_md5.update(chunk) hash_md5.update(chunk)
if hash_md5.hexdigest() == hash_expected: if hash_md5.hexdigest() == hash_expected:
return True return True
logger.warning(f'Downloaded file is corrupt: local md5 ({hash_md5.hexdigest()}) is different to md5 from arista ({hash_expected})') logger.warning(
f"Downloaded file is corrupt: local md5 ({hash_md5.hexdigest()}) is different to md5 from arista ({hash_expected})"
)
return False return False
@staticmethod @staticmethod
@ -223,10 +245,12 @@ class ObjectDownloader():
hash_sha512.update(chunk) hash_sha512.update(chunk)
if hash_sha512.hexdigest() == hash_expected: if hash_sha512.hexdigest() == hash_expected:
return True return True
logger.warning(f'Downloaded file is corrupt: local sha512 ({hash_sha512.hexdigest()}) is different to sha512 from arista ({hash_expected})') logger.warning(
f"Downloaded file is corrupt: local sha512 ({hash_sha512.hexdigest()}) is different to sha512 from arista ({hash_expected})"
)
return False return False
def _get_folder_tree(self) -> ET.ElementTree: def get_folder_tree(self) -> ET.ElementTree:
""" """
_get_folder_tree Download XML tree from Arista server _get_folder_tree Download XML tree from Arista server
@ -237,15 +261,20 @@ class ObjectDownloader():
""" """
if self.session_id is None: if self.session_id is None:
self.authenticate() self.authenticate()
jsonpost = {'sessionCode': self.session_id} jsonpost = {"sessionCode": self.session_id}
result = requests.post(ARISTA_SOFTWARE_FOLDER_TREE, data=json.dumps(jsonpost), timeout=self.timeout) result = requests.post(
ARISTA_SOFTWARE_FOLDER_TREE,
data=json.dumps(jsonpost),
timeout=self.timeout,
headers=REQUEST_HEADERS,
)
try: try:
folder_tree = result.json()["data"]["xml"] folder_tree = result.json()["data"]["xml"]
return ET.ElementTree(ET.fromstring(folder_tree)) return ET.ElementTree(ET.fromstring(folder_tree))
except KeyError as error: except KeyError as error:
logger.error(MSG_INVALID_DATA) logger.error(MSG_INVALID_DATA)
logger.error(f'Server returned: {error}') logger.error(f"Server returned: {error}")
console.print(f'{MSG_INVALID_DATA}', style="bold red") console.print(f"{MSG_INVALID_DATA}", style="bold red")
sys.exit(1) sys.exit(1)
def _get_remote_filepath(self) -> str: def _get_remote_filepath(self) -> str:
@ -259,12 +288,14 @@ class ObjectDownloader():
str str
Remote path of the file to download Remote path of the file to download
""" """
root = self._get_folder_tree() root = self.get_folder_tree()
logger.debug("GET XML content from ARISTA.com") logger.debug("GET XML content from ARISTA.com")
xpath = f'.//dir[@label="{self.software}"]//file' xpath = f'.//dir[@label="{self.software}"]//file'
return self._parse_xml_for_path(root_xml=root, xpath=xpath, search_file=self.filename) return self._parse_xml_for_path(
root_xml=root, xpath=xpath, search_file=self.filename
)
def _get_remote_hashpath(self, hash_method: str = 'md5sum') -> str: def _get_remote_hashpath(self, hash_method: str = "md5sum") -> str:
""" """
_get_remote_hashpath Helper to get path of the hash's file to download _get_remote_hashpath Helper to get path of the hash's file to download
@ -275,16 +306,16 @@ class ObjectDownloader():
str str
Remote path of the hash's file to download Remote path of the hash's file to download
""" """
root = self._get_folder_tree() root = self.get_folder_tree()
logger.debug("GET XML content from ARISTA.com") logger.debug("GET XML content from ARISTA.com")
xpath = f'.//dir[@label="{self.software}"]//file' xpath = f'.//dir[@label="{self.software}"]//file'
return self._parse_xml_for_path( return self._parse_xml_for_path(
root_xml=root, root_xml=root,
xpath=xpath, xpath=xpath,
search_file=f'{self.filename}.{hash_method}', search_file=f"{self.filename}.{hash_method}",
) )
def _get_url(self, remote_file_path: str) -> str: def _get_url(self, remote_file_path: str) -> str:
""" """
_get_url Get URL to use for downloading file from Arista server _get_url Get URL to use for downloading file from Arista server
@ -302,13 +333,18 @@ class ObjectDownloader():
""" """
if self.session_id is None: if self.session_id is None:
self.authenticate() self.authenticate()
jsonpost = {'sessionCode': self.session_id, 'filePath': remote_file_path} jsonpost = {"sessionCode": self.session_id, "filePath": remote_file_path}
result = requests.post(ARISTA_DOWNLOAD_URL, data=json.dumps(jsonpost), timeout=self.timeout) result = requests.post(
if 'data' in result.json() and 'url' in result.json()['data']: ARISTA_DOWNLOAD_URL,
data=json.dumps(jsonpost),
timeout=self.timeout,
headers=REQUEST_HEADERS,
)
if "data" in result.json() and "url" in result.json()["data"]:
# logger.debug('URL to download file is: {}', result.json()) # logger.debug('URL to download file is: {}', result.json())
return result.json()["data"]["url"] return result.json()["data"]["url"]
logger.critical(f'Server returns following message: {result.json()}') logger.critical(f"Server returns following message: {result.json()}")
return '' return ""
@staticmethod @staticmethod
def _download_file_raw(url: str, file_path: str) -> str: def _download_file_raw(url: str, file_path: str) -> str:
@ -331,31 +367,40 @@ class ObjectDownloader():
""" """
chunkSize = 1024 chunkSize = 1024
r = requests.get(url, stream=True, timeout=5) r = requests.get(url, stream=True, timeout=5)
with open(file_path, 'wb') as f: with open(file_path, "wb") as f:
pbar = tqdm(unit="B", total=int(r.headers['Content-Length']), unit_scale=True, unit_divisor=1024) pbar = tqdm(
unit="B",
total=int(r.headers["Content-Length"]),
unit_scale=True,
unit_divisor=1024,
)
for chunk in r.iter_content(chunk_size=chunkSize): for chunk in r.iter_content(chunk_size=chunkSize):
if chunk: if chunk:
pbar.update(len(chunk)) pbar.update(len(chunk))
f.write(chunk) f.write(chunk)
return file_path return file_path
def _download_file(self, file_path: str, filename: str, rich_interface: bool = True) -> Union[None, str]: def _download_file(
self, file_path: str, filename: str, rich_interface: bool = True
) -> Union[None, str]:
remote_file_path = self._get_remote_filepath() remote_file_path = self._get_remote_filepath()
logger.info(f'File found on arista server: {remote_file_path}') logger.info(f"File found on arista server: {remote_file_path}")
file_url = self._get_url(remote_file_path=remote_file_path) file_url = self._get_url(remote_file_path=remote_file_path)
if file_url is not False: if file_url is not False:
if not rich_interface: if not rich_interface:
return self._download_file_raw(url=file_url, file_path=os.path.join(file_path, filename)) return self._download_file_raw(
url=file_url, file_path=os.path.join(file_path, filename)
)
rich_downloader = DownloadProgressBar() rich_downloader = DownloadProgressBar()
rich_downloader.download(urls=[file_url], dest_dir=file_path) rich_downloader.download(urls=[file_url], dest_dir=file_path)
return os.path.join(file_path, filename) return os.path.join(file_path, filename)
logger.error(f'Cannot download file {file_path}') logger.error(f"Cannot download file {file_path}")
return None return None
@staticmethod @staticmethod
def _create_destination_folder(path: str) -> None: def _create_destination_folder(path: str) -> None:
# os.makedirs(path, mode, exist_ok=True) # os.makedirs(path, mode, exist_ok=True)
os.system(f'mkdir -p {path}') os.system(f"mkdir -p {path}")
@staticmethod @staticmethod
def _disable_ztp(file_path: str) -> None: def _disable_ztp(file_path: str) -> None:
@ -379,24 +424,32 @@ class ObjectDownloader():
""" """
credentials = (base64.b64encode(self.token.encode())).decode("utf-8") credentials = (base64.b64encode(self.token.encode())).decode("utf-8")
session_code_url = ARISTA_GET_SESSION session_code_url = ARISTA_GET_SESSION
jsonpost = {'accessToken': credentials} jsonpost = {"accessToken": credentials}
result = requests.post(session_code_url, data=json.dumps(jsonpost), timeout=self.timeout) result = requests.post(
session_code_url,
data=json.dumps(jsonpost),
timeout=self.timeout,
headers=REQUEST_HEADERS,
)
if result.json()["status"]["message"] in[ 'Access token expired', 'Invalid access token']: if result.json()["status"]["message"] in [
console.print(f'{MSG_TOKEN_EXPIRED}', style="bold red") "Access token expired",
"Invalid access token",
]:
console.print(f"{MSG_TOKEN_EXPIRED}", style="bold red")
logger.error(MSG_TOKEN_EXPIRED) logger.error(MSG_TOKEN_EXPIRED)
return False return False
try: try:
if 'data' in result.json(): if "data" in result.json():
self.session_id = result.json()["data"]["session_code"] self.session_id = result.json()["data"]["session_code"]
logger.info('Authenticated on arista.com') logger.info("Authenticated on arista.com")
return True return True
logger.debug(f'{result.json()}') logger.debug(f"{result.json()}")
return False return False
except KeyError as error_arista: except KeyError as error_arista:
logger.error(f'Error: {error_arista}') logger.error(f"Error: {error_arista}")
sys.exit(1) sys.exit(1)
def download_local(self, file_path: str, checksum: bool = False) -> bool: def download_local(self, file_path: str, checksum: bool = False) -> bool:
@ -422,25 +475,33 @@ class ObjectDownloader():
bool bool
True if everything went well, False if any problem appears True if everything went well, False if any problem appears
""" """
file_downloaded = str(self._download_file(file_path=file_path, filename=self.filename)) file_downloaded = str(
self._download_file(file_path=file_path, filename=self.filename)
)
# Check file HASH # Check file HASH
hash_result = False hash_result = False
if checksum: if checksum:
logger.info('🚀 Running checksum validation') logger.info("🚀 Running checksum validation")
console.print('🚀 Running checksum validation') console.print("🚀 Running checksum validation")
if self.hash_method == 'md5sum': if self.hash_method == "md5sum":
hash_expected = self._get_hash(file_path=file_path) hash_expected = self._get_hash(file_path=file_path)
hash_result = self._compute_hash_md5sum(file=file_downloaded, hash_expected=hash_expected) hash_result = self._compute_hash_md5sum(
elif self.hash_method == 'sha512sum': file=file_downloaded, hash_expected=hash_expected
)
elif self.hash_method == "sha512sum":
hash_expected = self._get_hash(file_path=file_path) hash_expected = self._get_hash(file_path=file_path)
hash_result = self._compute_hash_sh512sum(file=file_downloaded, hash_expected=hash_expected) hash_result = self._compute_hash_sh512sum(
if not hash_result: file=file_downloaded, hash_expected=hash_expected
logger.error('Downloaded file is corrupted, please check your connection') )
console.print('❌ Downloaded file is corrupted, please check your connection') if not hash_result:
return False logger.error("Downloaded file is corrupted, please check your connection")
logger.info('Downloaded file is correct.') console.print(
console.print('✅ Downloaded file is correct.') "❌ Downloaded file is corrupted, please check your connection"
)
return False
logger.info("Downloaded file is correct.")
console.print("✅ Downloaded file is correct.")
return True return True
def provision_eve(self, noztp: bool = False, checksum: bool = True) -> None: def provision_eve(self, noztp: bool = False, checksum: bool = True) -> None:
@ -466,7 +527,7 @@ class ObjectDownloader():
# Build image name to use in folder path # Build image name to use in folder path
eos_image_name = self.filename.rstrip(".vmdk").lower() eos_image_name = self.filename.rstrip(".vmdk").lower()
if noztp: if noztp:
eos_image_name = f'{eos_image_name}-noztp' eos_image_name = f"{eos_image_name}-noztp"
# Create full path for EVE-NG # Create full path for EVE-NG
file_path = os.path.join(EVE_QEMU_FOLDER_PATH, eos_image_name.rstrip()) file_path = os.path.join(EVE_QEMU_FOLDER_PATH, eos_image_name.rstrip())
# Create folders in filesystem # Create folders in filesystem
@ -474,25 +535,30 @@ class ObjectDownloader():
# Download file to local destination # Download file to local destination
file_downloaded = self._download_file( file_downloaded = self._download_file(
file_path=file_path, filename=self.filename) file_path=file_path, filename=self.filename
)
# Convert to QCOW2 format # Convert to QCOW2 format
file_qcow2 = os.path.join(file_path, "hda.qcow2") file_qcow2 = os.path.join(file_path, "hda.qcow2")
logger.info('Converting VMDK to QCOW2 format') logger.info("Converting VMDK to QCOW2 format")
console.print('🚀 Converting VMDK to QCOW2 format...') console.print("🚀 Converting VMDK to QCOW2 format...")
os.system(f'$(which qemu-img) convert -f vmdk -O qcow2 {file_downloaded} {file_qcow2}') os.system(
f"$(which qemu-img) convert -f vmdk -O qcow2 {file_downloaded} {file_qcow2}"
)
logger.info('Applying unl_wrapper to fix permissions') logger.info("Applying unl_wrapper to fix permissions")
console.print('Applying unl_wrapper to fix permissions') console.print("Applying unl_wrapper to fix permissions")
os.system('/opt/unetlab/wrappers/unl_wrapper -a fixpermissions') os.system("/opt/unetlab/wrappers/unl_wrapper -a fixpermissions")
os.system(f'rm -f {file_downloaded}') os.system(f"rm -f {file_downloaded}")
if noztp: if noztp:
self._disable_ztp(file_path=file_path) self._disable_ztp(file_path=file_path)
def docker_import(self, image_name: str = "arista/ceos") -> None: def docker_import(
self, image_name: str = "arista/ceos", is_latest: bool = False
) -> None:
""" """
Import docker container to your docker server. Import docker container to your docker server.
@ -502,12 +568,15 @@ class ObjectDownloader():
version (str): version (str):
image_name (str, optional): Image name to use. Defaults to "arista/ceos". image_name (str, optional): Image name to use. Defaults to "arista/ceos".
""" """
docker_image = f'{image_name}:{self.version}' docker_image = f"{image_name}:{self.version}"
logger.info(f'Importing image {self.filename} to {docker_image}') logger.info(f"Importing image {self.filename} to {docker_image}")
console.print(f'🚀 Importing image {self.filename} to {docker_image}') console.print(f"🚀 Importing image {self.filename} to {docker_image}")
os.system(f'$(which docker) import {self.filename} {docker_image}') os.system(f"$(which docker) import {self.filename} {docker_image}")
for filename in glob.glob(f'{self.filename}*'): if is_latest:
console.print(f"🚀 Configuring {docker_image}:{self.version} to be latest")
os.system(f"$(which docker) tag {docker_image} {image_name}:latest")
for filename in glob.glob(f"{self.filename}*"):
try: try:
os.remove(filename) os.remove(filename)
except FileNotFoundError: except FileNotFoundError:
console.print(f'File not found: {filename}') console.print(f"File not found: {filename}")

View file

@ -2,7 +2,9 @@
disable= disable=
invalid-name, invalid-name,
logging-fstring-interpolation, logging-fstring-interpolation,
fixme fixme,
line-too-long,
too-many-arguments
[BASIC] [BASIC]
good-names=runCmds, i, y, t, c, x, e, fd, ip, v good-names=runCmds, i, y, t, c, x, e, fd, ip, v

View file

@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "eos_downloader" name = "eos_downloader"
version = "v0.8.1-dev1" version = "v0.10.3"
readme = "README.md" readme = "README.md"
authors = [{ name = "Thomas Grimonet", email = "thomas.grimonet@gmail.com" }] authors = [{ name = "Thomas Grimonet", email = "thomas.grimonet@gmail.com" }]
maintainers = [ maintainers = [
@ -15,61 +15,73 @@ maintainers = [
description = "Arista EOS/CVP downloader script" description = "Arista EOS/CVP downloader script"
license = { file = "LICENSE" } license = { file = "LICENSE" }
dependencies = [ dependencies = [
"cryptography", "cryptography",
"paramiko", "paramiko",
"requests>=2.20.0", "requests>=2.20.0",
"requests-toolbelt", "requests-toolbelt",
"scp", "scp",
"tqdm", "tqdm",
"loguru", "loguru",
"rich==12.0.1", "rich>=13.5.2",
"cvprac>=1.0.7", "cvprac>=1.0.7",
"click==8.1.3", "click>=8.1.6",
"click-help-colors==0.9.1", "click-help-colors>=0.9",
"pydantic==1.10.4", "pydantic>2.0.0",
"urllib3>=2.2.2",
]
keywords = [
"eos_downloader",
"Arista",
"eos",
"cvp",
"network",
"automation",
"networking",
"devops",
"netdevops",
] ]
keywords = ["eos_downloader", "Arista", "eos", "cvp", "network", "automation", "networking", "devops", "netdevops"]
classifiers = [ classifiers = [
'Development Status :: 4 - Beta', 'Development Status :: 4 - Beta',
'Intended Audience :: Developers', 'Intended Audience :: Developers',
'Intended Audience :: System Administrators', 'Intended Audience :: System Administrators',
'Intended Audience :: Information Technology', 'Intended Audience :: Information Technology',
'Topic :: System :: Software Distribution', 'Topic :: System :: Software Distribution',
'Topic :: Terminals', 'Topic :: Terminals',
'Topic :: Utilities', 'Topic :: Utilities',
'License :: OSI Approved :: Apache Software License', 'License :: OSI Approved :: Apache Software License',
'Operating System :: OS Independent', 'Operating System :: OS Independent',
'Programming Language :: Python', 'Programming Language :: Python',
'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.8', 'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9', 'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10', 'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3 :: Only', 'Programming Language :: Python :: 3 :: Only',
'Programming Language :: Python :: Implementation :: PyPy', 'Programming Language :: Python :: Implementation :: PyPy',
] ]
requires-python = ">=3.8" requires-python = ">=3.8"
[project.optional-dependencies] [project.optional-dependencies]
dev = [ dev = [
"isort==5.12.0", "mypy==1.13.0",
"mypy==0.991", "isort==5.13.2",
"mypy-extensions>=0.4.3", "mypy-extensions>=0.4.3",
"pre-commit>=2.20.0", "pre-commit>=2.20.0",
"pylint", "pylint",
"pytest>=7.1.2", "pytest>=7.1.2",
"pytest-cov>=2.11.1", "pytest-cov>=2.11.1",
"pytest-dependency", "pytest-dependency",
"pytest-html>=3.1.1", "pytest-html>=3.1.1",
"pytest-metadata>=1.11.0", "pytest-metadata>=1.11.0",
"pylint-pydantic>=0.1.4", "pylint-pydantic>=0.2.4",
"tox==4.0.11", "tox>=4.11",
"types-PyYAML", "types-PyYAML",
"types-paramiko", "types-paramiko",
"types-requests", "types-requests",
"typing-extensions", "typing-extensions",
"yamllint", "yamllint",
"flake8==4.0.1", "flake8>=4.0.1",
"pyflakes==2.4.0" "pyflakes>=2.4.0",
"bumpver>=2023.1126",
] ]
[project.urls] [project.urls]
@ -81,15 +93,32 @@ Contributing = "https://www.github.com/titom73/eos-downloader"
ardl = "eos_downloader.cli.cli:cli" ardl = "eos_downloader.cli.cli:cli"
lard = "eos_downloader.cli.cli:cli" lard = "eos_downloader.cli.cli:cli"
################################
# Tools
################################
[tool.setuptools.packages.find] [tool.setuptools.packages.find]
include = ["eos_downloader*"] include = ["eos_downloader*"]
namespaces = false namespaces = false
################################
# Version
################################
[tool.bumpver]
current_version = "0.10.3"
version_pattern = "MAJOR.MINOR.PATCH"
commit_message = "bump: Version {old_version} -> {new_version}"
commit = true
# No tag
tag = false
push = false
[tool.bumpver.file_patterns]
"pyproject.toml" = ['current_version = "{version}"', 'version = "v{version}"']
# mypy as per https://pydantic-docs.helpmanual.io/mypy_plugin/#enabling-the-plugin # mypy as per https://pydantic-docs.helpmanual.io/mypy_plugin/#enabling-the-plugin
[tool.mypy] [tool.mypy]
plugins = [ plugins = ["pydantic.mypy"]
"pydantic.mypy",
]
follow_imports = "skip" follow_imports = "skip"
ignore_missing_imports = true ignore_missing_imports = true
warn_redundant_casts = true warn_redundant_casts = true
@ -120,8 +149,8 @@ min_version = 4.0
envlist = envlist =
clean, clean,
lint, lint,
type type,
py{38,39,310}
[tox-full] [tox-full]
min_version = 4.0 min_version = 4.0
@ -132,13 +161,13 @@ envlist =
type, type,
report report
[gh-actions] [gh-actions-base]
python = python =
3.8: lint, type 3.8: lint, type
3.9: lint, type 3.9: lint, type
3.10: lint, type 3.10: lint, type
[gh-actions-full] [gh-actions]
python = python =
3.8: py38 3.8: py38
3.9: py39 3.9: py39
@ -154,7 +183,7 @@ commands =
description = check the code style description = check the code style
commands = commands =
flake8 --max-line-length=165 --config=/dev/null eos_downloader flake8 --max-line-length=165 --config=/dev/null eos_downloader
pylint eos_downloader pylint --rcfile=pylintrc eos_downloader
[testenv:type] [testenv:type]
description = check typing description = check typing

View file

@ -5,12 +5,13 @@
# flake8: noqa: W503 # flake8: noqa: W503
# flake8: noqa: W1202 # flake8: noqa: W1202
from __future__ import (absolute_import, division, print_function) from __future__ import absolute_import, division, print_function
import os
import eos_downloader
from eos_downloader.eos import EOSDownloader
from eos_downloader.data import DATA_MAPPING
import os
import eos_downloader
from eos_downloader.data import DATA_MAPPING
from eos_downloader.eos import EOSDownloader
# --------------------------------------------------------------- # # --------------------------------------------------------------- #
# MOOCK data to use for testing # MOOCK data to use for testing
@ -18,99 +19,99 @@ from eos_downloader.data import DATA_MAPPING
# Get Auth token # Get Auth token
# eos_token = os.getenv('ARISTA_TOKEN') # eos_token = os.getenv('ARISTA_TOKEN')
eos_token = os.getenv('ARISTA_TOKEN', 'invalid_token') eos_token = os.getenv("ARISTA_TOKEN", "invalid_token")
eos_token_invalid = 'invalid_token' eos_token_invalid = "invalid_token"
eos_dataset_valid = [ eos_dataset_valid = [
{ {
'image': 'EOS', "image": "EOS",
'version': '4.26.3M', "version": "4.26.3M",
'software': 'EOS', "software": "EOS",
'filename': 'EOS-4.26.3M.swi', "filename": "EOS-4.26.3M.swi",
'expected_hash': 'sha512sum', "expected_hash": "sha512sum",
'remote_path': '/support/download/EOS-USA/Active Releases/4.26/EOS-4.26.3M/EOS-4.26.3M.swi', "remote_path": "/support/download/EOS-USA/Active Releases/4.26/EOS-4.26.3M/EOS-4.26.3M.swi",
'compute_checksum': True "compute_checksum": True,
}, },
{ {
'image': 'EOS', "image": "EOS",
'version': '4.25.6M', "version": "4.25.6M",
'software': 'EOS', "software": "EOS",
'filename': 'EOS-4.25.6M.swi', "filename": "EOS-4.25.6M.swi",
'expected_hash': 'md5sum', "expected_hash": "md5sum",
'remote_path': '/support/download/EOS-USA/Active Releases/4.25/EOS-4.25.6M/EOS-4.25.6M.swi', "remote_path": "/support/download/EOS-USA/Active Releases/4.25/EOS-4.25.6M/EOS-4.25.6M.swi",
'compute_checksum': True "compute_checksum": True,
}, },
{ {
'image': 'vEOS-lab', "image": "vEOS-lab",
'version': '4.25.6M', "version": "4.25.6M",
'software': 'EOS', "software": "EOS",
'filename': 'vEOS-lab-4.25.6M.vmdk', "filename": "vEOS-lab-4.25.6M.vmdk",
'expected_hash': 'md5sum', "expected_hash": "md5sum",
'remote_path': '/support/download/EOS-USA/Active Releases/4.25/EOS-4.25.6M/vEOS-lab/vEOS-lab-4.25.6M.vmdk', "remote_path": "/support/download/EOS-USA/Active Releases/4.25/EOS-4.25.6M/vEOS-lab/vEOS-lab-4.25.6M.vmdk",
'compute_checksum': False "compute_checksum": False,
} },
] ]
eos_dataset_invalid = [ eos_dataset_invalid = [
{ {
'image': 'default', "image": "default",
'version': '4.26.3M', "version": "4.26.3M",
'software': 'EOS', "software": "EOS",
'filename': 'EOS-4.26.3M.swi', "filename": "EOS-4.26.3M.swi",
'expected_hash': 'sha512sum', "expected_hash": "sha512sum",
'remote_path': '/support/download/EOS-USA/Active Releases/4.26/EOS-4.26.3M/EOS-4.26.3M.swi', "remote_path": "/support/download/EOS-USA/Active Releases/4.26/EOS-4.26.3M/EOS-4.26.3M.swi",
'compute_checksum': True "compute_checksum": True,
} }
] ]
eos_version = [ eos_version = [
{ {
'version': 'EOS-4.23.1F', "version": "EOS-4.23.1F",
'is_valid': True, "is_valid": True,
'major': 4, "major": 4,
'minor': 23, "minor": 23,
'patch': 1, "patch": 1,
'rtype': 'F' "rtype": "F",
}, },
{ {
'version': 'EOS-4.23.0', "version": "EOS-4.23.0",
'is_valid': True, "is_valid": True,
'major': 4, "major": 4,
'minor': 23, "minor": 23,
'patch': 0, "patch": 0,
'rtype': None "rtype": None,
}, },
{ {
'version': 'EOS-4.23', "version": "EOS-4.23",
'is_valid': True, "is_valid": True,
'major': 4, "major": 4,
'minor': 23, "minor": 23,
'patch': 0, "patch": 0,
'rtype': None "rtype": None,
}, },
{ {
'version': 'EOS-4.23.1M', "version": "EOS-4.23.1M",
'is_valid': True, "is_valid": True,
'major': 4, "major": 4,
'minor': 23, "minor": 23,
'patch': 1, "patch": 1,
'rtype': 'M' "rtype": "M",
}, },
{ {
'version': 'EOS-4.23.1.F', "version": "EOS-4.23.1.F",
'is_valid': True, "is_valid": True,
'major': 4, "major": 4,
'minor': 23, "minor": 23,
'patch': 1, "patch": 1,
'rtype': 'F' "rtype": "F",
}, },
{ {
'version': 'EOS-5.23.1F', "version": "EOS-5.23.1F",
'is_valid': False, "is_valid": False,
'major': 4, "major": 4,
'minor': 23, "minor": 23,
'patch': 1, "patch": 1,
'rtype': 'F' "rtype": "F",
}, },
] ]

View file

@ -5,13 +5,20 @@
# flake8: noqa: W503 # flake8: noqa: W503
# flake8: noqa: W1202 # flake8: noqa: W1202
from __future__ import (absolute_import, division, print_function) from __future__ import absolute_import, division, print_function
import os
import pytest
import eos_downloader
from typing import Dict, Any, List
from tests.lib.dataset import eos_dataset_valid, eos_dataset_invalid, eos_token, eos_token_invalid
import os
from typing import Any, Dict, List
import pytest
import eos_downloader
from tests.lib.dataset import (
eos_dataset_invalid,
eos_dataset_valid,
eos_token,
eos_token_invalid,
)
@pytest.fixture @pytest.fixture
@ -19,17 +26,18 @@ from tests.lib.dataset import eos_dataset_valid, eos_dataset_invalid, eos_token,
def create_download_instance(request, DOWNLOAD_INFO): def create_download_instance(request, DOWNLOAD_INFO):
# logger.info("Execute fixture to create class elements") # logger.info("Execute fixture to create class elements")
request.cls.eos_downloader = eos_downloader.eos.EOSDownloader( request.cls.eos_downloader = eos_downloader.eos.EOSDownloader(
image=DOWNLOAD_INFO['image'], image=DOWNLOAD_INFO["image"],
software=DOWNLOAD_INFO['software'], software=DOWNLOAD_INFO["software"],
version=DOWNLOAD_INFO['version'], version=DOWNLOAD_INFO["version"],
token=eos_token, token=eos_token,
hash_method='sha512sum') hash_method="sha512sum",
)
yield yield
# logger.info('Cleanup test environment') # logger.info('Cleanup test environment')
os.system('rm -f {}*'.format(DOWNLOAD_INFO['filename'])) os.system("rm -f {}*".format(DOWNLOAD_INFO["filename"]))
def generate_test_ids_dict(val: Dict[str, Any], key: str = 'name') -> str: def generate_test_ids_dict(val: Dict[str, Any], key: str = "name") -> str:
""" """
generate_test_ids Helper to generate test ID for parametrize generate_test_ids Helper to generate test ID for parametrize
@ -50,7 +58,8 @@ def generate_test_ids_dict(val: Dict[str, Any], key: str = 'name') -> str:
return val[key] return val[key]
return "undefined_test" return "undefined_test"
def generate_test_ids_list(val: List[Dict[str, Any]], key: str = 'name') -> str:
def generate_test_ids_list(val: List[Dict[str, Any]], key: str = "name") -> str:
""" """
generate_test_ids Helper to generate test ID for parametrize generate_test_ids Helper to generate test ID for parametrize
@ -66,4 +75,4 @@ def generate_test_ids_list(val: List[Dict[str, Any]], key: str = 'name') -> str:
str str
Name of the configlet Name of the configlet
""" """
return [ entry[key] if key in entry.keys() else 'unset_entry' for entry in val ] return [entry[key] if key in entry.keys() else "unset_entry" for entry in val]

View file

@ -5,14 +5,13 @@
# flake8: noqa: W503 # flake8: noqa: W503
# flake8: noqa: W1202 # flake8: noqa: W1202
from __future__ import (absolute_import, division, print_function) from __future__ import absolute_import, division, print_function
import os import os
from eos_downloader.data import DATA_MAPPING from eos_downloader.data import DATA_MAPPING
def default_filename(version: str, info): def default_filename(version: str, info):
""" """
default_filename Helper to build default filename default_filename Helper to build default filename
@ -31,10 +30,14 @@ def default_filename(version: str, info):
""" """
if version is None or info is None: if version is None or info is None:
return None return None
return DATA_MAPPING[info['software']]['default']['prepend'] + '-' + version + '.swi' return DATA_MAPPING[info["software"]]["default"]["prepend"] + "-" + version + ".swi"
def is_on_github_actions(): def is_on_github_actions():
"""Check if code is running on a CI runner""" """Check if code is running on a CI runner"""
if "CI" not in os.environ or not os.environ["CI"] or "GITHUB_RUN_ID" not in os.environ: if (
"CI" not in os.environ
or not os.environ["CI"]
or "GITHUB_RUN_ID" not in os.environ
):
return False return False

View file

@ -45,4 +45,3 @@ class TestEosDownload_valid():
@pytest.mark.eos_download @pytest.mark.eos_download
def test_download_local(self, DOWNLOAD_INFO): def test_download_local(self, DOWNLOAD_INFO):
self.eos_downloader.download_local(file_path='.', checksum=DOWNLOAD_INFO['compute_checksum']) self.eos_downloader.download_local(file_path='.', checksum=DOWNLOAD_INFO['compute_checksum'])

View file

@ -5,126 +5,166 @@
# flake8: noqa: W503 # flake8: noqa: W503
# flake8: noqa: W1202 # flake8: noqa: W1202
from __future__ import (absolute_import, division, print_function) from __future__ import absolute_import, division, print_function
import sys import sys
from loguru import logger
import pytest import pytest
from eos_downloader.models.version import EosVersion, BASE_VERSION_STR from loguru import logger
from eos_downloader.models.version import BASE_VERSION_STR, EosVersion
from tests.lib.dataset import eos_version from tests.lib.dataset import eos_version
from tests.lib.fixtures import generate_test_ids_list from tests.lib.fixtures import generate_test_ids_list
logger.remove() logger.remove()
logger.add(sys.stderr, level="DEBUG") logger.add(sys.stderr, level="DEBUG")
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_from_str(EOS_VERSION): def test_eos_version_from_str(EOS_VERSION):
version = EosVersion.from_str(EOS_VERSION['version']) version = EosVersion.from_str(EOS_VERSION["version"])
if EOS_VERSION['is_valid']: if EOS_VERSION["is_valid"]:
assert version.major == EOS_VERSION['major'] assert version.major == EOS_VERSION["major"]
assert version.minor == EOS_VERSION['minor'] assert version.minor == EOS_VERSION["minor"]
assert version.patch == EOS_VERSION['patch'] assert version.patch == EOS_VERSION["patch"]
assert version.rtype == EOS_VERSION['rtype'] assert version.rtype == EOS_VERSION["rtype"]
else: else:
assert str(version) == BASE_VERSION_STR assert str(version) == BASE_VERSION_STR
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version')) @pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_to_str(EOS_VERSION): def test_eos_version_to_str(EOS_VERSION):
version = EosVersion(**EOS_VERSION) version = EosVersion(**EOS_VERSION)
if EOS_VERSION['is_valid']: if EOS_VERSION["is_valid"]:
assert version.major == EOS_VERSION['major'] assert version.major == EOS_VERSION["major"]
assert version.minor == EOS_VERSION['minor'] assert version.minor == EOS_VERSION["minor"]
assert version.patch == EOS_VERSION['patch'] assert version.patch == EOS_VERSION["patch"]
assert version.rtype == EOS_VERSION['rtype'] assert version.rtype == EOS_VERSION["rtype"]
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_branch(EOS_VERSION): def test_eos_version_branch(EOS_VERSION):
if EOS_VERSION['is_valid']: if EOS_VERSION["is_valid"]:
version = EosVersion(**EOS_VERSION) version = EosVersion(**EOS_VERSION)
assert version.branch == f'{EOS_VERSION["major"]}.{EOS_VERSION["minor"]}' assert version.branch == f'{EOS_VERSION["major"]}.{EOS_VERSION["minor"]}'
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_eq_operator(EOS_VERSION): def test_eos_version_eq_operator(EOS_VERSION):
if not EOS_VERSION['is_valid']: if not EOS_VERSION["is_valid"]:
pytest.skip('not a valid version to test') pytest.skip("not a valid version to test")
version = EosVersion(**EOS_VERSION) version = EosVersion(**EOS_VERSION)
logger.warning(f'version is: {version.dict()}') logger.warning(f"version is: {version.dict()}")
assert version == version assert version == version
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_ge_operator(EOS_VERSION): def test_eos_version_ge_operator(EOS_VERSION):
if not EOS_VERSION['is_valid']: if not EOS_VERSION["is_valid"]:
pytest.skip('not a valid version to test') pytest.skip("not a valid version to test")
version = EosVersion(**EOS_VERSION) version = EosVersion(**EOS_VERSION)
version_b = EosVersion.from_str(BASE_VERSION_STR) version_b = EosVersion.from_str(BASE_VERSION_STR)
assert version >= version_b assert version >= version_b
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_gs_operator(EOS_VERSION): def test_eos_version_gs_operator(EOS_VERSION):
if not EOS_VERSION['is_valid']: if not EOS_VERSION["is_valid"]:
pytest.skip('not a valid version to test') pytest.skip("not a valid version to test")
version = EosVersion(**EOS_VERSION) version = EosVersion(**EOS_VERSION)
version_b = EosVersion.from_str(BASE_VERSION_STR) version_b = EosVersion.from_str(BASE_VERSION_STR)
assert version > version_b assert version > version_b
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_le_operator(EOS_VERSION): def test_eos_version_le_operator(EOS_VERSION):
if not EOS_VERSION['is_valid']: if not EOS_VERSION["is_valid"]:
pytest.skip('not a valid version to test') pytest.skip("not a valid version to test")
version = EosVersion(**EOS_VERSION) version = EosVersion(**EOS_VERSION)
version_b = EosVersion.from_str(BASE_VERSION_STR) version_b = EosVersion.from_str(BASE_VERSION_STR)
assert version_b <= version assert version_b <= version
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_ls_operator(EOS_VERSION): def test_eos_version_ls_operator(EOS_VERSION):
if not EOS_VERSION['is_valid']: if not EOS_VERSION["is_valid"]:
pytest.skip('not a valid version to test') pytest.skip("not a valid version to test")
version = EosVersion(**EOS_VERSION) version = EosVersion(**EOS_VERSION)
version_b = EosVersion.from_str(BASE_VERSION_STR) version_b = EosVersion.from_str(BASE_VERSION_STR)
assert version_b < version assert version_b < version
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_ne_operator(EOS_VERSION): def test_eos_version_ne_operator(EOS_VERSION):
if not EOS_VERSION['is_valid']: if not EOS_VERSION["is_valid"]:
pytest.skip('not a valid version to test') pytest.skip("not a valid version to test")
version = EosVersion(**EOS_VERSION) version = EosVersion(**EOS_VERSION)
version_b = EosVersion.from_str(BASE_VERSION_STR) version_b = EosVersion.from_str(BASE_VERSION_STR)
assert version_b != version assert version_b != version
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_match(EOS_VERSION): def test_eos_version_match(EOS_VERSION):
if not EOS_VERSION['is_valid']: if not EOS_VERSION["is_valid"]:
pytest.skip('not a valid version to test') pytest.skip("not a valid version to test")
version = EosVersion(**EOS_VERSION) version = EosVersion(**EOS_VERSION)
assert version.match(f'=={EOS_VERSION["version"]}') assert version.match(f'=={EOS_VERSION["version"]}')
assert version.match(f'!={BASE_VERSION_STR}') assert version.match(f"!={BASE_VERSION_STR}")
assert version.match(f'>={BASE_VERSION_STR}') assert version.match(f">={BASE_VERSION_STR}")
assert version.match(f'>{BASE_VERSION_STR}') assert version.match(f">{BASE_VERSION_STR}")
assert version.match('<=4.99.0F') assert version.match("<=4.99.0F")
assert version.match('<4.99.0F') assert version.match("<4.99.0F")
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_is_in_branch(EOS_VERSION): def test_eos_version_is_in_branch(EOS_VERSION):
if not EOS_VERSION['is_valid']: if not EOS_VERSION["is_valid"]:
pytest.skip('not a valid version to test') pytest.skip("not a valid version to test")
version = EosVersion(**EOS_VERSION) version = EosVersion(**EOS_VERSION)
assert version.is_in_branch(f"{EOS_VERSION['major']}.{EOS_VERSION['minor']}") assert version.is_in_branch(f"{EOS_VERSION['major']}.{EOS_VERSION['minor']}")
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_match_exception(EOS_VERSION): def test_eos_version_match_exception(EOS_VERSION):
if not EOS_VERSION['is_valid']: if not EOS_VERSION["is_valid"]:
pytest.skip('not a valid version to test') pytest.skip("not a valid version to test")
with pytest.raises(Exception) as e_info: with pytest.raises(Exception) as e_info:
version = EosVersion(**EOS_VERSION) version = EosVersion(**EOS_VERSION)
assert version.match(f'+={EOS_VERSION["version"]}') assert version.match(f'+={EOS_VERSION["version"]}')
logger.info(f'receive exception: {e_info}') logger.info(f"receive exception: {e_info}")
@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
@pytest.mark.parametrize(
"EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
)
def test_eos_version_compare_exception(EOS_VERSION): def test_eos_version_compare_exception(EOS_VERSION):
if not EOS_VERSION['is_valid']: if not EOS_VERSION["is_valid"]:
pytest.skip('not a valid version to test') pytest.skip("not a valid version to test")
with pytest.raises(Exception) as e_info: with pytest.raises(Exception) as e_info:
version = EosVersion(**EOS_VERSION) version = EosVersion(**EOS_VERSION)
version._compare(BASE_VERSION_STR) version._compare(BASE_VERSION_STR)
logger.info(f'receive exception: {e_info}') logger.info(f"receive exception: {e_info}")

View file

@ -14,30 +14,41 @@ from loguru import logger
import eos_downloader import eos_downloader
from eos_downloader.data import DATA_MAPPING from eos_downloader.data import DATA_MAPPING
from eos_downloader.eos import EOSDownloader from eos_downloader.eos import EOSDownloader
from tests.lib.dataset import eos_dataset_invalid, eos_dataset_valid, eos_token, eos_token_invalid from tests.lib.dataset import (
eos_dataset_invalid,
eos_dataset_valid,
eos_token,
eos_token_invalid,
)
from tests.lib.fixtures import create_download_instance from tests.lib.fixtures import create_download_instance
from tests.lib.helpers import default_filename, is_on_github_actions from tests.lib.helpers import default_filename, is_on_github_actions
logger.remove() logger.remove()
logger.add(sys.stderr, level="DEBUG") logger.add(sys.stderr, level="DEBUG")
@pytest.mark.usefixtures("create_download_instance") @pytest.mark.usefixtures("create_download_instance")
@pytest.mark.parametrize("DOWNLOAD_INFO", eos_dataset_valid, ids=['EOS-sha512', 'EOS-md5' ,'vEOS-lab-no-hash']) @pytest.mark.parametrize(
"DOWNLOAD_INFO",
eos_dataset_valid,
ids=["EOS-sha512", "EOS-md5", "vEOS-lab-no-hash"],
)
@pytest.mark.eos_download @pytest.mark.eos_download
class TestEosDownload_valid(): class TestEosDownload_valid:
def test_data(self, DOWNLOAD_INFO): def test_data(self, DOWNLOAD_INFO):
logger.info(f'test input: {DOWNLOAD_INFO}') logger.info(f"test input: {DOWNLOAD_INFO}")
logger.info(f'test build: {self.eos_downloader.__dict__}') logger.info(f"test build: {self.eos_downloader.__dict__}")
def test_eos_download_create(self, DOWNLOAD_INFO): def test_eos_download_create(self, DOWNLOAD_INFO):
my_download = eos_downloader.eos.EOSDownloader( my_download = eos_downloader.eos.EOSDownloader(
image=DOWNLOAD_INFO['image'], image=DOWNLOAD_INFO["image"],
software=DOWNLOAD_INFO['software'], software=DOWNLOAD_INFO["software"],
version=DOWNLOAD_INFO['version'], version=DOWNLOAD_INFO["version"],
token=eos_token, token=eos_token,
hash_method='sha512sum') hash_method="sha512sum",
)
logger.info(my_download) logger.info(my_download)
assert isinstance(my_download, eos_downloader.eos.EOSDownloader) assert isinstance(my_download, eos_downloader.eos.EOSDownloader)
def test_eos_download_repr_string(self, DOWNLOAD_INFO): def test_eos_download_repr_string(self, DOWNLOAD_INFO):
expected = f"{DOWNLOAD_INFO['software']} - {DOWNLOAD_INFO['image']} - {DOWNLOAD_INFO['version']}" expected = f"{DOWNLOAD_INFO['software']} - {DOWNLOAD_INFO['image']} - {DOWNLOAD_INFO['version']}"
@ -45,47 +56,56 @@ class TestEosDownload_valid():
assert str(self.eos_downloader) == expected assert str(self.eos_downloader) == expected
def test_eos_download_build_filename(self, DOWNLOAD_INFO): def test_eos_download_build_filename(self, DOWNLOAD_INFO):
assert self.eos_downloader._build_filename() == DOWNLOAD_INFO['filename'] assert self.eos_downloader._build_filename() == DOWNLOAD_INFO["filename"]
@pytest.mark.dependency(name='authentication') @pytest.mark.dependency(name="authentication")
@pytest.mark.skipif(eos_token == eos_token_invalid, reason="Token is not set correctly") @pytest.mark.skipif(
eos_token == eos_token_invalid, reason="Token is not set correctly"
)
@pytest.mark.skipif(is_on_github_actions(), reason="Running on Github Runner") @pytest.mark.skipif(is_on_github_actions(), reason="Running on Github Runner")
# @pytest.mark.xfail(reason="Deliberate - CI not set for testing AUTH") # @pytest.mark.xfail(reason="Deliberate - CI not set for testing AUTH")
@pytest.mark.webtest @pytest.mark.webtest
def test_eos_download_authenticate(self): def test_eos_download_authenticate(self):
assert self.eos_downloader.authenticate() is True assert self.eos_downloader.authenticate() is True
@pytest.mark.dependency(depends=["authentication"], scope='class') @pytest.mark.dependency(depends=["authentication"], scope="class")
@pytest.mark.webtest @pytest.mark.webtest
def test_eos_download_get_remote_file_path(self, DOWNLOAD_INFO): def test_eos_download_get_remote_file_path(self, DOWNLOAD_INFO):
assert self.eos_downloader._get_remote_filepath() == DOWNLOAD_INFO['remote_path'] assert (
self.eos_downloader._get_remote_filepath() == DOWNLOAD_INFO["remote_path"]
)
@pytest.mark.dependency(depends=["authentication"], scope='class') @pytest.mark.dependency(depends=["authentication"], scope="class")
@pytest.mark.webtest @pytest.mark.webtest
def test_eos_download_get_file_url(self, DOWNLOAD_INFO): def test_eos_download_get_file_url(self, DOWNLOAD_INFO):
url = self.eos_downloader._get_url(remote_file_path = DOWNLOAD_INFO['remote_path']) url = self.eos_downloader._get_url(
remote_file_path=DOWNLOAD_INFO["remote_path"]
)
logger.info(url) logger.info(url)
assert 'https://downloads.arista.com/EOS-USA/Active%20Releases/' in url assert "https://downloads.arista.com/EOS-USA/Active%20Releases/" in url
@pytest.mark.usefixtures("create_download_instance") @pytest.mark.usefixtures("create_download_instance")
@pytest.mark.parametrize("DOWNLOAD_INFO", eos_dataset_invalid, ids=['EOS-FAKE']) @pytest.mark.parametrize("DOWNLOAD_INFO", eos_dataset_invalid, ids=["EOS-FAKE"])
class TestEosDownload_invalid(): class TestEosDownload_invalid:
def test_data(self, DOWNLOAD_INFO): def test_data(self, DOWNLOAD_INFO):
logger.info(f'test input: {dict(DOWNLOAD_INFO)}') logger.info(f"test input: {dict(DOWNLOAD_INFO)}")
logger.info(f'test build: {self.eos_downloader.__dict__}') logger.info(f"test build: {self.eos_downloader.__dict__}")
def test_eos_download_login_error(self, DOWNLOAD_INFO): def test_eos_download_login_error(self, DOWNLOAD_INFO):
my_download = eos_downloader.eos.EOSDownloader( my_download = eos_downloader.eos.EOSDownloader(
image=DOWNLOAD_INFO['image'], image=DOWNLOAD_INFO["image"],
software=DOWNLOAD_INFO['software'], software=DOWNLOAD_INFO["software"],
version=DOWNLOAD_INFO['version'], version=DOWNLOAD_INFO["version"],
token=eos_token_invalid, token=eos_token_invalid,
hash_method=DOWNLOAD_INFO['expected_hash']) hash_method=DOWNLOAD_INFO["expected_hash"],
)
assert my_download.authenticate() is False assert my_download.authenticate() is False
@pytest.mark.dependency(name='authentication') @pytest.mark.dependency(name="authentication")
@pytest.mark.skipif(eos_token == eos_token_invalid, reason="Token is not set correctly") @pytest.mark.skipif(
eos_token == eos_token_invalid, reason="Token is not set correctly"
)
@pytest.mark.skipif(is_on_github_actions(), reason="Running on Github Runner") @pytest.mark.skipif(is_on_github_actions(), reason="Running on Github Runner")
# @pytest.mark.xfail(reason="Deliberate - CI not set for testing AUTH") # @pytest.mark.xfail(reason="Deliberate - CI not set for testing AUTH")
@pytest.mark.webtest @pytest.mark.webtest
@ -96,46 +116,48 @@ class TestEosDownload_invalid():
# @pytest.mark.skip(reason="Not yet implemented in lib") # @pytest.mark.skip(reason="Not yet implemented in lib")
def test_eos_file_name_with_incorrect_software(self, DOWNLOAD_INFO): def test_eos_file_name_with_incorrect_software(self, DOWNLOAD_INFO):
self.eos_downloader.software = 'FAKE' self.eos_downloader.software = "FAKE"
logger.info(f'test build: {self.eos_downloader.__dict__}') logger.info(f"test build: {self.eos_downloader.__dict__}")
with pytest.raises(ValueError) as e_info: with pytest.raises(ValueError) as e_info:
result = self.eos_downloader._build_filename() result = self.eos_downloader._build_filename()
logger.info(f'receive exception: {e_info}') logger.info(f"receive exception: {e_info}")
self.eos_downloader.software = DOWNLOAD_INFO['software'] self.eos_downloader.software = DOWNLOAD_INFO["software"]
@pytest.mark.webtest @pytest.mark.webtest
@pytest.mark.dependency(depends=["authentication"], scope='class') @pytest.mark.dependency(depends=["authentication"], scope="class")
def test_eos_download_get_remote_file_path_for_invlaid_software(self, DOWNLOAD_INFO): def test_eos_download_get_remote_file_path_for_invlaid_software(
self.eos_downloader.software = 'FAKE' self, DOWNLOAD_INFO
logger.info(f'Platform set to: {self.eos_downloader.software}') ):
logger.info(f'test build: {self.eos_downloader.__dict__}') self.eos_downloader.software = "FAKE"
logger.info(f"Platform set to: {self.eos_downloader.software}")
logger.info(f"test build: {self.eos_downloader.__dict__}")
with pytest.raises(ValueError) as e_info: with pytest.raises(ValueError) as e_info:
result = self.eos_downloader._build_filename() result = self.eos_downloader._build_filename()
logger.info(f'receive exception: {e_info}') logger.info(f"receive exception: {e_info}")
self.eos_downloader.software = DOWNLOAD_INFO['software'] self.eos_downloader.software = DOWNLOAD_INFO["software"]
# IMAGE TESTING # IMAGE TESTING
def test_eos_file_name_with_incorrect_image(self, DOWNLOAD_INFO): def test_eos_file_name_with_incorrect_image(self, DOWNLOAD_INFO):
self.eos_downloader.image = 'FAKE' self.eos_downloader.image = "FAKE"
logger.info(f'Image set to: {self.eos_downloader.image}') logger.info(f"Image set to: {self.eos_downloader.image}")
assert DOWNLOAD_INFO['filename'] == self.eos_downloader._build_filename() assert DOWNLOAD_INFO["filename"] == self.eos_downloader._build_filename()
self.eos_downloader.software == DOWNLOAD_INFO['image'] self.eos_downloader.software == DOWNLOAD_INFO["image"]
@pytest.mark.webtest @pytest.mark.webtest
@pytest.mark.dependency(depends=["authentication"], scope='class') @pytest.mark.dependency(depends=["authentication"], scope="class")
def test_eos_download_get_remote_file_path_for_invlaid_image(self, DOWNLOAD_INFO): def test_eos_download_get_remote_file_path_for_invlaid_image(self, DOWNLOAD_INFO):
self.eos_downloader.image = 'FAKE' self.eos_downloader.image = "FAKE"
logger.info(f'Image set to: {self.eos_downloader.image}') logger.info(f"Image set to: {self.eos_downloader.image}")
assert self.eos_downloader.authenticate() is True assert self.eos_downloader.authenticate() is True
assert DOWNLOAD_INFO['filename'] == self.eos_downloader._build_filename() assert DOWNLOAD_INFO["filename"] == self.eos_downloader._build_filename()
self.eos_downloader.image = DOWNLOAD_INFO['image'] self.eos_downloader.image = DOWNLOAD_INFO["image"]
# VERSION TESTING # VERSION TESTING
@pytest.mark.webtest @pytest.mark.webtest
@pytest.mark.dependency(depends=["authentication"], scope='class') @pytest.mark.dependency(depends=["authentication"], scope="class")
def test_eos_download_get_remote_file_path_for_invlaid_version(self, DOWNLOAD_INFO): def test_eos_download_get_remote_file_path_for_invlaid_version(self, DOWNLOAD_INFO):
self.eos_downloader.version = 'FAKE' self.eos_downloader.version = "FAKE"
logger.info(f'Version set to: {self.eos_downloader.version}') logger.info(f"Version set to: {self.eos_downloader.version}")
assert self.eos_downloader._get_remote_filepath() == '' assert self.eos_downloader._get_remote_filepath() == ""