算法:限流之令牌桶算法实现

这篇具有很好参考价值的文章主要介绍了算法:限流之令牌桶算法实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

简介

本章介绍令牌桶Token Bucket算法在流量限速场景的原理,以及C++实现和相关测试验证。

常见的限流算法有计数限流,固定窗口限流,滑动窗口限流,漏桶算法限流和令牌桶算法限流。令牌桶算法是限流算法的一种,其原理是系统会以一个恒定的速度往桶里放入固定数量的令牌,而如果请求需要被处理,则需要先从桶里获取对应令牌,当桶里没有令牌可取时,则拒绝服务。

令牌桶算法可应用于多种场景,本章是针对网络流控制限制场景的使用,对外发的网络数据进行控制,使数据以长期的平均速率外发,并运行一个瞬时的最高流量。

原理

定义:令牌桶提供了一种机制,限制流的平均速率,并允许流中达到所需的突发级别。

需求:项目的需求是文件下载的服务器端限流,下载请求过来后(请求其他模块有限制,这里忽略这个问题),在独立线程读取文件并发送数据到该请求的socket上,要求发送到网络的总数据要限流,但是数据不能丢弃,可以延迟等待发送。

实现:

算法:限流之令牌桶算法实现

  • 定时定量向令牌桶投入令牌,令牌桶有容量限制,令牌桶容量满了则丢弃不再投入令牌。
  • 在发送报文时,如果令牌桶的令牌数量小于要发送的网络数据大小,则等待令牌桶的数量足够再发送;如果令牌桶的令牌数量大于等于要发送的网络数据大小,则报文直接发送,并减少令牌桶中对应数量的令牌。

结论:

  • 限速速率不一定是匀速的,但长期速率是一定的。

    在令牌桶满的情况下,如果有突发流量过来,会瞬时消耗掉令牌桶的令牌,此时的理论上限速率为:令牌桶的容量+速率。比如令牌桶的容量为60M,限速速度为50MB/s,那此时的速率最高可到110MB/s的速度。

  • 单次发送的数据不能大于令牌容量,否则获取不到令牌。

  • 令牌的获取线程安全,可多线程获取。

实现

类图如下:

算法:限流之令牌桶算法实现

