dubbo泛化调用之消费者传递JavaBean

这篇具有很好参考价值的文章主要介绍了dubbo泛化调用之消费者传递JavaBean。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、泛化调用概念

泛化调用是指在调用方没有服务方提供的 API(SDK)的情况下,对服务方进行调用,并且可以正常拿到调用结果。

二、使用场景

泛化调用主要用于实现一个通用的远程服务 Mock 框架,可通过实现 GenericService 接口处理所有服务请求。比如如下场景:

  1. 网关服务:如果要搭建一个网关服务,那么服务网关要作为所有 RPC 服务的调用端。但是网关本身不应该依赖于服务提供方的接口 API(这样会导致每有一个新的服务发布,就需要修改网关的代码以及重新部署),所以需要泛化调用的支持。

  2. 测试平台:如果要搭建一个可以测试 RPC 调用的平台,用户输入分组名、接口、方法名等信息,就可以测试对应的 RPC 服务。那么由于同样的原因(即会导致每有一个新的服务发布,就需要修改网关的代码以及重新部署),所以平台本身不应该依赖于服务提供方的接口 API。所以需要泛化调用的支持。

三、使用方式

demo 可见 dubbo 项目中的示例代码

API 部分以此 demo 为例讲解使用方式。

1. 服务接口
public interface HelloService {

    String sayHello(String name);

    CompletableFuture<String> sayHelloAsync(String name);

    CompletableFuture<Person> sayHelloAsyncComplex(String name);

    CompletableFuture<GenericType<Person>> sayHelloAsyncGenericComplex(String name);
}
2. 服务实现类
public class HelloServiceImpl implements HelloService {

    @Override
    public String sayHello(String name) {
        return "sayHello: " + name;
    }

    @Override
    public CompletableFuture<String> sayHelloAsync(String name) {
        CompletableFuture<String> future = new CompletableFuture<>();
        new Thread(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            future.complete("sayHelloAsync: " + name);
        }).start();

        return future;
    }

    @Override
    public CompletableFuture<Person> sayHelloAsyncComplex(String name) {
        Person person = new Person(1, "sayHelloAsyncComplex: " + name);
        CompletableFuture<Person> future = new CompletableFuture<>();
        new Thread(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            future.complete(person);
        }).start();

        return future;
    }

    @Override
    public CompletableFuture<GenericType<Person>> sayHelloAsyncGenericComplex(String name) {
        Person person = new Person(1, "sayHelloAsyncGenericComplex: " + name);
        GenericType<Person> genericType = new GenericType<>(person);
        CompletableFuture<GenericType<Person>> future = new CompletableFuture<>();
        new Thread(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            future.complete(genericType);
        }).start();

        return future;
    }
}
3. 通过API使用泛化调用
服务启动方
  1. 在设置 ServiceConfig 时,使用setGeneric("true")来开启泛化调用

  2. 在设置 ServiceConfig 时,使用 setRef 指定实现类时,要设置一个 GenericService 的对象。而不是真正的服务实现类对象

  3. 其他设置与正常 Api 服务启动一致即可

private static String zookeeperAddress = "zookeeper://" + System.getProperty("zookeeper.address", "127.0.0.1") + ":2181";

    public static void main(String[] args) throws Exception {
        new EmbeddedZooKeeper(2181, false).start();

        //创建ApplicationConfig
        ApplicationConfig applicationConfig = new ApplicationConfig();
        applicationConfig.setName("generic-impl-provider");
        //创建注册中心配置
        RegistryConfig registryConfig = new RegistryConfig();
        registryConfig.setAddress(zookeeperAddress);
        
        //新建服务实现类,注意要使用GenericService接收
        GenericService helloService = new GenericImplOfHelloService();

        //创建服务相关配置
        ServiceConfig<GenericService> service = new ServiceConfig<>();
        service.setApplication(applicationConfig);
        service.setRegistry(registryConfig);
        service.setInterface("org.apache.dubbo.samples.generic.call.api.HelloService");
        service.setRef(helloService);
        //重点:设置为泛化调用
        //注:不再推荐使用参数为布尔值的setGeneric函数
        //应该使用referenceConfig.setGeneric("true")代替
        service.setGeneric("true");
        service.export();

        System.out.println("dubbo service started");
        
        new CountDownLatch(1).await();
    }
}
泛化调用方

