作者 青鸟
什么是netpoll#
引用官方文档,Netpoll 是由 字节跳动 开发的高性能 NIO(Non-blocking I/O) 网络库,专注于 RPC 场景。
从源代码的角度来学习关于go的底层网络的优化也不失为一种乐趣
epoll的知识点#
什么是多路复用#
提到tcp的多路复用,首先想到的就是linux内核中支持的epoll多路复用.
epoll是一种多路复用,那么什么是多路复用呢,这里简单地捋一下多路复用的逻辑.
首先要弄清楚的是linux中,怎么实现tcp通信,在linux的网络实现中,一次tcp的握手+传输如下图所示:
这里就不再赘述tcp的连接与通信的相关知识,感兴趣的话可以看I/O 多路复用:select/poll/epoll这份面经.我们所需要了解的是服务器主进程listen了端口后,一旦与客户的连接完成,accept()函数就会返回一个连接的socket fd用于和客户端通信.在传统的通信模型中,会为每一条tcp链接分配一个进程或者线程,当数量增大时,需要维护的线程数量也会增大,线程的上下文切换也会成为可能的瓶颈之一.
在Http1.0中,会为每一个请求分配一个tcp链接,这就带来了网络上的性能瓶颈.那么我们能不能在一条tcp的连接上发送多条请求,注意这里的发送并不是以BIO的方式依次排队发送,而是像并发的一样去发送请求.换句话说,我们能不能在一条tcp请求也就是一个进程中为多个fd的请求服务,这就是多路复用.
什么是epoll#
简单的说epoll就是用来维护fd的,他会监视一个fd 集合,当集合中的socket fd中有触发时,就会交给对应的应用程序来处理.不赘述细节,这里只说一下epoll的触发模式,也就是边缘触发(ET)和水平触发(LT).
使用边缘触发模式时,当被监控的 Socket 描述符上有可读事件发生时,服务器端只会从 epoll_wait 中苏醒一次,即使进程没有调用 read 函数从内核读取数据,也依然只苏醒一次,因此我们程序要保证-次性将内核缓冲区的数据读取完.
使用水平触发模式时,当被监控的 Socket 上有可读事件发生时,服务器端不断地从 epoll wait 中苏醒,直到内核缓冲区数据被 read 函数读完才结束,目的是告诉我们有数据需要读取.
原生库的问题#
原生库:
- 原生库无法感知连接状态
- 在使用连接池时,池中存在失效连接,影响连接池的复用。
- 原生库存在goroutine 暴涨的风险
- 一个连接一个goroutine的模式,由于连接利用率低下,存在大量goroutine占用调度开销,影响性能。
- 使⽤ Epoll ET. Go Net 只有⼀个 Epoll 事件循环(因为 ET 模式被唤醒的少,且事件循环内⽆需负责读写,所以⼲的活少),⽽ Netpoll 允许有多个事件循环(循环内需要负责读写,⼲的活多,读写越重,越需要开更多 Loops)
- Go Net 不⽀持 Zero Copy,甚⾄于如果⽤⼾想要实现 BufferdConnection 这类缓存读取,还会产⽣⼆次拷⻉。
- ET模式在高并发下调度压力比较大,因为 EventLoop 本⾝只是监听事件,真正的读写操作都在⽤⼾⾃⼰的 Goroutine 函数中执⾏,不由⽹络库控制;因此每次 EventLoop监听到事件发生后,都需要唤醒对应的线程去读写数据,这里存在上下文切换开销。
netpoll:
- 解决无法感知连接状态问题
- 引入epoll主动监听机制,感知连接状态Netpoll 连接数和 Goroutine 数量没有关系,和请求数有⼀定关系,但是有 Gopool 重⽤。
- Netpoll 使⽤ LT. Netpoll 在⼤包场景下会占⽤更多的内存. Netpoll 允许有多个事件循环(循环内需要负责读写,⼲的活多,读写越重,越需要开更多 Loops)
- 引入Nocopy Buffer,向上层提供NoCopy的调用接口,编解码层面零拷贝
- Netpoll ⽀持管理⼀个 Buffer 池直接交给⽤⼾,且上层⽤⼾可以不使⽤ Read(p []byte) 接⼝⽽使⽤特定零拷⻉读取接⼝对 Buffer 进⾏管理,实现零拷⻉能⼒的传递。
- LT 单线程轮询对 cache/计算类业务更友好,因为 Cache 的特点是业务逻辑执⾏的⾮常快,所以在 readv 完了后可以⽴刻执⾏ handler 同时执⾏write,整个过程都不需要进⾏线程调度。对于计算类任务⽽⾔,越少协程切换能够让 CPU 尽可能少的做⽆效⼯作。
netpoll的优化#
相关数据结构#
首先看一副官方架构图
主要流程#
整个流程要分为三部分来看:
- netpoll 初始化:
netpoll 启动时,会初始化poll manager , 依次初始化池中每个poll对象
首先调用EpollCreate api创建一个新的Epoll对象,然后将其与当前poll对象绑定,同时还会为当前poll对象分配linkbuffer等缓冲区
为每个poll对象开启一个协程来不断轮询当前epoll上的可读可写等事件
- server 端:
启动后,从poll manager中获取一个空闲的poll ,将listener fd注册到poll中,监听accept事件
当accept 到客户端连接后,从poll manager中获取一个空闲的poll ,将客户端socket fd注册到poll中,监听可读事件
每个poll会关联一个LinkBuffer对象,当监听到客户端连接上的可读事件后,从linkbuffer中预定一块内存,将数据都读取到这块内存中来
包装一个模版任务,用于不断轮询处理linkbuffer上剩余可读数据,同时每次轮询完后,都会回调用户设置好的OnRequest函数,就是上图的handler函数
包装的模版任务会被提交到协程池中执行,也就是上图中的gopool
与内核的系统调⽤交互完全由⽹络库进⾏控制,⽤⼾对 Conn 的读写都只是在操作⼀段 Buffer ⽽已
- client 端:
启动后,建立和server端的连接 , 从opCache对象池中获取一个空闲的FDOperator对象返回,然后等待直到client socket可写
调用connection提供的相关写api如malloc,先分配一块内存用于写数据
写完需要发送给server的数据,调用flush api进行数据提交
flush api会首先尝试将数据写入socket内核缓冲区中,如果一次没写完,说明socket缓冲区写满了,此时会在poll上注册对当前socket fd可写事件监听
然后调用waitFlush api阻塞等待writeTrigger通道发送过来的可写通知
当poll线程监听到当前socket fd上发生了可写事件的时候,会向writeTrigger通道发送消息,唤醒等待的客户端
epoll接口#
我们可以先来简单看一下epoll的syscall的封装,其实就是对应了eopll的三个syscall的函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| type epollevent struct {
events uint32 // events:表示要监听的事件类型,如可读、可写等。这是一个位掩码,可以设置多个事件类型,例如 EPOLLIN 表示可读事件,EPOLLOUT 表示可写事件。
data [8]byte // 可以携带用户数据。这里的用户数据通常是对应的fd(如 ev.data.fd),以便于识别对应的文件描述符。
}
// EpollCreate implements epoll_create1.
// 内核中间加一个 ep 对象,把所有需要监听的 socket 都放到 ep 对象中
func EpollCreate(flag int) (fd int, err error) {
}
// EpollCtl implements epoll_ctl.
// epoll_ctl 负责在 epollfd 对象上,把 socket fd 增加、删除(ADD/DEL/...)到内核红黑树,并设置感兴趣的事件(EPOLLIN/EPOLLOUT/...)
func EpollCtl(epfd, op, fd int, event *epollevent) (err error) {
}
// EpollWait implements epoll_wait.
// epoll_wait: 阻塞等待直到 epollfd 内有就绪事件便返回,返回值为有效事件数,并且有效事件会记录再传⼊的 events 地址中
func EpollWait(epfd int, events []epollevent, msec int) (n int, err error) {
}
|
poll#
poll是封装的epoll对象,他会调用不同的环境下的多路复用的实现,在linux amd64下就是epoll.
然后再看poll 本身
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| type Poll interface {
// Wait 将轮询所有已注册的fds,并根据触发的事件安排处理。
Wait() error
// 关闭poll和停止wait.
Close() error
// 当没有事件触发时,可以使用Trigger来主动刷新Wait所在的循环.
// On linux systems, eventfd is used by default.
Trigger() error
// 控制file descriptor的事件,操作由PollEvent定义
Control(operator *FDOperator, event PollEvent) error
// 从cache中分配FDOperator.
Alloc() (operator *FDOperator)
// 从cache中释放FDOperator.
Free(operator *FDOperator)
}
|
这里简单看几个重要的实现,可以看出netpoll在对epoll的syscall上又封装了一层实现以满足需求.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
| // Control implements Poll.
// 注册感兴趣的事件,netpoll在epoll_ctl基础之上又封装了一层
// 从Control函数中可以看出来,netpoll会在epollevent的data字段中保存监听的fd对象信息,这里fd对象是netpoll经过封装后的FDOperator对象。
func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error {
// DON'T move `fd=operator.FD` behind inuse() call, we can only access operator before op.inuse() for avoid race
// G1: G2:
// op.inuse() op.unused()
// op.FD -- T1 op.FD = 0 -- T2
// T1 and T2 may happen together
fd := operator.FD
var op int
var evt epollevent
// 将epollevent对象的data指针指向传入的FDOperator对象
p.setOperator(unsafe.Pointer(&evt.data), operator)
switch event {
case PollReadable: // server accept a new connection and wait read
operator.inuse()
op, evt.events = syscall.EPOLL_CTL_ADD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR
case PollWritable: // client create a new connection and wait connect finished
operator.inuse()
op, evt.events = syscall.EPOLL_CTL_ADD, EPOLLET|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR
case PollDetach: // deregister
p.delOperator(operator)
op, evt.events = syscall.EPOLL_CTL_DEL, syscall.EPOLLIN|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR
case PollR2RW: // connection wait read/write
op, evt.events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR
case PollRW2R: // connection wait read
op, evt.events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR
}
return EpollCtl(p.fd, op, fd, &evt)
}
// Trigger implements Poll.
func (p *defaultPoll) Trigger() error {
if atomic.AddUint32(&p.trigger, 1) > 1 {
return nil
}
// MAX(eventfd) = 0xfffffffffffffffe
_, err := syscall.Write(p.wop.FD, []byte{0, 0, 0, 0, 0, 0, 0, 1})
return err
}
// openDefaultPoll 创建epoll 封装多路复用器的对象
func openDefaultPoll() (*defaultPoll, error) {
poll := new(defaultPoll)
poll.buf = make([]byte, 8)
// 调用EpollCreate创建epoll对象
p, err := EpollCreate(0)
if err != nil {
return nil, err
}
poll.fd = p
// eventfd是一种进程/线程通信的机制,他类似信号,不过eventfd只是一种通知机制
// 无法承载数据(eventfd承载的数据是8个字节),他的好处是简单并且只消耗一个fd
// 进程间通信机制: https://zhuanlan.zhihu.com/p/383395277
//此处使用eventFD是为了epoll池关闭的时候,通知那些阻塞在epoll_wait系统调用上的线程可以醒过来,然后结束自己。
r0, _, e0 := syscall.Syscall(syscall.SYS_EVENTFD2, 0, 0, 0)
if e0 != 0 {
_ = syscall.Close(poll.fd)
return nil, e0
}
poll.Reset = poll.reset
poll.Handler = poll.handler
poll.wop = &FDOperator{FD: int(r0)}
// 在epoll上注册并监听eventFd的可读事件 -- 监听r0上的可读事件
if err = poll.Control(poll.wop, PollReadable); err != nil {
_ = syscall.Close(poll.wop.FD)
_ = syscall.Close(poll.fd)
return nil, err
}
poll.opcache = newOperatorCache()
return poll, nil
}
// Wait implements Poll.
// 不断轮询注册到该epoll上的fd事件
func (p *defaultPoll) Wait() (err error) {
// init
caps, msec, n := barriercap, -1, 0
p.Reset(128, caps)
// wait
for {
if n == p.size && p.size < 128*1024 {
p.Reset(p.size<<1, caps)
}
// p.fd 就是 epoll fd
// events 就是挂载到epoll tree上的epoll item
// mesc 用于指定阻塞时间,是永久阻塞,还是阻塞一段时间,还是非阻塞IO
// 等待当前epoll上发生感兴趣的事件
n, err = EpollWait(p.fd, p.events, msec)
if err != nil && err != syscall.EINTR {
return err
}
// 如果没有发生感兴趣的事件,则将msec设置为-1,表示下一次采用永久阻塞策略来等待感兴趣的事件发生
// 然后调用Gosched完成协程调度
if n <= 0 {
msec = -1
runtime.Gosched()
continue
}
msec = 0
// 处理感兴趣的事件
//defaultPoll的Handler回调接口是在openDefaultPoll函数中被赋值的
if p.Handler(p.events[:n]) {
return nil
}
// we can make sure that there is no op remaining if Handler finished
p.opcache.free()
}
}
// 当epoll上有感兴趣的事件发生的时候,调用该函数进行处理
func (p *defaultPoll) handler(events []epollevent) (closed bool) {
var triggerRead, triggerWrite, triggerHup, triggerError bool
var err error
// 遍历所有感兴趣的事件
for i := range events {
// epollevent.data保存的是与之关联的FDOperator对象
operator := p.getOperator(0, unsafe.Pointer(&events[i].data))
if operator == nil || !operator.do() {
continue
}
var totalRead int
// 判断当前发生了什么事件
evt := events[i].events
triggerRead = evt&syscall.EPOLLIN != 0
triggerWrite = evt&syscall.EPOLLOUT != 0
triggerHup = evt&(syscall.EPOLLHUP|syscall.EPOLLRDHUP) != 0
triggerError = evt&syscall.EPOLLERR != 0
// trigger or exit gracefully
// 是否是eventFD可读事件发生了
if operator.FD == p.wop.FD {
// must clean trigger first
// 从eventFD中读取数据到buf中
syscall.Read(p.wop.FD, p.buf)
atomic.StoreUint32(&p.trigger, 0)
// if closed & exit
// 说明接收到了关闭信号,那么就关闭当前epoll
if p.buf[0] > 0 {
syscall.Close(p.wop.FD)
syscall.Close(p.fd)
operator.done()
return true
}
operator.done()
continue
}
// 发生了可读事件
if triggerRead {
// 如果FDOperator上的OnRead回调接口不为空,说明发生的是客户端的accept事件
if operator.OnRead != nil {
// for non-connection
// 调用OnRead来接收并处理客户端连接
operator.OnRead(p)
} else if operator.Inputs != nil {
// for connection
// 每个poll对象会关联一个barriers结构,该结构用于实现分散读取与集中写入的系统调用
// 每个poll对象还会关联一个LinkBuffer对象,作为读写数据缓冲区
// 此处是从LinkBuffer中分配出一块空闲内存
bs := operator.Inputs(p.barriers[i].bs)
if len(bs) > 0 {
// 读取数据到bs缓存区中
n, err := ioread(operator.FD, bs, p.barriers[i].ivs)
// 推动读指针,让写入缓冲区的数据对消费者可见,同时调用用户注册的OnRequest回调接口,处理读数据
operator.InputAck(n)
totalRead += n
if err != nil {
p.appendHup(operator)
continue
}
}
} else {
logger.Printf("NETPOLL: operator has critical problem! event=%d operator=%v", evt, operator)
}
}
operator.done()
}
// hup conns together to avoid blocking the poll.
p.onhups()
return false
}
|
下面是一个poll中的重要函数,用于回调,于是单独抽出来分析一下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
| // 当epoll上有感兴趣(也就是注册到epoll中的事件)的事件发生的时候,调用该函数进行处理
func (p *defaultPoll) handler(events []epollevent) (closed bool) {
var triggerRead, triggerWrite, triggerHup, triggerError bool
var err error
// 遍历所有感兴趣的事件
for i := range events {
// epollevent.data保存的是与之关联的FDOperator对象
operator := p.getOperator(0, unsafe.Pointer(&events[i].data))
if operator == nil || !operator.do() {
continue
}
var totalRead int
// 判断当前发生了什么事件
evt := events[i].events
triggerRead = evt&syscall.EPOLLIN != 0
triggerWrite = evt&syscall.EPOLLOUT != 0
triggerHup = evt&(syscall.EPOLLHUP|syscall.EPOLLRDHUP) != 0
triggerError = evt&syscall.EPOLLERR != 0
// trigger or exit gracefully
// 是否是eventFD可读事件发生了
if operator.FD == p.wop.FD {
// must clean trigger first
// 从eventFD中读取数据到buf中
syscall.Read(p.wop.FD, p.buf)
atomic.StoreUint32(&p.trigger, 0)
// if closed & exit
// 说明接收到了关闭信号,那么就关闭当前epoll
if p.buf[0] > 0 {
syscall.Close(p.wop.FD)
syscall.Close(p.fd)
operator.done()
return true
}
operator.done()
continue
}
// 发生了可读事件
if triggerRead {
// 如果FDOperator上的OnRead回调接口不为空,说明发生的是客户端的accept事件
if operator.OnRead != nil {
// for non-connection
// 调用OnRead来接收并处理客户端连接
operator.OnRead(p)
} else if operator.Inputs != nil {
// for connection
// 每个poll对象会关联一个barriers结构,该结构用于实现分散读取与集中写入的系统调用
// 每个poll对象还会关联一个LinkBuffer对象,作为读写数据缓冲区
// 此处是从LinkBuffer中分配出一块空闲内存
bs := operator.Inputs(p.barriers[i].bs)
if len(bs) > 0 {
// 读取数据到bs缓存区中
n, err := ioread(operator.FD, bs, p.barriers[i].ivs)
// 推动读指针,让写入缓冲区的数据对消费者可见,同时调用用户注册的OnRequest回调接口,处理读数据
operator.InputAck(n)
totalRead += n
if err != nil {
p.appendHup(operator)
continue
}
}
} else {
logger.Printf("NETPOLL: operator has critical problem! event=%d operator=%v", evt, operator)
}
}
//忽略write和hup的代码...
operator.done()
}
// hup conns together to avoid blocking the poll.
p.onhups()
return false
}
|
FDOperator#
上面接口中有和FDOperator这个参数,FDOperator封装了对fd相关的操作.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| type FDOperator struct {
// 监听的 fd
FD int
// The poll actively fire the FDOperator when fd changes, no check the return value of FDOperator.
// 监听到读写事件后的回调函数
OnRead func(p Poll) error // accept 事件回调
OnWrite func(p Poll) error // 客户端 socket 写回调
OnHup func(p Poll) error
// The following is the required fn, which must exist when used, or directly panic.
// Fns are only called by the poll when handles connection events.
// linkbuffer 与 socket 缓冲区之间的读写API
Inputs func(vs [][]byte) (rs [][]byte)
InputAck func(n int) (err error)
// Outputs will locked if len(rs) > 0, which need unlocked by OutputAck.
Outputs func(vs [][]byte) (rs [][]byte, supportZeroCopy bool)
OutputAck func(n int) (err error)
// 用于反向查找 注册在哪个poll中
poll Poll
// protect only detach once
detached int32
// 用于 operatorCache
next *FDOperator
state int32 // CAS: 0(unused) 1(inuse) 2(do-done)
index int32 // index in operatorCache
}
|
可以预想FDOperator也会频繁的创建和销毁,所以netpoll为他添加了对象池,可以快速的分配和回收FDOperator
1
2
3
4
5
6
7
8
9
| type operatorCache struct {
first *FDOperator
cache []*FDOperator
locked int32
// freelist store the freeable operator
// to reduce GC pressure, we only store op index here
freelocked int32
freelist []int32
}
|
在上面的接口中,我们可以看到,FDOperator会执行回调,为了避免写入和读取造成的开销,netpoll实现了一个LinkBufferPool,用于链式读取和写入.
也是介于conn和FD之间的读写缓存池(其实对象池存的是linkbuffernode),我们首先看一下对象池的实现,其实用的就是go的原生实现的对象池.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| func newLinkBufferNode(size int) *linkBufferNode {
node := linkedPool.Get().(*linkBufferNode)
// reset node offset
node.off, node.malloc, node.refer, node.mode = 0, 0, 1, defaultLinkBufferMode
if size <= 0 {
node.setMode(readonlyMask, true)
return node
}
if size < LinkBufferCap {
size = LinkBufferCap
}
node.buf = malloc(0, size)
return node
}
var linkedPool = sync.Pool{
New: func() interface{} {
return &linkBufferNode{
refer: 1, // 自带 1 引用
}
},
}
|
然后我们看一下LinkBuffer的具体的实现,也就是UnsafeLinkBuffer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| // UnsafeLinkBuffer implements ReadWriter.
type UnsafeLinkBuffer struct {
length int64
mallocSize int
head *linkBufferNode // release head 链表头部
read *linkBufferNode // read head 读取位置
flush *linkBufferNode // malloc head 写入提交位置
write *linkBufferNode // malloc tail 链表尾部
// buf allocated by Next when cross-package, which should be freed when release
caches [][]byte
// for `Peek` only, avoid creating too many []byte in `caches`
// fix the issue when we have a large buffer and we call `Peek` multiple times
cachePeek []byte
}
|
这块的逻辑有点复杂,建议去看字节官方的原文- 字节跳动在 Go 网络库上的实践,里面有详细的说明.总结下来就是有效避免了在水平触发下上层应用和conn读写时候内存安全问题.
manager#
manager 用于管理多个 poller, 将fd平均分配到多个 poller 上,来负载均衡多个epoll上的fd数量,避免单个epoll上的fd数量过多
需要理解的是netpoll并不关心fd的依赖关系,只是负责将fd平均分配到多个poller上,避免单个poller也就是epoll上的fd数量过多
1
2
3
4
5
6
| type manager struct {
numLoops int32
status int32 // 0: uninitialized, 1: initializing, 2: initialized
balance loadbalance // load balancing method
polls []Poll // all the polls
}
|
实现较为简单,有意思的是这个对象内部实现了一个负载均衡器用于分配fd
内部是有两种策略,相关的实现代码也很简单粗暴,就是一个字节自己实现的快速随机rand,随机返回数组下标= =
1
2
3
4
5
6
7
| const (
// RoundRobin requests that connections are distributed to a Poll
// in a round-robin fashion.
RoundRobin LoadBalance = iota
// Random requests that connections are randomly distributed.
Random
)
|
ShardQueue#
这里的处理逻辑也是非常复杂,建议自行看源代码,不是三言两语就能解释的了.功能如下:
ShardQueue 的作用是实现一个高效的并发数据发送队列,在一条连接上分片发送数据.主要用于处理网络数据的传输。它利用了 Go 语言的并发特性和 netpoll 库的无拷贝 API. ShardQueue 将数据分成多个分片(shards),每个分片可以独立处理。这种设计可以提高并发性能,因为多个 goroutine 可以同时处理不同的分片。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
// WriterGetter 用来获取 netpoll.Writer.
type WriterGetter func() (buf netpoll.Writer, isNil bool)
// ShardQueue 使用 netpoll 的 nocopy API 来合并和发送数据。
// Data Flush 由 ShardQueue.Add 被动触发,不需要用户操作。
// 如果数据传输出现错误,连接将会被关闭。
// ShardQueue.Add:添加需要发送的数据。
type ShardQueue struct {
conn netpoll.Connection
idx, size int32
getters [][]WriterGetter // len(getters) = size
swap []WriterGetter // use for swap
locks []int32 // len(locks) = size
queueTrigger
}
|
server accept与执行回调函数逻辑#
这里的调用链路有点复杂,这里捋一下
server端在运行时会调用Run,这里会初始化用户listen connection的fd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| // Run this server.
func (s *server) Run() (err error) {
s.operator = FDOperator{
FD: s.ln.Fd(),
OnRead: s.OnRead,
OnHup: s.OnHup,
}
log.Println("Pick listen poller for server")
s.operator.poll = pollmanager.Pick()
err = s.operator.Control(PollReadable)
if err != nil {
s.onQuit(err)
}
return err
}
|
其中的OnRead接口中会处理Accept请求,用于建立connection连接
1
2
3
4
5
6
7
8
9
10
11
12
13
| // OnRead implements FDOperator.
func (s *server) OnRead(p Poll) error {
// accept socket
conn, err := s.ln.Accept()
if err == nil {
if conn != nil {
log.Println("NETPOLL: accept conn success:", conn.RemoteAddr())
s.onAccept(conn.(Conn))
}
// EAGAIN | EWOULDBLOCK if conn and err both nil
return nil
}
logger.Printf("NETPOLL: accept conn failed: %v", err)
|
onAccept会负责建立conn,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| func (s *server) onAccept(conn Conn) {
// store & register connection
nconn := new(connection)
nconn.init(conn, s.opts)
if !nconn.IsActive() {
return
}
fd := conn.Fd()
nconn.AddCloseCallback(func(connection Connection) error {
s.connections.Delete(fd)
return nil
})
s.connections.Store(fd, nconn)
// trigger onConnect asynchronously
nconn.onConnect()
}
|
这里的init函数和onConnect函数都会去调用onProcess函数,这里不赘述这两个函数.init中使用是如果在连接onPrepare后,有input数据了,那么就会直接调用onRequest接口,去处理数据,否则就只把onRequest函数注册进connection中.然后再通过onConnect去做处理.在onProcess中处理所有的input输入.几个函数之间的关系有点复杂,可以跑一下connection的Benchmark测试来理解一下这块的逻辑并结合架构图,有点绕的.
注意这里有两个onRequest,一个是应用侧注册进的回调函数,指连接上发生读事件时 Netpoll 触发的回调.另一个是在connetcion中封装的onRequest处理函数,本质是为了在connect的时候执行一次回调
同时可以看到回调的实现默认使用了gopool,按照文档的说法,主要解决的问题是rpc服务中的栈扩张问题,可以参考下面列出的一篇文章. 如果不会面临栈扩张,建议关闭gopool采用直接回调的方式来处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
// init initialize the connection with options
func (c *connection) init(conn Conn, opts *options) (err error) {
...
return c.onPrepare(opts)
}
// onPrepare supports close connection, but not read/write data.
// connection will be registered by this call after preparing.
func (c *connection) onPrepare(opts *options) (err error)
// onRequest is responsible for executing the closeCallbacks after the connection has been closed.
func (c *connection) onRequest() (needTrigger bool)
// onConnect is responsible for executing onRequest if there is new data coming after onConnect callback finished.
func (c *connection) onConnect()
var runTask = gopool.CtxGo
// onProcess is responsible for executing the onConnect/onRequest function serially,
// and make sure the connection has been closed correctly if user call c.Close() in onConnect/onRequest function.
func (c *connection) onProcess(onConnect OnConnect, onRequest OnRequest) (processed bool) {
// add new task
runTask(c.ctx, task)
}
|
看完上面的代码分析,再回过头去看netpoll的结构图,不得不感叹netpoll简短的几行代码对于性能做到了最极致的优化,其中的很多性能优化策略都可以借鉴到平时的业务中,对架构的思考很有启发.
参考文章: