im钱包安卓版下载
数字资产服务平台

im钱包安卓版下载是全球著名的数字资产交易平台之一,主要面向全球用户提供比特币、莱特币、以太币等数字资产的币币和衍生品交易服务。

tokenpocket钱包网站首页|gro

时间:2024-03-07 19:03:31

linux kernel 网络协议栈之GRO(Generic receive offload)-CSDN博客

>

linux kernel 网络协议栈之GRO(Generic receive offload)-CSDN博客

linux kernel 网络协议栈之GRO(Generic receive offload)

最新推荐文章于 2023-01-16 20:08:55 发布

lucien

最新推荐文章于 2023-01-16 20:08:55 发布

阅读量1.8w

收藏

36

点赞数

7

分类专栏:

linux内核

linux内核

专栏收录该内容

92 篇文章

5 订阅

订阅专栏

GRO(Generic receive offload)在内核2.6.29之后合并进去的,作者是一个华裔Herbert Xu ,GRO的简介可以看这里:

http://lwn.net/Articles/358910/

先来描述一下GRO的作用,GRO是针对网络接受包的处理的,并且只是针对NAPI类型的驱动,因此如果要支持GRO,不仅要内核支持,而且驱动也必须调用相应的借口,用ethtool -K gro on来设置,如果报错就说明网卡驱动本身就不支持GRO。

GRO类似tso,可是tso只支持发送数据包,这样你tcp层大的段会在网卡被切包,然后再传递给对端,而如果没有gro,则小的段会被一个个送到协议栈,有了gro之后,就会在接收端做一个反向的操作(想对于tso).也就是将tso切好的数据包组合成大包再传递给协议栈。

如果实现了GRO支持的驱动是这样子处理数据的,在NAPI的回调poll方法中读取数据包,然后调用GRO的接口napi_gro_receive或者napi_gro_frags来将数据包feed进协议栈。而具体GRO的工作就是在这两个函数中进行的,他们最终都会调用__napi_gro_receive。下面就是napi_gro_receive,它最终会调用napi_skb_finish以及__napi_gro_receive。

gro_result_t napi_gro_receive(struct napi_struct *napi, struct sk_buff *skb)

{

skb_gro_reset_offset(skb);

return napi_skb_finish(__napi_gro_receive(napi, skb), skb);

}

然后GRO什么时候会将数据feed进协议栈呢,这里会有两个退出点,一个是在napi_skb_finish里,他会通过判断__napi_gro_receive的返回值,来决定是需要将数据包立即feed进协议栈还是保存起来,还有一个点是当napi的循环执行完毕时,也就是执行napi_complete的时候,先来看napi_skb_finish,napi_complete我们后面会详细介绍。

在NAPI驱动中,直接调用netif_receive_skb会将数据feed 进协议栈,因此这里如果返回值是NORMAL,则直接调用netif_receive_skb来将数据送进协议栈。

gro_result_t napi_skb_finish(gro_result_t ret, struct sk_buff *skb)

{

switch (ret) {

case GRO_NORMAL:

//将数据包送进协议栈

if (netif_receive_skb(skb))

ret = GRO_DROP;

break;

//表示skb可以被free,因为gro已经将skb合并并保存起来。

case GRO_DROP:

case GRO_MERGED_FREE:

//free skb

kfree_skb(skb);

break;

//这个表示当前数据已经被gro保存起来,但是并没有进行合并,因此skb还需要保存。

case GRO_HELD:

case GRO_MERGED:

break;

}

return ret;

}

GRO的主要思想就是,组合一些类似的数据包(基于一些数据域,后面会介绍到)为一个大的数据包(一个skb),然后feed给协议栈,这里主要是利用Scatter-gather IO,也就是skb的struct skb_shared_info域(我前面的blog讲述ip分片的时候有详细介绍这个域)来合并数据包。

在每个NAPI的实例都会包括一个域叫gro_list,保存了我们积攒的数据包(将要被merge的).然后每次进来的skb都会在这个链表里面进行查找,看是否需要merge。而gro_count表示当前的gro_list中的skb的个数。

struct napi_struct {

................................................

//个数

unsigned int gro_count;

......................................

//积攒的数据包

struct sk_buff *gro_list;

struct sk_buff *skb;

};

紧接着是gro最核心的一个数据结构napi_gro_cb,它是保存在skb的cb域中,它保存了gro要使用到的一些上下文,这里每个域kernel的注释都比较清楚。到后面我们会看到这些域的具体用途。

struct napi_gro_cb {

/* Virtual address of skb_shinfo(skb)->frags[0].page + offset. */

void *frag0;

/* Length of frag0. */

unsigned int frag0_len;

/* This indicates where we are processing relative to skb->data. */

int data_offset;

/* This is non-zero if the packet may be of the same flow. */

int same_flow;

/* This is non-zero if the packet cannot be merged with the new skb. */

int flush;

/* Number of segments aggregated. */

int count;

/* Free the skb? */

int free;

};

每一层协议都实现了自己的gro回调函数,gro_receive和gro_complete,gro系统会根据协议来调用对应回调函数,其中gro_receive是将输入skb尽量合并到我们gro_list中。而gro_complete则是当我们需要提交gro合并的数据包到协议栈时被调用的。

下面就是ip层和tcp层对应的回调方法:

static const struct net_protocol tcp_protocol = {

.handler = tcp_v4_rcv,

.err_handler = tcp_v4_err,

.gso_send_check = tcp_v4_gso_send_check,

.gso_segment = tcp_tso_segment,

//gso回调

.gro_receive = tcp4_gro_receive,

.gro_complete = tcp4_gro_complete,

.no_policy = 1,

.netns_ok = 1,

};

static struct packet_type ip_packet_type __read_mostly = {

.type = cpu_to_be16(ETH_P_IP),

.func = ip_rcv,

.gso_send_check = inet_gso_send_check,

.gso_segment = inet_gso_segment,

//gso回调

.gro_receive = inet_gro_receive,

.gro_complete = inet_gro_complete,

};

gro的入口函数是napi_gro_receive,它的实现很简单,就是将skb包含的gro上下文reset,然后调用__napi_gro_receive,最终通过napi_skb_finis来判断是否需要讲数据包feed进协议栈。

gro_result_t napi_gro_receive(struct napi_struct *napi, struct sk_buff *skb)

{

//reset gro对应的域

skb_gro_reset_offset(skb);

return napi_skb_finish(__napi_gro_receive(napi, skb), skb);

}

napi_skb_finish一开始已经介绍过了,这个函数主要是通过判断传递进来的ret(__napi_gro_receive的返回值),来决定是否需要feed数据进协议栈。它的第二个参数是前面处理过的skb。

这里再来看下skb_gro_reset_offset,首先要知道一种情况,那就是skb本身不包含数据(包括头也没有),而所有的数据都保存在skb_shared_info中(支持S/G的网卡有可能会这么做).此时我们如果想要合并的话,就需要将包头这些信息取出来,也就是从skb_shared_info的frags[0]中去的,在 skb_gro_reset_offset中就有做这个事情,而这里就会把头的信息保存到napi_gro_cb 的frags0中。并且此时frags必然不会在high mem,要么是线性区,要么是dma(S/G io)。 来看skb_gro_reset_offset。

void skb_gro_reset_offset(struct sk_buff *skb)

{

NAPI_GRO_CB(skb)->data_offset = 0;

NAPI_GRO_CB(skb)->frag0 = NULL;

NAPI_GRO_CB(skb)->frag0_len = 0;

//如果mac_header和skb->tail相等并且地址不在高端内存,则说明包头保存在skb_shinfo中,所以我们需要从frags中取得对应的数据包

if (skb->mac_header == skb->tail &&

!PageHighMem(skb_shinfo(skb)->frags[0].page)) {

//可以看到frag0保存的就是对应的skb的frags的第一个元素的地址

NAPI_GRO_CB(skb)->frag0 =

page_address(skb_shinfo(skb)->frags[0].page) +

skb_shinfo(skb)->frags[0].page_offset;

//然后保存对应的大小。

NAPI_GRO_CB(skb)->frag0_len = skb_shinfo(skb)->frags[0].size;

}

}

接下来就是__napi_gro_receive,它主要是遍历gro_list,然后给same_flow赋值,这里要注意,same_flow是一个标记,表示某个skb是否有可能会和当前要处理的skb是相同的流,而这里的相同会在每层都进行判断,也就是在设备层,ip层,tcp层都会判断,这里就是设备层的判断了。这里的判断很简单,有2个条件: 1 设备是否相同 2 mac的头必须相等

如果上面两个条件都满足,则说明两个skb有可能是相同的flow,所以设置same_flow,以便与我们后面合并。

static gro_result_t

__napi_gro_receive(struct napi_struct *napi, struct sk_buff *skb)

{

struct sk_buff *p;

if (netpoll_rx_on(skb))

return GRO_NORMAL;

//遍历gro_list,然后判断是否有可能两个skb 相似。

for (p = napi->gro_list; p; p = p->next) {

//给same_flow赋值

NAPI_GRO_CB(p)->same_flow =

(p->dev == skb->dev) &&

!compare_ether_header(skb_mac_header(p),

skb_gro_mac_header(skb));

NAPI_GRO_CB(p)->flush = 0;

}

//调用dev_gro_receiv

return dev_gro_receive(napi, skb);

}

接下来来看dev_gro_receive,这个函数我们分做两部分来看,第一部分是正常处理部分,第二部份是处理frag0的部分。

来看如何判断是否支持GRO,这里每个设备的features会在驱动初始化的时候被初始化,然后如果支持GRO,则会包括NETIF_F_GRO。 还有要注意的就是,gro不支持切片的ip包,因为ip切片的组包在内核的ip会做一遍,因此这里gro如果合并的话,没有多大意义,而且还增加复杂度。

在dev_gro_receive中会遍历对应的ptype(也就是协议的类链表,以前的blog有详细介绍),然后调用对应的回调函数,一般来说这里会调用文章开始说的ip_packet_type,也就是 inet_gro_receive。

而 inet_gro_receive的返回值表示我们需要立刻feed 进协议栈的数据包,如果为空,则说明不需要feed数据包进协议栈。后面会分析到这里他的详细算法。

而如果当inet_gro_receive正确返回后,如果same_flow没有被设置,则说明gro list中不存在能和当前的skb合并的项,因此此时需要将skb插入到gro list中。这个时候的返回值就是HELD。

enum gro_result dev_gro_receive(struct napi_struct *napi, struct sk_buff *skb)

{

struct sk_buff **pp = NULL;

struct packet_type *ptype;

__be16 type = skb->protocol;

struct list_head *head = &ptype_base[ntohs(type) & PTYPE_HASH_MASK];

int same_flow;

int mac_len;

enum gro_result ret;

//判断是否支持gro

if (!(skb->dev->features & NETIF_F_GRO))

goto normal;

//判断是否为切片的ip包

if (skb_is_gso(skb) || skb_has_frags(skb))

goto normal;

rcu_read_lock();

//开始遍历对应的协议表

list_for_each_entry_rcu(ptype, head, list) {

if (ptype->type != type || ptype->dev || !ptype->gro_receive)

continue;

skb_set_network_header(skb, skb_gro_offset(skb));

mac_len = skb->network_header - skb->mac_header;

skb->mac_len = mac_len;

NAPI_GRO_CB(skb)->same_flow = 0;

NAPI_GRO_CB(skb)->flush = 0;

NAPI_GRO_CB(skb)->free = 0;

//调用对应的gro接收函数

pp = ptype->gro_receive(&napi->gro_list, skb);

break;

}

rcu_read_unlock();

//如果是没有实现gro的协议则也直接调到normal处理

if (&ptype->list == head)

goto normal;

//到达这里,则说明gro_receive已经调用过了,因此进行后续的处理

//得到same_flow

same_flow = NAPI_GRO_CB(skb)->same_flow;

//看是否有需要free对应的skb

ret = NAPI_GRO_CB(skb)->free ? GRO_MERGED_FREE : GRO_MERGED;

//如果返回值pp部位空,则说明pp需要马上被feed进协议栈

if (pp) {

struct sk_buff *nskb = *pp;

*pp = nskb->next;

nskb->next = NULL;

//调用napi_gro_complete 将pp刷进协议栈

napi_gro_complete(nskb);

napi->gro_count--;

}

//如果same_flow有设置,则说明skb已经被正确的合并,因此直接返回。

if (same_flow)

goto ok;

//查看是否有设置flush和gro list的个数是否已经超过限制

if (NAPI_GRO_CB(skb)->flush || napi->gro_count >= MAX_GRO_SKBS)

goto normal;

//到达这里说明skb对应gro list来说是一个新的skb,也就是说当前的gro list并不存在可以和skb合并的数据包,因此此时将这个skb插入到gro_list的头。

napi->gro_count++;

NAPI_GRO_CB(skb)->count = 1;

skb_shinfo(skb)->gso_size = skb_gro_len(skb);

//将skb插入到gro list的头

skb->next = napi->gro_list;

napi->gro_list = skb;

//设置返回值

ret = GRO_HELD;

然后就是处理frag0的部分,以及不支持gro的处理。

这里要需要对skb_shinfo的结构比较了解,我在以前的blog对这个有很详细的介绍,可以去查阅。

pull:

//是否需要拷贝头

if (skb_headlen(skb) < skb_gro_offset(skb)) {

//得到对应的头的大小

int grow = skb_gro_offset(skb) - skb_headlen(skb);

BUG_ON(skb->end - skb->tail < grow);

//开始拷贝

memcpy(skb_tail_pointer(skb), NAPI_GRO_CB(skb)->frag0, grow);

skb->tail += grow;

skb->data_len -= grow;

//更新对应的frags[0]

skb_shinfo(skb)->frags[0].page_offset += grow;

skb_shinfo(skb)->frags[0].size -= grow;

//如果size为0了,则说明第一个页全部包含头,因此需要将后面的页全部移动到前面。

if (unlikely(!skb_shinfo(skb)->frags[0].size)) {

put_page(skb_shinfo(skb)->frags[0].page);

//开始移动。

memmove(skb_shinfo(skb)->frags,

skb_shinfo(skb)->frags + 1,

--skb_shinfo(skb)->nr_frags * sizeof(skb_frag_t));

}

}

ok:

return ret;

normal:

ret = GRO_NORMAL;

goto pull;

}

接下来就是inet_gro_receive,这个函数是ip层的gro receive回调函数,函数很简单,首先取得ip头,然后判断是否需要从frag复制数据,如果需要则复制数据

//得到偏移

off = skb_gro_offset(skb);

//得到头的整个长度(mac+ip)

hlen = off + sizeof(*iph);

//得到ip头

iph = skb_gro_header_fast(skb, off);

//是否需要复制

if (skb_gro_header_hard(skb, hlen)) {

iph = skb_gro_header_slow(skb, hlen, off);

if (unlikely(!iph))

goto out;

}

然后就是一些校验工作,比如协议是否支持gro_reveive,ip头是否合法等等

proto = iph->protocol & (MAX_INET_PROTOS - 1);

rcu_read_lock();

ops = rcu_dereference(inet_protos[proto]);

//是否支持gro

if (!ops || !ops->gro_receive)

goto out_unlock;

//ip头是否合法

if (*(u8 *)iph != 0x45)

goto out_unlock;

//ip头教研

if (unlikely(ip_fast_csum((u8 *)iph, iph->ihl)))

goto out_unlock;

然后就是核心的处理部分,它会遍历整个gro_list,然后进行same_flow和是否需要flush的判断。

这里ip层设置same_flow是根据下面的规则的: 1 4层的协议必须相同 2 tos域必须相同 3 源,目的地址必须相同

如果3个条件一个不满足,则会设置same_flow为0。 这里还有一个就是判断是否需要flush 对应的skb到协议栈,这里的判断条件是这样子的。 1 ip包的ttl不一样 2 ip包的id顺序不对 3 如果是切片包

如果上面两个条件某一个满足,则说明skb需要被flush出gro。

不过这里要注意只有两个数据包是same flow的情况下,才会进行flush判断。原因很简单,都不是有可能进行merge的包,自然没必要进行flush了。

//取出id

id = ntohl(*(__be32 *)&iph->id);

//判断是否需要切片

flush = (u16)((ntohl(*(__be32 *)iph) ^ skb_gro_len(skb)) | (id ^ IP_DF));

id >>= 16;

//开始遍历gro list

for (p = *head; p; p = p->next) {

struct iphdr *iph2;

//如果上一层已经不可能same flow则直接继续下一个

if (!NAPI_GRO_CB(p)->same_flow)

continue;

//取出ip头

iph2 = ip_hdr(p);

//开始same flow的判断

if ((iph->protocol ^ iph2->protocol) |

(iph->tos ^ iph2->tos) |

((__force u32)iph->saddr ^ (__force u32)iph2->saddr) |

((__force u32)iph->daddr ^ (__force u32)iph2->daddr)) {

NAPI_GRO_CB(p)->same_flow = 0;

continue;

}

//开始flush的判断。这里注意如果不是same_flow的话,就没必要进行flush的判断。

/* All fields must match except length and checksum. */

NAPI_GRO_CB(p)->flush |=

(iph->ttl ^ iph2->ttl) |

((u16)(ntohs(iph2->id) + NAPI_GRO_CB(p)->count) ^ id);

NAPI_GRO_CB(p)->flush |= flush;

}

NAPI_GRO_CB(skb)->flush |= flush;

//pull ip头进gro,这里更新data_offset

skb_gro_pull(skb, sizeof(*iph));

//设置传输层的头的位置

skb_set_transport_header(skb, skb_gro_offset(skb));

//调用传输层的reveive方法。

pp = ops->gro_receive(head, skb);

out_unlock:

rcu_read_unlock();

out:

NAPI_GRO_CB(skb)->flush |= flush;

}

然后就是tcp层的gro方法,它的主要实现函数是tcp_gro_receive,他的流程和inet_gro_receiv类似,就是取得tcp的头,然后对gro list进行遍历,最终会调用合并方法。

首先来看gro list遍历的部分,它对same flow的要求就是source必须相同,如果不同则设置same flow为0.如果相同则跳到found部分,进行合并处理。

//遍历gro list

for (; (p = *head); head = &p->next) {

//如果ip层已经不可能same flow则直接进行下一次匹配

if (!NAPI_GRO_CB(p)->same_flow)

continue;

th2 = tcp_hdr(p);

//判断源地址

if (*(u32 *)&th->source ^ *(u32 *)&th2->source) {

NAPI_GRO_CB(p)->same_flow = 0;

continue;

}

goto found;

}

接下来就是当找到能够合并的skb的时候的处理,这里首先来看flush的设置,这里会有4个条件: 1 拥塞状态被设置(TCP_FLAG_CWR). 2 tcp的ack的序列号不匹配 (这是肯定的,因为它只是对tso或者说gso进行反向操作) 3 skb的flag和从gro list中查找到要合并skb的flag 如果他们中的不同位 不包括TCP_FLAG_CWR | TCP_FLAG_FIN | TCP_FLAG_PSH,这三个任意一个域。 4 tcp的option域不同

如果上面4个条件有一个满足,则会设置flush为1,也就是找到的这个skb(gro list中)必须被刷出到协议栈。

这里谈一下flags域的设置问题首先如果当前的skb设置了cwr,也就是发生了拥塞,那么自然前面被缓存的数据包需要马上被刷到协议栈,以便与tcp的拥塞控制马上进行。

而FIN和PSH这两个flag自然不需要一致,因为这两个和其他的不是互斥的。

found:

flush = NAPI_GRO_CB(p)->flush;

//如果设置拥塞,则肯定需要刷出skb到协议栈

flush |= (__force int)(flags & TCP_FLAG_CWR);

//如果相差的域是除了这3个中的,就需要flush出skb

flush |= (__force int)((flags ^ tcp_flag_word(th2)) &

~(TCP_FLAG_CWR | TCP_FLAG_FIN | TCP_FLAG_PSH));

//ack的序列号必须一致

flush |= (__force int)(th->ack_seq ^ th2->ack_seq);

//tcp的option头必须一致

for (i = sizeof(*th); i < thlen; i += 4)

flush |= *(u32 *)((u8 *)th + i) ^

*(u32 *)((u8 *)th2 + i);

mss = skb_shinfo(p)->gso_size;

