性能远超chan的无锁队列
作者:leonzgshao,腾讯CSIG后台开发工程师
| 导语 结合Java语言的高可用无锁队列框架Disruptor实现的高性能无锁队列,可实现远高于chan的高性能数据传递,解决高并发环境下chan写入数据慢的问题。 1. chan的困境1.1. chan高并发挺慢
按照我之前的认知,我以为既然go提供了这么一种通信方式,那么它的性能自然一定是有保证的。事实证明,靠想、靠自以为是不靠谱的,chan并没有想象中的那么快,尤其是并发的时候。先做一个最简单的测试,并发向chan中放入数据,看下放入的时间分布: var ( t1_10us = uint64(0) // 1-10微秒 t10_100us = uint64(0) // 10-100微秒 t100_1000us = uint64(0) // 100-1000微秒 t1_10ms = uint64(0) // 1-10毫秒 t10_100ms = uint64(0) // 10-100毫秒 t100_ms = uint64(0) // 大于100毫秒 ) var ( length = 1024 * 1024 goSize = 100 numPerGo = 10000 counter = uint64(0) slower = uint64(0) wg sync.WaitGroup ) ch := make(chan uint64, length) // 消费端 go func() { var ts time.Time var count int32 for { x := <-ch atomic.AddInt32(&count, 1) if count == 1 { ts = time.Now() } if x%100000 == 0 { fmt.Printf("read %d ", x) } if count == int32(goSize*numPerGo) { tl := time.Since(ts) fmt.Printf("read time = %d ms ", tl.Milliseconds()) } } }() wg.Add(goSize) totalS := time.Now() for i := 0; i < goSize; i++ { go func() { for j := 0; j < numPerGo; j++ { x := atomic.AddUint64(&counter, 1) ts := time.Now() ch <- x tl := time.Since(ts) ms := tl.Microseconds() if ms > 1 { atomic.AddUint64(&slower, 1) if ms < 10 { // t1_10us atomic.AddUint64(&t1_10us, 1) } else if ms < 100 { atomic.AddUint64(&t10_100us, 1) } else if ms < 1000 { atomic.AddUint64(&t100_1000us, 1) } else if ms < 10000 { atomic.AddUint64(&t1_10ms, 1) } else if ms < 100000 { atomic.AddUint64(&t10_100ms, 1) } else { atomic.AddUint64(&t100_ms, 1) } } } wg.Done() }() } wg.Wait() totalL := time.Since(totalS) fmt.Printf("write total time = [%d ms] ", totalL.Milliseconds()) time.Sleep(time.Second * 3) fmt.Printf("slow ratio = %.2f ", float64(slower)*100.0/float64(counter)) fmt.Printf("quick ratio = %.2f ", float64(goSize*numPerGo-int(slower))*100.0/float64(goSize*numPerGo)) fmt.Printf("[<1us][%d] ", counter-slower) fmt.Printf("[1-10us][%d] ", t1_10us) fmt.Printf("[10-100us][%d] ", t10_100us) fmt.Printf("[100-1000us][%d] ", t100_1000us) fmt.Printf("[1-10ms][%d] ", t1_10ms) fmt.Printf("[10-100ms][%d] ", t10_100ms) fmt.Printf("[>100ms][%d] ", t100_ms)
上述例子中,启动了100个协程,每个协程循环向chan中放入10000个对象,上面的代码在我的mac中执行结果如下: write total time = [184 ms] read time = 196 ms slow ratio = 14.72 quick ratio = 85.28 [<1us][852773] [1-10us][101126] [10-100us][45671] [100-1000us][395] [1-10ms][19] [10-100ms][16] [>100ms][0]
然而我们仔细的分析一下,可以看到如下两个点:
1)对象在放入chan中时,还是比较耗时的,尤其是会存在不小比例的耗时比较高的,例如上面的16个10-100ms间的操作,这种波动或者说抖动会高并发时会严重影响我们的性能,我们期望这种抖动尽可能降低;
2)上述测试的chan长度是1024x1024,实际上比放入对象的大小还大,也就是说 即使chan不满,也会导致一定的慢耗时 ; 1.2. chan结构
在排查原因前,我们先看下chan的结构,众所周知,chan的结构如下 runtime.hchan :type hchan struct { qcount uint // total data in the queue dataqsiz uint // size of the circular queue buf unsafe.Pointer // points to an array of dataqsiz elements elemsize uint16 closed uint32 elemtype *_type // element type sendx uint // send index recvx uint // receive index recvq waitq // list of recv waiters sendq waitq // list of send waiters // lock protects all fields in hchan, as well as several // fields in sudogs blocked on this channel. // // Do not change another G"s status while holding this lock // (in particular, do not ready a G), as this can deadlock // with stack shrinking. lock mutex }1.3. chan为什么这么慢
其实已经有很多文章介绍chan的原理了,我就不详细描述了,从影响性能的角度来看,简单来说有两点: 如果recv协程空闲,则send协程会优先给recv协程,以提高效率; recv和send协议都使用了 hchan.lock 对象,该对象是一个锁,也就是说它们之间会有竞争;
对于前者,给我们提供了一个思路,那就是由多个recv协程读取,这样可以提高放入的性能,但是具体设置多少呢?实际上业务并不好判断,因此并不是一种好的选择。
对于后者,我们先来说下这个lock,这个lock不是 sync.Mutex ,sync.Mutex 是go给我们开发者提供一个锁,这个锁的实现其实比较复杂,也有很多文章来说明,我这里就不详细说明了,但是这个锁有一个好处,它其实是一个轻量级的锁,这个轻量主要说的是这个锁是由go自身提供的,当出现锁切换时,它并不需要调用操作系统的lock来让渡出cpu资源,而仅仅是通过gopark()的方式,将目前正在执行的g放回到p中,等待其他m来调度它,它的切换相对来说没有那么耗费资源。
而hchan中的lock是一个 runtime.mutex 。这个锁是一个互斥锁,在linux系统中它的实现是futex,在没有竞争的情况下,会退化成为一个自旋操作,速度非常快,但是当竞争比较大时,它就会在内核中休眠。注意,此处是在内核中休眠,而与runtime.Mutex 是不同的,这也是为什么chan会这么慢的原因。2. 无锁队列2.1. Disruptor
熟悉java语言的小伙伴应该知道有一个比较出名的高性能无锁队列框架:Disruptor,github地址:https://github.com/LMAX-Exchange/disruptor 当然go语言也有一个对应的库go-disruptor:https://github.com/smarty-prototypes/go-disruptor
但是在实际测试中发现go-disruptor其实性能比较一般,没有Java中的优化那么多,并且go-disruptor并发写入需要使用锁,非常不优化,性能比较低。因此,就萌生了写一个go版本Disruptor想法。 2.2. Lockfree
要做到无锁实现队列模型,那么能依赖的其实就只有atomic(原子操作)。本无锁队列参考了Disruptor的实现,根据go语言的特点,加入了自己的一些思考,代码已经开源,地址:https://github.com/bruceshao/lockfree 欢迎来拍。
lockfree的核心优化点包括如下几个方面,其中很多也是我们在开发高性能程序需要关注的,可以参考: 1)绝对的无锁实现:
lockfree内部几乎所有的操作都是通过 原子变量(atomic) 来操作,仅仅有一处使用了chan,作为队列长时间为空时,消费g阻塞使用,该chan只有在队列为空的情况下触发,所以不会影响性能。 2)单一的消费g:
消费g即队列的消费者, 将消费g设置为单一g ,即整个无锁队列只有一个g用于消费,这样就屏蔽掉了读操作竞争带来的性能损耗。 3)写不等待原则:
本身无锁队列的设计初衷就是写入要快,因此对于写入的操作是不会等待的,当无法写入时会持续通过自旋加任务调度的方式处理,一方面尽量加快写入效率,另一方面则是防止占用太多CPU资源。其核心处理代码: // 获取下一个可写入序号 seq := q.seqer.next() pos := int(seq & q.mask) for { if q.abuf.disabled(pos) { q.rbuf.write(pos, v) q.abuf.enable(pos) // 如果接收方阻塞则释放 q.abuf.release() break } // 写操作持续等待,该等待仅会调用runtime.Gosched()进行当前g的调度让出 loop, _ = wait(loop, WriteWaitMax) }4)Pointer替代切片:
available切片用于标记ringbuffer中元素的可用状态。尽管其是一个[]uint8结构,但实际上当高并发对其进行赋值更新时,由于每次操作在其内部都会进行越界判断(通过汇编代码获得该信息),导致其寻址性能并不高。因此通过对切片结构中的Data进行unsafe.Pointer操作,提高了其可用状态调整的性能。 // enable 设置pos位置为可读状态,读线程可读取 func (a *available) enable(pos int) { *(*uint8)(unsafe.Pointer(uintptr(a.buf) + uintptr(pos))) = 1 } // disable 设置pos位置为可写状态,写入线程可写入值 func (a *available) disable(pos int) { *(*uint8)(unsafe.Pointer(uintptr(a.buf) + uintptr(pos))) = 0 }5)一次性内存分配:
使用环状结构Ringbuffer实现对象的传递,RingBuffer中存储对象为包含传递对象结构的结构体,可以进行一次性内存分配,提高处理的性能。 6)缓存行填充:
在计算机硬件上,为了提高效率,cpu存在多级高速缓存(通常是三级)。指令和数据会被事先加载到多级缓存中,这样cpu就不用每次与内存进行交互,从而提高效率。然而实际上不会只加载需要的数据,而是会加载需要数据的上文部分数据,因为根据程序的局部性原理,这些数据后面大概率会用到,这样就避免了再次加载,提高了效率。但是如果这样一次性加载的数据如果被多个cpu核心操作,就会涉及到一个竞争,因此每次加载和更新的数据是有冲突的(从应用程序上来看是没有冲突的),这就形成了所谓的伪共享。解决这个问题的办法就是缓存行填充,操作系统一般一次性加载的缓存行大小是64B,因此可以在其前和后各加入部分字段来解决。数据结构如下: // cursor 游标,一直持续增长的一个uint64序列 // 该序列用于wg(Write Goroutine)获取对应写入到buffer中元素的位置操作 // 通过使用atomic操作避免锁,提高性能 // 通过使用padding填充的方式,填充前面和后面各使用7个uint64(缓存行填充),避免伪共享问题 type cursor struct { p1, p2, p3, p4, p5, p6, p7 uint64 v uint64 p9, p10, p11, p12, p13, p14, p15 uint64 }7)与运算加速:
RingBuffer的容量必须设置为2的n次方,这样就可以通过与运算来代替取余运算,从而提高整体的性能。 8)泛型加速:
Go1.18版本后引入了泛型,泛型与interface有很明显的区别,从性能上来看,泛型是在编译阶段确定类型,这样可有效降低在运行时进行类型转换的耗时(经过测试,这部分还是比较耗时的)。 2.3. 核心概念
Lockfree的整体结构关系如下所示:
ringBuffer
具体对象的存放区域,通过数组(定长切片)实现环状数据结构,其中的数据对象是具体的结构体而非指针,这样可以一次性进行内存申请,避免频繁内存申请带来的系统开销。数据结构如下: type e[T any] struct { val T } // ringBuffer 具体对象的存放区域,通过数组(定长切片)实现环状数据结构 // 其中e为具体对象,非指针,这样可以一次性进行内存申请 type ringBuffer[T any] struct { buf []e[T] sequer *sequencer capacity uint64 }available
切片实现的map,通过index(或pos)标识每个位置为0或1,当长时间无法读取时会通过blockC进行阻塞,写线程完成时可释放该blockC。 其内部buf实际是[]uint8,但由于[]uint8切片在寻址时会进行游标是否越界的判断,造成性能下降,因此通过使用unsafe.Pointer直接对对应的值进行操作,从而避免越界判断,提升性能。之所以使用uint8数组而不是使用的bitmap,主要是考虑到写并发的行为,防止bit操作导致数据异常(或靠锁解决)。数据结构如下: // available 切片实现的map,通过index(或pos)标识每个位置为0或1 // 当长时间无法读取时会通过blockC进行阻塞,写线程完成时可释放该blockC // 其内部buf实际是[]uint8,但由于[]uint8切片在寻址时会进行游标是否越界的判断,造成性能下降, // 因此通过使用unsafe.Pointer直接对对应的值进行操作,从而避免越界判断,提升性能 // 之所以使用uint8是考虑到写并发的行为,防止bit操作导致数据异常(或靠锁解决) type available struct { buf unsafe.Pointer blockC chan struct{} block uint32 }sequencer
序号产生器,维护读和写两个状态,写状态具体由内部游标(cursor)维护,读取状态由自身一个uint64变量维护。它的核心方法是next(),用于获取下个可以写入的游标。数据结构如下: // sequencer 序号产生器,维护读和写两个状态,写状态具体由内部游标(cursor)维护。 // 读取状态由自身维护,变量read即可 type sequencer struct { wc *cursor ws waitStrategy rc uint64 // 读取游标,因为该值仅会被一个g修改,所以不需要使用cursor capacity uint64 }Producer
生产者,核心方法是Write,通过调用Write方法可以将对象写入到队列中。支持多个g并发操作,保证加入时处理的效率。数据结构如下: // Producer 生产者 // 核心方法是Write,通过调用Write方法可以将对象写入到队列中 type Producer[T any] struct { seqer *sequencer rbuf *ringBuffer[T] abuf *available mask uint64 status int32 }consumer
消费者,这个消费者只会有一个g操作,这样处理的好处是可以不涉及并发操作,其内部不会涉及到任何锁,对于实际的并发操作由该g进行分配。数据结构如下: // consumer 消费者,这个消费者只会有一个g操作,这样处理的好处是可以不涉及并发操作,其内部不会涉及到任何锁 // 对于实际的并发操作由该g进行分配 type consumer[T any] struct { rbuf *ringBuffer[T] abuf *available seqer *sequencer hdl EventHandler[T] parallel bool mask uint64 // 用于使用&代替%(取余)运算提高性能 status int32 // 运行状态 }waitStrategy
等待策略,该策略用于获取写入可用的sequence时进行的等待。默认提供了两个实现,SchedWaitStrategy和SleepWaitStrategy,前者使用runtime.Gosched(),后者使用time.Sleep()实现。 推荐使用SchedWaitStrategy,也可以自己实现。 EventHandler
事件处理器接口,整个项目中唯一需要用户实现的接口,该接口描述消费端收到消息时该如何处理,它使用泛型,通过编译阶段确定事件类型,提高性能。 // EventHandler 事件处理器接口 // 整个无锁队列中唯一需要用户实现的接口,该接口描述消费端收到消息时该如何处理 // 使用泛型,通过编译阶段确定事件类型,提高性能 type EventHandler[T any] interface { // OnEvent 用户侧实现,事件处理方法 OnEvent(t T) }3. 性能对比3.1. 写入耗时提升
整体上来看,Disruptor在写入和读取上的性能大概都在channel的7倍以上,数据写入的越多,性能提升越明显。 下面是buffer=1024x1024时,写入数据的耗时对比(可以看到写入时间有明显提升):
3.2. 性能对比
仍然以buffer大小为1024 x 1024为例,将写入时间进行分段,形成了如下的表,其中快速率描述的是写入耗时在微秒内的占比:
数据
(g*循环)
队列
快速率
<1us
1-10us
10-100us
100-1000us
1-10ms
10-100ms
>100ms
50*10000
chan
85.24%
426198
48630
24835
327
6
4
4
50*10000
lockfree
98.06%
490307
8340
1255
94
4
0
0
100*10000
chan
84.39%
843858
104287
51598
217
20
20
0
100*10000
lockfree
98.00%
980004
17513
2343
131
9
0
0
1000*10000
chan
10.07%
1007273
117192
50303
8822466
2714
39
13
1000*10000
lockfree
64.06%
6405519
23298
47347
3519377
3083
1376
0
5000*10000
chan
1.98%
990905
119376
48902
530
48835376
4889
22
5000*10000
lockfree
80.97%
40485785
30654
19052
466781
8987742
9986
0
10000*10000
chan
1.12%
1117019
76828
33322
1504
98746320
24960
47
10000*10000
lockfree
88.33%
88333884
46109
43460
630901
9701375
1244271
0
从上图中可以明显看出,lockfree比chan的性能会高很多: lockfree的快速率明显超过chan,并且随着写入数据的增加,其没有明显下降,而chan下降非常明显; lockfree基本没有非常大的耗时(大于100ms),而chan会存在,这种情况会导致比较强烈的抖动;
最后,git地址:https://github.com/bruceshao/lockfree 欢迎小伙伴来拍、沟通和试用。