go-zero 是如何实现令牌桶限流的?

这篇具有很好参考价值的文章主要介绍了go-zero 是如何实现令牌桶限流的?。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

原文链接:

上一篇文章介绍了 如何实现计数器限流?主要有两种实现方式,分别是固定窗口和滑动窗口,并且分析了 go-zero 采用固定窗口方式实现的源码。

但是采用固定窗口实现的限流器会有两个问题:

  1. 会出现请求量超出限制值两倍的情况
  2. 无法很好处理流量突增问题

这篇文章来介绍一下令牌桶算法,可以很好解决以上两个问题。

工作原理

算法概念如下:

  • 令牌以固定速率生成;
  • 生成的令牌放入令牌桶中存放,如果令牌桶满了则多余的令牌会直接丢弃,当请求到达时,会尝试从令牌桶中取令牌,取到了令牌的请求可以执行;
  • 如果桶空了,那么尝试取令牌的请求会被直接丢弃。

令牌桶算法既能够将所有的请求平均分布到时间区间内,又能接受服务器能够承受范围内的突发请求,因此是目前使用较为广泛的一种限流算法。

源码实现

源码分析我们还是以 go-zero 项目为例,首先来看生成令牌的部分,依然是使用 Redis 来实现。

// core/limit/tokenlimit.go

// 生成 token 速率
script = `local rate = tonumber(ARGV[1])
// 通容量
local capacity = tonumber(ARGV[2])
// 当前时间戳
local now = tonumber(ARGV[3])
// 请求数量
local requested = tonumber(ARGV[4])
// 需要多少秒才能把桶填满
local fill_time = capacity/rate
// 向下取整,ttl 为填满时间 2 倍
local ttl = math.floor(fill_time*2)
// 当前桶剩余容量,如果为 nil,说明第一次使用,赋值为桶最大容量
local last_tokens = tonumber(redis.call("get", KEYS[1]))
if last_tokens == nil then
    last_tokens = capacity
end

// 上次请求时间戳,如果为 nil 则赋值 0
local last_refreshed = tonumber(redis.call("get", KEYS[2]))
if last_refreshed == nil then
    last_refreshed = 0
end

// 距离上一次请求的时间跨度
local delta = math.max(0, now-last_refreshed)
// 距离上一次请求的时间跨度能生成的 token 数量和桶内剩余 token 数量的和
// 与桶容量比较,取二者的小值
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
// 判断请求数量和桶内 token 数量的大小
local allowed = filled_tokens >= requested
// 被请求消耗掉之后,更新剩余 token 数量
local new_tokens = filled_tokens
if allowed then
    new_tokens = filled_tokens - requested
end

// 更新 redis token
redis.call("setex", KEYS[1], ttl, new_tokens)
// 更新 redis 刷新时间
redis.call("setex", KEYS[2], ttl, now)

return allowed`

Redis 中主要保存两个 key,分别是 token 数量和刷新时间。

核心思想就是比较两次请求时间间隔内生成的 token 数量 + 桶内剩余 token 数量,和请求量之间的大小,如果满足则允许,否则则不允许。

限流器初始化:

// A TokenLimiter controls how frequently events are allowed to happen with in one second.
type TokenLimiter struct {
    // 生成 token 速率
    rate           int
    // 桶容量
    burst          int
    store          *redis.Redis
    // 桶 key
    tokenKey       string
    // 桶刷新时间 key
    timestampKey   string
    rescueLock     sync.Mutex
    // redis 健康标识
    redisAlive     uint32
    // redis 健康监控启动状态
    monitorStarted bool
    // 内置单机限流器
    rescueLimiter  *xrate.Limiter
}

// NewTokenLimiter returns a new TokenLimiter that allows events up to rate and permits
// bursts of at most burst tokens.
func NewTokenLimiter(rate, burst int, store *redis.Redis, key string) *TokenLimiter {
    tokenKey := fmt.Sprintf(tokenFormat, key)
    timestampKey := fmt.Sprintf(timestampFormat, key)

    return &TokenLimiter{
        rate:          rate,
        burst:         burst,
        store:         store,
        tokenKey:      tokenKey,
        timestampKey:  timestampKey,
        redisAlive:    1,
        rescueLimiter: xrate.NewLimiter(xrate.Every(time.Second/time.Duration(rate)), burst),
    }
}

