LRO - awokezhou/LinuxPage GitHub Wiki

概述

LRO(Large Receive Offload),是 Linux net subsystem 提供的多种网络性能优化方法的一种,用于网络数据接收过程中,将网卡驱动接收上来的数据缓存成一个大包,一次性传给协议栈,以减少协议栈开销

如下图,LRO大致处于网卡驱动与协议栈之间

LRO在net subsysten中的位置

关键代码分析

网卡驱动初始化

在网卡驱动初始化过程中,为netdev数据结构中的LRO管理器“lro_mgr”进行了初始化,设置了最大聚合数据包数量、聚合空间等内容,代码如下

#ifdef CONFIG_XXX_LRO
	xxx_priv->lro_mgr.dev = dev;
        memset(&xxx_priv->lro_mgr.stats, 0, sizeof(xxx_priv->lro_mgr.stats));
        xxx_priv->lro_mgr.features = LRO_F_NAPI;
        xxx_priv->lro_mgr.ip_summed = CHECKSUM_UNNECESSARY;
        xxx_priv->lro_mgr.ip_summed_aggr = CHECKSUM_UNNECESSARY;
        xxx_priv->lro_mgr.max_desc = ARRAY_SIZE(xxx_priv->lro_arr);    /* lro_arr 是一个含有8个元素的数组 */
        xxx_priv->lro_mgr.max_aggr = 64;    /* 最大聚合包数为64 */
        xxx_priv->lro_mgr.frag_align_pad = 0;
        xxx_priv->lro_mgr.lro_arr = xxx_priv->lro_arr;    /* 指向lro_arr数组 */
        xxx_priv->lro_mgr.get_skb_header = rt_get_skb_header;
#endif

这里主要涉及到两个数据结构:LRO管理器net_lro_mgr和LRO聚合空间net_lro_desc,它们定义在include/linux/inet_lro.h中,

struct net_lro_mgr {
	struct net_device *dev;
	struct net_lro_stats stats;

	/* LRO features */
	unsigned long features;
#define LRO_F_NAPI            1  /* Pass packets to stack via NAPI */
#define LRO_F_EXTRACT_VLAN_ID 2  /* Set flag if VLAN IDs are extracted
				    from received packets and eth protocol
				    is still ETH_P_8021Q */

	/*
	 * Set for generated SKBs that are not added to
	 * the frag list in fragmented mode
	 */
	u32 ip_summed;
	u32 ip_summed_aggr; /* Set in aggregated SKBs: CHECKSUM_UNNECESSARY
			     * or CHECKSUM_NONE */

	int max_desc; /* Max number of LRO descriptors  */
	int max_aggr; /* Max number of LRO packets to be aggregated */

	int frag_align_pad; /* Padding required to properly align layer 3
			     * headers in generated skb when using frags */

	struct net_lro_desc *lro_arr; /* Array of LRO descriptors */

	/*
	 * Optimized driver functions
	 *
	 * get_skb_header: returns tcp and ip header for packet in SKB
	 */
	int (*get_skb_header)(struct sk_buff *skb, void **ip_hdr,
			      void **tcpudp_hdr, u64 *hdr_flags, void *priv);

	/* hdr_flags: */
#define LRO_IPV4 1 /* ip_hdr is IPv4 header */
#define LRO_TCP  2 /* tcpudp_hdr is TCP header */

	/*
	 * get_frag_header: returns mac, tcp and ip header for packet in SKB
	 *
	 * @hdr_flags: Indicate what kind of LRO has to be done
	 *             (IPv4/IPv6/TCP/UDP)
	 */
	int (*get_frag_header)(struct skb_frag_struct *frag, void **mac_hdr,
			       void **ip_hdr, void **tcpudp_hdr, u64 *hdr_flags,
			       void *priv);
};

该结构体有一些标志位,几个函数指针,然后就是初始化中看到的最大包聚合数、聚合空间数量/指针等