具体代码如下:

  • CountSemaphore是信号量的封装。

    信号量的容量即令牌桶的容量,提供获取令牌和投递令牌2个操作。该对象封装可作为信号量的公共库使用。

    代码分析如下:

    • acquire获取令牌的操作中,使用锁保护数据正确性,使用条件等待令牌足够才继续往下执行。
    • release增加令牌数量,并通知其他等待条件的线程继续执行。
    • 因std::mutex是非公平锁,所以获取到锁的线程是随机的,但长期来看是公平的。

    具体代码如下:

    • countsemaphore.h

      #ifndef COUNTSEMAPHORE_H
      #define COUNTSEMAPHORE_H
      
      #include <mutex>
      #include <condition_variable>
      #include <climits>
      
      class CountSemaphore
      {
      public:
          CountSemaphore(unsigned long long initCount, unsigned long long maxCount);
      
          bool acquire(unsigned long long count = 1);
      
          void release(unsigned long long count = 1);
      private:
          std::mutex m_mtx;
          std::condition_variable m_cv;
          // 当前可用数量
          unsigned long long m_updateCount = 0;
          // 最大数量
          unsigned long long m_maxCount = ULLONG_MAX;
      };
      
      #endif // COUNTSEMAPHORE_H
      
      
    • countsemaphore.cpp

      #include "countsemaphore.h"
      
      
      CountSemaphore::CountSemaphore(unsigned long long initCount, unsigned long long maxCount)
          : m_updateCount(initCount > maxCount ?  maxCount : initCount)
          , m_maxCount(maxCount)
      {
      
      }
      
      bool CountSemaphore::acquire(unsigned long long count)
      {
          std::unique_lock<std::mutex> lck(m_mtx);
      
          // 获取的数量大于最大值,不可能成功
          if (count > m_maxCount)
          {
              return false;
          }
      
          m_cv.wait(lck, [&]() -> bool { return m_updateCount >= count; });
      
          m_updateCount -= count;
      
          return true;
      }
      
      void CountSemaphore::release(unsigned long long count)
      {
          std::unique_lock<std::mutex> lck(m_mtx);
      
          auto tobeCount = m_updateCount + count;
          if (tobeCount > m_maxCount)
          {
              m_updateCount = m_maxCount;
          }
          else
          {
              m_updateCount = tobeCount;
          }
      
          m_cv.notify_all();
      }
      
      
  • TokenSpeedLimiter是令牌桶的封装。

    包含令牌桶的限速速度,令牌的投递时间间隔和令牌桶的容量。提供开始和结束投递操作和获取令牌的操作。

    其中投递的时间间隔以毫秒为单位,越小速率越均匀。

    • tokenspeedlimiter.h

      #ifndef TOKENSPEEDLIMITER_H
      #define TOKENSPEEDLIMITER_H
      
      #include "countsemaphore.h"
      
      #include <thread>
      
      class TokenSpeedLimiter
      {
      public:
          TokenSpeedLimiter(unsigned long long speed, unsigned long long capacity, unsigned long long deliveryIntervalMs);
      
          void begin();
      
          void end();
      
          bool acquireToken(unsigned long long tokenCount);
      
      private:
          void workingThread();
      
      private:
          // 限速速度(字节/s)
          unsigned long long m_limitSpeed;
          // 令牌投递时间间隔(毫秒)
          unsigned long long m_deliveryIntervalMs;
          // 信号量
          CountSemaphore m_semaphore;
          // 是否运行
          bool m_runing = false;
          // 线程
          std::shared_ptr<std::thread> m_thread = nullptr;
      };
      
      #endif // TOKENSPEEDLIMITER_H
      
      
    • tokenspeedlimiter.cpp

      #include "tokenspeedlimiter.h"
      
      #include <functional>
      
      TokenSpeedLimiter::TokenSpeedLimiter(unsigned long long speed, unsigned long long capacity, unsigned long long deliveryIntervalMs)
          : m_limitSpeed(speed)
          , m_deliveryIntervalMs(deliveryIntervalMs)
          , m_semaphore(0, capacity)
      {
      
      }
      
      void TokenSpeedLimiter::begin()
      {
          if (m_runing)
          {
              return;
          }
      
          m_runing = true;
      
          m_thread.reset(new std::thread(std::bind(&TokenSpeedLimiter::workingThread, this)));
      }
      
      void TokenSpeedLimiter::end()
      {
          m_runing = false;
      
          if (m_thread != nullptr)
          {
              m_thread->join();
      
              m_thread = nullptr;
          }
      }
      
      bool TokenSpeedLimiter::acquireToken(unsigned long long tokenCount)
      {
          return m_semaphore.acquire(tokenCount);
      }
      
      void TokenSpeedLimiter::workingThread()
      {
          auto lastTime = std::chrono::steady_clock::now();
          while(m_runing)
          {
              // 延时定时投递
              std::this_thread::sleep_for(std::chrono::milliseconds(m_deliveryIntervalMs));
      
              // 计算投递时间差
              auto curTime = std::chrono::steady_clock::now();
              auto elapsedMs = std::chrono::duration<double, std::milli>(curTime - lastTime).count();
      
              lastTime = curTime;
      
              // 根据时间差计算投递令牌的数量(除以1000换算成毫秒投递数量,然后再乘以毫秒时间差)
              auto tokens = m_limitSpeed * elapsedMs / 1000;
      
              // 投递令牌
              m_semaphore.release((unsigned long long)tokens);
          }
      }
      
      
  • main.cpp

    包含令牌桶对象的调用及测试结果打印。

    • sendDatatoNet线程模拟多线程发送数据。
    • statisticNetwork统计流量结果和每个线程发送数据的百分比。
    #include <QCoreApplication>
    #include "tokenspeedlimiter.h"
    
    #include <iostream>
    #include <map>
    #include <sstream>
    #include <iomanip>
    
    // 网络发送字节数,用于统计
    unsigned long long sendCount = 0;
    std::mutex mutexCount;
    std::map<unsigned int, unsigned long long> mapTheadIdCount;
    
    // 网络数据发送测试线程函数
    void sendDatatoNet(TokenSpeedLimiter* speedLimiter)
    {
        // 每次发送的数据包大小
        const int sizeOnePacket = 2 * 1024;
    
        while(true)
        {
            // 获取令牌
            if (!speedLimiter->acquireToken(sizeOnePacket))
            {
                continue;
            }
    
            // 统计总的发送包数量
            std::unique_lock<std::mutex> lck(mutexCount);
            sendCount += sizeOnePacket;
    
            // 统计每个线程发送的数包
            auto threadId = std::this_thread::get_id();
            auto theId = *(unsigned int *)&threadId;
    
            auto it = mapTheadIdCount.find(theId);
            if (it != mapTheadIdCount.end())
            {
                it->second += sizeOnePacket;
            }
            else
            {
                mapTheadIdCount.insert(std::make_pair(theId, sizeOnePacket));
            }
        }
    }
    
    void statisticNetwork()
    {
        auto lastTime = std::chrono::steady_clock::now();
        while(true)
        {
            // 1秒统计一次
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    
            // 计算投递时间差
            auto curTime = std::chrono::steady_clock::now();
            auto elapsedMs = std::chrono::duration<double, std::milli>(curTime - lastTime).count();
    
            lastTime = curTime;
    
            // 打印总速率
            std::unique_lock<std::mutex> lck(mutexCount);
            if (elapsedMs > 0)
            {
                // * 1000 / elapsedMs为毫秒转换为秒
                auto curSpeed = (double)sendCount * 1000 /  1024 / 1024 / elapsedMs;
    
                std::cout << "speed: " << curSpeed << " MB/s" << std::endl;
            }
    
            // 打印每个线程发送的百分比
            std::cout << "thread send count: ";
    
            for (auto it: mapTheadIdCount)
            {
                std::cout << it.first << "(" << std::setfill(' ') << std::setw(2) << 100 * it.second / sendCount  << "%),";
            }
    
            std::cout << std::endl;
    
            mapTheadIdCount.clear();
            sendCount = 0;
        }
    }
    
    int main(int argc, char *argv[])
    {
        // 构造限速器:限速50M/s,容量为6M,间隔10ms投递令牌;当前的流量峰值为56M(50的速度 + 6M的容量)左右
        TokenSpeedLimiter speedLimiter(50 * 1024 * 1024, 50 * 1024 * 1024 / 10 * 1.2, 10);
        speedLimiter.begin();
    
        // 延时5秒,填满令牌桶容量
        std::this_thread::sleep_for(std::chrono::milliseconds(5000));
    
        // 启动网络发送线程
        for (int i = 0; i < 10; ++i)
        {
            new std::thread(sendDatatoNet, &speedLimiter);
        }
    
        // 启动统计
        statisticNetwork();
    
        return 0;
    }
    
    

    测试输出如下以及分析如下:文章来源地址https://www.toymoban.com/news/detail-494506.html

    • 因启动令牌桶对象后,延迟5秒才开始发送数据,所以第一条的速度达到了55M/s,其与(50M+6M)B/s的速度计算基本对应。
    • 后续发送速度保持再50MB/s左右,和设置的限速速度保持一致。
    • 从输出来看,数据发送的线程获取令牌长期来看是公平的。
    speed: 55.9062 MB/s
    thread send count: 356( 9%),1644( 8%),2312(11%),11112(12%),13472( 7%),14696( 6%),20588( 7%),21096(13%),22080(10%),22500(11%),
    speed: 49.3314 MB/s
    thread send count: 356( 8%),1644(11%),2312(10%),11112(12%),13472( 7%),14696( 9%),20588( 9%),21096(13%),22080( 9%),22500( 7%),
    speed: 50.6701 MB/s
    thread send count: 356( 7%),1644(11%),2312(16%),11112( 7%),13472(11%),14696( 9%),20588( 7%),21096( 6%),22080(10%),22500(11%),
    speed: 49.9935 MB/s
    thread send count: 356( 7%),1644(10%),2312( 6%),11112(12%),13472(10%),14696( 6%),20588(16%),21096( 6%),22080(11%),22500(11%),
    speed: 49.3036 MB/s
    thread send count: 356(13%),1644(11%),2312( 8%),11112( 6%),13472( 9%),14696(10%),20588( 9%),21096( 9%),22080(11%),22500(10%),
    speed: 50.6954 MB/s
    thread send count: 356(11%),1644(10%),2312(14%),11112(10%),13472( 9%),14696( 4%),20588(10%),21096( 8%),22080(10%),22500( 9%),
    speed: 49.2754 MB/s
    thread send count: 356(12%),1644( 8%),2312( 5%),11112(11%),13472(13%),14696( 9%),20588( 9%),21096(11%),22080( 8%),22500( 9%),
    speed: 50.0361 MB/s
    thread send count: 356(13%),1644( 8%),2312( 7%),11112(10%),13472( 9%),14696( 7%),20588(12%),21096(10%),22080(10%),22500( 8%),
    

