Dubbo之消费端服务RPC调用

这篇具有很好参考价值的文章主要介绍了Dubbo之消费端服务RPC调用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

在消费端服务是基于接口调用Provider端提供的服务,所以在消费端并没有服务公共接口的实现类。

@RestController
@RequestMapping("/consumer")
public class CountryController {

    @DubboReference(version = "2.0",lazy = true)
    CountryService countryService;

    @GetMapping("/getCountry")
    public JSONObject getCountry() {
        JSONObject rtn = new JSONObject();
        rtn.put("msg",countryService.getCountry());
        return rtn;
    }
}
  1. 利用注解@DubboReference将目标接口CountryService作为CountryController类的字段属性,在解析类CountryController时获取全部字段属性并单独关注解析存在注解@DubboReference的字段属性。
  2. 通过步骤1得到服务公共接口类型,在生成RootBeanDefinition时设置其Class属性为ReferenceBean,最终将服务公共接口CountryService注册至IOC容器中。
  3. 通过JdkDynamicAopProxy对服务公共接口生成代理。

1.ReferenceAnnotationBeanPostProcessor

ReferenceAnnotationBeanPostProcessor后置处理器重置服务目标接口CountryService在IOC注册表class的属性为ReferenceBean

在Spring实例化 & 初始化 IOC容器中涉及所有bean时,触发ReferenceBean的FactoryBean特性完成接口CountryServiceIOC容器中的bean管理。

public class ReferenceBean<T> implements FactoryBean<T>{
    @Override
    public T getObject() {
        if (lazyProxy == null) {
            // 对目标类代理处理
            createLazyProxy();
        }
        return (T) lazyProxy;
    }

    private void createLazyProxy() {

        ProxyFactory proxyFactory = new ProxyFactory();
        proxyFactory.setTargetSource(new DubboReferenceLazyInitTargetSource());
        proxyFactory.addInterface(interfaceClass);
        Class<?>[] internalInterfaces = AbstractProxyFactory.getInternalInterfaces();
        for (Class<?> anInterface : internalInterfaces) {
            proxyFactory.addInterface(anInterface);
        }
        if (!StringUtils.isEquals(interfaceClass.getName(), interfaceName)) {
            Class<?> serviceInterface = ClassUtils.forName(interfaceName, beanClassLoader);
            proxyFactory.addInterface(serviceInterface);
        }
        //jdk基于接口代理
        this.lazyProxy = proxyFactory.getProxy(this.beanClassLoader);
    }

    private Object getCallProxy() throws Exception {
        // 利用 ReferenceConfig 初始化 lazyTarget 属性
        return referenceConfig.get();
    }

    private class DubboReferenceLazyInitTargetSource extends AbstractLazyCreationTargetSource {

        @Override
        protected Object createObject() throws Exception {
            return getCallProxy();
        }

        @Override
        public synchronized Class<?> getTargetClass() {
            return getInterfaceClass();
        }
    }
}

jdk动态代理技术生成目标接口代理类过程中需要注意 DubboReferenceLazyInitTargetSourcelazyTarget属性【属性赋值时机、属性使用时机】。

Dubbo中TargetSource之DubboReferenceLazyInitTargetSource可以控制属性值lazyTarget初始化时机,其实是通过抽象类AbstractLazyCreationTargetSource完成的。

1.1.服务公共接口CountryService代理过程

如下所示:接口CountryService的代理类在执行代理方法过程中涉及利用TargetSource获取lazyTarget属性。

class JdkDynamicAopProxy implements AopProxy, InvocationHandler, Serializable {
	
	// 代理目标类中的目标方法
	public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
		Object oldProxy = null;
		boolean setProxyContext = false;
		// 通过 ProxyFactory 获取TargetSource之DubboReferenceLazyInitTargetSource
		TargetSource targetSource = this.advised.targetSource;
		Object target = null;

		...

		Object retVal;
		// 此处就是获取目标接口CountryService的代理,其实就是lazyTarget属性值
		target = targetSource.getTarget();
		Class<?> targetClass = (target != null ? target.getClass() : null);
		List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);

		if (chain.isEmpty()) {// 默认情况下成立
			Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
			retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
		}
		else {
			...
		}
		...
		return retVal;
	}
}

根据注解得知lazyTarget属性赋值是懒加载方式得到的,即首次获取lazyTarget对象时才真正触发其完成赋值。但是实际情况是在创建目标接口的代理类时就实现赋值操作【不知道为啥?】。 

public abstract class AbstractLazyCreationTargetSource implements TargetSource {
	/**
	 * Returns the lazy-initialized target object,
	 * creating it on-the-fly if it doesn't exist already.
	 * @see #createObject()
	 */
	@Override
	public synchronized Object getTarget() throws Exception {
		if (this.lazyTarget == null) {
			// 触发CountryService的代理
			this.lazyTarget = createObject();
		}
		return this.lazyTarget;
	}
}

综上所述:目标接口代理类生成过程中还涉及lazyTarget属性赋值。而且发现目标接口代理类的类名、lazyTarget属性名均为CountryServiceDubboProxy0。但是两者区别是前者实例中持有的InvocationHandler类型为JdkDynamicAopProxy,后者持有的InvocationHandler类型为InvokerInvocationHandler。

1.2.ReferenceConfig

public class ReferenceConfig<T> extends ReferenceConfigBase<T> {

    private transient volatile T ref;

    @Override
    public T get() {
        if (ref == null) {
            getScopeModel().getDeployer().start();
            synchronized (this) {
                if (ref == null) {
                    init();
                }
            }
        }
        return ref;
    }
}

如上所示 ReferenceConfig 的核心字段 ref 最终指向类TargetSource中涉及的lazyTarget字段。

疑问:字段ref 赋值时机:

  1. 可能是接口CountryService生成jdk代理类的过程。
  2. 可能是接口CountryService代理目标方法执行过程。
  3. 可能是监听器DubboDeployApplicationListener处理相关事件过程。

具体是哪个阶段真正完成字段 ref 的赋值,尚未确定。


2.DubboDeployApplicationListener

该监听器的核心作用:字段 ref 的赋值流程。

public class DefaultModuleDeployer extends AbstractDeployer<ModuleModel> implements ModuleDeployer {

    private final ModuleConfigManager configManager;

    private final SimpleReferenceCache referenceCache;

    @Override
    public synchronized Future start() throws IllegalStateException {
        ...
        onModuleStarting();
        // initialize
        applicationDeployer.initialize();
        initialize();
        // export services
        exportServices();
        ...
        // refer services  这里面是核心功能
        referServices();
        ...
        return startFuture;
    } 

    private void referServices() {
        // 配置管理器中获取 @DubboReference 注解引入的目标接口配置信息
        configManager.getReferences().forEach(rc -> {
            ReferenceConfig<?> referenceConfig = (ReferenceConfig<?>) rc;
            ...
            referenceCache.get(rc);
        });
    }
}


public class SimpleReferenceCache implements ReferenceCache {

    public <T> T get(ReferenceConfigBase<T> rc) {
        String key = generator.generateKey(rc);
        Class<?> type = rc.getInterfaceClass();
        Object proxy = rc.get();// ReferenceConfig初始化字段ref
        ...
        return (T) proxy;
    }    
}

2.1. Protocol协议之refer

接口Protocol的扩展类如下图所示:

public interface Protocol {
    // 服务提供者注册目标接口
    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
    // 服务消费端通过注解@DubboReference引进目标接口
    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
    default List<ProtocolServer> getServers() {
        return Collections.emptyList();
    }
}
public class ReferenceConfig<T> extends ReferenceConfigBase<T> {

    private Protocol protocolSPI;
    
    @Override
    public T get() {
        if (ref == null) {
            // ensure start module, compatible with old api usage
            getScopeModel().getDeployer().start();
            synchronized (this) {
                if (ref == null) {
                    init();
                }
            }
        }
        return ref;
    }

    protected synchronized void init() {
        ...
        ref = createProxy(referenceParameters);
        ...
    }

    private T createProxy(Map<String, String> referenceParameters) {
        ...
        createInvokerForRemote();
        ...
        // create service proxy
        return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));
    }

    private void createInvokerForRemote() {
        // URL 协议类型为 registry
        URL curUrl = urls.get(0);
        invoker = protocolSPI.refer(interfaceClass, curUrl);
        if (!UrlUtils.isRegistry(curUrl)) {
            List<Invoker<?>> invokers = new ArrayList<>();
            invokers.add(invoker);
            invoker = Cluster.getCluster(scopeModel, Cluster.DEFAULT).join(new StaticDirectory(curUrl, invokers), true);
        }
    }
}

registry类型的url协议:代表注册中心地址,即zk的IP & 端口号等相关注册中心地址相关信息。