struct net_lro_desc {
	struct sk_buff *parent;
	struct sk_buff *last_skb;
	struct skb_frag_struct *next_frag;
	struct iphdr *iph;
	struct tcphdr *tcph;
	struct vlan_group *vgrp;
	__wsum  data_csum;
	__be32 tcp_rcv_tsecr;
	__be32 tcp_rcv_tsval;
	__be32 tcp_ack;
	u32 tcp_next_seq;
	u32 skb_tot_frags_len;
	u16 ip_tot_len;
	u16 tcp_saw_tstamp; 		/* timestamps enabled */
	__be16 tcp_window;
	u16 vlan_tag;
	int pkt_aggr_cnt;		/* counts aggregated packets */
	int vlan_packet;
	int mss;
	int active;
};

该结构体中含有几个sk_buff指针,应该是挂接数据包的时候使用的,还有一些和报头相关信息如ip报头、tcp报头、vlan信息等 从以上的初始化代码和两个数据结构来看,大致可以猜想到,数据包从网卡驱动进入LRO后,会将其绑定到LRO descriptor数组中,然后一个一个绑上去,直到聚合的数据包数量到达设定值后,一起发送给协议栈。但是LRO具体是怎么封装数据包的,主要是处理什么数据的,不同特性的数据怎么组合的,现在还不清楚,只能继续往下看

lro_receive_skb()

网卡驱动接收函数中,将与LRO相关的部分提取出来,大致的结构是这样

int xxx_eth_recv()
{
    ......

    lro_receive_skb();

    ......

#if defined (CONFIG_RAETH_LRO)
	if (lro_flush_needed) {
		lro_flush_all(&ei_local->lro_mgr);
		lro_flush_needed = 0;
	}
#endif    
    return count;
}

在进行了skb分配、DMA映射和各种复杂判断和操作之后,就会调用lro_receive_skb()函数,在接收函数的末尾,会根据一个全局变量lro_flush_needed来判断是否调用lro_flush_all()函数。根据函数名以及所处位置,大概猜测lro_receive_skb()就是将一帧数据聚合到LRO descriptor中,而当LRO descriptor满了,会设置lro_flush_needed为1,调用lro_flush_all()将数据发送到协议栈

lro_receive_skb()这个内核api定义在net/ipv4/inet_iro.c中,

void lro_receive_skb(struct net_lro_mgr *lro_mgr,
		     struct sk_buff *skb,
		     void *priv)
{
	if (__lro_proc_skb(lro_mgr, skb, NULL, 0, priv)) {
		if (lro_mgr->features & LRO_F_NAPI)
			netif_receive_skb(skb);
		else
			netif_rx(skb);
	}
}

该函数传入3个参数,lro_mgr是LRO管理器,skb是接收的一帧数据包,priv在这里没用到。这段代码是一个if判断,大概有这样几种情况

  • __lro_proc_skb()函数返回值不为0,if判断条件不成立,lro_receive_skb()直接返回,猜测这样的话,应该是没有被协议栈接收,drop掉了
  • __lro_proc_skb()函数返回值为0,且第2个if也成立,会调用netif_receive_skb(),这个函数就是将数据传递到协议栈的
  • __lro_proc_skb()函数返回值为0,但第2个if不成立,会调用netif_rx(),网上查过这个api是内核废弃的一个api

没有什么需要考虑的,直接进入__lro_proc_skb()去看看

static int __lro_proc_skb(struct net_lro_mgr *lro_mgr, struct sk_buff *skb,
			  struct vlan_group *vgrp, u16 vlan_tag, void *priv)
{
	struct net_lro_desc *lro_desc;
	struct iphdr *iph;
	struct tcphdr *tcph;
	u64 flags;
	int vlan_hdr_len = 0;

	if (!lro_mgr->get_skb_header ||
	    lro_mgr->get_skb_header(skb, (void *)&iph, (void *)&tcph,
				    &flags, priv))
		goto out;

	if (!(flags & LRO_IPV4) || !(flags & LRO_TCP))
		goto out;

	lro_desc = lro_get_desc(lro_mgr, lro_mgr->lro_arr, iph, tcph);
	if (!lro_desc)
		goto out;

	if ((skb->protocol == htons(ETH_P_8021Q)) &&
	    !(lro_mgr->features & LRO_F_EXTRACT_VLAN_ID))
		vlan_hdr_len = VLAN_HLEN;

