系列文章目录和关于我
一丶什么是Sentinel
Sentinel官网
Sentinel 是面向分布式、多语言异构化服务架构的流量治理组件,主要以流量为切入点
,从流量路由
、流量控制
、流量整形
、熔断降级
、系统自适应过载保护
、热点流量防护
等多个维度来帮助开发者保障微服务的稳定性。
流量整形:限制流出某一网络的某一连接的流量与突发,使这类报文以比较均匀的速度向外发送。流量整形通常使用缓冲区和令牌桶来完成,当报文的发送速度过快时,首先在缓冲区进行缓存,在令牌桶的控制下再均匀地发送这些被缓冲的报文
二丶主要功能
1.流量控制
任意时间到来的请求往往是随机不可控的,而系统的处理能力是有限的。我们需要根据系统的处理能力对流量进行控制
流量控制有以下几个角度:
- 资源的调用关系,例如资源的调用链路,资源和资源之间的关系;
- 运行指标,例如 QPS、线程池、系统负载等;
- 控制的效果,例如直接限流、冷启动、排队等。
2.熔断降级
当调用链路中某个资源出现不稳定,例如,表现为 timeout,异常比例升高的时候,则对这个资源的调用进行限制,并让请求快速失败,避免影响到其它的资源,最终产生雪崩的效果。
为何发生雪崩:如下图中服务D奄奄一息,其他服务对它具备依赖,对服务D发起调用的时候常常超时,RT很大,导致服务G线程都block在调用服务D的这一步中,导致服务G也奄奄一息,久而久之其他服务都处于不可服务的状态。
3.系统负载保护
当系统负载较高的时候,如果还持续让请求进入,可能会导致系统崩溃,无法响应。Sentinel 提供了对应的保护机制,让系统的入口流量和系统的负载达到一个平衡,保证系统在能力范围之内处理最多的请求。
三丶基本使用
1.sentinel依赖引入
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
<version>版本号</version>
</dependency>
2.初始化限流规则
private static void initFlowQpsRule() {
//限流规则列表
List<FlowRule> rules = new ArrayList<FlowRule>();
//第一个规则
FlowRule rule1 = new FlowRule();
//rule1针对什么资源(资源:指你要对什么限流,一般是方法名称)
rule1.setResource(KEY);
// 设置限流阈值为20
rule1.setCount(20);
//限流策略:
// 0:线程数(那么就是20个线程并发访问的时候限流)
//1:qps:每秒访问资源的数量,超过20进行限流
rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
//将受来源限制的应用程序名称
rule1.setLimitApp("default");
rules.add(rule1);
FlowRuleManager.loadRules(rules);
}
3.限流使用
Entry entry = null;
try {
entry= SphU.entry("资源名称");
//执行正常业务逻辑
} catch (BlockException e) {
//执行被限流后的业务逻辑
} finally {
if (entry != null) {
entry.close();
}
}
4.控制台设置限流规则
5.控制台设置降级规则
四丶基本原理
1.基本概念
-
Entry
在Sentinel中,所有的资源都对应一个资源名称以及一个 Entry。
Entry负责记录当前调用的信息:
- 创建时间:用于统计rt
- 当前节点Node:当前上下文中资源的统计信息(Node是Sentinel中的一个接口,负责记录实时统计信息)。
- 来源Node:调用来源方信息。
-
Slot
每一个 Entry 创建的时候,同时也会创建一系列功能插槽(slot chain)。这些插槽有不同的职责。
2.原理图
五丶源码学习
Sentinel不仅支持单机流控,还支持集群流控制,个人认为在流量分配均匀的情况下,单机流控完全够用了,并且集群流控需要额外的资源来进行集群服务器信息同步,感觉用处不是很大。
0.Sentinel SPI
一个框架需要考虑到扩展性,实现扩展性的一个很好的方式就是SPI(Service Provider Interface)SPI可与实现将装配的控制权移到程序之外,实现使用方和提供方的解耦。
Sentinel提供了SpiLoader方便进行SPI服务实现的加载,Sentinel中很多核心组件都依赖此类进行加载。常见使用如下
SpiLoader.of(xxx.class).loadInstanceListSorted()
0.1 of 方法创建SpiLoader实例
这种double check + synchronized实现线程安全的方式在Sentinel中非常常见。
0.2 SpiLoader对象加载服务提供者
SpiLoader会使用ClassLoader读取META-INF/services/服务提供者class全限定类名
文件中的数据,并解析@Spi注解中的内容,决定是否单例,是否默认实现,以及顺序等内容,加载服务实现并返回结果。
1.FlowRuleManager 管理FlowRule
- FlowRule: 表示对资源采取何种流控手段。
- FlowRuleManager:提供FlowRule的存储,查询等功能。
FlowRuleManager内部使用map来存储管理资源和对应的流控规则(一个资源可存在多个流控规则)。
另外还提供PropertyListener来实现FlowRule变化后的回调。
2.SphU.entry 进行流控
2.1 Env 调用InitFunc#init
这里会使用Env中的static final单例CtSph对象进行流控规则,并且会进行InitFunc#init的调用
2.2 包装资源为StringResourceWrapper
ResourceWrapper是对资源的包装,存在两个实现,MethodResourceWrapper在调用SphU#entry(Method method)的时候使用到。
上面原理图中提到,Sentinel具备slot链条,在获取链条的时候,会根据ResourceWrapper从缓存map中获取,hash的规则是ResourceWrapper的名称,MethodResourceWrapper的名称是:方法定义类全限定名称:方法名称(参数类型全限定类名)
。
2.3 entryWithPriority 进行流控
2.3.1 获取Context
Context保存当前调用元数据,首先会通过ContextUtil从ThreadLocal中获取,如果不存在会new一个,并且设置到ThreadLocal中,随着Entry#close方法的时候会进行资源释放。
构建Context的时候,会设置其中的Node(名称为sentinel_default_context的EntranceNode),
Node在Sentinel中用来生成树状结构,描述方法的调用
2.3.2 lookProcessChain构建Slot执行链条
slot链条式sentinel实现流量统计,和限流的核心,获取slot链条ProcessorSlot的代码如下
- 从缓存map中获取
-
SPI获取SlotChainBuilder
这里是一个扩展点,我们可与实现基于配置中心的chainBuilder,实现slot链条配置化
-
DefaultSlotChainBuilder 构造ProcessorSlotChain
ProcessorSlotChain也是一个ProcessorSlot处理器插槽,内部使用net属性串联下一个AbstractLinkedProcessorSlot。
我们可扩展自己的ProcessorSlot,使用SPI机制轻松加入到ProcessorSlotChain中。
2.3.2 依次执行ProcessorSlot#entry
1.NodeSelectorSlot
NodeSelectorSlot 负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级
2.ClusterBuilderSlot
ClusterBuilderSlot会生成用于存储资源的统计信息以及调用者信息的ClusterNode,ClusterNode会负责存储该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据。
3.StatisticSlot
用于记录、统计不同纬度的 runtime 指标监控信息
统计请求通过数量使用了StatisticNode#addPassRequest方法
最终都是使用ArrayMetric进行记录, ArrayMetric 内部使用OccupiableBucketLeapArray或者BucketLeapArray进行计数,具体如何记录在后续章节中分析。
4.AuthoritySlot
基于白名单黑名单逻辑的权限校验,默认情况是没有启用的。
5.SystemSlot
通过系统的状态,例如 load,cpu使用率等,来控制总的入口流量。qps使用的是滑动窗口算法进行统计,load,cpu使用率这些指标通过com.sun.management.OperatingSystemMXBean获取。
6.FlowSlot
根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制。
可看到,每一个配置的FlowRule,都会调用canPassCheck检查,只要存在任何一个不满足要求,都会抛出FlowException
这里会根据FlowRule#setControlBehavior的不同选择不同的TrafficShapingController进行校验,也可以通过FlowRule#setRater直接指定的实现。
-
DefaultController :默认流量整形控制器,超过任何规则的阈值后,新的请求就会立即拒绝,拒绝方式为抛出
FlowException
这里代码逻辑较为简单,获取当前qps或者线程数,如果acquireCount加上当前qps大于count(阈值)那么返回false。
另外可以看到Sentinel提供了预占能力。
实现的难点在于怎么统计qps,统计线程数,这部分在后续章节中进行学习。
-
ThrottlingController :均速排队,严格控制请求通过的时间间隔,也即是让请求以均匀的速度通过,对应的是漏桶算法。下面是判断是否通过的代码:
private boolean checkPassUsingCachedMs(int acquireCount, double maxCountPerStat) { //当前时间 long currentTime = TimeUtil.currentTimeMillis(); //statDurationMs = 1000ms 即1s(假设qps限制为20,maxCountPerStat=20) //1000ms * 1(请求数量)/ maxCountPerStat = 产生1个令牌所需要耗费的时间 long costTime = Math.round(1.0d * statDurationMs * acquireCount / maxCountPerStat); // costTime + 上一个请求通过的时间 = 什么时间,此令牌可以产生 long expectedTime = costTime + latestPassedTime.get(); //如果当前时间小于 满足令牌要求的时间(expectedTime) if (expectedTime <= currentTime) { // Contention may exist here, but it's okay. //设置最后通过请求的时间(latestPassedTime是AtomicLong) latestPassedTime.set(currentTime); return true; } else { // 计算等待的时间 long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis(); //等待的时间超过了排队等待的阈值(默认情况下500ms)那么返回false if (waitTime > maxQueueingTimeMs) { return false; } //上一次请求通过的时间,加上产生令牌需要的时间(addAndGet是自旋+cas操作) long oldTime = latestPassedTime.addAndGet(costTime); //等待时间 waitTime = oldTime - TimeUtil.currentTimeMillis(); //如果超过了等待阈值 那么cas减小时间,相当于回滚操作 if (waitTime > maxQueueingTimeMs) { latestPassedTime.addAndGet(-costTime); return false; } // sleep当前线程进行等待 if (waitTime > 0) { sleepMs(waitTime); } return true; } }
其实这个代码也不是天衣无缝:
这段代码也有很妙的地方:
-
WarmUpRateLimiterController/WarmUpController: 预热/冷启动方式。当系统长期处理低水平的情况下,当流量突然增加时,直接把系统拉升到高水位可能瞬间把系统压垮。通过"冷启动",让通过的流量缓慢增加,在一定时间内逐渐增加到阈值的上限,给系统一个预热的时间,避免冷系统被压垮。
这部分源码设计到guava的预热算法,后续了解学习
7.DegradeSlot&DefaultCircuitBreakerSlot
二者都是熔断器插槽,并且短路原理一致,DegradeSlot在用户个资源指定DegradeRule(降级规则)的时候会根据DegradeRule构造出断路器CircuitBreaker,而DefaultCircuitBreakerSlot则是用户配置同一的短路规则,并没有给资源指定特定规则的时候,会使用默认规则生成CircuitBreaker。
DegradeRule分为三种:
- RuleConstant.DEGRADE_GRADE_RT
根据rt来进行熔断,对应ResponseTimeCircuitBreaker(响应时间断路器)
- RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO
根据异常比列进行熔断,对应ExceptionCircuitBreaker
- RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT
根据错误进行熔断,对应ExceptionCircuitBreaker
统计rt,错误次数,错误率都是基于LeapArray(滑动窗口算法)进行统计。
-
在DegradeSlot&DefaultCircuitBreakerSlot的entry方法(限流操作会调用到此方法)此方法会轮流调用CircuitBreaker#tryPass进行短路校验。
这里都会使用AbstractCircuitBreaker#tryPass
CircuitBreaker具备三种状态:
- Open,断路器打开,此时会判断是否大于接口恢复时间,如果大于那么修改为半开,让一个请求先试试水,如果成功那么从半开修改为开。
- Close:断路器关闭了,此时所有请求都可以通过。并且根据指标(rt,错误率,错误次数)来决定是否将断路器修改为开
- Half open,半开状态,此时除了试水的线程可以通过,其他线程都会抛出DegradeException,试水线程会根据是否调用异常,来决定修改为开还是关
可以看出断路器最重要的是维护这三种状态,并且状态的切换需要保证线程安全
如果断路器处于Open,首先会通过retryTimeoutArrived方法判断当前时间是否大于恢复时间点,如果不是,说明短路了返回false,DegradeSlot会抛出DegradeException,阻止调用。如果大于恢复时间点,会调用fromOpenToHalfOpen 尝试cas open->half_open,并且注册回调,此回调会在 entry.close()的时候被触发。
-
在DegradeSlot&DefaultCircuitBreakerSlot的exit方法中,会触发circuitBreaker#onRequestComplete(如果执行顺利没有出现BlockException异常的话)
这里会根据DegradeRule调用不同的CircuitBreaker
-
ExceptionCircuitBreaker
-
从Entry当中拿到执行的错误,如果具备错误,那么更新滑动窗口中的错误数
private void handleStateChangeWhenThresholdExceeded(Throwable error) { //当前是开,那么什么也不做,降级slot在开的状态会判断当前时间和恢复时间,实现降级效果,so,这里不需要做什么 if (currentState.get() == State.OPEN) { return; } //半开 if (currentState.get() == State.HALF_OPEN) { // 半开但是执行过程中无异常,那么调整为close,说明被调用方法接口已经稳定了,可以修改为close if (error == null) { fromHalfOpenToClose(); } else { //如果具备异常,那么修改为开,并且更新接口恢复时间,实现熔断 fromHalfOpenToOpen(1.0d); } return; } //开状态,滑动窗口统计错误数,和总数 List<SimpleErrorCounter> counters = stat.values(); long errCount = 0; long totalCount = 0; //统计错误数,和总数 for (SimpleErrorCounter counter : counters) { errCount += counter.errorCount.sum(); totalCount += counter.totalCount.sum(); } //小于 最小请求数(默认5)认为样本太少,直接啥也不做 if (totalCount < minRequestAmount) { return; } //默认根据错误次数,curCount记录错误次数 double curCount = errCount; //根据错误比例,curCount记录错误率 if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) { // Use errorRatio curCount = errCount * 1.0d / totalCount; } //错误比例 or 错误率超过了阈值,那么调整为开,并且更新接口恢复时间,实现熔断 if (curCount > threshold) { transformToOpen(curCount); } }
-
ResponseTimeCircuitBreaker
和上面类似,只不过是根据rt来判断,而不是错误次数,错误率。同使用滑动窗口实现计数。
-
2.4 LeapArry是如何实现滑动窗口计数的
ArrayMetirc被StatisticSlot调用addPass方法
这里的data便是LeapArray的子类BucketLeapArray,或者OccupiableBucketLeapArray
2.4.1构造方法
从构造方法我们可以看出LeapArray的构成
可以看到数据的存储使用了AtomicReferenceArray,其中sampleCount = 2,intervalInMs = 1000,也就说默认只有两个窗口,时间跨度为1s。
2.4.2 获取当前时间对应的桶
这一段就是滑动窗口的精髓
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
//根据当前时间,计算当前时间位于窗口数组中的下标
// 比如当前是 1600ms,窗口总大小为1000ms,一共两个格子,每一个格子大小为500ms
//1600ms 落在(1600/500)% 2 = 1
int idx = calculateTimeIdx(timeMillis);
// 计算当前时间 对应的起始 => 1600 - 1600%500 = 1500
// |0~500|500~1000|1000~1500|1500~2000| 1600位于1500~2000中所以起始为1500
long windowStart = calculateWindowStart(timeMillis);
while (true) {
//获取当前时间对应的桶
WindowWrap<T> old = array.get(idx);
//如果桶为null
if (old == null) {
//创建新元素
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
//cas 设置到数组中
if (array.compareAndSet(idx, null, window)) {
return window;
} else {
//如果cas失败,说明在另外一个线程也是这个index,让当前线程yield放弃cpu
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
//旧桶的start和当前桶一样:意味着是同一秒
//比如600ms也是位于下标1,但是start 是500,就和当前1600ms的start 1500不同 意味着不是同一秒
return old;
} else if (windowStart > old.windowStart()) {
//比如600ms也是位于下标1,但是start 是500,
// 就和当前1600ms的start 1500,1500>500,意味着当前这个桶已经过期了
//上锁
if (updateLock.tryLock()) {
try {
//设置windowStart 并且清空旧桶
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
//上锁失败,那么
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
//当前start小于旧桶start
//这种情况不应该发生,因为时间时越来越来大的
//除非当前线程 一值分配不到时间片?
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
这里有一个变量windowStart,就如同一个版本,标志了元素是否过期(旧元素windowStart小于当前windowStart说明旧元素过期),是否位于一个桶(windowStart相同说明位于同一个下标,比如1600,和1700,windowStart都是1500)
2.4.3 改变桶记录的值
上面看了是如何拿到当前时间对应下标的桶的,那么桶中记录的数据是如何变更的?
可以看到这里使用了LongAdder进行计数,因为一个桶可能存在多个线程并发更新,使用LongAdder实现热点数据分离,也减少缓存伪共享带来的开销。(JUC源码学习笔记4——原子类源码分析,CAS,Volatile内存屏障,缓存伪共享与UnSafe相关方法)
2.4.4 如何获取当前qps
上面我们直到了,通过的请求,根据时间对应了ArrayMetric滑动窗口数组中的一个元素,这个元素是MetricBucket类型,内部有一个LongAdder的数组,来记录pass,block(通过的请求,被拦截的请求)的数目,那么怎么根据这些数据获取当前qps昵?FlowSlot根据qps限流得使用这个数据呀!
我们看一下这里的pass方法
再看下values方法,入参就是当前时间,这个当前时间timeMills用来排除过期元素
拿通过次数自然是使用了LongAdder#sum方法,但是LongAdder#sum不具备瞬时一致性,也就说可能遍历到n位置,但是其他线程更改n-1位置,n-1的变化不会体现在sum的总和中。
这里有一段代码我觉得有点秒(个人理解,一点拙见不一定对)
就是这里统计pass会先执行data.currentWindow(),把当前时间对应的滑动窗口元素进行初始化。
比如执行data.currentWindow()当前时间点是1001ms,对应桶为null,那么这一步会初始化1001ms对应的桶元素。
后续values遍历的时间是1003ms,这时候就可以拿到1001ms初始化桶的引用返回,后续加入存在另外一个线程在1004ms进行更新,当前线程执行pass累加的时候就又更大概率统计到
为什么我说更大概率,因为如果不初始化的化,1004ms另外一个线程去初始化会new一个桶元素,这个过程是更耗时的,当前线程values遍历可能就没办法拿到1004ms这个线程初始化的桶了(有道理,但是不多doge)。
六丶总结
-
学到了啥:
- 学习到了SPI和责任链,这两大解耦+扩展利器。学习到了自旋+CAS的操作。
- SPI由一方制定规范,比如SpringBoot的spring.factories,另一方进行扩展,实现服务使用方和提供方的解耦合,以及修改SPI文件内容进行扩展!
- 责任链简直无处不在,在众多框架中,平时工作也常常使用到,使用责任链,责任单一,并且通过改变链条实现扩展!
- 对熔断有了更深刻的理解,特别Open状态根据恢复时间判断+cas修改为半开,让一个线程试试水的操作,我大受震撼!
- 了解到Sentinel还有除了针对单个资源进行限流,还有系统负载限流的功能,这个有点厉害,我一开始还想这个该咋实现,看了SystemSlot,发现自己还是太狭隘了。
- 学习到了SPI和责任链,这两大解耦+扩展利器。学习到了自旋+CAS的操作。
-
不足:
-
对Sentinel的Node理解不够,文档上说Sentine支持调用链路关系进行限流,这个功能挺牛的,但是我并没有详细阅读这部分源码。文章来源:https://www.toymoban.com/news/detail-449542.html
-
对预热的实现没有深入理解,主要是Guava的预热模型,让二阳的我有点晕了,后续结合guava中的限流进行学习。文章来源地址https://www.toymoban.com/news/detail-449542.html
-
到了这里,关于Sentinel基本使用与源码分析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!