到了这里,关于算法:限流之令牌桶算法实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring限流之限流方案分析

    限流对于一个微服务架构系统来说具有非常重要的意义,否则其中的某个微服务将成为整个系统隐藏的雪崩因素,为什么这么说? 举例来讲,某个平台有100多个微服务应用,但是作为底层的某个或某几个应用来说,将会被所有上层应用频繁调用,业务高峰期时,如果底层应用

    2024年02月13日
    浏览(33)
  • go-zero 是如何实现令牌桶限流的?

    原文链接: 上一篇文章介绍了 如何实现计数器限流?主要有两种实现方式,分别是固定窗口和滑动窗口,并且分析了 go-zero 采用固定窗口方式实现的源码。 但是采用固定窗口实现的限流器会有两个问题: 会出现请求量超出限制值两倍的情况 无法很好处理流量突增问题 这篇

    2024年02月13日
    浏览(42)
  • java进行系统的限流实现--Guava RateLimiter、简单计数、滑窗计数、信号量、令牌桶

    本文主要介绍了几种限流方法:Guava RateLimiter、简单计数、滑窗计数、信号量、令牌桶,漏桶算法和nginx限流等等 1、引入guava集成的工具 pom.xml 文件 demo代码实现 2.令牌桶算法 3、滑窗计数器 4、信号量

    2024年02月09日
    浏览(41)
  • 限流与令牌桶

    令牌桶是一种常用的流量控制技术。令牌桶本身没有丢弃和优先级策略。 原理 1.令牌以一定的速率放入桶中。 2.每个令牌允许源发送一定数量的比特。 3.发送一个包,流量调节器就要从桶中删除与包大小相等的令牌数。 4.如果没有足够的令牌发送包,这个包就会等待直

    2024年02月08日
    浏览(24)
  • guava限流器RateLimiter使用简介(Springboot实现)

    在大型分布式系统中,限流是一种重要的防护机制,可以帮助我们控制流量并减轻系统的负担。Google的Guava库提供了一种方便的限流器实现,可以帮助我们轻松地实现限流功能。本文将介绍Guava中限流器的基本概念和使用方法。 限流器是一种控制系统流量的机制,可以帮助我

    2024年02月17日
    浏览(40)
  • 令牌桶算法与Guava的实现RateLimiter源码分析

    令牌桶算法是一种限流算法。 令牌桶算法的原理就是以一个恒定的速度往桶里放入令牌,每一个请求的处理都需要从桶里先获取一个令牌,当桶里没有令牌时,则请求不会被处理,要么排队等待,要么降级处理,要么直接拒绝服务。当桶里令牌满时,新添加的令牌会被丢弃或

    2024年01月20日
    浏览(38)
  • 精确掌控并发:令牌桶算法在分布式环境下并发流量控制的设计与实现

    这是《百图解码支付系统设计与实现》专栏系列文章中的第(17)篇,也是流量控制系列的第(4)篇。点击上方关注,深入了解支付系统的方方面面。 本篇重点讲清楚令牌桶原理,在支付系统的应用场景,以及使用reids实现的核心代码。 在流量控制系列文章中的前三篇,分别

    2024年01月19日
    浏览(48)
  • 四种常见分布式限流算法实现!

    大家好,我是老三,最近公司在搞年终大促,随着各种营销活动“组合拳”打出,进站流量时不时会有一个小波峰,一般情况下,当然是流量越多越好,前提是系统能杠地住。大家都知道,一个分布式系统,有两个“弃车保帅”的策略: 限流 和 熔断 ,这期,我们就来讨论一

    2024年02月16日
    浏览(38)
  • SpringCloud(17~21章):Alibaba入门简介、Nacos服务注册和配置中心、Sentinel实现熔断与限流、Seata处理分布式事务

    Spring Cloud Netflix项目进入维护模式 https://spring.io/blog/2018/12/12/spring-cloud-greenwich-rc1-available-now 说明 Spring Cloud Netflix Projects Entering Maintenance Mode 什么是维护模式 将模块置于维护模式,意味着 Spring Cloud 团队将不会再向模块添加新功能。我们将修复 block 级别的 bug 以及安全问题,我

    2024年01月19日
    浏览(61)
  • JWT简介& JWT结构& JWT示例& 前端添加JWT令牌功能& 后端程序

    目录 1. JWT简述 1.1 什么是JWT 1.2 为什么使用JWT 1.3 JWT结构 1.4 验证过程 2. JWT示例 2.1 后台程序 2.2 前台加入jwt令牌功能 Json web token (JWT), 是为了在网络应用环境间传递声明而执行的一种基于JSON的开放标准((RFC 7519).该token被设计为紧凑且安全的,特别适用于分布式站点的单点登录

    2024年04月16日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包