【微服务学习】spring-cloud-starter-stream 4.x 版本的使用(rocketmq 版)

这篇具有很好参考价值的文章主要介绍了【微服务学习】spring-cloud-starter-stream 4.x 版本的使用(rocketmq 版)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

@[TOC](【微服务学习】spring-cloud-starter-stream 4.x 版本的使用(rocketmq 版))

1. 区别

3.x 版本

3.x 版本的使用, 都是基本上都是通过注解 
@EnableBinding 
@Input 
@Output 
@StreamListener 
等注解来实现对不同信道的绑定.
具体使用可参考: https://www.cnblogs.com/xfeiyun/p/16229303.html

spring-cloud-starter-stream-rocketmq,微服务,java-rocketmq,学习

4.x 版本

4.x 版本删除了这些注解. 
具体的使用采用隐式绑定的方式(虽然文档上说不推荐这种方式,但是我也没找到其他实现方式) 
具体文档参考: https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html

spring-cloud-starter-stream-rocketmq,微服务,java-rocketmq,学习
spring-cloud-starter-stream-rocketmq,微服务,java-rocketmq,学习spring-cloud-starter-stream-rocketmq,微服务,java-rocketmq,学习

2. 使用

2.1 消息发送者

消息发送, 有两种方式.
方式一: 使用 StreamBridge streamBridge; 往指定信道发送消息.
方式二: 通过隐式绑定信道, 注册 Bean 发送消息
2.1.1 使用 StreamBridge streamBridge; 往指定信道发送消息
	@Resource
    private StreamBridge streamBridge;
    // im-out 为信道名称 与 yml 中的配置绑定 spring.cloud.stream.bindings.im-out
	boolean b = streamBridge.send("im-out", "要发送的消息");
2.1.2 通过隐式绑定信道, 注册 Bean 发送消息
    @Bean
    //注意: 此处的 im 对象是与 spring.cloud.stream.function.bindings.im-out-0 绑定的. 
    //其中 -out-0 是 4.x 的一种约定.
    public Supplier<String> im() {
        return () -> "要发送的信息";
    }
绑定关系大致如图:

spring-cloud-starter-stream-rocketmq,微服务,java-rocketmq,学习

2.2 消息接收者

消息接收, 目前我只得到一种可行性方式, 就是隐式绑定, 与消息发送者类似
另一种方式, 使用: @ServiceActivator(outputChannel = "im") 测试发现, 只能实现异步, 并不走 MQ 发送消息.

    @Bean
    public Function<String, String> toUpperCase() {
        return String::toUpperCase;
    }

    @Bean
    public Consumer<String> sout() {
        return s -> {
            //消息处理者
            System.out.println("接收到的消息切转换大写的结果为: " + s);
        };
    }

spring-cloud-starter-stream-rocketmq,微服务,java-rocketmq,学习

  • 注意: 多个方法之间可以使用 “|” 间隔, 但是绑定时 多个需要按顺序写. 其中 -in-0 是一种约定

3. 实践

首先, 说一下, 我的理解, spring 更多的是往响应式异步编程的方式发展. 
spring 文档中 大部分的 demo 都是采用 FLux 和 Mono 这样的响应式异步编程实现的. 
3.1 消息发送者隐式绑定存在的问题
  • Q: 通过 2.1.2 发现, 消息的发送是通过注册 Bean 的方式, 那是不是在服务启动的时候,
    就已经必须编辑好要发送的内容进行发送, 跟现实使用不符.

  • A: 可以通过 Flux 异步将 FluxSink 暴露出去使用. (不知道怎么描述, 直接看例子吧)


