分布式架构:限流算法的分析与实现
一、限流的关键作用
对于大型互联网架构中,限流的设计是必不可少的一个环节。在给定的时间内, 客户端请求次数过多, 服务器就会拦截掉部分请求,避免请求流量过大造成数据库负载高的问题。
二、常见限流算法利弊分析
计数器限流
计数器限流主要有固定窗口计数器和滑动窗口计数器。固定窗口计数器即:在单位时间内请求数达到了所限定的数量时,请求如需不被拦截则需要等待下一个单位时间开始;滑动窗口计数器:在单位时间内请求数达到了所限定的数量时,当前时刻的请求会被拦截,随时间窗口的滑动计数器数量会变化,当计数器数量小于限定数量时请求正常执行。
常规计数器限流
常规计数器限流是指在一个时间段内允许一定数量的请求执行,超过最大限制则会阻止请求直到超过当前时间段为止。如上图所示,10s内限制1000个请求,在第11s的时候计数器会从0重新开始计数。
常规计数器限流瞬时流量问题
如上图所示,常规计数器模式下,在第9.9s的时候执行了1000请求,在第10.1s时计数器已清0,此时又有1000请求到来,这样相当于在0.2s的时间内有2000请求,显然违背了限流的初衷。
滑动窗口计数器限流
滑动窗口计数器限流是在计数器限流的基础上将固定的时间段划分为若干个时间窗口,随着时间的推移,保持时间段内的滑动窗口个数,在常规计数器限流的基础上避免了瞬时流量对服务器的压力。如上图所示,0-3s内有600请求,8-13s有700请求,当第16s时新增500请求会触发限流。
计数器限流突刺现象
特点:如果在当前时间窗口最后半秒请求数突然达到最大限制,半秒后进入下一个时间窗口开始,如果请求继续在半秒内达到周期上限,则相当于1秒内请求达到2倍的限制请求数;如果60s为一个周期,在第10s的时候服务器已处理完请求,在计数器限流模式下会使得服务器空闲50s无法处理请求。
计数器限流原理简单,实现比较容易,但是也有一个痛点问题就是它的突刺现象,如上图所示,10s限制1000请求,到第2s时已达请求上限,那么在第3-10s内的请求将会持续拒绝,在服务器资源空闲的状态下会造成极大的浪费。
漏桶限流
请求进入到漏桶中,漏桶以固定的速度流出,当访问频率超过接口响应频率流速过大时拒绝请求,可以看到漏桶相当于一个队列,进队的速率不受限制,出队是固定速率。
特点:由于出水速率是固定的,当突发大流量时会导致大量请求被限制,无法处理。
漏桶限流,请求进入漏桶不受限制,并以固定的速率流出,当桶满并且当前流入的请求大于当前流出的请求时,限制请求。漏桶限流解决了计数器限流模式下流量突刺的问题,当服务器处理完请求后,只要能从漏桶中流出请求则能继续处理,不会造成长时间等待拒绝请求。
漏桶限流突发流量问题
漏桶限流突发流量问题,如上图所示,漏桶满后此时大量请求到来,由于服务器已扩容可以满足请求处理,但是漏桶会拒绝大量请求,导致无法应对突发流量问题。
令牌桶限流
令牌token以固定的速率向桶中放入令牌直至桶满,在执行请求前需要先从桶中获取令牌,形式上也相当于队列,入队以固定速率,出对不受限制,这点与漏桶刚好相反。
特点:可以应对突发流量,只要桶中有令牌即可执行请求。
三、golang语言层面实现限流算法
简单计数器限流
package?main
import?(
????"fmt"
????"sync"
????"time"
)
//?CounterLimiter?简单计数器限流
type?CounterLimiter?struct?{
????Interval?int64?????//?重新计数时间
????LastTime?time.Time?//?上一次请求时间
????MaxCount?int???????//?最大计数
????Lck??????*sync.Mutex
????ReqCount?int?//?目前的请求数
}
?
//?NewCounterLimiter?初始化简单计数器限流
func?NewCounterLimiter(interval?int64,?maxCount?int)?*CounterLimiter?{
????return?&CounterLimiter{
????????Interval:?interval,
????????LastTime:?time.Now(),
????????MaxCount:?maxCount,
????????Lck:??????new(sync.Mutex),
????????ReqCount:?0,
????}
}
?//?counterLimit?简单计数器限流实现
func?(r?*CounterLimiter)?counterLimit()?bool?{
????r.Lck.Lock()
????defer?r.Lck.Unlock()
????now?:=?time.Now()
????if?now.Unix()-r.LastTime.Unix()?>?r.Interval?{
????????r.LastTime?=?now
????????r.ReqCount?=?0
????}
????if?r.ReqCount?
滑动窗口计数器限流
package?main
?import?(
????"fmt"
????"sync"
????"time"
)
//?SlidingWindowLimiter?滑动窗口计数器限流
type?SlidingWindowLimiter?struct?{
????Interval????int64???????//?总计数时间
????LastTime????time.Time???//?上一个窗口时间
????Lck?????????*sync.Mutex?//?锁
????WinCount????[]int64?????//?窗口中请求当前数量
????TicketSize??int64???????//?窗口最大容量
????TicketCount?int64???????//?窗口个数
????CurIndex????int64???????//?目前使用的窗口下标
}
?//?NewSlidingWindowLimiter?初始化滑动窗口计数器限流
func?NewSlidingWindowLimiter(interval?int64,?ticketCount?int64,?ticketSize?int64)?*SlidingWindowLimiter?{
????return?&SlidingWindowLimiter{
????????Interval:????interval,
????????LastTime:????time.Now(),
????????TicketSize:??ticketSize,
????????TicketCount:?ticketCount,
????????WinCount:????make([]int64,?ticketSize,?ticketSize),
????????CurIndex:????0,
????????Lck:?????????new(sync.Mutex),
????}
}
?//?slidingCounterLimit?滑动窗口计数器限流实现
func?(r?*SlidingWindowLimiter)?slidingCounterLimit()?bool?{
????r.Lck.Lock()
????defer?r.Lck.Unlock()
????eachTicketTime?:=?r.Interval?/?r.TicketCount
????now?:=?time.Now()
????//?如果间隔时间超过一个窗口的时间?当前窗口置0?指向下一个窗口
????if?now.Unix()-r.LastTime.Unix()?>?eachTicketTime?{
????????r.WinCount[r.CurIndex]?=?0
????????r.CurIndex?=?(r.CurIndex?+?1)?%?r.TicketCount
????????r.LastTime?=?now
????}
????fmt.Println("当前窗口:",?r.CurIndex)
????//?当前窗口未满则正常计数
????if?r.WinCount[r.CurIndex]?
漏桶限流
package?main
import?(
????"fmt"
????"sync"
????"time"
)
//?BucketLimiter?定义漏桶算法struct
type?BucketLimiter?struct?{
????Lck??????*sync.Mutex?//?锁
????Rate?????float64?????//最大速率限制
????Balance??float64?????//漏桶的余量
????Cap??????float64?????//漏桶的最大容量限制
????LastTime?time.Time???//上次检查的时间
}
?//?NewBucketLimiter?初始化BucketLimiter
func?NewBucketLimiter(rate?int,?cap?int)?*BucketLimiter?{
????return?&BucketLimiter{
????????Lck:??????new(sync.Mutex),
????????Rate:?????float64(rate),
????????Balance:??float64(cap),
????????Cap:??????float64(cap),
????????LastTime:?time.Now(),
????}
}
//?leakyBucket?漏桶算法实现
func?(r?*BucketLimiter)?leakyBucket()?bool?{
????ok?:=?false
????r.Lck.Lock()
????defer?r.Lck.Unlock()
????now?:=?time.Now()
????dur?:=?now.Sub(r.LastTime).Seconds()?//当前时间与上一次检查时间差
????r.LastTime?=?now
????water?:=?dur?*?r.Rate?//计算这段时间内漏桶流出水的流量water
????r.Balance?+=?water????//漏桶流出water容量的水,自然漏桶的余量多出water
????if?r.Balance?>?r.Cap?{
????????r.Balance?=?r.Cap
????}
????if?r.Balance?>=?1?{?//漏桶余量足够容下当前的请求
????????r.Balance?-=?1
????????ok?=?true
????}
????return?ok
}
func?main()?{
????//?初始化?限制每秒2个请求?漏洞容量为5
????r?:=?NewBucketLimiter(2,?5)
????for?i?:=?0;?i?20;?i++?{
????????ok?:=?r.leakyBucket()
????????if?ok?{
????????????fmt.Println("pass?",?i)
????????}?else?{
????????????fmt.Println("limit?",?i)
????????}
????????time.Sleep(100?*?time.Millisecond)
????}
}
令牌桶限流
package?main
import?(
????"fmt"
????"math"
????"sync"
????"time"
)
//?TokenBucket?定义令牌桶结构
type?TokenBucket?struct?{
????LastTime?time.Time?//?当前请求时间
????Capacity?float64???//?桶的容量(存放令牌的最大量)
????Rate?????float64???//?令牌放入速度
????Tokens???float64???//?当前令牌总量
????Lck??????*sync.Mutex
}
?//?NewTokenBucket?初始化TokenBucket
func?NewTokenBucket(rate?int,?cap?int)?*TokenBucket?{
????return?&TokenBucket{
????????LastTime:?time.Now(),
????????Capacity:?float64(cap),
????????Rate:?????float64(rate),
????????Tokens:???float64(cap),
????????Lck:??????new(sync.Mutex),
????}
}
//?getToken?判断是否获取令牌(若能获取,则处理请求)
func?(r?*TokenBucket)?getToken()?bool?{
????now?:=?time.Now()
????r.Lck.Lock()
????defer?r.Lck.Unlock()
????//?先添加令牌
????tokens?:=?math.Min(r.Capacity,?r.Tokens+now.Sub(r.LastTime).Seconds()*r.Rate)
????r.Tokens?=?tokens
????if?tokens?1?{
????????//?若桶中一个令牌都没有了,则拒绝
????????return?false
????}?else?{
????????//?桶中还有令牌,领取令牌
????????r.Tokens?-=?1
????????r.LastTime?=?now
????????return?true
????}
}
?func?main()?{
????//?初始化?限制每秒2个请求?令牌桶容量为5
????r?:=?NewTokenBucket(2,?5)
????for?i?:=?0;?i?20;?i++?{
????????ok?:=?r.getToken()
????????if?ok?{
????????????fmt.Println("pass?",?i)
????????}?else?{
????????????fmt.Println("limit?",?i)
????????}
????????time.Sleep(100?*?time.Millisecond)
????}
}
四、nginx限流及实现
Nginx 提供了两种限流手段:一是控制速率,二是控制并发连接数。
- 控制速率 我们需要使用 limit_req_zone 用来限制单位时间内的请求数,即速率限制,示例配置如下:
limit_req_zone?$binary_remote_addr?zone=mylimit:10m?rate=2r/s;
server?{
????location?/?{
????????limit_req?zone=mylimit;
????}
}
limit_req zone=mylimit方案: 以上配置表示,限制每个 IP 访问的速度为 2r/s,因为 Nginx 的限流统计是基于毫秒的,我们设置的速度是 2r/s,转换一下就是 500ms 内单个 IP 只允许通过 1 个请求,从 501ms 开始才允许通过第 2 个请求。 我们使用单 IP 在 10ms 内发并发送了 6 个请求的执行结果如下: 从以上结果可以看出他的执行符合我们的预期,只有 1 个执行成功了,其他的 5 个被拒绝了(第 2 个在 501ms 才会被正常执行)。 表现为对收到的请求无延时 超过访问频率则503
limit_req zone=mylimit burst=3方案: 上面的速率控制虽然很精准但是应用于真实环境未免太苛刻了,真实情况下我们应该控制一个 IP 单位总时间内的总访问次数,而不是像上面那么精确但毫秒,我们可以使用 burst 关键字开启此设置,示例配置如下:
limit_req_zone?$binary_remote_addr?zone=mylimit:10m?rate=2r/s;
server?{
????location?/?{
????????limit_req?zone=mylimit?burst=3;
????}
}
burst=3 表示每个 IP 最多允许3个突发请求,如果单个 IP 在 10ms 内发送 6 次请求的结果如下: 从以上结果可以看出,有 1 个请求被立即处理了,3 个请求被放到 burst 队列里排队执行了,另外 2被丢弃了。 超过了burst缓冲队列长度和rate处理能力的请求被直接丢弃 表现为对收到的请求有延时 所有请求排队
limit_req?zone=mylimit?burst=3?nodelay方案:
server?{
????location?/?{
????????limit_req?zone=mylimit?burst=3?nodelay;
????}
}
如果单个 IP 在 10ms 内发送 6 次请求的结果如下: 依照在limit_req_zone中配置的rate来处理请求,同时设置了一个大小为3的缓冲队列, 当请求到来时,会爆发出一个峰值处理能力,表示这3个请求立刻处理,对于峰值处理数量之外的请求,直接丢弃 缓冲队列按rate来释放 表现为对收到的请求无延时 缓冲已满则503
2. 控制并发数 这个模块用来限制单个IP的请求数。并非所有的连接都被计数,只有在服务器处理了请求并且已经读取了整个请求头时,连接才被计数。 利用 limit_conn_zone 和 limit_conn 两个指令即可控制并发数,示例配置如下:
limit_conn_zone?$binary_remote_addr?zone=perip:10m;
limit_conn_zone?$server_name?zone=perserver:10m;
server?{
????...
????limit_conn?perip?10;
????limit_conn?perserver?100;
}
其中 limit_conn perip 10 表示限制单个 IP 同时最多能持有 10 个连接;limit_conn perserver 100 表示 server 同时能处理并发连接的总数为 100 个。 只有当 request header 被后端处理后,这个连接才进行计数。
五、基于redis实现限流算法
对于上述限流算法目前已有很多成熟的第三方库实现了,但是对于分布式系统来说无法起到严格意义上的限流,因此基于redis以gin中间件的方式实现上述限流算法。
滑动窗口计数器限流
func?Limiter(ctx?*gin.Context)?{
????now?:=?time.Now().UnixNano()
????username,?exists?:=?ctx.Get("username")
????if?!exists?{
????????ctx.JSON(http.StatusBadRequest,?gin.H{"message":?"username获取失败"})
????}
????key?:=?fmt.Sprintf(redis.KeyLimitArticleUser,?username)
????c,?err?:=?redis.Client.RedisCon.Dial()
????if?err?!=?nil?||?c?==?nil?{
????????ctx.JSON(http.StatusBadRequest,?gin.H{"message":?"redis连接失败"})
????????return
????}
????//限制五秒一次请求
????var?limit?int64?=?1
????dura?:=?time.Second?*?5
????//删除有序集合中的五秒之前的数据
????_,?err?=?c.Do("ZREMRANGEBYSCORE",?key,?"0",?fmt.Sprint(now-(dura.Nanoseconds())))
????if?err?!=?nil?{
????????ctx.JSON(http.StatusBadRequest,?gin.H{"message":?"redis操作ZREMRANGEBYSCORE失败"})
????}
????reqs,?_?:=?redisPool.Int64(c.Do("ZCARD",?key))
????if?reqs?>=?limit?{
????????ctx.AbortWithStatusJSON(http.StatusTooManyRequests,?gin.H{
????????????"status":??http.StatusTooManyRequests,
????????????"message":?"too?many?request",
????????})
????????return
????}
?????ctx.Next()
????_,?err?=?c.Do("ZADD",?key,?float64(now),?float64(now))
????if?err?!=?nil?{
????????ctx.JSON(http.StatusBadRequest,?gin.H{"message":?"redis操作ZADD失败"})
????}
????_,?err?=?c.Do("EXPIRE",?key,?dura)
????if?err?!=?nil?{
????????ctx.JSON(http.StatusBadRequest,?gin.H{"message":?"redis操作EXPIRE失败"})
????}
}
漏桶限流
//?LeakyBucket?redis实现漏桶限流
func?LeakyBucket(ctx?*gin.Context)?{
????username,?exists?:=?ctx.Get("username")
????if?!exists?{
????????ctx.JSON(http.StatusBadRequest,?gin.H{"message":?"username获取失败"})
????}
????key?:=?fmt.Sprintf(redis.KeyLeakyBucketArticleUser,?username)
????c,?err?:=?redis.Client.RedisCon.Dial()
????if?err?!=?nil?||?c?==?nil?{
????????ctx.JSON(http.StatusBadRequest,?gin.H{"message":?"redis连接失败"})
????????return
????}
????rate?:=?2???????????????????????????????????????????????????????//?每秒2个请求
????capacity?:=?5???????????????????????????????????????????????????//?桶容量
????lastTime,?err?:=?redisPool.Int64(c.Do("hget",?key,?"lastTime"))?//?上次请求时间
????now?:=?time.Now().Unix()
????water?:=?int(now-lastTime)?*?rate???????????????????????????//?经过一段时间后桶流出的请求
????balance,?err?:=?redisPool.Int(c.Do("hget",?key,?"balance"))?//?上一次桶的余量
????balance?+=?water????????????????????????????????????????????//?当前桶的余量
????if?balance?>?capacity?{
????????balance?=?capacity
????}
????if?balance?>=?1?{
????????balance--
????????lastTime?=?now?//?记录当前请求时间?秒为单位
????????c.Do("hset",?key,?"lastTime",?lastTime)
????????c.Do("hset",?key,?"balance",?balance)
????????return
????}
????//?无空闲balance可用时?429状态码限流提示
????ctx.AbortWithStatusJSON(http.StatusTooManyRequests,?gin.H{
????????"status":??http.StatusTooManyRequests,
????????"message":?"too?many?request",
????})
}
令牌桶限流
//?BucketLimit?redis实现令牌桶限流
func?BucketLimit(ctx?*gin.Context)?{
????username,?exists?:=?ctx.Get("username")
????if?!exists?{
????????ctx.JSON(http.StatusBadRequest,?gin.H{"message":?"username获取失败"})
????}
????key?:=?fmt.Sprintf(redis.KeyBucketLimitArticleUser,?username)
????c,?err?:=?redis.Client.RedisCon.Dial()
????if?err?!=?nil?||?c?==?nil?{
????????ctx.JSON(http.StatusBadRequest,?gin.H{"message":?"redis连接失败"})
????????return
????}
????rate?:=?1???????????????????????????????????????????????????????//?令牌生成速度?每秒1个token
????capacity?:=?1???????????????????????????????????????????????????//?桶容量
????tokens,?err?:=?redisPool.Int(c.Do("hget",?key,?"tokens"))???????//?桶中的令牌数
????lastTime,?err?:=?redisPool.Int64(c.Do("hget",?key,?"lastTime"))?//?上次令牌生成时间
????now?:=?time.Now().Unix()
????//?初始状态下?令牌数量为桶的容量
????existKey,?err?:=?redisPool.Int(c.Do("exists",?key))
????if?existKey?!=?1?{
????????tokens?=?capacity
????????c.Do("hset",?key,?"lastTime",?now)
????}
????deltaTokens?:=?int(now-lastTime)?*?rate?//?经过一段时间后生成的令牌
????if?deltaTokens?>?1?{
????????tokens?=?tokens?+?deltaTokens?//?增加令牌
????}
????if?tokens?>?capacity?{
????????tokens?=?capacity
????}
????if?tokens?>=?1?{
????????tokens--?//?请求进来了,令牌就减少1
????????c.Do("hset",?key,?"lastTime",?now)
????????c.Do("hset",?key,?"tokens",?tokens)
????????return
????}
????//?无空闲token可用时?429状态码限流提示
????ctx.AbortWithStatusJSON(http.StatusTooManyRequests,?gin.H{
????????"status":??http.StatusTooManyRequests,
????????"message":?"too?many?request",
????})
}
redis+lua实现线程安全的分布式限流算法
以令牌桶算法为例:
实现流程图
定义lua脚本
//?lua脚本实现令牌桶算法限流
????ScriptTokenLimit?=?`
local?rateLimit?=?redis.pcall('HMGET',KEYS[1],'lastTime','tokens')
local?lastTime?=?rateLimit[1]
local?tokens?=?tonumber(rateLimit[2])
local?capacity?=?tonumber(ARGV[1])
local?rate?=?tonumber(ARGV[2])
local?now?=?tonumber(ARGV[3])
if?tokens?==?nil?then
??tokens?=?capacity
else
??local?deltaTokens?=?math.floor((now-lastTime)*rate)
??tokens?=?tokens+deltaTokens
??if?tokens>capacity?then
????tokens?=?capacity
??end
end
local?result?=?false
lastTime?=?now
if(tokens>0)?then
??result?=?true
??tokens?=?tokens-1
end
redis.call('HMSET',KEYS[1],'lastTime',lastTime,'tokens',tokens)
return?result
`
通过lua脚本实现令牌桶算法限流
//?LuaTokenBucket?通过lua脚本实现令牌桶算法限流
func?LuaTokenBucket(c?redis.Conn,?key?string,?capacity,?rate,?now?int64)?(bool,?error)?{
????defer?c.Close()
????lua?:=?redis.NewScript(1,?ScriptTokenLimit)
????//?lua脚本中的参数为key和value
????res,?err?:=?redis.Bool(lua.Do(c,?key,?capacity,?rate,?now))
????if?err?!=?nil?{
????????return?false,?err
????}
????return?res,?nil
}
限流中间件
//?LuaTokenBucket?通过lua脚本实现令牌桶算法限流
func?LuaTokenBucket(c?redis.Conn,?key?string,?capacity,?rate,?now?int64)?(bool,?error)?{
????defer?c.Close()
????lua?:=?redis.NewScript(1,?ScriptTokenLimit)
????//?lua脚本中的参数为key和value
????res,?err?:=?redis.Bool(lua.Do(c,?key,?capacity,?rate,?now))
????if?err?!=?nil?{
????????return?false,?err
????}
????return?res,?nil
}
六、总结
计数器、漏桶、令牌桶算法限流有各自的特点及应用场景,不能单一维度地判断哪个算法最好。计数器算法实现简单,适用于对接口频次的限制,如防恶意刷帖限制等;漏桶限流适用于处理流量突刺现象,因为只要桶为空就可以接受请求;而令牌桶限流适用于应对突发流量,也是目前互联网架构中最常用的一种限流方式,只要能取到令牌即可处理请求。 nginx限流控制接口频次其实现方式实质上是用到了漏桶算法,如果是http请求并且使用了nginx作为反向代理,那么可以使用nginx作为流量入口限制的第一关。 在分布式场景下,一般选择使用redis来实现限流算法,配合lua脚本使得限流的判断是一个原子操作。