Dubbo中有一个非常本质和重要的功能,那就是服务的自动注册与发现,而这个功能是通过注册中心来实现的。而dubbo中考虑了外部许多的注册组件的实现,zk,redis,etcd,consul,eureka…
各自实现方式各有不同,但是对外表现都是一致的:都实现了 Registry 接口!
今天我们就来看看最常用的注册中心 Zookeeper 的接入实现吧!
1. dubbo在 zookeeper 中的服务目录划分
注册中心的作用主要分发服务的发布,与服务订阅,及服务推送,服务查询!而zk中,则以服务主单位进行目录划分的。
整个zookeeper的路径概览:
/dubbo: root 目录, 持久化目录 (可通过 group=xxx 自定义root目录)
/dubbo/xxx.xx.XxxService: 每个服务一个路径,持久化目录
/dubbo/xxx.xx.XxxService/configurators: 配置存放路径,默认为空
/dubbo/xxx.xx.XxxService/providers: 服务提供者目录,所有提供者以其子路径形式存储,持久化目录。各服务提供者以临时目录形式存在,路径样例如: dubbo%3A%2F%2F192.168.56.1%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-api-provider%26default%3Dtrue%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%2CsayHelloAsync%26pid%3D112756%26release%3D%26side%3Dprovider%26timestamp%3D1588548412331
/dubbo/xxx.xx.XxxService/consumers: 服务消费者目录, 持久化路径。所有消费者以其子路径形式存储,路径样例如下:consumer%3A%2F%2F192.168.1.4%2Forg.apache.dubbo.rpc.service.GenericService%3Fapplication%3Ddubbo-demo-api-consumer%26category%3Dconsumers%26check%3Dfalse%26dubbo%3D2.0.2%26generic%3Dtrue%26interface%3Dorg.apache.dubbo.demo.DemoService%26pid%3D139620%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1588596195728
/dubbo/xxx.xx.XxxService/routers: 路由配置信息,默认为空
2. zookeeper 组件的接入
Zookeeper 是在本地服务通过socket端口暴露之后,再调用 RegistryFactoryWrapper 进行获取的。
注册时的URL如下: registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F192.168.56.1%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-api-provider%26bind.ip%3D192.168.56.1%26bind.port%3D20880%26default%3Dtrue%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%2CsayHelloAsync%26pid%3D135556%26release%3D%26side%3Dprovider%26timestamp%3D1588518573613&pid=135556®istry=zookeeper×tamp=1588518545808
zk的调用分两种情况,一是作为服务提供者时会在 ServiceConfig#doExportUrlsFor1Protocol 中,进行远程服务暴露时会拉起。二是在消费者在进行远程调用时会 ReferenceConfig#createProxy 时拉取以便获取提供者列表。我们以ServiceConfig为例看看其调用zk的过程:
// ServiceConfig#doExportUrlsFor1Protocol
// 此处 PROTOCOL 是一个SPI类,默认实例是DubboProtocol,但其在处理具体协议时会根据协议类型做出相应选择
// 此处 协议为 registry, 所以会选择 RegistryProtocol 进行export() 处理
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
// 而在RegistryProtocol中,又有几个重要属性,是dubbo进行依赖注入完成的
// org.apache.dubbo.registry.integration.RegistryProtocol
public void setCluster(Cluster cluster) {
this.cluster = cluster;
}
public void setProtocol(Protocol protocol) {
this.protocol = protocol;
}
public void setRegistryFactory(RegistryFactory registryFactory) {
this.registryFactory = registryFactory;
}
// org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper#export
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// 检查是否是要进行服务注册,即检查协议字段是否是 registry, 即前缀是 registry://
// 注册完成后即返回
if (UrlUtils.isRegistry(invoker.getUrl())) {
// ProtocolFilterWrapper.export()
return protocol.export(invoker);
}
return new ListenerExporterWrapper<T>(protocol.export(invoker),
Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
.getActivateExtension(invoker.getUrl(), EXPORTER_LISTENER_KEY)));
}
// org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper#export
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// 同样判断是否是注册请求
if (UrlUtils.isRegistry(invoker.getUrl())) {
// RegistryProtocol.export()
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}
// org.apache.dubbo.registry.integration.RegistryProtocol#export
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
URL registryUrl = getRegistryUrl(originInvoker);
// url to export locally
URL providerUrl = getProviderUrl(originInvoker);
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
// the same service. Because the subscribed is cached key with the name of the service, it causes the
// subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// url to registry
// 获取registry实例,如 zk client
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
// decide if we need to delay publish
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
// 注册服务地址到注册中心
register(registryUrl, registeredProviderUrl);
}
// register stated url on provider model
registerStatedUrl(registryUrl, registeredProviderUrl, register);
// Deprecated! Subscribe to override rules in 2.6.x or before.
// 订阅目录
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
notifyExport(exporter);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}
/**
* Get an instance of registry based on the address of invoker
*
* @param originInvoker
* @return
*/
protected Registry getRegistry(final Invoker<?> originInvoker) {
URL registryUrl = getRegistryUrl(originInvoker);
// RegistryFactory 又是通过 SPI 机制生成的
// 会根据具体的注册中心的类型创建调用具体实例,如此处为: zookeeper, 所以会调用 ZookeeperRegistryFactory.getRegistry()
return registryFactory.getRegistry(registryUrl);
}
// 所有 RegistryFactory 都会被包装成 RegistryFactoryWrapper, 以便修饰
// org.apache.dubbo.registry.RegistryFactoryWrapper#getRegistry
@Override
public Registry getRegistry(URL url) {
// 对于zk, 会调用 ZookeeperRegistryFactory
return new ListenerRegistryWrapper(registryFactory.getRegistry(url),
Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(RegistryServiceListener.class)
.getActivateExtension(url, "registry.listeners")));
}
// org.apache.dubbo.registry.support.AbstractRegistryFactory#getRegistry(org.apache.dubbo.common.URL)
@Override
public Registry getRegistry(URL url) {
if (destroyed.get()) {
LOGGER.warn("All registry instances have been destroyed, failed to fetch any instance. " +
"Usually, this means no need to try to do unnecessary redundant resource clearance, all registries has been taken care of.");
return DEFAULT_NOP_REGISTRY;
}
url = URLBuilder.from(url)
.setPath(RegistryService.class.getName())
.addParameter(INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(EXPORT_KEY, REFER_KEY)
.build();
String key = createRegistryCacheKey(url);
// Lock the registry access process to ensure a single instance of the registry
LOCK.lock();
try {
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
//create registry by spi/ioc
// 调用子类方法创建 registry 实例,此处为 ZookeeperRegistry.createRegistry
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry);
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}
// org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory#createRegistry
@Override
public Registry createRegistry(URL url) {
// 最终将Zk组件接入到应用中了,后续就可以使用zk提供的相应功能了
return new ZookeeperRegistry(url, zookeeperTransporter);
}
至此,zookeeper被接入了。我们先来看下 zookeeper 注册中心构造方法实现:
// org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#ZookeeperRegistry
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
// 抽象父类处理
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
// group=dubbo
String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(PATH_SEPARATOR)) {
group = PATH_SEPARATOR + group;
}
this.root = group;
zkClient = zookeeperTransporter.connect(url);
zkClient.addStateListener((state) -> {
if (state == StateListener.RECONNECTED) {
logger.warn("Trying to fetch the latest urls, in case there're provider changes during connection loss.\n" +
" Since ephemeral ZNode will not get deleted for a connection lose, " +
"there's no need to re-register url of this instance.");
ZookeeperRegistry.this.fetchLatestAddresses();
} else if (state == StateListener.NEW_SESSION_CREATED) {
logger.warn("Trying to re-register urls and re-subscribe listeners of this instance to registry...");
try {
ZookeeperRegistry.this.recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
} else if (state == StateListener.SESSION_LOST) {
logger.warn("Url of this instance will be deleted from registry soon. " +
"Dubbo client will try to re-register once a new session is created.");
} else if (state == StateListener.SUSPENDED) {
} else if (state == StateListener.CONNECTED) {
}
});
}
// org.apache.dubbo.registry.support.FailbackRegistry#FailbackRegistry
public FailbackRegistry(URL url) {
super(url);
// retry.period=5000
this.retryPeriod = url.getParameter(REGISTRY_RETRY_PERIOD_KEY, DEFAULT_REGISTRY_RETRY_PERIOD);
// since the retry task will not be very much. 128 ticks is enough.
// 当连接失败时,使用后台定时重试
retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128);
}
// org.apache.dubbo.registry.support.AbstractRegistry#AbstractRegistry
public AbstractRegistry(URL url) {
setUrl(url);
// 默认会开启file.cache 选项,所以一般会要求存在该文件
if (url.getParameter(REGISTRY__LOCAL_FILE_CACHE_ENABLED, true)) {
// Start file save timer
syncSaveFile = url.getParameter(REGISTRY_FILESAVE_SYNC_KEY, false);
// 默认路径 $USER_HOME/.dubbo/dubbo-registry-dubbo-demo-api-provider-127.0.0.1-2181.cache
String defaultFilename = System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(APPLICATION_KEY) + "-" + url.getAddress().replaceAll(":", "-") + ".cache";
String filename = url.getParameter(FILE_KEY, defaultFilename);
File file = null;
if (ConfigUtils.isNotEmpty(filename)) {
file = new File(filename);
if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
if (!file.getParentFile().mkdirs()) {
throw new IllegalArgumentException("Invalid registry cache file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
}
}
}
this.file = file;
// When starting the subscription center,
// we need to read the local cache file for future Registry fault tolerance processing.
// 如果文件存在先从其中加载原有配置
loadProperties();
notify(url.getBackupUrls());
}
}
// 如果存在注册中心缓存文件,则从其中加载各属性值
// 其值为 xxx.xxxService=xxx 格式
private void loadProperties() {
if (file != null && file.exists()) {
InputStream in = null;
try {
in = new FileInputStream(file);
properties.load(in);
if (logger.isInfoEnabled()) {
logger.info("Load registry cache file " + file + ", data: " + properties);
}
} catch (Throwable e) {
logger.warn("Failed to load registry cache file " + file, e);
} finally {
if (in != null) {
try {
in.close();
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}
}
// org.apache.dubbo.registry.support.AbstractRegistry#notify(java.util.List<org.apache.dubbo.common.URL>)
protected void notify(List<URL> urls) {
if (CollectionUtils.isEmpty(urls)) {
return;
}
for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
URL url = entry.getKey();
if (!UrlUtils.isMatch(url, urls.get(0))) {
continue;
}
Set<NotifyListener> listeners = entry.getValue();
if (listeners != null) {
for (NotifyListener listener : listeners) {
try {
notify(url, listener, filterEmpty(url, urls));
} catch (Throwable t) {
logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
}
}
}
}
}
该构造函数的主要作用是建立与zkServer的连接实例,添加状态监听,以及本地缓存文件的处理。
3. Zookeeper 服务提供者注册
上一节我们看到在初始化 zookeeper 时,是在export()过程中通过 getRegistry() 实现的。同样,在export()过程中,在获取了注册中心实例后,还需要将服务地址注册上去,才算功成。服务注册的时序图如下:
// org.apache.dubbo.registry.integration.RegistryProtocol#export
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
URL registryUrl = getRegistryUrl(originInvoker);
// url to export locally
URL providerUrl = getProviderUrl(originInvoker);
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
// the same service. Because the subscribed is cached key with the name of the service, it causes the
// subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// url to registry
// 1. 获取注册中心实例
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
// decide if we need to delay publish
// 2. 将 registeredProviderUrl 注册上去
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
register(registryUrl, registeredProviderUrl);
}
// register stated url on provider model
registerStatedUrl(registryUrl, registeredProviderUrl, register);
// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
notifyExport(exporter);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}
private void register(URL registryUrl, URL registeredProviderUrl) {
// 此处获取 registry 实际是registry的一个 wrapper, registryUrl 以 zookeeper:// 开头
Registry registry = registryFactory.getRegistry(registryUrl);
// 此处的 registeredProviderUrl 如: dubbo://192.168.56.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=135556&release=&side=provider×tamp=1588518573613
registry.register(registeredProviderUrl);
}
// org.apache.dubbo.registry.ListenerRegistryWrapper#register
@Override
public void register(URL url) {
try {
// 调用实际的 registry
registry.register(url);
} finally {
if (CollectionUtils.isNotEmpty(listeners)) {
RuntimeException exception = null;
for (RegistryServiceListener listener : listeners) {
if (listener != null) {
try {
listener.onRegister(url);
} catch (RuntimeException t) {
logger.error(t.getMessage(), t);
exception = t;
}
}
}
if (exception != null) {
throw exception;
}
}
}
}
// org.apache.dubbo.registry.support.FailbackRegistry#register
@Override
public void register(URL url) {
if (!acceptable(url)) {
logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
return;
}
super.register(url);
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {
// Sending a registration request to the server side
// 调用zk进行url注册
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// Record a failed registration request to a failed list, retry regularly
addFailedRegistered(url);
}
}
// org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doRegister
@Override
public void doRegister(URL url) {
try {
// 创建zk node, dynamic=true, 默认创建的节点为临时节点
// url地址如: dubbo://192.168.56.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=34676&release=&side=provider×tamp=1588639020690
// 此地址通过 toUrlPath 转换为 zookeeper 的目录地址,比如 providers 目录,consumers 目录...
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
// 将提供者信息转移为zk目录
private String toUrlPath(URL url) {
// 找到分类目录,最下级为当前提供者的所有url信息encode后的值
// 即 /dubbo/interfaceName/providers/xxxx
return toCategoryPath(url) + PATH_SEPARATOR + URL.encode(url.toFullString());
}
private String toCategoryPath(URL url) {
// category='', 默认为 providers
return toServicePath(url) + PATH_SEPARATOR + url.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
}
private String toServicePath(URL url) {
String name = url.getServiceInterface();
if (ANY_VALUE.equals(name)) {
return toRootPath();
}
// /dubbo/interfaceName
return toRootDir() + URL.encode(name);
}
服务注册主要就分两步:1. 获取registry实例(通过SPI机制); 2. 将服务的url转换为对应的路径,以临时节点的形式create到zkServer; 以服务名作为父路径,以自身服务url作为叶子路径,可以很方便在相应路径上找到所有的提供者或消费者。
4. zookeeper 服务下线处理
当应用要关闭,或者注册失败时,需要进行服务下线。当然,如果应用没有及时做下线处理,zk会通过其自身的临时节点过期机制,也会将该服务做下线处理。从而避免消费者或管理台看到无效的服务存在。
应用服务的主动下线操作是由 ShutdownHookCallbacks 来实现的,其时序图如下:
// org.apache.dubbo.config.bootstrap.DubboBootstrap#DubboBootstrap
private DubboBootstrap() {
configManager = ApplicationModel.getConfigManager();
environment = ApplicationModel.getEnvironment();
// 在 DubboBootstrap 创建时就创建几个关闭钩子
DubboShutdownHook.getDubboShutdownHook().register();
// 将 DubboBootstrap 的销毁动作添加到 DubboShutdownHook 的执行队列中,以便在关闭时一起调用
ShutdownHookCallbacks.INSTANCE.addCallback(new ShutdownHookCallback() {
@Override
public void callback() throws Throwable {
DubboBootstrap.this.destroy();
}
});
}
/**
* Register the ShutdownHook
*/
public void register() {
if (registered.compareAndSet(false, true)) {
DubboShutdownHook dubboShutdownHook = getDubboShutdownHook();
Runtime.getRuntime().addShutdownHook(dubboShutdownHook);
dispatch(new DubboShutdownHookRegisteredEvent(dubboShutdownHook));
}
}
// org.apache.dubbo.config.bootstrap.DubboBootstrap#destroy
public void destroy() {
if (destroyLock.tryLock()) {
try {
// DubboShutdownHook 实现的destroy方法
DubboShutdownHook.destroyAll();
if (started.compareAndSet(true, false)
&& destroyed.compareAndSet(false, true)) {
unregisterServiceInstance();
unexportMetadataService();
unexportServices();
unreferServices();
destroyRegistries();
DubboShutdownHook.destroyProtocols();
destroyServiceDiscoveries();
clear();
shutdown();
release();
}
} finally {
destroyLock.unlock();
}
}
}
// org.apache.dubbo.config.DubboShutdownHook#destroyAll
public static void destroyAll() {
if (destroyed.compareAndSet(false, true)) {
AbstractRegistryFactory.destroyAll();
destroyProtocols();
}
}
// org.apache.dubbo.registry.support.AbstractRegistryFactory#destroyAll
/**
* Close all created registries
*/
public static void destroyAll() {
if (!destroyed.compareAndSet(false, true)) {
return;
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Close all registries " + getRegistries());
}
// Lock up the registry shutdown process
LOCK.lock();
try {
// 获取所有注册的地址,一一进行destroy()操作
for (Registry registry : getRegistries()) {
try {
registry.destroy();
} catch (Throwable e) {
LOGGER.error(e.getMessage(), e);
}
}
REGISTRIES.clear();
} finally {
// Release the lock
LOCK.unlock();
}
}
// org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#destroy
@Override
public void destroy() {
super.destroy();
try {
// (解绑)销毁动作只需执行一次,一次会将所有需要解绑的地址全部操作完成
zkClient.close();
} catch (Exception e) {
logger.warn("Failed to close zookeeper client " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
// org.apache.dubbo.registry.support.FailbackRegistry#destroy
@Override
public void destroy() {
super.destroy();
retryTimer.stop();
}
// org.apache.dubbo.registry.support.AbstractRegistry#destroy
@Override
public void destroy() {
if (logger.isInfoEnabled()) {
logger.info("Destroy registry:" + getUrl());
}
Set<URL> destroyRegistered = new HashSet<>(getRegistered());
if (!destroyRegistered.isEmpty()) {
for (URL url : new HashSet<>(getRegistered())) {
if (url.getParameter(DYNAMIC_KEY, true)) {
try {
// 此处会将在该 register 中注册的所有地址进行解绑,所以,当前实例只需调用一次destroy()即可
unregister(url);
if (logger.isInfoEnabled()) {
logger.info("Destroy unregister url " + url);
}
} catch (Throwable t) {
logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
}
}
}
}
Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<>(getSubscribed());
if (!destroySubscribed.isEmpty()) {
for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
try {
unsubscribe(url, listener);
if (logger.isInfoEnabled()) {
logger.info("Destroy unsubscribe url " + url);
}
} catch (Throwable t) {
logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
}
}
}
}
AbstractRegistryFactory.removeDestroyedRegistry(this);
}
// org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doUnregister
@Override
public void doUnregister(URL url) {
try {
// 注册地址如: /dubbo/org.apache.dubbo.demo.DemoService/providers/dubbo%3A%2F%2F192.168.56.1%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-api-provider%26default%3Dtrue%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%2CsayHelloAsync%26pid%3D135556%26release%3D%26side%3Dprovider%26timestamp%3D1588518573613
zkClient.delete(toUrlPath(url));
} catch (Throwable e) {
throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
其实就是一个关闭钩子,进行路径的主动删除操作。
5. zookeeper 消费者服务订阅
前面两个操作,服务的注册与解绑,可以针对提供者,也可以针对消费者。但是对服务订阅则更多是针对消费者!因其要时刻关注提供者的变化,接收注册中心的消息推送。其以 ReferenceConfig.get() 入口。所以,我们以消费者的视图进行解析zk的订阅过程:
// org.apache.dubbo.registry.integration.RegistryProtocol#refer
@Override
@SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = getRegistryUrl(url);
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
return doRefer(cluster, registry, type, url);
}
// org.apache.dubbo.registry.integration.RegistryProtocol#doRefer
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (directory.isShouldRegister()) {
directory.setRegisteredConsumerUrl(subscribeUrl);
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
// 注册消费者监听到 zk 中
// 添加路径 providers, configurators, routers, 以便在同时监听几个目录
directory.subscribe(toSubscribeUrl(subscribeUrl));
Invoker<T> invoker = cluster.join(directory);
List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
if (CollectionUtils.isEmpty(listeners)) {
return invoker;
}
RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl);
for (RegistryProtocolListener listener : listeners) {
listener.onRefer(this, registryInvokerWrapper);
}
return registryInvokerWrapper;
}
private static URL toSubscribeUrl(URL url) {
return url.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY);
}
// org.apache.dubbo.registry.ListenerRegistryWrapper#subscribe
@Override
public void subscribe(URL url, NotifyListener listener) {
try {
registry.subscribe(url, listener);
} finally {
if (CollectionUtils.isNotEmpty(listeners)) {
RuntimeException exception = null;
for (RegistryServiceListener registryListener : listeners) {
if (registryListener != null) {
try {
registryListener.onSubscribe(url);
} catch (RuntimeException t) {
logger.error(t.getMessage(), t);
exception = t;
}
}
}
if (exception != null) {
throw exception;
}
}
}
}
// org.apache.dubbo.registry.support.FailbackRegistry#subscribe
@Override
public void subscribe(URL url, NotifyListener listener) {
super.subscribe(url, listener);
removeFailedSubscribed(url, listener);
try {
// Sending a subscription request to the server side
doSubscribe(url, listener);
} catch (Exception e) {
Throwable t = e;
List<URL> urls = getCacheUrls(url);
if (CollectionUtils.isNotEmpty(urls)) {
notify(url, listener, urls);
logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
} else {
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true);
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
// Record a failed registration request to a failed list, retry regularly
addFailedSubscribed(url, listener);
}
}
// org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> {
for (String child : currentChilds) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), k);
}
}
});
zkClient.create(root, false);
List<String> services = zkClient.addChildListener(root, zkListener);
if (CollectionUtils.isNotEmpty(services)) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {
// configurators
List<URL> urls = new ArrayList<>();
// /dubbo/org.apache.dubbo.demo.DemoService/configurators
// url 样例: provider://192.168.56.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider&bind.ip=192.168.56.1&bind.port=20880&category=configurators&check=false&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=34676&release=&side=provider×tamp=1588639020690
// 它会经过 toCategoriesPath() 转换为多个控制目录,依次注册
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
// listener 是 ConcurrentHashMap 中的 ConcurrentHashMap, 所以迭代也是两层的
// 当目录发生变更时,运行 notify() 方法
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
// 与提供者的服务注册不一样,消费者的订阅路径是 持久化的
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
// 在添加监听的同时,也返回了当前目录下的所有路径,解析该值即可以得到相应的提供者等信息
// toUrlsWithEmpty 将消费者和提供者参数进行整合,以便排出优先级进行语义覆盖
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 自身注册完成后,主动触发一次 notify() 以刷新信息
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
private String[] toCategoriesPath(URL url) {
String[] categories;
if (ANY_VALUE.equals(url.getParameter(CATEGORY_KEY))) {
categories = new String[]{PROVIDERS_CATEGORY, CONSUMERS_CATEGORY, ROUTERS_CATEGORY, CONFIGURATORS_CATEGORY};
} else {
categories = url.getParameter(CATEGORY_KEY, new String[]{DEFAULT_CATEGORY});
}
String[] paths = new String[categories.length];
for (int i = 0; i < categories.length; i++) {
// servicePath: /dubbo/interfaceName
paths[i] = toServicePath(url) + PATH_SEPARATOR + categories[i];
}
return paths;
}
// 处理在添加监听时获取到的childPath信息,作为首次订阅时的依据
private List<URL> toUrlsWithEmpty(URL consumer, String path, List<String> providers) {
List<URL> urls = toUrlsWithoutEmpty(consumer, providers);
if (urls == null || urls.isEmpty()) {
// 针对url未匹配情况,添加一个 empty:// url 返回
int i = path.lastIndexOf(PATH_SEPARATOR);
String category = i < 0 ? path : path.substring(i + 1);
URL empty = URLBuilder.from(consumer)
.setProtocol(EMPTY_PROTOCOL)
.addParameter(CATEGORY_KEY, category)
.build();
urls.add(empty);
}
return urls;
}
private List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) {
List<URL> urls = new ArrayList<>();
if (CollectionUtils.isNotEmpty(providers)) {
for (String provider : providers) {
// 包含 ://
if (provider.contains(PROTOCOL_SEPARATOR_ENCODED)) {
// 解码提供者url
URL url = URLStrParser.parseEncodedStr(provider);
// 判断消费者与提供者是否匹配,或者提供者是否被禁用
if (UrlUtils.isMatch(consumer, url)) {
urls.add(url);
}
}
}
}
return urls;
}
// org.apache.dubbo.common.utils.UrlUtils#isMatch
public static boolean isMatch(URL consumerUrl, URL providerUrl) {
String consumerInterface = consumerUrl.getServiceInterface();
String providerInterface = providerUrl.getServiceInterface();
//FIXME accept providerUrl with '*' as interface name, after carefully thought about all possible scenarios I think it's ok to add this condition.
// 接口名称判定
if (!(ANY_VALUE.equals(consumerInterface)
|| ANY_VALUE.equals(providerInterface)
|| StringUtils.isEquals(consumerInterface, providerInterface))) {
return false;
}
// category 类型判定
if (!isMatchCategory(providerUrl.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY),
consumerUrl.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY))) {
return false;
}
// 启用禁用标识判定
if (!providerUrl.getParameter(ENABLED_KEY, true)
&& !ANY_VALUE.equals(consumerUrl.getParameter(ENABLED_KEY))) {
return false;
}
String consumerGroup = consumerUrl.getParameter(GROUP_KEY);
String consumerVersion = consumerUrl.getParameter(VERSION_KEY);
String consumerClassifier = consumerUrl.getParameter(CLASSIFIER_KEY, ANY_VALUE);
String providerGroup = providerUrl.getParameter(GROUP_KEY);
String providerVersion = providerUrl.getParameter(VERSION_KEY);
String providerClassifier = providerUrl.getParameter(CLASSIFIER_KEY, ANY_VALUE);
// 其他判定
return (ANY_VALUE.equals(consumerGroup) || StringUtils.isEquals(consumerGroup, providerGroup) || StringUtils.isContains(consumerGroup, providerGroup))
&& (ANY_VALUE.equals(consumerVersion) || StringUtils.isEquals(consumerVersion, providerVersion))
&& (consumerClassifier == null || ANY_VALUE.equals(consumerClassifier) || StringUtils.isEquals(consumerClassifier, providerClassifier));
}
事实上,其步骤与提供者是类似的:1. 获取registry实例; 2. 注册自身消费标识到consumers目录; 3. 订阅providers,configurators,routers 子目录变更; 另外,在做订阅的同时,也拉取了提供者服务列表达到初始化的作用。
6. zookeeper 服务信息通知notify
当注册中心发现服务提供者发生了变化时,将会将该信息推送给订阅了相应路径的客户端。这个客户端首先会被前面设置的 zkListener 处理。
// ZookeeperRegistry.doSubscribe
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
...
List<URL> urls = new ArrayList<>();
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
// 服务回调将由 ZookeeperRegistry.this.notify() 处理
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
notify(url, listener, urls);
...
}
// org.apache.dubbo.registry.support.FailbackRegistry#notify
@Override
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
try {
doNotify(url, listener, urls);
} catch (Exception t) {
// Record a failed registration request to a failed list, retry regularly
addFailedNotified(url, listener, urls);
logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
// org.apache.dubbo.registry.support.FailbackRegistry#doNotify
protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
super.notify(url, listener, urls);
}
// org.apache.dubbo.registry.support.AbstractRegistry#notify(org.apache.dubbo.common.URL, org.apache.dubbo.registry.NotifyListener, java.util.List<org.apache.dubbo.common.URL>)
/**
* Notify changes from the Provider side.
*
* @param url consumer side url
* @param listener listener
* @param urls provider latest urls
*/
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if ((CollectionUtils.isEmpty(urls))
&& !ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
}
// keep every provider's category.
Map<String, List<URL>> result = new HashMap<>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
// 再交由最原始传入的 listener 进行处理, 即 RegistryDirectory.notify()
listener.notify(categoryList);
// We will update our cache file after each notification.
// When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
saveProperties(url);
}
}
// org.apache.dubbo.registry.integration.RegistryDirectory#notify
@Override
public synchronized void notify(List<URL> urls) {
Map<String, List<URL>> categoryUrls = urls.stream()
.filter(Objects::nonNull)
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.groupingBy(this::judgeCategory));
List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
// 添加到所有提供者列表中,以便在后续请求可以将最新的地址信息给consumer使用
toRouters(routerURLs).ifPresent(this::addRouters);
// providers
List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
/**
* 3.x added for extend URL address
*/
ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);
List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
if (supportedListeners != null && !supportedListeners.isEmpty()) {
for (AddressListener addressListener : supportedListeners) {
providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this);
}
}
refreshOverrideAndInvoker(providerURLs);
}
7. zookeeper 服务解除事件订阅
除了服务下线外,还需要解除订阅关系。以便zookeeper不用再维护其监听推送。
// org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doUnsubscribe
@Override
public void doUnsubscribe(URL url, NotifyListener listener) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners != null) {
ChildListener zkListener = listeners.get(listener);
if (zkListener != null) {
if (ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
zkClient.removeChildListener(root, zkListener);
} else {
// 将路径列举出来,一个个解除监听
for (String path : toCategoriesPath(url)) {
zkClient.removeChildListener(path, zkListener);
}
}
}
}
}
// 解除监听操作同样是由关闭钩子 触发的
// org.apache.dubbo.registry.support.AbstractRegistry#destroy
@Override
public void destroy() {
if (logger.isInfoEnabled()) {
logger.info("Destroy registry:" + getUrl());
}
Set<URL> destroyRegistered = new HashSet<>(getRegistered());
if (!destroyRegistered.isEmpty()) {
for (URL url : new HashSet<>(getRegistered())) {
if (url.getParameter(DYNAMIC_KEY, true)) {
try {
unregister(url);
if (logger.isInfoEnabled()) {
logger.info("Destroy unregister url " + url);
}
} catch (Throwable t) {
logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
}
}
}
}
Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<>(getSubscribed());
if (!destroySubscribed.isEmpty()) {
for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
try {
// 解除监听操作
unsubscribe(url, listener);
if (logger.isInfoEnabled()) {
logger.info("Destroy unsubscribe url " + url);
}
} catch (Throwable t) {
logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
}
}
}
}
AbstractRegistryFactory.removeDestroyedRegistry(this);
}
// org.apache.dubbo.registry.support.FailbackRegistry#unsubscribe
@Override
public void unsubscribe(URL url, NotifyListener listener) {
super.unsubscribe(url, listener);
removeFailedSubscribed(url, listener);
try {
// Sending a canceling subscription request to the server side
// 调用 ZookeeperRegistry.doUnregister(), 删除路径监听
doUnsubscribe(url, listener);
} catch (Exception e) {
Throwable t = e;
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true);
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to unsubscribe " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to unsubscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// Record a failed registration request to a failed list, retry regularly
addFailedUnsubscribed(url, listener);
}
}
和前面的下线操作意义相同,只是一个是 delete 路径,一个是解除变更监听。
dubbo 中以服务为单位进行注册通知管理,避免了一个因应用提供者提供许多服务,而导致消费者必须处理大量数据的情况。各消费者只关注与自己相关的服务路径,从而最小化影响。通过临时节点和心跳机制,保证了各服务的准确体现。这是zk比较擅长的。虽然该点功能小,却意义重大。
作者:等你归去来文章来源:https://www.toymoban.com/news/detail-835438.html
原文地址:https://www.cnblogs.com/yougewe/p/12831556.html文章来源地址https://www.toymoban.com/news/detail-835438.html
到了这里,关于Dubbo(六):zookeeper注册中心的应用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!