flush |= (len - 1) >= mss;

flush |= (ntohl(th2->seq) + skb_gro_len(p)) ^ ntohl(th->seq);

//如果flush有设置则不会调用 skb_gro_receive,也就是不需要进行合并,否则调用skb_gro_receive进行数据包合并

if (flush || skb_gro_receive(head, skb)) {

mss = 1;

goto out_check_final;

}

p = *head;

th2 = tcp_hdr(p);

//更新p的头。到达这里说明合并完毕,因此需要更新合并完的新包的头。

tcp_flag_word(th2) |= flags & (TCP_FLAG_FIN | TCP_FLAG_PSH);

从上面我们可以看到如果tcp的包被设置了一些特殊的flag比如PSH,SYN这类的就必须马上把数据包刷出到协议栈。

下面就是最终的一些flags判断,比如第一个数据包进来都会到这里来判断。

out_check_final:

flush = len < mss;

//根据flag得到flush

flush |= (__force int)(flags & (TCP_FLAG_URG | TCP_FLAG_PSH |

TCP_FLAG_RST | TCP_FLAG_SYN |

TCP_FLAG_FIN));

if (p && (!NAPI_GRO_CB(skb)->same_flow || flush))

pp = head;

out:

NAPI_GRO_CB(skb)->flush |= flush;

这里要知道每次我们只会刷出gro list中的一个skb节点,这是因为每次进来的数据包我们也只会匹配一个。因此如果遇到需要刷出的数据包,会在dev_gro_receive中先刷出gro list中的,然后再将当前的skb feed进协议栈。

最后就是gro最核心的一个函数skb_gro_receive,它的主要工作就是合并,它有2个参数,第一个是gro list中和当前处理的skb是same flow的skb,第二个就是我们需要合并的skb。

这里要注意就是farg_list,其实gro对待skb_shared_info和ip层切片,组包很类似,就是frags放Scatter-Gather I/O的数据包,frag_list放线性数据。这里gro 也是这样的,如果过来的skb支持Scatter-Gather I/O并且数据是只放在frags中,则会合并frags,如果过来的skb不支持Scatter-Gather I/O(数据头还是保存在skb中),则合并很简单,就是新建一个skb然后拷贝当前的skb,并将gro list中的skb直接挂载到farg_list。

先来看支持Scatter-Gather I/O的处理部分。

//一些需要用到的变量

struct sk_buff *p = *head;

struct sk_buff *nskb;

//当前的skb的 share_ino

struct skb_shared_info *skbinfo = skb_shinfo(skb);

//当前的gro list中的要合并的skb的share_info

struct skb_shared_info *pinfo = skb_shinfo(p);

unsigned int headroom;

unsigned int len = skb_gro_len(skb);

unsigned int offset = skb_gro_offset(skb);

unsigned int headlen = skb_headlen(skb);

//如果有frag_list的话,则直接去非Scatter-Gather I/O部分处理,也就是合并到frag_list.

if (pinfo->frag_list)

goto merge;

else if (headlen <= offset) {

//支持Scatter-Gather I/O的处理

skb_frag_t *frag;

skb_frag_t *frag2;

int i = skbinfo->nr_frags;

//这里遍历是从后向前。

int nr_frags = pinfo->nr_frags + i;

offset -= headlen;

if (nr_frags > MAX_SKB_FRAGS)

return -E2BIG;

//设置pinfo的frags的大小,可以看到就是加上skb的frags的大小

pinfo->nr_frags = nr_frags;

skbinfo->nr_frags = 0;

frag = pinfo->frags + nr_frags;

frag2 = skbinfo->frags + i;

//遍历赋值,其实就是地址赋值,这里就是将skb的frag加到pinfo的frgas后面。

do {

*--frag = *--frag2;

} while (--i);

//更改page_offet的值

frag->page_offset += offset;

//修改size大小

frag->size -= offset;

//更新skb的相关值

skb->truesize -= skb->data_len;

skb->len -= skb->data_len;

skb->data_len = 0;

NAPI_GRO_CB(skb)->free = 1;

//最终完成

goto done;

} else if (skb_gro_len(p) != pinfo->gso_size)

return -E2BIG;

这里gro list中的要被合并的skb我们叫做skb_s.

接下来就是不支持支持Scatter-Gather I/O(skb的头放在skb中)的处理。这里处理也比较简单,就是复制一个新的nskb,然后它的头和skb_s一样,然后将skb_s挂载到nskb的frag_list上,并且把新建的nskb挂在到gro list中,代替skb_s的位置,而当前的skb

headroom = skb_headroom(p);

nskb = alloc_skb(headroom + skb_gro_offset(p), GFP_ATOMIC);

if (unlikely(!nskb))

return -ENOMEM;

//复制头

__copy_skb_header(nskb, p);

nskb->mac_len = p->mac_len;

skb_reserve(nskb, headroom);

__skb_put(nskb, skb_gro_offset(p));

//设置各层的头

skb_set_mac_header(nskb, skb_mac_header(p) - p->data);

skb_set_network_header(nskb, skb_network_offset(p));

skb_set_transport_header(nskb, skb_transport_offset(p));

__skb_pull(p, skb_gro_offset(p));

//复制数据

memcpy(skb_mac_header(nskb), skb_mac_header(p),

p->data - skb_mac_header(p));

//对应的gro 域的赋值

*NAPI_GRO_CB(nskb) = *NAPI_GRO_CB(p);

//可以看到frag_list被赋值

skb_shinfo(nskb)->frag_list = p;

skb_shinfo(nskb)->gso_size = pinfo->gso_size;

pinfo->gso_size = 0;

skb_header_release(p);

nskb->prev = p;

//更新新的skb的数据段

nskb->data_len += p->len;

nskb->truesize += p->len;

nskb->len += p->len;

//将新的skb插入到gro list中

*head = nskb;

nskb->next = p->next;

p->next = NULL;

p = nskb;

merge:

if (offset > headlen) {

skbinfo->frags[0].page_offset += offset - headlen;

skbinfo->frags[0].size -= offset - headlen;

offset = headlen;

}

__skb_pull(skb, offset);

//将skb插入新的skb的(或者老的skb,当frag list本身存在)fraglist

p->prev->next = skb;

p->prev = skb;

skb_header_release(skb);

优惠劵

lucien

关注

关注

7

点赞

36

收藏

觉得还不错?

一键收藏

知道了

2

评论

linux kernel 网络协议栈之GRO(Generic receive offload)

GRO(Generic receive offload)在内核2.6.29之后合并进去的,作者是一个华裔Herbert Xu ,GRO的简介可以看这里:http://lwn.net/Articles/358910/先来描述一下GRO的作用,GRO是针对网络接受包的处理的,并且只是针对NAPI类型的驱动,因此如果要支持GRO,不仅要内核支持,而且驱动也必须调用相应的借口,用ethtool -

复制链接

扫一扫

专栏目录

linux内核协议栈之GRO (Generic Receive Offload)

07-17

1256

GRO(Generic Receive Offload)从软件层面实现将多个 TCP/UDP 数据包聚合在一个skb结构,然后作为一个大数据包交付给上层的网络协议栈,以减少上层协议栈处理skb的开销,提高系统接收数据包的性能。

linux网络协议栈分析

04-11

linux网络协议栈分析

2 条评论

您还未登录,请先

登录

后发表或查看评论

关闭Linux网卡offload(负载)

csrh131的博客

07-09

3177

为什么需要关闭,当前主要的原因是tcpdump抓的包,存在超大的帧,不想要。为什么这么大,原因就是网卡把接收到的数据包整合优化了,帮程序减轻了负担,这就是offload做的事情。

需要用到一个工具,ethtool,CentOS7默认自带了

查看网卡eth0都有哪些offload,ethtool -k eth0 | grep offload

tcp-segmentation-offload: on

udp-fragmentation-offload: off [fixed]

generic-segment

geno:Geno是一种golang工具,可帮助从带注释的包中生成通用包

05-21

Geno

Geno是golang工具,可帮助从带注释的包中生成通用包。 目的是实验/重现开发的通用pacakge的想法

它是如何工作的?

最重要的组件是带注释的程序包。 它是通过在接口声明的注释部分添加标签“ gen:N”来实现的。 您应该首先为空接口或要专门设置的任何其他接口添加别名。 例如:

type Data interface{} //

新别名必须在整个程序包中一致使用。 如果您需要多个专用类型,可以像这样列出它们:

type Key interface{} //

type Value interface{} //

产生中

获得带注释的程序包后,您可以使用geno工具生成专用版本。 Geno将使用参数中给定的所有实例替换所有带注释类型的实例,从而创建一个新的程序包。 命令

geno -package="champio

Linux 网卡特性配置 ethtool 详解 网卡Offload

热门推荐

chenzhjlf的专栏

02-01

2万+

网络中校验和比较

2015年10月14日

本文说明了网卡,IP层,TCP层,UDP层的校验和功能,以及异同点。

网卡校验和

高级的网卡(e1000e等千M网卡)的接收,发送的校验和的计算方法是CRC32。

Refs:http://www.wireshark.org/docs/wsug_html_chunked/ChAdvChecksums.html

http://www.in

linux kernel 网络协议栈之GRO

shage001314的专栏

06-15

1968

GRO(Generic receive offload)在内核2.6.29之后合并进去的,作者是一个华裔Herbert Xu ,GRO的简介可以看这里:

http://lwn.net/Articles/358910/

先来描述一下GRO的作用,GRO是针对网络接受包的处理的,并且只是针对NAPI类型的驱动,因此如果要支持GRO,不仅要内核支持,而且驱动也必须调用相应的借口,用ethtool -

网卡收发模式及使用

陈嘉怡的专栏

12-29

3770

RSS receive side scaling,网卡多队列,需要硬件支持。网卡接收到网络数据包后,要发送一个硬件中断,通知CPU取数据包。默认配置,都是由CPU0去做。

RPS receive packet steering,向某个CPU发送一个软中断,来接收数据包,并递交给应用程序。

RFS receive flow streering,维护两种hash表,实现将软中断分散到多颗C

【Linux4.1.12源码分析】协议栈gro收包之UDP处理

one_clouder的专栏

11-03

2197

UDP offload为udpv4_offload

static const struct net_offload udpv4_offload = {

.callbacks = {

.gso_segment = udp4_ufo_fragment,

.gro_receive = udp4_gro_receive,

.gro_complete = udp4_gro_complete

UDP 与 GRO, GSO

weixin_43855786的博客

01-16

822

udp 数据包具有相同大小时, 才会被拼接成一个大的 udp 数据包, 同时内核还会告诉上层应用原始 udp 数据包的长度信息. 这样上层应用在需要的时候也可以根据这个信息来确定 udp packet 边界. 如。不知道是不是因为 GSO, GRO 是 Linux 新增特性的原因, 在 google 上找了半天都没有找到一篇详细的介绍如何使用 GSO/GRO 的文章, 最后从 Linux 内核中与 GSO/GRO 相关的。中存放的可能是多个 UDP 数据包拼接之后的内容, 此时。

linux GRO相关

network_kid的博客

12-20

241

skb_gro_receive(kernel4.14)

daily.dev Where developers gro-3.28.2.zip

12-30

名称:daily.dev Where developers gro ---------------------------------------- 版本:3.28.2 作者:https://daily.dev/ 分类:开发者工具 ---------------------------------------- 概述:获取为您量身定制的最...

网卡gro、gso功能调试.doc

07-28

网卡gro、gso功能调试,适用于网卡性能调优

dREG:使用GRO-seq和PRO-seq检测监管元素

05-09

使用GRO-seq数据检测调节性DNA序列。 在线计算网关 我们提供了在GPU服务器上运行dREG的计算网关,用户无需安装任何软件,只需上传bigWig文件并等待结果即可,这非常简单。 请单击链接尝试此站点: 或者 (新的) ...

淘金优化器GRO2022

07-13

淘金优化器GRO2022新出的群智能算法 淘金优化器GRO2022新出的群智能算法 淘金优化器GRO2022新出的群智能算法 淘金优化器GRO2022新出的群智能算法 淘金优化器GRO2022新出的群智能算法 淘金优化器GRO2022新出的群智能...

主控制芯片,全球前5强生产商排名及市场份额分析报告.pdf

03-06

主控制芯片,全球前5强生产商排名及市场份额分析报告

SwiftUI教程-不定时更新.zip

最新发布

03-06

SwiftUI教程-不定时更新.zip

Java毕设-基于springboot+vue的在线课程管理系统(附源码,数据库,教程).zip

03-06

Java 毕业设计,Java 课程设计,基于 SpringBoot 开发的,含有代码注释,新手也可看懂。毕业设计、期末大作业、课程设计、高分必看,下载下来,简单部署,就可以使用。

包含:项目源码、数据库脚本、软件工具等,前后端代码都在里面。

该系统功能完善、界面美观、操作简单、功能齐全、管理便捷,具有很高的实际应用价值。

项目都经过严格调试,确保可以运行!

1. 技术组成

前端:html、javascript、Vue

后台框架:SpringBoot

开发环境:idea

数据库:MySql(建议用 5.7 版本,8.0 有时候会有坑)

数据库工具:navicat

部署环境:Tomcat(建议用 7.x 或者 8.x 版本), maven

2. 部署

如果部署有疑问的话,可以找我咨询

后台路径地址:localhost:8080/项目名称/admin/dist/index.html

前台路径地址:localhost:8080/项目名称/front/index.html (无前台不需要输入)

Java毕设-基于springboot+Vue的基于web的机动车号牌管理系统2(附源码,数据库,教程).zip

03-06

Java 毕业设计,Java 课程设计,基于 SpringBoot 开发的,含有代码注释,新手也可看懂。毕业设计、期末大作业、课程设计、高分必看,下载下来,简单部署,就可以使用。

包含:项目源码、数据库脚本、软件工具等,前后端代码都在里面。

该系统功能完善、界面美观、操作简单、功能齐全、管理便捷,具有很高的实际应用价值。

项目都经过严格调试,确保可以运行!

1. 技术组成

前端:html、javascript、Vue

后台框架:SpringBoot

开发环境:idea

数据库:MySql(建议用 5.7 版本,8.0 有时候会有坑)

数据库工具:navicat

部署环境:Tomcat(建议用 7.x 或者 8.x 版本), maven

2. 部署

如果部署有疑问的话,可以找我咨询

后台路径地址:localhost:8080/项目名称/admin/dist/index.html

前台路径地址:localhost:8080/项目名称/front/index.html (无前台不需要输入)

weixin168“返家乡”高校暑期社会实践微信小程序设计与开发ssm后端毕业源码案例设计

03-06

如今的信息时代,对信息的共享性,信息的流通性有着较高要求,因此传统管理方式就不适合。为了让管理模式进行升级,也为了更好的维护信息,高校暑期社会实践微信小程序的开发运用就显得很有必要。并且通过开发高校暑期社会实践微信小程序,不仅可以让所学的微信小程序技术得到实际运用,也可以掌握MySQL的使用方法,对自身编程能力也有一个检验和提升的过程。尤其是通过实践,可以对系统的开发流程加深印象,无论是前期的分析与设计,还是后期的编码测试等环节,都可以有一个深刻的了解。

借助于高校暑期社会实践微信小程序这样的工具,让信息系统化,流程化,规范化是最终的发展结果,让其遵循实际操作流程的情况下,对信息实施规范化处理,让信息通过电子的方式进行保存,无论是管理人员检索信息,可以便利化操作,真正缩短信息处理时间,节省人力和信息管理的成本。

关键字:高校暑期社会实践微信小程序,微信小程序技术,MySQL

网卡 gro 函数处理流程

06-09

网卡GRO(Generic Receive Offload)函数处理流程一般如下:

1. 网卡接收到数据包。

2. 网卡驱动将数据包放入网卡接收队列。

3. 内核网络协议栈从网卡接收队列中获取数据包。

4. 内核网络协议栈将数据包进行预处理,例如校验和验证、IP分片重组等。

5. GRO函数判断数据包是否符合GRO处理条件,例如数据包的协议类型是否为TCP、数据包的分片偏移是否为0等。

6. 如果数据包符合GRO处理条件,GRO函数将数据包放入GRO队列,等待下一步的处理。

7. 如果数据包不符合GRO处理条件,数据包将被直接放入协议栈进行进一步处理。

8. GRO函数定时或者GRO队列满了时,会对GRO队列中的数据包进行合并,生成一个大数据包。

9. GRO函数将合并后的大数据包放回到协议栈中,等待进一步处理。

需要注意的是,不同的网卡驱动实现可能有所不同,GRO函数的实现也可能有所不同。此外,GRO函数只针对TCP协议的数据包进行处理,对于其他协议的数据包不会进行GRO处理。

“相关推荐”对你有帮助么?

非常没帮助

没帮助

一般

有帮助

非常有帮助

提交

lucien

CSDN认证博客专家

CSDN认证企业博客

码龄14年

暂无认证

12

原创

23万+

周排名

141万+

总排名

35万+

访问

等级

3807

积分

60

粉丝

37

获赞

15

评论

151

收藏

私信

关注

热门文章

使用openssl命令剖析RSA私钥文件格式

40482

linux kernel 网络协议栈之GRO(Generic receive offload)

18138

Open vSwitch使用笔记

10564

linux锁机制

9716

squid代理原理与配置

8379

分类专栏

linux内核

92篇

linux驱动

22篇

安全相关

31篇

linux服务配置

5篇

android开发

1篇

生活相关

最新评论

Linux 2.6版内核中通过模块获取sys_call_table地址的方法

Anansi_safe:

insmod模块后直接提示:"已杀死"然后...就没然后了,lsmod显示模块已经被加载,但是rmmod卸不掉,只能重启电脑

isatap ipv6隧道技术原理

weixin_39482444:

???

Unable to handle kernel NULL pointer dereference.

Shidx.:

你好 大神,Kernel panic 的问题可以加个QQ 沟通交流一下吗?1015369928~

linux kernel 网络协议栈之GRO(Generic receive offload)

gannicus.guo:

对GRO每段的分析既有总结性的描述,又有细节的展开,厉害

使用openssl命令剖析RSA私钥文件格式

路和前方:

非常清晰,想请问一下楼主怎样将私钥的整数包装成私钥文件,openssl有工具吗

您愿意向朋友推荐“博客详情页”吗?

强烈不推荐

不推荐

一般般

推荐

强烈推荐

提交

最新文章

pluto list

Linux网络地址转换NAT源码分析

Netfilter源代码分析详解

2014年7篇

2013年67篇

2012年80篇

目录

目录

分类专栏

linux内核

92篇

linux驱动

22篇

安全相关

31篇

linux服务配置

5篇

android开发

1篇

生活相关

目录

评论 2

被折叠的  条评论

为什么被折叠?

到【灌水乐园】发言

查看更多评论

添加红包

祝福语

请填写红包祝福语或标题

红包数量

红包个数最小为10个

红包总金额

红包金额最低5元

余额支付

当前余额3.43元

前往充值 >

需支付:10.00元

取消

确定

下一步

知道了

成就一亿技术人!

领取后你会自动成为博主和红包主的粉丝

规则

hope_wisdom 发出的红包

实付元

使用余额支付

点击重新获取

扫码支付

钱包余额

0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。 2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值

百度知道 - 信息提示

百度知道 - 信息提示

百度首页

商城

注册

登录

网页

资讯

视频

图片

知道

文库

贴吧采购

地图更多

搜索答案

我要提问

百度知道>提示信息

知道宝贝找不到问题了>_

该问题可能已经失效。返回首页

15秒以后自动返回

帮助

 | 意见反馈

 | 投诉举报

京ICP证030173号-1   京网文【2023】1034-029号     ©2024Baidu  使用百度前必读 | 知道协议 

一文搞懂内核网络中的GRO/RFS/RPS调优 - 知乎

一文搞懂内核网络中的GRO/RFS/RPS调优 - 知乎首发于Linux内核切换模式写文章登录/注册一文搞懂内核网络中的GRO/RFS/RPS调优玩转Linux内核官方社区最新信息搜集、文章推送、教程学习、技巧分享等~1. 前言本文主要介绍内核网络中GRO、RFS、RPS等技术,并针对其对应的规则进行网络调优。重点对RPS的工作过程和内核代码进行了分析,分析了数据如何从网卡进入到协议层。2. GRO(Generic Receive Offloading)Large Receive Offloading (LRO) 是一个硬件优化,GRO 是 LRO 的一种软件实现。两种方案的主要思想都是:通过合并“足够类似”的包来减少传送给网络栈的包数,这有助于减少 CPU 的使用量。例如,考虑大文件传输的场景,包的数量非常多,大部分包都是一段文件数据。相比于每次都将小包送到网络栈,可以将收到的小包合并成一个很大的包再送到网络栈。GRO 使协议层只需处理一个 header,而将包含大量数据的整个大包送到用户程序。这类优化方式的缺点是信息丢失:包的 option 或者 flag 信息在合并时会丢失。这也是为什么大部分人不使用或不推荐使用LRO 的原因。LRO 的实现,一般来说,对合并包的规则非常宽松。GRO 是 LRO 的软件实现,但是对于包合并的规则更严苛。如果用 tcpdump 抓包,有时会看到机器收到了看起来不现实的、非常大的包, 这很可能是系统开启了 GRO。2.1 使用 ethtool 修改 GRO 配置使用 ethtool 的 -k 选项查看 GRO 配置:-K 修改 GRO 配置:$ sudo ethtool -K ens33 gro on

对于大部分驱动,修改 GRO 配置会涉及先 down 再 up 这个网卡,因此这个网卡上的连接都会中断。2.2 napi_gro_receive如果开启了 GRO,napi_gro_receive 将负责处理网络数据,并将数据送到协议栈,大部分相关的逻辑在函数 dev_gro_receive 里实现。dev_gro_receive这个函数首先检查 GRO 是否开启了,如果是,就准备做 GRO。GRO 首先遍历一个 offload filter 列表,如果高层协议认为其中一些数据属于 GRO 处理的范围,就会允许其对数据进行操作。协议层以此方式让网络设备层知道,这个 packet 是不是当前正在处理的一个需要做 GRO 的 network flow 的一部分,而且也可以通过这种方式传递一些协议相关的信息。例如,TCP 协议需要判断是否应该将一个 ACK 包合并到其他包里。net/core/dev.c:list_for_each_entry_rcu(ptype, head, list) {

if (ptype->type != type || !ptype->callbacks.gro_receive)

continue;

skb_set_network_header(skb, skb_gro_offset(skb));

skb_reset_mac_len(skb);

NAPI_GRO_CB(skb)->same_flow = 0;

NAPI_GRO_CB(skb)->flush = 0;

NAPI_GRO_CB(skb)->free = 0;

pp = ptype->callbacks.gro_receive(&napi->gro_list, skb);

break;

}

如果协议层提示是时候 flush GRO packet 了,那就到下一步处理了。这发生在 napi_gro_complete,会进一步调用相应协议的 gro_complete 回调方法,然后调用 netif_receive_skb 将包送到协议栈。这个过程见net/core/dev.c:if (pp) {

struct sk_buff *nskb = *pp;

*pp = nskb->next;

nskb->next = NULL;

napi_gro_complete(nskb);

napi->gro_count--;

}

接下来,如果协议层将这个包合并到一个已经存在的 flow,napi_gro_receive 就没什么事情需要做,因此就返回了。如果 packet 没有被合并,而且 GRO 的数量小于 MAX_GRO_SKBS( 默认是 8),就会创建一个新的 entry 加到本 CPU 的 NAPI 变量的 gro_list。net/core/dev.c:if (NAPI_GRO_CB(skb)->flush || napi->gro_count >= MAX_GRO_SKBS)

goto normal;

napi->gro_count++;

NAPI_GRO_CB(skb)->count = 1;

NAPI_GRO_CB(skb)->age = jiffies;

skb_shinfo(skb)->gso_size = skb_gro_len(skb);

skb->next = napi->gro_list;

napi->gro_list = skb;

ret = GRO_HELD;

这就是 Linux 网络栈中 GRO 的工作原理。【文章福利】小编推荐自己的Linux内核技术交流群:【865977150】整理了一些个人觉得比较好的学习书籍、视频资料共享在群文件里面,有需要的可以自行添加哦!!!前100名进群领取,额外赠送一份价值699的内核资料包(含视频教程、电子书、实战项目及代码)学习直通车:内核资料直通车:2.3 napi_skb_finish一旦 dev_gro_receive 完成,napi_skb_finish 就会被调用,如果一个 packet 被合并了 ,就释放不用的变量;或者调用 netif_receive_skb 将数据发送到网络协议栈。3. RFS (Receive Flow Steering)RFS(Receive flow steering)和 RPS 配合使用。RPS 试图在 CPU 之间平衡收包,但是没考虑数据的本地性问题,如何最大化 CPU 缓存的命中率。RFS 将属于相同 flow 的包送到相同的 CPU 进行处理,可以提高缓存命中率。调优:打开 RFSRPS 记录一个全局的 hash table,包含所有 flow 的信息。这个 hash table 的大小可以在 net.core.rps_sock_flow_entries配置:$ sudo sysctl -w net.core.rps_sock_flow_entries=32768

其次,可以设置每个 RX queue 的 flow 数量,对应着 rps_flow_cnt:例如,eth0 的 RX queue0 的 flow 数量调整到 2048:$ sudo bash -c 'echo 2048 > /sys/class/net/eth0/queues/rx-0/rps_flow_cnt'

4. RPS(Receive Packet Steering)每个 NAPI 变量都会运行在相应 CPU 的软中断的上下文中。而且,触发硬中断的这个 CPU 接下来会负责执行相应的软中断处理函数来收包。换言之,同一个 CPU 既处理硬中断,又处理相应的软中断。一些网卡(例如 Intel I350)在硬件层支持多队列。这意味着收进来的包会被通过 DMA 放到位于不同内存的队列上,而不同的队列有相应的 NAPI 变量管理软中断 poll()过程。因此, 多个 CPU 同时处理从网卡来的中断,处理收包过程。这个特性被称作 RSS(Receive Side Scaling,接收端扩展)。RPS (Receive Packet Steering,接收包控制,接收包引导)是 RSS 的一种软件实现。因为是软件实现的,意味着任何网卡都可以使用这个功能,即便是那些只有一个接收队列的网卡。但是,因为它是软件实现的,这意味着 RPS 只能在 packet 通过 DMA 进入内存后,RPS 才能开始工作。这意味着,RPS 并不会减少 CPU 处理硬件中断和 NAPI poll(软中断最重要的一部分)的时间,但是可以在 packet 到达内存后,将 packet 分到其他 CPU,从其他 CPU 进入协议栈。4.1 不使用 RPS(默认)如果 RPS 没启用,会调用__netif_receive_skb,它做一些 bookkeeping 工作,进而调用 __netif_receive_skb_core,将数据移动到离协议栈更近一步。4.2 使用 RPS如果 RPS 启用了,它会做一些计算,判断使用哪个 CPU 的 backlog queue,这个过程由 get_rps_cpu 函数完成。net/core/dev.c:cpu = get_rps_cpu(skb->dev, skb, &rflow);

if (cpu >= 0) {

ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);

rcu_read_unlock();

return ret;

}

get_rps_cpu 会考虑 RFS 和 aRFS 设置,以此选出一个合适的 CPU,通过调用 enqueue_to_backlog 将数据放到它的 backlog queue。假如网卡支持 aRFS,你可以开启它并做如下配置:打开并配置 RPS打开并配置 RFS内核中编译期间指定了 CONFIG_RFS_ACCEL 选项。Ubuntu kernel 3.13.0 是有的打开网卡的 ntuple 支持。可以用 ethtool 查看当前的 ntuple 设置配置 IRQ(硬中断)中每个 RX 和 CPU 的对应关系以上配置完成后,aRFS 就会自动将 RX queue 数据移动到指定 CPU 的内存,每个 flow 的包都会到达同一个 CPU,不需要你再通过 ntuple 手动指定每个 flow 的配置了。4.2.1 enqueue_to_backlog首先从远端 CPU 的 struct softnet_data 变量获取 backlog queue 长度。如果 backlog 大于 netdev_max_backlog,或者超过了 flow limit,直接 drop,并更新 softnet_data 的 drop 统计。注意这是远端 CPU 的统计。net/core/dev.c:qlen = skb_queue_len(&sd->input_pkt_queue);

if (qlen <= netdev_max_backlog && !skb_flow_limit(skb, qlen)) {

if (skb_queue_len(&sd->input_pkt_queue)) {

enqueue:

__skb_queue_tail(&sd->input_pkt_queue, skb);

input_queue_tail_incr_save(sd, qtail);

return NET_RX_SUCCESS;

}

/* Schedule NAPI for backlog device */

if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) {

if (!rps_ipi_queued(sd))

____napi_schedule(sd, &sd->backlog);

}

goto enqueue;

}

