SpringCloud之Stream消息驱动RocketMQ讲解

这篇具有很好参考价值的文章主要介绍了SpringCloud之Stream消息驱动RocketMQ讲解。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1 Stream消息驱动

本文是以 RocketMQ 为例讲解,点击此处了解SpringBoot整合RocketMQ

1.1 简介

1.1.1 定义

Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它可以基于 Spring Boot 来创建独立的、可用于生产的 Spring 应用程序。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。简单的说,Spring Cloud Stream本质上就是整合了Spring BootSpring Integration,实现了一套轻量级的消息驱动的微服务框架。

1.1.2 抽象模型

我们都知道市面上有很多消息中间件,Sping Cloud Stream 为了可以集成各种各样的中间件,它抽象出了 Binder 的概念,每个消息中间件都需要有对应自己的 Binder。这样它就可以根据不同的 Binder 集成不同的中间件。下图的input和outputchannelBinder则是消息中间件和通道之间的桥梁
SpringCloud之Stream消息驱动RocketMQ讲解,# SpringCloud,MessageQueue,spring cloud,rocketmq,spring

1.1.3 绑定器

通过使用 Spring Cloud Stream,可以有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。但是目前 Spring Cloud Stream 只支持 RabbitMQKafka 的自动化配置。
Spring Cloud Stream 提供了 Binder (负责与消息中间件进行交互),我们则通过 inputs 或者 outputs 这样的消息通道与 Binder 进行交互。

Binder 绑定器是 Spring cloud Stream 中一个非常重要的概念,实现了应用程序和消息中间件之间的隔离,同时我们也可以通过应用程序实现,消息中间件之间的通信。在我们的项目的可以继承多种绑定器,我们可以根据不同特性的消息使用不同的消息中间件。Spring Cloud Stream 为我们实现了 RabbitMQ 和Kafka 的绑定器。如果你想使用其他的消息中间件需要自己去实现绑定器接口.

1.2 操作实操

1.2.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>RocketMQDemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <spring.boot.version>2.6.11</spring.boot.version>
        <spring.cloud.version>2021.0.4</spring.cloud.version>
        <spring.cloud.alibaba>2021.0.4.0</spring.cloud.alibaba>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.9.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-remoting</artifactId>
            <version>4.9.4</version>
        </dependency>

    </dependencies>

    <dependencyManagement>
    <dependencies>
        <!--springboot父依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring.boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <!--springcloud父依赖-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring.cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <!--springcloudalibaba父依赖-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-alibaba-dependencies</artifactId>
            <version>${spring.cloud.alibaba}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>

    </dependencies>
    </dependencyManagement>

</project>

1.2.2 操作实体

@Data
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class UserEntity {
    private String name;//账号
    private String pass;//密码
}

1.3 Stream 3.x 之前操作

虽然在 SpringCloudStream 3.x 版本后是可以看到 @StreamListener@EnableBinding 都打上了@Deprecated 注解,但是不妨碍我们测试学习

1.3.1 自定义通道

package cn.mq;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface MyChannel {

    String INPUT = "test-input";
    String OUTPUT = "test-output";

    /**
     * 这两个通道可能定义在两个不同的通道里面,这里为了方便放在同一个项目中演示
     */
    // 收(订阅频道/消息消费者)
    @Input(INPUT)
    SubscribableChannel input();
    // 发(消息生产者)
    @Output(OUTPUT)
    MessageChannel output();
}

1.3.2 消费消息

此处可以使用我们自定义的通道,也可以使用原装的 Sink.class

package cn.mq;

import cn.entity.UserEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

@Slf4j
//@EnableBinding(Sink.class)
@EnableBinding(MyChannel.class)
public class ReceiveMQ {
    @StreamListener(MyChannel.INPUT)
    public void receive(UserEntity entity){
        log.info("收到消费消息:{}",entity.toString());
    }
}

默认情况下,如果消费者是一个集群,此时,一条消息会被多次消费。通过消息分组,我们可以解决这个问题。

