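On the consumer side, services are invoked through the interface published by the provider, so the consumer has no implementation class of the shared service interface.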
@RestController
@RequestMapping("/consumer")
public class CountryController {
@DubboReference(version = "2.0",lazy = true)
CountryService countryService;
@GetMapping("/getCountry")
public JSONObject getCountry() {
JSONObject rtn = new JSONObject();
rtn.put("msg",countryService.getCountry());
return rtn;
}
}
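- The @DubboReference annotation marks the target interface CountryService as a field of CountryController. When CountryController is parsed, all of its fields are collected and those carrying @DubboReference are handled separately.
- The previous step yields the type of the shared service interface. When the RootBeanDefinition is created, its class attribute is set to ReferenceBean, and the shared service interface CountryService is finally registered in the IoC container.
- A proxy for the shared service interface is generated through JdkDynamicAopProxy.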
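The reason a consumer can call CountryService without any local implementation is that the injected object is a proxy. The following is a minimal JDK-only sketch of that idea, not Dubbo's or Spring's actual code: `RemoteCallHandler`, `ConsumerProxyDemo` and the canned reply are hypothetical stand-ins for the real invoker chain.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Shared service interface; the consumer side has no implementation of it.
interface CountryService {
    String getCountry();
}

// Hypothetical handler standing in for the remote call Dubbo would perform.
class RemoteCallHandler implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // toString/hashCode/equals are answered by the handler object itself.
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(this, args);
        }
        // Assumption: a canned reply replaces the real network round trip.
        return "reply for " + method.getName();
    }
}

class ConsumerProxyDemo {
    static CountryService createProxy() {
        return (CountryService) Proxy.newProxyInstance(
                CountryService.class.getClassLoader(),
                new Class<?>[] {CountryService.class},
                new RemoteCallHandler());
    }

    public static void main(String[] args) {
        System.out.println(createProxy().getCountry());
    }
}
```

The controller only ever sees the interface type; everything behind `invoke` can be swapped without touching the calling code.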
1.ReferenceAnnotationBeanPostProcessor
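The ReferenceAnnotationBeanPostProcessor post-processor resets the class attribute of the target interface CountryService in the IoC registry to ReferenceBean.
When Spring instantiates and initializes all beans in the IoC container, the FactoryBean behavior of ReferenceBean is triggered, which brings the CountryService interface under bean management in the container.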
public class ReferenceBean<T> implements FactoryBean<T>{
@Override
public T getObject() {
if (lazyProxy == null) {
// create a proxy for the target interface
createLazyProxy();
}
return (T) lazyProxy;
}
private void createLazyProxy() {
ProxyFactory proxyFactory = new ProxyFactory();
proxyFactory.setTargetSource(new DubboReferenceLazyInitTargetSource());
proxyFactory.addInterface(interfaceClass);
Class<?>[] internalInterfaces = AbstractProxyFactory.getInternalInterfaces();
for (Class<?> anInterface : internalInterfaces) {
proxyFactory.addInterface(anInterface);
}
if (!StringUtils.isEquals(interfaceClass.getName(), interfaceName)) {
Class<?> serviceInterface = ClassUtils.forName(interfaceName, beanClassLoader);
proxyFactory.addInterface(serviceInterface);
}
// JDK interface-based proxy
this.lazyProxy = proxyFactory.getProxy(this.beanClassLoader);
}
private Object getCallProxy() throws Exception {
// use ReferenceConfig to initialize the lazyTarget property
return referenceConfig.get();
}
private class DubboReferenceLazyInitTargetSource extends AbstractLazyCreationTargetSource {
@Override
protected Object createObject() throws Exception {
return getCallProxy();
}
@Override
public synchronized Class<?> getTargetClass() {
return getInterfaceClass();
}
}
}
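While the JDK dynamic proxy for the target interface is being generated, pay attention to the lazyTarget property of DubboReferenceLazyInitTargetSource: when it is assigned and when it is used.
Dubbo's TargetSource implementation, DubboReferenceLazyInitTargetSource, controls when lazyTarget is initialized; the mechanism itself comes from the abstract class AbstractLazyCreationTargetSource.
1.1.Proxying the shared service interface CountryService
As shown below, when the proxy of CountryService executes a proxied method, it obtains the lazyTarget property through the TargetSource.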
class JdkDynamicAopProxy implements AopProxy, InvocationHandler, Serializable {
// invoke the target method on the proxied target
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Object oldProxy = null;
boolean setProxyContext = false;
// the TargetSource configured on the ProxyFactory, here DubboReferenceLazyInitTargetSource
TargetSource targetSource = this.advised.targetSource;
Object target = null;
...
Object retVal;
// obtains the proxy of the target interface CountryService, i.e. the value of lazyTarget
target = targetSource.getTarget();
Class<?> targetClass = (target != null ? target.getClass() : null);
List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
if (chain.isEmpty()) {// true by default
Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
}
else {
...
}
...
return retVal;
}
}
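According to the Javadoc, lazyTarget is assigned lazily, that is, the assignment is only triggered the first time the lazyTarget object is requested. In practice, however, the assignment was observed to happen already when the proxy class for the target interface is created (the reason is not yet clear).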
public abstract class AbstractLazyCreationTargetSource implements TargetSource {
/**
* Returns the lazy-initialized target object,
* creating it on-the-fly if it doesn't exist already.
* @see #createObject()
*/
@Override
public synchronized Object getTarget() throws Exception {
if (this.lazyTarget == null) {
// triggers creation of the CountryService proxy
this.lazyTarget = createObject();
}
return this.lazyTarget;
}
}
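The lazy behavior can be reproduced with plain JDK proxies. The sketch below also illustrates one possible explanation for the early assignment noted above; this is an assumption and was not verified against the author's setup. JdkDynamicAopProxy answers equals and hashCode itself when the interface does not declare them, while any other call, including a toString() issued by a debugger or a log statement, goes through getTarget(). `LazyTargetSource`, `LazyProxyDemo` and the placeholder return value are hypothetical.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

interface CountryService {
    String getCountry();
}

// Simplified stand-in for AbstractLazyCreationTargetSource: create once, on first use.
class LazyTargetSource<T> {
    private final Supplier<T> factory;
    private T lazyTarget;

    LazyTargetSource(Supplier<T> factory) {
        this.factory = factory;
    }

    synchronized T getTarget() {
        if (lazyTarget == null) {
            lazyTarget = factory.get();
        }
        return lazyTarget;
    }
}

class LazyProxyDemo {
    // Counts how many times the target was created.
    static final AtomicInteger CREATED = new AtomicInteger();

    static CountryService lazyProxy() {
        LazyTargetSource<CountryService> source = new LazyTargetSource<CountryService>(() -> {
            CREATED.incrementAndGet();
            return () -> "placeholder-country"; // stand-in for referenceConfig.get()
        });
        InvocationHandler handler = (proxy, method, args) -> {
            // Assumption: equals/hashCode are answered without touching the target.
            if (method.getName().equals("hashCode") && method.getParameterCount() == 0) {
                return System.identityHashCode(proxy);
            }
            if (method.getName().equals("equals") && method.getParameterCount() == 1) {
                return proxy == args[0];
            }
            // Everything else, including toString(), needs the target.
            return method.invoke(source.getTarget(), args);
        };
        return (CountryService) Proxy.newProxyInstance(
                CountryService.class.getClassLoader(),
                new Class<?>[] {CountryService.class},
                handler);
    }

    public static void main(String[] args) {
        CountryService proxy = lazyProxy();
        System.out.println(CREATED.get()); // 0: creating the proxy does not create the target
        proxy.hashCode();
        System.out.println(CREATED.get()); // 0: hashCode is answered by the handler
        String.valueOf(proxy);             // what a debugger or logger would do
        System.out.println(CREATED.get()); // 1: toString() reached getTarget()
    }
}
```

If this is what happens in the real setup, the target would appear to be assigned "at proxy creation" simply because inspecting the proxy in an IDE already counts as the first use.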
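In summary, generating the proxy for the target interface also involves assigning lazyTarget. Both the proxy of the target interface and the lazyTarget value show up under the name CountryServiceDubboProxy0. The difference is that the former holds an InvocationHandler of type JdkDynamicAopProxy, while the latter holds one of type InvokerInvocationHandler.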
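The two-layer structure can be examined with Proxy.getInvocationHandler. The sketch below is a simplification that uses two plain JDK proxies; `OuterHandler` plays the role of JdkDynamicAopProxy and `InnerHandler` the role of InvokerInvocationHandler, and both names are hypothetical. Because the outer handler forwards toString() to its target, both layers print the same text, which may be why they look identical in a debugger.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

interface CountryService {
    String getCountry();
}

// Hypothetical inner handler: the layer that would perform the RPC.
class InnerHandler implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(this, args); // toString/hashCode/equals of the handler
        }
        return "rpc:" + method.getName();
    }

    @Override
    public String toString() {
        return "inner-proxy";
    }
}

// Hypothetical outer handler: forwards every call to its target, like an AOP proxy.
class OuterHandler implements InvocationHandler {
    final Object target;

    OuterHandler(Object target) {
        this.target = target;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        return method.invoke(target, args);
    }
}

class TwoLayerProxyDemo {
    static <T> T proxy(Class<T> type, InvocationHandler handler) {
        return type.cast(Proxy.newProxyInstance(
                type.getClassLoader(), new Class<?>[] {type}, handler));
    }

    static CountryService build() {
        CountryService inner = proxy(CountryService.class, new InnerHandler());
        return proxy(CountryService.class, new OuterHandler(inner));
    }

    public static void main(String[] args) {
        CountryService outer = build();
        Object inner = ((OuterHandler) Proxy.getInvocationHandler(outer)).target;
        System.out.println(outer + " / " + inner); // same text for both layers
        System.out.println(Proxy.getInvocationHandler(outer).getClass().getSimpleName());
        System.out.println(Proxy.getInvocationHandler(inner).getClass().getSimpleName());
    }
}
```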
1.2.ReferenceConfig
public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
private transient volatile T ref;
@Override
public T get() {
if (ref == null) {
getScopeModel().getDeployer().start();
synchronized (this) {
if (ref == null) {
init();
}
}
}
return ref;
}
}
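As shown above, the core field ref of ReferenceConfig is ultimately the object that the lazyTarget field of the TargetSource points to.
Open question: when is the field ref assigned?
- Possibly while the JDK proxy class for CountryService is generated.
- Possibly while a proxied method of CountryService is executed.
- Possibly while the listener DubboDeployApplicationListener handles its events.
Which of these stages actually assigns ref has not been determined yet.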
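Whichever path calls get() first is the one that runs init(); every later caller sees the value that is already there. A minimal sketch of the double-checked pattern used by ReferenceConfig#get, with a hypothetical `LazyRef` holder and a counter in place of the real init():

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical holder mirroring the shape of ReferenceConfig#get().
class LazyRef<T> {
    private volatile T ref; // volatile so other threads see the published value
    private final Supplier<T> init;

    LazyRef(Supplier<T> init) {
        this.init = init;
    }

    T get() {
        if (ref == null) {             // first check, without the lock
            synchronized (this) {
                if (ref == null) {     // second check, under the lock
                    ref = init.get();
                }
            }
        }
        return ref;
    }
}

class LazyRefDemo {
    // Starts several threads that all call get() and reports how often init ran.
    static int initCountAfterConcurrentGets(int threads) {
        AtomicInteger inits = new AtomicInteger();
        LazyRef<Object> ref = new LazyRef<Object>(() -> {
            inits.incrementAndGet();
            return new Object();
        });
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(ref::get);
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return inits.get();
    }

    public static void main(String[] args) {
        System.out.println(initCountAfterConcurrentGets(8));
    }
}
```

This means the three candidates are not mutually exclusive: the question is only which of them reaches get() first in a given application.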
2.DubboDeployApplicationListener
Core role of this listener: it drives the assignment of the field ref.
public class DefaultModuleDeployer extends AbstractDeployer<ModuleModel> implements ModuleDeployer {
private final ModuleConfigManager configManager;
private final SimpleReferenceCache referenceCache;
@Override
public synchronized Future start() throws IllegalStateException {
...
onModuleStarting();
// initialize
applicationDeployer.initialize();
initialize();
// export services
exportServices();
...
// refer services: this is the core step
referServices();
...
return startFuture;
}
private void referServices() {
// fetch from the config manager the configuration of the target interfaces introduced by @DubboReference
configManager.getReferences().forEach(rc -> {
ReferenceConfig<?> referenceConfig = (ReferenceConfig<?>) rc;
...
referenceCache.get(rc);
});
}
}
public class SimpleReferenceCache implements ReferenceCache {
public <T> T get(ReferenceConfigBase<T> rc) {
String key = generator.generateKey(rc);
Class<?> type = rc.getInterfaceClass();
Object proxy = rc.get();// ReferenceConfig initializes the field ref
...
return (T) proxy;
}
}
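The cache step can be pictured as "one proxy per key, created on first request". The sketch below is an assumption about the general shape only: the elided parts of SimpleReferenceCache are not reconstructed, and `ReferenceCacheSketch`, its key format and the supplier parameter are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical cache: at most one proxy is created per key.
class ReferenceCacheSketch {
    private final Map<String, Object> proxies = new ConcurrentHashMap<>();

    // Hypothetical key format; the real generator is not shown in the article.
    static String key(String interfaceName, String version) {
        return interfaceName + ":" + version;
    }

    // referenceConfigGet stands in for rc.get(), which initializes ref.
    @SuppressWarnings("unchecked")
    <T> T get(String key, Supplier<T> referenceConfigGet) {
        return (T) proxies.computeIfAbsent(key, k -> referenceConfigGet.get());
    }
}
```

With this shape, calling the deployer's start() more than once does not create a second proxy for the same reference.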
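2.1. refer in the Protocol interface
The Protocol interface has several extension classes (the figure listing them is not reproduced here); the interface itself is defined as follows: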
public interface Protocol {
// the provider exports (registers) the target interface
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
// the consumer refers to the target interface introduced by @DubboReference
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
default List<ProtocolServer> getServers() {
return Collections.emptyList();
}
}
public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
private Protocol protocolSPI;
@Override
public T get() {
if (ref == null) {
// ensure start module, compatible with old api usage
getScopeModel().getDeployer().start();
synchronized (this) {
if (ref == null) {
init();
}
}
}
return ref;
}
protected synchronized void init() {
...
ref = createProxy(referenceParameters);
...
}
private T createProxy(Map<String, String> referenceParameters) {
...
createInvokerForRemote();
...
// create service proxy
return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
private void createInvokerForRemote() {
// the URL protocol type is registry
URL curUrl = urls.get(0);
invoker = protocolSPI.refer(interfaceClass, curUrl);
if (!UrlUtils.isRegistry(curUrl)) {
List<Invoker<?>> invokers = new ArrayList<>();
invokers.add(invoker);
invoker = Cluster.getCluster(scopeModel, Cluster.DEFAULT).join(new StaticDirectory(curUrl, invokers), true);
}
}
}
A URL with the registry protocol represents the registry address, i.e. the IP, port and other connection details of the registry (ZooKeeper here).
registry://zk-host:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-consumer&dubbo=2.0.2&pid=66082&qos.enable=false&register-mode=instance&registry=zookeeper&release=3.0.7&timestamp=1710075502746
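Among the extension classes of the Protocol interface, the one that actually does the work behind the initialization of the field ref is RegistryProtocol#refer.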
2.1.1.RegistryProtocol
public class RegistryProtocol implements Protocol, ScopeModelAware {
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// replace the registry protocol with the zookeeper protocol
url = getRegistryUrl(url);
Registry registry = getRegistry(url);
...
Cluster cluster = Cluster.getCluster(url.getScopeModel(), qs.get(CLUSTER_KEY));
return doRefer(cluster, registry, type, url, qs);
}
protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url, Map<String, String> parameters) {
...
// consumerUrl carries the consumer's current IP address; it gives the provider an address to respond to
URL consumerUrl = new ServiceConfigURL (
p,null,null,parameters.get(REGISTER_IP_KEY),0, getPath(parameters, type),parameters,consumerAttribute
);
url = url.putAttribute(CONSUMER_URL_KEY, consumerUrl);
// migrationInvoker is a MigrationInvoker
ClusterInvoker<T> migrationInvoker = getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);
return interceptInvoker(migrationInvoker, url, consumerUrl);
}
protected <T> Invoker<T> interceptInvoker(ClusterInvoker<T> invoker, URL url, URL consumerUrl) {
List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
for (RegistryProtocolListener listener : listeners) {
// MigrationRuleListener is present by default
listener.onRefer(this, invoker, consumerUrl, url);
}
return invoker;
}
}
zookeeper protocol URL:
zookeeper://zk-host:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-consumer&dubbo=2.0.2&pid=66082&qos.enable=false&register-mode=instance&release=3.0.7&timestamp=1710075502746
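The rewrite from registry:// to zookeeper:// can be pictured as: take the value of the `registry` query parameter as the new scheme and drop that parameter from the query. The sketch below does this with java.net.URI; `toRegistryUrl` is a hypothetical helper written for illustration and is not Dubbo's getRegistryUrl.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

class RegistryUrlSketch {
    // Hypothetical helper: swaps the scheme for the value of the "registry"
    // parameter and removes that parameter from the query string.
    static String toRegistryUrl(String registryUrl) {
        URI uri = URI.create(registryUrl);
        String scheme = uri.getScheme();
        List<String> kept = new ArrayList<>();
        String query = uri.getRawQuery();
        if (query != null) {
            for (String pair : query.split("&")) {
                if (pair.startsWith("registry=")) {
                    scheme = pair.substring("registry=".length());
                } else {
                    kept.add(pair);
                }
            }
        }
        String newQuery = kept.isEmpty() ? "" : "?" + String.join("&", kept);
        return scheme + "://" + uri.getRawAuthority() + uri.getRawPath() + newQuery;
    }

    public static void main(String[] args) {
        System.out.println(toRegistryUrl(
                "registry://zk-host:2181/org.apache.dubbo.registry.RegistryService"
                        + "?application=dubbo-consumer&registry=zookeeper&release=3.0.7"));
    }
}
```

If no `registry` parameter is present, this sketch leaves the scheme unchanged; how the real method chooses a default in that case is not covered here.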
consumer protocol URL: the consumer does not need to specify a port when it registers, because the provider and the consumer exchange data over TCP through Netty (port 20880).
consumer://192.168.1.6/common.service.CountryService?application=dubbo-consumer&background=false&dubbo=2.0.2&interface=common.service.CountryService&lazy=true&methods=getCountry&pid=66082&qos.enable=false&register-mode=instance&register.ip=192.168.1.6&release=3.0.7&revision=2.0&side=consumer&sticky=false&timestamp=1710075502715&version=2.0
Article source: https://www.toymoban.com/news/detail-839011.html
3.ServiceDiscoveryRegistry
public class ServiceDiscoveryRegistry extends FailbackRegistry {
private final AbstractServiceNameMapping serviceNameMapping;
@Override
public void doSubscribe(URL url, NotifyListener listener) {
url = addRegistryClusterKey(url);
serviceDiscovery.subscribe(url, listener);
boolean check = url.getParameter(CHECK_KEY, false);
String key = ServiceNameMapping.buildMappingKey(url);
Lock mappingLock = serviceNameMapping.getMappingLock(key);
mappingLock.lock();
Set<String> subscribedServices = serviceNameMapping.getCachedMapping(url);
MappingListener mappingListener = new DefaultMappingListener(url, subscribedServices, listener);
subscribedServices = serviceNameMapping.getAndListen(this.getUrl(), url, mappingListener);
mappingListeners.put(url.getProtocolServiceKey(), mappingListener);
subscribeURLs(url, listener, subscribedServices);
}
protected void subscribeURLs(URL url, NotifyListener listener, Set<String> serviceNames) {
// sort the service names so that the generated key is stable
serviceNames = toTreeSet(serviceNames);
String serviceNamesKey = toStringKeys(serviceNames);
String protocolServiceKey = url.getProtocolServiceKey();
logger.info(String.format("Trying to subscribe from apps %s for service key %s, ", serviceNamesKey, protocolServiceKey));
// register ServiceInstancesChangedListener
Lock appSubscriptionLock = getAppSubscription(serviceNamesKey);
try {
appSubscriptionLock.lock();
ServiceInstancesChangedListener serviceInstancesChangedListener = serviceListeners.get(serviceNamesKey);
if (serviceInstancesChangedListener == null) {
serviceInstancesChangedListener = serviceDiscovery.createListener(serviceNames);
serviceInstancesChangedListener.setUrl(url);
for (String serviceName : serviceNames) {
// fetch the IP, port and other details of the provider instances
List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName);
if (CollectionUtils.isNotEmpty(serviceInstances)) {
serviceInstancesChangedListener.onEvent(new ServiceInstancesChangedEvent(serviceName, serviceInstances));
}
}
serviceListeners.put(serviceNamesKey, serviceInstancesChangedListener);
}
if (!serviceInstancesChangedListener.isDestroyed()) {
serviceInstancesChangedListener.setUrl(url);
listener.addServiceListener(serviceInstancesChangedListener);
serviceInstancesChangedListener.addListenerAndNotify(protocolServiceKey, listener);
serviceDiscovery.addServiceInstancesChangedListener(serviceInstancesChangedListener);
} else {
logger.info(String.format("Listener of %s has been destroyed by another thread.", serviceNamesKey));
serviceListeners.remove(serviceNamesKey);
}
} finally {
appSubscriptionLock.unlock();
}
}
}
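The core idea of subscribeURLs is that one listener exists per set of provider application names, and every service key subscribed from the same set of applications is attached to that single listener. The sketch below shows this under simplifying assumptions: it replaces the explicit per-key Lock with ConcurrentHashMap.computeIfAbsent, and `AppListener` and `AppListenerRegistry` are hypothetical stand-ins for ServiceInstancesChangedListener and the serviceListeners map.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical stand-in for ServiceInstancesChangedListener.
class AppListener {
    final Set<String> protocolServiceKeys = new CopyOnWriteArraySet<>();
}

class AppListenerRegistry {
    private final Map<String, AppListener> serviceListeners = new ConcurrentHashMap<>();

    // Sorting first makes {"app-b","app-a"} and {"app-a","app-b"} produce the same key.
    static String toStringKeys(Set<String> serviceNames) {
        return String.join(",", new TreeSet<>(serviceNames));
    }

    // Creates the listener for this application set once, then attaches the service key.
    AppListener subscribe(Set<String> serviceNames, String protocolServiceKey) {
        AppListener listener = serviceListeners.computeIfAbsent(
                toStringKeys(serviceNames), k -> new AppListener());
        listener.protocolServiceKeys.add(protocolServiceKey);
        return listener;
    }
}
```

Two interfaces served by the same provider applications therefore share one listener instead of each watching the registry separately.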
public abstract class AbstractServiceNameMapping implements ServiceNameMapping, ScopeModelAware {
@Override
public Set<String> getAndListen(URL registryURL, URL subscribedURL, MappingListener listener) {
String key = ServiceNameMapping.buildMappingKey(subscribedURL);
// use previously cached services.
Set<String> mappingServices = this.getCachedMapping(key);
// Asynchronously register listener in case previous cache does not exist or cache expired.
if (CollectionUtils.isEmpty(mappingServices)) {
...
} else {
ExecutorService executorService = applicationModel.getFrameworkModel().getBeanFactory()
.getBean(FrameworkExecutorRepository.class).getMappingRefreshingExecutor();
executorService.submit(new AsyncMappingTask(listener, subscribedURL, true));
}
return mappingServices;
}
private class AsyncMappingTask implements Callable<Set<String>> {
private final MappingListener listener;
private final URL subscribedURL;
private final boolean notifyAtFirstTime;
public AsyncMappingTask(MappingListener listener, URL subscribedURL, boolean notifyAtFirstTime) {
this.listener = listener;
this.subscribedURL = subscribedURL;
this.notifyAtFirstTime = notifyAtFirstTime;
}
@Override
public Set<String> call() throws Exception {
synchronized (mappingListeners) {
Set<String> mappedServices = emptySet();
try {
String mappingKey = ServiceNameMapping.buildMappingKey(subscribedURL);
if (listener != null) {
// fetch the set of provider service names through the zk client
mappedServices = toTreeSet(getAndListen(subscribedURL, listener));
Set<MappingListener> listeners = mappingListeners.computeIfAbsent(mappingKey, _k -> new HashSet<>());
listeners.add(listener);
if (CollectionUtils.isNotEmpty(mappedServices)) {
if (notifyAtFirstTime) {
// add the provider service names to the local cache serviceNameMapping
// DefaultMappingListener keeps the provider service names consistent between the local cache and the zk server
listener.onEvent(new MappingChangedEvent(mappingKey, mappedServices));
}
}
} else {
mappedServices = get(subscribedURL);
if (CollectionUtils.isNotEmpty(mappedServices)) {
AbstractServiceNameMapping.this.putCachedMapping(mappingKey, mappedServices);
}
}
} catch (Exception e) {
logger.error("Failed getting mapping info from remote center. ", e);
}
return mappedServices;
}
}
}
}
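The pattern in getAndListen is "answer from the local cache right away, refresh from the remote side in the background". The sketch below illustrates only that pattern and makes several assumptions: the elided branch of the original is not reconstructed, `remoteLookup` is a hypothetical stand-in for the metadata query through the zk client, and `MappingCacheSketch` and `MappingCacheDemo` are invented names.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

class MappingCacheSketch {
    private final Map<String, Set<String>> cache = new ConcurrentHashMap<>();
    private final ExecutorService refresher;
    // Hypothetical stand-in for the remote mapping query.
    private final Function<String, Set<String>> remoteLookup;

    MappingCacheSketch(ExecutorService refresher, Function<String, Set<String>> remoteLookup) {
        this.refresher = refresher;
        this.remoteLookup = remoteLookup;
    }

    // Never blocks: returns whatever is cached, possibly an empty set.
    Set<String> getCached(String key) {
        return cache.getOrDefault(key, Collections.<String>emptySet());
    }

    // Background refresh: query the remote side and update the cache if names came back.
    Future<Set<String>> refreshAsync(String key) {
        return refresher.submit(() -> {
            Set<String> fresh = new TreeSet<>(remoteLookup.apply(key));
            if (!fresh.isEmpty()) {
                cache.put(key, fresh);
            }
            return fresh;
        });
    }
}

class MappingCacheDemo {
    // Runs one refresh and returns the cached names afterwards.
    static Set<String> run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            MappingCacheSketch mapping = new MappingCacheSketch(
                    pool, key -> Collections.singleton("provider-app"));
            mapping.refreshAsync("common.service.CountryService").get();
            return mapping.getCached("common.service.CountryService");
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```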
NettyChannel: where the data is actually sent through Netty.
CodecSupport: the protocol (codec) handling used on the Netty side.