Netdev - sjherrick/code-base GitHub Wiki

Netdev

Connect to Router

# Connection parameters; they are unpacked as keyword arguments into
# netdev.create(). USERNAME and PASSWORD must already be defined.
router = {
    'username': USERNAME,
    'password': PASSWORD,
    # The address must be a string: a bare 192.168.0.1 is a SyntaxError.
    'host': '192.168.0.1',
    'device_type': 'cisco_ios',
}

# NOTE: `async with` / `await` are only valid inside a coroutine (`async def`),
# so this fragment has to live in one; it cannot run at module level.
# netdev.create(**router) is used as an async context manager here --
# presumably it connects on entry and disconnects on exit (confirm against
# the netdev documentation).
async with netdev.create(**router) as host:
    # Run a single command on the device and keep whatever it returns.
    output = await host.send_command('sh ip int br')

Example (run `task` against a pool of routers, at most 40 concurrently)

# Create a dedicated event loop and install it as the current one.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Shared semaphore: at most 40 task() bodies hold it at the same time.
sema = asyncio.BoundedSemaphore(40)

# One coroutine object per router; nothing runs until the loop drives them.
tasks = [task(router, sema) for router in routers]

# Drive every coroutine to completion; results come back in `routers` order.
results = loop.run_until_complete(asyncio.gather(*tasks))

async def task(router, sema):
    """Run ``sh ip int br`` on one router while holding ``sema``.

    Args:
        router: Mapping of keyword arguments that is unpacked into
            ``netdev.create()`` (e.g. username, password, host, device_type).
        sema: Async context manager (e.g. ``asyncio.BoundedSemaphore``) that
            limits how many of these tasks run at the same time.

    Returns:
        A ``(output, error)`` tuple: ``(output, None)`` on success, or
        ``(None, str(exception))`` if anything raised. Exceptions are
        deliberately converted to values so one failing device does not
        abort an ``asyncio.gather()`` over many devices.
    """
    try:
        # Acquire the semaphore first so that no more than the permitted
        # number of connections are open at once. Unpack the `router`
        # argument (the original referenced an undefined name, `mdf`).
        async with sema, netdev.create(**router) as host:
            output = await host.send_command('sh ip int br')
        return output, None
    except Exception as e:
        return None, str(e)