添加如下配置分组,放入组 g1:

spring.cloud.stream.bindings.test-input.group=g1
spring.cloud.stream.bindings.test-output.group=g1

1.3.3 发送消息

package cn.controller;

import cn.entity.UserEntity;
import cn.mq.MyChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class MQController {
    @Autowired
    private MyChannel myChannel;
    @GetMapping("/test")
    public void test(){
        UserEntity userEntity = new UserEntity("hello", "pass");
        boolean send = myChannel.output().send(MessageBuilder.withPayload(userEntity).build());
        log.info("发送消息:{},结果:{}",userEntity.toString(),send);
    }
}

其中,MessageBuilderSpring Integration中用于创建消息的工具类。以下是createMessage, fromMessagewithPayload方法的区别:

  • createMessage:这是一个静态方法,用于创建一个新的消息。你需要提供消息的负载(payload)和消息头(header)。
    例如:Message<String> message = MessageBuilder.createMessage("Hello World", new MessageHeaders(headers));
  • fromMessage:这个方法用于从一个已存在的消息创建一个新的消息。新的消息将会有相同的负载和消息头。这个方法通常在你想修改一个已存在消息的部分属性但保持其他部分不变时使用。
    例如:Message<String> newMessage = MessageBuilder.fromMessage(oldMessage).setHeader("newHeader", "newValue").build();
  • withPayload:这个方法用于设置消息的负载。你可以链式地调用其他方法(如setHeader)来设置消息头。
    例如:Message<String> message = MessageBuilder.withPayload("Hello World").setHeader("headerKey", "headerValue").build();

总的来说,这三个方法提供了灵活的方式来创建和修改消息,你可以根据具体的需求来选择使用哪一个。

1.3.4 配置文件

spring:
  application:
    name: rokcet-mq-demo
  cloud:
    stream:
      bindings: # 配置消息通道的信息
        test-input: # 自定义消费 通道
          destination: test-optic
          group: test
          binder: rocketmq
        test-output: # 自定义发送 通道
          destination: test-optic
          group: test
          binder: rocketmq
      rocketmq:
        binder:
          name-server: ip:port
          group: test #此处定义整体消费者组名字

1.4 Stream 3.x 之后操作

1.4.1 Stream 3.x 之后讲解

由于 SpringCloudStream 3.x 版本后是 可以看到 @StreamListener@EnableBinding 都打上了@Deprecated 注解。后续的版本更新中会逐渐替换成函数式的方式实现。
既然通过四大函数式接口的方式替换了注解的方式 那么该如何进行绑定呢?
通过 spring.cloud.stream.function.definition:名称的方式进行绑定 公开 topic
不管是创建 Consumer 还是 Supplier 或者是 Function Stream都会将其的 方法名称 进行 一个 topic拆封绑定 假设 创建了一个 Consumer< String > myTopic 的方法,Stream 会将其 拆分成 Inout 两个通道:

  • 输入 - <functionName> + -in- + < index >
    myTopic-in-0
  • 输出 - <functionName> + -out- + < index >
    myTopic-out-0

注意:这里的 functionName需要和代码声明的函数名称还有spring.cloud.stream.function.definition下的名称保持一致

1.4.2 消费消息

package cn.mq;

import cn.entity.UserEntity;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

import java.util.function.Consumer;

@Configuration
public class ReceiveMQ {
    @Bean
    public Consumer<Message<UserEntity>> myTopicC(){
        return (data)->{
            UserEntity user = data.getPayload();
            MessageHeaders headers = data.getHeaders();
            System.out.println("myTopicC 接收一条记录:" + user);
            System.out.println("getHeaders headerFor:" + headers.get("for"));
        };
    }
  }

1.4.3 发送消息

1.4.3.1 自动发送
package cn.mq;

import cn.entity.UserEntity;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import java.util.function.Supplier;

