自适应限流以及四种经典限流算法
前言
在分布式系统中,如果某个服务节点发生故障或者网络发生异常,都有可能导致调用方被阻塞等待,如果超时时间设置很长,调用方资源很可能被耗尽。这又导致了调用方的上游系统发生资源耗尽的情况,最终导致系统雪崩。
要防止系统发生雪崩,就必须要有容错设计。如果遇到突增流量,一般的做法是对非核心业务功能采用熔断和服务降级的措施来保护核心业务功能正常服务,而对于核心功能服务,则需要采用限流的措施。
相信你看完本篇文章,一定能够对系统容错的常见策略—— 限流、熔断、降级 有更深的理解和体会。
如果对同学有帮助的话,麻烦三连哦,不胜感激!!! 概述
2.1 熔断(客户端)
在服务的依赖调用中,被调用方出现故障时,出于自我保护的目的,调用方会主动停止调用,并根据业务需要进行相应处理。调用方这种主动停止调用的行为我们称之为熔断。为什么要熔断?
假定服务A依赖服务B,当服务B处于正常状态,整个调用是健康的,服务A可以得到服务B的正常响应。当服务B出现故障时,比如响应缓慢或者响应超时,如果服务A继续请求服务B,那么服务A的响应时间也会增加,进而导致服务A响应缓慢。如果服务A不进行熔断处理,服务B的故障会传导至服务A,最终导致服务A也不可用。
2.2 限流(服务端)
限流是针对服务请求数量的一种自我保护机制,当请求数量超出服务的处理能力时,会自动丢弃新来的请求。
为什么要限流?
任何一个系统的处理能力都是有极限的,假定服务A的处理能力为QPS=100,当QPS<100时服务A可以提供正常的服务。当QPS>100时,由于请求量增大,会出现争抢服务资源的情况(数据库连接、CPU、内存等),导致服务A处理缓慢;当QPS继续增大时,可能会造成服务A响应更加缓慢甚至奔溃。如果不进行限流控制,服务A始终会面临着被大流量冲击的风险。做好系统请求流量的评估,制定合理的限流策略,是我们进行系统高可用保护的第一步。
2.3 降级
降级是通过开关配置将某些不重要的业务功能屏蔽掉,以提高服务处理能力。在大促场景中经常会对某些服务进行降级处理,大促结束之后再进行复原。
为什么要降级?
在不影响业务核心链路的情况下,屏蔽某些不重要的业务功能,可以节省系统的处理时间,提供系统的响应能力,在服务器资源固定的前提下处理更多的请求。
源码拆解和分析
3.1 熔断
无论是令牌桶、漏桶还是自适应限流的方法,总的来说都是服务端的单机限流方式。虽然服务端限流虽然可以帮助我们抗住一定的压力,但是拒绝请求毕竟还是有成本的。如果我们的本来流量可以支撑 1w qps,加了限流可以支撑在 10w qps 的情况下,仍然可以提供 1w qps 的有效请求,但是流量突然再翻了 10 倍,来到 100w qps 那么服务该挂还是得挂。
所以我们的可用性建设不仅仅是服务端做建设就可以万事大吉了,得在整个链路上的每个组件都做好自己的事情才行,今天我们就来一起看一下客户端上的限流措施:熔断。
熔断器存在三种状态: 关闭(closed) : 关闭状态下没有触发断路保护,所有的请求都正常通行 打开(open) : 当错误阈值触发之后,就进入开启状态,这个时候所有的流量都会被节流,不允许通行 半打开(half-open) : 处于打开状态一段时间之后,会尝试尝试放行一个流量来探测当前 server 端是否可以接收新流量,如果这个没有问题就会进入关闭状态,如果有问题又会回到打开状态
3.1.1 方案对比hystrix-goGoogle SRE保护算法
hystrix-go
Hystrix 是由 Netflex 开发的一款开源组件,提供了基础的熔断功能。 Hystrix 将降级的策略封装在 Command 中,提供了 run 和 fallback 两个方法,前者表示正常的逻辑,比如微服务之间的调用……,如果发生了故障,再执行 fallback 方法返回结果,我们可以把它理解成保底操作。如果正常逻辑在短时间内频繁发生故障,那么可能会触发短路,也就是之后的请求不再执行 run, 而是直接执行 fallback。
hystrix-go 则是用 go 实现的 hystrix 版,更确切的说,是简化版。只是上一次更新还是 2018 年 的一次 pr, 也就毕业了?
使用方法
hystric实现熔断一般包括两步:
第一步:配置熔断规则
第二部:设置熔断逻辑
一个简单的:// 第一步:配置熔断规则 hystrix.ConfigureCommand("wuqq", hystrix.CommandConfig{ Timeout: int(3 * time.Second), MaxConcurrentRequests: 10, SleepWindow: 5000, RequestVolumeThreshold: 10, ErrorPercentThreshold: 30, }) // 第二步:设置熔断逻辑 // Do是异步,Go是同步 _ = hystrix.Do("wuqq", func() error { // talk to other services _, err := http.Get("https://www.baidu.com/") if err != nil { fmt.Println("get error:%v",err) return err } return nil }, func(err error) error { fmt.Printf("handle error:%v ", err) return nil })
Do 函数需要三个参数,第一个参数 commmand 名称,你可以把每个名称当成一个独立当服务,第二个参数是处理正常的逻辑,比如 http 调用服务,返回参数是 err。如果处理调用失败,那么就执行第三个参数逻辑, 我们称为保底操作。由于服务错误率过高导致熔断器开启,那么之后的请求也直接回调此函数。
配置参数含义:
Timeout : 执行 command 的超时时间。
MaxConcurrentRequests :command 的最大并发量 。
SleepWindow :当熔断器被打开后,SleepWindow 的时间就是控制过多久后去尝试服务是否可用了。
RequestVolumeThreshold : 一个统计窗口 10 秒内请求数量。达到这个请求数量后才去判断是否要开启熔断
ErrorPercentThreshold :错误百分比,请求数量大于等于 RequestVolumeThreshold 并且错误率到达这个百分比后就会启动熔断
核心实现
核心实现的方法是 AllowRequest,IsOpen判断当前是否处于熔断状态,allowSingleTest就是去看是否过了一段时间需要重新进行尝试func (circuit *CircuitBreaker) AllowRequest() bool { return !circuit.IsOpen() || circuit.allowSingleTest() }
IsOpen先看当前是否已经打开了,如果已经打开了就直接返回就行了,如果还没打开就去判断请求数量是否满足要求请求的错误率是否过高,如果两个都满足就会打开熔断器func (circuit *CircuitBreaker) IsOpen() bool { circuit.mutex.RLock() o := circuit.forceOpen || circuit.open circuit.mutex.RUnlock() if o { return true } if uint64(circuit.metrics.Requests().Sum(time.Now())) < getSettings(circuit.Name).RequestVolumeThreshold { return false } if !circuit.metrics.IsHealthy(time.Now()) { // too many failures, open the circuit circuit.setOpen() return true } return false }
hystrix-go已经可以比较好的满足我们的需求,但是存在一个问题就是一旦触发了熔断,在一段时间之内就会被一刀切 的拦截请求,所以我们来看看 google sre 的一个实现
Google SRE保护算法
这个算法的好处是不会直接一刀切的丢弃所有请求,而是计算出一个概率来进行判断,当成功的请求数量越少,K越小的时候计算出的概率就越大,表示这个请求被丢弃的概率越大
Kratos源码分析func (b *sreBreaker) Allow() error { // 统计成功的请求,和总的请求 success, total := b.summary() // 计算当前的成功率 k := b.k * float64(success) if log.V(5) { log.Info("breaker: request: %d, succee: %d, fail: %d", total, success, total-success) } // 统计请求量和成功率 // 如果 qps 比较小,不触发熔断 // 如果成功率比较高,不触发熔断,如果 k = 2,那么就是成功率 >= 50% 的时候就不熔断 if total < b.request || float64(total) < k { if atomic.LoadInt32(&b.state) == StateOpen { atomic.CompareAndSwapInt32(&b.state, StateOpen, StateClosed) } return nil } if atomic.LoadInt32(&b.state) == StateClosed { atomic.CompareAndSwapInt32(&b.state, StateClosed, StateOpen) } // 计算一个概率,当 dr 值越大,那么被丢弃的概率也就越大 // dr 值是,如果失败率越高或者是 k 值越小,那么它越大 dr := math.Max(0, (float64(total)-k)/float64(total+1)) drop := b.trueOnProba(dr) if log.V(5) { log.Info("breaker: drop ratio: %f, drop: %t", dr, drop) } if drop { return ecode.ServiceUnavailable } return nil } // 通过随机来判断是否需要进行熔断 func (b *sreBreaker) trueOnProba(proba float64) (truth bool) { b.randLock.Lock() truth = b.r.Float64() < proba b.randLock.Unlock() return }
熔断与failover结合的思想
一句话总结:请求先进入CircuitBreaker根据当前熔断器策略决定请求主集群或备集群,若请求主集群且主集群请求失败,则进入Failover逻辑Failover到备集群中获取数据。
3.2 限流
限流,也称流量控制。是指系统在面临高并发,或者大流量请求的情况下,限制新的请求对系统的访问,从而保证系统的稳定性。限流会导致部分用户请求处理不及时或者被拒,这就影响了用户体验。所以一般需要在系统稳定和用户体验之间平衡一下。
3.2.1 固定窗口
固定时间内对请求书进行限制,例如说每秒请求不超过50次,那就在0-1秒,1-2秒……n-n+1秒,每秒不超过50次请求。
可是会出现一个问题,在0.99秒和1.01秒分别有50次请求,对于固定窗口方法,不会限流,但是实际上在0.99秒-1.01秒,这一段不到1s的时间内已经达到了阙值的两倍,以下的滑动窗口方法可以解决这个问题。
3.2.2 滑动窗口
算法思想 滑动时间窗口算法,是从对普通时间窗口计数的优化。 使用普通时间窗口时,我们会为每个user_id/ip维护一个KV: uidOrIp: timestamp_requestCount。假设限制1秒1000个请求,那么第100ms有一个请求,这个KV变成 uidOrIp: timestamp_1,递200ms有1个请求,我们先比较距离记录的timestamp有没有超过1s,如果没有只更新count,此时KV变成 uidOrIp: timestamp_2。当第1100ms来一个请求时,更新记录中的timestamp并重置计数,KV变成 uidOrIp: newtimestamp_1 普通时间窗口有一个问题,假设有500个请求集中在前1s的后100ms,500个请求集中在后1s的前100ms,其实在这200ms没就已经请求超限了,但是由于时间窗每经过1s就会重置计数,就无法识别到此时的请求超限。 对于滑动时间窗口,我们可以把1ms的时间窗口划分成10个time slot, 每个time slot统计某个100ms的请求数量。每经过100ms,有一个新的time slot加入窗口,早于当前时间100ms的time slot出窗口。窗口内最多维护10个time slot,储存空间的消耗同样是比较低的。适用场景 与令牌桶一样,有应对突发流量的能力go语言实现 主要就是实现sliding window算法。可以参考Bilibili开源的kratos框架里circuit breaker用循环列表保存time slot对象的实现,他们这个实现的好处是不用频繁的创建和销毁time slot对象。下面给出一个简单的基本实现:package main import ( "fmt" "sync" "time" ) var winMu map[string]*sync.RWMutex func init() { winMu = make(map[string]*sync.RWMutex) } type timeSlot struct { timestamp time.Time // 这个timeSlot的时间起点 count int // 落在这个timeSlot内的请求数 } func countReq(win []*timeSlot) int { var count int for _, ts := range win { count += ts.count } return count } type SlidingWindowLimiter struct { SlotDuration time.Duration // time slot的长度 WinDuration time.Duration // sliding window的长度 numSlots int // window内最多有多少个slot windows map[string][]*timeSlot maxReq int // win duration内允许的最大请求数 } func NewSliding(slotDuration time.Duration, winDuration time.Duration, maxReq int) *SlidingWindowLimiter { return &SlidingWindowLimiter{ SlotDuration: slotDuration, WinDuration: winDuration, numSlots: int(winDuration / slotDuration), windows: make(map[string][]*timeSlot), maxReq: maxReq, } } // 获取user_id/ip的时间窗口 func (l *SlidingWindowLimiter) getWindow(uidOrIp string) []*timeSlot { win, ok := l.windows[uidOrIp] if !ok { win = make([]*timeSlot, 0, l.numSlots) } return win } func (l *SlidingWindowLimiter) storeWindow(uidOrIp string, win []*timeSlot) { l.windows[uidOrIp] = win } func (l *SlidingWindowLimiter) validate(uidOrIp string) bool { // 同一user_id/ip并发安全 mu, ok := winMu[uidOrIp] if !ok { var m sync.RWMutex mu = &m winMu[uidOrIp] = mu } mu.Lock() defer mu.Unlock() win := l.getWindow(uidOrIp) now := time.Now() // 已经过期的time slot移出时间窗 timeoutOffset := -1 for i, ts := range win { if ts.timestamp.Add(l.WinDuration).After(now) { break } timeoutOffset = i } if timeoutOffset > -1 { win = win[timeoutOffset+1:] } // 判断请求是否超限 var result bool if countReq(win) < l.maxReq { result = true } // 记录这次的请求数 var lastSlot *timeSlot if len(win) > 0 { lastSlot = win[len(win)-1] if lastSlot.timestamp.Add(l.SlotDuration).Before(now) { lastSlot = &timeSlot{timestamp: now, count: 1} win = append(win, lastSlot) } else { lastSlot.count++ } } else { lastSlot = &timeSlot{timestamp: now, count: 1} win = append(win, lastSlot) } l.storeWindow(uidOrIp, win) return result } func (l *SlidingWindowLimiter) getUidOrIp() string { return "127.0.0.1" } func (l *SlidingWindowLimiter) IsLimited() bool { return !l.validate(l.getUidOrIp()) } func main() { limiter := NewSliding(100*time.Millisecond, time.Second, 10) for i := 0; i < 5; i++ { fmt.Println(limiter.IsLimited()) } time.Sleep(100 * time.Millisecond) for i := 0; i < 5; i++ { fmt.Println(limiter.IsLimited()) } fmt.Println(limiter.IsLimited()) for _, v := range limiter.windows[limiter.getUidOrIp()] { fmt.Println(v.timestamp, v.count) } fmt.Println("a thousand years later...") time.Sleep(time.Second) for i := 0; i < 7; i++ { fmt.Println(limiter.IsLimited()) } for _, v := range limiter.windows[limiter.getUidOrIp()] { fmt.Println(v.timestamp, v.count) } }
3.2.3 漏桶
算法思想 与令牌桶是"反向"的算法,当有请求到来时先放到木桶中,worker以固定的速度从木桶中取出请求进行响应。如果木桶已经满了,直接返回请求频率超限的错误码或者页面适用场景 流量最均匀的限流方式,一般用于流量"整形",例如保护数据库的限流。先把对数据库的访问加入到木桶中,worker再以db能够承受的qps从木桶中取出请求,去访问数据库。不太适合电商抢购和微博出现热点事件等场景的限流,一是应对突发流量不是很灵活,二是为每个user_id/ip维护一个队列(木桶),workder从这些队列中拉取任务,资源的消耗会比较大。go语言实现 通常使用队列来实现,在go语言中可以通过buffered channel来快速实现,任务加入channel,开启一定数量的worker从channel中获取任务执行。package main import ( "fmt" "sync" "time" ) // 每个请求来了,把需要执行的业务逻辑封装成Task,放入木桶,等待worker取出执行 type Task struct { handler func() Result // worker从木桶中取出请求对象后要执行的业务逻辑函数 resChan chan Result // 等待worker执行并返回结果的channel taskID int } // 封装业务逻辑的执行结果 type Result struct { } // 模拟业务逻辑的函数 func handler() Result { time.Sleep(300 * time.Millisecond) return Result{} } func NewTask(id int) Task { return Task{ handler: handler, resChan: make(chan Result), taskID: id, } } // 漏桶 type LeakyBucket struct { BucketSize int // 木桶的大小 NumWorker int // 同时从木桶中获取任务执行的worker数量 bucket chan Task // 存方任务的木桶 } func NewLeakyBucket(bucketSize int, numWorker int) *LeakyBucket { return &LeakyBucket{ BucketSize: bucketSize, NumWorker: numWorker, bucket: make(chan Task, bucketSize), } } func (b *LeakyBucket) validate(task Task) bool { // 如果木桶已经满了,返回false select { case b.bucket <- task: default: fmt.Printf("request[id=%d] is refused ", task.taskID) return false } // 等待worker执行 <-task.resChan fmt.Printf("request[id=%d] is run ", task.taskID) return true } func (b *LeakyBucket) Start() { // 开启worker从木桶拉取任务执行 go func() { for i := 0; i < b.NumWorker; i++ { go func() { for { task := <-b.bucket result := task.handler() task.resChan <- result } }() } }() } func main() { bucket := NewLeakyBucket(10, 4) bucket.Start() var wg sync.WaitGroup for i := 0; i < 20; i++ { wg.Add(1) go func(id int) { defer wg.Done() task := NewTask(id) bucket.validate(task) }(i) } wg.Wait() }
末日时钟末日时钟是一个纪念物,它象征着人类毁灭的可能性。它于1947年由美国科学家及公共政治家罗伯特奥尔德林创立,旨在提醒人们核战争和大规模杀戮的危险。末日时钟的指针每年都会被调整,根据人
5000mAh67WIMX766,12GB256GB仅1739元,小米高性价比名不虚传只要我们提到性价比这三个字,永远都离不开小米这家手机厂商,几乎每一场发布会上,小米都会把性价比挂在嘴边,而从实际发布的手机配置来看,小米性价比的确很高。不管你是追求旗舰手机次旗舰手
律斗云CEO孙霞律师一定要做自己的品牌!(1)律师做品牌能带来什么?一直专注于律师及律所网络营销获客的律斗云创始人孙霞表示,现在的客户非常聪明,他们同时咨询十几位律师,从这十几位律师中选出三到五位律师进行线下咨询,最终选
研究提出日本水泥和混凝土到2050年实现净零排放的策略7月18日,自然通讯(NatureCommunications)发表题为有效利用水泥和混凝土减少对供应侧技术的依赖以实现净零排放(EfficientUseofCementandCo
腾讯停运的老游戏又上架Steam,结果BUG频出汉化敷衍外挂满天大家好,这里是正惊游戏,我是正惊小弟今天小弟要给大家说一款腾讯运营了10年以上的FPS网游。作为最早一批使用虚幻3引擎开发的网游,画面表现秒杀当时市场中众多精品,网传甚至穿越火线都
人工智能解剖学QuoteSupplychainisthismixofleftbrain,rightbrain,heavyanalyticswithalotofgutinstinctandpeop
韩国拟投资67亿元,将2023年作为旅游大国元年(观察者网讯)据韩联社报道,韩国文化体育观光部(文体部)计划在旅游业投入1。2295万亿韩元(约合人民币67亿元),开启将韩国发展成为旅游大国的元年。报道称,韩国文体部近日与韩国观
最新通知避免前往这些地方旅游1月24日下午文旅部官方政务新媒体平台发布关于进一步加强春节期间旅游安全工作的通知各省自治区直辖市文化和旅游厅(局),新疆生产建设兵团文化体育广电和旅游局2023年春节假期进入第三
云南旅游去有风的地方偶遇刘亦菲吧!云南烟火气,舍不得公开的烟火古镇谈笑风生,闲情漫步,和本地人一日三餐不用讨好世界我在这里旅居了四季,只有小住才能感受到云南村寨深处人文风情。这里真的自带疗愈属性治愈来往的旅人在不同
组图大年初四三亚大东海游客如潮1月25日,大年初四的三亚大东海景区,游客如潮。阳光沙滩大海,吸引游客休闲度假过春节。海南日报特约记者孙清摄影报道1月25日,大年初四的三亚大东海景区,游客嗨玩海上娱乐项目。海南日
2023年春节前三天,曲靖旅游市场人气爆棚,共接待游客70。57万人次012023年春节前三天,全市累计接待游客70。57万人,同比增长59。10,实现旅游收入46477。34万元,同比增长31。64。其中14家4A级景区16家3A级景区共接待游客6