客户端
关键属性
HostReactor
- Map<String, ScheduledFuture<?>> futureMap:缓存向服务端请求ServiceInfo的定时任务
- Map<String, ServiceInfo> serviceInfoMap:缓存从服务端获取的Service信息
- Map<String, Object> updatingMap:用来标记是是否存在其他请求向服务端以相同的条件请求ServiceInfo
ServiceManager
- Map<String, Map<String, Service>> serviceMap:服务端缓存注册信息的Map:Map(namespace, Map(group::serviceName, Service)).
NacosNamingService.getAllInstances
获取所有的Instance集合
- boolean subscribe:true,请求结果需要进行缓存并设置定时任务更新缓存;false:直接从服务端查询并返回结果
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
if (subscribe) {
//请求结果需要进行缓存并设置定时任务更新缓存
serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
} else {
// 直接向服务端请求并返回结果,客户端不做缓存,不设置定时任务
serviceInfo = hostReactor
.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
}
List<Instance> list;
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
return new ArrayList<Instance>();
}
return list;
}
HostReactor.getServiceInfoDirectlyFromServer
直接向服务端请求并返回结果,客户端不做缓存,不设置定时任务
public ServiceInfo getServiceInfoDirectlyFromServer(final String serviceName, final String clusters)
throws NacosException {
String result = serverProxy.queryList(serviceName, clusters, 0, false);
if (StringUtils.isNotEmpty(result)) {
return JacksonUtils.toObj(result, ServiceInfo.class);
}
return null;
}
HostReactor.getServiceInfo
根据serviceName和cluster集合获取实例集合,请求结果需要进行缓存并设置定时任务更新缓存
1.从客户端缓存(HostReactor.serviceInfoMap)中获取ServiceInfo
2.本地缓存中不存在对应的值,新建ServiceInfo实例并加入serviceInfoMap中,同时将serviceName加入updatingMap中并向服务端请求获取Instance集合并更新ServiceInfo,更新完成后从updatingMap中删除这个serviceName
3.本地缓存中存在对应的ServiceInfo,且updatingMap中有serviceName时,说明上一个请求正在请求服务端获取信息,调用wait方法,上一个请求更新完内存后会调用notifyAll方法
4.调用scheduleUpdateIfAbsent方法定时向服务端请求并更新serviceInfoMap的值
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
if (null == serviceObj) {
serviceObj = new ServiceInfo(serviceName, clusters);
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
updatingMap.put(serviceName, new Object());
updateServiceNow(serviceName, clusters);
updatingMap.remove(serviceName);
// 说明存在请求在向服务端请求Instance
} else if (updatingMap.containsKey(serviceName)) {
if (UPDATE_HOLD_INTERVAL > 0) {
// hold a moment waiting for update finish
synchronized (serviceObj) {
try {
serviceObj.wait(UPDATE_HOLD_INTERVAL);
} catch (InterruptedException e) {
NAMING_LOGGER
.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
}
}
}
}
// 添加定时任务,向服务端请求并更新serviceInfoMap的值
scheduleUpdateIfAbsent(serviceName, clusters);
return serviceInfoMap.get(serviceObj.getKey());
}
HostReactor.updateServiceNow
// 从服务端获取信息并保存到客户端内存中
private void updateServiceNow(String serviceName, String clusters) {
try {
updateService(serviceName, clusters);
} catch (NacosException e) {
NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
}
}
// 从服务端获取信息并保存到客户端内存中
public void updateService(String serviceName, String clusters) throws NacosException {
ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
try {
// 请求服务端获取信息
String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
// 处理从服务端获得的消息(将结果保存到HostReactor.serviceInfoMap中)
if (StringUtils.isNotEmpty(result)) {
processServiceJson(result);
}
} finally {
// 唤醒HostReactor.getServiceInfo中调用wait的线程
if (oldService != null) {
synchronized (oldService) {
oldService.notifyAll();
}
}
}
}
// 处理从服务端获取的消息
// 1.先将json字符串转换为ServiceInfo
// 2.之前在serviceInfoMap存在值,将新得到的ServiceInfo保存到serviceInfoMap中,并将现在的和之前的进行比较,如果内容发生变化,写日志,并发送InstancesChangeEvent事件
// 3.之前在serviceInfoMap不存在值,将新得到的ServiceInfo保存到serviceInfoMap中,写日志,并发送InstancesChangeEvent事件
// 4.返回第一步的ServiceInfo
public ServiceInfo processServiceJson(String json) {
ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
String serviceKey = serviceInfo.getKey();
if (serviceKey == null) {
return null;
}
ServiceInfo oldService = serviceInfoMap.get(serviceKey);
if (pushEmptyProtection && !serviceInfo.validate()) {
//empty or error push, just ignore
return oldService;
}
boolean changed = false;
if (oldService != null) {
if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: "
+ serviceInfo.getLastRefTime());
}
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
for (Instance host : oldService.getHosts()) {
oldHostMap.put(host.toInetAddr(), host);
}
Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size());
for (Instance host : serviceInfo.getHosts()) {
newHostMap.put(host.toInetAddr(), host);
}
Set<Instance> modHosts = new HashSet<Instance>();
Set<Instance> newHosts = new HashSet<Instance>();
Set<Instance> remvHosts = new HashSet<Instance>();
List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>(
newHostMap.entrySet());
for (Map.Entry<String, Instance> entry : newServiceHosts) {
Instance host = entry.getValue();
String key = entry.getKey();
if (oldHostMap.containsKey(key) && !StringUtils
.equals(host.toString(), oldHostMap.get(key).toString())) {
modHosts.add(host);
continue;
}
if (!oldHostMap.containsKey(key)) {
newHosts.add(host);
}
}
for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
Instance host = entry.getValue();
String key = entry.getKey();
if (newHostMap.containsKey(key)) {
continue;
}
if (!newHostMap.containsKey(key)) {
remvHosts.add(host);
}
}
if (newHosts.size() > 0) {
changed = true;
NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
+ JacksonUtils.toJson(newHosts));
}
if (remvHosts.size() > 0) {
changed = true;
NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
+ JacksonUtils.toJson(remvHosts));
}
if (modHosts.size() > 0) {
changed = true;
updateBeatInfo(modHosts);
NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
+ JacksonUtils.toJson(modHosts));
}
serviceInfo.setJsonFromServer(json);
if (changed) {
NotifyCenter.publishEvent(new InstancesChangeEvent(this.notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts()));
DiskCache.write(serviceInfo, cacheDir);
}
} else {
changed = true;
NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
+ JacksonUtils.toJson(serviceInfo.getHosts()));
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
NotifyCenter.publishEvent(new InstancesChangeEvent(this.notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts()));
serviceInfo.setJsonFromServer(json);
DiskCache.write(serviceInfo, cacheDir);
}
MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
if (changed) {
NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
+ JacksonUtils.toJson(serviceInfo.getHosts()));
}
return serviceInfo;
}
NamingProxy.queryList
向服务端发送http请求来获取该请求条件下的Instance集合文章来源:https://www.toymoban.com/news/detail-523583.html
- String serviceName:服务名称
- String clusters:用“,”隔开的cluster集合的字符串
- boolean healthyOnly:返回结果里的实例是否健康,默认为false
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
throws NacosException {
final Map<String, String> params = new HashMap<String, String>(8);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put("clusters", clusters);
params.put("udpPort", String.valueOf(udpPort));
params.put("clientIP", NetUtils.localIP());
params.put("healthyOnly", String.valueOf(healthyOnly));
return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
}
服务端
InstanceController.list
@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
String agent = WebUtils.getUserAgent(request);
String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
healthyOnly);
}
InstanceController.doSrvIpxt
在服务端中根据条件查询Instance集合,方法太长了,这里只显示部分关键代码
healthyOnly为true时:默认情况下返回的所有满足条件且健康且能接受请求的Instance,如果不存在健康状态的Instance,返回所有满足条件且能接受请求的Instance。
healthyOnly为false时:返回所有满足条件的能接受请求的Instance,不管健康状态。文章来源地址https://www.toymoban.com/news/detail-523583.html
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
ClientInfo clientInfo = new ClientInfo(agent);
ObjectNode result = JacksonUtils.createEmptyJsonNode();
// 根据namespaceId和serviceName从ServiceManager.serviceMap中获取Service
Service service = serviceManager.getService(namespaceId, serviceName);
long cacheMillis = switchDomain.getDefaultCacheMillis();
// 不存在Service时,返回给客户端空集合,下面if逻辑中省略了部分参数组装代码
if (service == null) {
result.replace("hosts", JacksonUtils.createEmptyArrayNode());
return result;
}
List<Instance> srvedIPs;
// 如果clusters为空,获取所有的Instance,否则获取clusters对应的Cluster集合里的所有Instance
srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
// 使用Service中的selector对上面获得的Instance集合进行过滤
if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
srvedIPs = service.getSelector().select(clientIP, srvedIPs);
}
// 不存在Instance时,返回给客户端空集合,下面if逻辑中省略了部分参数组装代码
if (CollectionUtils.isEmpty(srvedIPs)) {
result.set("hosts", JacksonUtils.createEmptyArrayNode());
return result;
}
Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
ipMap.put(Boolean.TRUE, new ArrayList<>());
ipMap.put(Boolean.FALSE, new ArrayList<>());
// 将Instance根据健康状态进行分类
for (Instance ip : srvedIPs) {
ipMap.get(ip.isHealthy()).add(ip);
}
double threshold = service.getProtectThreshold();
// 如果集合中健康状态的Instance的占比<=Serice.protectThreshold,会将不健康的Instance加入健康的集合中,即默认所有的Instance都是健康的,等效于healthyOnly为false,Serice.protectThreshold的默认值为0,即healthyOnly为true时,返回的所有Instance要么都是健康的,要么都是不健康的
if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {
ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
ipMap.get(Boolean.FALSE).clear();
}
// 保存需要返回给客户端的Instance信息
ArrayNode hosts = JacksonUtils.createEmptyArrayNode();
for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
List<Instance> ips = entry.getValue();
// healthyOnly=true,说明客户端只要健康状态的Instance,会过滤掉不健康的的Instance,即key为false的时不处理
if (healthyOnly && !entry.getKey()) {
continue;
}
for (Instance instance : ips) {
// 如果Instance不能接收外部的请求,跳过当前Instance
if (!instance.isEnabled()) {
continue;
}
ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();
// 省略了将instance的内从组装到ipObj里的过程
hosts.add(ipObj);
}
}
result.replace("hosts", hosts);
....
return result;
}
到了这里,关于Nacos 1.4.x 服务发现源码阅读的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!