Skip to content

user guide

Get Started

On the front page you can find the installation instructions. The library is available on PyPI.org and provided without charge by Wingmen Solutions.

Authentication happens when you first make a request to the API, so you don't need to call an explicit authentication method.

Below you can find a few examples to get started. We also recommend to check out the logging and concurrency sections to optimize your experience with the library.


API Clients

Find more advanced examples in the Cisco APIC Client User Guide
Explore the wingpy client API Cisco APIC Client API Reference

Import and instantiate your client
from wingpy import CiscoAPIC

apic = CiscoAPIC(
    base_url="https://apic.example.com", # (1)!
    username="admin", # (2)!
    password="password", # (3)!
    verify=False,
)
  1. Environment variable:
    WINGPY_APIC_BASE_URL
    
  2. Environment variable:
    WINGPY_APIC_USERNAME
    
  3. Environment variable:
    WINGPY_APIC_PASSWORD
    
Run a simple query
tenants = apic.get_all("/api/class/fvTenant.json")
for tenant in tenants:
    print(tenant["fvTenant"]["attributes"]["name"])

Find more advanced examples in the Cisco Catalyst Center Client User Guide
Explore the wingpy client API Cisco Catalyst Center Client API Reference

Import and instantiate your client
from wingpy import CiscoCatalystCenter

catalyst_center = CiscoCatalystCenter(
    base_url="https://catalyst-center.example.com", # (1)!
    username="admin", # (2)!
    password="password", # (3)!
    verify=False,
)
  1. Environment variable:
    WINGPY_CATALYST_CENTER_BASE_URL
    
  2. Environment variable:
    WINGPY_CATALYST_CENTER_USERNAME
    
  3. Environment variable:
    WINGPY_CATALYST_CENTER_PASSWORD
    
Run a simple query
devices = catalyst_center.get("/dna/intent/api/v1/network-device")
for device in devices.json()["response"]:
    print(device["hostname"])

Find more advanced examples in the Cisco FMC Client User Guide
Explore the wingpy client API Cisco FMC Client API Reference

Import and instantiate your client
from wingpy import FMCClient

fmc = FMCClient(
    base_url="https://fmc.example.com", # (1)!
    username="admin", # (2)!
    password="password", # (3)!
    verify=False,
)
  1. Environment variable:
    WINGPY_FMC_BASE_URL
    
  2. Environment variable:
    WINGPY_FMC_USERNAME
    
  3. Environment variable:
    WINGPY_FMC_PASSWORD
    
Run a simple query
hosts = fmc.get_all(
    "/api/fmc_config/v1/domain/{domainUUID}/object/hosts",  
    expanded=True,
)
for host in hosts:
    print(host["name"])

Find more advanced examples in the Cisco Hyperfabric Client User Guide
Explore the wingpy client API Cisco Hyperfabric Client API Reference

Import and instantiate your client
from wingpy import CiscoHyperfabric

hyperfabric = CiscoHyperfabric(
    token="eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...", # (1)!
)
  1. Environment variable:
    WINGPY_HYPERFABRIC_TOKEN
    
Run a simple query
devices = hyperfabric.get("/devices")
print("Total devices:", len(devices))

Find more advanced examples in the Cisco ISE Client User Guide
Explore the wingpy client API Cisco ISE Client API Reference

Import and instantiate your client
from wingpy import CiscoISE

ise = CiscoISE(
    base_url="https://ise.example.com", # (1)!
    username="admin", # (2)!
    password="password", # (3)!
    verify=False,
)
  1. Environment variable:
    WINGPY_ISE_BASE_URL
    
  2. Environment variable:
    WINGPY_ISE_USERNAME
    
  3. Environment variable:
    WINGPY_ISE_PASSWORD
    
Run a simple query
endpoints = ise.get_all("/api/v1/endpoint")
for endpoint in endpoints:
    print(endpoint["name"])

Find more advanced examples in the Cisco Meraki Dashboard Client User Guide
Explore the wingpy client API Cisco Meraki Dashboard Client API Reference

Import and instantiate your client
from wingpy import CiscoMerakiDashboard

merakidashboard = CiscoMerakiDashboard(
    token="eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...", # (1)!
    org_name="My Organization Name", # (2)!
)
  1. Environment variable:

    WINGPY_MERAKI_DASHBOARD_TOKEN
    

  2. Environment variable:

    WINGPY_MERAKI_DASHBOARD_ORG_NAME
    

Run a simple query
orgs = hyperfabric.get_all("/organizations")
print("Total organizations:", len(orgs))

Context Manager

You can use all the wingpy clients as context managers (using the with keyword), to ensure that the session is properly closed after use and resources are freed. We generally recommend using the clients as context managers.

Using a client as a context manager
from wingpy import CiscoFMC