registry://zk-host:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-consumer&dubbo=2.0.2&pid=66082&qos.enable=false&register-mode=instance&registry=zookeeper&release=3.0.7&timestamp=1710075502746

 如图所示接口Protocol扩展类中真正初始化字段refer值的是RegistryProtocol#refer

Dubbo之消费端服务RPC调用,dubbo,rpc,java


2.1.1.RegistryProtocol

public class RegistryProtocol implements Protocol, ScopeModelAware {
    @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        // 将registry协议替换为zookeeper协议
        url = getRegistryUrl(url);
        Registry registry = getRegistry(url);
        ...
        Cluster cluster = Cluster.getCluster(url.getScopeModel(), qs.get(CLUSTER_KEY));
        return doRefer(cluster, registry, type, url, qs);
    }

    protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url, Map<String, String> parameters) {
        ...
        // consumerUrl 即消费端当前的IP地址,该地址作用是:为provider服务提供响应地址
        URL consumerUrl = new ServiceConfigURL (
            p,null,null,parameters.get(REGISTER_IP_KEY),0, getPath(parameters, type),parameters,consumerAttribute
        );
        url = url.putAttribute(CONSUMER_URL_KEY, consumerUrl);
        // migrationInvoker:MigrationInvoker
        ClusterInvoker<T> migrationInvoker = getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);
        return interceptInvoker(migrationInvoker, url, consumerUrl);
    }

    protected <T> Invoker<T> interceptInvoker(ClusterInvoker<T> invoker, URL url, URL consumerUrl) {
        List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
        for (RegistryProtocolListener listener : listeners) {
            // 默认情况下存在监听器之MigrationRuleListener
            listener.onRefer(this, invoker, consumerUrl, url);
        }
        return invoker;
    }
}

zookeeper协议:

zookeeper://zk-host:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-consumer&dubbo=2.0.2&pid=66082&qos.enable=false&register-mode=instance&release=3.0.7&timestamp=1710075502746

consumer协议:消费端注册服务时不需要指定端口,因为provider端 & consumer端是通过tcp协议之Netty【端口:20880】完成两端信息通信。

consumer://192.168.1.6/common.service.CountryService?application=dubbo-consumer&background=false&dubbo=2.0.2&interface=common.service.CountryService&lazy=true&methods=getCountry&pid=66082&qos.enable=false&register-mode=instance&register.ip=192.168.1.6&release=3.0.7&revision=2.0&side=consumer&sticky=false&timestamp=1710075502715&version=2.0

 文章来源地址https://www.toymoban.com/news/detail-839011.html


3.ServiceDiscoveryRegistry

public class ServiceDiscoveryRegistry extends FailbackRegistry {

    private final AbstractServiceNameMapping serviceNameMapping;

    @Override
    public void doSubscribe(URL url, NotifyListener listener) {
        url = addRegistryClusterKey(url);
        serviceDiscovery.subscribe(url, listener);
        boolean check = url.getParameter(CHECK_KEY, false);
        String key = ServiceNameMapping.buildMappingKey(url);
        Lock mappingLock = serviceNameMapping.getMappingLock(key);
        mappingLock.lock();
        Set<String> subscribedServices = serviceNameMapping.getCachedMapping(url);
        MappingListener mappingListener = new DefaultMappingListener(url, subscribedServices, listener);
        subscribedServices = serviceNameMapping.getAndListen(this.getUrl(), url, mappingListener);
        mappingListeners.put(url.getProtocolServiceKey(), mappingListener);
        subscribeURLs(url, listener, subscribedServices);
    }