	if (!lro_desc->active) { /* start new lro session */
		if (lro_tcp_ip_check(iph, tcph, skb->len - vlan_hdr_len, NULL))
			goto out;

		skb->ip_summed = lro_mgr->ip_summed_aggr;
		lro_init_desc(lro_desc, skb, iph, tcph, vlan_tag, vgrp);
		LRO_INC_STATS(lro_mgr, aggregated);
		return 0;
	}

	if (lro_desc->tcp_next_seq != ntohl(tcph->seq))
		goto out2;

	if (lro_tcp_ip_check(iph, tcph, skb->len - vlan_hdr_len, lro_desc))
		goto out2;

	lro_add_packet(lro_desc, skb, iph, tcph);
	LRO_INC_STATS(lro_mgr, aggregated);

	if ((lro_desc->pkt_aggr_cnt >= lro_mgr->max_aggr) ||
	    lro_desc->parent->len > (0xFFFF - lro_mgr->dev->mtu))
		lro_flush(lro_mgr, lro_desc);

	return 0;

out2: /* send aggregated SKBs to stack */
	lro_flush(lro_mgr, lro_desc);

out:
	return 1;
}

大致浏览一遍该函数后,可以发现主要干了这么几件事

  • 报文类型限制 调用LRO管理器的报头分析函数,获得ip报头和tcp报头,并返回了一个flags,并且下一个if判断就是对这个flags判断,根据字面意思,大致可以断定,LRO这个功能就是对IP或者TCP包起作用的

  • 获取LRO descriptor 这里是通过一个函数lro_get_desc()来处理的,一会儿重点看一下

  • vlan信息判断,不用管

  • active判断 这里根据前文lro_get_desc()函数得来的LRO descriptor的active标志做了一个判断,注释说如果active为0,表示一个新的LRO session,然后做了一些初始化什么的,返回了0。前面分析说过了,返回0表示函数执行成功,会调用netif_receive_skb()或者netif_rx()

  • 报文挂接 经过了一些判断条件后,调用了lro_add_packet()函数,有点像是把这一帧数据加入到LRO中,然后判断了大小是否超过最大限制,如果超过,调用lro_flush(),这个函数和驱动中的lro_flush_all()有点像

这里比较重要的点在于lro_get_desc()函数、active的含义、lro_add_packet()函数以及调用lro_flush()函数的时机

lro_get_desc()

先来看看lro_get_desc()这个函数,

static struct net_lro_desc *lro_get_desc(struct net_lro_mgr *lro_mgr,
					 struct net_lro_desc *lro_arr,
					 struct iphdr *iph,
					 struct tcphdr *tcph)
{
	struct net_lro_desc *lro_desc = NULL;
	struct net_lro_desc *tmp;
	int max_desc = lro_mgr->max_desc;
	int i;

	for (i = 0; i < max_desc; i++) {
		tmp = &lro_arr[i];
		if (tmp->active)
			if (!lro_check_tcp_conn(tmp, iph, tcph)) {
				lro_desc = tmp;
				goto out;
			}
	}

	for (i = 0; i < max_desc; i++) {
		if (!lro_arr[i].active) {
			lro_desc = &lro_arr[i];
			goto out;
		}
	}

	LRO_INC_STATS(lro_mgr, no_desc);
out:
	return lro_desc;
}

该函数主要由两个for构成,看起来应该是2次遍历LRO descriptor数组,查找并返回满足条件的,所以这里的条件得好好分析 第一个for循环,判断了active状态,还调用了lro_check_tcp_conn()函数,看看lro_check_tcp_conn()干了什么

static int lro_check_tcp_conn(struct net_lro_desc *lro_desc,
			      struct iphdr *iph,
			      struct tcphdr *tcph)
{
	if ((lro_desc->iph->saddr != iph->saddr) ||
	    (lro_desc->iph->daddr != iph->daddr) ||
	    (lro_desc->tcph->source != tcph->source) ||
	    (lro_desc->tcph->dest != tcph->dest))
		return -1;
	return 0;
}

用descriptor的ip、tcp头信息和skb的信息比对,如果有一个不相同,则返回-1,都相同返回0,结合这里的判断内容以及该函数的名字,大致可以断定该函数是判断新来的skb与descriptor中已经挂接的报文是否为同一个tcp链路,从这个信息可以知道LRO每一个descriptor只管理一条tcp练级的数据,有8个descriptor,那么最多就可以同时管理8条tcp的连接数据