with CiscoFMC(
    base_url="https://fmc.example.com",
    username="admin",
    password="password",
    verify=False,
) as fmc:
    hosts = fmc.get_all("/api/fmc_config/v1/domain/{domainUUID}/object/hosts")
    for host in hosts:
        print(host["name"])

Authentication

When you instantiate a wingpy client, it does not immediately authenticate with the API. Instead, it waits until you make your first API call. This is done to avoid unnecessary authentication requests if you don't end up using the client.

If you want to handle authentication errors, you can catch the AuthenticationFailure exception. This is useful if you want to provide a more user-friendly error message or take specific actions when authentication fails. If you choose to do so, you can explicitly call the authenticate() method rather than relying on the implicit authentication done just in time for the first API call.

Catch authentication errors
from wingpy.exceptions import AuthenticationFailure

try:
    fmc.authenticate()
except AuthenticationFailure:
    print("Authentication failed. Check your credentials.")

Authentication with each client is handled in a client-specific method or methods. The authenticate() method invokes the client-specific authentication methods, which are responsible for obtaining the necessary authentication tokens and performing any additional setup required for the API client to function correctly. Below is a brief overview of how authentication works for each client:

_authenticate() is called first to get the authentication token.
_after_auth() is called after to detect rate-limits configuration on the controller for subsequent API calls.

_authenticate() is called first to get the authentication token.
_after_auth() is called after - this method gets the Catalyst Center version if possible and sets it in the version attribute.

_authenticate() is called first to get the authentication token.
_after_auth() is called after - this method gets the FMC version and domain UUID, which are needed for subsequent API calls.

Hyperfabric uses a token-based authentication, so you need to provide the token when instantiating the client. There are no additional authentication methods called after instantiation.

_authenticate() is called first to set the authentication options in the header.
_after_auth() is called after - this method gets the ISE version and sets the version attribute.


Concurrency

The wingpy library provides a built-in taskrunner/scheduler for concurrent API requests. This allows you to maximize throughput and is particularly useful for APIs that does not support bulk operations.

  • Does not require you to know about concurrency or threading.
  • Rate limits and connection constraints are handled automatically by the client.
  • Based on our own stress tests on actual lab equipment!

Every client instance has a pre-configured scheduler
This scheduler is optimized for the specific API client, ensuring efficient use of resources and compliance with the API's rate limits. An instance of the TaskRunner is available in the tasks attribute in each instance of any wingpy API client.

As can be gleaned from the API reference, the TaskRunner instance has two main methods:

  • schedule() - Schedules a function to be executed concurrently.
  • run() - Executes all scheduled tasks concurrently and waits for their completion.

Below are a few examples of how we recomend using the built-in scheduler for concurrent API requests.

Example: Concurrent GET Requests with Cisco Catalyst Center

Get a client
from wingpy import CiscoCatalystCenter

# Initialize the Cisco Catalyst Center client with your credentials
# Replace 'password' with your actual password from the DevNet Sandbox
cc = CiscoCatalystCenter(
    base_url="https://sandboxdnac.cisco.com",
    username="devnetuser", 
    password="password", # (1)!
    verify=False,
)
  1. Get your password at https://devnetsandbox.cisco.com/
Get device health
high_cpu = []

result = cc.get("/dna/intent/api/v1/device-health")
for device in result.json()["response"]:
    if device['cpuUtilization'] > 25:
        high_cpu.append(device["uuid"])

Now, we have a list of devices with high CPU utilization, and we want to fetch more details about each device concurrently. Instead of making multiple sequential GET requests, we can schedule them using the built-in scheduler.

Where we would normally use the get() method to make a GET request, we can instead use the built-in scheduler to manage the request. However, we still actually use the get() method, but we schedule it to run concurrently. This means that the parameters will stay the same, but we will use the schedule() method to schedule the GET requests instead of calling get() directly.

Schedule concurrent GET requests
for uuid in high_cpu:
    cc.tasks.schedule(cc.get,
        path=f"/dna/intent/api/v1/device-detail",
        params={"identifier": "uuid", "searchBy": uuid},
        _task_name=f"get-device-details-{uuid}"
    )

The tasks have been put in the queue, but they haven't been executed yet. To run all scheduled tasks concurrently, we call the run() method.

Run all scheduled tasks
results = cc.tasks.run()
print(results)
Output
{'get-device-details-29677dc7-b49e-4ff0-a8c7-e6649179accc': <Response [200 OK]>,
 'get-device-details-d145ac01-2409-46cf-86e8-8b7703c0598a': <Response [200 OK]>,
 'get-device-details-7c1b9833-1be7-43f4-b327-4663c816c4cc': <Response [200 OK]>,
 'get-device-details-9e7d73bd-3dce-4581-830f-0e022e0c5e41': <Response [200 OK]>}

As you can see, results will contain the responses - a dict of task names to httpx.Response objects from all the scheduled GET requests.

You don't have to provide a task name, they will be generated automatically if you don't. But it can be useful to provide a task name to identify the task in the results.

