限流定义及目的
当系统流量达到系统或下游承受能力的阈值时对系统进行限流控制以防止系统或下游挂掉,减少影响面。
限流组成:阈值及限流策略。阈值是指系统单位时间接收到的请求qps总数;限流策略是指限流行业触发后对应的系统行为,如失败拒绝还是放入等待队列。
应用场景
如秒杀业务,商品变更消息等场景。
秒杀业务:通常来说是保护全链路系统不挂;特性是瞬时流量大;超过阈值时拒绝策略通常是返回失败。
商品变更消息:业务场景是需要将商品变更消息推送给下游商家,下游商家系统能力有限,因此需要做限制;这个场景更多是保护下游系统不挂(整条系统链路上最小短板)。
分类
4大类:计数,滑动窗口,漏桶,令牌。
计数(固定时间窗口)
单位时间窗口内进行计数,超过阈值则进行限流。
算法限制:只能限制一整秒流量。若出现第1秒中后500ms和第2秒中前500ms超过阈值则此时不生效。
实现代码
import exception.BlockException;
import java.util.concurrent.atomic.AtomicInteger;
public class CalNumProcessor {
//限流窗口大小
private static int WINDOW_TIME_MILL = 1000;
//阈值
private static final int LIMIT_NUMS = 10;
//计数器
private AtomicInteger count = new AtomicInteger(0);
//开始时间
private Long startTime;
public static void main(String[] args) throws InterruptedException {
CalNumProcessor calNumProcessor=new CalNumProcessor();
for(int i=0;i<1000;i++){
System.out.println(i);
calNumProcessor.tryIncAndLimit();
Thread.sleep(200);
}
}
/**
* 进行限流计数,超过限流阈值时会抛BlockException
*/
public void tryIncAndLimit() {
//当前时间
long curMill = System.currentTimeMillis();
//开始时间初始化
if (startTime == null) {
startTime = curMill;
}
if ((curMill - startTime) > WINDOW_TIME_MILL) {
//若时间计数超过一秒则重置计数为0并重新设置计数开始时间
count.set(0);
startTime = curMill;
}
int after = count.incrementAndGet();
if (after > LIMIT_NUMS) {
//超过阈值
throw new BlockException();
}
}
}
滑动窗口
计数有个问题在于流量放大无法限流。如1秒为计数窗口。则第一秒后半秒和第二秒前半秒累计可能超过qps限制,但由于不是一个时间窗口此时反而不能限制住。
解决思路:每次请求计数每过一个时间窗口单位进行滑动计数。
整体算法思路详细
1.构建n个时间窗口单位,每个窗口单位有对应qps变量及窗口开始时间;初始化时间窗口单位的qps为0及开始时间为当前时间;
2.每次获取qps时计算当前时间应该在哪个时间窗口单位;
3.循环处理时间窗口,如果发现当前时间与时间窗口单位超过1秒(时间窗口单位最大时间)则重置窗口开始时间及qps为0。
4.将当前时间对应时间窗口qps加1;
5.返回所有时间窗口单位qps累计值;
代码实现
import java.time.LocalTime;
import java.util.concurrent.atomic.AtomicInteger;
public class TimeWindow {
/**
* 时间窗口大小,如1秒
*/
private int windowTimeSize;
/**
* 窗口数
*/
private int windowNum;
private Window[] windows;
private int maxQps;
public TimeWindow(int maxQps, int windowTimeSize, int windowNum) {
this.maxQps = maxQps;
this.windowTimeSize = windowTimeSize;
this.windowNum = windowNum;
windows = new Window[windowNum];
for (int i = 0; i < windowNum; i++) {
windows[i] = new Window();
windows[i].setQps(new AtomicInteger(0));
windows[i].setStartTime(System.currentTimeMillis());
}
}
public static void main(String[] args) throws InterruptedException {
int qps = 2;
int count = 20;
int sleep = 300;
int success = 0;
TimeWindow timeWindow = new TimeWindow(qps, 1000, 10);
for (int i = 0; i < count; i++) {
Thread.sleep(sleep);
if (timeWindow.tryAcquire()) {
success++;
if (success % qps == 0) {
System.out.println(LocalTime.now() + ": success, ");
} else {
System.out.print(LocalTime.now() + ": success, ");
}
} else {
System.out.println(LocalTime.now() + ": fail");
}
}
System.out.println();
System.out.println("实际测试成功次数:" + success);
}
public boolean tryAcquire() {
//计算当前时间落到哪个窗口位置
int index = (int) (System.currentTimeMillis() % windowTimeSize) / (windowTimeSize / windowNum);
//当前时间若超过时间累计窗口则重置窗口参数
int r = 0;
for (int i = 0; i < windowNum; i++) {
Window curWindow = windows[i];
if ((System.currentTimeMillis() - curWindow.getStartTime()) > windowTimeSize) {
//当前时间减去时间窗口大于最大累计时间窗口大小则重置变量
curWindow.setQps(new AtomicInteger(0));
curWindow.setStartTime(System.currentTimeMillis());
}
//当前时间对应窗口累计qps
if (index == i) {
curWindow.getQps().incrementAndGet();
}
r += curWindow.getQps().get();
}
return r <= maxQps;
}
class Window {
/**
* qps
*/
private AtomicInteger qps;
/**
* 开始时间
*/
private long startTime;
public AtomicInteger getQps() {
return qps;
}
public void setQps(AtomicInteger qps) {
this.qps = qps;
}
public long getStartTime() {
return startTime;
}
public void setStartTime(long startTime) {
this.startTime = startTime;
}
}
}
漏桶算法
流出衡定,不能应对突发流量,能较好保护下游。
令牌算法
优:能处理突出流量,流入相对衡定,流出允许有波动。秒杀场景适用。
这里核心概念:令牌桶(有令牌数及桶上限2个参数),令牌,获取令牌,存放令牌;
存放令牌策略有:1、有单独线路每秒加入n个令牌(相当于qps为n);2、懒计算:当获取令牌请求到来时进行计算,计算思路:Math.min(当前时间距离上次已存放令牌时间间隔秒数*令牌qps,令牌数上限)。
代码
RateLimiter rateLimiter = RateLimiter.create(2);
for (int i = 0; i < 10; i++) {
String time = LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_TIME);
System.out.println(time + ":" + rateLimiter.tryAcquire());
Thread.sleep(250);
}
实现产品
guava,阿里的sentinal。TODO。
sentinel
sentinel地址:Sentinel工作主流程 · alibaba/Sentinel Wiki · GitHub
物理架构
客户端限流逻辑架构
限流流程
- 逻辑上sentinal分为2种处理:(1 )资源统计;(2) 处理,包含限流,降级,系统保护;
- context保存在threadLocal,存储当前访问资源信息;
- 每个资源对应的processorSlotChain只在第一次访问时才会创建,这个保存在static的chainMap中;当不存在对应资源的processorSlotChain时会初始化,这里会在syncronized锁;
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) { //根据资源获取对应的SlotChain
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
if (chain == null) {
// Entry size limit.
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
chain = SlotChainProvider.newSlotChain();
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
- 各Slot说明
StaticSlot用于实时统计资源qps/线程数,底层基于滑动窗口进行计数;
ClusterBuildSlot用于统计qps,通过数,线程数,异常数等信息,原始调用方的统计信息;
FlowSlot用于限流:策略有根据qps或线程数做降级;
DegradeSlot用于熔断降级:根据响应时间策略进行熔断;可根据异常数或响应时间进行熔断;拒绝策略有:立即拒绝,升温策略,排队策略(漏斗);
SystemSlot用于系统保护,可根据系统负载来进行保护处理;
实际业务应用
导购业务
业务背景:合作B端商家通过api来获取商品报价,而系统依赖的供应商会对我们进行限流;文章来源:https://www.toymoban.com/news/detail-680059.html
解决方案:对每个B端商家进行限流,每个商家有独立限流配额,默认这个商家入驻时会给一个默认限流额度;通过sentinal的自定义限流进行处理;拒绝策略为直接拒绝;文章来源地址https://www.toymoban.com/news/detail-680059.html
String merchantCode; //商户code,外部请求传入
entry = SphU.entry("price_get_"+merchantCode);
到了这里,关于限流算法深入的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!