Parallel asynchronous GET requests with asyncio - lmmx/devnotes GitHub Wiki

The 3 types of async statement in Python 3.5+ to get to know here are

  • async with (asynchronous context manager)
  • async for (asynchronous generator)
  • (asynchronous generators)

via Łukasz Langa

(also will see async def asynchronous function)

Demo code via RealPython uses aiohttp

The high-level program structure will look like this:

  • Read a sequence of URLs from a local file, urls.txt.

  • Send GET requests for the URLs and decode the resulting content. If this fails, stop there for a URL.

  • Search for the URLs within href tags in the HTML of the responses.

  • Write the results to foundurls.txt.

Do all of the above as asynchronously and concurrently as possible. (Use aiohttp for the requests, and aiofiles for the file-appends. These are two primary examples of IO that are well-suited for the async IO model.)

Code example

#!/usr/bin/env python3
# areq.py

"""Asynchronously get links embedded in multiple pages' HMTL."""

import asyncio
import logging
import re
import sys
from typing import IO
import urllib.error
import urllib.parse

import aiofiles
import aiohttp
from aiohttp import ClientSession

logging.basicConfig(
    format="%(asctime)s %(levelname)s:%(name)s: %(message)s",
    level=logging.DEBUG,
    datefmt="%H:%M:%S",
    stream=sys.stderr,
)
logger = logging.getLogger("areq")
logging.getLogger("chardet.charsetprober").disabled = True

HREF_RE = re.compile(r'href="(.*?)"')

async def fetch_html(url: str, session: ClientSession, **kwargs) -> str:
    """GET request wrapper to fetch page HTML.

    kwargs are passed to `session.request()`.
    """

    resp = await session.request(method="GET", url=url, **kwargs)
    resp.raise_for_status()
    logger.info("Got response [%s] for URL: %s", resp.status, url)
    html = await resp.text()
    return html

async def parse(url: str, session: ClientSession, **kwargs) -> set:
    """Find HREFs in the HTML of `url`."""
    found = set()
    try:
        html = await fetch_html(url=url, session=session, **kwargs)
    except (
        aiohttp.ClientError,
        aiohttp.http_exceptions.HttpProcessingError,
    ) as e:
        logger.error(
            "aiohttp exception for %s [%s]: %s",
            url,
            getattr(e, "status", None),
            getattr(e, "message", None),
        )
        return found
    except Exception as e:
        logger.exception(
            "Non-aiohttp exception occured:  %s", getattr(e, "__dict__", {})
        )
        return found
    else:
        for link in HREF_RE.findall(html):
            try:
                abslink = urllib.parse.urljoin(url, link)
            except (urllib.error.URLError, ValueError):
                logger.exception("Error parsing URL: %s", link)
                pass
            else:
                found.add(abslink)
        logger.info("Found %d links for %s", len(found), url)
        return found

async def write_one(file: IO, url: str, **kwargs) -> None:
    """Write the found HREFs from `url` to `file`."""
    res = await parse(url=url, **kwargs)
    if not res:
        return None
    async with aiofiles.open(file, "a") as f:
        for p in res:
            await f.write(f"{url}\t{p}\n")
        logger.info("Wrote results for source URL: %s", url)

async def bulk_crawl_and_write(file: IO, urls: set, **kwargs) -> None:
    """Crawl & write concurrently to `file` for multiple `urls`."""
    async with ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(
                write_one(file=file, url=url, session=session, **kwargs)
            )
        await asyncio.gather(*tasks)

if __name__ == "__main__":
    import pathlib
    import sys

    assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
    here = pathlib.Path(__file__).parent

    with open(here.joinpath("urls.txt")) as infile:
        urls = set(map(str.strip, infile))

    outpath = here.joinpath("foundurls.txt")
    with open(outpath, "w") as outfile:
        outfile.write("source_url\tparsed_url\n")

    asyncio.run(bulk_crawl_and_write(file=outpath, urls=urls))

The call to run at the end of this example means you don’t have to handle the opening and closing of the event loop itself, it’s handled for you (as the Python docs for asyncio recommend)