    protected void subscribeURLs(URL url, NotifyListener listener, Set<String> serviceNames) {
        // 
        serviceNames = toTreeSet(serviceNames);
        String serviceNamesKey = toStringKeys(serviceNames);
        String protocolServiceKey = url.getProtocolServiceKey();
        logger.info(String.format("Trying to subscribe from apps %s for service key %s, ", serviceNamesKey, protocolServiceKey));

        // register ServiceInstancesChangedListener
        Lock appSubscriptionLock = getAppSubscription(serviceNamesKey);
        try {
            appSubscriptionLock.lock();
            ServiceInstancesChangedListener serviceInstancesChangedListener = serviceListeners.get(serviceNamesKey);
            if (serviceInstancesChangedListener == null) {
                serviceInstancesChangedListener = serviceDiscovery.createListener(serviceNames);
                serviceInstancesChangedListener.setUrl(url);
                for (String serviceName : serviceNames) {//
                    // 获取提供端服务的IP、端口号等信息
                    List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName);
                    if (CollectionUtils.isNotEmpty(serviceInstances)) {
                        serviceInstancesChangedListener.onEvent(new ServiceInstancesChangedEvent(serviceName, serviceInstances));
                    }
                }
                serviceListeners.put(serviceNamesKey, serviceInstancesChangedListener);
            }

            if (!serviceInstancesChangedListener.isDestroyed()) {
                serviceInstancesChangedListener.setUrl(url);
                listener.addServiceListener(serviceInstancesChangedListener);
                serviceInstancesChangedListener.addListenerAndNotify(protocolServiceKey, listener);
                serviceDiscovery.addServiceInstancesChangedListener(serviceInstancesChangedListener);
            } else {
                logger.info(String.format("Listener of %s has been destroyed by another thread.", serviceNamesKey));
                serviceListeners.remove(serviceNamesKey);
            }
        } finally {
            appSubscriptionLock.unlock();
        }
    }
}


public abstract class AbstractServiceNameMapping implements ServiceNameMapping, ScopeModelAware {

    @Override
    public Set<String> getAndListen(URL registryURL, URL subscribedURL, MappingListener listener) {
        String key = ServiceNameMapping.buildMappingKey(subscribedURL);
        // use previously cached services.
        Set<String> mappingServices = this.getCachedMapping(key);
        // Asynchronously register listener in case previous cache does not exist or cache expired.
        if (CollectionUtils.isEmpty(mappingServices)) {
            ...
        } else {
            ExecutorService executorService = applicationModel.getFrameworkModel().getBeanFactory()
                .getBean(FrameworkExecutorRepository.class).getMappingRefreshingExecutor();
            executorService.submit(new AsyncMappingTask(listener, subscribedURL, true));
        }

        return mappingServices;
    }

    private class AsyncMappingTask implements Callable<Set<String>> {
        private final MappingListener listener;
        private final URL subscribedURL;
        private final boolean notifyAtFirstTime;

        public AsyncMappingTask(MappingListener listener, URL subscribedURL, boolean notifyAtFirstTime) {
            this.listener = listener;
            this.subscribedURL = subscribedURL;
            this.notifyAtFirstTime = notifyAtFirstTime;
        }

        @Override
        public Set<String> call() throws Exception {
            synchronized (mappingListeners) {
                Set<String> mappedServices = emptySet();
                try {
                    String mappingKey = ServiceNameMapping.buildMappingKey(subscribedURL);
                    if (listener != null) {
                        // 通过zk 客户端 获取服务提供端的服务名集合
                        mappedServices = toTreeSet(getAndListen(subscribedURL, listener));
                        Set<MappingListener> listeners = mappingListeners.computeIfAbsent(mappingKey, _k -> new HashSet<>());
                        listeners.add(listener);
                        if (CollectionUtils.isNotEmpty(mappedServices)) {
                            if (notifyAtFirstTime) {
                                // 将 提供端服务名 添加到本地集合缓存 serviceNameMapping 中
                                // DefaultMappingListener:本地缓存 & zk 服务端 之间保证 提供端服务名 一致性
                                listener.onEvent(new MappingChangedEvent(mappingKey, mappedServices));
                            }
                        }
                    } else {
                        mappedServices = get(subscribedURL);
                        if (CollectionUtils.isNotEmpty(mappedServices)) {
                            AbstractServiceNameMapping.this.putCachedMapping(mappingKey, mappedServices);
                        }
                    }
                } catch (Exception e) {
                    logger.error("Failed getting mapping info from remote center. ", e);
                }
                return mappedServices;
            }
        }
    }
}
NettyChannel真正netty发送数据。
CodecSupportNetty相关协议。

