1
0
Fork 0
cvprac/cvprac/cvp_api.py
Daniel Baumann 661e089729
Adding upstream version 1.4.0+dfsg.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-02-05 14:18:39 +01:00

4115 lines
184 KiB
Python

# pylint: disable=fixme,too-many-locals,redefined-builtin
#
# Copyright (c) 2017, Arista Networks, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# Neither the name of Arista Networks nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# 'AS IS' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL ARISTA NETWORKS
# BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
# OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
# IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
''' Class containing calls to CVP RESTful API.
'''
import operator
import os
import time
# This import is for proper file IO handling support for both Python 2 and 3
from io import open
from datetime import datetime
from re import split
from cvprac.cvp_client_errors import CvpApiError
try:
from urllib import quote_plus as qplus
except (AttributeError, ImportError):
from urllib.parse import quote_plus as qplus
OPERATOR_DICT = {
'>': operator.gt,
'<': operator.lt,
'>=': operator.ge,
'<=': operator.le,
'==': operator.eq
}
def sanitize_warnings(data):
''' Sanitize the warnings returned after validation.
In some cases where the configlets have both errors
and warnings, CVP may split warnings that have
`,` across multiple strings.
This method concatenates the strings back into one string
per warning, and corrects the warningCount.
Args:
data (dict): A dict that contians the result
of the validation operation
Returns:
response (dict): A dict that contains the result of the
validation operation
'''
if "warnings" not in data:
# nothing to do here, we can return as is
return data
# Since there may be warnings incorrectly split on
# ', ' within the warning text by CVP, we join all the
# warnings together using ', ' into one large string
temp_warnings = ", ".join(data['warnings']).strip()
# To split the large string again we match on the
# 'at line XXX' that should indicate the end of the warning.
# We capture as well the remaining \\n or whitespace and include
# the extra ', ' added in the previous step in the matching criteria.
# The extra ', ' is not included in the strings of the new list
temp_warnings = split(
r'(.*?at line \d+.*?),\s+',
temp_warnings
)
# The behaviour of re.split will add empty strings
# if the regex matches on the begging or ending of the line.
# Refer to https://docs.python.org/3/library/re.html#re.split
# Use filter to remove any empty strings
# that re.split inserted
data['warnings'] = list(filter(None, temp_warnings))
# Update the count of warnings to the correct value
data['warningCount'] = len(data['warnings'])
return data
class CvpApi():
''' CvpApi class contains calls to CVP RESTful API. The RESTful API
parameters are passed in as parameters to the method. The results of
the RESTful API call are converted from json to a dict and returned.
Where needed minimal processing is performed on the results.
Any method that does a cvprac get or post call could raise the
following errors:
ConnectionError: A ConnectionError is raised if there was a network
problem (e.g. DNS failure, refused connection, etc)
CvpApiError: A CvpApiError is raised if there was a JSON error.
CvpRequestError: A CvpRequestError is raised if the request is not
properly constructed.
CvpSessionLogOutError: A CvpSessionLogOutError is raised if
reponse from server indicates session was logged out.
HTTPError: A HTTPError is raised if there was an invalid HTTP response.
ReadTimeout: A ReadTimeout is raised if there was a request
timeout when reading from the connection.
Timeout: A Timeout is raised if there was a request timeout.
TooManyRedirects: A TooManyRedirects is raised if the request exceeds
the configured number of maximum redirections
ValueError: A ValueError is raised when there is no valid
CVP session. This occurs because the previous get or post request
failed and no session could be established to a CVP node. Destroy
the class and re-instantiate.
'''
# pylint: disable=too-many-public-methods
# pylint: disable=too-many-lines
def __init__(self, clnt, request_timeout=30):
''' Initialize the class.
Args:
clnt (obj): A CvpClient object
'''
self.clnt = clnt
self.log = clnt.log
self.request_timeout = request_timeout
def cvp_version_compare(self, opr, version, msg):
''' Check provided version with given operator against the current CVP
version
Args:
opr (string): The operator. Valid operators are:
> - Greater Than
< - Less Than
>= - Greater Than or Equal To
<= - Less Than or Equal To
== - Equal To
version (float): The float API Version number to compare the
running CVP version to.
'''
if opr not in OPERATOR_DICT:
self.log.error("%s is an invalid operation for version comparison", opr)
return False
# Since CVaaS is automatically the latest version of the API, if
# operators > or >= are provided we can quickly check if we are running
# on CVaaS and return True if found.
if opr in ['>', '>='] and self.clnt.is_cvaas:
return True
if self.clnt.apiversion is None:
self.get_cvp_info()
# Example: if a version of 6.0 is provided with greater than or equal
# operator (>=) we are validating that the running CVP version is
# greater than or equal to API Version 6.0.
# Hence -- self.clnt.apiversion >= 6.0
if not OPERATOR_DICT[opr](self.clnt.apiversion, version):
self.log.warning(msg)
return False
return True
def get_cvp_info(self):
''' Returns information about CVP.
Returns:
cvp_info (dict): CVP Information
'''
if not self.clnt.is_cvaas:
data = self.clnt.get('/cvpInfo/getCvpInfo.do',
timeout=self.request_timeout)
else:
# For CVaaS do not run the getCvpInfo REST API and assume the
# latest version of the API
data = {'version': 'cvaas'}
if 'version' in data and self.clnt.apiversion is None:
self.clnt.set_version(data['version'])
return data
# pylint: disable=too-many-arguments
def add_user(self, username, password, role, status, first_name,
last_name, email, user_type):
''' Add new local user to the CVP UI.
Args:
username (str): local username on CVP
password (str): password of the user
role (str): role of the user
status (str): state of the user (Enabled/Disabled)
first_name (str): first name of the user
last_name (str): last name of the user
email (str): email address of the user
user_type (str): type of AAA (Local/TACACS/RADIUS)
'''
if status not in ['Enabled', 'Disabled']:
self.log.error(f"Invalid status {status}."
" Status must be Enabled or Disabled."
" Defaulting to Disabled")
status = 'Disabled'
data = {"roles": [role],
"user": {"contactNumber": "",
"email": email,
"firstName": first_name,
"lastName": last_name,
"password": password,
"userId": username,
"userStatus": status,
"userType": user_type}}
return self.clnt.post('/user/addUser.do', data=data,
timeout=self.request_timeout)
def update_user(self, username, password, role, status, first_name,
last_name, email, user_type):
''' Updates username information, like
changing password, user role, email address, names,
disable/enable the username.
Args:
username (str): local username on CVP
password (str): password of the user
role (str): role of the user
status (str): state of the user (Enabled/Disabled)
first_name (str): first name of the user
last_name (str): last name of the user
email (str): email address of the user
user_type (str): type of AAA (Local/TACACS/RADIUS)
'''
if status not in ['Enabled', 'Disabled']:
self.log.error(f"Invalid status {status}."
f" Status must be Enabled or Disabled."
f" Defaulting to Disabled")
status = 'Disabled'
data = {"roles": [role],
"user": {"contactNumber": "",
"email": email,
"firstName": first_name,
"lastName": last_name,
"password": password,
"userId": username,
"userStatus": status,
"userType": user_type}}
return self.clnt.post(f"/user/updateUser.do?userId={username}",
data=data, timeout=self.request_timeout)
def get_user(self, username):
''' Returns specified user information
Args:
username (str): username on CVP
'''
return self.clnt.get(f"/user/getUser.do?userId={qplus(username)}",
timeout=self.request_timeout)
def get_users(self, query='', start=0, end=0):
''' Returns all users in CVP filtered by an optional query parameter
Args:
query (str): Query parameter to filter users by.
start (int): Start index for the pagination. Default is 0.
end (int): End index for the pagination. If end index is 0
then all the records will be returned. Default is 0.
Returns:
response (dict): A dict that contains the users.
{'total': 1,
'roles': {'cvpadmin': ['network-admin']},
'users': [{'userId': 'cvpadmin',
'firstName': '',
'lastName': '',
'description': '',
'email': 'cvprac@cvprac.com',
'lastAccessed': 1654555139700,
'contactNumber': '',
'userType': 'Local',
'userStatus': 'Enabled',
'currentStatus': 'Online',
'addedByUser': 'cvp system'}]}
'''
self.log.debug(f"get_users: query: {query}")
return self.clnt.get(f"/user/getUsers.do?"
f"queryparam={qplus(query)}&startIndex={start}&endIndex={end}",
timeout=self.request_timeout)
def delete_user(self, username):
''' Remove specified user from CVP
Args:
username (str): username on CVP
'''
data = [username]
return self.clnt.post('/user/deleteUsers.do', data=data,
timeout=self.request_timeout)
def get_task_by_id(self, task_id):
''' Returns the current CVP Task status for the task with the specified
TaskId.
Args:
task_id (int): CVP task identifier
Returns:
task (dict): The CVP task for the associated Id. Returns None
if the task_id was invalid.
'''
self.log.debug(f"get_task_by_id: task_id: {task_id}")
try:
task = self.clnt.get(f"/task/getTaskById.do?taskId={task_id}",
timeout=self.request_timeout)
except CvpApiError as error:
self.log.debug(f"Caught error: {error} attempting to get task.")
# Catch an invalid task_id error and return None
return None
return task
def get_tasks_by_status(self, status, start=0, end=0):
''' Returns a list of tasks with the given status.
Args:
status (str): Task status
start (int): Start index for the pagination. Default is 0.
end (int): End index for the pagination. If end index is 0
then all the records will be returned. Default is 0.
Returns:
tasks (list): The list of tasks
'''
self.log.debug(f"get_tasks_by_status: status: {status}")
data = self.clnt.get(
f"/task/getTasks.do?queryparam={status}&startIndex={start}&endIndex={end}",
timeout=self.request_timeout)
return data['data']
def get_tasks(self, start=0, end=0):
''' Returns a list of all the tasks.
Args:
start (int): Start index for the pagination. Default is 0.
end (int): End index for the pagination. If end index is 0
then all the records will be returned. Default is 0.
Returns:
tasks (dict): The 'total' key contains the number of tasks,
the 'data' key contains a list of the tasks.
'''
self.log.debug('get_tasks:')
return self.clnt.get(f"/task/getTasks.do?queryparam=&startIndex={start}&"
f"endIndex={end}", timeout=self.request_timeout)
def get_logs_by_id(self, task_id, start=0, end=0):
''' Returns the log entries for the task with the specified TaskId.
Args:
task_id (int): CVP task identifier
start (int): The first log entry to return. Default is 0.
end (int): The last log entry to return. Default is 0 which
means to return all log entries. Can be a large number to
indicate the last log entry.
Returns:
task (dict): The CVP log for the associated Id. Returns None
if the task_id was invalid.
'''
self.log.debug(f"get_logs_by_id: task_id: {task_id}")
if self.clnt.apiversion is None:
self.get_cvp_info()
if self.clnt.apiversion < 5.0:
self.log.debug('v1 - v4 /task/getLogsByID.do?')
resp = self.clnt.get(f"/task/getLogsById.do?id={task_id}&queryparam="
f"&startIndex={start}&endIndex={end}",
timeout=self.request_timeout)
else:
self.log.debug('v5 /audit/getLogs.do')
task_info = self.get_task_by_id(task_id)
stage_id = None
if 'stageId' in task_info:
stage_id = task_info['stageId']
else:
self.log.debug(f"No stage ID found for task {task_id}")
if 'ccIdV2' in task_info:
cc_id = task_info['ccIdV2']
if cc_id == '':
self.log.debug(f"No ccIdV2 for task {task_id}."
f" It was likely cancelled."
f" Using old /task/getLogsByID.do?")
resp = self.clnt.get(
f"/task/getLogsById.do?id={task_id}&queryparam="
f"&startIndex={start}&endIndex={end}",
timeout=self.request_timeout)
else:
resp = self.get_audit_logs_by_id(cc_id, stage_id)
else:
self.log.debug(f"No change ID found for task {task_id}")
resp = None
return resp
def get_audit_logs_by_id(self, cc_id, stage_id=None, data_size=75):
''' Returns the audit logs of a particular ChangeControl.
Args:
cc_id (string): change control ID from ccIdV2 field
stage_id (string): stage ID from stageId field
data_size (int): data size
Returns:
task (dict): The CVP log for the associated ccIdV2
'''
data = {"category": "ChangeControl",
"startTime": 0,
"endTime": 0,
"dataSize": data_size,
"objectKey": cc_id,
"lastRetrievedAudit": {}}
if stage_id:
data["tags"] = {"stageId": stage_id}
return self.clnt.post('/cvpservice/audit/getLogs.do?', data=data,
timeout=self.request_timeout)
def add_note_to_task(self, task_id, note):
''' Add notes to the task.
Args:
task_id (str): Task ID
note (str): Note to add to the task
'''
self.log.debug(f"add_note_to_task: task_id: {task_id} note: {note}")
data = {'workOrderId': task_id, 'note': note}
self.clnt.post('/task/addNoteToTask.do', data=data,
timeout=self.request_timeout)
def execute_task(self, task_id):
''' Execute the task. Note that if the task has failed then inspect
the task logs to determine why the task failed. If you see:
Failure response received from the netElement: Unauthorized User
then it means that the netelement does not have the same user ID
and/or password as the CVP user executing the task.
Args:
task_id (str): Task ID
'''
self.log.debug(f"execute_task: task_id: {task_id}")
data = {'data': [task_id]}
self.clnt.post('/task/executeTask.do', data=data, timeout=self.request_timeout)
def cancel_task(self, task_id):
''' Cancel the task
Args:
task_id (str): Task ID
'''
self.log.debug(f"cancel_task: task_id: {task_id}")
data = {'data': [task_id]}
return self.clnt.post('/task/cancelTask.do', data=data,
timeout=self.request_timeout)
def get_configlets(self, start=0, end=0):
''' Returns a list of all defined configlets.
Args:
start (int): Start index for the pagination. Default is 0.
end (int): End index for the pagination. If end index is 0
then all the records will be returned. Default is 0.
'''
if self.clnt.apiversion is None:
self.get_cvp_info()
configlets = self.clnt.get(f"/configlet/getConfiglets.do?"
f"startIndex={start}&endIndex={end}",
timeout=self.request_timeout)
if self.clnt.apiversion == 1.0 or self.clnt.apiversion >= 4.0:
self.log.debug('v1/v4+ Inventory API Call')
return configlets
self.log.debug('v2 Inventory API Call')
# New API getConfiglets does not return the actual configlet config
# Get the actual configlet config using getConfigletByName
if 'data' in configlets:
for configlet in configlets['data']:
full_cfglt_data = self.get_configlet_by_name(configlet['name'])
configlet['config'] = full_cfglt_data['config']
return configlets
def get_configlets_and_mappers(self):
''' Returns a list of all defined configlets and associated mappers
'''
self.log.debug(
'get_configlets_and_mappers: getConfigletsAndAssociatedMappers')
return self.clnt.get('/configlet/getConfigletsAndAssociatedMappers.do',
timeout=self.request_timeout)
def get_configlet_builder(self, c_id):
''' Returns the configlet builder data for the given configlet ID.
Args:
c_id (str): The ID (key) for the configlet to be queried.
'''
return self.clnt.get(f"/configlet/getConfigletBuilder.do?id={c_id}",
timeout=self.request_timeout)
def search_configlets(self, query, start=0, end=0):
''' Returns a list of configlets that match a search query.
Args:
query (str): A simple string of text to be matched against
the existing configlets. Not a regex.
start (int): Start index for the pagination. Default is 0.
end (int): End index for the pagination. If end index is 0
then all the records will be returned. Default is 0.
'''
self.log.debug(f"search_configlets: query: {query}")
return self.clnt.get(f"/configlet/searchConfiglets.do?"
f"queryparam={qplus(query)}&startIndex={start}&endIndex={end}",
timeout=self.request_timeout)
def get_configlet_by_name(self, name):
''' Returns the configlet with the specified name
Args:
name (str): Name of the configlet. Can contain spaces.
Returns:
configlet (dict): The configlet dict.
'''
self.log.debug(f"get_configlets_by_name: name: {name}")
return self.clnt.get(f"/configlet/getConfigletByName.do?name={qplus(name)}",
timeout=self.request_timeout)
def get_configlets_by_container_id(self, c_id, start=0, end=0):
''' Returns a list of configlets applied to the given container.
Args:
c_id (str): The container ID (key) to query.
start (int): Start index for the pagination. Default is 0.
end (int): End index for the pagination. If end index is 0
then all the records will be returned. Default is 0.
'''
return self.clnt.get(f"/provisioning/getConfigletsByContainerId.do?"
f"containerId={c_id}&startIndex={start}&endIndex={end}",
timeout=self.request_timeout)
def get_configlets_by_netelement_id(self, d_id, start=0, end=0):
''' Returns a list of configlets applied to the given device.
Args:
d_id (str): The device ID (key) to query.
start (int): Start index for the pagination. Default is 0.
end (int): End index for the pagination. If end index is 0
then all the records will be returned. Default is 0.
'''
return self.clnt.get(f"/provisioning/getConfigletsByNetElementId.do?"
f"netElementId={d_id}&startIndex={start}&endIndex={end}",
timeout=self.request_timeout)
def get_image_bundle_by_container_id(self, container_id, start=0, end=0,
scope='false'):
''' Returns a list of ImageBundles applied to the given container.
Args:
container_id (str): The container ID (key) to query.
start (int): Start index for the pagination. Default is 0.
end (int): End index for the pagination. If end index is 0
then all the records will be returned. Default is 0.
scope (string) the session scope (true or false).
'''
if scope not in ('true', 'false'):
self.log.error("scope value must be true or false. %s is an invalid value."
" Defaulting back to false", scope)
scope = 'false'
return self.clnt.get(f"/provisioning/getImageBundleByContainerId.do?"
f"containerId={container_id}&startIndex={start}&endIndex={end}"
f"&sessionScope={scope}",
timeout=self.request_timeout)
def get_configlet_history(self, key, start=0, end=0):
''' Returns the configlet history.
Args:
key (str): Key for the configlet.
start (int): The first configlet entry to return. Default is 0
end (int): The last configlet entry to return. Default is 0
which means to return all configlet entries. Can be a
large number to indicate the last configlet entry.
Returns:
history (dict): The configlet dict with the changes from
most recent to oldest.
'''
self.log.debug(f"get_configlets_history: key: {key}")
return self.clnt.get(f"/configlet/getConfigletHistory.do?configletId="
f"{key}&queryparam=&startIndex={start}&endIndex={end}",
timeout=self.request_timeout)
def get_inventory(self, start=0, end=0, query='', provisioned=True):
''' Returns the a dict of the net elements known to CVP.
Args:
start (int): The first inventory entry to return. Default is 0
end (int): The last inventory entry to return. Default is 0
which means to return all inventory entries. Can be a
large number to indicate the last inventory entry.
query (string): A value that can be used as a match to filter
returned inventory list. For example get all switches that
are running a specific version of EOS.
'''
self.log.debug('get_inventory: called')
if self.clnt.apiversion is None:
self.get_cvp_info()
if self.clnt.apiversion == 1.0:
self.log.debug('v1 Inventory API Call')
data = self.clnt.get(f"/inventory/getInventory.do?"
f"queryparam={qplus(query)}&startIndex={start}&endIndex={end}",
timeout=self.request_timeout)
return data['netElementList']
self.log.debug('v2 Inventory API Call')
data = self.clnt.get(f"/inventory/devices?provisioned={provisioned}",
timeout=self.request_timeout)
containers = self.get_containers()
for dev in data:
dev['key'] = dev['systemMacAddress']
dev['deviceInfo'] = dev['deviceStatus'] = dev['status']
dev['isMLAGEnabled'] = dev['mlagEnabled']
dev['isDANZEnabled'] = dev['danzEnabled']
dev['parentContainerId'] = dev['parentContainerKey']
dev['bootupTimeStamp'] = dev['bootupTimestamp']
dev['internalBuildId'] = dev['internalBuild']
if 'taskIdList' not in dev:
dev['taskIdList'] = []
if 'tempAction' not in dev:
dev['tempAction'] = None
dev['memTotal'] = 0
dev['memFree'] = 0
dev['sslConfigAvailable'] = False
dev['sslEnabledByCVP'] = False
dev['lastSyncUp'] = 0
dev['type'] = 'netelement'
dev['dcaKey'] = None
container_found = False
for container in containers['data']:
if dev['parentContainerKey'] == container['key']:
dev['containerName'] = container['name']
container_found = True
if not container_found:
dev['containerName'] = ''
return data
def add_devices_to_inventory(self, device_list, wait=False):
''' Add a list of devices to the specified parent container.
Args:
device_list (list): A list of devices to be added in the
form of dictionaries. Each device dictionary should
contain the following information:
- device_ip (str): ip address of device we are adding
- parent_name (str): Parent container name
- parent_key (str): Parent container key
wait (boolean): Specifies whether to allow a wait time for
devices to appear in inventory before moving them to
the specified container. Applies to v2 API only.
Example device list:
device_list = [
{
device_ip: '10.10.10.1',
parent_name: 'Tenant',
parent_key: 'root'
},
{
device_ip: '10.10.10.2',
parent_name: 'MyContainer',
parent_key: 'container-id-1234'
}
]
'''
self.log.debug('add_device_to_inventory: called')
if self.clnt.apiversion is None:
self.get_cvp_info()
if self.clnt.apiversion == 1.0:
self.log.debug('v1 Inventory API Call')
data_list = []
for device in device_list:
dev_data = {
'containerName': device['parent_name'],
'containerId': device['parent_key'],
'containerType': 'Existing',
'ipAddress': device['device_ip'],
'containerList': []
}
data_list.append(dev_data)
data = {'data': data_list}
self.clnt.post('/inventory/add/addToInventory.do?'
'startIndex=0&endIndex=0', data=data,
timeout=self.request_timeout)
else:
self.log.debug('v2 Inventory API Call')
# Create a list of device IPs
device_ips = [dev['device_ip'] for dev in device_list]
# First add the devices to the inventory in a single call
data = {'hosts': device_ips}
self.clnt.post('/inventory/devices', data=data,
timeout=self.request_timeout)
# Get the inventory list
inv = self.get_inventory()
if wait:
# With v2, the devices can take a few moments to appear
# We need them present before we can move them to a container
timeout = time.time() + 600
while device_ips and time.time() < timeout:
inv_devices = [dev['ipAddress'] for dev in inv]
device_ips = list(set(device_ips) - set(inv_devices))
if device_ips:
time.sleep(2)
inv = self.get_inventory()
if device_ips:
# If any devices did not appear, there is a problem
# Join the missing IPs into a string for output
missing_ips = ', '.join(device_ips)
raise RuntimeError(f"Devices {missing_ips} failed to appear in inventory")
# Move the devices to their specified containers
for device in device_list:
devs = [dev for dev in inv if 'ipAddress' in dev and
device['device_ip'] in dev['ipAddress']]
dev = devs[0]
container = {'key': device['parent_key'],
'name': device['parent_name']}
self.move_device_to_container('add_device_to_inventory API v2',
dev, container, False)
def add_device_to_inventory(self, device_ip, parent_name,
parent_key, wait=False):
''' Add the device to the specified parent container.
Args:
device_ip (str): ip address of device we are adding
parent_name (str): Parent container name
parent_key (str): Parent container key
'''
# Put parameters into a dictionary and call add_devices_to_inventory
device = {
'device_ip': device_ip,
'parent_name': parent_name,
'parent_key': parent_key
}
self.add_devices_to_inventory([device], wait=wait)
def retry_add_to_inventory(self, dev_mac, device_ip, username,
password):
'''Retry addition of device to Cvp inventory
Args:
dev_mac (str): MAC address of device
device_ip (str): ip address assigned to device
username (str): username for device login
password (str): password for user
'''
self.log.debug('retry_add_to_inventory: called')
if self.clnt.apiversion is None:
self.get_cvp_info()
if self.clnt.apiversion == 1.0:
self.log.debug('v1 Inventory API Call')
data = {"key": dev_mac,
"ipAddress": device_ip,
"userName": username,
"password": password}
self.clnt.post('/inventory/add/retryAddDeviceToInventory.do?'
'startIndex=0&endIndex=0',
data=data,
timeout=self.request_timeout)
else:
self.log.debug('v2 Inventory API Call')
self.log.warning(
'retry_add_to_inventory: not implemented for v2 APIs')
def delete_device(self, dev_mac):
'''Delete the device and its pending tasks from Cvp inventory
Args:
dev_mac (str): mac address of device we are deleting
For CVP 2020 this param is now required to
be the device serial number instead of MAC
address. This method will handle getting
the device serial number via the provided
MAC address.
Returns:
data (dict): Contains success or failure message
'''
self.log.debug('delete_device: called')
return self.delete_devices([dev_mac])
def delete_devices(self, dev_macs):
'''Delete the device and its pending tasks from Cvp inventory
Args:
dev_macs (list): list of mac address for
devices we're deleting
For CVP 2020 this param is now required to
be a list of device serial numbers instead
of MAC addresses. This method will handle
getting the device serial number via the
provided MAC address.
Returns:
data (dict): Contains success or failure message
'''
self.log.debug('delete_devices: called')
resp = None
if self.clnt.apiversion is None:
self.get_cvp_info()
if self.clnt.apiversion < 4.0:
data = {'data': dev_macs}
resp = self.clnt.post('/inventory/deleteDevices.do?', data=data,
timeout=self.request_timeout)
else:
self.log.warning('NOTE: The Delete Devices API has updated for'
' CVP 2020.2 and it is not required to send the'
' device serial number instead of mac address'
' when deleting a device. Looking up each devices'
'serial num based on provided MAC addresses')
devices = []
for dev_mac in dev_macs:
device_info = self.get_device_by_mac(dev_mac)
if device_info is not None and 'serialNumber' in device_info:
devices.append(device_info)
resp = self.delete_devices_by_serial(devices)
return resp
def delete_devices_by_serial(self, devices):
'''Delete the device and its pending tasks from Cvp inventory
Args:
devices (list): list of device objects to be deleted
Returns:
data (dict): Contains success or failure message
'''
device_serials = []
for device in devices:
device_serials.append(device['serialNumber'])
data = {'data': device_serials}
resp = self.clnt.delete('/inventory/devices', data=data,
timeout=self.request_timeout)
return resp
def get_non_connected_device_count(self):
'''Returns number of devices not accessible/connected in the temporary
inventory.
Returns:
data (int): Number of temporary inventory devices not
accessible/connected
'''
self.log.debug('get_non_connected_device_count: called')
if self.clnt.apiversion is None:
self.get_cvp_info()
if self.clnt.apiversion == 1.0:
self.log.debug('v1 Inventory API Call')
data = self.clnt.get(
'/inventory/add/getNonConnectedDeviceCount.do',
timeout=self.request_timeout)
return data['data']
self.log.debug('v2 Inventory API Call')
data = self.clnt.get('/inventory/devices?provisioned=false',
timeout=self.request_timeout)
unprovisioned_devs = 0
for dev in data:
if 'status' in dev and dev['status'] == '':
unprovisioned_devs += 1
return unprovisioned_devs
def save_inventory(self):
'''Saves Cvp inventory state
'''
self.log.debug('save_inventory: called')
if self.clnt.apiversion is None:
self.get_cvp_info()
if self.clnt.apiversion == 1.0:
self.log.debug('v1 Inventory API Call')
return self.clnt.post('/inventory/add/saveInventory.do',
timeout=self.request_timeout)
self.log.debug('v2 Inventory API Call')
message = 'Save Inventory not implemented/necessary for' +\
' CVP 2018.2 and beyond'
data = {'data': 0, 'message': message}
return data
def get_devices_in_container(self, name):
''' Returns a dict of the devices under the named container.
Args:
name (str): The name of the container to get devices from
'''
self.log.debug('get_devices_in_container: called')
devices = []
container = self.get_container_by_name(name)
if container:
all_devices = self.get_inventory(0, 0, name)
for device in all_devices:
if device['parentContainerId'] == container['key']:
devices.append(device)
return devices
def get_device_by_name(self, fqdn, search_by_hostname=False):
''' Returns the net element device dict for the devices fqdn name.
Args:
fqdn (str): Fully qualified domain name or hostname of the
device.
search_by_hostname (boolean): if set True will attempt to split
the fqdn string to match on the hostname portion
specifically which should be the first component
Returns:
device (dict): The net element device dict for the device if
otherwise returns an empty hash.
'''
self.log.debug(f"get_device_by_name: fqdn: {fqdn}")
# data = self.get_inventory(start=0, end=0, query=fqdn)
data = self.search_topology(fqdn)
device = {}
if 'netElementList' in data:
for netelem in data['netElementList']:
if not search_by_hostname:
if netelem['fqdn'] == fqdn:
device = netelem
break
else:
if netelem['fqdn'].split('.')[0] == fqdn:
device = netelem
break
return device
def get_device_by_mac(self, dev_mac):
''' Returns the net element device dict for the devices mac address.
Args:
dev_mac (str): MAC Address of the device.
Returns:
device (dict): The net element device dict for the device if
otherwise returns an empty hash.
'''
self.log.debug(f"get_device_by_mac: MAC address: {dev_mac}")
# data = self.get_inventory(start=0, end=0, query=dev_mac)
data = self.search_topology(dev_mac)
device = {}
if 'netElementList' in data:
for netelem in data['netElementList']:
if netelem['systemMacAddress'] == dev_mac:
device = netelem
break
return device
def get_device_by_serial(self, device_serial):
''' Returns the net element device dict for the devices serial number.
Args:
device_serial (str): Serial number of the device.
Returns:
device (dict): The net element device dict for the device if
otherwise returns an empty hash.
'''
self.log.debug(f"get_device_by_serial: Serial Number: {device_serial}")
data = self.search_topology(device_serial)
device = {}
if 'netElementList' in data:
for netelem in data['netElementList']:
if netelem['serialNumber'] == device_serial:
device = netelem
break
return device
def get_device_configuration(self, dev_mac):
''' Returns the running configuration for the device provided.
Args:
dev_mac (str): Mac address of the device to get the running
configuration for.
Returns:
device (dict): The net element device dict for the device if
otherwise returns an empty hash.
'''
self.log.debug(f"get_device_configuration: dev_mac: {dev_mac}")
if self.clnt.apiversion is None:
self.get_cvp_info()
if self.clnt.apiversion < 4.0:
data = self.clnt.get(f"/inventory/getInventoryConfiguration.do?netElementId={dev_mac}",
timeout=self.request_timeout)
else:
data = self.clnt.get(f"/inventory/device/config?netElementId={dev_mac}",
timeout=self.request_timeout)
running_config = ''
if 'output' in data:
running_config = data['output']
return running_config
def get_device_image_info(self, dev_mac):
''' Return a dict of info about a device in CVP.
Args:
dev_mac (str): Mac address of the device to get the running
configuration for.
Returns:
device_image_info (dict): Dict of image info for the device
if found. Otherwise returns None.
'''
self.log.debug(f"Attempt to get net element data for {dev_mac}")
try:
device_image_info = self.clnt.get(
f"/provisioning/getNetElementInfoById.do?netElementId={qplus(dev_mac)}",
timeout=self.request_timeout)
except CvpApiError as error:
# Catch error when device for provided MAC is not found
if 'Invalid Netelement id' in str(error):
self.log.debug(f"Device with MAC {dev_mac} not found")
return None
raise error
return device_image_info
def get_containers(self, start=0, end=0):
''' Returns a list of all the containers.
Args:
start (int): Start index for the pagination. Default is 0.
end (int): End index for the pagination. If end index is 0
then all the records will be returned. Default is 0.
Returns:
containers (dict): The 'total' key contains the number of
containers, the 'data' key contains a list of the containers
with associated info.
'''
self.log.debug('Get list of containers')
if self.clnt.apiversion is None:
self.get_cvp_info()
if self.clnt.apiversion == 1.0:
self.log.debug('v1 Inventory API Call')
return self.clnt.get(f"/inventory/add/searchContainers.do?"
f"startIndex={start}&endIndex={end}")
self.log.debug('v2 Inventory API Call')
containers = self.clnt.get('/inventory/containers')
for container in containers:
container['name'] = container['Name']
container['key'] = container['Key']
full_cont_info = self.get_container_by_id(
container['Key'])
if (full_cont_info is not None and
container['Key'] != 'root'):
container['parentName'] = full_cont_info['parentName']
for cont in containers:
if cont['Name'] == full_cont_info['parentName']:
container['parentId'] = cont['Key']
break
else:
self.log.debug(f"No container parentId found for"
f" parentName {full_cont_info['parentName']}")
container['parentId'] = None
else:
container['parentName'] = None
container['parentId'] = None
container['type'] = None
container['id'] = 21
container['factoryId'] = 1
container['userId'] = None
container['childContainerId'] = None
return {'data': containers, 'total': len(containers)}
def get_container_by_name(self, name):
''' Returns a container that exactly matches the name.
Args:
name (str): String to search for in container names.
Returns:
container (dict): Container info in dictionary format or None
'''
self.log.debug(f"Get info for container {name}")
conts = self.clnt.get(f"/provisioning/searchTopology.do?queryParam={qplus(name)}"
f"&startIndex=0&endIndex=0")
if conts['total'] > 0 and conts['containerList']:
for cont in conts['containerList']:
if cont['name'] == name:
return cont
return None
def get_container_by_id(self, key):
''' Returns a container for the given id.
Args:
key (str): String ID for container to find.
Returns:
container (dict): Container info in dictionary format or None
'''
self.log.debug(f"Get info for container {key}")
return self.clnt.get(f"/provisioning/getContainerInfoById.do?"
f"containerId={qplus(key)}")
def get_configlets_by_device_id(self, mac, start=0, end=0):
''' Returns the list of configlets applied to a device.
Args:
mac (str): Device mac address (i.e. device id)
start (int): The first configlet entry to return. Default is 0
end (int): The last configlet entry to return. Default is 0
which means to return all configlet entries. Can be a
large number to indicate the last configlet entry.
Returns:
configlets (list): The list of configlets applied to the device
'''
self.log.debug(f"get_configlets_by_device: mac: {mac}")
data = self.get_configlets_by_netelement_id(mac, start, end)
return data['configletList']
def add_configlet_builder(self, name, config, draft=False, form=None):
''' Add a confilget builder and return the key for the configlet builder.
Args:
name (str): Configlet builder name
config (str): Python configlet builder code
draft (bool): If builder is a draft
form (list): Array/list of form data
Parameters:
fieldId (str): "",
fieldLabel (str): "",
value (str): "",
type (str): "", (Options below)
('Text box',
'Text area',
'Drop down',
'Check box',
'Radio button',
'IP address',
'Password')
validation: {
mandatory (boolean): true,
},
helpText (str): "",
depends (str): "",
dataValidationErrorExist (boolean): true,
dataValidation (string): ""
Returns:
key (str): The key for the configlet
'''
if not form:
form = []
self.log.debug(f"add_configlet_builder: name: {name} config: {config} form: {form}")
data = {'name': name,
'data': {'formList': form,
'main_script': {'data': config}}}
# Create the configlet builder
self.clnt.post(f"/configlet/addConfigletBuilder.do?isDraft={draft}",
data=data, timeout=self.request_timeout)
# Get the key for the configlet
data = self.clnt.get(f"/configlet/getConfigletByName.do?name={qplus(name)}",
timeout=self.request_timeout)
return data['key']
def add_configlet(self, name, config):
''' Add a configlet and return the key for the configlet.
Args:
name (str): Configlet name
config (str): Switch config statements
Returns:
key (str): The key for the configlet
'''
self.log.debug(f"add_configlet: name: {name} config: {config}")
body = {'name': name, 'config': config}
# Create the configlet
self.clnt.post('/configlet/addConfiglet.do', data=body,
timeout=self.request_timeout)
# Get the key for the configlet
data = self.clnt.get(f"/configlet/getConfigletByName.do?name={qplus(name)}",
timeout=self.request_timeout)
return data['key']
def delete_configlet(self, name, key):
''' Delete the configlet.
Args:
name (str): Configlet name
key (str): Configlet key
'''
self.log.debug(f"delete_configlet: name: {name} key: {key}")
body = [{'name': name, 'key': key}]
# Delete the configlet
self.clnt.post('/configlet/deleteConfiglet.do', data=body,
timeout=self.request_timeout)
def update_configlet(self, config, key, name, wait_task_ids=False):
''' Update a configlet.
Args:
config (str): Switch config statements
key (str): Configlet key
name (str): Configlet name
wait_task_ids (boolean): Wait for task IDs to generate
Returns:
data (dict): Contains success or failure message
'''
self.log.debug(f"update_configlet: config: {config} key: {key} name: {name}")
# Update the configlet
body = {'config': config, 'key': key, 'name': name,
'waitForTaskIds': wait_task_ids}
return self.clnt.post('/configlet/updateConfiglet.do', data=body,
timeout=self.request_timeout)
def update_configlet_builder(self, name, key, config, draft=False,
wait_for_task=False, form=None):
''' Update an existing configlet builder.
Args:
config (str): Contents of the configlet builder configuration
key: (str): key/id of the configlet builder to be updated
name: (str): name of the configlet builder
draft (boolean): is update a draft
wait_for_task (boolean): wait for task IDs to be generated
form (list): Array/list of form data
Parameters:
fieldId (str): "",
fieldLabel (str): "",
value (str): "",
type (str): "", (Options below)
('Text box',
'Text area',
'Drop down',
'Check box',
'Radio button',
'IP address',
'Password')
validation: {
mandatory (boolean): true,
},
helpText (str): "",
depends (str): "",
dataValidationErrorExist (boolean): true,
dataValidation (string): ""
'''
if not form:
form = []
data = {
"name": name,
"waitForTaskIds": wait_for_task,
"data": {
"formList": form,
"main_script": {
"data": config
}
}
}
self.log.debug(f"update_configlet_builder: config: {config}"
f" key: {key} name: {name} form: {form}")
# Update the configlet builder
url_string = f"/configlet/updateConfigletBuilder.do?isDraft={draft}&id={key}&action=save"
return self.clnt.post(url_string, data=data, timeout=self.request_timeout)
def update_reconcile_configlet(self, dev_mac, config, key, name,
reconciled=False):
''' Update the reconcile configlet.
Args:
dev_mac (str): Mac address/Key for device whose reconcile
configlet is being updated
config (str): Reconciled config statements
key (str): Reconcile Configlet key
name (str): Reconcile Configlet name
reconciled (boolean): Wait for task IDs to generate
Returns:
data (dict): Contains success or failure message
'''
self.log.debug(f"update_reconcile_configlet: dev_mac: {dev_mac}"
f" config: {config} key: {key} name: {name}")
url_str = f"/provisioning/updateReconcileConfiglet.do?netElementId={dev_mac}"
body = {
'config': config,
'key': key,
'name': name,
'reconciled': reconciled,
'unCheckedLines': '',
}
return self.clnt.post(url_str, data=body, timeout=self.request_timeout)
def add_note_to_configlet(self, key, note):
''' Add a note to a configlet.
Args:
key (str): Configlet key
note (str): Note to be added to configlet.
'''
data = {
'key': key,
'note': note,
}
return self.clnt.post('/configlet/addNoteToConfiglet.do',
data=data, timeout=self.request_timeout)
def validate_config_for_device(self, dev_mac, config):
''' Validate a config against a device
Args:
dev_mac (str): Device MAC address
config (str): Switch config statements
Returns:
response (dict): A dict that contains the result of the
validation operation
'''
self.log.debug(f"validate_config_for_device: dev_mac: {dev_mac} config: {config}")
body = {'netElementId': dev_mac, 'config': config}
return sanitize_warnings(
self.clnt.post(
'/configlet/validateConfig.do',
data=body,
timeout=self.request_timeout
)
)
def validate_config(self, dev_mac, config):
''' Validate a config against a device and parse response to
produce log messages are return a flag for the config validity.
Args:
dev_mac (str): Device MAC address
config (str): Switch config statements
Returns:
response (boolean): A flag signifying if the config is valid or
not.
'''
self.log.debug(f"validate_config: dev_mac: {dev_mac} config: {config}")
result = self.validate_config_for_device(dev_mac, config)
validated = True
if 'warningCount' in result and result['warnings']:
for warning in result['warnings']:
self.log.warning(f"Validation of config produced warning - {warning}")
if 'errorCount' in result:
self.log.error(f"Validation of config produced {result['errorCount']} errors")
if 'errors' in result:
for error in result['errors']:
self.log.error(f"Validation of config produced error - {error}")
validated = False
if 'result' in result:
for item in result['result']:
if 'messages' in item:
for message in item['messages']:
self.log.info(f"Validation of config returned message - {message}")
return validated
def get_all_temp_actions(self, start=0, end=0):
''' Returns a list of existing temp actions.
Args:
start (int): Start index for the pagination. Default is 0.
end (int): End index for the pagination. If end index is 0
then all the records will be returned. Default is 0.
Returns:
response (dict): A dict that contains a list of the current
temp actions.
'''
url = (f"/provisioning/getAllTempActions.do?startIndex={start}&endIndex={end}")
data = self.clnt.get(url, timeout=self.request_timeout)
return data
def _add_temp_action(self, data):
''' Adds temp action that requires a saveTopology call to take effect.
Args:
data (dict): a data dict with a specific format for the
desired action.
Base Ex: data = {'data': [{specific key/value pairs}]}
'''
url = ('/provisioning/addTempAction.do?'
'format=topology&queryParam=&nodeId=root')
self.clnt.post(url, data=data, timeout=self.request_timeout)
def _save_topology_v2(self, data):
''' Confirms a previously created temp action.
Args:
data (list): a list that contains a dict with a specific
format for the desired action. Our primary use case is for
confirming existing temp actions so we most often send an
empty list to confirm an existing temp action.
Returns:
response (dict): A dict that contains a status and a list of
task ids created (if any).
Ex: {u'data': {u'status': u'success', u'taskIds': []}}
'''
url = '/provisioning/v2/saveTopology.do'
return self.clnt.post(url, data=data, timeout=self.request_timeout)
def apply_configlets_to_device(self, app_name, dev, new_configlets, # pylint: disable=too-many-locals
create_task=True, reorder_configlets=False, validate=False):
''' Apply the configlets to the device.
Args:
app_name (str): The application name to use in info field.
dev (dict): The switch device dict
new_configlets (list): List of configlet name and key pairs
create_task (bool): Determines whether or not to execute a save
and create the tasks (if any)
reorder_configlets (bool): Defaults to False. To use this
parameter you must first get the full list of configlets
applied to the device (for example via the
get_configlets_by_device_id function) and provide the
full list of configlets (in addition to any new configlets
being applied) in the desired order as the new_configlets
parameter. It is also important to keep in mind configlets
that are applied to parent containers because they will
be applied before configlets applied to the device
directly. Set this parameter to True only with the full
list of configlets being applied to the device provided
via the new_configlets parameter.
validate (bool): Defaults to False. If set to True, the function
will validate and compare the configlets to be attached and
populate the configCompareCount field in the data dict. In case
all keys are 0, ie there is no difference between designed-config
and running-config after applying the configlets, no task will be
generated.
Returns:
response (dict): A dict that contains a status and a list of
task ids created (if any).
Ex: {u'data': {u'status': u'success', u'taskIds': [u'32']}}
'''
self.log.debug(f"apply_configlets_to_device: dev: {dev} names: {new_configlets}")
# Get a list of the names and keys of the configlets
cnames = []
ckeys = []
if not reorder_configlets:
# Get all the configlets assigned to the device.
configlets = self.get_configlets_by_device_id(
dev['systemMacAddress'])
for configlet in configlets:
cnames.append(configlet['name'])
ckeys.append(configlet['key'])
# Add the new configlets to the end of the arrays
for entry in new_configlets:
cnames.append(entry['name'])
ckeys.append(entry['key'])
info = f"{app_name}: Configlet Assign: to Device {dev['fqdn']}"
info_preview = '<b>Configlet Assign:</b> to Device' + dev['fqdn']
data = {'data': [{'info': info,
'infoPreview': info_preview,
'note': '',
'action': 'associate',
'nodeType': 'configlet',
'nodeId': '',
'configletList': ckeys,
'configletNamesList': cnames,
'ignoreConfigletNamesList': [],
'ignoreConfigletList': [],
'configletBuilderList': [],
'configletBuilderNamesList': [],
'ignoreConfigletBuilderList': [],
'ignoreConfigletBuilderNamesList': [],
'toId': dev['systemMacAddress'],
'toIdType': 'netelement',
'fromId': '',
'nodeName': '',
'fromName': '',
'toName': dev['fqdn'],
'nodeIpAddress': dev['ipAddress'],
'nodeTargetIpAddress': dev['ipAddress'],
'childTasks': [],
'parentTask': ''}]}
if validate:
validation_result = self.validate_configlets_for_device(dev['systemMacAddress'], ckeys)
data['data'][0].update(
{
"configCompareCount":
{
"mismatch": validation_result['mismatch'],
"reconcile": validation_result['reconcile'],
"new": validation_result['new']
}
}
)
self.log.debug(f"apply_configlets_to_device: saveTopology data:\n{data['data']}")
self._add_temp_action(data)
if create_task:
return self._save_topology_v2([])
return None
# pylint: disable=too-many-locals
def remove_configlets_from_device(self, app_name, dev, del_configlets,
create_task=True, validate=False):
''' Remove the configlets from the device.
Args:
app_name (str): The application name to use in info field.
dev (dict): The switch device dict
del_configlets (list): List of configlet name and key pairs
create_task (bool): Determines whether or not to execute a save
and create the tasks (if any)
validate (bool): Defaults to False. If set to True, the function
will validate and compare the configlets to be attached and
populate the configCompareCount field in the data dict. In case
all keys are 0, ie there is no difference between designed-config
and running-config after applying the configlets, no task will be
generated.
Returns:
response (dict): A dict that contains a status and a list of
task ids created (if any).
Ex: {u'data': {u'status': u'success', u'taskIds': [u'35']}}
'''
self.log.debug(f"remove_configlets_from_device: dev: {dev} names: {del_configlets}")
# Get all the configlets assigned to the device.
configlets = self.get_configlets_by_device_id(dev['systemMacAddress'])
# Get a list of the names and keys of the configlets. Do not add
# configlets that are on the delete list.
keep_names = []
keep_keys = []
for configlet in configlets:
key = configlet['key']
if next((ent for ent in del_configlets if ent['key'] == key),
None) is None:
keep_names.append(configlet['name'])
keep_keys.append(key)
# Remove the names and keys of the configlets to keep and build a
# list of the configlets to remove.
del_names = []
del_keys = []
for entry in del_configlets:
del_names.append(entry['name'])
del_keys.append(entry['key'])
info = f"{app_name} Configlet Remove: from Device {dev['fqdn']}"
info_preview = '<b>Configlet Remove:</b> from Device' + dev['fqdn']
data = {'data': [{'info': info,
'infoPreview': info_preview,
'note': '',
'action': 'associate',
'nodeType': 'configlet',
'nodeId': '',
'configletList': keep_keys,
'configletNamesList': keep_names,
'ignoreConfigletNamesList': del_names,
'ignoreConfigletList': del_keys,
'configletBuilderList': [],
'configletBuilderNamesList': [],
'ignoreConfigletBuilderList': [],
'ignoreConfigletBuilderNamesList': [],
'toId': dev['systemMacAddress'],
'toIdType': 'netelement',
'fromId': '',
'nodeName': '',
'fromName': '',
'toName': dev['fqdn'],
'nodeIpAddress': dev['ipAddress'],
'nodeTargetIpAddress': dev['ipAddress'],
'childTasks': [],
'parentTask': ''}]}
if validate:
validation_result = self.validate_configlets_for_device(dev['systemMacAddress'],
keep_keys)
data['data'][0].update(
{
"configCompareCount": {
"mismatch": validation_result['mismatch'],
"reconcile": validation_result['reconcile'],
"new": validation_result['new']
}
}
)
self.log.debug(f"remove_configlets_from_device: saveTopology data:\n{data['data']}")
self._add_temp_action(data)
if create_task:
return self._save_topology_v2([])
return None
def apply_configlets_to_container(self, app_name, container,
new_configlets, create_task=True):
''' Apply the configlets to the container.
Args:
app_name (str): The application name to use in info field.
container (dict): The container dict
new_configlets (list): List of configlet name and key pairs
create_task (bool): Determines whether or not to execute a save
and create the tasks (if any)
Returns:
response (dict): A dict that contains a status and a list of
task ids created (if any).
Ex: {u'data': {u'status': u'success', u'taskIds': [u'32']}}
'''
self.log.debug(f"apply_configlets_to_container: container: {container}"
f" names: {new_configlets}")
# Get all the configlets assigned to the device.
configlets = self.get_configlets_by_container_id(container['key'])
# Get a list of the names and keys of the configlets
# Static Configlets
cnames = []
ckeys = []
# ConfigletBuilder Configlets
bnames = []
bkeys = []
if configlets['configletList']:
for configlet in configlets['configletList']:
if configlet['type'] == 'Static':
cnames.append(configlet['name'])
ckeys.append(configlet['key'])
elif configlet['type'] == 'Builder':
bnames.append(configlet['name'])
bkeys.append(configlet['key'])
# Add the new configlets to the end of the arrays
for entry in new_configlets:
cnames.append(entry['name'])
ckeys.append(entry['key'])
info = f"{app_name}: Configlet Assign: to Container {container['name']}"
info_preview = '<b>Configlet Assign:</b> to Container' + container[
'name']
data = {'data': [{'info': info,
'infoPreview': info_preview,
'note': '',
'action': 'associate',
'nodeType': 'configlet',
'nodeId': '',
'configletList': ckeys,
'configletNamesList': cnames,
'ignoreConfigletNamesList': [],
'ignoreConfigletList': [],
'configletBuilderList': bkeys,
'configletBuilderNamesList': bnames,
'ignoreConfigletBuilderList': [],
'ignoreConfigletBuilderNamesList': [],
'toId': container['key'],
'toIdType': 'container',
'fromId': '',
'nodeName': '',
'fromName': '',
'toName': container['name'],
'nodeIpAddress': '',
'nodeTargetIpAddress': '',
'childTasks': [],
'parentTask': ''}]}
self.log.debug(f"apply_configlets_to_container: saveTopology data:\n{data['data']}")
self._add_temp_action(data)
if create_task:
return self._save_topology_v2([])
return data
# pylint: disable=too-many-locals
# pylint: disable=invalid-name
def remove_configlets_from_container(self, app_name, container,
del_configlets, create_task=True):
''' Remove the configlets from the container.
Args:
app_name (str): The application name to use in info field.
container (dict): The container dict
del_configlets (list): List of configlet name and key pairs
create_task (bool): Determines whether or not to execute a save
and create the tasks (if any)
Returns:
response (dict): A dict that contains a status and a list of
task ids created (if any).
Ex: {u'data': {u'status': u'success', u'taskIds': [u'35']}}
'''
self.log.debug(f"remove_configlets_from_container: container: {container}"
f" names: {del_configlets}")
# Get all the configlets assigned to the device.
configlets = self.get_configlets_by_container_id(container['key'])
# Get a list of the names and keys of the configlets. Do not add
# configlets that are on the delete list.
keep_names = []
keep_keys = []
for configlet in configlets['configletList']:
key = configlet['key']
if next((ent for ent in del_configlets if ent['key'] == key),
None) is None:
keep_names.append(configlet['name'])
keep_keys.append(key)
# Remove the names and keys of the configlets to keep and build a
# list of the configlets to remove.
del_names = []
del_keys = []
for entry in del_configlets:
del_names.append(entry['name'])
del_keys.append(entry['key'])
info = f"{app_name} Configlet Remove: from Container {container['name']}"
info_preview = '<b>Configlet Remove:</b> from Container' + container[
'name']
data = {'data': [{'info': info,
'infoPreview': info_preview,
'note': '',
'action': 'associate',
'nodeType': 'configlet',
'nodeId': '',
'configletList': keep_keys,
'configletNamesList': keep_names,
'ignoreConfigletNamesList': del_names,
'ignoreConfigletList': del_keys,
'configletBuilderList': [],
'configletBuilderNamesList': [],
'ignoreConfigletBuilderList': [],
'ignoreConfigletBuilderNamesList': [],
'toId': container['key'],
'toIdType': 'container',
'fromId': '',
'nodeName': '',
'fromName': '',
'toName': container['name'],
'nodeIpAddress': '',
'nodeTargetIpAddress': '',
'childTasks': [],
'parentTask': ''}]}
self.log.debug(f"remove_configlets_from_container: saveTopology data:\n{data['data']}")
self._add_temp_action(data)
if create_task:
return self._save_topology_v2([])
return data
def validate_configlets_for_device(self, mac, configlet_keys,
page_type='viewConfig'):
''' Validate and compare configlets for device.
Args:
mac (str): MAC address of device to validate configlets for.
configlet_keys (list): List of configlet keys
page_type (list): Possible Values of pageType - 'viewConfig',
'managementIPValidation', 'validateConfig', etc...
Returns:
response (dict): A dict that contains ...
Ex: {"reconciledConfig": {...},
"reconcile": 0,
"new": 0,
"designedConfig": [{...}],
"total": 0,
"runningConfig": [{...}],
"isReconcileInvoked": true,
"mismatch": 0,
"warnings": [""],
"errors": [{"configletLineNo": 0,
"error": "string",
"configletId": "string"}, ...]
}
'''
self.log.debug(f"validate_configlets_for_device: "
f"MAC: {mac} - conf keys: {configlet_keys} - page_type: {page_type}")
data = {'configIdList': configlet_keys,
'netElementId': mac,
'pageType': page_type}
return self.clnt.post(
'/provisioning/v2/validateAndCompareConfiglets.do',
data=data, timeout=self.request_timeout)
def get_applied_devices(self, configlet_name, start=0, end=0):
''' Returns a list of devices to which the named configlet is applied.
Args:
configlet_name (str): The name of the configlet to be queried.
start (int): Start index for the pagination. Default is 0.
end (int): End index for the pagination. If end index is 0
then all the records will be returned. Default is 0.
'''
return self.clnt.get(f"/configlet/getAppliedDevices.do?"
f"configletName={configlet_name}&startIndex={start}&endIndex={end}",
timeout=self.request_timeout)
def get_applied_containers(self, configlet_name, start=0, end=0):
''' Returns a list of containers to which the named
configlet is applied.
Args:
configlet_name (str): The name of the configlet to be queried.
start (int): Start index for the pagination. Default is 0.
end (int): End index for the pagination. If end index is 0
then all the records will be returned. Default is 0.
'''
return self.clnt.get(f"/configlet/getAppliedContainers.do?"
f"configletName={configlet_name}&startIndex={start}&endIndex={end}",
timeout=self.request_timeout)
# pylint: disable=too-many-arguments
def _container_op(self, container_name, container_key, parent_name,
parent_key, operation):
''' Perform the operation on the container.
Args:
container_name (str): Container name
container_key (str): Container key, can be empty for add.
parent_name (str): Parent container name
parent_key (str): Parent container key
operation (str): Container operation 'add' or 'delete'.
Returns:
response (dict): A dict that contains a status and a list of
task ids created (if any).
Ex: {u'data': {u'status': u'success', u'taskIds': []}}
'''
msg = (f"{operation} container {container_name} under container {parent_name}")
data = {'data': [{'info': msg,
'infoPreview': msg,
'action': operation,
'nodeType': 'container',
'nodeId': container_key,
'toId': '',
'fromId': '',
'nodeName': container_name,
'fromName': '',
'toName': '',
'childTasks': [],
'parentTask': '',
'toIdType': 'container'}]}
if operation == 'add':
data['data'][0]['toId'] = parent_key
data['data'][0]['toName'] = parent_name
elif operation == 'delete':
data['data'][0]['fromId'] = parent_key
data['data'][0]['fromName'] = parent_name
# Perform the container operation
self._add_temp_action(data)
return self._save_topology_v2([])
def add_container(self, container_name, parent_name, parent_key):
''' Add the container to the specified parent.
Args:
container_name (str): Container name
parent_name (str): Parent container name
parent_key (str): Parent container key
Returns:
response (dict): A dict that contains a status and a list of
task ids created (if any).
Ex: {u'data': {u'status': u'success', u'taskIds': []}}
'''
self.log.debug(f"add_container: container: {container_name}"
f" parent: {parent_name} parent_key: {parent_key}")
return self._container_op(container_name, 'new_container', parent_name,
parent_key, 'add')
def delete_container(self, container_name, container_key, parent_name,
parent_key):
''' Add the container to the specified parent.
Args:
container_name (str): Container name
container_key (str): Container key
parent_name (str): Parent container name
parent_key (str): Parent container key
Returns:
response (dict): A dict that contains a status and a list of
task ids created (if any).
Ex: {u'data': {u'status': u'success', u'taskIds': []}}
'''
self.log.debug(f"delete_container: container: {container_name}"
f" container_key: {container_key} parent: {parent_name}"
f" parent_key: {parent_key}")
resp = self._container_op(container_name, container_key, parent_name,
parent_key, 'delete')
# As of CVP version 2020.1 the addTempAction.do API endpoint stopped
# raising an Error when attempting to delete a container with children.
# To account for this try to see if the container being deleted
# still exists after the attempted delete. If it still exists
# raise an error similar to how CVP behaved prior to CVP 2020.1
try:
still_exists = self.get_container_by_id(container_key)
except CvpApiError as error:
if 'Invalid Container id' in error.msg:
return resp
raise
if still_exists is not None:
raise CvpApiError('Container was not deleted. Check for children')
return resp
def get_parent_container_for_device(self, dev_mac):
''' Add the container to the specified parent.
Args:
dev_mac (str): Device mac address
Returns:
response (dict): A dict that contains the parent container info
'''
self.log.debug(f"get_parent_container_for_device: called for {dev_mac}")
data = self.clnt.get(f"/provisioning/searchTopology.do?"
f"queryParam={dev_mac}&startIndex=0&endIndex=0",
timeout=self.request_timeout)
if data['total'] > 0:
cont_name = data['netElementContainerList'][0]['containerName']
return self.get_container_by_name(cont_name)
return None
def move_device_to_container(self, app_name, device, container,
create_task=True):
''' Add the container to the specified parent.
Args:
app_name (str): String to specify info/signifier of calling app
device (dict): Device info
container (dict): Container info
create_task (bool): Determines whether or not to execute a save
and create the tasks (if any)
Returns:
response (dict): A dict that contains a status and a list of
task ids created (if any).
Ex: {u'data': {u'status': u'success', u'taskIds': []}}
'''
info = f"Device Add {device['fqdn']} to container {container['name']} by {app_name}"
self.log.debug(f"Attempting to move device {device['fqdn']} to"
f" container {container['name']}")
if 'parentContainerId' in device:
from_id = device['parentContainerId']
else:
parent_cont = self.get_parent_container_for_device(device['key'])
from_id = parent_cont['key']
data = {'data': [{'info': info,
'infoPreview': info,
'action': 'update',
'nodeType': 'netelement',
'nodeId': device['key'],
'toId': container['key'],
'fromId': from_id,
'nodeName': device['fqdn'],
'toName': container['name'],
'toIdType': 'container',
'childTasks': [],
'parentTask': ''}]}
try:
self._add_temp_action(data)
# pylint: disable=invalid-name
except CvpApiError as e:
if 'Data already exists' in str(e):
self.log.debug(f"Device {device['fqdn']} already in container {container}")
if create_task:
return self._save_topology_v2([])
return None
def search_topology(self, query, start=0, end=0):
''' Search the topology for items matching the query parameter.
Args:
query (str): Query parameter which is the name of the container
or device.
start (int): Start index for the pagination. Default is 0.
end (int): End index for the pagination. If end index is 0
then all the records will be returned. Default is 0.
Returns:
response (dict): A dict that contains the container and
netelement lists.
'''
self.log.debug(f"search_topology: query: {query} start: {start} end: {end}")
if self.clnt.apiversion is None:
self.get_cvp_info()
if self.clnt.apiversion <= 6.0:
# Original search topology endpoint
req_url = (f"/provisioning/searchTopology.do?queryParam={qplus(query)}&"
f"startIndex={start}&endIndex={end}")
else:
# Newer CVP versions should use the V3 version of search topology endpoint
req_url = (f"/provisioning/v3/searchTopology.do?queryParam={qplus(query)}&"
f"startIndex={start}&endIndex={end}")
data = self.clnt.get(req_url, timeout=self.request_timeout)
if 'netElementList' in data:
for device in data['netElementList']:
device['status'] = device['deviceStatus']
device['parentContainerKey'] = device['parentContainerId']
if 'isMLAGEnabled' in device:
# original key was mlagEnabled but it changed to isMLAGEnabled
device['mlagEnabled'] = device['isMLAGEnabled']
elif 'mlagEnabled' in device:
# Key for V3 search topology changes back to mlagEnabled again
device['isMLAGEnabled'] = device['mlagEnabled']
# Key isDANZEnabled for V3 search topology is no longer in return data.
device['danzEnabled'] = device.get('isDANZEnabled', '')
# Key bootupTimeStamp for V3 search topology is no longer in return data.
device['bootupTimestamp'] = device.get('bootupTimeStamp', '')
# Key internalBuildId for V3 search topology is no longer in return data.
device['internalBuild'] = device.get('internalBuildId', '')
return data
def filter_topology(self, node_id='root', fmt='topology',
start=0, end=0):
''' Filter the CVP topology for container and device information.
Args:
node_id (str): The container key to base the filter in.
Default is 'root', for the Tenant container.
fmt (str): The type of filter to return. Must be either
'topology' or 'list'. Default is 'topology'.
start (int): Start index for the pagination. Default is 0.
end (int): End index for the pagination. If end index is 0
then all the records will be returned. Default is 0.
'''
url = (f"/provisioning/filterTopology.do?nodeId={node_id}&"
f"format={fmt}&startIndex={start}&endIndex={end}")
return self.clnt.get(url, timeout=self.request_timeout)
def check_compliance(self, node_key, node_type):
''' Check that a device is in compliance, that is the configlets
applied to the device match the devices running configuration.
Args:
node_key (str): The device key. This is the device MAC address
Example: ff:ff:ff:ff:ff:ff
node_type (str): The device type. This is either 'netelement'
or 'container'
Returns:
response (dict): A dict that contains the results of the
compliance check.
'''
self.log.debug(f"check_compliance: node_key: {node_key} node_type: {node_type}")
data = {'nodeId': node_key, 'nodeType': node_type}
resp = self.clnt.post('/provisioning/checkCompliance.do', data=data,
timeout=self.request_timeout)
if self.clnt.apiversion is None:
self.get_cvp_info()
if self.clnt.apiversion >= 2.0:
if resp['complianceIndication'] == '':
resp['complianceIndication'] = 'NONE'
return resp
def get_event_by_id(self, e_id):
''' Return information on the requested event ID.
Args:
e_id (str): The event id to be queried.
'''
return self.clnt.get(f"/event/getEventById.do?eventId={e_id}",
timeout=self.request_timeout)
def get_default_snapshot_template(self):
''' Return the default snapshot template.
'''
if self.clnt.apiversion is None:
self.get_cvp_info()
if self.clnt.apiversion == 1.0:
self.log.debug('v1 Inventory API Call')
url = ('/snapshot/getDefaultSnapshotTemplate.do?'
'startIndex=0&endIndex=0')
return self.clnt.get(url, timeout=self.request_timeout)
self.log.debug('v2 Inventory API Call')
self.log.debug('API getDefaultSnapshotTemplate.do'
' deprecated for CVP 2018.2 and beyond')
return None
# pylint: disable=invalid-name
def capture_container_level_snapshot(self, template_key, container_key):
''' Initialize a container level snapshot event.
Args:
template_key (str): The snapshot template key to be used for
the snapshots.
container_key (str): The container key to start the
snapshots on.
'''
if self.clnt.apiversion is None:
self.get_cvp_info()
if self.clnt.apiversion == 1.0:
self.log.debug('v1 Inventory API Call')
data = {
'templateId': template_key,
'containerId': container_key,
}
return self.clnt.post('/snapshot/captureContainerLevelSnapshot.do',
data=data, timeout=self.request_timeout)
self.log.debug('v2 Inventory API Call')
self.log.debug('API captureContainerLevelSnapshot.do'
' deprecated for CVP 2018.2 and beyond')
return None
def add_image(self, filepath):
''' Add an image to a CVP cluster.
Args:
filepath (str): Local path to the image to upload.
Returns:
data (dict): Dictionary of image add data.
'''
# Get the absolute file path to be uploaded
image_path = os.path.abspath(filepath)
with open(image_path, 'rb') as image_data:
response = self.clnt.post('/image/addImage.do',
files={'file': image_data})
return response
def cancel_image(self, image_name):
''' Discard/cancel the uploaded image/image bundle before save.
Args:
image_name (string): Name of image to cancel/discard.
Returns:
data (dict): Success or error message.
'''
image_data = {'data': image_name}
return self.clnt.post('/image/cancelImages.do', data=image_data,
timeout=self.request_timeout)
def get_images(self, start=0, end=0):
''' Return a list of all images.
Args:
start (int): Start index for the pagination. Default is 0.
end (int): End index for the pagination. If end index is 0
then all the records will be returned. Default is 0.
Returns:
images (dict): The 'total' key contains the number of images,
the 'data' key contains a list of images and their info.
'''
self.log.debug('Get info about images')
return self.clnt.get(f"/image/getImages.do?queryparam=&startIndex={start}&"
f"endIndex={end}", timeout=self.request_timeout)
def get_image_bundles(self, start=0, end=0):
''' Return a list of all image bundles.
Args:
start (int): Start index for the pagination. Default is 0.
end (int): End index for the pagination. If end index is 0
then all the records will be returned. Default is 0.
Returns:
image bundles (dict): The 'total' key contains the number of
image bundles, the 'data' key contains a list of image
bundles and their info.
'''
self.log.debug('Get image bundles that can be applied to devices or'
' containers')
return self.clnt.get(f"/image/getImageBundles.do?queryparam=&"
f"startIndex={start}&endIndex={end}",
timeout=self.request_timeout)
def get_image_bundle_by_name(self, name):
''' Return a dict of info about an image bundle.
Args:
name (str): Name of image bundle to return info about.
Returns:
image bundle (dict): Dict of info specific to the image bundle
requested or None if the name requested doesn't exist.
'''
self.log.debug(f"Attempt to get image bundle {name}")
try:
image = self.clnt.get(f"/image/getImageBundleByName.do?name={qplus(name)}",
timeout=self.request_timeout)
except CvpApiError as error:
# Catch an invalid task_id error and return None
if 'Entity does not exist' in str(error):
self.log.debug(f"Bundle with name {name} does not exist")
return None
raise error
return image
def delete_image_bundle(self, image_key, image_name):
''' Delete image bundle
Args:
image_key (str): The key of the image bundle to be deleted.
image_name (str): The name of the image bundle to be deleted.
'''
bundle_data = {
'data': [{'key': image_key,
'name': image_name}]
}
return self.clnt.post('/image/deleteImageBundles.do', data=bundle_data,
timeout=self.request_timeout)
def save_image_bundle(self, name, images, certified=True):
''' Save an image bundle to a cluster.
Args:
name (str): The name of the image bundle to be saved.
images (list): A list of image names to include in the bundle.
certified (bool): Whether the image bundle is certified or
not. Default is True.
'''
certified_image = 'true' if certified else 'false'
data = {
'name': name,
'isCertifiedImage': certified_image,
'images': images,
}
return self.clnt.post('/image/saveImageBundle.do', data=data,
timeout=self.request_timeout)
def update_image_bundle(self, bundle_id, name, images, certified=True):
''' Update an existing image bundle
'''
certified_image = 'true' if certified else 'false'
data = {
'id': bundle_id,
'name': name,
'isCertifiedImage': certified_image,
'images': images,
}
return self.clnt.post('/image/updateImageBundle.do', data=data,
timeout=self.request_timeout)
def apply_image_to_device(self, image, device, create_task=True):
''' Apply an image bundle to a device
Args:
image (dict): The image info.
device (dict): Info about device to apply image to.
create_task (bool): Determines whether or not to execute a save
and create the tasks (if any)
Returns:
response (dict): A dict that contains a status and a list of
task ids created (if any). Image updates will not run until
task or tasks are executed.
Ex: {u'data': {u'status': u'success', u'taskIds': [u'32']}}
'''
return self.apply_image_to_element(image, device, device['fqdn'],
'netelement', create_task)
def apply_image_to_container(self, image, container, create_task=True):
''' Apply an image bundle to a container
Args:
image (dict): The image info.
container (dict): Info about container to apply image to.
create_task (bool): Determines whether or not to execute a save
and create the tasks (if any)
Returns:
response (dict): A dict that contains a status and a list of
task ids created (if any). Image updates will not run until
task or tasks are executed.
Ex: {u'data': {u'status': u'success', u'taskIds': [u'32']}}
'''
return self.apply_image_to_element(image, container, container['name'],
'container', create_task)
def apply_image_to_element(self, image, element, name, id_type,
create_task=True):
''' Apply an image bundle to a device or container.
Args:
image (dict): The image info.
element (dict): Info about element to apply image to. Dict
can contain device info or container info.
name (str): Name of element image is being applied to.
id_type (str): Id type of element image is being applied to.
create_task (bool): Determines whether or not to execute a save
and create the tasks (if any)
Returns:
response (dict): A dict that contains a status and a list of
task ids created (if any). Image updates will not run until
task or tasks are executed.
Ex: {u'data': {u'status': u'success', u'taskIds': [u'32']}}
'''
self.log.debug(f"Attempt to apply {image['name']} to {id_type} {name}")
info = f"Apply image: {image['name']} to {id_type} {name}"
node_id = ''
if 'imageBundleKeys' in image:
if image['imageBundleKeys']:
node_id = image['imageBundleKeys'][0]
self.log.info(f"Provided image is an image object."
f" Using first value from imageBundleKeys - {node_id}")
if 'id' in image:
node_id = image['id']
self.log.info(f"Provided image is an image bundle object."
f" Found v1 API id field - {node_id}")
elif 'key' in image:
node_id = image['key']
self.log.info(f"Provided image is an image bundle object."
f" Found v2 API key field - {node_id}")
data = {'data': [{'info': info,
'infoPreview': info,
'note': '',
'action': 'associate',
'nodeType': 'imagebundle',
'nodeId': node_id,
'toId': element['key'],
'toIdType': id_type,
'fromId': '',
'nodeName': image['name'],
'fromName': '',
'toName': name,
'childTasks': [],
'parentTask': ''}]}
self._add_temp_action(data)
if create_task:
return self._save_topology_v2([])
return None
def remove_image_from_device(self, image, device):
''' Remove the image bundle from the specified device.
Args:
image (dict): The image info.
device (dict): The device info.
Returns:
response (dict): A dict that contains a status and a list of
task ids created (if any).
Ex: {u'data': {u'status': u'success', u'taskIds': [u'32']}}
'''
return self.remove_image_from_element(image, device, device['fqdn'],
'netelement')
def remove_image_from_container(self, image, container):
''' Remove the image bundle from the specified container.
Args:
image (dict): The image info.
container (dict): The container info.
Returns:
response (dict): A dict that contains a status and a list of
task ids created (if any).
Ex: {u'data': {u'status': u'success', u'taskIds': [u'32']}}
'''
return self.remove_image_from_element(image, container,
container['name'], 'container')
def remove_image_from_element(self, image, element, name, id_type):
''' Remove the image bundle from the specified container.
Args:
image (dict): The image info.
element (dict): The container info.
name (): name.
id_type (): type.
Returns:
response (dict): A dict that contains a status and a list of
task ids created (if any).
Ex: {u'data': {u'status': u'success', u'taskIds': [u'32']}}
'''
self.log.debug(f"Attempt to remove {image['name']} from {name}")
info = f"Remove image: {image['name']} from {name}"
node_id = ''
if 'imageBundleKeys' in image:
if image['imageBundleKeys']:
node_id = image['imageBundleKeys'][0]
self.log.info(f"Provided image is an image object."
f" Using first value from imageBundleKeys - {node_id}")
if 'id' in image:
node_id = image['id']
self.log.info(f"Provided image is an image bundle object."
f" Found v1 API id field - {node_id}")
elif 'key' in image:
node_id = image['key']
self.log.info(f"Provided image is an image bundle object."
f" Found v2 API key field - {node_id}")
data = {'data': [{'info': info,
'infoPreview': info,
'note': '',
'action': 'associate',
'nodeType': 'imagebundle',
'nodeId': '',
'toId': element['key'],
'toIdType': id_type,
'fromId': '',
'nodeName': '',
'fromName': '',
'toName': name,
'ignoreNodeId': node_id,
'ignoreNodeName': image['name'],
'childTasks': [],
'parentTask': ''}]}
self._add_temp_action(data)
return self._save_topology_v2([])
def get_change_controls(self, query='', start=0, end=0):
''' Returns a list of change controls.
Args:
query (str): Query to look for in change control names
start (int): Start index for the pagination. Default is 0.
end (int): End index for the pagination. If end index is 0
then all the records will be returned. Default is 0.
Returns:
change controls (list): The list of change controls
'''
self.log.debug(f"get_change_controls: query: {query}")
if self.clnt.apiversion is None:
self.get_cvp_info()
if self.clnt.apiversion >= 3.0:
self.log.debug('v3/v4/v5 getChangeControls API Call')
self.log.warning(
'get_change_controls: change control APIs moved for v3/v4/v5')
return None
self.log.debug('v2 getChangeControls API Call')
data = self.clnt.get(
f"/changeControl/getChangeControls.do?searchText={qplus(query)}"
f"&startIndex={start}&endIndex={end}",
timeout=self.request_timeout)
if 'data' not in data:
return None
return data['data']
def change_control_available_tasks(self, query='', start=0, end=0):
''' Returns a list of tasks that are available for a change control.
Args:
query (str): Query to look for in task
start (int): Start index for the pagination. Default is 0.
end (int): End index for the pagination. If end index is 0
then all the records will be returned. Default is 0.
Returns:
tasks (list): The list of available tasks
'''
self.log.debug(f"change_control_available_tasks: query: {query}")
if self.clnt.apiversion is None:
self.get_cvp_info()
if self.clnt.apiversion >= 3.0:
self.log.debug(
'v3/v4/v5 uses existing get_task_by_status API Call')
return self.get_tasks_by_status('PENDING')
self.log.debug('v2 getTasksByStatus API Call')
data = self.clnt.get(
f"/changeControl/getTasksByStatus.do?searchText={qplus(query)}"
f"&startIndex={start}&endIndex={end}",
timeout=self.request_timeout)
if 'data' not in data:
return None
return data['data']
def create_change_control(self, name, change_control_tasks, timezone,
country_id, date_time, snapshot_template_key='',
change_control_type='Custom',
stop_on_error='false'):
''' Create change control with provided information and return
change control ID.
Args:
name (string): The name for the new change control.
change_control_tasks (list): A list of key value pairs where
the key is the Task ID and the value is the task order
as an integer.
Ex: [{'taskId': '100', 'taskOrder': 1},
{'taskId': '101', 'taskOrder': 1},
{'taskId': '102', 'taskOrder': 2}]
timezone (string): The timezone as a string.
Ex: "America/New_York"
country_id (string): The country ID.
Ex: "United States"
date_time (string): The date and time for execution.
Time is military time format.
Ex: "2018-08-22 11:30"
snapshot_template_key (string): ???
change_control_type (string): The type of change control being
created. Options are "Custom" or "Rollback".
stop_on_error (string): String representation of a boolean
to set whether this change control will stop if an error is
encountered in one of its tasks.
Returns:
response (dict): A dict that contains...
Ex: {"data": "success", "ccId": "4"}
'''
self.log.debug('create_change_control')
# {
# "timeZone": "America/New_York",
# "countryId": "United States",
# "dateTime": "2018-08-22 11:30",
# "ccName": "test2",
# "snapshotTemplateKey": "",
# "type": "Custom",
# "stopOnError": "false",
# "deletedTaskIds": [],
# "changeControlTasks": [
# {
# "taskId": "126",
# "taskOrder": 1,
# "snapshotTemplateKey": "",
# "clonedCcId": ""
# }
# ]
# }
if self.clnt.apiversion is None:
self.get_cvp_info()
if self.clnt.apiversion >= 3.0:
self.log.debug('v3/v4/v5 addOrUpdateChangeControl API Call')
self.log.warning('create_change_control:'
' change control APIs moved for v3/v4/v5')
return None
self.log.debug('v2 addOrUpdateChangeControl API Call')
task_data_list = []
for task_info in change_control_tasks:
task_list_entry = {'taskId': task_info['taskId'],
'taskOrder': task_info['taskOrder'],
'snapshotTemplateKey': snapshot_template_key,
'clonedCcId': ''}
task_data_list.append(task_list_entry)
data = {'timeZone': timezone,
'countryId': country_id,
'dateTime': date_time,
'ccName': name,
'snapshotTemplateKey': snapshot_template_key,
'type': change_control_type,
'stopOnError': stop_on_error,
'deletedTaskIds': [],
'changeControlTasks': task_data_list}
return self.clnt.post('/changeControl/addOrUpdateChangeControl.do',
data=data, timeout=self.request_timeout)
def create_change_control_v3(self, cc_id, name, tasks, sequential=True):
''' Create change control with provided information and return
change control ID.
Args:
cc_id (string): The ID for the new change control.
name (string): The name for the new change control.
tasks (list): A list of Task IDs as strings
Ex: ['10', '11', '12']
sequential (bool): A flag for running tasks sequentially or
in parallel. Defaults to True for running sequentially.
Returns:
response (dict): A dict that contains...
Ex: [{u'id': u'cc_id',
u'update_timestamp': u'...'}]
'''
self.log.debug('create_change_control_v3')
if self.clnt.apiversion is None:
self.get_cvp_info()
if self.clnt.apiversion < 3.0:
self.log.debug(f"Wrong method for API version {self.clnt.apiversion}."
f" Use create_change_control method")
self.log.warning('create_change_control_v3:'
' Use old change control APIs for old versions')
return None
self.log.debug('v3 Update change control API Call')
stages = []
if sequential:
for index, task in enumerate(tasks):
stage_id = f"stage{index}"
stage = {'stage': [{
'id': stage_id,
'action': {
'name': 'task',
'args': {
'TaskID': task,
}
}
}]}
stages.append(stage)
else:
stage_rows = []
for index, task in enumerate(tasks):
stage_id = f"stage{index}"
stage_row = {
'id': stage_id,
'action': {
'name': 'task',
'args': {
'TaskID': task,
}
}
}
stage_rows.append(stage_row)
stages.append({'stage': stage_rows})
data = {'config': {
'id': cc_id,
'name': name,
'root_stage': {
'id': 'root',
'stage_row': stages,
}
}}
return self.clnt.post('/api/v3/services/ccapi.ChangeControl/Update',
data=data, timeout=self.request_timeout)
def add_notes_to_change_control(self, cc_id, notes):
''' Add provided notes to the specified change control.
Args:
cc_id (string): The id for the change control to add notes to.
notes (string): The notes to add to the change control.
Returns:
response (dict): A dict that contains...
Ex: {"data": "success"}
'''
self.log.debug(f"add_notes_to_change_control: cc_id {cc_id}, notes {notes}")
if self.clnt.apiversion is None:
self.get_cvp_info()
if self.clnt.apiversion >= 3.0:
self.log.debug(
'v3/v4/v5 addNotesToChangeControl API Call deprecated')
self.log.warning('add_notes_to_change_control:'
' change control APIs not supported for v3/v4/v5')
return None
self.log.debug('v2 addNotesToChangeControl API Call')
data = {'ccId': cc_id,
'notes': notes}
return self.clnt.post('/changeControl/addNotesToChangeControl.do',
data=data, timeout=self.request_timeout)
def execute_change_controls(self, cc_ids):
''' Execute the change control indicated by its ccId.
Args:
cc_ids (list): A list of change control IDs to be executed.
'''
if self.clnt.apiversion is None:
self.get_cvp_info()
if self.clnt.apiversion >= 3.0:
self.log.debug(
'v3/v4/v5 /api/v3/services/ccapi.ChangeControl/Start API Call')
for cc_id in cc_ids:
resp_list = []
data = {'cc_id': cc_id}
resp = self.clnt.post(
'/api/v3/services/ccapi.ChangeControl/Start',
data=data, timeout=self.request_timeout)
resp_list.append(resp)
return resp_list
self.log.debug('v2 executeCC API Call')
cc_id_list = [{'ccId': x} for x in cc_ids]
data = {'ccIds': cc_id_list}
return self.clnt.post('/changeControl/executeCC.do', data=data,
timeout=self.request_timeout)
def approve_change_control(self, cc_id, timestamp=None):
''' Cancel the provided change controls.
Args:
cc_id (string): The change control IDs to be approved.
timestamp(string): The change controls timestamp.
'''
if not timestamp:
timestamp = datetime.utcnow().isoformat() + 'Z'
self.log.debug('approve_change_control')
if self.clnt.apiversion is None:
self.get_cvp_info()
if self.clnt.apiversion < 3.0:
self.log.debug(f"Approval methods not valid for API version {self.clnt.apiversion}."
f" Functionality did not exist")
return None
self.log.debug('v3 Approve change control API Call')
data = {'cc_id': cc_id, 'cc_timestamp': timestamp}
return self.clnt.post(
'/api/v3/services/ccapi.ChangeControl/AddApproval',
data=data, timeout=self.request_timeout)
def delete_change_control_approval(self, cc_id):
''' Cancel the provided change controls.
Args:
cc_id (string): The change control IDs to be approved.
'''
self.log.debug('delete_change_control_approval')
if self.clnt.apiversion is None:
self.get_cvp_info()
if self.clnt.apiversion < 3.0:
self.log.debug(f"Approval methods not valid for API version {self.clnt.apiversion}."
f" Functionality did not exist")
return None
self.log.debug('v3 Delete Approval for change control API Call')
data = {'cc_id': cc_id}
return self.clnt.post(
'/api/v3/services/ccapi.ChangeControl/DeleteApproval',
data=data, timeout=self.request_timeout)
def cancel_change_controls(self, cc_ids):
''' Cancel the provided change controls.
Args:
cc_ids (list): A list of change control IDs to be cancelled.
'''
if self.clnt.apiversion is None:
self.get_cvp_info()
if self.clnt.apiversion >= 3.0:
self.log.debug(
'v3/v4/v5 /api/v3/services/ccapi.ChangeControl/Stop API Call')
resp_list = []
for cc_id in cc_ids:
data = {'cc_id': cc_id}
resp = self.clnt.post(
'/api/v3/services/ccapi.ChangeControl/Stop',
data=data, timeout=self.request_timeout)
resp_list.append(resp)
return resp_list
self.log.debug('v2 cancelChangeControl API Call')
data = {'ccIds': cc_ids}
return self.clnt.post('/changeControl/cancelChangeControl.do',
data=data, timeout=self.request_timeout)
def delete_change_controls(self, cc_ids):
''' Delete the provided change controls.
Args:
cc_ids (list): A list of change control IDs to be deleted.
'''
msg = 'Change Control Resource APIs supported from 2021.2.0 or newer.'
if self.cvp_version_compare('>=', 6.0, msg):
self.log.debug(
'v6+ Using Resource API Change Control Delete API Call')
resp_list = []
for cc_id in cc_ids:
resp = self.change_control_delete(cc_id)
resp_list.append(resp)
return resp_list
msg = 'Change Control Service APIs supported from 2019.0.0 to 2021.2.0'
if self.cvp_version_compare('>=', 3.0, msg):
self.log.debug(
'v3/v4/v5 /api/v3/services/ccapi.ChangeControl/Delete'
' API Call')
resp_list = []
for cc_id in cc_ids:
data = {'cc_id': cc_id}
resp = self.clnt.post(
'/api/v3/services/ccapi.ChangeControl/Delete',
data=data, timeout=self.request_timeout)
resp_list.append(resp)
return resp_list
self.log.debug('v2 deleteChangeControl API Call')
data = {'ccIds': cc_ids}
return self.clnt.post('/changeControl/deleteChangeControls.do',
data=data, timeout=self.request_timeout)
def get_change_control_info(self, cc_id):
''' Get the detailed information for a single change control.
Args:
cc_id (string): The id for the change control to be retrieved.
Returns:
response (dict): A dict that contains...
Ex: {'ccId': '4',
'ccName': 'test_api_1541106830',
'changeControlTasks': {'data': [<task data>],
'total': 1},
'classId': 68,
'containerName': '',
'countryId': '',
'createdBy': 'cvpadmin',
'createdTimestamp': 1541106831629,
'dateTime': '',
'deviceCount': 1,
'executedBy': 'cvpadmin',
'executedTimestamp': 1541106831927,
'factoryId': 1,
'id': 68,
'key': '4',
'notes': '',
'postSnapshotEndTime': 0,
'postSnapshotStartTime': 0,
'preSnapshotEndTime': 0,
'preSnapshotStartTime': 0,
'progressStatus': {<status>},
'scheduledBy': '',
'scheduledByPassword': '',
'scheduledTimestamp': 0,
'snapshotTemplateKey': '',
'snapshotTemplateName': None,
'status': 'Inprogress',
'stopOnError': False,
'taskCount': 1,
'taskEndTime': 0,
'taskStartTime': 0,
'timeZone': '',
'type': 'Custom'}
'''
self.log.debug(f"get_change_control_info: {cc_id}")
if self.clnt.apiversion is None:
self.get_cvp_info()
if self.clnt.apiversion >= 3.0:
self.log.debug('get_change_control_info method deprecated for'
' v3/v4/v5. Moved to get_change_control_status')
self.log.warning('get_change_control_info:'
' info change control API moved for v3/v4/v5 to'
' status')
return None
self.log.debug('v2 getChangeControlInformation.do API Call')
try:
resp = self.clnt.get(
f"/changeControl/getChangeControlInformation.do?"
f"startIndex=0&endIndex=0&ccId={cc_id}",
timeout=self.request_timeout)
except CvpApiError as error:
if 'No data found' in error.msg:
return None
raise
return resp
def get_change_control_status(self, cc_id):
''' Get the detailed information for a single change control.
Args:
cc_id (string): The id for the change control to be retrieved.
Returns:
response (dict): A dict that contains...
Ex:
[{u'status': {u'error': u'',
u'id': u'cc_id',
u'stages': {u' ': {
u'error': u'',
u'state': u'Completed'},
u'Task_0_1': {
u'error': u'',
u'state': u'Completed'}
},
u'state': u'Completed'}}]
'''
self.log.debug(f"get_change_control_status: {cc_id}")
if self.clnt.apiversion is None:
self.get_cvp_info()
if self.clnt.apiversion < 3.0:
self.log.debug(f"get_change_control_status method not supported"
f" for API version {self.clnt.apiversion}. Use old"
f" get_change_control_info method")
return None
self.log.debug(
'v3 /api/v3/services/ccapi.ChangeControl/GetStatus API Call')
data = {'cc_id': cc_id}
return self.clnt.post(
'/api/v3/services/ccapi.ChangeControl/GetStatus',
data=data, timeout=self.request_timeout)
def reset_device(self, app_name, device, create_task=True):
''' Reset device by moving it to the Undefined Container.
Args:
app_name (str): String to specify info/signifier of calling app
device (dict): Device info
create_task (bool): Determines whether or not to execute a save
and create the tasks (if any)
Returns:
response (dict): A dict that contains a status and a list of
task ids created (if any).
Ex: {u'data': {u'status': u'success', u'taskIds': []}}
'''
info = (f"App {app_name} resetting device {device['fqdn']} and moving it to Undefined")
self.log.debug(info)
if 'parentContainerId' in device:
from_id = device['parentContainerId']
else:
parent_cont = self.get_parent_container_for_device(device['key'])
if parent_cont and 'key' in parent_cont:
from_id = parent_cont['key']
else:
from_id = ''
data = {'data': [{'info': info,
'infoPreview': info,
'action': 'reset',
'nodeType': 'netelement',
'nodeId': device['key'],
'toId': 'undefined_container',
'fromId': from_id,
'nodeName': device['fqdn'],
'toName': 'Undefined',
'toIdType': 'container',
'childTasks': [],
'parentTask': ''}]}
try:
self._add_temp_action(data)
except CvpApiError as error:
if 'Data already exists' in str(error):
self.log.debug(f"Device {device['fqdn']} already in container Undefined")
if create_task:
return self._save_topology_v2([])
return None
def deploy_device(self, device, container, configlets=None,
image_bundle=None, create_task=True,
app_name='Deploy_device'):
''' Move a device from the undefined container to a target container.
Optionally apply device-specific configlets and an image.
Args:
device (dict): unique key for the device
container (str): name of container to move device to
configlets (list): list of dicts with configlet key/name pairs
image_bundle (str): name of image bundle to apply to device
create_task (boolean): Create task for this deploy device
sequence.
app_name (str): calling application name for logging purposes
Returns:
response (dict): A dict that contains a status and a list of
task ids created (if any).
Ex: {u'data': {u'status': u'success', u'taskIds': [u'32']}}
'''
info = f"Deploy device {device['fqdn']} to container {container}"
self.log.debug(info)
container_info = self.get_container_by_name(container)
# Add action for moving device to specified container
self.move_device_to_container(app_name, device, container_info,
create_task=False)
# Get proposed configlets device will inherit from container it is
# being moved to.
prop_conf = self.clnt.get(f"/provisioning/getTempConfigsByNetElementId."
f"do?netElementId={device['key']}")
new_configlets = prop_conf['proposedConfiglets']
if configlets:
new_configlets.extend(configlets)
self.apply_configlets_to_device('deploy_device', device,
new_configlets, create_task=False)
# Apply image to the device
if image_bundle:
image_bundle_info = self.get_image_bundle_by_name(image_bundle)
self.apply_image_to_device(image_bundle_info, device,
create_task=False)
if create_task:
return self._save_topology_v2([])
return None
def create_enroll_token(self, duration, devices=None):
''' Create TerminAttr enrollment token for device authentication
via certificates.
Args:
devices (list): list of device Serial Numbers for which the
token should be generated. The default is all devices.
duration (string): the token's validity time (max 1 month),
accepted formats for the legacy endpoint: "24h", "86400s", "60m"
accepted format for the new endpoint: "86400s" (only seconds)
Returns:
response (list) on CVaaS: A list that contains the generated
enrollment token.
Ex: [{'enrollmentToken':{'token': <token>, 'groups': [],
'reenrollDevices': <devices list>,
'validFor': <duration e.g 86400s>, 'field_mask': None}}]
response (dict) on CV on-prem: A dictionary that contains the
generated enrollment token.
Ex: {'data': <token>}
'''
endpoint_legacy = '/cvpservice/enroll/createToken'
endpoint = '/api/resources/admin.Enrollment/AddEnrollmentToken'
if not devices:
devices = ["*"]
# For on-prem check the version as it is only supported from 2021.2.0+
if not self.clnt.is_cvaas:
if self.clnt.apiversion is None:
self.get_cvp_info()
# TODO: update this check when 2024.2.0 is released
if self.clnt.apiversion >= 6.0:
self.log.debug('v6 /cvpservice/enroll/createToken')
data = {"reenrollDevices": devices, "duration": duration}
return self.clnt.post(endpoint_legacy, data=data, timeout=self.request_timeout)
self.log.warning(
'Enrollment Tokens only supported on CVP 2021.2.0+')
return None
data = {
"enrollmentToken": {"reenrollDevices": devices,
"validFor": duration}
}
return self.clnt.post(endpoint, data=data, timeout=self.request_timeout)
def get_all_tags(self, element_type='ELEMENT_TYPE_UNSPECIFIED', workspace_id=''):
''' Get all device and/or interface tags from the mainline workspace or all other workspaces
Args:
element_type (str): Can be ELEMENT_TYPE_DEVICE, ELEMENT_TYPE_INTERFACE and
ELEMENT_TYPE_UNSPECIFIED
set to ELEMENT_TYPE_UNSPECIFIED by default which fetches all tags
workspace_id (str): The ID of the workspace, by default it is set to an empty string
which will use the mainline workspace
Returns:
response (dict): A dict that contains a list of key-value tags
'''
msg = 'Tag.V2 Resource APIs are supported from 2021.2.0 or newer.'
# For on-prem check the version as it is only supported from 2021.2.0+
if self.cvp_version_compare('>=', 6.0, msg):
tag_url = '/api/resources/tag/v2/Tag/all'
payload = {
"partialEqFilter": [
{
"key": {
"elementType": element_type,
"workspaceId": workspace_id
}
}
]
}
self.log.debug(f"v6 {tag_url}")
return self.clnt.post(tag_url, data=payload)
return None
def get_tag_edits(self, workspace_id):
''' Show all tags edits in a workspace
Args:
workspace_id: The ID of the workspace
Returns:
response (dict): A dict that contains...
Ex.: {'data': [{'result': {'value': {'key': {'workspaceId': 'testget',
'elementType': 'string', 'label': 'string', 'value': 'string'},
'remove': False}, 'time': 'rfc3339 time', 'type': 'INITIAL'}}]}
'''
msg = 'Tag.V2 Resource APIs are supported from 2021.2.0 or newer.'
# For on-prem check the version as it is only supported from 2021.2.0+
if self.cvp_version_compare('>=', 6.0, msg):
tag_url = '/api/resources/tag/v2/TagConfig/all'
payload = {
"partialEqFilter": [
{
"key": {
"workspace_id": workspace_id
}
}
]
}
self.log.debug('v6 ' + tag_url + ' ' + str(payload))
return self.clnt.post(tag_url, data=payload)
return None
def get_tag_assignment_edits(self, workspace_id):
''' Show all tags assignment edits in a workspace
Args:
workspace_id: The ID of the workspace
Returns:
response (dict): A dict that contains...
Ex: {'result': {'value': {'key': {'workspaceId': 'string', 'elementType': 'string',
'label': 'string', 'value': 'string', 'deviceId': 'string'},
'remove': False}, 'time': 'rfc3339', 'type': 'INITIAL'}}
'''
msg = 'Tag.V2 Resource APIs are supported from 2021.2.0 or newer.'
# For on-prem check the version as it is only supported from 2021.2.0+
if self.cvp_version_compare('>=', 6.0, msg):
tag_url = '/api/resources/tag/v2/TagAssignmentConfig/all'
payload = {
"partialEqFilter": [
{
"key": {
"workspace_id": workspace_id
}
}
]
}
self.log.debug('v6 ' + tag_url + ' ' + str(payload))
return self.clnt.post(tag_url, data=payload)
return None
def tag_config(self, element_type, workspace_id, tag_label, tag_value, remove=False):
''' Create/Delete device or interface tags.
Tag creation with the tag.v2 resource API has to be done within a workspace.
Args:
element_type (str): Can be ELEMENT_TYPE_DEVICE or ELEMENT_TYPE_INTERFACE to
create device and interface tag respectively.
workspace_id(str): The ID of the workspace.
This should be generated by the create_workspace() API call.
tag_label(str): the label of the desired tag
tag_value(str): the value of the desired tag
remove (Boolean): When set to True it will remove the device/interface tag.
When set to False (default) it will create the device/interface tag.
Returns:
response (dict): A dict that contains...
Ex: {'value': {'key': {'workspaceId': 'string', 'elementType': 'string',
'label': 'string', 'value': 'string'}},
'time': 'rfc3339 time'}
'''
msg = 'Tag.V2 Resource APIs are supported from 2021.2.0 or newer.'
# For on-prem check the version as it is only supported from 2021.2.0+
if self.cvp_version_compare('>=', 6.0, msg):
tag_url = '/api/resources/tag/v2/TagConfig'
payload = {
"key": {
"elementType": element_type,
"workspaceId": workspace_id,
"label": tag_label,
"value": tag_value
},
"remove": remove
}
self.log.debug(f"v6 {tag_url} " + str(payload))
return self.clnt.post(tag_url, data=payload)
return None
def tag_assignment_config(self, element_type, workspace_id, tag_label,
tag_value, device_id, interface_id, remove=False):
''' Assign/Unassign device or interface tags.
Tag assignment with the tag.v2 resource API has to be done within a workspace.
Args:
element_type (str): can be ELEMENT_TYPE_DEVICE or ELEMENT_TYPE_INTERFACE to
create device and interface tag respectively
workspace_id(str): the ID of the workspace. This should be generated by
the create_workspace() API call.
tag_label(str): the label of the desired tag
tag_value(str): the value of the desired tag
device_id (str): the Serial Number of the device
interface_id (str): the interface name of the device, e.g.: Ethernet1
remove (Boolean): When set to True it will remove the device/interface
tag assignment.
When set to False (default) it will create the device/interface tag assignment.
Returns:
response (dict): A dict that contains...
Ex: {'value': {'key': {'workspaceId': 'string', 'elementType': 'string',
'label': 'string', 'value': 'string',
'deviceId': 'string', 'interfaceId': 'string'},
'remove': Boolean},'time': 'rfc3339 time'}
'''
msg = 'Tag.V2 Resource APIs are supported from 2021.2.0 or newer.'
# For on-prem check the version as it is only supported from 2021.2.0+
if self.cvp_version_compare('>=', 6.0, msg):
tag_url = '/api/resources/tag/v2/TagAssignmentConfig'
payload = {
"key": {
"elementType": element_type,
"workspaceId": workspace_id,
"label": tag_label,
"value": tag_value,
"deviceId": device_id,
"interfaceId": interface_id
},
"remove": remove
}
self.log.debug(f"v6 {tag_url} " + str(payload))
return self.clnt.post(tag_url, data=payload)
return None
def get_all_workspaces(self):
''' Get state information for all workspaces
Returns:
response (dict): A dict that contains a list of key-values for workspaces
'''
msg = 'Workspace Resource APIs are supported from 2021.2.0 or newer.'
# For on-prem check the version as it is only supported from 2021.2.0+
if self.cvp_version_compare('>=', 6.0, msg):
workspace_url = '/api/resources/workspace/v1/Workspace/all'
payload = {}
self.log.debug(f"v6 {workspace_url}")
return self.clnt.post(workspace_url, data=payload)
return None
def get_workspace(self, workspace_id):
''' Get state information for all workspaces
Returns:
response (dict): A dict that contains a list of key-values for workspaces
'''
msg = 'Workspace Resource APIs are supported from 2021.2.0 or newer.'
# For on-prem check the version as it is only supported from 2021.2.0+
if self.cvp_version_compare('>=', 6.0, msg):
workspace_url = f"/api/resources/workspace/v1/Workspace?key.workspaceId={workspace_id}"
self.log.debug(f"v6 {workspace_url}")
return self.clnt.get(workspace_url)
return None
def workspace_config(self, workspace_id, display_name,
description='', request='REQUEST_UNSPECIFIED',
request_id=''):
''' Create, Build and Submit workspaces.
Args:
workspace_id (str): The (unique) name of the workspace.
Previously used names cannot be used if the workspace was closed or abandoned.
display_name (str): The display name of the workspace.
description (str): The description of the workspace.
request (string): Can have the following values:
- REQUEST_UNSPECIFIED
- REQUEST_START_BUILD
- REQUEST_CANCEL_BUILD
- REQUEST_SUBMIT
- REQUEST_ABANDON
- REQUEST_ROLLBACK
request_id (str): An arbitrary requestId that is required for the
build and submit process.
Returns:
response (dict): A dict that contains...
Ex: {'value': {'key': {'workspaceId': 'string'},
'displayName': 'string','description': 'string',
'requestParams': {'requestId': 'string'}
},
'time': 'rfc3339 time'}
'''
msg = 'Workspace Resource APIs are supported from 2021.2.0 or newer.'
# For on-prem check the version as it is only supported from 2021.2.0+
if self.cvp_version_compare('>=', 6.0, msg):
workspace_url = '/api/resources/workspace/v1/WorkspaceConfig'
payload = {
"key": {
"workspaceId": workspace_id
},
"displayName": display_name,
"description": description,
"request": request,
"requestParams": {
"requestId": request_id
}
}
self.log.debug('v6 ' + str(workspace_url) + ' ' + str(payload))
return self.clnt.post(workspace_url, data=payload)
return None
def workspace_build_status(self, workspace_id, build_id):
''' Verify the state of the workspace build process.
Args:
workspace_id (str): The (unique) name of the workspace.
build_id (str): The buildId of the workspace for which the
build process was requested.
Returns:
response (dict): A dict that contains...
Ex: {'value': {'key': {'workspaceId': 'string', 'buildId': 'string'},
'state': 'BUILD_STATE_SUCCESS', 'buildResults': {'values': ...
'''
msg = 'Workspace Resource APIs are supported from 2021.2.0 or newer.'
# For on-prem check the version as it is only supported from 2021.2.0+
if self.cvp_version_compare('>=', 6.0, msg):
params = f"key.workspaceId={workspace_id}&key.buildId={build_id}"
workspace_url = '/api/resources/workspace/v1/WorkspaceBuild?' + params
self.log.debug(f"v6 {workspace_url + params}")
return self.clnt.get(workspace_url, timeout=self.request_timeout)
return None
def change_control_get_one(self, cc_id, cc_time=None):
''' Get the configuration and status of a change control using Resource APIs.
Supported versions: CVP 2021.2.0 or newer and CVaaS.
Args:
cc_id (str): The ID of the change control.
cc_time (str): Time indicates the time for which you are interested in the data.
If no time is given, the server will use the time at which it makes the request.
The time format is RFC 3339, e.g.: 2021-12-24T11:30:00.00Z.
Returns:
response (dict): A dict that contains...
Ex: {"value":{"key":{"id":"rL6Tog6UU"}, "change":{"name":"Change 20211213_210554",
"rootStageId":"kZUWqyIArD",
"stages":{"values":{"kZUWqyIArD":{"name":"Change 20211213_210554 Root",
"rows":{"values":[{"values":["vazWhKyVRR"]}]}},
"vazWhKyVRR":{"name":"Update Config",
"action":{"name":"task", "timeout":3000,
"args":{"values":{"TaskID":"538"}}}}}},
"notes":"", "time":"2021-12-13T21:05:58.813750128Z", "user":"cvpadmin"},
"approve":{"value":true, "time":"2021-12-13T21:11:26.788753264Z",
"user":"cvpadmin"}}, "time":"2021-12-13T21:11:26.788753264Z"}%
'''
msg = 'Change Control Resource APIs are supported from 2021.2.0 or newer.'
# For on-prem check the version as it is only supported from 2021.2.0+
if self.cvp_version_compare('>=', 6.0, msg):
if cc_time is None:
params = f"key.id={cc_id}"
else:
params = f"key.id={cc_id}&time={cc_time}"
cc_url = '/api/resources/changecontrol/v1/ChangeControl?' + params
self.log.debug(f"v6 {cc_url}")
try:
response = self.clnt.get(cc_url, timeout=self.request_timeout)
except Exception as error:
if 'resource not found' in str(error):
return None
raise error
return response
return None
def change_control_get_all(self):
''' Get the configuration and status of all Change Controls using Resource APIs.
Supported versions: CVP 2021.2.0 or newer and CVaaS.
Returns:
response (dict): A dict that contains a list of all Change Controls.
'''
msg = 'Change Control Resource APIs are supported from 2021.2.0 or newer.'
# For on-prem check the version as it is only supported from 2021.2.0+
if self.cvp_version_compare('>=', 6.0, msg):
cc_url = '/api/resources/changecontrol/v1/ChangeControl/all'
self.log.debug(f"v6 {cc_url}")
return self.clnt.get(cc_url, timeout=self.request_timeout)
return None
def change_control_approval_get_one(self, cc_id, cc_time=None):
''' Get the state of a specific Change Control's approve config using Resource APIs.
Supported versions: CVP 2021.2.0 or newer and CVaaS.
Args:
cc_id (str): The ID of the change control.
cc_time (str): Time indicates the time for which you are interested in the data.
If no time is given, the server will use the time at which it makes the request.
The time format is RFC 3339, e.g.: 2021-12-24T11:30:00.00Z.
Returns:
response (dict): A dict that contains...
Ex: {'value': {'key': {'id': '<CC ID>'}, 'approve': {'value': True},
'version': '2021-12-13T21:05:58.813750128Z'},
'time': '2021-12-13T21:11:26.788753264Z'}
'''
msg = 'Change Control Resource APIs are supported from 2021.2.0 or newer.'
# For on-prem check the version as it is only supported from 2021.2.0+
if self.cvp_version_compare('>=', 6.0, msg):
if cc_time is None:
params = f"key.id={cc_id}"
else:
params = f"key.id={cc_id}&time={cc_time}"
cc_url = '/api/resources/changecontrol/v1/ApproveConfig?' + params
cc_status = self.change_control_get_one(cc_id)
if cc_status is None:
return None
if 'value' in cc_status and 'approve' not in cc_status['value']:
self.log.warning("The change has not been approved yet."
" A change has to be approved at least once for the 'approve'"
" state to be populated.")
return None
return self.clnt.get(cc_url, timeout=self.request_timeout)
return None
def change_control_approval_get_all(self):
''' Get state information for all Change Control Approvals using Resource APIs.
Supported versions: CVP 2021.2.0 or newer and CVaaS.
Returns:
response (dict): A dict that contains a list of all Change Control Approval Configs.
'''
msg = 'Change Control Resource APIs are supported from 2021.2.0 or newer.'
# For on-prem check the version as it is only supported from 2021.2.0+
if self.cvp_version_compare('>=', 6.0, msg):
cc_url = '/api/resources/changecontrol/v1/ApproveConfig/all'
self.log.debug(f"v6 {cc_url}")
return self.clnt.get(cc_url, timeout=self.request_timeout)
return None
def change_control_approve(self, cc_id, notes="", approve=True):
''' Approve/Unapprove a change control using Resource APIs.
Supported versions: CVP 2021.2.0 or newer and CVaaS.
Args:
cc_id (str): The ID of the change control.
notes (str): An optional approval note.
approve (bool): Set to True to approve a change and to False to unapprove a change.
The default is True.
'''
cc_url = '/api/resources/changecontrol/v1/ApproveConfig'
# For on-prem check the version as it is only supported from 2021.2.0+
# Since the get_change_control already checks this, no need to check it again
cc_status = self.change_control_get_one(cc_id)
if cc_status is None:
return None
if ('value' in cc_status and 'change' in cc_status['value'] and
'time' in cc_status['value']['change']):
version = cc_status['value']['change']['time']
else:
self.log.error('The version timestamp was not found in the CC status.')
return None
payload = {
"key": {
"id": cc_id
},
"approve": {
"value": approve,
"notes": notes
},
"version": version
}
return self.clnt.post(cc_url, data=payload, timeout=self.request_timeout)
def change_control_delete(self, cc_id):
''' Delete a pending Change Control using Resource APIs.
Supported versions: CVP 2021.2.0 or newer and CVaaS.
Args:
cc_id (str): The ID of the change control.
'''
msg = 'Change Control Resource APIs are supported from 2021.2.0 or newer.'
# For on-prem check the version as it is only supported from 2021.2.0+
if self.cvp_version_compare('>=', 6.0, msg):
params = f"key.id={cc_id}"
cc_url = '/api/resources/changecontrol/v1/ChangeControlConfig?' + params
self.log.debug(f"v6 {cc_url}")
return self.clnt.delete(cc_url, timeout=self.request_timeout)
return None
def change_control_create_with_custom_stages(self, custom_cc=None):
''' Create a Change Control with custom stage hierarchy using Resource APIs.
Supported versions: CVP 2021.2.0 or newer and CVaaS.
Args:
custom_cc (dict): A dictionary with the entire stage hierarchy.
Ex1: {'key': {'id': '409b94d1-c0cb-4d74-8f88-89f66f13f109'},
'change': {'name': 'Change_20211217_034338',
'notes': 'cvprac CC',
'rootStageId': 'root',
'stages': {'values': {'root': {'name': 'root',
'rows': {'values': [{'values': ['1-2']},
{'values': ['3']}]}},
'1-2': {'name': 'stages 1-2',
'rows': {'values': [{'values': ['1ab']},
{'values': ['2']}]}},
'1a': {'action': {'args': {'values': {'TaskID': '1242'}},
'name': 'task',
'timeout': 3000},
'name': 'stage 1a'},
'1ab': {'name': 'stage 1ab',
'rows': {'values': [{'values': ['1a',
'1b']}]}},
'1b': {'action': {'args': {'values': {'TaskID': '1243'}},
'timeout': 3000},
'name': 'stage 1b'},
'2': {'action': {'args': {'values': {'TaskID': '1240'}},
'name': 'task',
'timeout': 3000},
'name': 'stage 2'},
'3': {'action': {'args': {'values': {'TaskID': '1241'}},
'name': 'task',
'timeout': 3000},
'name': 'stage 3'},
}}}}
The above would result in the following hierarchy:
root (series)
|- stages 1-2 (series)
| |- stage 1ab (parallel)
| | |- stage 1a
| | |- stage 1b
| |- stage 2
|- stage 3
Ex2 (MLAG ISSU):
{'key': {'id': 'PXs9cKimC'},
'change': {'name': 'Change 20211217_040530',
'notes': '',
'rootStageId': 'root',
'stages': {'values': { 'root': {'name': 'Change 20211217_040530 Root',
'rows': {
'values': [{'values': ['left-leafs']}],
}},
'upgrade1': {'action': {
'args': {'values': {'TaskID': '1242'}},
'name': 'task',
'timeout': 3000},
'name': 'Image Upgrade'},
'pre-mlag-check-l2': {'action': {
'args': {
'values': {
'DeviceID': 'SN2'}},
'name': 'mlaghealthcheck'},
'name': 'Check MLAG Health'},
'left-leafs': {'name': 'left-leafs',
'rows': {
'values': [{'values': ['leaf1']},
{'values': ['leaf2']}]}},
'upgrade2': {'action': {'args': {
'values': {'TaskID': '1243'}},
'name': 'task',
'timeout': 3000},
'name': 'Image Upgrade'},
'pre-mlag-check-l1': {'action': {
'args': {
'values': {
'DeviceID': 'SN1'}},
'name': 'mlaghealthcheck'},
'name': 'Check MLAG Health'},
'post-mlag-check-l2': {'action': {
'args': {
'values': {
'DeviceID': 'SN1'}},
'name': 'mlaghealthcheck'},
'name': 'Check MLAG Health'},
'leaf1': {'name': 'leaf1',
'rows': {
'values': [{'values': [
'pre-mlag-check-l1']},
{'values': [
'upgrade1']},
{'values': [
'post-mlag-check-l1'],
}]}},
'post-mlag-check-l1': {'action': {
'args': {
'values': {
'DeviceID': 'SN2'}},
'name': 'mlaghealthcheck'},
'name': 'Check MLAG Health'},
'leaf2': {'name': 'leaf2',
'rows': {'values': [{'values': [
'pre-mlag-check-l2']},
{'values': [
'upgrade2']},
{'values': [
'post-mlag-check-l2'],
}]}}}}}
}
The above would result in the following hierarchy:
root (series)
|- left-leafs (series)
|- leaf1 (series)
| |- pre-mlag-check-l1
| |- upgrade1
| |- post-mlag-check-l1
|- leaf2 (series)
|- pre-mlag-check-l1
|- upgrade1
|- post-mlag-check-l1
Returns:
response (dict): A dict that contains...
Ex: {'value': {'key': {'id':cc_id,
'time': '...'}
'''
msg = 'Change Control Resource APIs are supported from 2021.2.0 or newer.'
# For on-prem check the version as it is only supported from 2021.2.0+
if self.cvp_version_compare('>=', 6.0, msg):
payload = custom_cc
cc_url = '/api/resources/changecontrol/v1/ChangeControlConfig'
self.log.debug('v6 ' + str(cc_url) + ' ' + str(payload))
return self.clnt.post(cc_url, data=payload)
return None
def change_control_create_for_tasks(self, cc_id, name, tasks, series=True):
''' Create a simple Change Control for tasks using Resource APIs.
Supported versions: CVP 2021.2.0 or newer and CVaaS.
This function will create a change with either all Task actions in series or parallel.
For custom stage hierarchy the change_control_create_with_custom_stages()
should be used.
Args:
cc_id (string): The ID for the new change control.
name (string): The name for the new change control.
tasks (list): A list of Task IDs as strings
Ex: ['10', '11', '12']
series (bool): A flag for running tasks in series or
in parallel. Defaults to True for running in series.
Returns:
response (dict): A dict that contains...
Ex: {'value': {'key': {'id':cc_id,
'time': '...'}
'''
stages = {'values': {'root': {'name': 'root', 'rows': {'values': []}}}}
if series:
for index, task in enumerate(tasks):
stage_id = f"stage{index}"
stages['values']['root']['rows']['values'].append({'values': [stage_id]})
stages['values'][stage_id] = {
'action': {
'args': {
'values': {
'TaskID': task,
},
},
'name': 'task',
'timeout': 3000,
},
'name': stage_id,
}
else:
stages['values']['root']['rows']['values'].append({'values': []})
for index, task in enumerate(tasks):
stage_id = f"stage{index}"
stages['values']['root']['rows']['values'][0]['values'].append(stage_id)
stages['values'][stage_id] = {
'action': {
'args': {
'values': {
'TaskID': task,
},
},
'name': 'task',
'timeout': 3000,
},
'name': stage_id,
}
msg = 'Change Control Resource APIs are supported from 2021.2.0 or newer.'
# For on-prem check the version as it is only supported from 2021.2.0+
if self.cvp_version_compare('>=', 6.0, msg):
payload = {
'key': {
'id': cc_id
},
'change': {
'name': name,
'rootStageId': 'root',
'notes': 'randomString',
'stages': stages
}
}
cc_url = '/api/resources/changecontrol/v1/ChangeControlConfig'
self.log.debug('v6 ' + str(cc_url) + ' ' + str(payload))
return self.clnt.post(cc_url, data=payload, timeout=self.request_timeout)
return None
def change_control_start(self, cc_id, notes=""):
''' Start a Change Control using Resource APIs.
Supported versions: CVP 2021.2.0 or newer and CVaaS.
Args:
cc_id (string): The ID for the new change control.
notes (string): An optional note.
Returns:
response (dict): A dict that contains...
Ex: {"value":{"key":{"id":cc_id}, "start":{"value":true, "notes":"note"}},
"time":"2021-12-14T21:02:21.830306071Z"}
'''
msg = 'Change Control Resource APIs are supported from 2021.2.0 or newer.'
# For on-prem check the version as it is only supported from 2021.2.0+
if self.cvp_version_compare('>=', 6.0, msg):
payload = {
"key": {
"id": cc_id
},
"start": {
"value": True,
"notes": notes
}
}
cc_url = '/api/resources/changecontrol/v1/ChangeControlConfig'
self.log.debug('v6 ' + str(cc_url) + ' ' + str(payload))
return self.clnt.post(cc_url, data=payload, timeout=self.request_timeout)
return None
def change_control_stop(self, cc_id, notes=""):
''' Stop a Change Control using Resource APIs.
Supported versions: CVP 2021.2.0 or newer and CVaaS.
Args:
cc_id (string): The ID for the new change control.
notes (string): An optional note.
Returns:
response (dict): A dict that contains...
Ex: {"value":{"key":{"id":cc_id}, "start":{"value":false, "notes":"note"}},
"time":"2021-12-14T21:02:21.830306071Z"}
'''
msg = 'Change Control Resource APIs are supported from 2021.2.0 or newer.'
# For on-prem check the version as it is only supported from 2021.2.0+
if self.cvp_version_compare('>=', 6.0, msg):
payload = {
"key": {
"id": cc_id
},
"start": {
"value": False,
"notes": notes
}
}
cc_url = '/api/resources/changecontrol/v1/ChangeControlConfig'
self.log.debug('v6 ' + str(cc_url) + ' ' + str(payload))
return self.clnt.post(cc_url, data=payload, timeout=self.request_timeout)
return None
def change_control_schedule(self, cc_id, schedule_time, notes=""):
''' Schedule a Change Control using Resource APIs.
Supported versions: CVP 2022.1.0 or newer and CVaaS.
Args:
cc_id (string): The ID for the new change control.
schedule_time (string): rfc3339 time format, e.g: 2021-12-23T02:07:00.0Z
notes (string): An optional note.
Returns:
response (dict): A dict that contains...
Ex: {"value":{"key":{"id":"5821c7c1-e276-4387-b60a"},
"schedule":{"value":"2021-12-23T02:07:00Z",
"notes":"CC schedule via curl"}},
"time":"2021-12-23T02:06:18.739965204Z"}
'''
msg = 'Change Control Scheduling via Resource APIs are supported from 2022.1.0 or newer.'
# For on-prem check the version as it is only supported from 2022.1.0+
if self.cvp_version_compare('>=', 8.0, msg):
payload = {
"key": {
"id": cc_id
},
"schedule": {
"value": schedule_time,
"notes": notes
}
}
cc_url = '/api/resources/changecontrol/v1/ChangeControlConfig'
self.log.debug('v8 ' + str(cc_url) + ' ' + str(payload))
return self.clnt.post(cc_url, data=payload, timeout=self.request_timeout)
return None
def device_decommissioning(self, device_id, request_id):
''' Decommission a device using Resource APIs.
Supported versions: CVP 2021.3.0 or newer and CVaaS.
Args:
device_id (string): Serial Number of the device.
request_id (string): Key identifies the request to decommission the device.
Recommended to generate uuid with str(uuid.uuid4()).
Returns:
response (dict): Returns None if device is not found else return dict that contains
Ex: {'value': {'key': {'requestId': '4a4ba5a2-9886-4cd5-84d6-bdaf85a9f091'},
'deviceId': 'BAD032986065E8DC14CBB6472EC314A6'},
'time': '2022-02-12T02:58:30.765459650Z'}
'''
device_exists = False
inventory = self.get_inventory(provisioned=False)
for device in inventory:
if device['serialNumber'] == device_id:
device_exists = True
break
if device_exists:
msg = 'Decommissioning via Resource APIs are supported from 2021.3.0 or newer.'
# For on-prem check the version as it is only supported from 2021.3.0+
if self.cvp_version_compare('>=', 7.0, msg):
payload = {
"key": {
"request_id": request_id
},
"device_id": device_id
}
url = '/api/resources/inventory/v1/DeviceDecommissioningConfig'
self.log.debug('v7 ' + str(url) + ' ' + str(payload))
return self.clnt.post(url, data=payload, timeout=self.request_timeout)
else:
self.log.warning("Device with %s serial number does not exist (or is not registered)"
" to decommission", device_id)
return None
def device_decommissioning_status_get_one(self, request_id):
''' Get the decommission status of a device using Resource APIs.
Supported versions: CVP 2021.3.0 or newer and CVaaS.
Args:
request_id (string): key identifies the request to decommission the device
Returns:
response (dict): A dict that contains...
Ex:{"result":{"value":{"key":{"requestId":"123456789"},
"status":"DECOMMISSIONING_STATUS_IN_PROGRESS",
"statusMessage":"Disabled TerminAttr, waiting for device to be marked inactive"},
"time":"2022-02-04T19:41:46.376310308Z","type":"INITIAL"}}
'''
msg = 'Decommissioning via Resource APIs are supported from 2021.3.0 or newer.'
# For on-prem check the version as it is only supported from 2021.3.0+
if self.cvp_version_compare('>=', 7.0, msg):
params = f"key.requestId={request_id}"
url = '/api/resources/inventory/v1/DeviceDecommissioning?' + params
self.log.debug('v7 ' + str(url))
return self.clnt.get(url, timeout=self.request_timeout)
return None
def device_decommissioning_status_get_all(self, status="DECOMMISSIONING_STATUS_UNSPECIFIED"):
''' Get the decommissioning status of all devices using Resource APIs.
Supported versions: CVP 2021.3.0 or newer and CVaaS.
Args:
status (enum): By default it will get the decommissioning status for all devices.
Possible values:
"DECOMMISSIONING_STATUS_UNSPECIFIED" or 0,
"DECOMMISSIONING_STATUS_IN_PROGRESS" or 1,
"DECOMMISSIONING_STATUS_FAILURE" or 2,
"DECOMMISSIONING_STATUS_SUCCESS" or 3
Returns:
response (dict): A dict that contains...
Ex: {"result":{"value":{"key":{"requestId":"123456789"},
"status":"DECOMMISSIONING_STATUS_IN_PROGRESS",
"statusMessage":"Disabled TerminAttr, waiting for device to be marked inactive"},
"time":"2022-02-04T19:41:46.376310308Z","type":"INITIAL"}}
'''
msg = 'Decommissioning via Resource APIs are supported from 2021.3.0 or newer.'
# For on-prem check the version as it is only supported from 2021.3.0+
if self.cvp_version_compare('>=', 7.0, msg):
payload = {
"partialEqFilter": [
{
"status": status,
}
]
}
url = '/api/resources/inventory/v1/DeviceDecommissioning/all'
self.log.debug('v7 ' + str(url))
return self.clnt.post(url, data=payload, timeout=self.request_timeout)
return None
def add_role(self, name, description, moduleList):
''' Add new local role to the CVP UI.
Args:
name (str): local role name on CVP
description (str): role description
moduleList (list): list of modules (name (str) and mode (str))
'''
data = {"name": name,
"description": description,
"moduleList": moduleList}
return self.clnt.post('/role/createRole.do', data=data,
timeout=self.request_timeout)
def update_role(self, rolekey, name, description, moduleList):
''' Updates role information, like
role name, description and role modules.
Args:
rolekey (str): local role key on CVP
name (str): local role name on CVP
description (str): role description
moduleList (list): list of modules (name (str) and mode (str))
'''
data = {"key": rolekey,
"name": name,
"description": description,
"moduleList": moduleList}
return self.clnt.post('/role/updateRole.do', data=data,
timeout=self.request_timeout)
def get_role(self, rolekey):
''' Returns specified role information.
Args:
rolekey (str): role key on CVP
Returns:
response (dict): Returns a dict that contains the role.
Ex: {'name': 'Test Role', 'key': 'role_1599019487020581247',
'description': 'Test'...}
'''
return self.clnt.get(f"/role/getRole.do?roleId={rolekey}",
timeout=self.request_timeout)
def get_roles(self):
''' Get all the user roles in CloudVision.
Returns:
response (dict): Returns a dict that contains all the user role states..
Ex: {'total': 7, 'roles': [{'name': 'Test Role', 'key': 'role_1599019487020581247',
'description': 'Test'...}]}
'''
url = '/role/getRoles.do?startIndex=0&endIndex=0'
return self.clnt.get(url, timeout=self.request_timeout)
def delete_role(self, rolekey):
''' Remove specified role from CVP
Args:
rolekey (str): role key on CVP
'''
data = [rolekey]
return self.delete_roles(data)
def delete_roles(self, rolekeys):
''' Remove specified roles from CVP
Args:
rolekeys (list): list of role keys (str) on CVP
'''
return self.clnt.post('/role/deleteRoles.do', data=rolekeys,
timeout=self.request_timeout)
def svc_account_token_get_all(self):
''' Get all service account token states using Resource APIs.
Supported versions: CVP 2021.3.0 or newer and CVaaS.
Returns:
response (list): Returns a list of dictionaries that contains...
Ex: [{'value': {'key': {'id': 'randomId'}, 'user': 'string',
'description': 'string','valid_until': '2022-11-02T06:58:53Z',
'created_by': 'string', 'last_used': None},
'time': '2022-05-03T15:38:53.725014447Z', 'type': 'INITIAL'}, ...]
'''
msg = 'Service Account Resource APIs are supported from 2021.3.0+.'
if self.cvp_version_compare('>=', 7.0, msg):
url = '/api/v3/services/arista.serviceaccount.v1.TokenService/GetAll'
self.log.debug(f"v7 {url}")
return self.clnt.post(url)
return None
def svc_account_token_get_one(self, token_id):
''' Get a service account token's state using Resource APIs
Supported versions: CVP 2021.3.0 or newer and CVaaS.
Returns:
response (list): Returns a list of dict that contains...
Ex: [{'value': {'key': {'id': 'randomId'}, 'user': 'string',
'description': 'string', 'valid_until': '2022-11-02T06:58:53Z',
'created_by': 'cvpadmin', 'last_used': None},
'time': '2022-05-03T15:38:53.725014447Z', 'type': 'INITIAL'}]
'''
msg = 'Service Account Resource APIs are supported from 2021.3.0+.'
if self.cvp_version_compare('>=', 7.0, msg):
payload = {"key": {"id": token_id}}
url = '/api/v3/services/arista.serviceaccount.v1.TokenService/GetOne'
self.log.debug(f"v7 {url} {payload}")
return self.clnt.post(url, data=payload)
return None
def svc_account_token_delete(self, token_id):
''' Delete a service account token using Resource APIs.
Supported versions: CVP 2021.3.0 or newer and CVaaS.
Args:
token_id (string): The id of the service account token.
Returns:
response (list): Returns a list of dict that contains the time of deletion:
Ex: [{'key': {'id': '<token_id>'},
'time': '2022-07-26T15:29:03.687167871Z'}]
'''
msg = 'Service Account Resource APIs are supported from 2021.3.0+.'
if self.cvp_version_compare('>=', 7.0, msg):
payload = {"key": {"id": token_id}}
url = '/api/v3/services/arista.serviceaccount.v1.TokenConfigService/Delete'
self.log.debug(f"v7 {url} {payload}")
return self.clnt.post(url, data=payload)
return None
def svc_account_token_set(self, username, duration, description):
''' Create a service account token using Resource APIs.
Supported versions: CVP 2021.3.0 or newer and CVaaS.
Args:
username (string): The service account username for which the token will be
generated.
duration (string): The validity of the service account in seconds e.g.: "20000s"
The maximum value is 1 year in seconds e.g.: "31536000s"
description (string): The description of the service account token.
Returns:
response (list): Returns a list of dict that contains the token:
Ex: [{'value': {'key': {'id': '<userId>'}, 'user': 'ansible',
'description': 'cvprac test',
'valid_for': '550s', 'token': '<ey...>'}]
'''
payload = {'value': {'description': description,
'user': username,
'valid_for': duration}}
msg = 'Service Account Resource APIs are supported from 2021.3.0+.'
if self.cvp_version_compare('>=', 7.0, msg):
url = '/api/v3/services/arista.serviceaccount.v1.TokenConfigService/Set'
self.log.debug(f"v7 {url} {payload}")
return self.clnt.post(url, data=payload)
return None
def svc_account_get_all(self):
''' Get all service account states using Resource APIs.
Supported versions: CVP 2021.3.0 or newer and CVaaS.
Returns:
response (list): Returns a list of dictionaries that contains...
Ex: [{'value': {'key': {'name': 'ansible'}, 'status': 'ACCOUNT_STATUS_ENABLED',
'description': 'lab-tests', 'groups': {'values': ['network-admin']}},
'time': '2022-02-10T04:28:14.251684869Z', 'type': 'INITIAL'}, ...]
'''
msg = 'Service Account Resource APIs are supported from 2021.3.0+.'
if self.cvp_version_compare('>=', 7.0, msg):
url = '/api/v3/services/arista.serviceaccount.v1.AccountService/GetAll'
self.log.debug(f"v7 {url}")
return self.clnt.post(url)
return None
def svc_account_get_one(self, username):
''' Get a service account's state using Resource APIs
Supported versions: CVP 2021.3.0 or newer and CVaaS.
Args:
username (string): The service account username.
Returns:
response (list): Returns a list of dict that contains...
Ex: [{'value': {'key': {'name': 'ansible'}, 'status': 'ACCOUNT_STATUS_ENABLED',
'description': 'lab-tests', 'groups': {'values': ['network-admin']}},
'time': '2022-02-10T04:28:14.251684869Z'}]
'''
msg = 'Service Account Resource APIs are supported from 2021.3.0+.'
if self.cvp_version_compare('>=', 7.0, msg):
payload = {"key": {"name": username}}
url = '/api/v3/services/arista.serviceaccount.v1.AccountService/GetOne'
self.log.debug(f"v7 {url} {payload}")
return self.clnt.post(url, data=payload)
return None
def svc_account_set(self, username, description, roles, status):
''' Create a service account using Resource APIs.
Supported versions: CVP 2021.3.0 or newer and CVaaS.
Args:
username (string): The service account username.
description (string): The description of the service account.
roles (list): The list of role IDs. Default roles have a human readable name,
e.g.: 'network-admin', 'network-operator';
other roles will have the format of 'role_<unix_timestamp>',
e.g. 'role_1658850344592739349'.
cvprac automatically converts non-default role names to role IDs.
status (enum): The status of the service account. Possible values:
0 or 'ACCOUNT_STATUS_UNSPECIFIED'
1 or 'ACCOUNT_STATUS_ENABLED'
2 or 'ACCOUNT_STATUS_DISABLED'
Returns:
response (list): Returns a list of dict that contains...
Ex: [{'value': {'key': {'name': 'cvprac2'}, 'status': 'ACCOUNT_STATUS_ENABLED',
'description': 'testapi', 'groups': {'values':
['network-admin', 'role_1658850344592739349']}},
'time': '2022-07-26T18:19:55.392173445Z'}]
'''
msg = 'Service Account Resource APIs are supported from 2021.3.0+.'
if self.cvp_version_compare('>=', 7.0, msg):
role_ids = []
all_roles = self.get_roles()
for role in all_roles['roles']:
if role['key'] in roles or role['name'] in roles:
role_ids.append(role['key'])
if len(roles) != len(role_ids):
self.log.warning(f"Not all provided roles {roles} are valid. "
f"Only using the found valid roles {role_ids}")
payload = {'value': {'description': description,
'groups': {'values': role_ids},
'key': {'name': username},
'status': status}}
url = '/api/v3/services/arista.serviceaccount.v1.AccountConfigService/Set'
self.log.debug(f"v7 {url} {payload}")
return self.clnt.post(url, data=payload)
return None
def svc_account_delete(self, username):
''' Delete a service account using Resource APIs.
Supported versions: CVP 2021.3.0 or newer and CVaaS.
Args:
username (string): The service account username.
Returns:
response (list): Returns a list of dict that contains the time of deletion:
Ex: [{'key': {'name': 'cvprac2'},
'time': '2022-07-26T18:26:53.637425846Z'}]
'''
msg = 'Service Account Resource APIs are supported from 2021.3.0+.'
if self.cvp_version_compare('>=', 7.0, msg):
payload = {"key": {"name": username}}
url = '/api/v3/services/arista.serviceaccount.v1.AccountConfigService/Delete'
self.log.debug(f"v7 {url} {payload}")
return self.clnt.post(url, data=payload)
return None
def svc_account_delete_expired_tokens(self):
''' Delete all service account tokens using Resource APIs.
Supported versions: CVP 2021.3.0 or newer and CVaaS.
Returns:
response (list): Returns a list of dict that contains the list of tokens
that were deleted:
Ex: [{'value': {'key': {'id': '091f48a2808'},'user': 'cvprac3',
'description': 'cvprac test', 'valid_until': '2022-07-26T18:31:18Z',
'created_by': 'cvpadmin', 'last_used': None},
'time': '2022-07-26T18:30:28.022504853Z','type': 'INITIAL'},
{'value': {'key': {'id': '2f6325d9c'},...]
'''
tokens = self.svc_account_token_get_all()
expired_tokens = []
for tok in tokens:
token = tok['value']
if datetime.strptime(token['valid_until'], "%Y-%m-%dT%H:%M:%SZ") < datetime.utcnow():
self.svc_account_token_delete(token['key']['id'])
expired_tokens.append(tok)
return expired_tokens