The client automatically handles the rate limits and connection constraints, so you don't have to worry about that. This means that you can potentially schedule a very large number of tasks, and the client will gracefully handle the execution of these tasks and handle rate limiting events from the API, such as HTTP 429 Too Many Requests responses, or platform-specific rate limiting events, that we have identified in our tests.

Full concurrent GET requests example (click to expand)
from wingpy import CiscoCatalystCenter

# Initialize the Cisco Catalyst Center client with your credentials
cc = CiscoCatalystCenter(
    base_url="https://sandboxdnac.cisco.com",
    username="devnetuser", 
    password="password", # (1)!
    verify=False,
)

# Get device health
high_cpu = []
result = cc.get("/dna/intent/api/v1/device-health")
for device in result.json()["response"]:
    if device['cpuUtilization'] > 25:
        high_cpu.append(device["uuid"])

# Schedule concurrent GET requests
for uuid in high_cpu:
    cc.tasks.schedule(cc.get,
        path=f"/dna/intent/api/v1/device-detail",
        params={"identifier": "uuid", "searchBy": uuid},
        _task_name=f"get-device-details-{uuid}"
    )

# Run all scheduled tasks
results = cc.tasks.run()

for result in results.values():
    print(result.json()["response"]["softwareVersion"])
  1. Get your password at https://devnetsandbox.cisco.com/

Nested Concurrency

In some cases, you might already have an event loop running, for example if you are using fastapi or uvicorn. In that case, you can enable a little trick by and allowing nested event loops, with the nest_asyncio package. This allows you to run the scheduled tasks in an existing event loop, without having to create a new one.

Full Example with wingpy, FastAPI and nest_asyncio (click to expand)
import fastapi
import nest_asyncio
from wingpy import CiscoCatalystCenter

nest_asyncio.apply()

app = fastapi.FastAPI(title="FastAPI and wingpy tester")


always_on_sandbox = {
    "base_url": "https://sandboxdnac.cisco.com",
    "username": "devnetuser",
    "password": "Cisco123!",
    "verify": False,
}


@app.get("/api/v1/wingpy/test")
async def test():
    """
    Test endpoint to verify wingpy functionality.
    """
    try:
        # Initialize CiscoCatalystCenter with the always_on_sandbox configuration
        cc = CiscoCatalystCenter(**always_on_sandbox)

        # Get device health
        high_cpu = []
        result = cc.get("/dna/intent/api/v1/device-health")
        for device in result.json()["response"]:
            if device["cpuUtilization"] > 25:
                high_cpu.append(device["uuid"])

        # Schedule concurrent GET requests
        for uuid in high_cpu:
            cc.tasks.schedule(
                cc.get,
                path="/dna/intent/api/v1/device-detail",
                params={"identifier": "uuid", "searchBy": uuid},
                _task_name=f"get-device-details-{uuid}",
            )

        # Run all scheduled tasks
        results = cc.tasks.run()

        return {"results": [result.json()["response"] for result in results.values()]}

    except Exception as e:
        return {"status": "error", "message": str(e)}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run("api:app", host="0.0.0.0", port=8000, reload=True)

Logging

This section covers the logging capabilities of wingpy. The library uses the loguru package for logging, which provides a simple and flexible way to log messages.

Most of the actions performed by wingpy are logged, including API requests and responses, errors, and other important events. This logging information can be very useful for debugging and monitoring your application. It's also a cool way to see what the library is doing under the hood, and to understand the order of operations. For example, you can see the order in which API requests are made, and how long each request takes, try the the concurrent GET requests example to see how the logging works in practice.

Levels

  • TRACE: Detailed information, typically of interest only when diagnosing problems.
  • DEBUG: Fine-grained informational events that are most useful to debug an application.
  • INFO: Informational messages that highlight the progress of the application at a coarse-grained level.
  • SUCCESS: Indication that an operation was successful.
  • WARNING: Potentially harmful situations.
  • ERROR: Error events that might still allow the application to continue running.
  • CRITICAL: Very severe error events that will presumably lead the application to abort.

Please note that the TRACE level is very verbose and will output headers, bodies, and other details of the requests and responses. This is useful for debugging, but beware that it can generate a lot of output, especially for APIs with large responses or many requests - in addition the headers usually contain the API authentication token, so be careful not to log sensitive information in production environments.

Configuration

We rely on the simplicity of the loguru package for logging, which allows you to configure logging in a very straightforward way. You can set the log level and add handlers to log messages to different outputs, such as the console or a file.

Configure logging
from loguru import logger
import sys

# Set the log level to TRACE, DEBUG, INFO, SUCCESS, WARNING, ERROR, or CRITICAL
log_level = "DEBUG"

logger.remove() # Remove the default logger

logger.add(sys.stderr, level=log_level)

That's it! Wingpy will now log messages at the specified level to the console. You can also log to a file by adding a file handler, see the loguru documentation for more details.