事件驱动实现I O多路复用 - downtiser/python-one GitHub Wiki

通过gevent模块已经能够实现I/O切换,但还没实现I/O完成后的自动切回,要实现这个功能,需要用到事件驱动编程范式

事件驱动,每收到一个请求,将请求直接扔到事件队列中,交由另一个专门处理事件的循环来处理,一个只负责接受事件,另一个只负责处理事件,所以用户发出的每一个请求都不会被遗漏,差别只是执行时间不同,先发出的请求先被解决.当外部事件发生时使用回调机制来触发相应的处理.模型大体如下:

  • 有一个事件队列,用于存放请求
  • 用户以某种方式发出一个请求,该请求会被立即放到队列中
  • 有一个循环不断从队列中取出事件,并调用相应的方法处理事件
  • 每个事件都有属于自己的处理方法

相比于单线程效率低下和多线程麻烦的线程加锁来保护共享资源,事件驱动更容易推断出程序的行为,节省资源,同时在处理慢速的I/O操作时,有着极大的优势,另外通过事件驱动的回调机制,可以在一个事件处理完毕后指定其执行一个函数。因为I/O操作是通过OS执行的,所以可以利用回调机制告诉OS,当I/O操作执行完毕后回调,返回一个信号,通知主程序这步I/O已经执行完毕了,可以继续执行这个协程,以实现自动切回.这个功能的实现,其实就是I/O多路复用

在Linux里,几个概念

  • 内存空间被划分为了两部分,一部分为内核空间,另一部分为用户空间,用户无权访问内核空间,只能通过OS的接口间接访问内核.
  • 为了控制进程的执行,内核必须有能力挂起正在CPU上运行的进程,并恢复以前挂起的某个进程的执行。这种行为被称为进程切换。因此可以说,任何进程都是在操作系统内核的支持下运行的,是与内核紧密相关的。
  • 正在执行的进程,由于期待的某些事件未发生,如请求系统资源失败、等待某种操作的完成、新数据尚未到达或无新工作做等,则由系统自动执行阻塞原语(Block),使自己由运行状态变为阻塞状态。可见,进程的阻塞是进程自身的一种主动行为,也因此只有处于运行态的进程(获得CPU),才可能将其转为阻塞状态。当进程进入阻塞状态,是不占用CPU资源的。
  • 文件描述符(file descriptor)是一个用于表述指向文件的引用的抽象化概念.文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。
  • 缓存 I/O 又被称作标准 I/O,大多数文件系统的默认 I/O 操作都是缓存 I/O。在 Linux 的缓存 I/O 机制中,操作系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中,也就是说,数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间

Linux下的四种主要网络模式方案

  1. 阻塞I/O:socket时在等待数据到来和将数据从kernal复制到用户内存的两个阶段都阻塞
  2. 非阻塞I/O: 在等待数据到来时不会阻塞,如果数据未准备好,kernal会向用户返回一个值,通知用户数据未准备好,直到接收到数据并收到用户请求时才会下一步复制数据,所以只在第二个阶段有阻塞.需要第一阶段需要用户不断询问数据是否准备完成
  3. 多路复用I/O:类似于阻塞I/O,但其强大之处在于通过select可以一次传递大量(几百个)socket给kernal,只要有一个socket接收到了数据,就会通知用户,待用户发出获取数据的请求时,再执行第二阶段。所以,I/O 多路复用的特点是通过一种机制一个进程能同时等待多个文件描述符,而这些文件描述符(套接字描述符)其中的任意一个进入读就绪状态,select()函数就可以返回 4.异步I/O:当用户向kernal发起一个请求时,不会有任何阻塞,发送完后可以立即去执行其他任务,内核会负责等待接受数据,并将接收到的数据复制到用户内存中,一切完成后向用户发出一个信号,通知用户可以获取数据。和多路I/O相比,取消了第二阶段的阻塞,对于用户来说,效率更高

select poll epoll都能够实现多路复用

select的缺点是默认只支持1024个连接,而且不会返回是哪些连接接收到了的数据,poll解除了限制,但其它没什么长进,而epoll具有所有的优点,目前应用最广泛,不过不支持Windows

select实现多路复用

import socket, select, queue

server = socket.socket()
server.setblocking(False)  #将服务器端设置为非阻塞模式
server.bind(('localhost', 6080))
server.listen(50)

client_in = [server,]  #这个列表用于让select检测其中的socket实例是否发送数据(是否活动)
client_out = []
queue_dic = {}
print('等待连接....')
while True:
    read, write, error = select.select(client_in, client_out, client_in)
    # 第一个参数代表要监测的socket,第二个参数用于用于监控和接受要发送的数据,第三个表示要select检测是否异常的socket
    # 返回3个列表,分别存储发送数据的socket,要接受数据的socket,出现异常的socket
    try:
        print(read,error)
        for r in read:
            if r == server: #如果是sever有活动,说明有新的连接接入.
                conn, addr = server.accept()
                print('有客户端登入:',addr)
                client_in.append(conn)
                queue_dic[conn] = queue.Queue() #为实例创建要接受数据的队列
            else: #说明客户端在发送数据
                data = r.recv(1024)
                if data:
                    print(data.decode())
                    queue_dic[r].put(data) #将要转发给客户端的数据存入字典中
                    client_out.append(r)   #在下一次循环时让select返回这个socket到write列表中
                else:
                    if r in client_out:
                        client_out.remove(r)
                    queue_dic.pop(r)
                    client_in.remove(r)
                    print('客户端已断开')
                    r.close()

        for w in write: #处理要接受数据的socket
            client_data = queue_dic[w].get()
            w.send(client_data.decode().upper().encode())
            if queue_dic[w].qsize() == 0:
                client_out.remove(w)

        for e in error:  #找到异常的socket,并将其移出观测列表和数据列表
            if e in client_out:
                client_out.remove(e)
            queue_dic.pop(e)
            client_in.remove(e)
    except ConnectionResetError:
        for e in read:
            if e in client_out:
                client_out.remove(e)
            queue_dic.pop(e)
            client_in.remove(e)

        print('客户端强制中断了连接!')

select效率不高,在python上还有一个模块selector,是对select,poll,epoll的上层封装,默认使用的是epoll

#Downtiser
import selectors #对epoll的封装,默认使用epoll,在windows上由于不支持epoll,会使用select
import socket

sel = selectors.DefaultSelector() #类似于select.select()


def accept(sock, mask):
    conn, addr = sock.accept()  # 创建socket连接
    print('mask>>>',mask)
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)   #新链接注册回调函数为read


def read(conn, mask):
    data = conn.recv(1024)
    if data:
        print(data.decode())
        conn.send(data)
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()


server = socket.socket()
server.bind(('localhost', 6080))
server.listen(100)
server.setblocking(False)
sel.register(server, selectors.EVENT_READ, accept)  #当server端有socket连入时就执行accept函数

while True:
    events = sel.select()   # 默认为阻塞,当有活动时,就继续执行
    for key, mask in events:
        callback = key.data  #callback就是accept
        callback(key.fileobj, mask)  #key.fileobj是文件句柄