Application developers should typically use the high-level asyncio functions, such as asyncio.run(), and should rarely need to reference the loop object or call its methods. This section is intended mostly for authors of lower-level code, libraries, and frameworks, who need finer control over the event loop behavior.

Here’s an example of how to write the event loop handling from the examples below in either form:

Code example

loop = asyncio.get_event_loop()
loop.run_until_complete(main(gen_urls()))
loop.close()

instead becomes

asyncio.run(main(gen_urls()))
  • In my experience asyncio.run can remove some bugs encountered when handling the event loop directly, as well as being simpler.

aiostream

I quite liked this demo (via):

  • I removed the pipe part and added a generator function (TODO: try an async generator)
Code example

import asyncio
import aiohttp
import random
from aiostream import stream

async def main(urls):
    async with aiohttp.ClientSession() as session:
        ws = stream.repeat(session)
        xs = stream.zip(ws, stream.iterate(urls))
        ys = stream.starmap(xs, fetch, ordered=False, task_limit=10)
        zs = stream.map(ys, process)
        await zs

async def fetch(session, url):
    await asyncio.sleep(1 + 4*random.random())
    return url

async def process(data):
    print(data)

def gen_urls():
    for s in ["qrx", "conf", "ocu", "pore", "cal", "poll"]:
        yield f"https://{s}.spin.systems"

asyncio.run(main(gen_urls())) 

...but it doesn't actually download anything!

It's clear though that the HTTP GET request would go where the asyncio.sleep is, and an example of that can be found here:

Code example

import aiohttp
import asyncio

async def fetch(session, url):
    with aiohttp.Timeout(10):
        async with session.get(url) as response:
            return await response.text()

async def fetch_all(session, urls, loop):
    results = await asyncio.wait([loop.create_task(fetch(session, url))
                                  for url in urls])
    return results

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # breaks because of the first url
    urls = ['http://SDFKHSKHGKLHSKLJHGSDFKSJH.com',
            'http://google.com',
            'http://twitter.com']
    with aiohttp.ClientSession(loop=loop) as session:
        the_results = loop.run_until_complete(
            fetch_all(session, urls, loop))
        # do something with the the_results

I found this example didn't actually run as intended any more, but the important part is to see the GET request taking place within the fetch (async) function:

Code example

async def fetch(session, url):
    with aiohttp.Timeout(10):
        async with session.get(url) as response:
            return await response.text()

rather than

async def fetch(session, url):
    await asyncio.sleep(1 + 4*random.random())
    return url

The aiohttp.Timeout(10) context manager seems (from the docs) to be an outdated way of setting an option on the session (default seems to be 5 mins).

So I'd suggest:

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

and to copy the client reference:

Code example

async def fetch(client):
    async with client.get('http://python.org') as resp:
        assert resp.status == 200
        return await resp.text()

async def main():
    async with aiohttp.ClientSession() as client:
        html = await fetch(client)
        print(html)

asyncio.run(main())

Notice there's the standard assert statement for the HTTP status code, which can just be added before the return await to give the full example:

Code example

import asyncio
import aiohttp
import random
from aiostream import stream

async def main(urls):
    async with aiohttp.ClientSession() as session:
        ws = stream.repeat(session)
        xs = stream.zip(ws, stream.iterate(urls))
        ys = stream.starmap(xs, fetch, ordered=False, task_limit=10)
        zs = stream.map(ys, process)
        await zs

async def fetch(session, url):
    async with session.get(url) as response:
        assert response.status == 200
        return await response.text()

async def process(data):
    print(data)

def gen_urls():
    for s in ["qrx", "conf", "ocu", "pore", "cal", "poll"]:
        yield f"https://{s}.spin.systems"

asyncio.run(main(gen_urls()))

...though in reality my generator function providing the URLs is actually a little more complicated:

Code example

class StreamUrlSet(EpisodeStreamPartURL):
    def __init__(self, size, url_prefix, filename_prefix, url_suffix, zero_based=False):
        super().__init__(url_prefix, filename_prefix, url_suffix)
        self.size = size # class is an iterator not a list so record size
        self.zero_based = zero_based
        self.reset_pos()

    @property
    def size(self):
        return self._size

    @size.setter
    def size(self, n):
        self._size = n

    @property
    def pos_end(self):
        return self.size if self.zero_based else self.size + 1

    def reset_pos(self):
        self.pos = 0 if self.zero_based else 1

    def increment_pos(self):
        self.pos += 1

    def __repr__(self):
        return f"{self.size} URLs"

    def __iter__(self):
        return next(self)

    def __next__(self):
        while self.pos < self.pos_end:
            p = self.pos
            self.increment_pos()
            yield self.make_part_url(p)

