async await 工作原理 - CongGreat/async-await GitHub Wiki

Welcome to the async-await wiki!

1.顺序服务器

编写一个一次只处理一个客户端的 TCP 回显服务器很简单。服务器侦听某个端口上的传入连接,当客户端连接时,服务器会与客户端通信,直到连接关闭。然后它继续侦听新连接。这个逻辑可以使用基本的套接字编程来实现

# echo_01_seq.py
import socket

def run_server(host='127.0.0.1', port=55555):
    sock = socket.socket()
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.listen()
    while True:
        client_sock, addr = sock.accept()
        print('Connection from', addr)
        handle_client(client_sock)

def handle_client(sock):
    while True:
        received_data = sock.recv(4096)
        if not received_data:
            break
        sock.sendall(received_data)
        print('Client disconnected:', sock.getpeername())
        sock.close()


if __name__ == '__main__':
    run_server()
上述代码做的事如下:
  1. socket.socket():创建一个新的 TCP/IP 套接字

  2. sock.bind():将套接字绑定到地址和端口

  3. sock.listen():将套接字标记为listening

  4. sock.accept():接受新的连接

  5. sock.recv():从客户端读取数据,并使用sock.sendall()将数据发送回客户端

使用一个简单的客户端来模拟请求,对上述代码进行测试。

上述服务器可以正常运行,但是有两个明显的缺点:
  1. 速度比较慢,并发性能差

  2. 如果其中某个客户端阻塞了,其余的客户端也无法获得响应

2.多线程服务器

为了解决顺序服务器的并发性能问题,我们首先想到了使用多线程。

handle_client()在单独的线程中运行该函数,其余代码保持不变

# echo_02_threads.py

import socket
import threading


def run_server(host='127.0.0.1', port=55555):
    sock = socket.socket()
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.listen()
    while True:
        client_sock, addr = sock.accept()
        print('Connection from', addr)
        thread = threading.Thread(target=handle_client, args=[client_sock])
        thread.start()


def handle_client(sock):
    while True:
        received_data = sock.recv(4096)
        if not received_data:
            break
        sock.sendall(received_data)

    print('Client disconnected:', sock.getpeername())
    sock.close()


if __name__ == '__main__':
    run_server()
不足之处

这种方法一定程度上解决了并发性的问题,但进一步思考,会发现它有缺点:

  • 扩展性差,线程是一种昂贵的操作,当我们的连接很多时,需要占用大量的系统资源,并且还容易成为DoS攻击的目标

3.线程池

为了解决上述问题,我们可以很自然的想到,引入线程池,这样可以有效地控制线程的总数。

# echo_03_thread_pool.py

import socket
from concurrent.futures import ThreadPoolExecutor


pool = ThreadPoolExecutor(max_workers=20)


def run_server(host='127.0.0.1', port=55555):
    sock = socket.socket()
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.listen()
    while True:
        client_sock, addr = sock.accept()
        print('Connection from', addr)
        pool.submit(handle_client, client_sock)


def handle_client(sock):
    while True:
        received_data = sock.recv(4096)
        if not received_data:
            break
        sock.sendall(received_data)

    print('Client disconnected:', sock.getpeername())
    sock.close()


if __name__ == '__main__':
    run_server()
不足之处

线程池的方法看起来简单实用,但是,你需要处理很多额外的事情来保证线程池的效率,如:

1.你需要处理阻塞的连接

2.你需要删除无用的长连接

4.IO多路复用 + 事件循环

让我们回到顺序服务器,进一步思考,会发现:

顺序服务器大部分的时间花在了等待上。当它没有连接的客户端时,它等待新的客户端连接。当它有一个连接的客户端时,它会等待这个客户端发送一些数据。

为了提高并发性能,服务器在等待时应该能够处理接下来发生的任何事件。 如果当前客户端没有发送任何内容,但有新客户端尝试连接,则服务器应接受新连接。它应该保持多个活动连接并回复接下来发送数据的任何客户端。

而操作系统早已考虑过上面的问题,并我们提供了接口。这便是io多路复用机制,如select()poll()epoll()。 它能将所有需要监听的文件描述符放入一个选择器中,当某个文件描述符为可读/可写状态时,便可选择出它来。