步骤:

  1. 在设置 ReferenceConfig 时,使用 setGeneric("true") 来开启泛化调用

  2. 配置完 ReferenceConfig 后,使用 referenceConfig.get() 获取到 GenericService 类的实例

  3. 使用其 $invoke 方法获取结果

  4. 其他设置与正常 Api 服务启动一致即可

    private static GenericService genericService;
    public static void main(String[] args) throws Exception {
        //创建ApplicationConfig
        ApplicationConfig applicationConfig = new ApplicationConfig();
        applicationConfig.setName("generic-call-consumer");
        //创建注册中心配置
        RegistryConfig registryConfig = new RegistryConfig();
        registryConfig.setAddress("zookeeper://127.0.0.1:2181");
        //创建服务引用配置
        ReferenceConfig<GenericService> referenceConfig = new ReferenceConfig<>();
        //设置接口
        referenceConfig.setInterface("org.apache.dubbo.samples.generic.call.api.HelloService");
        applicationConfig.setRegistry(registryConfig);
        referenceConfig.setApplication(applicationConfig);
        //重点:设置为泛化调用
        //注:不再推荐使用参数为布尔值的setGeneric函数
        //应该使用referenceConfig.setGeneric("true")代替
        referenceConfig.setGeneric(true);
        //设置异步,不必须,根据业务而定。
        referenceConfig.setAsync(true);
        //设置超时时间
        referenceConfig.setTimeout(7000);
        
        //获取服务,由于是泛化调用,所以获取的一定是GenericService类型
        genericService = referenceConfig.get();
        
        //使用GenericService类对象的$invoke方法可以代替原方法使用
        //第一个参数是需要调用的方法名
        //第二个参数是需要调用的方法的参数类型数组,为String数组,里面存入参数的全类名。
        //第三个参数是需要调用的方法的参数数组,为Object数组,里面存入需要的参数。
        Object result = genericService.$invoke("sayHello", new String[]{"java.lang.String"}, new Object[]{"world"});
        //使用CountDownLatch,如果使用同步调用则不需要这么做。
        CountDownLatch latch = new CountDownLatch(1);
        //获取结果
        CompletableFuture<String> future = RpcContext.getContext().getCompletableFuture();
        future.whenComplete((value, t) -> {
            System.err.println("invokeSayHello(whenComplete): " + value);
            latch.countDown();
        });
        //打印结果
        System.err.println("invokeSayHello(return): " + result);
        latch.await();
    }

四、消费者如何传递JavaBean

在Dubbo中,泛化调用可以实现对任何类型的服务方法的调用,包括复杂的参数类型。具体实现方式如下:

1. 首先需要使用泛化调用的客户端对象(GenericService)来调用服务方法,并传入方法名和参数列表。参数列表可以使用Object数组来表示,如下所示

GenericService genericService = referenceConfig.get();
Object[] params = new Object[]{arg1, arg2, ...};
Object result = genericService.$invoke(methodName, parameterTypes, params);

2. 对于复杂参数类型,需要先生成对应的JavaBean类,并将参数封装到该类中。然后将该JavaBean对象作为参数列表中的一个元素传入泛化调用方法中,如下所示:

@Data
public class ComplexRequest implements Serializable {

    private Map<String, Object> content;

    private String key;

    private String value;

    private List<User> users;
}


@Data
public class User implements Serializable {

    private String name;

    private String age;
}

 3. 消费者调用代码完整如下

    @Test
    public void testDubbo() {
        ApplicationConfig applicationConfig = new ApplicationConfig();
        applicationConfig.setName("generic-call-consumer");
        //创建注册中心配置
        RegistryConfig registryConfig = new RegistryConfig();
        registryConfig.setAddress("zookeeper://127.0.0.1:2181");
        //创建服务引用配置
        ReferenceConfig<GenericService> referenceConfig = new ReferenceConfig<>();
        //设置接口, 提供者的接口名
        referenceConfig.setInterface("com.test.api.provideFacade");
        applicationConfig.setRegistry(registryConfig);
        referenceConfig.setApplication(applicationConfig);
        //重点:设置为泛化调用
        //注:不再推荐使用参数为布尔值的setGeneric函数
        //应该使用referenceConfig.setGeneric("true")代替
        referenceConfig.setGeneric(true);
        //设置异步,不必须,根据业务而定。
        referenceConfig.setAsync(false);
        //设置超时时间
        referenceConfig.setTimeout(7000);
        //获取服务,由于是泛化调用,所以获取的一定是GenericService类型
        GenericService genericService = referenceConfig.get();
        //使用GenericService类对象的$invoke方法可以代替原方法使用
        //第一个参数是需要调用的方法名
        //第二个参数是需要调用的方法的参数类型数组,为String数组,里面存入参数的全类名。
        //第三个参数是需要调用的方法的参数数组,为Object数组,里面存入需要的参数。
        ComplexRequest entity = new ComplexRequest();
        Map<String, Object> map = new HashMap<>();
        map.put("key", "key");
        map.put("value", "value");
        entity.setContent(map);
        entity.setKey("key");
        entity.setValue("value");

        List<User> users = new ArrayList<>();
        User user = new User();
        user.setName("name");
        user.setAge("18");
        users.add(user);
        entity.setUsers(user);

        // 构造泛化调用参数
        //消费者调用方的实体类
        String[] paramTypes = new String[]{"com.test.api.request.ComplexRequest"};
        Object[] params = new Object[]{entity};

        Object result = genericService.$invoke("method", paramTypes, params);
        System.out.println(result);
    }

