asyncio - goddes4/python-study-wiki GitHub Wiki

1. Base Event Loop

Run an event loop

  • BaseEventLoop.run_forever() : Run until stop() is called.
  • BaseEventLoop.run_until_complete(coro_or_future) : Run until the Future is done. If the argument is a coroutine object, it is wrapped by ensure_future() (named async() before Python 3.4.4).
  • BaseEventLoop.stop() : Stop running the event loop.
  • BaseEventLoop.close() : Close the event loop. The loop must not be running.

Calls

  • BaseEventLoop.call_soon(callback, *args)
  • BaseEventLoop.call_soon_threadsafe(callback, *args)
* To pass keyword arguments to the callback, use functools.partial() (or a lambda).
  loop.call_soon(partial(callback, param1=value1, param2=value2))
  loop.call_soon(lambda: callback(param1=value1, param2=value2))
* Both methods return a handle, which can be used to cancel the callback.
* To schedule a callback from a thread other than the event loop's thread, use call_soon_threadsafe().
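The three notes above can be sketched in one small script. The callback name record and the worker thread are hypothetical, chosen only for illustration; the loop methods are the ones listed in this section.

```python
import asyncio
import threading
from functools import partial

results = []

def record(label, value=None):
    # Hypothetical callback: remember how it was called.
    results.append((label, value))

loop = asyncio.new_event_loop()

# Keyword arguments are passed through functools.partial().
loop.call_soon(partial(record, "partial", value=1))

# call_soon() returns a handle; a cancelled handle never runs.
handle = loop.call_soon(record, "cancelled")
handle.cancel()

def worker():
    # Runs in another thread, so it must use call_soon_threadsafe().
    loop.call_soon_threadsafe(record, "thread", 2)
    loop.call_soon_threadsafe(loop.stop)

thread = threading.Thread(target=worker)
thread.start()
loop.run_forever()  # returns once loop.stop() has been processed
thread.join()
loop.close()

print(results)
```

The cancelled callback leaves no entry in results, while the callback scheduled from the worker thread does.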

Delayed calls

  • BaseEventLoop.call_later(delay, callback, *args)
  • BaseEventLoop.call_at(when, callback, *args)
  • BaseEventLoop.time()
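A minimal sketch of the delayed-call methods, assuming short illustrative delays. call_later() takes a delay relative to now, while call_at() takes an absolute time on the loop's own clock, as returned by loop.time().

```python
import asyncio

order = []
loop = asyncio.new_event_loop()
start = loop.time()  # the loop's own monotonic clock

# Relative delay: run about 0.02 s from now.
loop.call_later(0.02, order.append, "call_later")
# Absolute time: run when the loop clock reaches start + 0.01.
loop.call_at(start + 0.01, order.append, "call_at")
# Stop the loop after both callbacks are due.
loop.call_later(0.05, loop.stop)

loop.run_forever()
loop.close()

print(order)
```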

Tasks

  • BaseEventLoop.create_task(coro)
task = event_loop.create_task(coro)
task.add_done_callback(lambda future: print(future))
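A runnable version of the snippet above might look like the following sketch. The coroutine add is hypothetical, and it is written with async def / await (Python 3.5+), which is assumed here as the newer spelling of the @asyncio.coroutine / yield from style used in these notes.

```python
import asyncio

async def add(x, y):
    # Hypothetical coroutine: yield to the loop once, then return a value.
    await asyncio.sleep(0)
    return x + y

seen = []
loop = asyncio.new_event_loop()

task = loop.create_task(add(1, 2))
# The callback receives the finished task (a Future subclass).
task.add_done_callback(lambda future: seen.append(future.result()))

loop.run_until_complete(task)
loop.close()

print(task.result(), seen)
```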

Creating connections

  • BaseEventLoop.create_connection(protocol_factory, host=None, port=None, *, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None)
  • BaseEventLoop.create_server(protocol_factory, host=None, port=None, *, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None)

Watch file descriptors

  • BaseEventLoop.add_reader(fd, callback, *args)
  • BaseEventLoop.remove_reader(fd)
  • BaseEventLoop.add_writer(fd, callback, *args)
  • BaseEventLoop.remove_writer(fd)
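As a sketch of add_reader() and remove_reader(), the script below watches one end of a socket pair. It assumes a selector-based event loop (the default on Unix); the callback name on_readable is hypothetical.

```python
import asyncio
import socket

rsock, wsock = socket.socketpair()
loop = asyncio.new_event_loop()
received = []

def on_readable():
    # Called by the loop when rsock has data to read.
    received.append(rsock.recv(100))
    loop.remove_reader(rsock)  # stop watching the descriptor
    loop.stop()

loop.add_reader(rsock, on_readable)
# Simulate data arriving from the network.
loop.call_soon(wsock.send, b"ping")

loop.run_forever()
rsock.close()
wsock.close()
loop.close()

print(received)
```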

UNIX signals

  • BaseEventLoop.add_signal_handler(signum, callback, *args)
  • BaseEventLoop.remove_signal_handler(sig)

Executor

  • BaseEventLoop.run_in_executor(executor, callback, *args)
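A minimal sketch of run_in_executor(), assuming a hypothetical blocking function blocking_square. Passing None as the executor selects the loop's default executor, and the call returns a future that the loop can wait on.

```python
import asyncio
import time

def blocking_square(n):
    # Hypothetical blocking work that would otherwise stall the loop.
    time.sleep(0.01)
    return n * n

loop = asyncio.new_event_loop()

# None selects the loop's default executor.
future = loop.run_in_executor(None, blocking_square, 7)
result = loop.run_until_complete(future)
loop.close()

print(result)
```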

Error Handling API

  • BaseEventLoop.set_exception_handler(handler)
  • BaseEventLoop.default_exception_handler(context)
  • BaseEventLoop.call_exception_handler(context)
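The handler passed to set_exception_handler() is called as handler(loop, context), where context is a dict. The sketch below assumes a hypothetical failing callback named broken and records the type of the exception that the loop reports.

```python
import asyncio

captured = []

def handler(loop, context):
    # context is a dict; "exception" is present when an exception was raised.
    captured.append(type(context["exception"]).__name__)

def broken():
    # Hypothetical callback that fails.
    raise ValueError("boom")

loop = asyncio.new_event_loop()
loop.set_exception_handler(handler)

loop.call_soon(broken)
loop.call_soon(loop.stop)
loop.run_forever()
loop.close()

print(captured)
```

The loop keeps running after the callback fails; only the handler is notified.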

2. Event loops

Event loop functions

  • asyncio.get_event_loop()
  • asyncio.set_event_loop(loop)
  • asyncio.new_event_loop()
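A short sketch of how the three functions fit together: a loop created with new_event_loop() becomes the current loop of the calling thread once it is passed to set_event_loop().

```python
import asyncio

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)  # make it the current loop of this thread

same_loop = asyncio.get_event_loop() is loop

# Clean up: unset the current loop, then close it.
asyncio.set_event_loop(None)
loop.close()

print(same_loop)
```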

3. Tasks and coroutines

Coroutine

  • A generator that follows certain conventions.
  • It is decorated with @asyncio.coroutine.
  • It uses the yield from syntax.
  • The word "coroutine" is used for two different things.
    • coroutine function : a function decorated with @asyncio.coroutine
    • coroutine object : the object obtained by calling a coroutine function
  • Things a coroutine can do
    • result = yield from future : wait until the future is done
    • result = yield from coroutine : wait until another coroutine produces a result. A coroutine expression must be called from another coroutine.
    • return expression : return a result to the coroutine that is waiting on this one with yield from.
    • raise exception : raise an exception in the coroutine that is waiting on this one with yield from.
  • Calling a coroutine function does not run its code; it returns a coroutine object.
  • Ways to run a coroutine object
    • call yield from coroutine from another coroutine
    • schedule it with async() or BaseEventLoop.create_task()
  • A coroutine runs only while the event loop is running.
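The difference between a coroutine function and a coroutine object can be sketched as follows. The example uses async def (Python 3.5+), assumed here as the newer equivalent of the @asyncio.coroutine decorator; the name greet is hypothetical.

```python
import asyncio

log = []

async def greet():
    # The body runs only when the event loop drives the coroutine object.
    log.append("running")
    return "done"

coro = greet()            # calling the function only creates the object
ran_on_call = bool(log)   # the body has not run yet
is_coroutine_object = asyncio.iscoroutine(coro)

loop = asyncio.new_event_loop()
result = loop.run_until_complete(coro)
loop.close()

print(ran_on_call, is_coroutine_object, log, result)
```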

example : hello world coroutine

import asyncio

@asyncio.coroutine
def hello_world():
    print("Hello World!")

loop = asyncio.get_event_loop()
# Blocking call which returns when the hello_world() coroutine is done
loop.run_until_complete(hello_world())
loop.close()

example : coroutine displaying the current date

import asyncio
import datetime

@asyncio.coroutine
def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        yield from asyncio.sleep(1)

loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()

example : chain coroutines

import asyncio

@asyncio.coroutine
def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    yield from asyncio.sleep(1.0)
    return x + y

@asyncio.coroutine
def print_sum(x, y):
    result = yield from compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

Future (not thread safe)

  • cancel() : Cancel the future and schedule callbacks.
  • result() : Return the result this future represents.
  • exception() : Return the exception that was set on this future.
  • add_done_callback(fn) : Add a callback to be run when the future becomes done.
  • set_result(result) : Mark the future done and set its result.
  • set_exception(exception) : Mark the future done and set an exception.
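The examples in this section cover set_result(). As a complement, here is a hedged sketch of set_exception() and cancel(); the variable names are illustrative only.

```python
import asyncio

loop = asyncio.new_event_loop()

# A future completed with an exception: waiting on it re-raises the exception.
failed = asyncio.Future(loop=loop)
loop.call_soon(failed.set_exception, ValueError("bad input"))
try:
    loop.run_until_complete(failed)
except ValueError as exc:
    error_text = str(exc)

# A cancelled future counts as done, and its callbacks are scheduled.
cancelled = asyncio.Future(loop=loop)
notified = []
cancelled.add_done_callback(lambda fut: notified.append(fut.cancelled()))
cancelled.cancel()

loop.call_soon(loop.stop)
loop.run_forever()  # give the scheduled callback a chance to run
loop.close()

print(error_text, notified)
```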

example : future with run_until_complete()

The run_until_complete() method internally uses add_done_callback() to be notified when the future is done.

import asyncio

@asyncio.coroutine
def slow_operation(future):
    yield from asyncio.sleep(1)
    future.set_result('Future is done!')

loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(slow_operation(future))
loop.run_until_complete(future)
print(future.result())
loop.close()

example : future with run_forever()

import asyncio

@asyncio.coroutine
def slow_operation(future):
    yield from asyncio.sleep(1)
    future.set_result('Future is done!')

def got_result(future):
    print(future.result())
    loop.stop()

loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(slow_operation(future))
future.add_done_callback(got_result)
try:
    loop.run_forever()
finally:
    loop.close()

Task (not thread safe)

A subclass of Future that is responsible for executing a coroutine object in an event loop.

  • An event loop runs only one task at a time.
  • To run tasks in parallel, run them on other event loops in other threads.
  • While a task waits for a future to complete, the event loop runs another task.
  • Do not create Task instances directly; use async() or BaseEventLoop.create_task().

example : parallel execution of tasks

import asyncio

@asyncio.coroutine
def factorial(name, number):
    f = 1
    for i in range(2, number+1):
        print("Task %s: Compute factorial(%s)..." % (name, i))
        yield from asyncio.sleep(1)
        f *= i
    print("Task %s: factorial(%s) = %s" % (name, number, f))

loop = asyncio.get_event_loop()
tasks = [
    asyncio.ensure_future(factorial("A", 2)),
    asyncio.ensure_future(factorial("B", 3)),
    asyncio.ensure_future(factorial("C", 4))]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

Task functions

  • asyncio.ensure_future(coro_or_future, *, loop=None)
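  • asyncio.async(coro_or_future, *, loop=None) : deprecated alias of ensure_future() since Python 3.4.4; async is a reserved keyword from Python 3.7, so this spelling no longer parses there.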
  • asyncio.wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
  • asyncio.wait_for(fut, timeout, *, loop=None)
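A small sketch of wait() and wait_for(), assuming a hypothetical coroutine work and written with async def / await (Python 3.5+) in place of @asyncio.coroutine / yield from. wait() returns two sets (done, pending); wait_for() raises asyncio.TimeoutError when the timeout expires.

```python
import asyncio

async def work(delay, value):
    # Hypothetical coroutine: sleep, then return a value.
    await asyncio.sleep(delay)
    return value

async def main():
    tasks = [asyncio.ensure_future(work(0.01, n)) for n in (1, 2)]
    # Default return_when=ALL_COMPLETED: wait for every task.
    done, pending = await asyncio.wait(tasks)
    total = sum(task.result() for task in done)

    try:
        await asyncio.wait_for(work(10, "late"), timeout=0.01)
        outcome = "finished"
    except asyncio.TimeoutError:
        outcome = "timed out"
    return total, len(pending), outcome

loop = asyncio.new_event_loop()
summary = loop.run_until_complete(main())
loop.close()

print(summary)
```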

4. Transports and protocols (low-level API)

Transports

  • Transport classes abstract various kinds of communication channels.
  • Users generally do not instantiate them directly; BaseEventLoop methods create them instead.
  • TCP, UDP, SSL, and subprocess pipes are supported.

Protocols

  • Protocol is the base class for implementing your own network protocols.
  • A protocol is used together with a transport.
  • It parses incoming data and decides what data to send.

Protocol examples

TCP echo client protocol

import asyncio

class EchoClientProtocol(asyncio.Protocol):
    def __init__(self, message, loop):
        self.message = message
        self.loop = loop

    def connection_made(self, transport):
        transport.write(self.message.encode())
        print('Data sent: {!r}'.format(self.message))

    def data_received(self, data):
        print('Data received: {!r}'.format(data.decode()))

    def connection_lost(self, exc):
        print('The server closed the connection')
        print('Stop the event loop')
        self.loop.stop()

loop = asyncio.get_event_loop()
message = 'Hello World!'
coro = loop.create_connection(lambda: EchoClientProtocol(message, loop),
                              '127.0.0.1', 8888)
loop.run_until_complete(coro)
loop.run_forever()
loop.close()

TCP echo server protocol

import asyncio

class EchoServerClientProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        message = data.decode()
        print('Data received: {!r}'.format(message))

        print('Send: {!r}'.format(message))
        self.transport.write(data)

        print('Close the client socket')
        self.transport.close()

loop = asyncio.get_event_loop()
# Each client connection will create a new protocol instance
coro = loop.create_server(EchoServerClientProtocol, '127.0.0.1', 8888)
server = loop.run_until_complete(coro)

# Serve requests until CTRL+c is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()

UDP echo client protocol

import asyncio

class EchoClientProtocol:
    def __init__(self, message, loop):
        self.message = message
        self.loop = loop
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport
        print('Send:', self.message)
        self.transport.sendto(self.message.encode())

    def datagram_received(self, data, addr):
        print("Received:", data.decode())

        print("Close the socket")
        self.transport.close()

    def error_received(self, exc):
        print('Error received:', exc)

    def connection_lost(self, exc):
        print("Socket closed, stop the event loop")
        self.loop.stop()

loop = asyncio.get_event_loop()
message = "Hello World!"
connect = loop.create_datagram_endpoint(
    lambda: EchoClientProtocol(message, loop),
    remote_addr=('127.0.0.1', 9999))
transport, protocol = loop.run_until_complete(connect)
loop.run_forever()
transport.close()
loop.close()

UDP echo server protocol

import asyncio

class EchoServerProtocol:
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        message = data.decode()
        print('Received %r from %s' % (message, addr))
        print('Send %r to %s' % (message, addr))
        self.transport.sendto(data, addr)

loop = asyncio.get_event_loop()
print("Starting UDP server")
# One protocol instance will be created to serve all client requests
listen = loop.create_datagram_endpoint(
    EchoServerProtocol, local_addr=('127.0.0.1', 9999))
transport, protocol = loop.run_until_complete(listen)

try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

transport.close()
loop.close()

Register an open socket to wait for data using a protocol

import asyncio
try:
    from socket import socketpair
except ImportError:
    from asyncio.windows_utils import socketpair

# Create a pair of connected sockets
rsock, wsock = socketpair()
loop = asyncio.get_event_loop()

class MyProtocol(asyncio.Protocol):
    transport = None

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        print("Received:", data.decode())

        # We are done: close the transport (it will call connection_lost())
        self.transport.close()

    def connection_lost(self, exc):
        # The socket has been closed, stop the event loop
        loop.stop()

# Register the socket to wait for data
connect_coro = loop.create_connection(MyProtocol, sock=rsock)
transport, protocol = loop.run_until_complete(connect_coro)

# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())

# Run the event loop
loop.run_forever()

# We are done, close sockets and the event loop
rsock.close()
wsock.close()
loop.close()

5. Stream (high-level API)

Stream functions

  • asyncio.open_connection(host=None, port=None, *, loop=None, limit=None, **kwds)
  • asyncio.start_server(client_connected_cb, host=None, port=None, *, loop=None, limit=None, **kwds)

StreamReader

  • read(n=-1)
  • readline()
  • readexactly(n)

StreamWriter

  • write(data)
  • writelines(data)
  • write_eof()

Stream examples

TCP echo client using streams

import asyncio

@asyncio.coroutine
def tcp_echo_client(message, loop):
    reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888,
                                                        loop=loop)

    print('Send: %r' % message)
    writer.write(message.encode())

    data = yield from reader.read(100)
    print('Received: %r' % data.decode())

    print('Close the socket')
    writer.close()

message = 'Hello World!'
loop = asyncio.get_event_loop()
loop.run_until_complete(tcp_echo_client(message, loop))
loop.close()

TCP echo server using streams

import asyncio

@asyncio.coroutine
def handle_echo(reader, writer):
    data = yield from reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')
    print("Received %r from %r" % (message, addr))

    print("Send: %r" % message)
    writer.write(data)
    yield from writer.drain()

    print("Close the client socket")
    writer.close()

loop = asyncio.get_event_loop()
coro = asyncio.start_server(handle_echo, '127.0.0.1', 8888, loop=loop)
server = loop.run_until_complete(coro)

# Serve requests until CTRL+c is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()

Get HTTP headers

import asyncio
import urllib.parse
import sys

@asyncio.coroutine
def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        connect = asyncio.open_connection(url.hostname, 443, ssl=True)
    else:
        connect = asyncio.open_connection(url.hostname, 80)
    reader, writer = yield from connect
    query = ('HEAD {path} HTTP/1.0\r\n'
             'Host: {hostname}\r\n'
             '\r\n').format(path=url.path or '/', hostname=url.hostname)
    writer.write(query.encode('latin-1'))
    while True:
        line = yield from reader.readline()
        if not line:
            break
        line = line.decode('latin1').rstrip()
        if line:
            print('HTTP header> %s' % line)

    # Ignore the body, close the socket
    writer.close()

url = sys.argv[1]
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(print_http_headers(url))
loop.run_until_complete(task)
loop.close()

Register an open socket to wait for data using streams

import asyncio
try:
    from socket import socketpair
except ImportError:
    from asyncio.windows_utils import socketpair

@asyncio.coroutine
def wait_for_data(loop):
    # Create a pair of connected sockets
    rsock, wsock = socketpair()

    # Register the open socket to wait for data
    reader, writer = yield from asyncio.open_connection(sock=rsock, loop=loop)

    # Simulate the reception of data from the network
    loop.call_soon(wsock.send, 'abc'.encode())

    # Wait for data
    data = yield from reader.read(100)

    # Got data, we are done: close the socket
    print("Received:", data.decode())
    writer.close()

    # Close the second socket
    wsock.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(wait_for_data(loop))
loop.close()

6. Subprocess

Create a subprocess: high-level API using Process

  • asyncio.create_subprocess_exec(*args, stdin=None, stdout=None, stderr=None, loop=None, limit=None, **kwds)
  • asyncio.create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None, loop=None, limit=None, **kwds)

Create a subprocess: low-level API using subprocess.Popen

  • BaseEventLoop.subprocess_exec(protocol_factory, *args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)
  • BaseEventLoop.subprocess_shell(protocol_factory, cmd, *, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)

Constants

  • asyncio.subprocess.PIPE
  • asyncio.subprocess.STDOUT
  • asyncio.subprocess.DEVNULL

Subprocess examples

Subprocess using streams

import asyncio.subprocess
import sys

@asyncio.coroutine
def get_date():
    code = 'import datetime; print(datetime.datetime.now())'

    # Create the subprocess, redirect the standard output into a pipe
    create = asyncio.create_subprocess_exec(sys.executable, '-c', code,
                                            stdout=asyncio.subprocess.PIPE)
    proc = yield from create

    # Read one line of output
    data = yield from proc.stdout.readline()
    line = data.decode('ascii').rstrip()

    # Wait for the subprocess exit
    yield from proc.wait()
    return line

if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

date = loop.run_until_complete(get_date())
print("Current date: %s" % date)
loop.close()

7. Synchronization primitives

Locks

Lock (not thread safe)

  • acquire()
  • release()

Usage :

lock = Lock()
...
yield from lock
try:
    ...
finally:
    lock.release()

Context manager usage :

lock = Lock()
...
with (yield from lock):
     ...

Event

  • set()
  • wait()
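A minimal sketch of Event, assuming a hypothetical waiter coroutine and async def / await syntax (Python 3.5+): wait() blocks until another coroutine calls set().

```python
import asyncio

async def waiter(event, log):
    log.append("waiting")
    await event.wait()  # suspends until event.set() is called
    log.append("released")

async def main():
    event = asyncio.Event()
    log = []
    task = asyncio.ensure_future(waiter(event, log))
    await asyncio.sleep(0)  # let the waiter start and block on the event
    log.append("set")
    event.set()
    await task
    return log

loop = asyncio.new_event_loop()
log = loop.run_until_complete(main())
loop.close()

print(log)
```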

Condition

  • notify_all()
  • wait()

Semaphores

Semaphore

  • acquire()
  • release()
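A hedged sketch of a Semaphore limiting concurrency, with hypothetical names and async with (Python 3.5+) performing acquire() and release(). Five jobs are started, but at most two run at the same time.

```python
import asyncio

async def job(sem, state):
    async with sem:  # acquire() on entry, release() on exit
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)
        state["active"] -= 1

async def main():
    sem = asyncio.Semaphore(2)  # at most two jobs inside the block
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(job(sem, state) for _ in range(5)))
    return state["peak"]

loop = asyncio.new_event_loop()
peak = loop.run_until_complete(main())
loop.close()

print(peak)
```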

8. Queues

Queue

PriorityQueue

LifoQueue

JoinableQueue
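The queue classes differ mainly in the order in which items come back out. The sketch below, written with async def / await (Python 3.5+) and a hypothetical helper round_trip, puts the same items into each kind of queue and reads them back. JoinableQueue is left out because it is not available in recent Python versions.

```python
import asyncio

async def round_trip(queue, items):
    # Put every item, then get them all back in the queue's own order.
    for item in items:
        await queue.put(item)
    out = []
    while not queue.empty():
        out.append(await queue.get())
    return out

async def main():
    items = [3, 1, 2]
    return {
        "Queue": await round_trip(asyncio.Queue(), items),
        "LifoQueue": await round_trip(asyncio.LifoQueue(), items),
        "PriorityQueue": await round_trip(asyncio.PriorityQueue(), items),
    }

loop = asyncio.new_event_loop()
orders = loop.run_until_complete(main())
loop.close()

print(orders)
```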


9. Develop with asyncio

Debug mode of asyncio

  • Detect coroutine objects never scheduled
    • If a coroutine function is called and the resulting coroutine object is not passed to async() or BaseEventLoop.create_task(), that coroutine object never runs.
    • Condition : set the environment variable PYTHONASYNCIODEBUG to 1
  • Pending task destroyed
    • If the event loop is closed before a coroutine object scheduled on it has run, an error is logged.
    • Condition : event_loop.set_debug(True)
  • Close transports and event loops
    • If close() is not called, a ResourceWarning is emitted.
    • Condition : add -Wdefault to the command line
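Debug mode can also be switched on from code. A minimal sketch, assuming only the set_debug() call named above and its counterpart get_debug():

```python
import asyncio

loop = asyncio.new_event_loop()
loop.set_debug(True)  # same effect as PYTHONASYNCIODEBUG=1 for this loop
debug_enabled = loop.get_debug()
loop.close()

print(debug_enabled)
```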