-----------------看过之前一致性哈希和最少活跃书的可以跳过-----------------
链接在此:Dubbo的几个负载均衡类--一致性哈希
Dubbo的几个负载均衡类--最少活跃数
Dubbo的几个负载均衡类--轮询
Dubbo的几个负载均衡类--随机
消费者发起调用过程中涉及如下几步
1:接口调用,比如DemoService.demoMethod
2:InvokerInvocationHandler.invoker:消费端启动时,通过JavassistProxyFactory.getProxy反射获取代理类,之后服务调用就直接调用这个Handler
3:MigrationInvoker.invoke:Dubbo 发起调用非常重要的一步,如果失败了,通过这个invoker做切换
4:其他
5:FailoverClusterInvoker.invoke(目前我们使用的,实际在AbstractClusterInvoker里面invoke逻辑是固定的)
6:其他
@Override
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
// binding attachments into invocation.
Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
}
//如果设置了标签规则,则通过list方法过滤出来符合标签的几个invoker
List<Invoker<T>> invokers = list(invocation);
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
//请求负载并且做好灾备降级
return doInvoke(invocation, invokers, loadbalance);
}
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
checkInvokers(copyInvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
int len = calculateInvokeTimes(methodName);
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if (i > 0) {
checkWhetherDestroyed();
copyInvokers = list(invocation);
// check again
checkInvokers(copyInvokers, invocation);
}
//这里通过loadBalance做负载
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + methodName
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
throw new RpcException(le.getCode(), "Failed to invoke the method "
+ methodName + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ le.getMessage(), le.getCause() != null ? le.getCause() : le);
}
-----------------看过之前一致性哈希的可以跳过-----------------
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// Number of invokers
int length = invokers.size();
// Estimated shortest response time of all invokers
long shortestResponse = Long.MAX_VALUE;
// The number of invokers having the same estimated shortest response time
int shortestCount = 0;
// The index of invokers having the same estimated shortest response time
int[] shortestIndexes = new int[length];
// the weight of every invokers
int[] weights = new int[length];
// The sum of the warmup weights of all the shortest response invokers
int totalWeight = 0;
// The weight of the first shortest response invokers
int firstWeight = 0;
// Every shortest response invoker has the same weight value?
boolean sameWeight = true;
// Filter out all the shortest response invokers
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
// Calculate the estimated response time from the product of active connections and succeeded average elapsed time.
long succeededAverageElapsed = rpcStatus.getSucceededAverageElapsed();
int active = rpcStatus.getActive() + 1;
long estimateResponse = succeededAverageElapsed * active;
int afterWarmup = getWeight(invoker, invocation);
weights[i] = afterWarmup;
// Same as LeastActiveLoadBalance
if (estimateResponse < shortestResponse) {
shortestResponse = estimateResponse;
shortestCount = 1;
shortestIndexes[0] = i;
totalWeight = afterWarmup;
firstWeight = afterWarmup;
sameWeight = true;
} else if (estimateResponse == shortestResponse) {
shortestIndexes[shortestCount++] = i;
totalWeight += afterWarmup;
if (sameWeight && i > 0
&& afterWarmup != firstWeight) {
sameWeight = false;
}
}
}
if (shortestCount == 1) {
return invokers.get(shortestIndexes[0]);
}
if (!sameWeight && totalWeight > 0) {
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
for (int i = 0; i < shortestCount; i++) {
int shortestIndex = shortestIndexes[i];
offsetWeight -= weights[shortestIndex];
if (offsetWeight < 0) {
return invokers.get(shortestIndex);
}
}
}
return invokers.get(shortestIndexes[ThreadLocalRandom.current().nextInt(shortestCount)]);
}
源码中也说明了,这个和最少活跃数的逻辑差不多,只是吧活跃数改成了响应时长。响应时长是按照每个服务提供方的平均响应时间与活跃数的乘积来算。不做过多说明,可以看看这篇文章: Dubbo的几个负载均衡类--最少活跃数
总结:最短响应时长和最少活跃数的相关数据都不是实时更新的,而是根据服务提供方的实际响应在后台异步更新的。所以会有一定时间的延迟
文章来源地址https://www.toymoban.com/news/detail-816452.html
李梓萌:
[1,2,3]:出来的leastIndexes数组只有一个:[1,0,0],直接取第一个。(只走一次第一个if,后面两不符合条件)
[1,1,1]:出来的leastIndexes数组有三个:[1,1,1],如果权重再一样,随机取一个;如果权重不一样,3内随机一个权重值比如2,那么就选第三个(注意条件是:offsetWeight<0)(第一个1走的第一个if,后面两1,都是走的第二个if)
[2,1,2]:出来的leastIndexes数组有一个:[1,0,0],直接取第一个(走了两次第一个if,最后那个2不符合条件)
[2,2,1]:出来的leastIndexes数组有两个:[1,2,0],直接取第一个(先走了第一个if,再走第二个if,第三个1又走了第一个if)
[2,3,3,2,1]:出来的leastIndexes数组有两个:[1,2,0,0,0],直接取第一个(同上,中间3不符合条件)
文章来源:https://www.toymoban.com/news/detail-816452.html
到了这里,关于Dubbo的几个负载均衡类--最短响应时间的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!