惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

SecWiki News
SecWiki News
I
InfoQ
The Cloudflare Blog
人人都是产品经理
人人都是产品经理
博客园 - Franky
T
Tailwind CSS Blog
让小产品的独立变现更简单 - ezindie.com
让小产品的独立变现更简单 - ezindie.com
量子位
博客园_首页
罗磊的独立博客
V
V2EX
李成银的技术随笔
大猫的无限游戏
大猫的无限游戏
钛媒体:引领未来商业与生活新知
钛媒体:引领未来商业与生活新知
T
True Tiger Recordings
Vercel News
Vercel News
Cyberwarzone
Cyberwarzone
Cisco Talos Blog
Cisco Talos Blog
F
Fox-IT International blog
D
Darknet – Hacking Tools, Hacker News & Cyber Security
M
Microsoft Research Blog - Microsoft Research
Know Your Adversary
Know Your Adversary
爱范儿
爱范儿
The Register - Security
The Register - Security
G
Google Developers Blog
The Hacker News
The Hacker News
Malwarebytes
Malwarebytes
S
Securelist
博客园 - 三生石上(FineUI控件)
Jina AI
Jina AI
T
Threat Research - Cisco Blogs
T
The Exploit Database - CXSecurity.com
S
SegmentFault 最新的问题
博客园 - 叶小钗
F
Fortinet All Blogs
Apple Machine Learning Research
Apple Machine Learning Research
宝玉的分享
宝玉的分享
博客园 - 聂微东
T
Threatpost
博客园 - 【当耐特】
D
Docker
P
Privacy & Cybersecurity Law Blog
www.infosecurity-magazine.com
www.infosecurity-magazine.com
G
GRAHAM CLULEY
V
Visual Studio Blog
C
Cisco Blogs
IT之家
IT之家
S
Security Archives - TechRepublic
Latest news
Latest news
阮一峰的网络日志
阮一峰的网络日志

Mohuishou

如何实现支持多集群的 Kubernetes Operator? 第三方应用如何调用我们 kubebuilder 生成的自定义资源? Kubernetes 简明教程 k8s job 为何迟迟不能结束? Go 工程化(十一) 如何优雅的写出 repo 层代码 Go 工程化(十) 如何在整洁架构中使用事务? 给博客添加章节目录 使用 Notion Database 管理静态博客文章 一个普通 Go 开发的三年 4. localhost 就一定是 localhost 么? Go可用性(七) 总结: 一张图串联可用性知识点 Go可用性(六) 熔断 10. 总结 9. kubebuilder 进阶: 源码分析 8. kubebuilder 进阶: webhook 7. kubebuilder 进阶: 测试 6. kubebuilder 实战: status & event 5. kubebuilder 实战: CRUD 4. kustomize 简明教程 3. KubeBuilder 简明教程 2. Kind: 如何快速搭建本地 K8s 开发环境? 1. Operator概述: 如何对 Kubernetes 进行扩展 Go可用性(五) 自适应限流 Go可用性(四) 漏桶算法 Go可用性(二) 令牌桶原理及使用 Go可用性(一) 隔离设计 Go并发编程(十二) Singleflight Go工程化(九) 项目重构实践 Go工程化(八) 单元测试 Go工程化(七) Go Module Go工程化(六) 配置管理 Go工程化(五) API 设计下: 基于 protobuf 自动生成 gin 代码 Go工程化(四) API 设计上: 项目结构 & 设计 Go工程化(三) 依赖注入框架 wire Go工程化(二) 项目目录结构 Go工程化(一) 架构整洁之道阅读笔记 Go并发编程(十一) 总结 Go并发编程(十) 深入理解 Channel Go并发编程(九) 深入理解 Context Go并发编程(八) 深入理解 sync.Once Go并发编程(七) 深入理解 errgroup Go并发编程(六) 深入理解 WaitGroup Go并发编程(五) 深入理解 sync/atomic Go并发编程(四) 深入理解 Mutex Go并发编程(三) data race Go并发编程(二) Go 内存模型 Go并发编程(一) goroutine Go错误处理最佳实践 微服务(二) 服务发现&多租户 微服务(一) 微服务概览 5. 栈下: 深入理解 defer 4. 栈上: 如何实现一个计算器 Go Struct 初始化风格的抉择 3. 数组下: 使用 GDB 调试 Golang 代码 2. 数组上: 深入理解 slice 1. 链表: 深入理解container/list&LRU缓存的实现 Go设计模式24-总结(更新完毕) Go设计模式23-中介模式 Go设计模式22-解释器模式 Go设计模式21-命令模式 Go设计模式20-备忘录模式 Go设计模式19-访问者模式 Go设计模式18-迭代器模式 Go设计模式17-状态模式 Go设计模式16-职责链模式(Gin的中间件实现) Go设计模式15-策略模式 Go模板模式14-模板模式 Go设计模式13-观察者模式(实现简单的EventBus) Go设计模式12-享元模式 Go设计模式11-组合模式 Go设计模式10-门面模式 Go设计模式09-适配器模式 Go设计模式08-装饰器模式 Go设计模式07-桥接模式 Go设计模式06-代理模式(generate实现类似动态代理) Go设计模式05-创建型模式总结 Go设计模式04-原型模式 Go设计模式03-建造者模式 Go设计模式02-工厂模式&DI容器 笔记-让你最快速地改善代码质量的20条编程规范 Go设计模式01-单例模式 一点拙见-如何写好一个技术预研报告? Go Web小技巧(四)在单个仓库中支持多个 go mod 模块 Go Web 小技巧(三)Gin 参数绑定 Go Web 小技巧(二)GORM 使用自定义类型 Go Web 小技巧(一)简化Gin接口代码 善用工具之postman高级用法概述 go generate and ast hexo-next-algolia-search全文搜索 docker镜像瘦身&优化 GORM避坑指南之含关联关系的更新 Github Actions介绍&自动构建Github Pages博客 在blog中内嵌在线PPT 记一次net http内存泄漏 使用TravisCI自动部署Blog 使用Goland调试Go程序 一个十分边缘的gorm的bug Httprouter介绍及源码阅读 Gin源码阅读 从0.1开始
Go可用性(三) 令牌桶的实现 rate/limt
2021-04-01 · via Mohuishou