到了这里,关于Dubbo之消费端服务RPC调用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Java 【dubbo rpc改feign调用】feign接口异常统一处理

    【框架改造问题点记录,dubbo改为spring cloud alibaba】 【第一篇】feign接口异常统一处理 示例代码中【ApplicationException 】、【Payload 】为自定义异常类和通用结果返回实体类: 示例代码中【ApplicationException 】、【StringUtil】为自定义异常类和自定义工具,自己平替即可:

    2024年02月16日
    浏览(26)
  • 【SpringBoot集成Nacos+Dubbo】企业级项目集成微服务组件,实现RPC远程调用

    在日益增长的业务需求中,一开始使用的是每个项目独立开发,虽然都是前后端分离的项目,但是每一个项目之间互不干扰。后来,因为某种需求,需要几个项目的数据相互交错获取。 最开始的想法就是集成多个数据源。 举例 有A、B、C三个项目,对应着数据库DBa、DBb、DBc、

    2024年02月04日
    浏览(39)
  • Java 【dubbo rpc改feign调用】feign接口调用 Body parameter 4 was null

    【框架改造问题点记录,dubbo改为spring cloud alibaba】 【第四篇】feign接口调用 Body parameter 4 was null 【描述】Feign是一个声明式的Web服务客户端,它使得写HTTP客户端变得更简单。如果你在使用Feign进行服务调用时遇到了\\\"Body parameter 4 was null\\\"这样的错误,这通常意味着你尝试将一个

    2024年02月11日
    浏览(25)
  • Springboot3.X整合Dubbo3.XSpringCloudAlibaba微服务 2022.0 + Springboot3.X 集成 Dubbo实现对外调用http内部调用RPC

    近期自己新开了一套SpringCloud Alibaba微服务项目,接口使用了对外HTTP,内部RPC的设计,具体点说就是外部用户或客户端通过Nginx访问到Gateway网关再分发到各个服务,内部各个服务之间统一使用Dubbo RPC进行通信。下面是Springboot3.x集成Dubbo的分享: 1. 需要的关键依赖 2. 启动程序入

    2024年02月15日
    浏览(26)
  • Mybatis-Plus+Nacos+Dubbo进行远程RPC调用保姆级教程

    本文通过简单的示例代码和说明,让读者能够了解Mybatis-Plus+Nacos+Dubbo进行远程RPC调用的简单使用  默认你已经看过我之前的教程了,并且拥有上个教程完成的项目, 之前的教程 https://www.cnblogs.com/leafstar/p/17638782.html 项目链接在最后   1.在bank1的pom文件中引入以下依赖   2.使用

    2024年02月12日
    浏览(22)
  • jmeter测试rpc接口-使用dubbo框架调用【杭州多测师_王sir】

    1.基于SOAP架构。基于XML规范。基于WebService协议。特点:接口地址?wsdl结尾 2.基于RPC架构,基于dubbo协议,thrift协议。SpringCloud微服务。 3.基于RestFul架构,基于json规范。基于http协议(我们常用的都是这种,cms平台也是) RestFul规则∶ 接口地址: http://127.0.0.1/user , get(查询用户) , 

    2024年02月13日
    浏览(21)
  • 【Dubbo3云原生微服务开发实战】「Dubbo前奏导学」 RPC服务的底层原理和实现

    Dubbo是一款高效而强大的RPC服务框架,它旨在解决微服务架构下的服务监控和通信问题。该框架提供了Java、Golang等多语言的SDK,使得使用者可以轻松构建和开发微服务。Dubbo具备远程地址发现和通信能力,可通过Dubbo独有的身临其境的服务治理特验为主导,以提高开发人员的功

    2024年02月05日
    浏览(33)
  • 介绍 dubbo-go 并在Mac上安装,完成一次自己定义的接口RPC调用

    对开发者更透明,减少了很多的沟通成本。 RPC向远程服务器发送请求时,未必要使用 HTTP 协议,比如还可以用 TCP / IP,性能更高(内部服务更适用)。 📢 注意:在整个流程中,最终的调用并不是由注册中心来完成的。虽然注册中心会提供信息,但实际上调用方需要自己进行

    2024年02月10日
    浏览(26)
  • Dubbo源码浅析(一)—RPC框架与Dubbo

    RPC,Remote Procedure Call 即远程过程调用,与之相对的是本地服务调用,即LPC(Local Procedure Call)。本地服务调用比较常用,像我们应用内部程序 (注意此处是程序而不是方法,程序包含方法) 互相调用即为本地过程调用,而远程过程调用是指在本地调取远程过程进行使用。 而 RPC框

    2024年02月08日
    浏览(29)
  • 自定义Dubbo RPC通信协议

    Dubbo 协议层的核心SPI接口是 org.apache.dubbo.rpc.Protocol ,通过扩展该接口和围绕的相关接口,就可以让 Dubbo 使用我们自定义的协议来通信。默认的协议是 dubbo,本文提供一个 Grpc 协议的实现。 Google 提供了 Java 的 Grpc 实现,所以我们站在巨人的肩膀上即可,就不用重复造轮子了。

    2024年01月19日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包