其中有一个变量 rescueLimiter,这是一个进程内的限流器。如果 Redis 发生故障了,那么就使用这个,算是一个保障,尽量避免系统被突发流量拖垮。

提供了四个可调用方法:

// Allow is shorthand for AllowN(time.Now(), 1).
func (lim *TokenLimiter) Allow() bool {
    return lim.AllowN(time.Now(), 1)
}

// AllowCtx is shorthand for AllowNCtx(ctx,time.Now(), 1) with incoming context.
func (lim *TokenLimiter) AllowCtx(ctx context.Context) bool {
    return lim.AllowNCtx(ctx, time.Now(), 1)
}

// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate.
// Otherwise, use Reserve or Wait.
func (lim *TokenLimiter) AllowN(now time.Time, n int) bool {
    return lim.reserveN(context.Background(), now, n)
}

// AllowNCtx reports whether n events may happen at time now with incoming context.
// Use this method if you intend to drop / skip events that exceed the rate.
// Otherwise, use Reserve or Wait.
func (lim *TokenLimiter) AllowNCtx(ctx context.Context, now time.Time, n int) bool {
    return lim.reserveN(ctx, now, n)
}

最终调用的都是 reverveN 方法:

func (lim *TokenLimiter) reserveN(ctx context.Context, now time.Time, n int) bool {
    // 判断 Redis 健康状态,如果 Redis 故障,则使用进程内限流器
    if atomic.LoadUint32(&lim.redisAlive) == 0 {
        return lim.rescueLimiter.AllowN(now, n)
    }

    // 执行限流脚本
    resp, err := lim.store.EvalCtx(ctx,
        script,
        []string{
            lim.tokenKey,
            lim.timestampKey,
        },
        []string{
            strconv.Itoa(lim.rate),
            strconv.Itoa(lim.burst),
            strconv.FormatInt(now.Unix(), 10),
            strconv.Itoa(n),
        })
    // redis allowed == false
    // Lua boolean false -> r Nil bulk reply
    if err == redis.Nil {
        return false
    }
    if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
        logx.Errorf("fail to use rate limiter: %s", err)
        return false
    }
    if err != nil {
        logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err)
        // 如果有异常的话,会启动进程内限流
        lim.startMonitor()
        return lim.rescueLimiter.AllowN(now, n)
    }

    code, ok := resp.(int64)
    if !ok {
        logx.Errorf("fail to eval redis script: %v, use in-process limiter for rescue", resp)
        lim.startMonitor()
        return lim.rescueLimiter.AllowN(now, n)
    }

    // redis allowed == true
    // Lua boolean true -> r integer reply with value of 1
    return code == 1
}

最后看一下进程内限流的启动与恢复:

func (lim *TokenLimiter) startMonitor() {
    lim.rescueLock.Lock()
    defer lim.rescueLock.Unlock()

    // 需要加锁保护,如果程序已经启动了,直接返回,不要重复启动
    if lim.monitorStarted {
        return
    }

    lim.monitorStarted = true
    atomic.StoreUint32(&lim.redisAlive, 0)

    go lim.waitForRedis()
}

func (lim *TokenLimiter) waitForRedis() {
    ticker := time.NewTicker(pingInterval)
    // 更新监控进程的状态
    defer func() {
        ticker.Stop()
        lim.rescueLock.Lock()
        lim.monitorStarted = false
        lim.rescueLock.Unlock()
    }()

    for range ticker.C {
        // 对 redis 进行健康监测,如果 redis 服务恢复了
        // 则更新 redisAlive 标识,并退出 goroutine
        if lim.store.Ping() {
            atomic.StoreUint32(&lim.redisAlive, 1)
            return
        }
    }
}

以上就是本文的全部内容,如果觉得还不错的话欢迎点赞转发关注,感谢支持。


参考文章:

  • https://juejin.cn/post/7052171117116522504
  • https://www.infoq.cn/article/Qg2tX8fyw5Vt-f3HH673

推荐阅读:文章来源地址https://www.toymoban.com/news/detail-639397.html

  • 如何实现计数器限流?
  • go-zero 是如何做路由管理的?