sd->dropped++;

kfree_skb(skb);

return NET_RX_DROP;

enqueue_to_backlog 被调用的地方很少。在基于 RPS 处理包的地方,以及 netif_rx,会调用到它。大部分驱动都不应该使用 netif_rx,而应该用 netif_receive_skb。如果没用到 RPS,驱动也没有使用 netif_rx,那增大 backlog 并不会带来益处,因为它根本没被用到。注意:检查驱动,如果它调用了 netif_receive_skb,而且没用 RPS,那增大 netdev_max_backlog 并不会带来任何性能提升,因为没有数据包会被送到 input_pkt_queue。如果 input_pkt_queue 足够小,而 flow limit 也还没达到(或者被禁掉了 ),那数据包将会被放到队列。这里的逻辑有点 funny,但大致可以归为为:如果 backlog 是空的:如果远端 CPU NAPI 变量没有运行,并且 IPI 没有被加到队列,那就 触发一个 IPI 加到队列,然后调用____napi_schedule 进一步处理。如果 backlog 非空,或者远端 CPU NAPI 变量正在运行,那就 enqueue 包 这里使用了 goto,所以代码看起来有点 tricky。net/core/dev.c:if (skb_queue_len(&sd->input_pkt_queue)) {

enqueue:

__skb_queue_tail(&sd->input_pkt_queue, skb);

input_queue_tail_incr_save(sd, qtail);

rps_unlock(sd);

local_irq_restore(flags);

return NET_RX_SUCCESS;

}

/* Schedule NAPI for backlog device

* We can use non atomic operation since we own the queue lock

*/

if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) {

if (!rps_ipi_queued(sd))

____napi_schedule(sd, &sd->backlog);

}

goto enqueue;

4.2.2 Flow limitsRPS 在不同 CPU 之间分发 packet,但是,如果一个 flow 特别大,会出现单个 CPU 被打爆,而其他 CPU 无事可做(饥饿)的状态。因此引入了 flow limit 特性,放到一个 backlog 队列的属 于同一个 flow 的包的数量不能超过一个阈值。这可以保证即使有一个很大的 flow 在大量收包 ,小 flow 也能得到及时的处理。net/core/dev.c:/*

* enqueue_to_backlog is called to queue an skb to a per CPU backlog

* queue (may be a remote CPU queue).

*/

static int enqueue_to_backlog(struct sk_buff *skb, int cpu,

unsigned int *qtail)

{

struct softnet_data *sd;

unsigned long flags;

unsigned int qlen;

sd = &per_cpu(softnet_data, cpu);

local_irq_save(flags);

rps_lock(sd);

qlen = skb_queue_len(&sd->input_pkt_queue);

if (qlen <= netdev_max_backlog && !skb_flow_limit(skb, qlen)) {

if (skb_queue_len(&sd->input_pkt_queue)) {

enqueue:

__skb_queue_tail(&sd->input_pkt_queue, skb);

input_queue_tail_incr_save(sd, qtail);

rps_unlock(sd);

local_irq_restore(flags);

return NET_RX_SUCCESS;

}

/* Schedule NAPI for backlog device

* We can use non atomic operation since we own the queue lock

*/

if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) {

if (!rps_ipi_queued(sd))

____napi_schedule(sd, &sd->backlog);

}

goto enqueue;

}

sd->dropped++;

rps_unlock(sd);

local_irq_restore(flags);

atomic_long_inc(&skb->dev->rx_dropped);

kfree_skb(skb);

return NET_RX_DROP;

}

默认,flow limit 功能是关掉的。要打开 flow limit,需要指定一个 bitmap(类似于 RPS 的 bitmap)。监控:由于 input_pkt_queue 打满或 flow limit 导致的丢包,在/proc/net/softnet_stat 里面的 dropped 列计数。调优 Tuning: Adjusting netdev_max_backlog to prevent drops 在调整这个值之前,请先阅读前面的“注意”。如果使用了 RPS,或者驱动调用了 netif_rx,那增加 netdev_max_backlog 可以改善在 enqueue_to_backlog 里的丢包:例如:increase backlog to 3000 with sysctl.$ sudo sysctl -w net.core.netdev_max_backlog=3000

默认值是 1000。Tuning: Adjust the NAPI weight of the backlog poll loopnet.core.dev_weight 决定了 backlog poll loop 可以消耗的整体 budget$ sudo sysctl -w net.core.dev_weight=600

默认值是 64。backlog 处理逻辑和设备驱动的 poll 函数类似,都是在软中断(softirq)的上下文中执行,因此受整体 budget 和处理时间的限制。Tuning: Enabling flow limits and tuning flow limit hash table size$ sudo sysctl -w net.core.flow_limit_table_len=8192

默认值是 4096.这只会影响新分配的 flow hash table。所以,如果你想增加 table size 的话,应该在打开 flow limit 功能之前设置这个值。打开 flow limit 功能的方式是,在/proc/sys/net/core/flow_limit_cpu_bitmap 中指定一 个 bitmask,和通过 bitmask 打开 RPS 的操作类似。4.2.3 处理 backlog 队列:NAPI poller每个 CPU 都有一个 backlog queue,其加入到 NAPI 变量的方式和驱动差不多,都是注册一个 poll 方法,在软中断的上下文中处理包。此外,还提供了一个 weight,这也和驱动类似 。注册发生在网络系统初始化的时候, net/core/dev.c的 net_dev_init 函数:sd->backlog.poll = process_backlog;

sd->backlog.weight = weight_p;

sd->backlog.gro_list = NULL;

sd->backlog.gro_count = 0;

backlog NAPI 变量和设备驱动 NAPI 变量的不同之处在于,它的 weight 是可以调节的,而设备驱动是 hardcode 64。4.2.4 process_backlogprocess_backlog 是一个循环,它会一直运行直至 weight用完,或者 backlog 里没有数据了。backlog queue 里的数据取出来,传递给__netif_receive_skb。这个函数做的事情和 RPS 关闭的情况下做的事情一样。即,__netif_receive_skb 做一些 bookkeeping 工作,然后调用__netif_receive_skb_core 将数据发送给更上面的协议层。process_backlog 和 NAPI 之间遵循的合约,和驱动和 NAPI 之间的合约相同:NAPI is disabled if the total weight will not be used. The poller is restarted with the call to ____napi_schedule from enqueue_to_backlog as described above.函数返回接收完成的数据帧数量(在代码中是变量 work),net_rx_action将会从 budget(通过 net.core.netdev_budget 可以调整)里减去这个值。4.2.5 __netif_receive_skb_core:将数据送到抓包点(tap)或协议层__netif_receive_skb_core 完成将数据送到协议栈这一繁重工作(the heavy lifting of delivering the data)。在此之前,它会先检查是否插入了 packet tap(探测点),这些 tap 是抓包用的。例如,AF_PACKET 地址族就可以插入这些抓包指令, 一般通过 libpcap 库。如果存在抓包点(tap),数据就会先到抓包点,然后才到协议层。4.2.6 送到抓包点(tap)如果有 packet tap(通常通过 libpcap),packet 会送到那里。net/core/dev.c:list_for_each_entry_rcu(ptype, &ptype_all, list) {

if (!ptype->dev || ptype->dev == skb->dev) {

if (pt_prev)

ret = deliver_skb(skb, pt_prev, orig_dev);

pt_prev = ptype;

}

}

packet 如何经过 pcap 可以阅读 net/packet/af_packet.c。4.2.7 送到协议层处理完 taps 之后,__netif_receive_skb_core 将数据发送到协议层。它会从数据包中取出协议信息,然后遍历注册在这个协议上的回调函数列表。可以看__netif_receive_skb_core 函数,net/core/dev.c:type = skb->protocol;

list_for_each_entry_rcu(ptype,

&ptype_base[ntohs(type) & PTYPE_HASH_MASK], list) {

if (ptype->type == type &&

(ptype->dev == null_or_dev || ptype->dev == skb->dev ||

ptype->dev == orig_dev)) {

if (pt_prev)

ret = deliver_skb(skb, pt_prev, orig_dev);

pt_prev = ptype;

}

}

上面的 ptype_base 是一个 hash table,定义在net/core/dev.c中:struct list_head ptype_base[PTYPE_HASH_SIZE] __read_mostly;

每种协议在上面的 hash table 的一个 slot 里,添加一个过滤器到列表里。这个列表的头用如下函数获取:static inline struct list_head *ptype_head(const struct packet_type *pt)

{

if (pt->type == htons(ETH_P_ALL))

return &ptype_all;

else

return &ptype_base[ntohs(pt->type) & PTYPE_HASH_MASK];

}

添加的时候用 dev_add_pack 这个函数。这就是协议层如何注册自身,用于处理相应协议的网络数据的。4.3 RPS 调优使用 RPS 需要在内核做配置(Ubuntu + Kernel 3.13.0 支持),而且需要一个掩码( bitmask)指定哪些 CPU 可以处理那些 RX 队列。相关的一些信息可以在内核文档里找到。bitmask 配置位于:/sys/class/net/DEVICE_NAME/queues/QUEUE/rps_cpus例如,对于 eth0 的 queue 0,你需要更改/sys/class/net/eth0/queues/rx-0/rps_cpus。内核文档里说,对一些特定的配置下,RPS 没必要了。注意:打开 RPS 之后,原来不需要处理软中断(softirq)的 CPU 这时也会参与处理。因此相应 CPU 的 NET_RX 数量,以及 si 或 sitime 占比都会相应增加。可以对比启用 RPS 前后的数据,以此来确定配置是否生效,以及是否符合预期(哪个 CPU 处理哪个网卡的哪个中断)。5. 总结本文大篇幅在分析RPS的工作原理。RPS 的工作原理是对个 packet 做 hash,以此决定分到哪个 CPU 处理。然后 packet 放到每个 CPU 独占的接收后备队列(backlog)等待处理。这个 CPU 会触发一个进程间中断( IPI,Inter-processor Interrupt)向对端 CPU。如果当时对端 CPU 没有在处理 backlog 队列收包,这个进程间中断会 触发它开始从 backlog 收包。/proc/net/softnet_stat 其中有一列是记录 softnet_data 变量(也即这个 CPU)收到了多少 IPI(received_rps 列)。因此,netif_receive_skb 或者继续将包送到协议栈,或者交给 RPS,后者会转交给其他 CPU 处理。好文推荐:2022年嵌入式开发想进互联网大厂,你技术过硬吗?从事十年嵌入式转内核开发(23K到45K),给兄弟们的一些建议腾讯首发Linux内核源码《嵌入式开发进阶笔记》差距差的不止一点点哦编辑于 2022-05-05 00:32Linux 内核Windows 内核操作系统内核​赞同 5​​添加评论​分享​喜欢​收藏​申请转载​文章被以下专栏收录Linux内核一种开源电脑操作系统内核。C语

Linux GRO流程分析 - 知乎