# 实例化selector
sel = selectors.DefaultSelector()

# 注册事件
sel.register(sock, selectors.EVENT_READ, my_data)

# 轮询并返回已经准备好的第一个文件对象
keys_events = sel.select()
# echo_04_io_multiplexing.py

import socket
import selectors


sel = selectors.DefaultSelector()


def setup_listening_socket(host='127.0.0.1', port=55555):
    sock = socket.socket()
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.listen()
    sel.register(sock, selectors.EVENT_READ, accept)


def accept(sock):
    client_sock, addr = sock.accept()
    print('Connection from', addr)
    sel.register(client_sock, selectors.EVENT_READ, recv_and_send)


def recv_and_send(sock):
    received_data = sock.recv(4096)
    if received_data:
        # assume sendall won't block
        sock.sendall(received_data)
    else:
        print('Client disconnected:', sock.getpeername())
        sel.unregister(sock)
        sock.close()


def run_event_loop():
    while True:
        for key, _ in sel.select():
            callback = key.data
            sock = key.fileobj
            callback(sock)


if __name__ == '__main__':
    setup_listening_socket()
    run_event_loop()

不足之处

我们假定了sendall不会阻塞,但事实上是有可能阻塞的,而我们的代码没有处理这种情况

这里我们首先accept()在监听套接字上注册一个回调。此回调接受新客户端并recv_and_send()在每个客户端套接字上注册一个回调。程序的核心是事件循环(一个无限循环),在每次迭代时选择准备好的套接字并调用相应的注册回调。

服务器的事件循环版本可以完美地处理多个客户端,但是这种方法是用一种奇怪的写法来实现的:以回调的方式来实现的没有人会喜欢这种编码方式,尤其是当我们需要实现更复杂的服务器逻辑时

操作系统线程不会将回调风格的编程强加给我们,但它们提供了良好的并发性。它们是怎么做到的?这里的关键是操作系统挂起和恢复线程执行的能力。如果我们有可以像 OS 线程一样挂起和恢复的函数,我们就可以编写并发的单线程代码。你猜怎么着?Pyhon 允许我们编写这样的函数。

5.使用yield

# echo_05_yield_no_io.py

import socket

from event_loop_01_no_io import EventLoopNoIO


loop = EventLoopNoIO()


def run_server(host='127.0.0.1', port=55555):
    sock = socket.socket()
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.listen()
    while True:
        yield
        client_sock, addr = sock.accept()
        print('Connection from', addr)
        loop.create_task(handle_client(client_sock))


def handle_client(sock):
    while True:
        yield
        received_data = sock.recv(4096)
        if not received_data:
            break
        yield
        sock.sendall(received_data)

    print('Client disconnected:', sock.getpeername())
    sock.close()


if __name__ == '__main__':
    loop.create_task(run_server())
    loop.run()
# event_loop_01_no_io.py

from collections import deque


class EventLoopNoIO:
    def __init__(self):
        self.tasks_to_run = deque([])

    def create_task(self, coro):
        self.tasks_to_run.append(coro)

    def run(self):
        while self.tasks_to_run:
            task = self.tasks_to_run.popleft()
            try:
                next(task)
            except StopIteration:
                continue
            self.create_task(task)

不足之处

这个版本的运行非常受限制,任务以交错方式运行,但它们的顺序是固定的。

例如,如果当前调度的任务是接受新连接的任务,则处理已连接客户端的任务必须等到新客户端连接。

换句话说,这个版本的事件循环没有检查套接字操作是否会阻塞。正如我们所了解的,我们可以通过添加 I/O 多路复用来修复它。事件循环不应在任务运行后立即重新安排任务,而应仅在任务正在等待的套接字可供读取(或写入)时重新安排任务。任务可以通过调用一些事件循环方法来注册其读取或写入套接字的意图。或者它可以将yield这些信息传递给事件循环。下面是采用后一种方法的服务器版本

6.yield + io多路复用

# echo_06_yield_io.py

import socket

from event_loop_02_io import EventLoopIo


loop = EventLoopIo()


def run_server(host='127.0.0.1', port=55555):
    sock = socket.socket()
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.listen()
    while True:
        yield 'wait_read', sock
        client_sock, addr = sock.accept()
        print('Connection from', addr)
        loop.create_task(handle_client(client_sock))