注:本文已发布超过一年,请注意您所使用工具的相关版本是否适用

本系列为 Go 进阶训练营 笔记,访问 博客: Go进阶训练营, 即可查看当前更新进度,部分文章篇幅较长,使用 PC 大屏浏览体验更佳。

在上一篇文章 Go 可用性(二) 限流 1: 令牌桶原理及使用 当中我们简单的介绍了令牌桶实现的原理,然后利用 /x/time/rate 这个库 10 行代码写了一个基于 ip 的 gin 限流中间件,那这个功能是怎么实现的呢?接下来我们就从源码层面来了解一下这个库的实现。这个实现很有意思,并没有真正的使用一个定时器不断的生成令牌,而是靠计算的方式来完成

本文源码基于 https://pkg.go.dev/golang.org/x/time@v0.0.0-20210220033141-f8bda1e9f3ba/rate

上回我们讲到,使用限速器的时候我们需要调用 NewLimiter  方法,然后 Limiter  提供了三组限速的方法,这三组方法其实都是通过调用 reserveN  实现的 reserveN  返回一个 *Reservation  指针,我们先来看一下这两个结构体吧。

Limiter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type Limiter struct {
// 互斥锁
mu sync.Mutex

// 每秒产生 token 的速度, 其实是 float64 的一个别名
limit Limit

// 桶的大小
burst int

// 当前时间节点拥有的 tokens 数量
tokens float64

// 上次更新 token 的时间
last time.Time

// 上次限速的时间,这个时间可能是过去的某个时间也可能是将来的某个时间
lastEvent time.Time
}

Reservation
这个结构体挺有意思的,表示预约某个时间的 token

1
2
3
4
5
6
7
8
9
10
11
12
type Reservation struct {
// 是否能预约上
ok bool
// limter
lim *Limiter
// 预约的 token 数量
tokens int
// token 实际使用的时间
timeToAct time.Time
// 保存一下速率,因为 lim 的速率是可以被动态调整的,所以不能直接用
limit Limit
}

这个库并没有使用定时器来发放 token 而是用了 lazyload 的方式,等需要消费 token 的时候才通过时间去计算然后更新 token 的数量,下面我们先通过一个例子来看一下这个流程是怎么跑的
blog.png
如上图所示,假设我们有一个限速器,它的 token 生成速度为 1,也就是一秒一个,桶的大小为 10,每个格子表示一秒的时间间隔

  • last  表示上一次更新 token 时还有 2 个 token。
  • 现在我有一个请求进来,我总共需要 7 个 token 才能完成这个请求
  • now  表示我现在进来的时间,距离 last 已经过去了 2s,那么现在就有 4 个 token
  • 所以我如果需要 7 个 token 那么也就还需要等待 3s 中才真的有 7 个,所以这就是 timeToAct  所在的时间节点
  • 预约成功之后更新 last = now 、token = -3  因为 token 已经被预约出去了所以现在剩下的就是负数了

消费 token

