死磕源码系列之ants协程池高性能协程池ants源码剖析
1、简介ants是什么
ants 是一个高性能的 goroutine 池,实现了对大规模 goroutine 的调度管理、goroutine 复用,允许使用者在开发并发程序的时候限制 goroutine 数量,复用资源,达到更高效执行任务的效果。 功能特点自动调度海量的 goroutines,复用 goroutines 定期清理过期的 goroutines,进一步节省资源 提供了大量有用的接口:任务提交、获取运行中的 goroutine 数量、动态调整 Pool 大小、释放 Pool、重启 Pool 优雅处理 panic,防止程序崩溃 资源复用,极大节省内存使用量;在大规模批量并发任务场景下比原生 goroutine 并发具有更高的性能 非阻塞机制 核心流程
ants核心概念
Pool :协程池 WorkerArray:Pool池中的worker队列,存放所有的goWorker goWorker:运行任务的实际执行者,它启动一个 goroutine 来接受任务并执行函数调用。 2、Pool协程池Pool结构
Ants 提供了两种Pool结构:Pool和PoolWithFunc ;但两者逻辑大致一样,本文着重介绍Pool的结构 // Pool accepts the tasks from client, it limits the total of goroutines to a given number by recycling goroutines. type Pool struct { // capacity of the pool, a negative value means that the capacity of pool is limitless, an infinite pool is used to // avoid potential issue of endless blocking caused by nested usage of a pool: submitting a task to pool // which submits a new task to the same pool. // 协程池的容量 capacity int32 // running is the number of the currently running goroutines. // 正在运行的goroutines的数量 running int32 // lock for protecting the worker queue. // 锁,自旋锁,保护队列 lock sync.Locker // workers is a slice that store the available workers. // 存放池中所有的worker,workerArray包含可用workers队列和过期workers队列,只会从可用workers队列中取可用worker workers workerArray // state is used to notice the pool to closed itself. // 记录池子的状态 (关闭、开启) state int32 // cond for waiting to get an idle worker. // 条件变量 cond *sync.Cond // workerCache speeds up the obtainment of a usable worker in function:retrieveWorker. // worker 对象池 workerCache sync.Pool // waiting is the number of goroutines already been blocked on pool.Submit(), protected by pool.lock //阻塞等待的任务量 waiting int32 // 清道夫,定时清理workerarray 队列中过期的worker purgeDone int32 stopPurge context.CancelFunc // 定时器 更新pool中now的字段 ticktockDone int32 stopTicktock context.CancelFunc now atomic.Value // 需要自定义加载的配置 options *Options }Pool创建// NewPool generates an instance of ants pool. func NewPool(size int, options ...Option) (*Pool, error) { opts := loadOptions(options...) // 加载自定义的options中的配置 if size <= 0 { size = -1 } if !opts.DisablePurge {// 当 DisablePurge 为 true 时,worker 不会被清除并且是驻留的。 if expiry := opts.ExpiryDuration; expiry < 0 { return nil, ErrInvalidPoolExpiry } else if expiry == 0 { opts.ExpiryDuration = DefaultCleanIntervalTime // 默认间隔时间1s } } if opts.Logger == nil { opts.Logger = defaultLogger } p := &Pool{ capacity: int32(size), lock: syncx.NewSpinLock(),//自旋锁 options: opts, } p.workerCache.New = func() interface{} { //sync.pool 初始化 return &goWorker{ pool: p, task: make(chan func(), workerChanCap), } } if p.options.PreAlloc { if size == -1 { return nil, ErrInvalidPreAllocSize } p.workers = newWorkerArray(loopQueueType, size) //循环队列 } else { p.workers = newWorkerArray(stackType, 0) //数组 } p.cond = sync.NewCond(p.lock) // sync.cond初始化 p.goPurge() p.goTicktock() return p, nil } 自旋锁SpinLock(重点)
思考:如何设计一种自旋锁,设计自旋锁时需要注意什么?
spinLock是基于CAS机制和指数退避算法实现的一种自旋锁 package sync import ( "runtime" "sync" "sync/atomic" ) type spinLock uint32 // 实现sync.Locker接口 const maxBackoff = 16 //最大的回避次数 func (sl *spinLock) Lock() { backoff := 1 // 基于CAS机制,尝试获取锁,且使用指数退避算法来提供获取锁的成功率 for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) { // Leverage the exponential backoff algorithm, see https://en.wikipedia.org/wiki/Exponential_backoff. for i := 0; i < backoff; i++ { //runtime.Gosched()函数功能:使当前goroutine让出CPU时间片("回避"),让其他的goroutine获得执行的机会。当前的goroutine会在未来的某个时间点继续运行。 //注意:当一个goroutine发生阻塞,Go会自动地把与该goroutine处于同一系统线程的其他goroutines转移到另一个系统线程上去,以使这些goroutines不阻塞(从GMP模型角度来说,就是当与P绑定的M发生阻塞,P就与其解绑,然后与另一个空闲的M进行绑定 或者 去创建一个M进行绑定)。 runtime.Gosched() } if backoff < maxBackoff { backoff <<= 1 } } } func (sl *spinLock) Unlock() { //原子操作,并发安全 atomic.StoreUint32((*uint32)(sl), 0) } // NewSpinLock instantiates a spin-lock. func NewSpinLock() sync.Locker { return new(spinLock) }
sync.Locker
设计锁时必须实现该接口中的方法 // A Locker represents an object that can be locked and unlocked. type Locker interface { Lock() Unlock() }
关键知识点 sync.Locker接口 指数退避算法 atomic 原子包中的方法了解 runtime.Gosched() 3、任务执行goWorker的结构// goWorker is the actual executor who runs the tasks, // it starts a goroutine that accepts tasks and // performs function calls. //goWorker 是运行任务的实际执行者,它启动一个 goroutine 来接受任务并执行函数调用。 type goWorker struct { // pool who owns this worker. pool *Pool // 拥有worker的协议池 // task is a job should be done. task chan func() // 需要执行的任务,注意:该chan 可能是缓存区或者非缓存区,如果是多核的话,缓存区的大小是1 // recycleTime will be updated when putting a worker back into queue. recycleTime time.Time // 回收时间 }goWoker的初始化
goWorker 是sync.pool 对象池创建的;初始化在Pool创建时 p.workerCache.New = func() interface{} { return &goWorker{ pool: p, task: make(chan func(), workerChanCap), } }workerChanCap的值// workerChanCap determines whether the channel of a worker should be a buffered channel // to get the best performance. Inspired by fasthttp at // https://github.com/valyala/fasthttp/blob/master/workerpool.go#L139 workerChanCap = func() int { // Use blocking channel if GOMAXPROCS=1. // This switches context from sender to receiver immediately, // which results in higher performance (under go1.5 at least). if runtime.GOMAXPROCS(0) == 1 { return 0 } // Use non-blocking workerChan if GOMAXPROCS>1, // since otherwise the sender might be dragged down if the receiver is CPU-bound. return 1 }()Task 任务提交func (p *Pool) Submit(task func()) error { if p.IsClosed() { // 前置检查 协程池是否关闭 return ErrPoolClosed } var w *goWorker if w = p.retrieveWorker(); w == nil { //获取一个可用的worker取执行任务 return ErrPoolOverload } w.task <- task return nil }retrieveWorker (可用worker获取)// retrieveWorker returns an available worker to run the tasks. func (p *Pool) retrieveWorker() (w *goWorker) { spawnWorker := func() { // 使用sync.pool 创建worker w = p.workerCache.Get().(*goWorker) w.run() } p.lock.Lock() // 自旋锁 加锁 w = p.workers.detach() // 尝试从worker池子中获取可用的worker,注:任务执行完后,会回收worker 以便下次使用 if w != nil { // first try to fetch the worker from the queue 获取到返回 p.lock.Unlock() } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() { // 判断正在运行的worker 是否超过 pool协程词设置的容量大小,注 capacity=-1 表示池子容量无限大 // if the worker queue is empty and we don"t run out of the pool capacity, // then just spawn a new worker goroutine. p.lock.Unlock() spawnWorker() // 创建worker } else { // otherwise, we"ll have to keep them blocked and wait for at least one worker to be put back into pool. if p.options.Nonblocking { // 判断协程池是否 是非阻塞模式 ,如果是非阻塞模式下就直接返回 p.lock.Unlock() return } retry: // 阻塞模式下的逻辑 // 判断阻塞的任务数量是否超过了设置的最大阈值,如果超过直接返回 if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks { p.lock.Unlock() return } p.addWaiting(1) // 阻塞任务数量加1 p.cond.Wait() // block and wait for an available worker p.addWaiting(-1) // 获取到可用的worker后,阻塞数量减一 if p.IsClosed() { // check pool池是否关闭 p.lock.Unlock() return } var nw int if nw = p.Running(); nw == 0 { // 如果正在执行的worker数量为0时,需要重新创建woker p.lock.Unlock() spawnWorker() return } if w = p.workers.detach(); w == nil { //从workerArray中获取可用的worker if nw < p.Cap() { // 获取不到,判断正在运行的goroutines的数量是否超过协层池的容量,没有就创建 p.lock.Unlock() spawnWorker() return } goto retry // goto 重试阻塞模式下获取可用worker的逻辑 } p.lock.Unlock() } return } WorkerArray 工作池的结构type workerArray interface { len() int // 长度 isEmpty() bool // 是否为空 insert(worker *goWorker) error // 插入 detach() *goWorker // 从WorkerArray获取可用的worker retrieveExpiry(duration time.Duration) []*goWorker //清道夫调用pool.worker中的此方法来清理pool.workers中的过期worker reset() // 重置,清空WorkerArray中所有的Worker }
workerArray 接口的实现 workerStack 和 loopQueue 任务执行// run starts a goroutine to repeat the process // that performs the function calls. func (w *goWorker) run() { w.pool.addRunning(1) // pool的running 加 一 go func() { defer func() { w.pool.addRunning(-1) w.pool.workerCache.Put(w) if p := recover(); p != nil { if ph := w.pool.options.PanicHandler; ph != nil { ph(p) } else { w.pool.options.Logger.Printf("worker exits from a panic: %v ", p) var buf [4096]byte n := runtime.Stack(buf[:], false) w.pool.options.Logger.Printf("worker exits from panic: %s ", string(buf[:n])) } } // Call Signal() here in case there are goroutines waiting for available workers. w.pool.cond.Signal() }() for f := range w.task { if f == nil { return } f() // 任务执行 if ok := w.pool.revertWorker(w); !ok { // 回收woker return } } }() }
revertWorker worker回收
逻辑简单:1、往workerArray 队列中插入;2、通知正在阻塞获取worker的goroutines // revertWorker puts a worker back into free pool, recycling the goroutines. func (p *Pool) revertWorker(worker *goWorker) bool { if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() { p.cond.Broadcast() return false } worker.recycleTime = p.nowTime() p.lock.Lock() // To avoid memory leaks, add a double check in the lock scope. // Issue: https://github.com/panjf2000/ants/issues/113 if p.IsClosed() { p.lock.Unlock() return false } err := p.workers.insert(worker) if err != nil { p.lock.Unlock() return false } // Notify the invoker stuck in "retrieveWorker()" of there is an available worker in the worker queue. p.cond.Signal() p.lock.Unlock() return true } 定时清理过期的workerfunc (p *Pool) goPurge() { if p.options.DisablePurge { return } // Start a goroutine to clean up expired workers periodically. var ctx context.Context ctx, p.stopPurge = context.WithCancel(context.Background()) go p.purgeStaleWorkers(ctx) } // purgeStaleWorkers clears stale workers periodically, it runs in an inpidual goroutine, as a scavenger. func (p *Pool) purgeStaleWorkers(ctx context.Context) { ticker := time.NewTicker(p.options.ExpiryDuration) defer func() { ticker.Stop() atomic.StoreInt32(&p.purgeDone, 1) }() for { select { case <-ctx.Done(): return case <-ticker.C: } if p.IsClosed() { break } p.lock.Lock() expiredWorkers := p.workers.retrieveExpiry(p.options.ExpiryDuration) p.lock.Unlock() // Notify obsolete workers to stop. // This notification must be outside the p.lock, since w.task // may be blocking and may consume a lot of time if many workers // are located on non-local CPUs. for i := range expiredWorkers { expiredWorkers[i].task <- nil expiredWorkers[i] = nil } // There might be a situation where all workers have been cleaned up(no worker is running), // or another case where the pool capacity has been Tuned up, // while some invokers still get stuck in "p.cond.Wait()", // then it ought to wake all those invokers. if p.Running() == 0 || (p.Waiting() > 0 && p.Free() > 0) { p.cond.Broadcast() } } } 4、总结
至此,ants源码简单分析完毕;整体框架结构不太复杂,阅读起来也很轻松;但源码中有很多知识点需要我们花些时间去理解,比如 自旋锁的设计、sync.cond 条件变量 、sync.pool 对象池,原子操作,channel通信等,建议大家多多阅读。 5、参考资料
指数退避算法: https://en.wikipedia.org/wiki/Exponential_backoff
春耕正当时多地按下备耕启动键这段时间,广东青海重庆等地抢抓春季农业生产有利天时,按下春耕备耕启动键。广东河源春耕正当时处处显生机在广东河源龙川县佗城镇,农户们忙着备农资修翻地建沟渠,用辛勤的双手绘就属于他们的
63岁齐秦定居云南养老,秃头大肚腩成糙老头,与王祖贤差距太大了近日,有网友曝光一组齐秦与朋友吃火锅的照片。63岁的齐秦变化挺大的。大肚腩凸起,头发变少,发际线很高,露出锃亮的额头,还逐渐呈现秃顶的趋势另外,他皮肤黝黑,黑到发光,简直像非洲人一
这就开始了,韩国检方要动文在寅心腹,韩第一夫人也被拖下水一直以来,韩国政坛的走向备受瞩目,特别是在韩国总统尹锡悦上台之后,韩国两党的之间的斗争,更是达到了一个新高度。近期,韩国最大在野党共同民主党党首李在明的一则新消息,立刻成了各大媒体
福建漳平十里樱花开美丽经济来央视网消息这几天,在福建省漳平市永福镇的樱花茶园内,十万株樱花陆续绽放,青翠含黛满园绯红。总台记者黄珊万亩茶山青,十里樱花红。欢迎大家跟随着春天的脚步,来到漳平市永福镇的樱花茶园。
交广会客厅窗口外爱的延伸当清晨第一缕阳光穿过窗户照进呼和浩特站峻屹旅客服务中心,不足十平方米的小房间里,客运员马燕君翻看着旅客留言簿,脸上洋溢着幸福的滋味。为更好地服务旅客,马燕君身处的峻屹旅客服务中心,
深度丨激辩存量房贷利率能否下调本轮提前还贷潮向何处去?21世纪经济报道记者唐婧北京报道房贷利率进入3字头时代,在6以上高位站岗的购房者开始用提前还贷的方式进行自救,有关存量房贷利率调降的呼声也是日益渐隆。房贷利率由5年期LPR和银行加
西贝公布2023年货节成绩单整体营收增长103。2近日,为期一个多月的西贝年货节宣告结束。西贝餐饮集团发布了2023年货节的零售成绩单,数据显示,2023年西贝年货节依旧延续了高增长态势,整体营业额较2022年增长103。2。在京
字里行间的文明密码宋镇豪世界记忆名录中的甲骨文遗产编者按文字的出现,是人类进入文明时代的主要标志。对于有着五千年文明史的中国来说,古文字的研究有着特别重要的意义。2023年2月17日,安阳殷墟,写意中国探寻汉字起源活动开启,寻访汉
狂飙孟钰不配,高启兰不懂,只有小五才是真正懂安欣的人安欣作为狂飙中的绝对主角,其中他与高启强的主线故事异彩纷呈,让无数观众沉迷其中,但除此之外,他与三位女性的情感纠葛作为电视剧的另一个发展脉络也包含了许多细节。青梅竹马一起长大的孟钰
深圳创新人才培养机制托起幸福夕阳红人才是高质量发展的第一资源。为有效破解养老服务人才专业性不强流动性大结构不合理等发展瓶颈,近年来,深圳市紧抓双区建设双改示范重大历史机遇,探索构建了一条政校行企四方联动产学研用立体
中央一号文件,合村并镇,你准备好了吗?2023年中央一号文件颁布,合村并镇立刻成为群众关注的焦点,但是老丁经过对中央一号文件的解读,合村并镇并不是一号文件的重点,振兴乡村才是一号文件的重中之重,众所周知,乡村是中国最广
一口气进了4台法国ATOLL?访上海专业录音师资深烧友须达威先生法国珊瑚礁ATOLL用户访谈录俗话说事不过三,发烧友都是花心的,很少会重复购买同一品牌的器材,尤其是在短时间内。但上海资深发烧友须达威先生在不到一年的时间里一口气购买了4台法国AT
设计女式连体衣体现了美吗?怎么觉得不方便好内容我来评连体衣通常是一种流行的女性服装,由于它的紧身贴身的特性以及没有分开的上下部分,会使女性在上厕所时感到不方便。但是,女性穿连体衣的喜好与穿着习惯身材特点个人审美观点等因素
环西部火车游重整行装再出发来源人民铁道网提振旅游市场精气神激发旅游经济新活力环西部火车游重整行装再出发人民铁道网讯(强科康娜吕晓娇)3月18日0时30分,中国铁路兰州局集团有限公司开出Y214次首趟环西部火
河北张家口缅怀革命先烈传承红色基因本网河北讯(通讯员张奋兰编辑张立斌)一个有希望的民族不能没有英雄,一个有前途的国家不能没有先锋。2023年3月6日,中国红色先锋队河北省张家口市志愿者在宣化烈士陵园,开展缅怀革命先
东方财富交易软件一日两连崩去年信息技术服务营收逾46亿券茅东方财富(300059)交易软件崩上两次热搜。3月21日上午,东方财富交易软件出现宕机,不少投资者在社交媒体反映无法正常登录和交易。10时30分左右,东方财富交易软件恢复正常。
东方财富软件崩了一整天,拉萨天团难入股市?波及数万股民正常交易本文来源时代周报作者崔鹏志崩了的炒股软件把数万股民挡在门外。3月21日早盘,东方财富(300059。SZ)交易软件出现故障。据多名网友反馈,东方财富APP无法正常登录和委托。尝试登
毛孔粗大,真的无法逆袭吗?毛孔为什么会变得粗大毛孔粗大,真的无法逆袭吗?你的毛孔粗大情况是哪种情况除了医美,毛孔一旦被撑大,就无法变小确实没错,目前除了医美手段毛孔没法通过其他肉眼可见的办法去缩小!而且医美
欧美明星8部好片帕翠西娅维拉奎兹出生日期1971年01月31日出生地委内瑞拉,瓜希拉身高1。77m职业演员模特英文名PatriciaCarolaVelasquezSemprun(本名)家庭成员JenniferKr
大卷发这么搭,也太美太复古了,简直美翻!春天阳光正暖,我们在初春时节搭配衣装不仅要体现出季节特点,还应该展现出女性优雅的身姿和个性。要说最能凸显出女人味的发型,那一定是大卷波浪,最近就发现一位把大卷波浪搭配的非常好看的博
小米MIOS操作系统重启能否超越华为HarmonyOS?头条创作挑战赛科技界一直在传言小米正在开发的操作系统MIOS。最近,推特上的内部人士开始发布关于MIOS屏幕的截图,屏幕内容上有MIOS的标志。这次泄露引起了许多人的兴趣,虽然它看
华为自研MetaERP系统即将上线!有望推动国产ERP发展华为自研MetaERP系统即将上线!有望推动国产ERP发展!但是目前没有表明推向市场,而是供内部使用。这里科普一下ERP,也就是EnterpriseResourcePlanning