@Configuration
public class SendMQ {
    Integer i = 1;
    @Bean
    public Supplier<Message<UserEntity>> myTopicP() {
        return () -> {
            UserEntity entity = new UserEntity();
            entity.setPass(i++ + "");
            entity.setName(Thread.currentThread().getName());
            System.out.println("myTopicP 发送一条记录:" + entity);
            return MessageBuilder
                    .withPayload(entity)
                    .build();
        };
    }
}

这种方式定义 suppelier 会 默认1000ms 发送一次记录
可以修改:spring.cloud.stream.poller:fixedDelay: 延迟毫秒值

1.4.3.2 手动触发

通过 StreamBridge 触发

package cn.controller;

import cn.entity.UserEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class MQController {

    @Autowired
    private StreamBridge streamBridge;

    @GetMapping("/test")
    public void sendMsg() {
        UserEntity entity = new UserEntity("hello","world");
        System.out.println("sendMsg 发送一条记录:" + entity);
        streamBridge
                .send(
                        "myTopicP-out-0",
                        MessageBuilder.withPayload(entity)
                                .setHeader("for", "这是一个请求头~")
                                .build());
    }
}

1.4.4 配置文件

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: localhost:9876
# -------------- 分割线 ---------------
      function:
      # 组装和绑定
      # 手动时把 myTopicP 去掉
        definition: myTopicC;myTopicP
      bindings:
        myTopicC-in-0:
          destination: my-topic
          group: test
       myTopicP-out-0:
          destination: my-topic

1.4.5 中转函数Function

Function< String,String > 范型中有两个参数 :一个入参,一个出参,所以在Stream中可以用来作于一个消息中转站来使用。相当于 top-1 接受到消息 但是我不想处理 我对其数据进行一次处理 发送到 top-2 通道,交给top-2 进行数据的最终处理。

采用手动触发示例,在上面改造测试:

@Bean
public Consumer<UserEntity> testFunctionQ(){
    return (data)->{
        System.out.println("testFunctionQ 消息中转后接收一条记录:" + data);
    };
}

@Bean
public Function<UserEntity, UserEntity> testFunction() {
    return value -> {
        System.out.println("中转 testFunction: " + value);
        value.setPass(value.getPass().toUpperCase());
        value.setName(value.getName().toUpperCase());
        return value;
    };
}

配置文件:

spring:
  application:
    name: rokcet-mq-demo
  cloud:
    stream:
      bindings:
        myTopicP-out-0:
          destination: test-topic
        testFunction-in-0:
          destination: test-topic
          group: my_input_group
        testFunction-out-0:
          destination: test-topic-Q
        testFunctionQ-in-0:
          destination: test-topic-Q
          group: my_input_group-Q

      rocketmq:
        binder:
          name-server: localhost:9876
          group: test
      function:
        definition: testFunction;testFunctionQ

1.5 配置文件讲解

1.5.1 spring.cloud.function.definition

spring.cloud.function.definition 是一个配置属性,用于指定 Spring Cloud Function 应用程序中的函数定义。
这个属性的值是一个以 逗号分隔(如果用逗号分隔有顺序问题,还是最好用分号分隔)的字符串,表示要使用的函数、消费者(Consumer)或生产者(Supplier)的名称。
Spring Cloud Stream 中,这个属性用于将函数、消费者或生产者与消息队列(如 RabbitMQKafka 等)进行绑定。当指定为 Supplier 时,它将作为消息队列的生产者,负责生成并发送消息;当指定为 Consumer 时,它将作为消息队列的消费者,负责接收并处理消息。
例如,假设有一个名为 process 的函数,你可以通过以下配置将其作为消费者与消息队列进行绑定:

spring.cloud.function.definition=process

这样,process 函数将作为消息队列的消费者,接收并处理来自队列的消息。同样,可以将 Supplier 与消息队列进行绑定,作为生产者生成并发送消息。

1.5.2 spring.cloud.stream.binders和bindings区别

