浅谈golang的io模型 - gistao/blog GitHub Wiki
在聊io模型前,先说说应用场景,毕竟离开具体场景谈技术基本就是空谈了。windows平台的完成端口很牛逼,但这里我们只聊Linux相关的;numa架构网络吞吐很牛逼,但这里我们只聊SMP架构下的;阻塞、非阻塞、多路复用等网络api技术迭代很久了,我们只聊epoll ET模型下的;业务层面有专注上传的,有专注下载的,这里我们聊req ack模式的。这些应用比较广,技术适用性强,所以基于这些前提聊。
消息的流大致抽象成 recv -> process -> write
- 一旦有可读,那么就去最快的读
这里有两个因子:及时发现和高效读。
发现是说程序需要及时的调用epoll_ctl将fd事件添加到红黑树(红黑树),注册事件到内核并等待回调,或者直接看下是否已经有事件ready了,为了减少人为delay,所以最好是在fd生命周期内仅add一次,del一次,避免fd被频繁的add和del,LT模型是当事件ready时会一直通知,比如可以写了,但是程序没有数据可写,为了避免频繁通知将fd写摘掉,等到有数据了再添加,这样就不好了。及时发现的关键是程序能不能时时刻刻在wait,比如程序get到事件了,然后程序处理事件,处理这段时间内没有再wait,这期间有事件ready但程序无感知,这就是人为的delay,从编码来讲就是要利用好多核去wait;
高效,打个比方在网上买个东西,供应商的本地仓库无疑是最快的,是高效的,而如果供应商要去其他仓库调货过来再送就太慢了,这个从编码来说就是cache affinity。
- 一旦读完,那么最快的消费
A线程读完,然后发送给B线程消费,会有个切换的代价和delay,最简单的做法就是读和消费是绑定在一起的,在一个时间序上,站在线程的角度上看这俩就是在一个线程上。
- 一旦消费完,那么最快的写
还是那句不能人为的delay,当可写时应立即写,这块相对简单,类同于读机制。
- load balancing
线程A收到N个消息,负载较高,而线程B确很闲,或者线程A执行消息n时花费了过量时间,导致其他消息延迟,这里有个任务均衡问题,通过固定的算法将任务打散(work sharing),但任务大小是不可预估的,或者运行一段后,又出现了均衡问题,需要再平衡(work stealing)。
- 当资源(cpu、mem)达到瓶颈时,优先保证process
资源瓶颈时应优先保证process,如果保recv的话,会减少process过多的时间片,导致recv的数据不能被处理,继而影响到空闲内存,只有process顺畅,write和recv才能良性循环,这是业务天然决定的。
这里有个流控(flow control)的概念,一般是通过申请token来控制任务并发度,通过队列长度来控制任务排队度,通过交互来控制对端重试频率。
- io/worker模型
io线程处理recv和write,worker线程处理process,通过队列以达到解耦互不影响效果,但是引入了很多问题,比如内存过载,需要额外添加内存水位策略来解决,同时结合前边分析来看不是很高效。
- nginx模型
nginx每个worker进程将读写捆绑在一起,整体通过锁来实现fd的work sharing,并且可以通过进程和cpu核绑定以加强affinity,看起来不错,但是每个worker进程彼此独立,没有work stealing,且在进程间实现这个不靠谱,这个问题在一次请求=一次连接业务下没有问题,因为连接均衡了请求处理自然均衡,但对于连接和请求不是一比一的业务,这个问题就显得相当严重。
这两天突然流行起来马斯克的第一性原理,这个原理大意是不要类比,看别人这么做自己也那么做必定创新很小,这不就是我国的知其然知其所以然嘛。
网上分析源码的很多了,这里通过程序伪代码,带着问题来分析一下(go1.13版本)。
lt, err := net.ListenTCP("tcp", addr)
for {
con, err := lt.AcceptTCP()
go func() {
in := con.recv()
out := process(in)
con.write(out)
}
}
- 伪代码ListenTCP对应的go源码逻辑
newFD,然后一路init
var serverInit sync.Once
func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
sync.Once,可以看到runtime_pollServerInit只会被执行一次,这个函数会调用
func netpollinit() {
epfd = epollcreate1(_EPOLL_CLOEXEC)
相当于创建了一个全局唯一epoll对象
runtime_pollOpen(uintptr(fd.Sysfd))这个函数调用了
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
相当于将listen fd加入到epoll中,用的是ET模式,另外,所有的fd在生命周期只会被add和del一次,go的实现是不错的。
为了只add一次,所有fd的可写事件必然一起add,如果一个新连接进来,可写事件是会被立即检查出的,你可能会担心程序空跑一次,在go里不用担心, 首先调度是不应该被io wait住的,超时时间设置的是0:不等待。
func netpoll(block bool) gList {
if epfd == -1 {
return gList{}
}
waitms := int32(-1)
if !block {
waitms = 0
}
var events [128]epollevent
retry:
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
netpoll执行了epollwait来检查ready事件,这个函数可以被所有的M执行,包括监控M(sysmon)和M(gc),注意虽然M数量和cpu核数量不一样,但是P和cpu核数一样,并且go调度保证了只有P个并行M,所以go的这块实现符合前边分析的多核wait因素,实现是不错的。
由于epoll是全局唯一,await出多个fd事件是如何sharing的?
// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
// local runq
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
// global runq
if sched.runqsize != 0 {
lock(&sched.lock)
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false
}
}
// Poll network.
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
if list := netpoll(false); !list.empty() { // non-blocking
gp := list.pop()
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
// steal
for i := 0; i < 4; i++ {
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
if sched.gcwaiting != 0 {
goto top
}
stealRunNextG := i > 2 // first look for ready queues with more than 1 g
if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
return gp, false
}
}
}
这个函数是用作线程(M)找可运行的g。
第三段代码:调用netpoll检查事件,取出事件链表,取一个g在当前线程去执行,剩下的全放到g全局可运行队列里去(injectglist)。
第二段代码:从全局队列取出一半事件放在绑定的p的本地队列里,然后直接执行一个g。
第一段代码:从绑定的p本地队列取出g来执行。
第四段代码:从别的p本地队列偷一半g过来。第二段直接取一半有没有担心过多,没事,这就被偷了一半,所以看起来go就是用分而治之的二分法来做的sharing,我相信应该是没做调度测试的,因为要么轮训取1,要么取1半,简单嘛。
本地队列是无锁环形队列,全局队列是双端队列,总结一下sharing就是一把大锁(全局队列)然后大家抢,抢多了(因为你也不知道哪个fd事件重)就被偷(stealing)。感觉不是很好,首先有个大锁,其次说下cache affinity,这里有两层含义,1是事件ready后,其实数据已经被内核从网卡读到缓存里,然后g(ready线程)从缓存拷贝(recv)到用户态这个是1个cache 是否miss;2是数据处理g和1的cache是否miss,这两个在go里都是不考虑的,在不改变go源码的情况下,我们只能尽量保证recv数据后就赶紧处理,内核层面的事情就更不用考虑了,但是有很多业务是全双工的,对端发完一个数据后可能紧跟着发下一个数据,这时我们只能继续recv,然后读到的上一条数据可以go出一个协程来执行,这个协程优先被本P处理,当没有后续数据污染cache时,go调度处理本协程还是有affinity的。还有一个问题是一次recv并没有读完整一个消息,需要我们多次读,那么最后处理的线程上必定不能保证有所有的cache,即发生cache miss。
- 伪代码AcceptTCP对应的go源码
// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
// need to recheck error states after setting gpp to WAIT
// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
// do the opposite: store to closing/rd/wd, membarrier, load of rg/wg
if waitio || netpollcheckerr(pd, mode) == 0 {
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// be careful to not lose concurrent READY notification
old := atomic.Xchguintptr(gpp, 0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
如果当前没有可被accept的事件,那么当前g会被休眠(gopark),有事件后会被调度唤醒继续执行。伪代码的recv和write和accept流程是一样的,不再罗嗦。
Linux有几个关于ucontext的api,可以保存上下文到堆栈上以及切换上下文,这里的gopark就是将当前栈g切到g0上,g0就是那堆调度代码,意思就是线程m将继续执行下一个g,挂起的g只是在等待被调度的一块代码。
有的业务req后不用等ack,会继续req,server recv后,需要继续recv,第一次recv数据的process需要新起一个g,这个g会放到本p的本地队列里,m将第二次recv gopark后,就接着对process g进行处理了,同一个线程同一个核,cache affinity不错,但是这个g只是放到p本地队列的最后位置,假如还有更靠前的g要处理,那cache就污染了,注意本地队列最大容量256,放不了会放到全局队列里。
网上找的,速度性能呈现数量级的下降,下边是我这一个老机器执行lscpu得到的cache的大小
前边基本已经分析过了,这里做个测试:起一些协程,执行sleep,查看协程跑在哪个线程、哪个核上。
/* CPU affinity in Go */
package main
import (
"fmt"
"math/rand"
// "runtime"
"time"
)
/*
#define _GNU_SOURCE
#include <sched.h>
#include <pthread.h>
void lock_thread(int cpuid) {
pthread_t tid;
cpu_set_t cpuset;
tid = pthread_self();
CPU_ZERO(&cpuset);
CPU_SET(cpuid, &cpuset);
pthread_setaffinity_np(tid, sizeof(cpu_set_t), &cpuset);
}
*/
import "C"
func randSleep() {
time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
}
func worker(id int) {
// runtime.LockOSThread()
// C.lock_thread(C.int(id))
for {
fmt.Printf("worker: %d, TID:%d, CPU: %d\n", id, C.pthread_self(), C.sched_getcpu())
randSleep()
}
}
func main() {
for i := 0; i < 2; i++ {
go worker(i)
}
time.Sleep(1 * time.Second)
}
执行下看看
worker表示g;TID表示线程id;CPU表示cpu核id。
go为了高并发,执行的是相当的飘逸。
取消掉worker函数的两行注释,将g、m和cpu核绑定下增强下cache affinity,再运行一把
不飘了,这需要我们引入cgo,且只能在g层面里设置lockos,相当于每个g要独占一个m,试想一下我们有上百万个连接……
从前边分析可以看出,go是为了高并发而生,而不是高吞吐,并且io只是它调度的一个子集,要求简单可靠,总体看比前边两个io模型要好的多。
cache affinity呢,相当不好。