总共有三组消费 token 的方法 AllowN, ReserveN, and WaitN最终都是调用的reserveN`  这个方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// now: 需要消费 token 的时间点
// n: 需要多少个 token
// maxFutureReserve: 能够等待的最长时间
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
lim.mu.Lock()

// 如果发放令牌的速度无穷大的话,那么直接返回就行了,要多少可以给多少
if lim.limit == Inf {
lim.mu.Unlock()
return Reservation{
ok: true,
lim: lim,
tokens: n,
timeToAct: now,
}
}

// advance 方法会去计算当前有多少个 token
// 后面会讲到,now 其实就是传入的时间,但是 last 可能会变
now, last, tokens := lim.advance(now)

// 发放 token 之后还剩多少
tokens -= float64(n)

// 根据 token 数量计算需要等待的时间
var waitDuration time.Duration
if tokens < 0 {
waitDuration = lim.limit.durationFromTokens(-tokens)
}

// 计算是否可以发放,如果需要的量比桶的容量还大肯定是不行的
// 然后就是看需要能否容忍需要等待的时间
ok := n <= lim.burst && waitDuration <= maxFutureReserve

// Prepare reservation
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
}
// 如果可以的话,就把 token 分配给预约者
if ok {
r.tokens = n
r.timeToAct = now.Add(waitDuration)
}

// 更新各个字段的状态
if ok {
lim.last = now
lim.tokens = tokens
lim.lastEvent = r.timeToAct
} else {
// 为什么不 ok 也要更新 last 呢?因为 last 可能会改变
lim.last = last
}

lim.mu.Unlock()
return r
}

advance  方法用于计算 token 的数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// now 是传入的当前的时间点,返回的 newNow 其实就是传入的参数,没有任何改变
// newLast 是更新 token 的时间
// newTokens 是 token 的数量
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
// 如果当前时间比上次更新 token 的时间还要早,那么就重置一下 last
last := lim.last
if now.Before(last) {
last = now
}

// 这里为了防止溢出,先计算了将桶填满需要花费的最大时间
maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
// 计算时间差,如果大于最大时间的话,就取最大值
elapsed := now.Sub(last)
if elapsed > maxElapsed {
elapsed = maxElapsed
}

// 计算这段时间生成的 token 数量,如果大于桶的容量,就取桶的容量
delta := lim.limit.tokensFromDuration(elapsed)
tokens := lim.tokens + delta
if burst := float64(lim.burst); tokens > burst {
tokens = burst
}

return now, last, tokens
}

这个比较有意思的是先去计算了时间的最大值,因为初始化的时候没为 last  赋值,所以 now.Before(last)  出来的结果可能是一个很大的值,再去计算 tokens 数量很可能溢出

durationFromTokens 根据 tokens 的数量计算需要花费的时间

1
2
3
4
func (limit Limit) durationFromTokens(tokens float64) time.Duration {
seconds := tokens / float64(limit)
return time.Nanosecond * time.Duration(1e9*seconds)
}

tokensFromDuration 根据时间计算 tokens 的数量

1
2
3
4
5
6
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
// 这里通过拆分整数和小数部分可以减少时间上的误差
sec := float64(d/time.Second) * float64(limit)
nsec := float64(d%time.Second) * float64(limit)
return sec + nsec/1e9
}

消费 token 总结

消费 token 的逻辑就讲完了,我们大概总结一下

  • 需要消费的时候,先去计算一下,从过去到现在可以生成多少个 token
  • 然后我们通过需要的 token 减去现在拥有的 token 数量,就得到了需要预约的 token 数量
  • 再通过 token 数量转换成时间,就可以得到需要等待的时间长度,以及是否可以消费
  • 然后再通过不同的消费方法进行消费

WaitN

其他两类消费方法都很简单,调用 Reservation  进行返回, WaitN  还有一点逻辑,所以我们一起来看一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// ctx 用于控制超时, n 是需要消费的 token 数量,如果 context 的 Deadline 早于要等待的时间就会直接返回失败
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
lim.mu.Lock()
burst := lim.burst
limit := lim.limit
lim.mu.Unlock()

// 先看一下是不是已经超出消费极限了
if n > burst && limit != Inf {
return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst)
}

// 如果 ctx 已经结束了也不用等了
select {
case <-ctx.Done():
return ctx.Err()
default:
}

// 计算一下可以等待的时间
now := time.Now()
waitLimit := InfDuration
if deadline, ok := ctx.Deadline(); ok {
waitLimit = deadline.Sub(now)
}

// 调用 reserveN 得到预约数据
r := lim.reserveN(now, n, waitLimit)

// 如果不 ok 说明预约不到
if !r.ok {
return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
}

// 如果可以预约到,计算一下需要等多久
delay := r.DelayFrom(now)
if delay == 0 {
return nil
}

// 启动一个 timer 进行定时
t := time.NewTimer(delay)
defer t.Stop()
select {
case <-t.C:
// We can proceed.
return nil
case <-ctx.Done():
// 如果 context 主动取消了,那么值钱预约的 token 数量需要归还
r.Cancel()
return ctx.Err()
}
}

取消消费

WaitN  当中如果预约上了,但是 Context  取消了,会调用 CancelAt  归还 tokens 我们来一起看一下是怎么实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func (r *Reservation) CancelAt(now time.Time) {
// 不 ok 说明没有预约上,直接返回就行了
if !r.ok {
return
}

r.lim.mu.Lock()
defer r.lim.mu.Unlock()

// 如果没有速率限制,或者没有消费 token 或 token 已经被消费了,都不用还了
if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) {
return
}

// 计算需要还的 token 数量
// 这里说是需要减去已经预支的 token 数量,但是我发现应该是个 bug,感觉这里减重复了
restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
if restoreTokens <= 0 {
return
}

// 计算当前拥有的 tokens 数量
now, _, tokens := r.lim.advance(now)

// 当前拥有的加上需要归还的就是现有的,但是不能大于桶的容量
tokens += restoreTokens
if burst := float64(r.lim.burst); tokens > burst {
tokens = burst
}

// 更新 tokens 数量
r.lim.last = now
r.lim.tokens = tokens

// 如果相等说明后面没有新的 token 消费,所以将状态重置到上一次
if r.timeToAct == r.lim.lastEvent {
prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))
if !prevEvent.Before(now) {
r.lim.lastEvent = prevEvent
}
}

return
}

存在的问题

除了上面提到的感觉 cancelAt 可能有一个 bug 外,云神的博客还提到了一个问题,就是如果我们 cancel 了的话,后面已经在等待的任务是不会重新调整的,举个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func wait() {
l := rate.NewLimiter(10, 10)
t := time.Now()
l.ReserveN(t, 10)

var wg sync.WaitGroup

ctx, cancel := context.WithTimeout(context.TODO(), time.Hour)
defer cancel()

// 注释掉下面这段就不会提前 cancel
wg.Add(1)
go func() {
defer wg.Done()
// 模拟出现问题, 200ms就取消了
time.Sleep(200 * time.Millisecond)
cancel()
}()

wg.Add(2)
go func() {
defer wg.Done()
// 如果要等,这个要等 1s 才能执行,但是我们的 ctx 200ms 就会取消
l.WaitN(ctx, 10)
fmt.Printf("[1] cost: %s\n", time.Since(t))
}()

time.Sleep(100 * time.Millisecond)

go func() {
defer wg.Done()
// 正常情况下,这个要等 1.2 s 才能执行,但是我们前面都取消了
// 这个是不是应该就只需要等 200ms 就执行了
ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
defer cancel()
l.WaitN(ctx, 2)
fmt.Printf("[2] cost: %s\n", time.Since(t))
}()

wg.Wait()
}

我们先看一下不提前 cancel 的结果

1
2
[1] cost: 1.0002113s
[2] cost: 1.2007347s

再看看提前 cancel 的结果

1
2
[1] cost: 200.8268ms
[2] cost: 1.201066s

可以看到就是 1 有变化,从 1s -> 200ms 但是 2 一直都要等 1.2s

总结

仔细看了一下令牌桶的实现,但是也留下了一个疑问,如果哪位童鞋知道希望可以留言告诉我,在取消的时候,会减掉一个预约的时间,但是我发现这里其实应该是重复减了一次

1
restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))

下面是测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func main() {
t0 := time.Now()
t1 := time.Now().Add(100 * time.Millisecond)
t2 := time.Now().Add(200 * time.Millisecond)
t3 := time.Now().Add(300 * time.Millisecond)

l := rate.NewLimiter(10, 20)
l.ReserveN(t0, 15) // 桶里还剩 5 个 token
fmt.Printf("%+v\n", l)

r := l.ReserveN(t1, 10) // 桶还有 -4 个,
fmt.Printf("%+v\n", l)

// 注释掉下面两行,最后结果还剩 8 个 token
l.ReserveN(t2, 2) // 桶里还有 -5 个
fmt.Printf("%+v\n", l)

r.CancelAt(t3)
fmt.Printf("%+v\n", l)
// 归还之前借的,运行结果 桶里还有 4 个
// 但是这里不应该剩下 6 个么,本来有 5 个,300ms 生成了 3 个,后面又预支出去 2 个
// 而且我发现如果我注释掉预支两个的代码,结果和我预期的一致,剩余 8 个token
}

参考文献

  1. Go 进阶训练营-极客时间

  2. Token bucket - Wikipedia

  3. 令牌桶算法_百度百科 (baidu.com)

  4. 限流的概念,算法,分布式限流以及微服务架构下限流的难点 - 知乎 (zhihu.com)

  5. 令牌桶工作原理 - 知乎 (zhihu.com)

  6. /x/time/rate

  7. 开源限流组件分析(二):Golang-time/rate 限速算法实现分析 - 熊喵君的博客 | PANDAYCHEN

  8. Golang rate 无法延迟重排的 BUG – 峰云就她了 (xiaorui.cc)

关注我获取更新

wechat

知乎

github

猜你喜欢