spring.cloud.stream.bindersspring.cloud.stream.bindings都是Spring Cloud Stream的配置属性,但它们的用途是不同的。

  • spring.cloud.stream.binders用于配置消息中间件的连接信息。
    例如,如果使用的是 RabbitMQ,你需要在这里配置 RabbitMQ 的主机名、端口、用户名和密码等信息。可以配置多个binder,每个binder对应一个消息中间件。
  • spring.cloud.stream.bindings用于配置消息通道的信息。在Spring Cloud Stream中,消息通道是消息生产者和消费者之间的桥梁。可以在这里配置通道的名称、目标(对应消息中间件中的队列或主题名)、分区等信息。

简单来说,spring.cloud.stream.binders是用来配置消息中间件的,而spring.cloud.stream.bindings是用来配置消息通道的。

spring:
  cloud:
    stream:
      # 如果你项目里只对接一个中间件,那么不用定义binders
      # 当系统要定义多个不同消息中间件的时候,使用binders定义
      binders:
        my-rabbit:
          type: rabbit # 消息中间件类型
          environment: # 连接信息
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings:
        # 添加coupon - Producer
        addCoupon-out-0:
          destination: request-coupon-topic
          content-type: application/json
          binder: my-rabbit
        # 添加coupon - Consumer
        addCoupon-in-0:
          destination: request-coupon-topic
          content-type: application/json
          # 消费组,同一个组内只能被消费一次
          group: add-coupon-group
          binder: my-rabbit  

1.5.3 消费分组

Spring Cloud Stream中,发送者(Producer)不需要分组,只有消费者(Consumer)需要分组。
分组的主要目的是为了实现消息的广播或者分区。当多个消费者在同一个组中时,消息会被任何一个消费者消费,但不会被同一组的所有消费者消费,这就实现了消息的负载均衡。如果每个消费者有自己的组,那么每个消费者都会收到一份消息的拷贝,这就实现了消息的广播。

1.5.4 spring.cloud.stream.rocketmq.binder.group和spring.cloud.stream.bindings.通道名字.group两个属性区别

spring.cloud.stream.rocketmq.binder.group 是全局配置,用于设置默认的消费组名。如果没有在具体的通道中设置消费组名,那么就会使用这个全局配置。

spring.cloud.stream.bindings.通道名字.group 是针对具体通道的配置,用于设置该通道的消费组名。如果在这里设置了消费组名,那么就会覆盖全局配置。

总的来说,这两个属性都是用于设置消费组名的,但是作用范围不同,一个是全局的,一个是针对具体通道的。

报错Property 'group' is required - producerGroup
这时候就需要在 spring.cloud.stream.rocketmq.binder.group属性中设置值,就不会报错了

1.5.5 spring.cloud.stream.bindings和spring.cloud.stream.rocketmq.bindings 区别

spring.cloud.stream.bindingsSpring Cloud Stream的核心配置属性,用于定义消息通道的绑定和配置。
spring.cloud.stream.rocketmq.bindingsSpring Cloud StreamRocketMQ集成时的配置属性,用于定义RocketMQ消息通道的绑定和配置。

具体区别如下:文章来源地址https://www.toymoban.com/news/detail-521764.html

  • spring.cloud.stream.bindings 适用于所有消息中间件,而spring.cloud.stream.rocketmq.bindings仅适用于RocketMQ。
  • spring.cloud.stream.bindings可以配置多个消息通道,而spring.cloud.stream.rocketmq.bindings仅配置 RocketMQ 消息通道。
  • spring.cloud.stream.bindings 的配置属性包括destination、content-type、group、consumer、producer等,而spring.cloud.stream.rocketmq.bindings的配置属性包括topic、tags、access-key、secret-key等,更加针对RocketMQ的特性进行了配置。
  • spring.cloud.stream.bindings的配置可以在应用程序的配置文件中进行,而spring.cloud.stream.rocketmq.bindings的配置需要在RocketMQ的配置文件中进行。