到了这里,关于go-zero 是如何实现令牌桶限流的?的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • go-zero 如何在任意地方获取yaml中的值

    1、config配置文件中新增全局变量 2、main函数所在的入口文件为其赋值 3、然后在想要使用的地方直接使用就可以了。 比如使用yaml中配置的JWT认证的key

    2024年01月20日
    浏览(37)
  • 【go-zero】(1):尝试使用go-zero的工具goctl进行model,controller代码生成,配置数据库,实现FindAll方法,查询数据库全部数据

    本文的原文连接是: https://blog.csdn.net/freewebsys/article/details/128707849 未经博主允许不得转载。 博主CSDN地址是:https://blog.csdn.net/freewebsys 博主掘金地址是:https://juejin.cn/user/585379920479288 博主知乎地址是:https://www.zhihu.com/people/freewebsystem 项目地址: https://go-zero.dev/cn/ go-zero 是一个集

    2023年04月24日
    浏览(44)
  • 【go-zero】go-zero分布式锁实战 | apifox测试go-zero分布式锁方式

    包地址:github.com/zeromicro/go-zero/core/stores/redis 使用场景: 为了防止并发的下载相同的excel 我们通过redis锁来控制请求相同的excel下载 个人思路: req为API传入的请求参数 然后加密成md5的字符串,这样可以处理 相同的请求

    2024年02月15日
    浏览(42)
  • 【go-zero】go-zero阿里云oss 前端上传文件到go-zero API服务 并在k8s pod中创建文件 并推送到阿里云oss 最佳实践

    问题:在本地通过上传文件,然后将文件推送到aliyun的oss中,是没问题的 但是部署到了k8s中,则出现了问题,一直报错没有创建的权限 思路:开始认为应该将该文件挂载到configmap中,然后通过这种方式修改了deployment和dockerfile。最终发现应该是go的创建文件路径方式搞错了,

    2024年02月13日
    浏览(48)
  • go-zero学习 第一章 基础

    因官网重新改版,本文是基于官网最新版本的文档并整合旧文档重新进行全面总结、归纳。 本文主要对官网 快速开始 进行提炼总结,未涉及部分将在后续章节陆续补充完善。 go-zero 的 goctl 工具下载 验证 goctl 的安装结果: goctl 一键安装 protoc 、 protoc-gen-go 、 protoc-gen-go-grp

    2024年02月09日
    浏览(50)
  • go-zero系列:接入Prometheus

    参考文档:https://zhuanlan.zhihu.com/p/463418864 https://prometheus.io/download/ 进入下载文件夹,比如prometheus-2.44.0.windows-amd64。 然后双击Prometheus.exe启动软件。 启动后,可以访问 http://127.0.0.1:9090/。就能查看Prometheus后台。 然后重启go-zero项目,能看到输出日志:Starting prometheus agent at 0.0.

    2024年02月16日
    浏览(40)
  • go-zero 开发之安装 etcd

    本文只涉及 Linux 上的安装。 二进制安装 下载二进制安装包 下载地址示例: 解压二进制安装包 删除二进制安装包 版本检查 启动 etcd 往 etcd 写读数据 Docker 安装 etcd 主要使用 Google 容器注册表(gcr.io)下的 gcr.io/etcd-development/etcd 仓库来存储其容器镜像。作为次要选项,它还使

    2024年02月04日
    浏览(44)
  • go-zero微服务实战——服务构建

    接上一节go-zero微服务实战——基本环境搭建。搭建好了微服务的基本环境,开始构建整个微服务体系了,将其他服务也搭建起来。 order的目录结构,如下 根目录 api服务 rpc服务 自定义逻辑层logic 自定义参数层models 自定义工具层util api服务和rpc服务都是基于goctl一键生成的,当

    2024年02月14日
    浏览(47)
  • go-zero的服务发现源码阅读

    服务发现原理与grpc源码解析_wangxiaoangg的博客-CSDN博客   go-zero rpc demo官方文档:rpc编写与调用 | go-zero 目录 一 服务注册 1. 创建rpc服务 2. 启动rpc服务 3. registerEtcd做了什么 4. discov.NewPublisher 服务发布者 二 服务发现 1.定义注册resolver 2.解析etcd地址创建链接 3.update方法 在看rp

    2024年02月06日
    浏览(62)
  • 使用go-zero快速构建微服务

    本文是对 使用go-zero快速构建微服务 [1] 的亲手实践 编写API Gateway代码 mkdir api goctl api -o api/bookstore.api cd api goctl api go -api bookstore.api -dir . go run bookstore.go -f etc/bookstore-api.yaml 启动API Gateway服务,默认侦听在8888端口 因为默认生成的 api/etc/bookstore-api.yml 为: 按提示下载,再次运行

    2024年02月13日
    浏览(65)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包