def handle_client(sock):
    while True:
        yield 'wait_read', sock
        received_data = sock.recv(4096)
        if not received_data:
            break
        yield 'wait_write', sock
        sock.sendall(received_data)

    print('Client disconnected:', sock.getpeername())
    sock.close()


if __name__ == '__main__':
    loop.create_task(run_server())
    loop.run()
# event_loop_02_io.py

from collections import deque
import selectors


class EventLoopIo:
    def __init__(self):
        self.tasks_to_run = deque([])
        self.sel = selectors.DefaultSelector()

    def create_task(self, coro):
        self.tasks_to_run.append(coro)

    def run(self):
        while True:
            if self.tasks_to_run:
                task = self.tasks_to_run.popleft()
                try:
                    op, arg = next(task)
                except StopIteration:
                    continue

                if op == 'wait_read':
                    self.sel.register(arg, selectors.EVENT_READ, task)
                elif op == 'wait_write':
                    self.sel.register(arg, selectors.EVENT_WRITE, task)
                else:
                    raise ValueError('Unknown event loop operation:', op)
            else:
                for key, _ in self.sel.select():
                    task = key.data
                    sock = key.fileobj
                    self.sel.unregister(sock)
                    self.create_task(task)

不足之处

这个方法看起来完美,因为:

  • 首先,我们得到可以完美处理多个客户端的服务器

  • 其次,我们得到看起来像常规顺序代码的代码

但我们必须编写事件循环,这是通常不需要我们自己做的事情

  • 事件循环随库一起提供,在 Python 中,您最有可能使用asyncio

进一步

  • 当您使用生成器进行多任务处理时,就像我们在本节中所做的那样,您通常将它们称为协程。协程是可以通过显式让出控制权来暂停的函数,所以,根据这个定义,带有yield表达式的简单生成器可以算作协程。然而,一个真正的协程还应该能够通过调用其他协程将控制权让给它们,但生成器只能将控制权交给调用者。

  • 如果尝试将一些生成器的代码分解为子生成器,我们将看到为什么我们需要真正的协程。考虑handle_client()生成器的这两行代码:

    yield 'wait_read', sock
    received_data = sock.recv(4096)
    
  • 将它们分解为一个单独的函数会非常方便:

    def async_recv(sock, n):
        yield 'wait_read', sock
        return sock.recv(n)
    
  • 然后这样调用函数

    received_data = async_recv(sock, 4096)
    
  • 但实际上这样运行不了,因为async_recv()返回的是一个生成器,而不是received_data。我们需要使用next调用生成器,这样的话我们还需要处理StopIteration异常,这显然超过了分解两行代码的好处。

  • 在之后的python版本中,加入了send,并且可以使用yield调用子生成器,我们可以像这样调用,但是这样需要在事件循环中实现一些非常规的逻辑。

    received_data = yield async_recv(sock)
    

7.使用yield from

python3.3中,加入了一个更直观的方式来实现协程: yield from的主要步骤是:

  1. 使用send(None)运行一次子生成器。如果send()引发StopIteration异常,则捕获异常,提取结果,使其成为yield from表达式的值并停止。

  2. 如果子生成器的send()返回值无异常,则yield该值并接收一个发送给生成器的值。

  3. 当接收到一个值时,重复步骤 1 但这次send()是接收到的值。

  • 所以我们延续上一版本的思路,可以这样调用子生成器

    received_data = yield from async_recv(sock)
    

改写后:

# event_loop_03_yield_from.py

from collections import deque
import selectors


class EventLoopYieldFrom:
    def __init__(self):
        self.tasks_to_run = deque([])
        self.sel = selectors.DefaultSelector()

    def create_task(self, coro):
        self.tasks_to_run.append(coro)

    def sock_recv(self, sock, n):
        yield 'wait_read', sock
        return sock.recv(n)

    def sock_sendall(self, sock, data):
        yield 'wait_write', sock
        sock.sendall(data)

    def sock_accept(self, sock):
        yield 'wait_read', sock
        return sock.accept()

    def run(self):
        while True:
            if self.tasks_to_run:
                task = self.tasks_to_run.popleft()
                try:
                    op, arg = next(task)
                except StopIteration:
                    continue

                if op == 'wait_read':
                    self.sel.register(arg, selectors.EVENT_READ, task)
                elif op == 'wait_write':
                    self.sel.register(arg, selectors.EVENT_WRITE, task)
                else:
                    raise ValueError('Unknown event loop operation:', op)
            else:
                for key, _ in self.sel.select():
                    task = key.data
                    sock = key.fileobj
                    self.sel.unregister(sock)
                    self.create_task(task)