到了这里,关于SpringCloud之Stream消息驱动RocketMQ讲解的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Sprint Cloud Stream整合RocketMq和websocket实现消息发布订阅

    1. 引入RocketMQ依赖 :首先,在 pom.xml 文件中添加RocketMQ的依赖: 2. 配置RocketMQ连接信息 :在 application.properties 或 application.yml 中配置RocketMQ的连接信息,包括Name Server地址等: 3.消息发布组件 4.消息发布控制器 项目结构: 接下来是websocket模块的搭建 1. 依赖添加 2.application.yml配

    2024年02月08日
    浏览(36)
  • SpringCloudAlibaba:消息驱动之RocketMQ学习

    目录 一、MQ简介 (一)什么是MQ (二)MQ的应用场景 1、异步解耦 2、流量削峰 (三)常见的MQ产品 二、RocketMQ入门 (一)RocketMQ安装部署 1、环境要求 2、下载RocketMQ 3、安装RocketMQ 4、启动RocketMQ 5、测试RocketMQ 6、关闭RocketMQ (二)RocketMQ控制台安装与启动 下载并解压 三、sp

    2024年02月16日
    浏览(29)
  • 【微服务学习】spring-cloud-starter-stream 4.x 版本的使用(rocketmq 版)

    @[TOC](【微服务学习】spring-cloud-starter-stream 4.x 版本的使用(rocketmq 版)) 2.1 消息发送者 2.1.1 使用 StreamBridge streamBridge; 往指定信道发送消息 2.1.2 通过隐式绑定信道, 注册 Bean 发送消息 2.2 消息接收者 注意: 多个方法之间可以使用 “|” 间隔, 但是绑定时 多个需要按顺序写. 其中

    2024年02月03日
    浏览(40)
  • rocketMq消息队列详细使用与实践整合spring

    使用RocketMQ的原生API开发是最简单也是目前看来最牢靠的方式。这里用SpringBoot来搭建一系列消息生产者和消息消费者,来访问之前搭建的RocketMQ集群。 首先创建一个基于Maven的SpringBoot工程,引入如下依赖: RocketMQ的官网上有很多经典的测试代码,这些代码虽然依赖的版本比较

    2024年02月13日
    浏览(42)
  • spring Cloud Stream 实战应用深度讲解

    Spring Cloud Stream 是一个框架,用于构建与共享消息传递系统连接的高度可扩展的事件驱动微服务。 该框架提供了一个灵活的编程模型,该模型建立在已经建立和熟悉的 Spring 习惯用语和最佳实践之上,包括对持久发布/订阅语义、消费者组和有状态分区的支持。 核心模块 Dest

    2024年01月24日
    浏览(40)
  • Springbootg整合RocketMQ ——使用 rocketmq-spring-boot-starter 来配置发送和消费 RocketMQ 消息

           本文解析将 RocketMQ Client 端集成为 spring-boot-starter 框架的开发细节,然后通过一个简单的示例来一步一步的讲解如何使用这个 spring-boot-starter 工具包来配置,发送和消费 RocketMQ 消息。 添加maven依赖: 修改application.properties 注意: 请将上述示例配置中的 127.0.0.1:9876 替换

    2024年03月22日
    浏览(45)
  • 【Spring Boot】Spring Boot 集成 RocketMQ 实现简单的消息发送和消费

    本文主要有以下内容: 简单消息的发送 顺序消息的发送 RocketMQTemplate的API介绍 环境搭建: RocketMQ的安装教程:在官网上下载bin文件,解压到本地,并配置环境变量,如下图所示: 在 Spring boot 项目中引入 RocketMQ 依赖: 在application.yml增加相关配置: 在 Spring Boot 中使用RocketM

    2024年02月14日
    浏览(51)
  • Spring Cloud Stream 4.0.4 rabbitmq 发送消息多function

    注意当多个消费者时,需要添加配置项:spring.cloud.function.definition 启动日志 交换机名称对应: spring.cloud.stream.bindings.demo-in-0.destination配置项的值 队列名称是交换机名称+分组名 http://localhost:8080/sendMsg?delay=10000name=zhangsan 问题总结 问题一 解决办法: 查看配置是否正确: spring

    2024年02月19日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包