回到上级函数,其逻辑有以下几种情况

  • 如果在descirptor数组中找到一个active的,并且新进来的skb属于该descirptor管理的tcp连接中一帧数据,就将该descirptor返回
  • 如果descirptor数组中所有都为非active的,新进来的skb不属于任何一个active的descirptor,进入第2个for循环
  • 存在一个为非active的descriptor,返回该descriptor
  • 不存在为非active的descriptor,调用LRO_INC_STATS(),将LRO管理器状态变更为no_desc

从以上分析可以看出,active是一个bool量,非active的应该是还未使用的descriptor,active的是已经使用中的;一个descriptor确实只管理一条tcp连接

new LRO session

返回到上级函数,再来分析一下net LRO session的部分,这里就很好理解了,因为descriptor中不存在和新进来的skb是同一个tcp连接的,当然新来的skb应该是一个新的连接的第一帧数据,所以应该放在一个新的descriptor中 new session 会调用lro_tcp_ip_check()检查是否满足LRO的条件,这里不详细分析。调用lro_init_desc()对descriptor进行了初始化,来看看怎么初始化的

static void lro_init_desc(struct net_lro_desc *lro_desc, struct sk_buff *skb,
			  struct iphdr *iph, struct tcphdr *tcph,
			  u16 vlan_tag, struct vlan_group *vgrp)
{
	int nr_frags;
	__be32 *ptr;
	u32 tcp_data_len = TCP_PAYLOAD_LENGTH(iph, tcph);

	nr_frags = skb_shinfo(skb)->nr_frags;
	lro_desc->parent = skb;
	lro_desc->next_frag = &(skb_shinfo(skb)->frags[nr_frags]);
	lro_desc->iph = iph;
	lro_desc->tcph = tcph;
	lro_desc->tcp_next_seq = ntohl(tcph->seq) + tcp_data_len;
	lro_desc->tcp_ack = tcph->ack_seq;
	lro_desc->tcp_window = tcph->window;

	lro_desc->pkt_aggr_cnt = 1;
	lro_desc->ip_tot_len = ntohs(iph->tot_len);

	if (tcph->doff == 8) {
		ptr = (__be32 *)(tcph+1);
		lro_desc->tcp_saw_tstamp = 1;
		lro_desc->tcp_rcv_tsval = *(ptr+1);
		lro_desc->tcp_rcv_tsecr = *(ptr+2);
	}

	lro_desc->mss = tcp_data_len;
	lro_desc->vgrp = vgrp;
	lro_desc->vlan_tag = vlan_tag;
	lro_desc->active = 1;

	lro_desc->data_csum = lro_tcp_data_csum(iph, tcph,
						tcp_data_len);
}

该函数用descriptor的parent指针指向skb,并记录了ip头、tcp头、tcp next sequence number、tcp ack、窗口大小、数据域大小等信息,并将pkt_aggr_cnt计数器初始化为1。这里有一个疑问,根据前文的分析,descriptor应该是管理了一条tcp连接的所有报文,但是这里的tcp各种信息却只有单独的记录,如果下一个tcp报文到达,这些信息怎么记录呢?先留着,返回函数,new session 的最后调用了LRO_INC_STATS()更新LRO管理器的聚合数据包数量,表明一个新的数据包已经聚合,然后返回0

lro_add_packet()

如果不是一个new LRO session,会调用lro_add_packet()函数来处理,

static void lro_add_packet(struct net_lro_desc *lro_desc, struct sk_buff *skb,
			   struct iphdr *iph, struct tcphdr *tcph)
{
	struct sk_buff *parent = lro_desc->parent;
	int tcp_data_len = TCP_PAYLOAD_LENGTH(iph, tcph);

	lro_add_common(lro_desc, iph, tcph, tcp_data_len);

	skb_pull(skb, (skb->len - tcp_data_len));
	parent->truesize += skb->truesize;

	if (lro_desc->last_skb)
		lro_desc->last_skb->next = skb;
	else
		skb_shinfo(parent)->frag_list = skb;