HTTPX

Michael Kennedy recommends httpx instead of aiohttp (as mentioned above it’s focused on the bilateral communication of a client and a server)

[HTTPX is] single purpose -- just about being a client. Whereas aiohttp is great but is both client and server framework.

Furthermore, HTTP/2 is available (but requires the dependency to be specified to pip as httpx[http2])

In brief, you use httpx HTTP2 similarly to the above, but as async with httpx.AsyncClient(http2=True) which makes the working code snippet:

Code example

import asyncio
import httpx
import random
from aiostream import stream

async def main(urls):
    async with httpx.AsyncClient(http2=True) as session:
        ws = stream.repeat(session)
        xs = stream.zip(ws, stream.iterate(urls))
        ys = stream.starmap(xs, fetch, ordered=False, task_limit=10)
        zs = stream.map(ys, process)
        await zs

async def fetch(session, url):
    async with session.get(url) as response:
        assert response.status == 200
        return await response.text()

async def process(data):
    print(data)

def gen_urls():
    for s in ["qrx", "conf", "ocu", "pore", "cal", "poll"]:
        yield f"https://{s}.spin.systems"

asyncio.run(main(gen_urls()))
  • Note that here the response doesn’t have a text method, as HTTP/2 is a binary protocol (making it more efficient) not a text protocol

An alternative way of writing this is shown here

Code example

import aiohttp
import asyncio
import time
import httpx

async def call_url(session):
    url = "https://services.cancerimagingarchive.net/services/v3/TCIA/query/getCollectionValues"
    response = await session.request(method='GET', url=url)
    return response

async with httpx.AsyncClient() as session:
    await asyncio.gather(*[call_url(session)

with a little rejigging this becomes:

Code example

import asyncio
import httpx

def gen_urls():
    for s in ["qrx", "conf", "ocu", "pore", "cal", "poll"]:
        yield f"https://{s}.spin.systems"

async def call_url(session, url):
    response = await session.get(url)
    return response

async def main(urls):
    async with httpx.AsyncClient(http2=True) as session:
        await asyncio.gather(*[call_url(session, url) for url in urls])

asyncio.run(main(gen_urls()))

However this lacks the nice itertools-like combinatorial operators from aiostream

We can now put them back in to handle callbacks:

Code example

import asyncio
import httpx
from aiostream import stream

def gen_urls():
    for s in "qrx conf ocu pore cal poll".split():
        yield f"https://{s}.spin.systems"

async def fetch(session, url):
    response = await session.get(url)
    #response.raise_for_status()
    return response


async def process(data):
    print({data.url: data})


async def main(urls):
    async with httpx.AsyncClient(http2=True) as session:
        #await asyncio.gather(*[fetch(session, url) for url in urls])
        ws = stream.repeat(session)
        xs = stream.zip(ws, stream.iterate(urls))
        ys = stream.starmap(xs, fetch, ordered=False, task_limit=10)
        zs = stream.map(ys, process)
        return await zs

asyncio.run(main(gen_urls()))

{URL('https://cal.spin.systems'): <Response [404 not found]>}
{URL('https://pore.spin.systems'): <Response [200 OK]>}
{URL('https://qrx.spin.systems'): <Response [200 OK]>}
{URL('https://conf.spin.systems'): <Response [200 OK]>}
{URL('https://ocu.spin.systems'): <Response [200 OK]>}
{URL('https://poll.spin.systems'): <Response [200 OK]>}
  • Note that <Response [404 not found]> also shows up in STDERR
    • Uncommenting #response.raise_for_status() will throw an exception (see the example of handling this back at the start)
  • Note that the URL generator is not async (nothing is awaited unlike these examples).
    • Async. funcdefs may call sync. funcdefs but not vice versa
⚠️ **GitHub.com Fallback** ⚠️