# echo_07_yield_from.py

import socket

from event_loop_03_yield_from import EventLoopYieldFrom


loop = EventLoopYieldFrom()


def run_server(host='127.0.0.1', port=55555):
    sock = socket.socket()
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.listen()
    while True:
        client_sock, addr = yield from loop.sock_accept(sock)
        print('Connection from', addr)
        loop.create_task(handle_client(client_sock))


def handle_client(sock):
    while True:
        received_data = yield from loop.sock_recv(sock, 4096)
        if not received_data:
            break
        yield from loop.sock_sendall(sock, received_data)

    print('Client disconnected:', sock.getpeername())
    sock.close()


if __name__ == '__main__':
    loop.create_task(run_server())
    loop.run()

进一步

  • 这种方式很好的实现了协程,代码也和普通的顺序代码很相似

  • 那async/await又是什么呢?其实它只是一种语法特性,用于消除生成器、协程、普通函数之间的歧义

8.使用async/await

  • 当您看到一个生成器函数时,您不能立即说出它是打算用作常规生成器还是用作协程。

  • 在这两种情况下,函数看起来都像普通函数,都使用def定义,并包含一堆yield/yield from表达式。因此,为了使协程成为一个独特的概念,PEP 492Python 3.5 中引入了async/await关键字。

  • awaityield from在协程中所做的事情完全一样,await只是实现了一些额外的检查来保证等待的对象是awaited,而不是生成器或者可迭代对象

我们将服务器的函数标记为async并将yield from调用更改为await调用:

# echo_08_async_await.py

import socket

from event_loop_04_async_await import EventLoopAsyncAwait


loop = EventLoopAsyncAwait()


async def run_server(host='127.0.0.1', port=55555):
    sock = socket.socket()
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.listen()
    while True:
        client_sock, addr = await loop.sock_accept(sock)
        print('Connection from', addr)
        loop.create_task(handle_client(client_sock))


async def handle_client(sock):
    while True:
        received_data = await loop.sock_recv(sock, 4096)
        if not received_data:
            break
        await loop.sock_sendall(sock, received_data)

    print('Client disconnected:', sock.getpeername())
    sock.close()


if __name__ == '__main__':
    loop.create_task(run_server())
    loop.run() 

然后我们修改事件循环。 我们用@types.coroutine 装饰生成器函数,以便它们可以与 await 一起使用并通过调用 send(None) 而不是 next() 来运行任务

# event_loop_04_async_await.py

from collections import deque
import selectors
import types


class EventLoopAsyncAwait:
    def __init__(self):
        self.tasks_to_run = deque([])
        self.sel = selectors.DefaultSelector()

    def create_task(self, coro):
        self.tasks_to_run.append(coro)

    @types.coroutine
    def sock_recv(self, sock, n):
        yield 'wait_read', sock
        return sock.recv(n)

    @types.coroutine
    def sock_sendall(self, sock, data):
        yield 'wait_write', sock
        sock.sendall(data)

    @types.coroutine
    def sock_accept(self, sock):
        yield 'wait_read', sock
        return sock.accept()

    def run(self):
        while True:
            if self.tasks_to_run:
                task = self.tasks_to_run.popleft()
                try:
                    op, arg = task.send(None)
                except StopIteration:
                    continue

                if op == 'wait_read':
                    self.sel.register(arg, selectors.EVENT_READ, task)
                elif op == 'wait_write':
                    self.sel.register(arg, selectors.EVENT_WRITE, task)
                else:
                    raise ValueError('Unknown event loop operation:', op)
            else:
                for key, _ in self.sel.select():
                    task = key.data
                    sock = key.fileobj
                    self.sel.unregister(sock)
                    self.create_task(task)

参考: https://tenthousandmeters.com/blog/python-behind-the-scenes-12-how-asyncawait-works-in-python/