	lro_desc->last_skb = skb;
}

该函数调用了lro_add_common(),skb_pull()函数,然后做了一个last_skb的判断,先看lro_add_common()是干嘛的

static void lro_add_common(struct net_lro_desc *lro_desc, struct iphdr *iph,
			   struct tcphdr *tcph, int tcp_data_len)
{
	struct sk_buff *parent = lro_desc->parent;
	__be32 *topt;

	lro_desc->pkt_aggr_cnt++;
	lro_desc->ip_tot_len += tcp_data_len;
	lro_desc->tcp_next_seq += tcp_data_len;
	lro_desc->tcp_window = tcph->window;
	lro_desc->tcp_ack = tcph->ack_seq;

	/* don't update tcp_rcv_tsval, would not work with PAWS */
	if (lro_desc->tcp_saw_tstamp) {
		topt = (__be32 *) (tcph + 1);
		lro_desc->tcp_rcv_tsecr = *(topt + 2);
	}

	lro_desc->data_csum = csum_block_add(lro_desc->data_csum,
					     lro_tcp_data_csum(iph, tcph,
							       tcp_data_len),
					     parent->len);

	parent->len += tcp_data_len;
	parent->data_len += tcp_data_len;
	if (tcp_data_len > lro_desc->mss)
		lro_desc->mss = tcp_data_len;
}

首先是pkt_aggr_cnt++,这个好理解,pkt_aggr_cnt记录当前descriptor已经聚合了多少数据包,新添加了一个当然要自增 ip_tot_len追加了新skb的tcp数据长度,这个成员应该是记录整个tcp连接所有报文的数据大小和,这只有tcp segment类型报文才可以用,因为tcp连接建立和断开操作的数据不能合并;tcp next sequence number追加了当前skb的数据长度,这符合tcp的协议规定;窗口大小直接覆盖;校验码是追加操作 从这里看出来,LRO的聚合方式,并不是把每一帧数据通过链表之类的数据结构串起来,然后一次发给协议栈,协议栈再解开链表,一个一个去解析数据帧,而是将一个tcp连接中一个一个到来的帧的数据部分合并,因为头信息是完全一样的,然后封成一个大的tcp包,一下传给协议栈,从协议栈的角度看,它只处理了一个包

调用skb_pull()函数把新skb的指针移到其data指针去除头部信息的真正data部分,然后以last_skb->next的方式去挂接接下来的所有skb

总结一下,descripotr对于skb的聚合方式,是将一个tcp连接的第一包数据挂接再parent上,其后的所有数据包的数据部分挂接到last_skb指针及next下面,如图

skb数据部分聚合方式

最后生成的tcp数据包如下图所示

lro_flush

lor_flush()函数的执行条件是

	if ((lro_desc->pkt_aggr_cnt >= lro_mgr->max_aggr) ||
	    lro_desc->parent->len > (0xFFFF - lro_mgr->dev->mtu))
		lro_flush(lro_mgr, lro_desc);

如果descriptor的计数器达到最大值64,或者里面挂接的数据大小已经不能够再接收下一帧数据了,就会调用这个函数

static void lro_flush(struct net_lro_mgr *lro_mgr,
		      struct net_lro_desc *lro_desc)
{
	if (lro_desc->pkt_aggr_cnt > 1)
		lro_update_tcp_ip_header(lro_desc);

	skb_shinfo(lro_desc->parent)->gso_size = lro_desc->mss;

	if (lro_desc->vgrp) {
		if (lro_mgr->features & LRO_F_NAPI)
			vlan_hwaccel_receive_skb(lro_desc->parent,
						 lro_desc->vgrp,
						 lro_desc->vlan_tag);
		else
			vlan_hwaccel_rx(lro_desc->parent,
					lro_desc->vgrp,
					lro_desc->vlan_tag);

	} else {
		if (lro_mgr->features & LRO_F_NAPI)
			netif_receive_skb(lro_desc->parent);
		else
			netif_rx(lro_desc->parent);
	}

	LRO_INC_STATS(lro_mgr, flushed);
	lro_clear_desc(lro_desc);
}

该函数实际上就是通过调用netif_receive_skb(),将descriptor中的parent发送给协议栈,然后将descriptor清空