Linux GRO流程分析 - 知乎切换模式写文章登录/注册Linux GRO流程分析Sky创业1、概述GRO是针对报文接收方向的,是指设备链路层在接收报文处理的时候,将多个小包合并成一个大包一起上送协议栈,减少数据包在协议栈间交互的机制。可以通过ethtool -K eth0 gro on/off来打开或关闭GRO功能,GRO虽然可以提升吞吐,但同时也会带来一定是时延增加。GRO是需要网卡有NAPI的能力,驱动通过NAPI收上来包后,判断如果有启用GRO功能,则将包按流的方式先存放在napi->gro_list链表里,等NAPI收完包或GRO链表里的skb超时,或者GRO合并过程中判断需要上送协议栈处理时,将对应的gro链表的skb上送协议栈。struct napi_struct { /* The poll_list must only be managed by the entity which * changes the state of the NAPI_STATE_SCHED bit. This means * whoever atomically sets that bit can add this napi_struct * to the per-cpu poll_list, and whoever clears that bit * can remove from the list right before clearing the bit. */ struct list_head poll_list; unsigned long state; int weight; //gro链表流的个数,最多不超过8个 unsigned int gro_count; int (*poll)(struct napi_struct *, int);#ifdef CONFIG_NETPOLL spinlock_t poll_lock; int poll_owner;#endif struct net_device *dev; //gro链表 struct sk_buff *gro_list; struct sk_buff *skb; struct list_head dev_list; struct hlist_node napi_hash_node; unsigned int napi_id; RH_KABI_EXTEND(size_t size) RH_KABI_EXTEND(struct hrtimer timer)};2、流程分析ixgbe_rx_skb网卡驱动从rx ring里收到包后,调用ixgbe_rx_skb上送协议栈,ixgbe_rx_skb判断上层socket是否有在对队列polling,如果没有,则进入gro合并入口函数napi_gro_receive;static void ixgbe_rx_skb(struct ixgbe_q_vector *q_vector, struct sk_buff *skb){ skb_mark_napi_id(skb, &q_vector->napi); if (ixgbe_qv_busy_polling(q_vector)) netif_receive_skb(skb); else napi_gro_receive(&q_vector->napi, skb);}dev_gro_receivegro入口函数进一步调用dev_gro_receive,在dev_gro_receive里,先重置下skb的mac层信息,然后调用ip层提供的GRO回调函数,上层回调函数判断napi->gro_list链表里是否有跟skb是同一条流的,如果存在,则将skb合并到对应的skb里,如果不存在,返回到dev_gro_receive函数后,将新的skb插入到napi->gro_list的末尾,作为这条流的首包。static enum gro_result dev_gro_receive(struct napi_struct *napi, struct sk_buff *skb){ struct sk_buff **pp = NULL; struct packet_offload *ptype; __be16 type = skb->protocol; struct list_head *head = &offload_base; int same_flow; enum gro_result ret; int grow; if (!(skb->dev->features & NETIF_F_GRO)) goto normal; if (skb_is_gso(skb) || skb_has_frag_list(skb) || skb->csum_bad) goto normal; gro_list_prepare(napi, skb); rcu_read_lock(); list_for_each_entry_rcu(ptype, head, list) { if (ptype->type != type || !ptype->callbacks.gro_receive) continue; skb_set_network_header(skb, skb_gro_offset(skb)); skb_reset_mac_len(skb); //先将same_flow清零 NAPI_GRO_CB(skb)->same_flow = 0; NAPI_GRO_CB(skb)->flush = 0; NAPI_GRO_CB(skb)->free = 0; NAPI_GRO_CB(skb)->encap_mark = 0; NAPI_GRO_CB(skb)->recursion_counter = 0; NAPI_GRO_CB(skb)->is_atomic = 1; NAPI_GRO_CB(skb)->gro_remcsum_start = 0; /* Setup for GRO checksum validation */ switch (skb->ip_summed) { case CHECKSUM_COMPLETE: NAPI_GRO_CB(skb)->csum = skb->csum; NAPI_GRO_CB(skb)->csum_valid = 1; NAPI_GRO_CB(skb)->csum_cnt = 0; break; case CHECKSUM_UNNECESSARY: NAPI_GRO_CB(skb)->csum_cnt = skb->csum_level + 1; NAPI_GRO_CB(skb)->csum_valid = 0; break; default: NAPI_GRO_CB(skb)->csum_cnt = 0; NAPI_GRO_CB(skb)->csum_valid = 0; } pp = ptype->callbacks.gro_receive(&napi->gro_list, skb); break; } rcu_read_unlock(); if (&ptype->list == head) goto normal; //在回调网络层、传输层的gro合并回调函数时,会判断已有的gro链表是否存在相同流的 //如果存在,same_flow为置1,因此这里判断same_flow的值,如果为0,说明是流首包 //如果非0,说明skb已经被合并到gro_list里了 same_flow = NAPI_GRO_CB(skb)->same_flow; ret = NAPI_GRO_CB(skb)->free ? GRO_MERGED_FREE : GRO_MERGED; //pp为非空,说明需要flush if (pp) { struct sk_buff *nskb = *pp; *pp = nskb->next; nskb->next = NULL; napi_gro_complete(nskb); napi->gro_count--; } //如果存在同一条流的, 说明在gro_receive流程里已经将skb合入到gro_list里了,因此这里不需要再处理了 if (same_flow) goto ok; //这个skb需要直接上送协议栈,不能添加到gro_list if (NAPI_GRO_CB(skb)->flush) goto normal; //gro链表上一共有8条流了,则再添加新的一条流前,把链表里最老的那条流的skb先发送出去 if (unlikely(napi->gro_count >= MAX_GRO_SKBS)) { struct sk_buff *nskb = napi->gro_list; /* locate the end of the list to select the 'oldest' flow */ while (nskb->next) { pp = &nskb->next; nskb = *pp; } *pp = NULL; nskb->next = NULL; napi_gro_complete(nskb); } else { napi->gro_count++; } //走到这里说明,待合入的skb是这条流的首包,因此将其挂到gro_list里, //并将NAPI_GRO_CB(skb)->last指向自己 //并等待后续同一条流的skb到来 NAPI_GRO_CB(skb)->count = 1; NAPI_GRO_CB(skb)->age = jiffies; NAPI_GRO_CB(skb)->last = skb; skb_shinfo(skb)->gso_size = skb_gro_len(skb); skb->next = napi->gro_list; napi->gro_list = skb; ret = GRO_HELD;pull: grow = skb_gro_offset(skb) - skb_headlen(skb); if (grow > 0) gro_pull_from_frag0(skb, grow);ok: return ret;normal: ret = GRO_NORMAL; goto pull;}inet_gro_receiveGRO合并消息进入到ip层后,首先根据ip头的信息(源、宿ip)进一步找到skb_list里相同的流,然后判断待GRO合并的skb是否是分片数据包,分片数据包不能做GRO,最后重置下带GRO合并的skb的网络层信息后,进一步调用传输层的GRO回调函数;static struct sk_buff **inet_gro_receive(struct sk_buff **head, struct sk_buff *skb){ const struct net_offload *ops; struct sk_buff **pp = NULL; struct sk_buff *p; const struct iphdr *iph; unsigned int hlen; unsigned int off; unsigned int id; int flush = 1; int proto; off = skb_gro_offset(skb); hlen = off + sizeof(*iph); iph = skb_gro_header_fast(skb, off); if (skb_gro_header_hard(skb, hlen)) { iph = skb_gro_header_slow(skb, hlen, off); if (unlikely(!iph)) goto out; } proto = iph->protocol; rcu_read_lock(); ops = rcu_dereference(inet_offloads[proto]); if (!ops || !ops->callbacks.gro_receive) goto out_unlock; if (*(u8 *)iph != 0x45) goto out_unlock; if (unlikely(ip_fast_csum((u8 *)iph, 5))) goto out_unlock; id = ntohl(*(__be32 *)&iph->id); flush = (u16)((ntohl(*(__be32 *)iph) ^ skb_gro_len(skb)) | (id & ~IP_DF)); id >>= 16; for (p = *head; p; p = p->next) { struct iphdr *iph2; u16 flush_id //不是相同流的,跳过 if (!NAPI_GRO_CB(p)->same_flow) continue; //off为skb的data偏移,因为驱动就已经把mac头剥离了,所以这里的p->data是指向ip头 iph2 = (struct iphdr *)(p->data + off); /* The above works because, with the exception of the top * (inner most) layer, we only aggregate pkts with the same * hdr length so all the hdrs we'll need to verify will start * at the same offset. */ //再次判断ip头,确认是同一条流 if ((iph->protocol ^ iph2->protocol) | ((__force u32)iph->saddr ^ (__force u32)iph2->saddr) | ((__force u32)iph->daddr ^ (__force u32)iph2->daddr)) { NAPI_GRO_CB(p)->same_flow = 0; continue; } /* All fields must match except length and checksum. */ //分片数据包不能gro NAPI_GRO_CB(p)->flush |= (iph->ttl ^ iph2->ttl) | (iph->tos ^ iph2->tos) | (__force int)((iph->frag_off ^ iph2->frag_off) & htons(IP_DF)); NAPI_GRO_CB(p)->flush |= flush; /* We need to store of the IP ID check to be included later * when we can verify that this packet does in fact belong * to a given flow. */ flush_id = (u16)(id - ntohs(iph2->id)); /* This bit of code makes it much easier for us to identify * the cases where we are doing atomic vs non-atomic IP ID * checks. Specifically an atomic check can return IP ID * values 0 - 0xFFFF, while a non-atomic check can only * return 0 or 0xFFFF. */ if (!NAPI_GRO_CB(p)->is_atomic || !(iph->frag_off & htons(IP_DF))) { flush_id ^= NAPI_GRO_CB(p)->count; flush_id = flush_id ? 0xFFFF : 0; } /* If the previous IP ID value was based on an atomic * datagram we can overwrite the value and ignore it. */ if (NAPI_GRO_CB(skb)->is_atomic) NAPI_GRO_CB(p)->flush_id = flush_id; else NAPI_GRO_CB(p)->flush_id |= flush_id; } NAPI_GRO_CB(skb)->is_atomic = !!(iph->frag_off & htons(IP_DF)); NAPI_GRO_CB(skb)->flush |= flush; //设置ip头信息 skb_set_network_header(skb, off); /* The above will be needed by the transport layer if there is one * immediately following this IP hdr. */ //data_offset偏移增加ip头偏移 skb_gro_pull(skb, sizeof(*iph)); //设置传输层信息 skb_set_transport_header(skb, skb_gro_offset(skb)); pp = call_gro_receive(ops->callbacks.gro_receive, head, skb);out_unlock: rcu_read_unlock();out: NAPI_GRO_CB(skb)->flush |= flush; return pp;}tcp4_gro_receive进入到传输层的GRO处理函数后,首先对待合并的skb做checksum校验;static struct sk_buff **tcp4_gro_receive(struct sk_buff **head, struct sk_buff *skb){ /* Don't bother verifying checksum if we're going to flush anyway. */ //先对skb做checksum校验,检验通过后csum_valid if (!NAPI_GRO_CB(skb)->flush && skb_gro_checksum_validate(skb, IPPROTO_TCP, inet_gro_compute_pseudo)) { NAPI_GRO_CB(skb)->flush = 1; return NULL; } return tcp_gro_receive(head, skb);}校验通过后进一步调用tcp_gro_receive,在tcp_gro_receive里进一步根据tcp头部信息找到skb_list里相同的流,然后调用skb_gro_receive,skb_gro_receive为真正做GRO合并的处理函数,在skb_gro_receive将新的skb的线性区或非线性区合入到gro_skb的非线性区,合并完成后,同步更新gro_skb的data_len和len长度。如果合并过程发现gro_skb的非线性区域个数已经超过最大值(8个),则将skb最为一个新的数据包挂到gro_skb的next链表里。int skb_gro_receive(struct sk_buff **head, struct sk_buff *skb){ //走到这里说明head的skb与待合并的skb是同一条流 struct skb_shared_info *pinfo, *skbinfo = skb_shinfo(skb); //skb->data基于skb->head的偏移(此时skb->data指向tcp头) unsigned int offset = skb_gro_offset(skb); //线性区长度 unsigned int headlen = skb_headlen(skb); //skb的data数据长度(包括线性区和非线性区) unsigned int len = skb_gro_len(skb); struct sk_buff *lp, *p = *head; unsigned int delta_truesize; if (unlikely(p->len + len >= 65536)) return -E2BIG; lp = NAPI_GRO_CB(p)->last; pinfo = skb_shinfo(lp); //skb的线性区长度不超过offset,说明skb的线性区没有data数据,因此从skb的非线性区拷贝数据 //拷贝的数据放到gro_skb->last的非线性区 if (headlen <= offset) { skb_frag_t *frag; skb_frag_t *frag2; int i = skbinfo->nr_frags; int nr_frags = pinfo->nr_frags + i; //如果这个gro_skb->last的frags已经超标,则将新加入的skb挂到gro_skb->last里 if (nr_frags > MAX_SKB_FRAGS) goto merge; offset -= headlen; pinfo->nr_frags = nr_frags; skbinfo->nr_frags = 0; frag = pinfo->frags + nr_frags; frag2 = skbinfo->frags + i; do { *--frag = *--frag2; } while (--i); frag->page_offset += offset; skb_frag_size_sub(frag, offset); /* all fragments truesize : remove (head size + sk_buff) */ delta_truesize = skb->truesize - SKB_TRUESIZE(skb_end_offset(skb)); skb->truesize -= skb->data_len; skb->len -= skb->data_len; skb->data_len = 0; NAPI_GRO_CB(skb)->free = NAPI_GRO_FREE; goto done; } else if (skb->head_frag) { //将skb的线性区拷贝到拷贝到gro_skb->last的非线性区 int nr_frags = pinfo->nr_frags; skb_frag_t *frag = pinfo->frags + nr_frags; struct page *page = virt_to_head_page(skb->head); unsigned int first_size = headlen - offset; unsigned int first_offset; if (nr_frags + 1 + skbinfo->nr_frags > MAX_SKB_FRAGS) goto merge; first_offset = skb->data - (unsigned char *)page_address(page) + offset; pinfo->nr_frags = nr_frags + 1 + skbinfo->nr_frags; frag->page.p = page; frag->page_offset = first_offset; skb_frag_size_set(frag, first_size); memcpy(frag + 1, skbinfo->frags, sizeof(*frag) * skbinfo->nr_frags); /* We dont need to clear skbinfo->nr_frags here */ delta_truesize = skb->truesize - SKB_DATA_ALIGN(sizeof(struct sk_buff)); NAPI_GRO_CB(skb)->free = NAPI_GRO_FREE_STOLEN_HEAD; goto done; }merge: //gro->last的空间已满(frags个数已经达到最多的16个),将待合并的skb挂到gro_skb->last里 delta_truesize = skb->truesize; if (offset > headlen) { unsigned int eat = offset - headlen; skbinfo->frags[0].page_offset += eat; skb_frag_size_sub(&skbinfo->frags[0], eat); skb->data_len -= eat; skb->len -= eat; offset = headlen; } __skb_pull(skb, offset); if (NAPI_GRO_CB(p)->last == p) skb_shinfo(p)->frag_list = skb; else NAPI_GRO_CB(p)->last->next = skb; NAPI_GRO_CB(p)->last = skb; __skb_header_release(skb); lp = p;done: //合并完一个skb后,count计数加1 NAPI_GRO_CB(p)->count++; //data_len长度加len,len为新合并的skb的长度,因为新合并的skb都是放在p的非线性区,所以data_len要增加 p->data_len += len; p->truesize += delta_truesize; //整个skb长度增加len p->len += len; if (lp != p) { lp->data_len += len; lp->truesize += delta_truesize; lp->len += len; } NAPI_GRO_CB(skb)->same_flow = 1; return 0;}EXPORT_SYMBOL_GPL(skb_gro_receive);napi_gro_complete当GRO合并过程中判断需要刷新gro_list或者gro_list的流个数超过8个,再或者napi_poll过程判断需要刷新gro_list时,会调用napi_gro_complete处理函数,然后进一步调用ip层的complete处理函数inet_gro_complete;inet_gro_complete在ip层回调函数里,根据最新的skb->len,跟新ip头的checksum,然后进一步调用传输层的complete函数tcp4_gro_complete;在tcp4_gro_complete更新一下tcp的伪头部checksum,然后最终调用netif_receive_skb_internal将gro skb上送协议栈。static int inet_gro_complete(struct sk_buff *skb, int nhoff){ __be16 newlen = htons(skb->len - nhoff); struct iphdr *iph = (struct iphdr *)(skb->data + nhoff); const struct net_offload *ops; int proto = iph->protocol; int err = -ENOSYS; if (skb->encapsulation) { skb_set_inner_protocol(skb, cpu_to_be16(ETH_P_IP)); skb_set_inner_network_header(skb, nhoff); } //更新ip头的checksum,newlen为skb做gro合并后的新长度 csum_replace2(&iph->check, iph->tot_len, newlen); iph->tot_len = newlen; rcu_read_lock(); ops = rcu_dereference(inet_offloads[proto]); if (WARN_ON(!ops || !ops->callbacks.gro_complete)) goto out_unlock; /* Only need to add sizeof(*iph) to get to the next hdr below * because any hdr with option will have been flushed in * inet_gro_receive(). */ err = ops->callbacks.gro_complete(skb, nhoff + sizeof(*iph));out_unlock: rcu_read_unlock(); return err;}netif_receive_skb_internal在netif_receive_skb_internal里,判断是否有开启rps,如果有,则通过enqueue_to_backlog对应cpu的softnet_data的input_pkt_queue队列,如果不需要rps,则通过__netif_receive_skb进一步上送协议栈,最后通过ip层注册的回调函数ip_rcv进入ip层。static int netif_receive_skb_internal(struct sk_buff *skb){ int ret; net_timestamp_check(netdev_tstamp_prequeue, skb); if (skb_defer_rx_timestamp(skb)) return NET_RX_SUCCESS; rcu_read_lock(); //检查是否需要rps,如果要,则将报文放到cpu的softnet队列里,并且触发软中断 //软中断处理函数最终调用process_backlog从softnet队列里取出报文,上送协议栈#ifdef CONFIG_RPS if (static_key_false(&rps_needed)) { struct rps_dev_flow voidflow, *rflow = &voidflow; int cpu = get_rps_cpu(skb->dev, skb, &rflow); if (cpu >= 0) { ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail); rcu_read_unlock(); return ret; } }#endif //不需要rps,直接上送协议栈 ret = __netif_receive_skb(skb); rcu_read_unlock(); return ret;}发布于 2021-04-12 16:07Linux 内核协议栈开发​赞同 1​​1 条评论​分享​喜欢​收藏​申请

关于网卡特性TSO、UFO、GSO、LRO、GRO-阿里云开发者社区

关于网卡特性TSO、UFO、GSO、LRO、GRO-阿里云开发者社区

产品解决方案文档与社区权益中心定价云市场合作伙伴支持与服务了解阿里云售前咨询 95187-1 在线服务售后咨询 4008013260 在线服务其他服务 我要建议 我要投诉更多联系方式备案控制台开发者社区首页探索云世界探索云世界云上快速入门,热门云上应用快速查找了解更多问产品动手实践考认证TIANCHI大赛活动广场活动广场丰富的线上&线下活动,深入探索云世界任务中心做任务,得社区积分和周边高校计划让每位学生受益于普惠算力训练营资深技术专家手把手带教话题畅聊无限,分享你的技术见解开发者评测最真实的开发者用云体验乘风者计划让创作激发创新阿里云MVP遇见技术追梦人直播技术交流,直击现场下载下载海量开发者使用工具、手册,免费下载镜像站极速、全面、稳定、安全的开源镜像技术资料开发手册、白皮书、案例集等实战精华插件为开发者定制的Chrome浏览器插件探索云世界新手上云云上应用构建云上数据管理云上探索人工智能云计算弹性计算无影存储网络倚天云原生容器serverless中间件微服务可观测消息队列数据库关系型数据库NoSQL数据库数据仓库数据管理工具PolarDB开源向量数据库热门Modelscope模型即服务弹性计算云原生数据库物联网云效DevOps龙蜥操作系统平头哥钉钉开放平台大数据大数据计算实时数仓Hologres实时计算FlinkE-MapReduceDataWorksElasticsearch机器学习平台PAI智能搜索推荐人工智能机器学习平台PAI视觉智能开放平台智能语音交互自然语言处理多模态模型pythonsdk通用模型开发与运维云效DevOps钉钉宜搭支持服务镜像站码上公益

开发者社区

云计算

文章

正文

关于网卡特性TSO、UFO、GSO、LRO、GRO

2018-04-06

12743

版权

版权声明:

本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《

阿里云开发者社区用户服务协议》和

《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写

侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

简介:

我们来看下关于网卡特性的解释,不过记住GSO和GRO两个特性就好。

TSO(TCP Segmentation Offload),是利用网卡对TCP数据包分片,减轻CPU负荷的一种技术,也有人叫 LSO (Large segment offload) ,TSO是针对TCP的,UFO是针对UDP的。如果硬件支持 TSO功能,同时也需要硬件支持的TCP校验计算和分散/聚集 (Scatter Gather) 功能。如果网卡支持TSO/GSO,可以把最多64K大小的TCP payload直接往下传给协议栈,此时IP层也不会进行segmentation,网卡会生成TCP/IP包头和帧头,这样可以offload很多协议栈上的内存操作,节省CPU资源,当然如果都是小包,那么功能基本就没啥用了。

GSO(Generic Segmentation Offload),GSO是TSO的增强 ,GSO不只针对TCP,对任意协议。比TSO更通用,推迟数据分片直至发送到网卡驱动之前,此时会检查网卡是否支持分片功能(如TSO、UFO),如果支持直接发送到网卡,如果不支持就进行分片后再发往网卡。

LRO(Large Receive Offload),通过将接收到的多个TCP数据聚合成一个大的数据包,然后传递给网络协议栈处理,以减少上层协议栈处理 开销,提高系统接收TCP数据包的能力。

GRO(Generic Receive Offload),跟LRO类似,克服了LRO的一些缺点,更通用。后续的驱动都使用GRO的接口,而不是LRO。

            在系统中可以通过ethtool命令来进行查看,如下:

#ethtool -k eth0

generic-segmentation-offload: on

generic-receive-offload: on

TSO、UFO、GSO是对应网络发送, LRO、GRO是在接收方向上。

     我们只需要记住GSO/GRO两个关键字就好了,因为GSO是TSO/UFO的升级,GRO是LRO的升级。

binarydady

目录

热门文章

最新文章

为什么选择阿里云什么是云计算全球基础设施技术领先稳定可靠安全合规分析师报告产品和定价全部产品免费试用产品动态产品定价价格计算器云上成本管理解决方案技术解决方案文档与社区文档开发者社区天池大赛培训与认证权益中心免费试用高校计划企业扶持计划推荐返现计划支持与服务基础服务企业增值服务迁云服务官网公告健康看板信任中心关注阿里云关注阿里云公众号或下载阿里云APP,关注云资讯,随时随地运维管控云服务售前咨询:95187-1售后服务:400-80-13260法律声明及隐私权政策Cookies政策廉正举报安全举报联系我们加入我们阿里巴巴集团淘宝网天猫全球速卖通阿里巴巴国际交易市场1688阿里妈妈飞猪阿里云计算AliOS万网高德UC友盟优酷钉钉支付宝达摩院淘宝海外阿里云盘饿了么© 2009-2024 Aliyun.com 版权所有 增值电信业务经营许可证: 浙B2-20080101 域名注册服务机构许可: 浙D3-20210002 京D3-20220015浙公网安备 33010602009975号浙B2-20080101-4

Linux网络子系统中链路层中GRO的处理 - 知乎

Linux网络子系统中链路层中GRO的处理 - 知乎首发于TCP吞吐量提升切换模式写文章登录/注册Linux网络子系统中链路层中GRO的处理Sky创业GRO需要支持GRO的每种协议都要实现自己的报文匹配合并函数和合并完成函数。这里我们先来看看链路层上实现的自己的GRO函数。链路层的接收匹配函数__napi_gro_receive(napi, skb):该函数对报文进行匹配,并不合并报文。匹配规则(必须同时满足以下两个条件):1、两个报文的接收dev必须相同。2、两个报文的以太头必须相同。static int __napi_gro_receive(struct napi_struct *napi,struct sk_buff *skb){ struct sk_buff *p; /*遍历napi 实例上的gro_list上挂的skb, 根据上面说的匹配规则设置链表上报文的same字段*/ for (p = napi->gro_list; p; p = p->next) { NAPI_GRO_CB(p)->same_flow = (p->dev == skb->dev) && !compare_ether_header(skb_mac_header(p), skb_gro_mac_header(skb)); NAPI_GRO_CB(p)->flush = 0; } return dev_gro_receive(napi, skb);}int dev_gro_receive(struct napi_struct *napi,struct sk_buff *skb){ struct sk_buff **pp = NULL; struct packet_type *ptype; __be16 type = skb->protocol; struct list_head *head = &ptype_base[ntohs(type) & PTYPE_HASH_MASK]; int same_flow; int mac_len; int ret; /*如果接收网络设备设置成不支持GRO功能,就不进行GRO合并处理*/ if (!(skb->dev->features & NETIF_F_GRO)) { goto normal; } /*如果是ip 分片报文,不进行GRO处理,因为如果报文是经过三层转发的报文,不需要重组后再转发。 是否需要重组交由IP层进行处理,这里就不进行GRO的处理了。*/ if (skb_is_gso(skb) || skb_has_frags(skb)) { goto normal; } /*加RCU读锁对 ptype_base hahs 链表进行保护*/ rcu_read_lock(); /*遍历链表,找到处理该类型报文的ptype, *并且该类型的ptype 实现了处理gro 的函数 */ list_for_each_entry_rcu(ptype, head, list) { if (ptype->type != type || ptype->dev || !ptype->gro_receive) continue; /*如果找到了,初始化报文头指针, *并重置 skb中GRO使用的私有字段, *这些字段会在相应协议实现的GRO处理函数中进行设置 */ skb_set_network_header(skb, skb_gro_offset(skb)); mac_len = skb->network_header - skb->mac_header; skb->mac_len = mac_len; NAPI_GRO_CB(skb)->same_flow = 0; NAPI_GRO_CB(skb)->flush = 0; NAPI_GRO_CB(skb)->free = 0; /*调用该协议类型注册的GRO处理函数对报文进行处理*/ pp = ptype->gro_receive(&napi->gro_list, skb); break; } rcu_read_unlock(); /*如果没找到对该协议类型报文进行处理的GRO,不进行GRO操作*/ if (&ptype->list == head) { goto normal; } same_flow = NAPI_GRO_CB(skb)->same_flow; ret = NAPI_GRO_CB(skb)->free ? GRO_MERGED_FREE : GRO_MERGED; /*如果协议的GRO处理函数返回了合并后的报文, *就调用napi_gro_complete把报文送进协议栈进行处理 */ if (pp) { struct sk_buff *nskb = *pp; *pp = nskb->next; nskb->next = NULL; napi_gro_complete(nskb); napi->gro_count--; } /*如果same 被设置了,说明在链表上找到了相匹配的报文了, *已经合并过了,不再需要缓存了 */ if (same_flow) { goto ok; } /*如果没找到相匹配的报文,需要缓存。 *缓存前需要判断队列是否已满或该报文是否应该缓存 */ if (NAPI_GRO_CB(skb)->flush || napi->gro_count >= MAX_GRO_SKBS) { goto normal; } /*缓存没有匹配的报文到gro_list,返回值为GRO_HELD*/ napi->gro_count++; NAPI_GRO_CB(skb)->count = 1; skb_shinfo(skb)->gso_size = skb_gro_len(skb); skb->next = napi->gro_list; napi->gro_list = skb; ret = GRO_HELD;pull: /*经过这个协议栈的GRO receive的处理, *这时NAPI_GRO_CB(skb)->data_offset字段已经设置好了。 *如果GRO需要处理的数据不在skb的线性区, *把需要的数据copy到线性区,方便以后操作 */ if (skb_headlen(skb) < skb_gro_offset(skb)) { int grow = skb_gro_offset(skb) - skb_headlen(skb); BUG_ON(skb->end - skb->tail < grow); memcpy(skb_tail_pointer(skb), NAPI_GRO_CB(skb)->frag0, grow); skb->tail += grow; skb->data_len -= grow; skb_shinfo(skb)->frags[0].page_offset += grow; skb_shinfo(skb)->frags[0].size -= grow; /*如果把数据移入线性区后第一页就空了, *释放空页并把后续页依次前移 */ if (unlikely(!skb_shinfo(skb)->frags[0].size)) { put_page(skb_shinfo(skb)->frags[0].page); memmove(skb_shinfo(skb)->frags, skb_shinfo(skb)->frags + 1, (--skb_shinfo(skb)->nr_frags * sizeof(skb_frag_t))); } }ok: return ret;normal: ret = GRO_NORMAL; goto pull;}链路层的GRO完成函数:合并完成后的报文调用该函数来把报文送入协议栈。static int napi_gro_complete(struct sk_buff *skb){ struct packet_type *ptype; __be16 type = skb->protocol; struct list_head *head = &ptype_base[ntohs(type) & PTYPE_HASH_MASK]; int err = -ENOENT; /*如果没有和别的报文合并过, *就可以直接送协议栈进行处理了 */ if (NAPI_GRO_CB(skb)->count == 1) { skb_shinfo(skb)->gso_size = 0; goto out; } /*找到相关协议把报文送给协议的grp_complete函数处理*/ rcu_read_lock(); list_for_each_entry_rcu(ptype, head, list) { if (ptype->type != type || ptype->dev || !ptype->gro_complete) continue; err = ptype->gro_complete(skb); break; } rcu_read_unlock(); if (err) { WARN_ON(&ptype->list == head); kfree_skb(skb); return NET_RX_SUCCESS; } /*各层协议处理完成后,送给协议栈进行处理*/out: return netif_receive_skb(skb);}我们从上面分析看到,链路层处理完链路层上GRO的处理后,会再调用网络层上对应得GRO处理。每一层协议自己负责自己的GRO处理。上次处理完后把处理结果返给下一层。最终由链路层来根据处理结果来把报文送给协议栈 。发布于 2021-04-25 20:58通信协议网络协议TCP/IP​赞同​​添加评论​分享​喜欢​收藏​申请转载​文章被以下专栏收录TCP吞吐量提升提升TCP吞吐量,优化测

3. GRO机制 - 简书

RO机制 - 简书登录注册写文章首页下载APP会员IT技术3. GRO机制霜晨月_ScY关注赞赏支持3. GRO机制3. GRO(Generic receive offload)

上一篇已经说到了NAPI,但其中我们看到,NAPI相关的另一个东西GRO。GRO是啥,干什么的,为什么要用?本篇博客是对这个大佬博客的引用:大佬博客。我只将其代码更新到了4.18.16,感谢大佬的分享。

gro会合并多个gso_size不同的包, 会将gso_size设置成第一个包的gso_size.GRO(Generic receive offload)在内核2.6.29之后合并进去的,作者是一个华裔Herbert Xu ,GRO的简介可以看这里:就是这里。

先来描述一下GRO的作用,GRO是针对网络接受包的处理的,并且只是针对NAPI类型的驱动,因此如果要支持GRO,不仅要内核支持,而且驱动也必须调用相应的借口,用ethtool -K gro on来设置,如果报错就说明网卡驱动本身就不支持GRO。

GRO类似tso,可是tso只支持发送数据包,这样你tcp层大的段会在网卡被切包,然后再传递给对端,而如果没有gro,则小的段会被一个个送到协议栈,有了gro之后,就会在接收端做一个反向的操作(想对于tso).也就是将tso切好的数据包组合成大包再传递给协议栈。

如果实现了GRO支持的驱动是这样子处理数据的,在NAPI的回调poll方法中读取数据包,然后调用GRO的接口napi_gro_receive或者napi_gro_frags来将数据包feed进协议栈。而具体GRO的工作就是在这两个函数中进行的,他们最终都会调用__napi_gro_receive。下面就是napi_gro_receive,它最终会调用napi_skb_finish以及__napi_gro_receive。

gro_result_t napi_gro_receive(struct napi_struct *napi, struct sk_buff *skb)

{

skb_mark_napi_id(skb, napi);

trace_napi_gro_receive_entry(skb);

skb_gro_reset_offset(skb);

return napi_skb_finish(dev_gro_receive(napi, skb), skb);

}

然后GRO什么时候会将数据feed进协议栈呢,这里会有两个退出点,一个是在napi_skb_finish里,他会通过判断__napi_gro_receive的返回值,来决定是需要将数据包立即feed进协议栈还是保存起来,还有一个点是当napi的循环执行完毕时,也就是执行napi_complete的时候,先来看napi_skb_finish,napi_complete我们后面会详细介绍。

在NAPI驱动中,直接调用netif_receive_skb_internal会将数据feed 进协议栈,因此这里如果返回值是NORMAL,则直接调用netif_receive_skb_internal来将数据送进协议栈。

static gro_result_t napi_skb_finish(gro_result_t ret, struct sk_buff *skb)

{

switch (ret) {

case GRO_NORMAL:

if (netif_receive_skb_internal(skb))

ret = GRO_DROP;

break;

case GRO_DROP:

kfree_skb(skb);

break;

case GRO_MERGED_FREE:

if (NAPI_GRO_CB(skb)->free == NAPI_GRO_FREE_STOLEN_HEAD)

napi_skb_free_stolen_head(skb);

else

__kfree_skb(skb);

break;

case GRO_HELD:

case GRO_MERGED:

case GRO_CONSUMED:

break;

}

return ret;

}

GRO的主要思想就是,组合一些类似的数据包(基于一些数据域,后面会介绍到)为一个大的数据包(一个skb),然后feed给协议栈,这里主要是利用Scatter-gather IO,也就是skb的struct skb_shared_info域(我前面的blog讲述ip分片的时候有详细介绍这个域)来合并数据包。

在每个NAPI的实例都会包括一个域叫gro_list,保存了我们积攒的数据包(将要被merge的).然后每次进来的skb都会在这个链表里面进行查找,看是否需要merge。而gro_count表示当前的gro_list中的skb的个数。

struct napi_struct {

//................................................

//个数

unsigned int gro_count;

//................................................

//积攒的数据包

struct sk_buff *gro_list;

struct sk_buff *skb;

};

紧接着是gro最核心的一个数据结构napi_gro_cb,它是保存在skb的cb域中,它保存了gro要使用到的一些上下文,这里每个域kernel的注释都比较清楚。到后面我们会看到这些域的具体用途。

struct napi_gro_cb {

/* Virtual address of skb_shinfo(skb)->frags[0].page + offset. */

void *frag0;

/* Length of frag0. */

unsigned int frag0_len;

/* This indicates where we are processing relative to skb->data. */

int data_offset;

/* This is non-zero if the packet cannot be merged with the new skb. */

u16 flush;

/* Save the IP ID here and check when we get to the transport layer */

u16 flush_id;

/* Number of segments aggregated. */

u16 count;

/* Start offset for remote checksum offload */

u16 gro_remcsum_start;

/* jiffies when first packet was created/queued */

unsigned long age;

/* Used in ipv6_gro_receive() and foo-over-udp */

u16 proto;

/* This is non-zero if the packet may be of the same flow. */

u8 same_flow:1;

/* Used in tunnel GRO receive */

u8 encap_mark:1;

/* GRO checksum is valid */

u8 csum_valid:1;

/* Number of checksums via CHECKSUM_UNNECESSARY */

u8 csum_cnt:3;

/* Free the skb? */

u8 free:2;

#define NAPI_GRO_FREE 1

#define NAPI_GRO_FREE_STOLEN_HEAD 2

/* Used in foo-over-udp, set in udp[46]_gro_receive */

u8 is_ipv6:1;

/* Used in GRE, set in fou/gue_gro_receive */

u8 is_fou:1;

/* Used to determine if flush_id can be ignored */

u8 is_atomic:1;

/* Number of gro_receive callbacks this packet already went through */

u8 recursion_counter:4;

/* 1 bit hole */

/* used to support CHECKSUM_COMPLETE for tunneling protocols */

__wsum csum;

/* used in skb_gro_receive() slow path */

struct sk_buff *last;

};

每一层协议都实现了自己的gro回调函数,gro_receive和gro_complete,gro系统会根据协议来调用对应回调函数,其中gro_receive是将输入skb尽量合并到我们gro_list中。而gro_complete则是当我们需要提交gro合并的数据包到协议栈时被调用的。

下面就是ip层和tcp层对应的回调方法:

//kernel-4.18对应的回调已经不在struct net_protocol中了。

static struct packet_offload ip_packet_offload __read_mostly = {

.type = cpu_to_be16(ETH_P_IP),

.callbacks = {

.gso_segment = inet_gso_segment,

.gro_receive = inet_gro_receive,

.gro_complete = inet_gro_complete,

},

};

static const struct net_offload tcpv4_offload = {

.callbacks = {

.gso_segment = tcp4_gso_segment,

.gro_receive = tcp4_gro_receive,

.gro_complete = tcp4_gro_complete,

},

};

gro的入口函数是napi_gro_receive,它的实现很简单,就是将skb包含的gro上下文reset,然后调用__napi_gro_receive,最终通过napi_skb_finis来判断是否需要讲数据包feed进协议栈。

gro_result_t napi_gro_receive(struct napi_struct *napi, struct sk_buff *skb)

{

skb_mark_napi_id(skb, napi);

trace_napi_gro_receive_entry(skb);

skb_gro_reset_offset(skb);

return napi_skb_finish(dev_gro_receive(napi, skb), skb);

}

napi_skb_finish一开始已经介绍过了,这个函数主要是通过判断传递进来的ret(__napi_gro_receive的返回值),来决定是否需要feed数据进协议栈。它的第二个参数是前面处理过的skb。

这里再来看下skb_gro_reset_offset,首先要知道一种情况,那就是skb本身不包含数据(包括头也没有),而所有的数据都保存在skb_shared_info中(支持S/G的网卡有可能会这么做).此时我们如果想要合并的话,就需要将包头这些信息取出来,也就是从skb_shared_info的frags[0]中去的,在 skb_gro_reset_offset中就有做这个事情,而这里就会把头的信息保存到napi_gro_cb 的frags[0]中。并且此时frags必然不会在high mem,要么是线性区,要么是dma(S/G io)。 来看skb_gro_reset_offset。

void skb_gro_reset_offset(struct sk_buff *skb)

{

NAPI_GRO_CB(skb)->data_offset = 0;

NAPI_GRO_CB(skb)->frag0 = NULL;

NAPI_GRO_CB(skb)->frag0_len = 0;

//如果mac_header和skb->tail相等并且地址不在高端内存,则说明包头保存在skb_shinfo中,所以我们需要从frags中取得对应的数据包

if (skb->mac_header == skb->tail &&

!PageHighMem(skb_shinfo(skb)->frags[0].page)) {

// 可以看到frag0保存的就是对应的skb的frags的第一个元素的地址

// frag0的作用是: 有些包的包头会存在skb->frag[0]里面,gro合并时会调用skb_gro_header_slow将包头拉到线性空间中,那么在非线性skb->frag[0]中的包头部分就应该删掉。

NAPI_GRO_CB(skb)->frag0 =

page_address(skb_shinfo(skb)->frags[0].page) +

skb_shinfo(skb)->frags[0].page_offset;

//然后保存对应的大小。

NAPI_GRO_CB(skb)->frag0_len = skb_shinfo(skb)->frags[0].size;

}

}

接下来就是dev_gro_receive,它首先调用gro_list_prepare(此处由于4.18变化较大,和原博客有出入我已做了对应修改)。它主要是遍历gro_list,然后给same_flow赋值,这里要注意,same_flow是一个标记,表示某个skb是否有可能会和当前要处理的skb是相同的流,而这里的相同会在每层都进行判断,也就是在设备层,ip层,tcp层都会判断,这里就是设备层的判断了。这里的判断很简单,有2个条件:

设备是否相同

mac的头必须相等

如果上面两个条件都满足,则说明两个skb有可能是相同的flow,所以设置same_flow,以便与我们后面合并。

static void gro_list_prepare(struct napi_struct *napi, struct sk_buff *skb)

{

struct sk_buff *p;

unsigned int maclen = skb->dev->hard_header_len;

u32 hash = skb_get_hash_raw(skb);

//遍历gro_list,然后判断是否有可能两个skb 相似。

for (p = napi->gro_list; p; p = p->next) {

unsigned long diffs;

NAPI_GRO_CB(p)->flush = 0;

if (hash != skb_get_hash_raw(p)) {

NAPI_GRO_CB(p)->same_flow = 0;

continue;

}

diffs = (unsigned long)p->dev ^ (unsigned long)skb->dev;

diffs |= p->vlan_tci ^ skb->vlan_tci;

diffs |= skb_metadata_dst_cmp(p, skb);

diffs |= skb_metadata_differs(p, skb);

if (maclen == ETH_HLEN)

diffs |= compare_ether_header(skb_mac_header(p),

skb_mac_header(skb));

else if (!diffs)

diffs = memcmp(skb_mac_header(p),

skb_mac_header(skb),

maclen);

NAPI_GRO_CB(p)->same_flow = !diffs;

}

}

接下来回头看dev_gro_receive,这个函数我们分做两部分来看,第一部分是正常处理部分,第二部份是处理frag0的部分。

来看如何判断是否支持GRO,这里每个设备的features会在驱动初始化的时候被初始化,然后如果支持GRO,则会包括NETIF_F_GRO。 还有要注意的就是,gro不支持切片的ip包,因为ip切片的组包在内核的ip会做一遍,因此这里gro如果合并的话,没有多大意义,而且还增加复杂度。

在dev_gro_receive中会遍历对应的ptype(也就是协议的类链表,以前的blog有详细介绍),然后调用对应的回调函数,一般来说这里会调用文章开始说的ip_packet_type,也就是inet_gro_receive。

而 inet_gro_receive的返回值表示我们需要立刻feed 进协议栈的数据包,如果为空,则说明不需要feed数据包进协议栈。后面会分析到这里他的详细算法。

而如果当inet_gro_receive正确返回后,如果same_flow没有被设置,则说明gro list中不存在能和当前的skb合并的项,因此此时需要将skb插入到gro list中。这个时候的返回值就是HELD。

static enum gro_result dev_gro_receive(struct napi_struct *napi, struct sk_buff *skb)

{

struct sk_buff **pp = NULL;

struct packet_offload *ptype;

__be16 type = skb->protocol;

struct list_head *head = &offload_base;

int same_flow;

enum gro_result ret;

int grow;

if (netif_elide_gro(skb->dev))

goto normal;

gro_list_prepare(napi, skb);

rcu_read_lock();

list_for_each_entry_rcu(ptype, head, list) {

if (ptype->type != type || !ptype->callbacks.gro_receive)

continue;

skb_set_network_header(skb, skb_gro_offset(skb));

skb_reset_mac_len(skb);

NAPI_GRO_CB(skb)->same_flow = 0;

NAPI_GRO_CB(skb)->flush = skb_is_gso(skb) || skb_has_frag_list(skb);

NAPI_GRO_CB(skb)->free = 0;

NAPI_GRO_CB(skb)->encap_mark = 0;

NAPI_GRO_CB(skb)->recursion_counter = 0;

NAPI_GRO_CB(skb)->is_fou = 0;

NAPI_GRO_CB(skb)->is_atomic = 1;

NAPI_GRO_CB(skb)->gro_remcsum_start = 0;

/* Setup for GRO checksum validation */

switch (skb->ip_summed) {

case CHECKSUM_COMPLETE:

NAPI_GRO_CB(skb)->csum = skb->csum;

NAPI_GRO_CB(skb)->csum_valid = 1;

NAPI_GRO_CB(skb)->csum_cnt = 0;

break;

case CHECKSUM_UNNECESSARY:

NAPI_GRO_CB(skb)->csum_cnt = skb->csum_level + 1;

NAPI_GRO_CB(skb)->csum_valid = 0;

break;

default:

NAPI_GRO_CB(skb)->csum_cnt = 0;

NAPI_GRO_CB(skb)->csum_valid = 0;

}

pp = ptype->callbacks.gro_receive(&napi->gro_list, skb);//各协议注册的gro_receive

break;

}

rcu_read_unlock();

if (&ptype->list == head)

goto normal;

if (IS_ERR(pp) && PTR_ERR(pp) == -EINPROGRESS) {

ret = GRO_CONSUMED;

goto ok;

}

same_flow = NAPI_GRO_CB(skb)->same_flow;

ret = NAPI_GRO_CB(skb)->free ? GRO_MERGED_FREE : GRO_MERGED;

//如果返回值pp部位空,则说明pp需要马上被feed进协议栈

if (pp) {

struct sk_buff *nskb = *pp;

*pp = nskb->next;

nskb->next = NULL;

napi_gro_complete(nskb); //发往协议栈

napi->gro_count--;

}

//如果same_flow有设置,则说明skb已经被正确的合并,因此直接返回。

if (same_flow)

goto ok;

//查看是否有设置flush和gro list的个数是否已经超过限制

// BUG: 这里是有点不对的,因为这时的skb是比gro_list中的skb更晚到的,但是却被先feed进了协议栈

if (NAPI_GRO_CB(skb)->flush)

goto normal;

if (unlikely(napi->gro_count >= MAX_GRO_SKBS)) {

struct sk_buff *nskb = napi->gro_list;

/* locate the end of the list to select the 'oldest' flow */

while (nskb->next) {

pp = &nskb->next;

nskb = *pp;

}

*pp = NULL;

nskb->next = NULL;

napi_gro_complete(nskb); //进入协议栈

} else {

napi->gro_count++;

}

NAPI_GRO_CB(skb)->count = 1;

NAPI_GRO_CB(skb)->age = jiffies;

NAPI_GRO_CB(skb)->last = skb;

skb_shinfo(skb)->gso_size = skb_gro_len(skb);

skb->next = napi->gro_list;

napi->gro_list = skb;

ret = GRO_HELD;

pull:

grow = skb_gro_offset(skb) - skb_headlen(skb);

if (grow > 0)

gro_pull_from_frag0(skb, grow);

ok:

return ret;

normal:

ret = GRO_NORMAL;

goto pull;

}

接下来就是inet_gro_receive,这个函数是ip层的gro receive回调函数,函数很简单,首先取得ip头,然后判断是否需要从frag复制数据,如果需要则复制数据。

struct sk_buff **inet_gro_receive(struct sk_buff **head, struct sk_buff *skb)

{

const struct net_offload *ops;

struct sk_buff **pp = NULL;

struct sk_buff *p;

const struct iphdr *iph;

unsigned int hlen;

unsigned int off;

unsigned int id;

int flush = 1;

int proto;

//得到偏移

off = skb_gro_offset(skb);

//得到头的整个长度(mac+ip)

hlen = off + sizeof(*iph);

//得到ip头

iph = skb_gro_header_fast(skb, off);

//是否需要复制

if (skb_gro_header_hard(skb, hlen)) {

iph = skb_gro_header_slow(skb, hlen, off);

if (unlikely(!iph))

goto out;

}

然后就是一些校验工作,比如协议是否支持gro_reveive,ip头是否合法等等

proto = iph->protocol;

rcu_read_lock();

ops = rcu_dereference(inet_offloads[proto]);

//是否支持gro

if (!ops || !ops->callbacks.gro_receive)

goto out_unlock;

//ip头是否合法, iph->version = 4, iph->ipl = 5

if (*(u8 *)iph != 0x45)

goto out_unlock;

if (ip_is_fragment(iph))

goto out_unlock;

//ip头校验

if (unlikely(ip_fast_csum((u8 *)iph, 5)))

goto out_unlock;

然后就是核心的处理部分,它会遍历整个gro_list,然后进行same_flow和是否需要flush的判断。

这里ip层设置same_flow是根据下面的规则的:

1 4层的协议必须相同

2 tos域必须相同

3 源,目的地址必须相同

如果3个条件一个不满足,则会设置same_flow为0。 这里还有一个就是判断是否需要flush对应的skb到协议栈,这里的判断条件是这样子的。

1 ip包的ttl不一样

2 ip包的id顺序不对

3 如果是切片包

如果上面两个条件某一个满足,则说明skb需要被flush出gro。

不过这里要注意只有两个数据包是same flow的情况下,才会进行flush判断。原因很简单,都不是有可能进行merge的包,自然没必要进行flush了。

id = ntohl(*(__be32 *)&iph->id);

//判断是否需要切片

flush = (u16)((ntohl(*(__be32 *)iph) ^ skb_gro_len(skb)) | (id & ~IP_DF));

id >>= 16;

//开始遍历gro list

for (p = *head; p; p = p->next) {

struct iphdr *iph2;

u16 flush_id;

//如果上一层已经不可能same flow则直接继续下一个

if (!NAPI_GRO_CB(p)->same_flow)

continue;

iph2 = (struct iphdr *)(p->data + off);

/* The above works because, with the exception of the top

* (inner most) layer, we only aggregate pkts with the same

* hdr length so all the hdrs we'll need to verify will start

* at the same offset.

*/

//开始same flow的判断

if ((iph->protocol ^ iph2->protocol) |

((__force u32)iph->saddr ^ (__force u32)iph2->saddr) |

((__force u32)iph->daddr ^ (__force u32)iph2->daddr)) {

NAPI_GRO_CB(p)->same_flow = 0;

continue;

}

/* All fields must match except length and checksum. */

//开始flush的判断。这里注意如果不是same_flow的话,就没必要进行flush的判断。

NAPI_GRO_CB(p)->flush |=

(iph->ttl ^ iph2->ttl) |

(iph->tos ^ iph2->tos) |

((iph->frag_off ^ iph2->frag_off) & htons(IP_DF));

NAPI_GRO_CB(p)->flush |= flush;

/* We need to store of the IP ID check to be included later

* when we can verify that this packet does in fact belong

* to a given flow.

*/

flush_id = (u16)(id - ntohs(iph2->id));

/* This bit of code makes it much easier for us to identify

* the cases where we are doing atomic vs non-atomic IP ID

* checks. Specifically an atomic check can return IP ID

* values 0 - 0xFFFF, while a non-atomic check can only

* return 0 or 0xFFFF.

*/

if (!NAPI_GRO_CB(p)->is_atomic ||

!(iph->frag_off & htons(IP_DF))) {

flush_id ^= NAPI_GRO_CB(p)->count;

flush_id = flush_id ? 0xFFFF : 0;

}

/* If the previous IP ID value was based on an atomic

* datagram we can overwrite the value and ignore it.

*/

if (NAPI_GRO_CB(skb)->is_atomic)

NAPI_GRO_CB(p)->flush_id = flush_id;

else

NAPI_GRO_CB(p)->flush_id |= flush_id;

}

NAPI_GRO_CB(skb)->is_atomic = !!(iph->frag_off & htons(IP_DF));

NAPI_GRO_CB(skb)->flush |= flush;

skb_set_network_header(skb, off);

/* The above will be needed by the transport layer if there is one

* immediately following this IP hdr.

*/

/* Note : No need to call skb_gro_postpull_rcsum() here,

* as we already checked checksum over ipv4 header was 0

*/

//pull ip头进gro,这里更新data_offset

skb_gro_pull(skb, sizeof(*iph));

//设置传输层的头的位置

skb_set_transport_header(skb, skb_gro_offset(skb));

//调用传输层的reveive方法。

pp = call_gro_receive(ops->callbacks.gro_receive, head, skb);

out_unlock:

rcu_read_unlock();

out:

skb_gro_flush_final(skb, pp, flush);

return pp;

}

然后就是tcp层的gro方法,它的主要实现函数是tcp_gro_receive,他的流程和inet_gro_receiv类似,就是取得tcp的头,然后对gro list进行遍历,最终会调用合并方法。

首先来看gro list遍历的部分,它对same flow的要求就是source必须相同,如果不同则设置same flow为0.如果相同则跳到found部分,进行合并处理。

struct sk_buff **tcp_gro_receive(struct sk_buff **head, struct sk_buff *skb)

{

struct sk_buff **pp = NULL;

struct sk_buff *p;

struct tcphdr *th;

struct tcphdr *th2;

unsigned int len;

unsigned int thlen;

__be32 flags;

unsigned int mss = 1;

unsigned int hlen;

unsigned int off;

int flush = 1;

int i;

off = skb_gro_offset(skb);

hlen = off + sizeof(*th);

th = skb_gro_header_fast(skb, off);

if (skb_gro_header_hard(skb, hlen)) {

th = skb_gro_header_slow(skb, hlen, off);

if (unlikely(!th))

goto out;

}

thlen = th->doff * 4;

if (thlen < sizeof(*th))

goto out;

hlen = off + thlen;

if (skb_gro_header_hard(skb, hlen)) {

th = skb_gro_header_slow(skb, hlen, off);

if (unlikely(!th))

goto out;

}

skb_gro_pull(skb, thlen);

len = skb_gro_len(skb);

flags = tcp_flag_word(th);

//遍历gro list

for (; (p = *head); head = &p->next) {

//如果ip层已经不可能same flow则直接进行下一次匹配

if (!NAPI_GRO_CB(p)->same_flow)

continue;

th2 = tcp_hdr(p);

////判断源地址

if (*(u32 *)&th->source ^ *(u32 *)&th2->source) {

NAPI_GRO_CB(p)->same_flow = 0;

continue;

}

goto found;

}

goto out_check_final;

接下来就是当找到能够合并的skb的时候的处理,这里首先来看flush的设置,这里会有4个条件:

1 拥塞状态被设置(TCP_FLAG_CWR).

2 tcp的ack的序列号不匹配 (这是肯定的,因为它只是对tso或者说gso进行反向操作)

3 skb的flag和从gro list中查找到要合并skb的flag 如果他们中的不同位 不包括TCP_FLAG_CWR | TCP_FLAG_FIN | TCP_FLAG_PSH,这三个任意一个域。

4 tcp的option域不同

如果上面4个条件有一个满足,则会设置flush为1,也就是找到的这个skb(gro list中)必须被刷出到协议栈。

这里谈一下flags域的设置问题首先如果当前的skb设置了cwr,也就是发生了拥塞,那么自然前面被缓存的数据包需要马上被刷到协议栈,以便与tcp的拥塞控制马上进行。

而FIN和PSH这两个flag自然不需要一致,因为这两个和其他的不是互斥的。

found:

/* Include the IP ID check below from the inner most IP hdr */

flush = NAPI_GRO_CB(p)->flush;

//如果设置拥塞,则肯定需要刷出skb到协议栈

flush |= (__force int)(flags & TCP_FLAG_CWR);

//如果相差的域是除了这3个中的,就需要flush出skb

flush |= (__force int)((flags ^ tcp_flag_word(th2)) &

~(TCP_FLAG_CWR | TCP_FLAG_FIN | TCP_FLAG_PSH));

//ack的序列号必须一致

flush |= (__force int)(th->ack_seq ^ th2->ack_seq);

//tcp的option头必须一致

for (i = sizeof(*th); i < thlen; i += 4)

flush |= *(u32 *)((u8 *)th + i) ^

*(u32 *)((u8 *)th2 + i);

/* When we receive our second frame we can made a decision on if we

* continue this flow as an atomic flow with a fixed ID or if we use

* an incrementing ID.

*/

if (NAPI_GRO_CB(p)->flush_id != 1 ||

NAPI_GRO_CB(p)->count != 1 ||

!NAPI_GRO_CB(p)->is_atomic)

flush |= NAPI_GRO_CB(p)->flush_id;

else

NAPI_GRO_CB(p)->is_atomic = false;

mss = skb_shinfo(p)->gso_size;

// 0-1 = 0xFFFFFFFF, 所以skb的数据部分长度为0的包是不会被合并的

flush |= (len - 1) >= mss;

flush |= (ntohl(th2->seq) + skb_gro_len(p)) ^ ntohl(th->seq);

//如果flush有设置则不会调用 skb_gro_receive,也就是不需要进行合并,否则调用skb_gro_receive进行数据包合并

if (flush || skb_gro_receive(head, skb)) {

mss = 1;

goto out_check_final;

}

//更新p的头。到达这里说明合并完毕,因此需要更新合并完的新包的头。

tcp_flag_word(th2) |= flags & (TCP_FLAG_FIN | TCP_FLAG_PSH);

从上面我们可以看到如果tcp的包被设置了一些特殊的flag比如PSH,SYN这类的就必须马上把数据包刷出到协议栈。

下面就是最终的一些flags判断,比如第一个数据包进来都会到这里来判断。

out_check_final:

flush = len < mss;

flush |= (__force int)(flags & (TCP_FLAG_URG | TCP_FLAG_PSH |

TCP_FLAG_RST | TCP_FLAG_SYN |

TCP_FLAG_FIN));

if (p && (!NAPI_GRO_CB(skb)->same_flow || flush))

pp = head;

out:

NAPI_GRO_CB(skb)->flush |= (flush != 0);

return pp;

}

这里要知道每次我们只会刷出gro list中的一个skb节点,这是因为每次进来的数据包我们也只会匹配一个。因此如果遇到需要刷出的数据包,会在dev_gro_receive中先刷出gro list中的,然后再将当前的skb feed进协议栈。

最后就是gro最核心的一个函数skb_gro_receive,它的主要工作就是合并,它有2个参数,第一个是gro list中和当前处理的skb是same flow的skb,第二个就是我们需要合并的skb。

这里要注意就是farg_list,其实gro对待skb_shared_info和ip层切片,组包很类似,就是frags放Scatter-Gather I/O的数据包,frag_list放线性数据。这里gro 也是这样的,如果过来的skb支持Scatter-Gather I/O并且数据是只放在frags中,则会合并frags,如果过来的skb不支持Scatter-Gather I/O(数据头还是保存在skb中),则合并很简单,就是新建一个skb然后拷贝当前的skb,并将gro list中的skb直接挂载到farg_list。

先来看支持Scatter-Gather I/O的处理部分。

//一些需要用到的变量

struct sk_buff *p = *head;

struct sk_buff *nskb;

//当前的skb的 share_ino

struct skb_shared_info *skbinfo = skb_shinfo(skb);

//当前的gro list中的要合并的skb的share_info

struct skb_shared_info *pinfo = skb_shinfo(p);

unsigned int headroom;

unsigned int len = skb_gro_len(skb);

unsigned int offset = skb_gro_offset(skb);

unsigned int headlen = skb_headlen(skb);

//如果有frag_list的话,则直接去非Scatter-Gather I/O部分处理,也就是合并到frag_list.

if (pinfo->frag_list)

goto merge;

else if (headlen <= offset) {

//支持Scatter-Gather I/O的处理

skb_frag_t *frag;

skb_frag_t *frag2;

int i = skbinfo->nr_frags;

//这里遍历是从后向前。

int nr_frags = pinfo->nr_frags + i;

offset -= headlen;

if (nr_frags > MAX_SKB_FRAGS)

return -E2BIG;

//设置pinfo的frags的大小,可以看到就是加上skb的frags的大小

pinfo->nr_frags = nr_frags;

skbinfo->nr_frags = 0;

frag = pinfo->frags + nr_frags;

frag2 = skbinfo->frags + i;

//遍历赋值,其实就是地址赋值,这里就是将skb的frag加到pinfo的frgas后面。

do {

*--frag = *--frag2;

} while (--i);

//更改page_offet的值

frag->page_offset += offset;

//修改size大小

frag->size -= offset;

//更新skb的相关值

skb->truesize -= skb->data_len;

skb->len -= skb->data_len;

skb->data_len = 0;

NAPI_GRO_CB(skb)->free = 1;

//最终完成

goto done;

} else if (skb_gro_len(p) != pinfo->gso_size)

return -E2BIG;

这里gro list中的要被合并的skb我们叫做skb_s。

接下来就是不支持支持Scatter-Gather I/O(skb的头放在skb中)的处理。这里处理也比较简单,就是复制一个新的nskb,然后它的头和skb_s一样,然后将skb_s挂载到nskb的frag_list上,并且把新建的nskb挂在到gro list中,代替skb_s的位置,而当前的skb。

headroom = skb_headroom(p);

nskb = alloc_skb(headroom + skb_gro_offset(p), GFP_ATOMIC);

if (unlikely(!nskb))

return -ENOMEM;

//复制头

__copy_skb_header(nskb, p);

nskb->mac_len = p->mac_len;

skb_reserve(nskb, headroom);

__skb_put(nskb, skb_gro_offset(p));

//设置各层的头

skb_set_mac_header(nskb, skb_mac_header(p) - p->data);

skb_set_network_header(nskb, skb_network_offset(p));

skb_set_transport_header(nskb, skb_transport_offset(p));

__skb_pull(p, skb_gro_offset(p));

//复制数据

memcpy(skb_mac_header(nskb), skb_mac_header(p),

p->data - skb_mac_header(p));

//对应的gro 域的赋值

*NAPI_GRO_CB(nskb) = *NAPI_GRO_CB(p);

//可以看到frag_list被赋值

skb_shinfo(nskb)->frag_list = p;

skb_shinfo(nskb)->gso_size = pinfo->gso_size;

pinfo->gso_size = 0;

skb_header_release(p);

nskb->prev = p;

//更新新的skb的数据段

nskb->data_len += p->len;

nskb->truesize += p->len; // 应该改成 nskb->truesize += p->truesize; 更准确

nskb->len += p->len;

//将新的skb插入到gro list中

*head = nskb;

nskb->next = p->next;

p->next = NULL;

p = nskb;

merge:

if (offset > headlen) {

skbinfo->frags[0].page_offset += offset - headlen;

skbinfo->frags[0].size -= offset - headlen;

offset = headlen;

}

__skb_pull(skb, offset);

//将skb插入新的skb的(或者老的skb,当frag list本身存在)fraglist

// 这里是用p->prev来记录了p->fraglist的最后一个包,所以在gro向协议栈提交时最好加一句skb->prev = NULL;

p->prev->next = skb;

p->prev = skb;

skb_header_release(skb);

再次声明:本篇博客是对这个大佬博客的引用:感谢大佬。我只将其代码更新到了4.18.16,感谢大佬的分享。

©著作权归作者所有,转载或内容合作请联系作者 人面猴序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...沈念sama阅读 145,261评论 1赞 308死咒序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...沈念sama阅读 62,177评论 1赞 259救了他两次的神仙让他今天三更去死文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...开封第一讲书人阅读 96,329评论 0赞 214道士缉凶录:失踪的卖姜人 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...开封第一讲书人阅读 41,490评论 0赞 184港岛之恋(遗憾婚礼)正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...茶点故事阅读 49,353评论 1赞 262恶毒庶女顶嫁案:这布局不是一般人想出来的文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...开封第一讲书人阅读 39,028评论 1赞 179城市分裂传说那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...沈念sama阅读 30,611评论 2赞 276双鸳鸯连环套:你想象不到人心有多黑文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...开封第一讲书人阅读 29,383评论 0赞 171万荣杀人案实录序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...沈念sama阅读 32,749评论 0赞 215护林员之死正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...茶点故事阅读 29,460评论 2赞 219白月光启示录正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...茶点故事阅读 30,814评论 1赞 232活死人序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...沈念sama阅读 27,255评论 2赞 215日本核电站爆炸内幕正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...茶点故事阅读 31,752评论 3赞 214男人毒药:我在死后第九天来索命文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...开封第一讲书人阅读 25,685评论 0赞 9一桩弑父案,背后竟有这般阴谋文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...开封第一讲书人阅读 26,114评论 0赞 170情欲美人皮我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...沈念sama阅读 33,747评论 2赞 234代替公主和亲正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...茶点故事阅读 33,901评论 2赞 238推荐阅读更多精彩内容面试题Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...cosWriter阅读 11,018评论 1赞 32Tcpdump简介 用简单的话来定义tcpdump,就是:dump the traffic on a network,根据使用者...JasonShi6306421阅读 1,200评论 0赞 1Chapter 3. 用户界面3.1. 介绍 现在,您已经安装了Wireshark并有可能热衷于开始捕捉您的第一个数据包。在接下来的章节中,我们...wwyyzz阅读 1,312评论 0赞 1Linux tcpdump命令详解简介 用简单的话来定义tcpdump,就是:dump the traffic on a network,根据使用者...保川阅读 5,899评论 1赞 13NBA大结局?意料之中以侮辱性的迷你中产薪资加入一支4年3冠的王朝球队,表妹近期成为了众矢之的。估计世界上除了勇蜜之外的NBA球迷都在吐...兔说八道阅读 347评论 0赞 0评论0赞赞1赞赞赏更

怎么提高网络应用性能?让DPDK GRO和GSO来帮你!-腾讯云开发者社区-腾讯云

网络应用性能?让DPDK GRO和GSO来帮你!-腾讯云开发者社区-腾讯云Linux阅码场怎么提高网络应用性能?让DPDK GRO和GSO来帮你!关注作者腾讯云开发者社区文档建议反馈控制台首页学习活动专区工具TVP最新优惠活动文章/答案/技术大牛搜索搜索关闭发布登录/注册首页学习活动专区工具TVP最新优惠活动返回腾讯云官网Linux阅码场首页学习活动专区工具TVP最新优惠活动返回腾讯云官网社区首页 >专栏 >怎么提高网络应用性能?让DPDK GRO和GSO来帮你!怎么提高网络应用性能?让DPDK GRO和GSO来帮你!Linux阅码场关注发布于 2020-07-22 11:49:572.7K0发布于 2020-07-22 11:49:57举报文章被收录于专栏:LINUX阅码场LINUX阅码场背景目前,有大量的网络应用在处理数据包的时候只需要处理数据包头,而不会操作数据负载部分,例如防火墙、TCP/IP协议栈和软件交换机。对这类网络应用而言, 包头处理产生的开销(称为“per-packet overhead”)占了整体开销的大部分。因此,如何减少包头处理开销是优化这类应用性能的关键。

减少包头处理开销最直接的方法:减少数据包数量如何减少包数量?

增大Maximum Transmission Unit (MTU)。在数据量一定的情况下,使用大MTU的数据包可携带更多数据,从而减少了包的总量。但MTU值依赖于物理链路,我们无法保证数据包经过的所有链路均使用大MTU。利用网卡特性:Large Receive Offload (LRO),UDP Fragmentation Offload (UFO)和TCP Segmentation Offload (TSO)。如图1所示,LRO将从物理链路收到的TCP包(如1500B)合并为长度更长的TCP包(如64KB);UFO和TSO将上层应用发送的长数据负载的UDP和TCP包(如64KB)拆分成长度更短的数据包(如1500B),以满足物理链路的MTU限制。通过在网卡上进行包合并和拆分,在不需要任何CPU开销的情况下,上层应用就可以处理数量大大减少的大包。然而,LRO、TSO和UFO通常只能处理TCP和UDP包,而且并非所有的网卡都支持这些特性。软件包合并 (Generic Receive Offload,GRO)和包拆分 (Generic Segmentation Offload,GSO)。与前两种方法相比,GRO和GSO有两个优点:第一,不依赖于物理链路和网卡;第二,能够支持更多的协议类型,如VxLAN和GRE。图1. LRO、UFO和TSO工作原理为了帮助基于DPDK的应用程序(如Open vSwitch)减少包头处理开销,DPDK分别于17.08和17.11支持了GRO和GSO。如图2所示, GRO和GSO是DPDK中的两个用户库,应用程序直接调用它们进行包合并和分片。图2. DPDK GRO和DPDK GSO1GRO库和GSO库结构

图3描绘了GRO库和GSO库的结构。根据数据包类型,GRO库定义了不同的GRO类型。每一种GRO类型负责合并一种类型的数据包,如TCP/IPv4 GRO处理TCP/IPv4数据包。同样的,GSO库也定义了不同的GSO类型。GRO库和GSO库分别根据MBUF的packet_type域和ol_flags域将输入的数据包交给对应的GRO和GSO类型处理。图3. GRO库和GSO库的框架2如何使用GRO库和GSO库?使用GRO和GSO库十分简单。如图4所示,只需要调用一个函数便可以对包进行合并和分片。图4. 代码示例为了支持不同的用户场景,GRO库提供了两组API:轻量模式API和重量模式API,如图5所示。轻量模式API应用于需要快速合并少量数据包的场景,而重量模式API则用于需要细粒度地控制合包并需要合并大量数据包的场景。图5. 轻量模式API和重量模式API3DPDK GRO的合包算法算法挑战在高速的网络环境下,高开销的合包算法很可能会导致网卡丢包。包乱序(“Packet Reordering”)增加了合包难度。例如Linux GRO无法合并乱序的数据包。这就要求DPDK GRO的合包算法:足够轻量以适应高速的网络环境能够合并乱序包基于Key的合包算法为解决上述两点挑战,DPDK GRO采用基于Key的合包算法,其流程如图6所示。对新到的数据包,首先按照流(“flow”)对其进行分类,再在其所在的流中寻找相邻的数据包(“neighbor”)进行合并。若无法找到匹配的流,就插入一条新流并将数据包存储到新流中。若无法找到邻居,则将数据包存储到对应的流中。基于Key的合包算法有两个特点。首先,通过流分类来加速数据包的合并是十分轻量的一种做法;其次,保存无法合并的数据包(如乱序包)使得之后对其进行合并成为可能,故减轻了包乱序对合包带来的影响。图6. 基于Key的合包算法流程例如,TCP/IPv4 GRO使用源和目的Ethernet地址、IP地址、TCP端口号以及TCP Acknowledge Number定义流,使用TCP Sequence Number和IP ID决定TCP/IPv4包是否为邻居。若两个TCP/IPv4的数据包能够合并,则它们必须属于同一个流,并且TCP序号和IP ID必须连续。4DPDK GSO的分片策略分片流程

如图7所示,将一个数据包分片有3个步骤。首先,将包的数据负载分成许多长度更小的部分;其次,为每一个数据负载部分添加包头(新形成的数据包称为GSO Segment);最后,为每个GSO segment更新包头(如TCP Sequence Number)。图7. GSO分片流程GSO Segment的结构

生成一个GSO Segment的最简单方法就是拷贝包头和数据负载部分。但频繁的数据拷贝会降低GSO性能,因此,DPDK GSO采用了一种基于零拷贝的数据结构——Two-part MBUF——来组织GSO Segment。如图8所示,一个Two-part MBUF由一个Direct MBUF和多个Indirect MBUF组成。Direct MBUF用来存储包头,Indirect MBUF则类似于指针,指向数据负载部分。利用Two-part MBUF,生成一个GSO Segment仅需拷贝长度较短的包头,而不需要拷贝较长的数据负载部分。图8. Two-part MBUF的结构GRO库和GSO库的状态目前,GRO库还处于一个初期阶段,仅对使用最广泛的TCP/IPv4数据包提供了合包支持。GSO库则支持更丰富的包类型,包括TCP/IPv4、VxLAN和GRE。作者简介胡嘉瑜,毕业于中国科学技术大学,现为英特尔软件工程师,主要从事DPDK中GRO、GSO和虚拟化方向的研发。本文参与 腾讯云自媒体分享计划,分享自微信公众号。原始发表:2020-07-21,如有侵权请联系 cloudcommunity@tencent.com 删除开源tcp/ipapi编程算法本文分享自 Linux阅码场 微信公众号,前往查看如有侵权,请联系 cloudcommunity@tencent.com 删除。本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!开源tcp/ipapi编程算法评论登录后参与评论0 条评论热度最新登录 后参与评论推荐阅读LV.关注文章0获赞0领券社区专栏文章阅读清单互动问答技术沙龙技术视频团队主页腾讯云TI平台活动自媒体分享计划邀请作者入驻自荐上首页技术竞赛资源技术周刊社区标签开发者手册开发者实验室关于社区规范免责声明联系我们友情链接腾讯云开发者扫码关注腾讯云开发者领取腾讯云代金券热门产品域名注册云服务器区块链服务消息队列网络加速云数据库域名解析云存储视频直播热门推荐人脸识别腾讯会议企业云CDN加速视频通话图像分析MySQL 数据库SSL 证书语音识别更多推荐数据安全负载均衡短信文字识别云点播商标注册小程序开发网站监控数据迁移Copyright © 2013 - 2024 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有 深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569腾讯云计算(北京)有限责任公司 京ICP证150476号 |  京ICP备11018762号 | 京公网安备号11010802020287问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档Copyright © 2013 - 2024 Tencent Cloud.All Rights Reserved. 腾讯云 版权所有登录 后参与评论00

GRO的实现 (linux网络子系统学习 第七节)-CSDN博客

>

GRO的实现 (linux网络子系统学习 第七节)-CSDN博客

GRO的实现 (linux网络子系统学习 第七节)

最新推荐文章于 2024-03-06 19:39:57 发布

weixin_33698823

最新推荐文章于 2024-03-06 19:39:57 发布

阅读量364

收藏

4

点赞数

文章标签:

网络

原文链接:http://blog.51cto.com/yaoyang/1303010

版权

GRO (generic receive offload)

概述:

GRO是在协议栈接收报文时进行减负的一种处理方式,该方式在设计上考虑了多种协议报文。主要原理是在接收端通过把多个相关的报文(比如TCP分段报文)组装成一个大的报文后再传送给协议栈进行处理,因为内核协议栈对报文的处理都是对报文头部进行处理,如果相关的多个报文合并后只有一个报文头,这样就减少了协议栈处理报文个数,加快协议栈对报文的处理速度。

GRO功能对到本机的报文能起到一定的加速作用,但如果linux 运行在转发设备上,一般不需要使用GRO功能,这时使用GRO功能反而会降低处理速度。

GRO功能和只是针对NAPI类型的驱动,网卡驱动支持GRO要调用内核提供的GRO函数进行收包。并且该功能和网络设备硬件无关,是纯软件实现的。

设计需求:

1、需要根据一定的规则进行报文的合并。需要支持各种协议的合并规则。linux实现中是要求支持GRO功能的协议自己实现自己的合并函数,gro根据报文类型调用相应的合并函数。

2、对等待合并的报文应该进行缓存。等合并好后再送进协议栈进行处理。linux实现中在每个NAPI实例中放有一个等待合并的skb队列gro_list。

数据结构:

1、NAPI中GRO相关的字段:

struct napi_struct

{

unsigned int gro_count; //gro_list 上挂的skb 个数。

struct sk_buff *gro_list; //等待合并的skb 链表

}

2、每个协议中定义自己的GRO接收合并函数和合并后处理函数。

接收合并函数定义:

struct sk_buff**(*gro_receive)(struct sk_buff **head,struct sk_buff *skb);

参数:

head:等待合并的skb链表头

skb:接收到的skb。

返回值:

如果为空,表示报文被合并后不需要现在送入协议栈。

如果不为空,表示返回的报文需要立即送入协议栈。

合并后处理函数定义:

int(*gro_complete)(struct sk_buff *skb);

该函数对合并好的报文进行进一步加工,比如更新校验和。

3、GRO功能使用skb结构体内私有空间cb[48]来存放gro所用到的一些信息。

定义结构体struct napi_gro_cb

struct napi_gro_cb

{

/*指向存在skb_shinfo(skb)->frag[0].page页的数据的头部,

GRO使用过程中,如果skb是线性的,就置为空。

如果是非线性的并且报文头部全部存在非线性区中,

就指向页中的数据起始部分

*/

void *frag0;

/*第一页中数据的长度,如果frag0 字段不为空,

就设置该字段,否则为0。( Length of frag0.)

*/

unsigned int frag0_len;

/*This indicates where we are processing

relative to skb->data.

表明skb->data到GRO需要处理的数据区的偏移量。

因为在GRO合并处理过程中skb->data是不能被改变的,

所以需要使用该字段来记录一下偏移量。

GRO处理过程中根据该记录值快速找到要处理的数据部分。

比如进入ip层进行GRO处理,这时skb->data指向ip 头,

而ip层的gro 正好要处理ip头,这时偏移量就为0.

进入传输层后进行GRO处理,这时skb->data还指向ip头,

而tcp层gro要处理tcp头,这时偏移量就是ip头部长度。

*/

int data_offset;

/*This is non-zero if the packet may be of the same flow.

标记挂在napi->gro_list上的报文是否跟现在的报文进行匹配。

每层的gro_receive都设置该标记位。

接收到一个报文后,使用该报文和挂在napi->gro_list 上

的报文进行匹配。

在链路层,使用dev 和 mac头进行匹配,如果一样表示两个报文是通一个

设备发过来的,就标记napi->gro_list上对应的skb的same为1.

到网络层,再进一步进行匹配时,只需跟napi->list上刚被链路层标记

same为1的报文进行网络层的匹配即可,不需再跟每个报文进行匹配。

如果网络层不匹配,就清除该标记。

到传输层,也是只配置被网络层标记same为1 的报文即可。

这样设计为的是减少没必要的匹配操作

*/

int same_flow;

/*This is non-zero if the packet cannot be merged

with the new skb.

如果该字段不为0,表示该数据报文没必要再等待合并,

可以直接送进协议栈进行处理了

*/

int flush;

/*该报文被合并过的次数 ,Number of segments aggregated. */

int count;

/* Free the skb? ,是否该被丢弃*/

int free;

};

#define NAPI_GRO_CB(skb) ((struct napi_gro_cb *)(skb)->cb)

NAPI_GRO_CB(skb) 的初始化:

skb_reset_offset() 来重置gro 的 cb区域。如果是skb非线性的,并且本身不包含数据(包括头也没有),而所有的数据都保存在skb_shared_info中(支持S/G的网卡有可能会这么做)。

因为合并报文时需要报文头的信息,这时报文头是存在skb_shared_info的frags[0]中的,我们使用指针指向正确的报文头部。

void skb_gro_reset_offset(struct sk_buff *skb)

{

NAPI_GRO_CB(skb)->data_offset = 0;

NAPI_GRO_CB(skb)->frag0 = NULL;

NAPI_GRO_CB(skb)->frag0_len = 0;

/*如果skb 不包括数据并且skb_shinfo(skb)->frags[0].page 不在

高端内存中,表示报文头存在skb_shinfo(skb)->frags[0].page中

*/

if (skb->mac_header == skb->tail &&

!PageHighMem(skb_shinfo(skb)->frags[0].page))

{

NAPI_GRO_CB(skb)->frag0 =

page_address(skb_shinfo(skb)->frags[0].page) +

skb_shinfo(skb)->frags[0].page_offset;

NAPI_GRO_CB(skb)->frag0_len =

skb_shinfo(skb)->frags[0].size;

}

}

GRO在如下地方将报文送进协议栈进行处理:

1、当napi的循环执行完毕时,也就是执行napi_complete的时候,调用napi_gro_flush来把能送协议栈的报文送给协议栈。一般调用napi_complete时,是NAPI一次轮询就处理完了全部的报文,这时短期内网卡可能不会进行报文的接收,所有要把napi->gro_list上的报文都送到协议栈,不用再等待合并后再送了。

void napi_complete(struct napi_struct *n)

{

......

/*把napi->gro_list上的所有报文调用napi_gro_complete都

送给协议栈,并清空grp_list

*/

napi_gro_flush(n);

......

}

void napi_gro_flush(struct napi_struct *napi)

{

struct sk_buff *skb, *next;

for (skb = napi->gro_list; skb; skb = next)

{

next = skb->next;

skb->next = NULL;

napi_gro_complete(skb);

}

napi->gro_count = 0;

napi->gro_list = NULL;

}

2、在napi_skb_finish里,他会通过判断__napi_gro_receive的返回值,来决定是需要将数据包立即送进进协议栈还是保存起来。

int napi_skb_finish(int ret, struct sk_buff *skb)

{

int err = NET_RX_SUCCESS;

switch (ret)

{

/*如果返回NORMAL,就把报文送给协议栈进行处理*/

case GRO_NORMAL:

return netif_receive_skb(skb);

/*如果报文经过检查被丢弃了,释放内存并直接返回*/

case GRO_DROP:

err = NET_RX_DROP;

/*如果报文被合并了,这时报文已经被copy走了,

释放该报文占用的内存

*/

case GRO_MERGED_FREE:

kfree_skb(skb);

break;

}

return err;

}

GRO 的收包函数:

支持GRO功能的网卡驱动必须支持NAPI接口并调用GRO的专用接收函数napi_gro_receive()来把报文送给协议栈进行处理。

int napi_gro_receive(struct napi_struct *napi,

struct sk_buff *skb)

{

skb_gro_reset_offset(skb);

return napi_skb_finish(__napi_gro_receive(napi, skb), skb);

}

napi_skb_finish根据__napi_gro_receive(napi, skb)函数返回的结果,来处理报文。如果合并完成或不需要gro处理,返回GRO_NORMAL。

__napi_gro_receive()算是链路层上实现的gro_receive函数,详解见下文。

转载于:https://blog.51cto.com/yaoyang/1303010

优惠劵

weixin_33698823

关注

关注

0

点赞

4

收藏

觉得还不错?

一键收藏

知道了

0

评论

GRO的实现 (linux网络子系统学习 第七节)

GRO (generic receive offload)概述:GRO是在协议栈接收报文时进行减负的一种处理方式,该方式在设计上考虑了多种协议报文。主要原理是在接收端通过把多个相关的报文(比如TCP分段报文)组装成一个大的报文后再传送给协议栈进行处理,因为内核协议栈对报文的处理都是对报文头部进行处理,如果相关的多个报文合并后只有一个报文头,这样就减少了协议栈处理报文个数,加快...

复制链接

扫一扫

linux内核协议栈之GRO (Generic Receive Offload)

07-17

1256

GRO(Generic Receive Offload)从软件层面实现将多个 TCP/UDP 数据包聚合在一个skb结构,然后作为一个大数据包交付给上层的网络协议栈,以减少上层协议栈处理skb的开销,提高系统接收数据包的性能。

dREG:使用GRO-seq和PRO-seq检测监管元素

05-09

dREG

使用GRO-seq数据检测调节性DNA序列。

在线计算网关

我们提供了在GPU服务器上运行dREG的计算网关,用户无需安装任何软件,只需上传bigWig文件并等待结果即可,这非常简单。 请单击链接尝试此站点:

或者

(新的)

在dREG网关上运行数据之前,请在检查服务器状态。

Exchange电子邮件用户的重要说明:

Exchange电子邮件系统可能会隔离所有电子邮件,包括单词“密码”或链接中的其他敏感内容。 ( )。

不幸的是,该垃圾邮件策略隔离了来自dREG网关的某些电子邮件。 通常,这些隔离的电子邮件不会传递到电子邮件箱,因此无法在任何电子邮件文件夹(包括垃圾邮件,垃圾邮件或收件箱)中进行检查。 如果您发现来自dREG网关的电子邮件未发送到您的电子邮件框中,请联系电子邮件系统的管理员。 对于康奈尔(Cornell)电子邮件,请检查以下链接:

抽象的

鉴定调节转

参与评论

您还未登录,请先

登录

后发表或查看评论

linux kernel 网络协议栈之GRO

shage001314的专栏

06-15

1968

GRO(Generic receive offload)在内核2.6.29之后合并进去的,作者是一个华裔Herbert Xu ,GRO的简介可以看这里:

http://lwn.net/Articles/358910/

先来描述一下GRO的作用,GRO是针对网络接受包的处理的,并且只是针对NAPI类型的驱动,因此如果要支持GRO,不仅要内核支持,而且驱动也必须调用相应的借口,用ethtool -

ixgbe网卡驱动 Ⅳ----收发包流程详解

06-01

3293

目录

1 ixgbe_ring 结构

2 ixgbe 的中断上下部

2.1 ixgbe硬件中断入口 ixgbe_msix_clean_rings

2.2 ixgbe硬件中断入口 ixgbe_intr

3 ixgbe 的中断下部

3.1所有收包软中断入口函数 net_rx_action

3.2注册poll函数 ixgbe_poll

3.2.1 收包队列处理函数ixgbe_clean_rx_irq

3.2.2 发包队列处理函数ixgbe_clean_tx_irq

1 ixgbe_r...

Linux NAPI机制分析

zheng的博客

06-25

4507

1、概述

在NAPI之前,网卡每收到一个包就会触发一个中断通知cpu读取数据包,当数据包比较多时,中断数过多势必影响cpu性能,因此Linux引入NAPI机制,NAPI就是在收到中断后,先将网卡收包模式切换成poll模式,等收包完成后重新进入中断模式,本节主要分析Linux的NAPI实现机制。

NAPI的主要流程如下图,物理网卡收到包后触发irq中断通知cpu(触发中断后,默认disable该中断),中断上半部处理里将网卡设备的napi->poll_list加入到softnet_data->

Linux网络协议栈:NAPI机制与处理流程分析(图解)

maimang1001的专栏

03-11

3982

Linux网络协议栈:NAPI机制与处理流程分析(图解)_RToax-CSDN博客_netif_napi_addTable of ContentsNAPI机制NAPI缺陷使用 NAPI 先决条件非NAPI帧的接收netif_rx -将网卡中收到的数据包放到系统中的接收队列中enqueue_to_backlog____napi_schedule函数NAPI方式NAPI帧的接收NAPI接口struct napi_struct结构 -内核处理软中断的入口netif_napi_add函数 -驱动初始时向内核注册软软

NAPI(New API)的一些浅见

u014044624的博客

12-18

637

NAPI

linux 网络 指示灯 亮,Linux网络子系统中GRO的实现

weixin_32515741的博客

05-05

553

GRO (generic receive offload)GRO是在协议栈接收报文时进行减负的一种处理方式,该方式在设计上考虑了多种协议报文。主要原理是在接收端通过把多个相关的报文(比如TCP分段报文)组装成一个大的报文后再传送给协议栈进行处理,因为内核协议栈对报文的处理都是对报文头部进行处理,如果相关的多个报文合并后只有一个报文头,这样就减少了协议栈处理报文个数,加快协议栈对报文的处理速度。GR...

GSO/TSO/GRO等对VirtIO虚机的网络性能影响分析(by quqi99)

weixin_34260991的博客

08-03

329

作者:张华  发表于:2016-04-05版权声明:能够随意转载,转载时请务必以超链接形式标明文章原始出处和作者信息及本版权声明( http://blog.csdn.net/quqi99 )IP层叫分片,TCP/UDP层叫分段。网卡能做的事(TCP/UDP组包校验和分段,IP加入包头校验与分片)尽量往网卡做,网卡不能做的也尽量迟后分片(发送)或提前合并片(接收)来降低在网络栈中传输和处理的包数...

网络收包流程-软中断中process_backlog和poll方式处理流程(二)

hzj_001的博客

09-10

1997

在硬中断中触发了软中断后,最终会调用软中断处理函数 net_rx_action,注意:硬中断流程触发软中断后退出中断上下文,但是并不会立刻进入软中断,具体的实现需要了解软中断处理流程。

1.软中断处理函数net_rx_action

具体实现详解:

static void net_rx_action(struct softirq_action *h)

{

struct ...

电子测量中的基于VB的在线红外测温系统

12-09

利用VB的微机串口通信功能,实现计算机和红外测温仪的通讯连接,通过Windows下的数据采集和工业控制应用软件,实现了数据的显示、判断、报警、保存和数据库的自动更新功能。测温系统投用后,运行稳定,轧件温度的...

raspi-gro:使用树莓派的自动种植系统

03-08

raspi-gro:使用树莓派的自动种植系统

网卡gro、gso功能调试.doc

07-28

网卡gro、gso功能调试,适用于网卡性能调优

daily.dev Where developers gro-3.28.2.zip

12-30

名称:daily.dev Where developers gro ---------------------------------------- 版本:3.28.2 作者:https://daily.dev/ 分类:开发者工具 ---------------------------------------- 概述:获取为您量身定制的最...

网络安全(黑客)——自学2024

Y7472821的博客

03-06

466

在实际的渗透测试过程中,面对复杂多变的网络环境,当常用工具不能满足实际需求的时候,往往需要对现有工具进行扩展,或者编写符合我们要求的工具、自动化脚本,这个时候就需要具备一定的编程能力。当然,产生这样的疑惑并不奇怪,毕竟网络安全这个专业在2017年才调整为国家一级学科,而且大众对于网络安全的认知度不高,了解最多的可能就是个人信息泄露或者社区经常宣传的国家反诈APP。所以,如果你对网络安全感兴趣,即使没有天生的特质也可以一试,很多人都是在学习网络安全的过程中慢慢锻炼的。到此为止,大概1个月的时间。

网络编程(IP、端口、协议、UDP、TCP)【详解】

qq_1424674116的博客

03-02

3777

网络编程是可以让设备中的程序与网络上其他设备中的程序进行数据交互的技术。

网络安全(黑客)—自学2024

最新发布

dexi113的博客

03-06

391

我在这里可以很肯定地告诉你:"网络安全有很好的发展前景,前沿网络安全技术即将崛起,或者说已经崛起"。

win11环境下使用hane WIN NFS Server搭建nfs服务

xh_w20的专栏

03-04

552

win11环境下搭建nfs,使用hane win nfs server作为服务端,客户端越是windows使用。

【电路笔记】-RC网络-时间常数

视觉与物联智能

03-02

984

Tau τ是 RC 电路在阶跃变化输入条件下从一种稳态条件变为另一种稳态条件所需的时间常数。

网卡 gro 函数处理流程

06-09

网卡GRO(Generic Receive Offload)函数处理流程一般如下:

1. 网卡接收到数据包。

2. 网卡驱动将数据包放入网卡接收队列。

3. 内核网络协议栈从网卡接收队列中获取数据包。

4. 内核网络协议栈将数据包进行预处理,例如校验和验证、IP分片重组等。

5. GRO函数判断数据包是否符合GRO处理条件,例如数据包的协议类型是否为TCP、数据包的分片偏移是否为0等。

6. 如果数据包符合GRO处理条件,GRO函数将数据包放入GRO队列,等待下一步的处理。

7. 如果数据包不符合GRO处理条件,数据包将被直接放入协议栈进行进一步处理。

8. GRO函数定时或者GRO队列满了时,会对GRO队列中的数据包进行合并,生成一个大数据包。

9. GRO函数将合并后的大数据包放回到协议栈中,等待进一步处理。

需要注意的是,不同的网卡驱动实现可能有所不同,GRO函数的实现也可能有所不同。此外,GRO函数只针对TCP协议的数据包进行处理,对于其他协议的数据包不会进行GRO处理。

“相关推荐”对你有帮助么?

非常没帮助

没帮助

一般

有帮助

非常有帮助

提交

weixin_33698823

CSDN认证博客专家

CSDN认证企业博客

码龄8年

暂无认证

135

原创

-

周排名

109万+

总排名

130万+

访问

等级

8012

积分

5657

粉丝

267

获赞

36

评论

1478

收藏

私信

关注

热门文章

EndNote在Word中插入文献不能自动生成编号 - 解决方案

21999

CentOS7内核升级、降级、指定内核版本,查看内核信息教程

15509

ping的时候第一个包为什么会丢?

12693

U-Net 和 ResNet:长短跳跃连接的重要性(生物医学图像分割)

...

11639

配置接口IP地址并通过静态路由、默认路由配置实现全网互通!

10557

最新评论

利用h5,chart.js监测手机三轴加速度,用以研究计步算法等

Laurentta:

请问为什么我得到的加速度只有一位小数的精度

hexo 给博客添加版权、共享按钮、打赏

weixin_55394975:

添加共享按钮之后也没有shareto出现呢。

WordPress Hestia主题的定制和优化

weixin_45725132:

所以该怎么删页脚

VC++使用HOOK API 屏蔽PrintScreen键截屏以及QQ和微信默认热键截屏

FIRELARVA:

不行啊,还是会截屏

WordPress Hestia主题的定制和优化

陈政浩 Howard:

大佬 求教 如何修改hestia的页脚啊???

您愿意向朋友推荐“博客详情页”吗?

强烈不推荐

不推荐

一般般

推荐

强烈推荐

提交

最新文章

【C#学习笔记】指针使用

matlab练习程序(k-means聚类)

Ext JS 4 的类系统

2019年385篇

2018年653篇

2017年899篇

2016年539篇

2015年434篇

2014年329篇

2013年359篇

2012年293篇

2011年208篇

2010年153篇

2009年124篇

2008年103篇

2007年75篇

2006年39篇

2005年24篇

2004年7篇

目录

目录

最新文章

【C#学习笔记】指针使用

matlab练习程序(k-means聚类)

Ext JS 4 的类系统

2019年385篇

2018年653篇

2017年899篇

2016年539篇

2015年434篇

2014年329篇

2013年359篇

2012年293篇

2011年208篇

2010年153篇

2009年124篇

2008年103篇

2007年75篇

2006年39篇

2005年24篇

2004年7篇

目录

评论

被折叠的  条评论

为什么被折叠?

到【灌水乐园】发言

查看更多评论

添加红包

祝福语

请填写红包祝福语或标题

红包数量

红包个数最小为10个

红包总金额

红包金额最低5元

余额支付

当前余额3.43元

前往充值 >

需支付:10.00元

取消

确定

下一步

知道了

成就一亿技术人!

领取后你会自动成为博主和红包主的粉丝

规则

hope_wisdom 发出的红包

实付元

使用余额支付

点击重新获取

扫码支付

钱包余额

0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。 2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值

linux 3.10 gro的理解和改进 - _备忘录 - 博客园

linux 3.10 gro的理解和改进 - _备忘录 - 博客园

会员

周边

新闻

博问

AI培训

云市场

所有博客

当前博客

我的博客

我的园子

账号设置

简洁模式 ...

退出登录

注册

登录

安庆

导航

博客园

首页

新随笔

联系

订阅

管理

公告

linux 3.10 gro的理解和改进

gro,将同一个flow的一定时间范围之内的skb进行合并,减少协议栈的消耗,用于收包性能提升。gro网上的资料很多,但是都很少谈到gro的改进,刚好身边有个同事也想改这块的内容,

所以将最近看的gro内容总结一下,作为记录。

gro的层次,很少有资料提到,可能是大牛们觉得太简单,但我还是记录一下,毕竟我基础不好。

 先看关键的数据结构,然后分析流程:

为了在skb中记录相关的gro信息,使用了skb的cb字段。

crash> napi_gro_cb

struct napi_gro_cb {

void *frag0;

unsigned int frag0_len;

int data_offset;

u16 flush;

u16 flush_id;

u16 count;

u16 gro_remcsum_start;

unsigned long age;

u16 proto;

u8 encap_mark : 1;

u8 csum_valid : 1;

u8 csum_cnt : 3;

u8 is_ipv6 : 1;

u8 free : 2;

u8 same_flow : 1;

u8 recursion_counter : 4;

u8 is_atomic : 1;

__wsum csum;

struct sk_buff *last;

}

SIZE: 48

48字节的cb字段,被用完了。

所有的packet 级别的gro的类型,放在一个公共链表头 offload_base 变量中管理,我测试的系统中的packet级别的gro类型有:

crash> list packet_offload.list -H offload_base -s packet_offload

ffffffff81b41bc0

struct packet_offload {

type = 8,

priority = 0,

callbacks = {

gso_segment = 0xffffffff816155b0 ,

gro_receive = 0xffffffff816159a0 ,

gro_complete = 0xffffffff816148c0

},

list = {

next = 0xffffffff81b43b40 ,

prev = 0xffffffff81b3f0e0

}

}

ffffffff81b43b20

struct packet_offload {

type = 56710,

priority = 0,

callbacks = {

gso_segment = 0xffffffff8168c670 ,

gro_receive = 0xffffffff8168c300 ,

gro_complete = 0xffffffff8168c120

},

list = {

next = 0xffffffff81b3f7c0 ,

prev = 0xffffffff81b41be0

}

}

ffffffff81b3f7a0

struct packet_offload {

type = 22629,

priority = 10,

callbacks = {

gso_segment = 0x0,

gro_receive = 0xffffffff815bbd60 ,

gro_complete = 0xffffffff815bbbe0

},

list = {

next = 0xffffffff81b3f0e0 ,

prev = 0xffffffff81b43b40

}

}

所有的inet层的gro回调,都存储在inet_offloads 数组中,根据当前加载的模块,本机器对应支持的gro就有:

p inet_offloads

inet_offloads = $48 =

{0x0, 0x0, 0x0, 0x0, 0xffffffff8176fd80 , 0x0, 0xffffffff8176f220 , 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xffffffff8176f560 , 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xffffffff81777680 , 0x0, 0x0, 0x0, 0x0, 0x0, 0xffffffff81770be0 , 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,

。。。。

0x0, 0x0, 0x0}

gro的调用查找过程如下:

从dev层,根据到来的skb,可以根据skb->protocol 作为type的类型,比如type是 .type = cpu_to_be16(ETH_P_IP),然后才会进入 ip_packet_offload 这个层次,

在 offload_base这个链表头找到对应的type,然后获取对应的callback.gro_receive 函数。

找到了对应的inet_gro_receive,就进入了packet层,那么根据iph->protocol,就在 net_offload 数组中,找到对应协议类型的gro结构,比如找到的是 tcpv4_offload。

那么针对tcp的gro,其i40e驱动的调用顺序就是:

i40e_napi_poll--->|i40e_clean_tx_irq

                        --->|i40e_clean_rx_irq-->napi_gro_receive-->dev_gro_receive-->inet_gro_receive-->tcp4_gro_receive

对应的堆栈如下:

[root@localhost caq]# stap -d i40e netif_rx.stp

System Call Monitoring Started (10 seconds)...

WARNING: DWARF expression stack underflow in CFI

0xffffffff816041a0 : tcp4_gro_receive+0x0/0x1b0 [kernel]

0xffffffff81615be9 : inet_gro_receive+0x249/0x290 [kernel]

0xffffffff815951b0 : dev_gro_receive+0x2b0/0x3e0 [kernel]

0xffffffff815955d8 : napi_gro_receive+0x38/0x130 [kernel]-------------gro处理开始

0xffffffffc01f4bde : i40e_clean_rx_irq+0x3fe/0x990 [i40e]

0xffffffffc01f5440 : i40e_napi_poll+0x2d0/0x710 [i40e]

0xffffffff81594cf3 : net_rx_action+0x173/0x380 [kernel]

0xffffffff8109404d : __do_softirq+0xfd/0x290 [kernel]

0xffffffff816c8afc : call_softirq+0x1c/0x30 [kernel]

0xffffffff8102d435 : do_softirq+0x65/0xa0 [kernel]

0xffffffff81094495 : irq_exit+0x175/0x180 [kernel]

0xffffffff816c9da6 : __irqentry_text_start+0x56/0xf0 [kernel]

0xffffffff816bc362 : ret_from_intr+0x0/0x15 [kernel]

 

理清楚了大的流程,我们再来看目前的gro小的流程。在收到一个skb的时候,我们把它和napi_struct中的gro_list的skb进行比较,看能否合并,当然合并的前提是同一个flow的,

除此之外,除了满足同一个flow,还有很多要求。

那这个gro_list最大多长呢?

/* Instead of increasing this, you should create a hash table. */

#define MAX_GRO_SKBS 8

才8个,这8个skb跟新进来的skb是flow相同的概率其实真不高,比如在tcp4_gro_receive中,我想跟踪它接着调用的 skb_gro_receive,无奈由于合并的几率太低而无法跟到,

毕竟还有一个在gro_list中停留的时间限制,为一个jiffies。后来修改了jiffies并且修改了合并的条件才能抓到。

 当然,根据作者的注释,与其将这8改大,不如改成一个hash表,不同的skb先哈希到一个flow链,然后在链中比较看能否合并,这样对于gro流程需要改动为:

1.创建flow的hash表,让skb中看到flow,然后在flow的冲突链中找对应的gro_list,然后看能否合并。

2.percpu模式,不适用napi_struct来管理gro_list.

3.修改合并条件,比如对于tcp的ack来说,目前不带数据的ack无法合并,因为才54个字节,而以太网发出的时候会填充,导致不满足如下条件:

flush = (u16)((ntohl(*(__be32 *)iph) ^ skb_gro_len(skb)) | (id & ~IP_DF));

但对于流媒体服务器来说,纯ack占入向的比例很高,需要将条件改动,由于ack还涉及到快发流程的进入和退出,所以ack合并还是有一些工作要做的。

4.修改间隔,目前限制死了是一个jiffies,比如服务器8M左右的发送码率,收到的ack间隔可以释放放大,不然合并几率也比较低,当然这个是以tcp的send_buf中的数据占用更多内存为前提的。

   所以需要一个导出到/proc文件系统的间隔字段来控制。

5.对于低速发送码率的服务器来说,可以关闭gro,对于lvs服务器来说,应该关闭gro。

水平有限,如果有错误,请帮忙提醒我。如果您觉得本文对您有帮助,可以点击下面的 推荐 支持一下我。版权所有,需要转发请带上本文源地址,博客一直在更新,欢迎 关注 。

posted on

2018-11-08 19:37 

_备忘录 

阅读(1171) 

评论(0) 

编辑 

收藏 

举报

会员力量,点亮园子希望

刷新页面返回顶部

Powered by:

博客园

Copyright © 2024 _备忘录

Powered by .NET 8.0 on Kubernetes