3.2 场景
首先, 基础框架是我之前写的一个 IM 及时聊天框架, 所以, 场景就借助这个框架实现. 
主要关注就是如何发送消息, 和如何接收消息.
3.2.1 消息发送者
  • maven 依赖
 <properties>
        <cloud-cli.version>1.0.0</cloud-cli.version>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>17</java.version>
        <!--版本用最新版的上一个版本 或 使用数量最多的一个版本-->
        <guava.version>31.1-jre</guava.version>
        <mybatis-plus.version>3.5.3.1</mybatis-plus.version>
        <knife4j.version>3.0.3</knife4j.version>
        <org.mapstruct.version>1.5.3.Final</org.mapstruct.version>
        <lombok-mapstruct-binding.version>0.2.0</lombok-mapstruct-binding.version>
        <redisson.version>3.19.3</redisson.version>
        <spring-cloud.version>2022.0.1</spring-cloud.version>
        <spring-cloud-alibaba.version>2022.0.0.0-RC1</spring-cloud-alibaba.version>
        <spring-boot.version>3.0.2</spring-boot.version>
        <transmittable-thread-local.version>2.14.2</transmittable-thread-local.version>
        <dynamic-ds.version>3.6.1</dynamic-ds.version>
        <jjwt.version>0.11.5</jjwt.version>
    </properties>

 <dependencies>
        <!--web-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>com.lvsx</groupId>
            <artifactId>cloud-core</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        </dependency>
    </dependencies>
  • 注册 Bean
   /**
     * MQ 发送助手
     * 通过 AtomicReference 将 FluxSink 暴露出去. 通过 FluxSink.next 发送消息
     *
     * @return MQ
     */
    @Bean
    public AtomicReference<FluxSink<Message<String>>> mqSender() {
        return new AtomicReference<>();
    }

    @Bean
    //消息发送者
    public Supplier<Flux<Message<String>>> im(AtomicReference<FluxSink<Message<String>>> mqSender) {
        return () -> Flux.create(mqSender::set);
    }
  • 发送消息 方式一
	@Resource
    private AtomicReference<FluxSink<Message<String>>> mqSender;

	
    FluxSink<Message<String>> messageFluxSink = mqSender.get();
    
    try {
    	// message 即消息体
    	// Message<String> 验证过 直接发送 消息体对象, 是有问题的, 可能跟序列化有关.
        String s = objectMapper.writeValueAsString(message);
        Message<String> build = MessageBuilder.withPayload(s)
                .setHeader(MessageConst.PROPERTY_TAGS, message.target())
                .build();

        messageFluxSink.next(build);
    } catch (JsonProcessingException e) {
        throw new RuntimeException(e);
    }
  • 发送消息 方式二
	@Resource
    private StreamBridge streamBridge;

 	String s = objectMapper.writeValueAsString(message);
    Message<String> build = MessageBuilder.withPayload(s)
             .setHeader(MessageConst.PROPERTY_TAGS, message.target())
             .build();
    // 注意 MessageBuilder.withPayload 如果直接放对象实体, 
    // 在 send 时, 对象实体转 byte 数组会造成数据丢失, 
    // 怀疑是序列化的问题(只验证过问题, 还没找解决方式, 后续会继续排查)
  	boolean asd = streamBridge.send("im-out", build);
  • yml 配置
spring:
  application:
    name: im
  cloud:
    function:
      definition: im
    stream:
      function:
        bindings:
          im-out-0: im-out
      bindings:
        # 消息生产者
        im-out:
          destination: 'im-group-topic'
          content-type: application/json
          group: 'im-group'
          binder: rocketmq
      rocketmq:
        binder:
          name-server: localhost:9876
          group: im-group
3.2.2 消息消费者.
  • 注册 Bean
    @Bean
    public Consumer<Flux<Message<String>>> im() {
        return messageFlux -> messageFlux.subscribe(stringMessage -> System.out.println(stringMessage.getPayload()));
    }
  • yml 配置
spring:
  application:
    name: test
  cloud:
    function:
      definition: im
    stream:
      function:
        bindings:
          im-in-0: im-in
      bindings:
        # 消息消费者
        im-in:
          destination: 'im-group-topic'
          content-type: application/json
          group: 'im-group'
          binder: rocketmq
      rocketmq:
        binder:
          name-server: localhost:9876
          group: im-group
3.3 验证

spring-cloud-starter-stream-rocketmq,微服务,java-rocketmq,学习
spring-cloud-starter-stream-rocketmq,微服务,java-rocketmq,学习spring-cloud-starter-stream-rocketmq,微服务,java-rocketmq,学习

spring-cloud-starter-stream-rocketmq,微服务,java-rocketmq,学习
spring-cloud-starter-stream-rocketmq,微服务,java-rocketmq,学习
spring-cloud-starter-stream-rocketmq,微服务,java-rocketmq,学习文章来源地址https://www.toymoban.com/news/detail-775641.html

3.4 其他
我也不确定自己的使用方式是不是正确, 如果有大佬有标准的使用方式, 请及时评论, 大家一起学习, 感谢!

