1 Stream消息驱动
本文是以 RocketMQ
为例讲解,点击此处了解SpringBoot整合RocketMQ
1.1 简介
1.1.1 定义
Spring Cloud Stream
是一个用来为微服务应用构建消息驱动能力的框架。它可以基于 Spring Boot
来创建独立的、可用于生产的 Spring
应用程序。Spring Cloud Stream
为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区
这三个核心概念。简单的说,Spring Cloud Stream
本质上就是整合了Spring Boot
和Spring Integration
,实现了一套轻量级的消息驱动的微服务框架。
1.1.2 抽象模型
我们都知道市面上有很多消息中间件,Sping Cloud Stream
为了可以集成各种各样的中间件,它抽象出了 Binder
的概念,每个消息中间件都需要有对应自己的 Binder
。这样它就可以根据不同的 Binder
集成不同的中间件。下图的input和output
是channel
,Binder
则是消息中间件和通道之间的桥梁
1.1.3 绑定器
通过使用 Spring Cloud Stream
,可以有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。但是目前 Spring Cloud Stream
只支持 RabbitMQ
和 Kafka
的自动化配置。Spring Cloud Stream
提供了 Binder
(负责与消息中间件进行交互),我们则通过 inputs
或者 outputs
这样的消息通道与 Binder
进行交互。
Binder
绑定器是 Spring cloud Stream
中一个非常重要的概念,实现了应用程序和消息中间件之间的隔离,同时我们也可以通过应用程序实现,消息中间件之间的通信。在我们的项目的可以继承多种绑定器,我们可以根据不同特性的消息使用不同的消息中间件。Spring Cloud Stream
为我们实现了 RabbitMQ 和Kafka
的绑定器。如果你想使用其他的消息中间件需要自己去实现绑定器接口.
1.2 操作实操
1.2.1 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>RocketMQDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<encoding>UTF-8</encoding>
<spring.boot.version>2.6.11</spring.boot.version>
<spring.cloud.version>2021.0.4</spring.cloud.version>
<spring.cloud.alibaba>2021.0.4.0</spring.cloud.alibaba>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.4</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-remoting</artifactId>
<version>4.9.4</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<!--springboot父依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring.boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!--springcloud父依赖-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring.cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!--springcloudalibaba父依赖-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring.cloud.alibaba}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
</project>
1.2.2 操作实体
@Data
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class UserEntity {
private String name;//账号
private String pass;//密码
}
1.3 Stream 3.x 之前操作
虽然在 SpringCloudStream 3.x
版本后是可以看到 @StreamListener
和 @EnableBinding
都打上了@Deprecated
注解,但是不妨碍我们测试学习
1.3.1 自定义通道
package cn.mq;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface MyChannel {
String INPUT = "test-input";
String OUTPUT = "test-output";
/**
* 这两个通道可能定义在两个不同的通道里面,这里为了方便放在同一个项目中演示
*/
// 收(订阅频道/消息消费者)
@Input(INPUT)
SubscribableChannel input();
// 发(消息生产者)
@Output(OUTPUT)
MessageChannel output();
}
1.3.2 消费消息
此处可以使用我们自定义的通道,也可以使用原装的 Sink.class
package cn.mq;
import cn.entity.UserEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
@Slf4j
//@EnableBinding(Sink.class)
@EnableBinding(MyChannel.class)
public class ReceiveMQ {
@StreamListener(MyChannel.INPUT)
public void receive(UserEntity entity){
log.info("收到消费消息:{}",entity.toString());
}
}
默认情况下,如果消费者是一个集群,此时,一条消息会被多次消费。通过消息分组,我们可以解决这个问题。
添加如下配置分组,放入组 g1:
spring.cloud.stream.bindings.test-input.group=g1
spring.cloud.stream.bindings.test-output.group=g1
1.3.3 发送消息
package cn.controller;
import cn.entity.UserEntity;
import cn.mq.MyChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
public class MQController {
@Autowired
private MyChannel myChannel;
@GetMapping("/test")
public void test(){
UserEntity userEntity = new UserEntity("hello", "pass");
boolean send = myChannel.output().send(MessageBuilder.withPayload(userEntity).build());
log.info("发送消息:{},结果:{}",userEntity.toString(),send);
}
}
其中,MessageBuilder
是Spring Integration
中用于创建消息的工具类。以下是createMessage
, fromMessage
和withPayload
方法的区别:
-
createMessage
:这是一个静态方法,用于创建一个新的消息。你需要提供消息的负载(payload)和消息头(header)。
例如:Message<String> message = MessageBuilder.createMessage("Hello World", new MessageHeaders(headers));
-
fromMessage
:这个方法用于从一个已存在的消息创建一个新的消息。新的消息将会有相同的负载和消息头。这个方法通常在你想修改一个已存在消息的部分属性但保持其他部分不变时使用。
例如:Message<String> newMessage = MessageBuilder.fromMessage(oldMessage).setHeader("newHeader", "newValue").build();
-
withPayload
:这个方法用于设置消息的负载。你可以链式地调用其他方法(如setHeader
)来设置消息头。
例如:Message<String> message = MessageBuilder.withPayload("Hello World").setHeader("headerKey", "headerValue").build();
总的来说,这三个方法提供了灵活的方式来创建和修改消息,你可以根据具体的需求来选择使用哪一个。
1.3.4 配置文件
spring:
application:
name: rokcet-mq-demo
cloud:
stream:
bindings: # 配置消息通道的信息
test-input: # 自定义消费 通道
destination: test-optic
group: test
binder: rocketmq
test-output: # 自定义发送 通道
destination: test-optic
group: test
binder: rocketmq
rocketmq:
binder:
name-server: ip:port
group: test #此处定义整体消费者组名字
1.4 Stream 3.x 之后操作
1.4.1 Stream 3.x 之后讲解
由于 SpringCloudStream 3.x
版本后是 可以看到 @StreamListener
和 @EnableBinding
都打上了@Deprecated
注解。后续的版本更新中会逐渐替换成函数式的方式实现。
既然通过四大函数式接口的方式替换了注解的方式 那么该如何进行绑定呢?
通过 spring.cloud.stream.function.definition
:名称的方式进行绑定 公开 topic
。
不管是创建 Consumer
还是 Supplier
或者是 Function Stream
都会将其的 方法名称
进行 一个 topic拆封
和 绑定
假设 创建了一个 Consumer< String > myTopic
的方法,Stream
会将其 拆分成 In
和 out
两个通道:
-
输入 - <functionName> + -in- + < index >
myTopic-in-0 -
输出 - <functionName> + -out- + < index >
myTopic-out-0
注意:这里的 functionName
需要和代码声明的函数名称还有spring.cloud.stream.function.definition
下的名称保持一致
1.4.2 消费消息
package cn.mq;
import cn.entity.UserEntity;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import java.util.function.Consumer;
@Configuration
public class ReceiveMQ {
@Bean
public Consumer<Message<UserEntity>> myTopicC(){
return (data)->{
UserEntity user = data.getPayload();
MessageHeaders headers = data.getHeaders();
System.out.println("myTopicC 接收一条记录:" + user);
System.out.println("getHeaders headerFor:" + headers.get("for"));
};
}
}
1.4.3 发送消息
1.4.3.1 自动发送
package cn.mq;
import cn.entity.UserEntity;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import java.util.function.Supplier;
@Configuration
public class SendMQ {
Integer i = 1;
@Bean
public Supplier<Message<UserEntity>> myTopicP() {
return () -> {
UserEntity entity = new UserEntity();
entity.setPass(i++ + "");
entity.setName(Thread.currentThread().getName());
System.out.println("myTopicP 发送一条记录:" + entity);
return MessageBuilder
.withPayload(entity)
.build();
};
}
}
这种方式定义 suppelier
会 默认1000ms
发送一次记录
可以修改:spring.cloud.stream.poller:fixedDelay: 延迟毫秒值
1.4.3.2 手动触发
通过 StreamBridge
触发
package cn.controller;
import cn.entity.UserEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
public class MQController {
@Autowired
private StreamBridge streamBridge;
@GetMapping("/test")
public void sendMsg() {
UserEntity entity = new UserEntity("hello","world");
System.out.println("sendMsg 发送一条记录:" + entity);
streamBridge
.send(
"myTopicP-out-0",
MessageBuilder.withPayload(entity)
.setHeader("for", "这是一个请求头~")
.build());
}
}
1.4.4 配置文件
spring:
cloud:
stream:
rocketmq:
binder:
name-server: localhost:9876
# -------------- 分割线 ---------------
function:
# 组装和绑定
# 手动时把 myTopicP 去掉
definition: myTopicC;myTopicP
bindings:
myTopicC-in-0:
destination: my-topic
group: test
myTopicP-out-0:
destination: my-topic
1.4.5 中转函数Function
Function< String,String >
范型中有两个参数 :一个入参,一个出参,所以在Stream
中可以用来作于一个消息中转站来使用。相当于 top-1 接受到消息 但是我不想处理 我对其数据进行一次处理 发送到 top-2 通道,交给top-2 进行数据的最终处理。
采用手动触发示例,在上面改造测试:
@Bean
public Consumer<UserEntity> testFunctionQ(){
return (data)->{
System.out.println("testFunctionQ 消息中转后接收一条记录:" + data);
};
}
@Bean
public Function<UserEntity, UserEntity> testFunction() {
return value -> {
System.out.println("中转 testFunction: " + value);
value.setPass(value.getPass().toUpperCase());
value.setName(value.getName().toUpperCase());
return value;
};
}
配置文件:
spring:
application:
name: rokcet-mq-demo
cloud:
stream:
bindings:
myTopicP-out-0:
destination: test-topic
testFunction-in-0:
destination: test-topic
group: my_input_group
testFunction-out-0:
destination: test-topic-Q
testFunctionQ-in-0:
destination: test-topic-Q
group: my_input_group-Q
rocketmq:
binder:
name-server: localhost:9876
group: test
function:
definition: testFunction;testFunctionQ
1.5 配置文件讲解
1.5.1 spring.cloud.function.definition
spring.cloud.function.definition
是一个配置属性,用于指定 Spring Cloud Function
应用程序中的函数定义。
这个属性的值是一个以 逗号分隔
(如果用逗号分隔有顺序问题
,还是最好用分号分隔
)的字符串,表示要使用的函数、消费者(Consumer)或生产者(Supplier)的名称。
在 Spring Cloud Stream
中,这个属性用于将函数、消费者或生产者与消息队列(如 RabbitMQ
、Kafka
等)进行绑定。当指定为 Supplier
时,它将作为消息队列的生产者,负责生成并发送消息;当指定为 Consumer
时,它将作为消息队列的消费者,负责接收并处理消息。
例如,假设有一个名为 process
的函数,你可以通过以下配置将其作为消费者与消息队列进行绑定:
spring.cloud.function.definition=process
这样,process
函数将作为消息队列的消费者,接收并处理来自队列的消息。同样,可以将 Supplier
与消息队列进行绑定,作为生产者生成并发送消息。
1.5.2 spring.cloud.stream.binders和bindings区别
spring.cloud.stream.binders
和spring.cloud.stream.bindings
都是Spring Cloud Stream的配置属性,但它们的用途是不同的。
-
spring.cloud.stream.binders
用于配置消息中间件的连接信息。
例如,如果使用的是RabbitMQ
,你需要在这里配置RabbitMQ
的主机名、端口、用户名和密码等信息。可以配置多个binder
,每个binder
对应一个消息中间件。 -
spring.cloud.stream.bindings
用于配置消息通道的信息。在Spring Cloud Stream
中,消息通道是消息生产者和消费者之间的桥梁。可以在这里配置通道的名称、目标(对应消息中间件中的队列或主题名)、分区等信息。
简单来说,spring.cloud.stream.binders
是用来配置消息中间件
的,而spring.cloud.stream.bindings
是用来配置消息通道
的。
spring:
cloud:
stream:
# 如果你项目里只对接一个中间件,那么不用定义binders
# 当系统要定义多个不同消息中间件的时候,使用binders定义
binders:
my-rabbit:
type: rabbit # 消息中间件类型
environment: # 连接信息
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings:
# 添加coupon - Producer
addCoupon-out-0:
destination: request-coupon-topic
content-type: application/json
binder: my-rabbit
# 添加coupon - Consumer
addCoupon-in-0:
destination: request-coupon-topic
content-type: application/json
# 消费组,同一个组内只能被消费一次
group: add-coupon-group
binder: my-rabbit
1.5.3 消费分组
在Spring Cloud Stream
中,发送者(Producer
)不需要分组,只有消费者(Consumer
)需要分组。
分组的主要目的是为了实现消息的广播
或者分区
。当多个消费者在同一个组中时,消息会被任何一个消费者消费,但不会被同一组的所有消费者消费,这就实现了消息的负载均衡。如果每个消费者有自己的组,那么每个消费者都会收到一份消息的拷贝,这就实现了消息的广播。
1.5.4 spring.cloud.stream.rocketmq.binder.group和spring.cloud.stream.bindings.通道名字.group两个属性区别
spring.cloud.stream.rocketmq.binder.group
是全局配置,用于设置默认的消费组名。如果没有在具体的通道中设置消费组名,那么就会使用这个全局配置。
spring.cloud.stream.bindings.通道名字.group
是针对具体通道的配置,用于设置该通道的消费组名。如果在这里设置了消费组名,那么就会覆盖全局配置。
总的来说,这两个属性都是用于设置消费组名的,但是作用范围不同,一个是全局的,一个是针对具体通道的。
报错
:Property 'group' is required - producerGroup
这时候就需要在 spring.cloud.stream.rocketmq.binder.group
属性中设置值,就不会报错了
1.5.5 spring.cloud.stream.bindings和spring.cloud.stream.rocketmq.bindings 区别
spring.cloud.stream.bindings
是Spring Cloud Stream
的核心配置属性,用于定义消息通道的绑定和配置。spring.cloud.stream.rocketmq.bindings
是Spring Cloud Stream
与RocketMQ
集成时的配置属性,用于定义RocketMQ
消息通道的绑定和配置。文章来源:https://www.toymoban.com/news/detail-521764.html
具体区别如下:文章来源地址https://www.toymoban.com/news/detail-521764.html
-
spring.cloud.stream.bindings
适用于所有消息中间件,而spring.cloud.stream.rocketmq.bindings
仅适用于RocketMQ。 -
spring.cloud.stream.bindings
可以配置多个消息通道,而spring.cloud.stream.rocketmq.bindings
仅配置RocketMQ
消息通道。 -
spring.cloud.stream.bindings
的配置属性包括destination、content-type、group、consumer、producer
等,而spring.cloud.stream.rocketmq.bindings
的配置属性包括topic、tags、access-key、secret-key
等,更加针对RocketMQ
的特性进行了配置。 -
spring.cloud.stream.bindings
的配置可以在应用程序的配置文件中进行,而spring.cloud.stream.rocketmq.bindings
的配置需要在RocketMQ
的配置文件中进行。
到了这里,关于SpringCloud之Stream消息驱动RocketMQ讲解的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!