4. 提供者代码如下

public interface ProvideFacade {

    PlainResult<Response> method(ComplexRequest req);
}


public class testFacadeImpl implements testFacade {

    @Override
    public PlainResult<Response> method(ComplexRequest req) {}

}

5. 执行单元测试

运行消费者单元测试代码,请求提供者的dubbo泛化接口,进入到提供者机器,发现请求体已经传递过来,当提供者有多台机器时,请求会负载均衡,轮询到不同的机器上。文章来源地址https://www.toymoban.com/news/detail-707403.html

到了这里,关于dubbo泛化调用之消费者传递JavaBean的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ多消费者实例时,保证只有一个消费者进行消费(单活消费者模式)

    有一种业务场景,当人员组织结构变更时,会有大量数据进行推送。这些数据类型有的是add,有的是update,并且必须先add,才能进行update。 这时,为了保证消费顺序,需要 只有一个实例进行按顺序消费,其他实例仅提供日常对外服务 ,不进行消息消费。当唯一消费实例无法

    2024年02月11日
    浏览(29)
  • Kafka3.0.0版本——消费者(消费者组详细消费流程图解及消费者重要参数)

    创建一个消费者网络连接客户端,主要用于与kafka集群进行交互,如下图所示: 调用sendFetches发送消费请求,如下图所示: (1)、Fetch.min.bytes每批次最小抓取大小,默认1字节 (2)、fetch.max.wait.ms一批数据最小值未达到的超时时间,默认500ms (3)、Fetch.max.bytes每批次最大抓取大小,默

    2024年02月09日
    浏览(33)
  • 10、Kafka ------ 消费者组 和 消费者实例,分区 和 消费者实例 之间的分配策略

    形象来说:你可以把主题内的多个分区当成多个子任务、多个子任务组成项目,每个消费者实例就相当于一个员工,假如你们 team 包含2个员工。 同理: 同一主题下,每个分区最多只会分给同一个组内的一个消费者实例 消费者以组的名义来订阅主题,前面的 kafka-console-consu

    2024年01月19日
    浏览(33)
  • 13、Kafka ------ kafka 消费者API用法(消费者消费消息代码演示)

    消费者API的核心类是 KafkaConsumer,它提供了如下常用方法: 下面这些方法都体现了Kafka是一个数据流平台,消费者通过这些方法可以从分区的任意位置、重新开始读取数据。 根据KafkaConsumer不难看出,使用消费者API拉取消息很简单,基本只要几步: 1、创建KafkaConsumer对象,创建

    2024年04月11日
    浏览(30)
  • Kafka3.0.0版本——消费者(消费者组原理)

    1.1、消费者组概述 Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。 注意: (1)、消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。 (2)、消费者组之间互不影响。所有的消费者

    2024年02月09日
    浏览(36)
  • kafka复习:(22)一个分区只能被消费者组中的一个消费者消费吗?

    默认情况下,一个分区只能被消费者组中的一个消费者消费。但可以自定义PartitionAssignor来打破这个限制。 一、自定义PartitionAssignor. 二、定义两个消费者,给其配置上述PartitionAssignor. 在kafka创建只有一个分区的topic : study2023 创建一个生产者往study2023这个 topic发送消息: 分别

    2024年02月10日
    浏览(26)
  • 【Kafka】【十七】消费者poll消息的细节与消费者心跳配置

    默认情况下,消费者⼀次会poll500条消息。 代码中设置了⻓轮询的时间是1000毫秒 意味着: 如果⼀次poll到500条,就直接执⾏for循环 如果这⼀次没有poll到500条。且时间在1秒内,那么⻓轮询继续poll,要么到500条,要么到1s 如果多次poll都没达到500条,且1秒时间到了,那么直接执

    2024年02月09日
    浏览(31)
  • 分布式 - 消息队列Kafka:Kafka消费者和消费者组

    1. Kafka 消费者是什么? 消费者负责订阅Kafka中的主题,并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是:在Kafka的消费理念中还有一层消费组的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者

    2024年02月13日
    浏览(32)
  • kafka配置多个消费者groupid kafka多个消费者消费同一个partition(java)

    kafka是由Apache软件基金会开发的一个开源流处理平台。kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 kafka中partition类似数据库中的分表数据,可以起到水平扩展数据的目的,比如有a,b,c,d,e,f 6个数据,某个topic有两个partition,一

    2024年01月22日
    浏览(50)
  • Kafka3.0.0版本——消费者(消费者总体工作流程图解)

    角色划分:生产者、zookeeper、kafka集群、消费者、消费者组。如下图所示: 生产者发送消息给leader,followerr主动从leader同步数据,一个消费者可以消费某一个分区数据或者一个消费者可以消费多个分区数据。如下图所示: 每个分区的数据只能由消费者组中一个消费者消费。如下

    2024年02月09日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包