到了这里,关于【微服务学习】spring-cloud-starter-stream 4.x 版本的使用(rocketmq 版)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • spring-cloud-starter-gateway 4.0.6负载均衡失败

    需要引入下面负载均衡依赖否则503找不到服务

    2024年02月16日
    浏览(35)
  • org.springframework.cloud:spring-cloud-starter-openfeign:jar is missing详解

    openfeign无法导入的问题 我感觉最近带的好几个新人在搭建springCloud基础框架的时候,会犯一个非常小的错误,导致进度卡住了。 这个错误就是Feign导入的错误: ‘dependencies.dependency.version’ for org.springframework.cloud:spring-cloud-starter-openfeign:jar is missing. 表面上看是jar包没有下载下

    2024年02月07日
    浏览(44)
  • Spring Cloud【消息驱动(什么是Spring Cloud Stream、SpringCloud Stream核心概念、入门案例之消息消费者 )】(十一)

      目录 消息驱动_什么是Spring Cloud Stream 消息驱动_SpringCloud Stream核心概念

    2024年02月15日
    浏览(41)
  • Spring Cloud Stream集成Kafka

    Spring Cloud Stream是一个构建消息驱动微服务的框架,抽象了MQ的使用方式, 提供统一的API操作。Spring Cloud Stream通过Binder(绑定器)、inputs/outputs Channel完成应用程序和MQ的解耦。 Binder 负责绑定应用程序和MQ中间件,即指定应用程序是和KafKa交互还是和RabbitMQ交互或者和其他的MQ中间件交

    2024年02月13日
    浏览(46)
  • spring Cloud Stream 实战应用深度讲解

    Spring Cloud Stream 是一个框架,用于构建与共享消息传递系统连接的高度可扩展的事件驱动微服务。 该框架提供了一个灵活的编程模型,该模型建立在已经建立和熟悉的 Spring 习惯用语和最佳实践之上,包括对持久发布/订阅语义、消费者组和有状态分区的支持。 核心模块 Dest

    2024年01月24日
    浏览(38)
  • Spring cloud stream 结合 rabbitMq使用

    之前的开发主要是底层开发,没有深入涉及到消息方面。现在面对的是一个这样的场景: 假设公司项目A用了RabbitMQ,而项目B用了Kafka。这时候就会出现有两个消息框架,这两个消息框架可能编码有所不同,且结构也有所不同,而且之前甚至可能使用的是别的框架,造成了一个

    2024年02月04日
    浏览(47)
  • For artifact {org.springframework.cloud:spring-cloud-starter-config:null:jar}: The version cannot be

    For artifact {org.springframework.cloud:spring-cloud-starter-config:null:jar}: The version cannot be empty. 前情:之前down的项目一个多月没有动过,前两天想打开看看突然所有的注解、n多类报红。回想了一下,唯一的操作是改了idea中关于maven的设置。                 --把maven仓库的地址改了

    2024年02月05日
    浏览(34)
  • 导入SpringCloud-Eureka依赖的问题Cannot resolve org.springframework.cloud:spring-cloud-starter-netflix-eure

    今天使用SpringCloud时遇到导入SpringCloud-Eureka依赖的问题 Cannot resolve org.springframework.cloud:spring-cloud-starter-netflix-eureka-server:unknown 网上搜了一下,回答多的五花八门,但是不够简单粗暴,这里介绍一个简单的方法,只需要在项目的pom文件中做些设置即可 把大象放进冰箱里需要几步

    2024年02月14日
    浏览(44)
  • 【spring cloud学习】4、创建服务提供者

    注册中心Eureka Server创建并启动之后,接下来介绍如何创建一个Provider并且注册到Eureka Server中,再提供一个REST接口给其他服务调用。 首先一个Provider至少需要两个组件包依赖:Spring Boot Web服务组件和Eureka Client组件。如下所示: Spring Boot Web服务组件用于提供REST接口服务,Eure

    2024年02月11日
    浏览(37)
  • SpringBoot 如何使用 Spring Cloud Stream 处理事件

    在分布式系统中,事件驱动架构(Event-Driven Architecture,EDA)已经成为一种非常流行的架构模式。事件驱动架构将系统中的各个组件连接在一起,以便它们可以相互协作,响应事件并执行相应的操作。SpringBoot 也提供了一种方便的方式来处理事件——使用 Spring Cloud Stream。 Spr

    2024年02月10日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包