@[TOC](【微服务学习】spring-cloud-starter-stream 4.x 版本的使用(rocketmq 版))
1. 区别
3.x 版本
3.x 版本的使用, 都是基本上都是通过注解
@EnableBinding
@Input
@Output
@StreamListener
等注解来实现对不同信道的绑定.
具体使用可参考: https://www.cnblogs.com/xfeiyun/p/16229303.html
4.x 版本
4.x 版本删除了这些注解.
具体的使用采用隐式绑定的方式(虽然文档上说不推荐这种方式,但是我也没找到其他实现方式)
具体文档参考: https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html
2. 使用
2.1 消息发送者
消息发送, 有两种方式.
方式一: 使用 StreamBridge streamBridge; 往指定信道发送消息.
方式二: 通过隐式绑定信道, 注册 Bean 发送消息
2.1.1 使用 StreamBridge streamBridge; 往指定信道发送消息
@Resource
private StreamBridge streamBridge;
// im-out 为信道名称 与 yml 中的配置绑定 spring.cloud.stream.bindings.im-out
boolean b = streamBridge.send("im-out", "要发送的消息");
2.1.2 通过隐式绑定信道, 注册 Bean 发送消息
@Bean
//注意: 此处的 im 对象是与 spring.cloud.stream.function.bindings.im-out-0 绑定的.
//其中 -out-0 是 4.x 的一种约定.
public Supplier<String> im() {
return () -> "要发送的信息";
}
绑定关系大致如图:
2.2 消息接收者
消息接收, 目前我只得到一种可行性方式, 就是隐式绑定, 与消息发送者类似
另一种方式, 使用: @ServiceActivator(outputChannel = "im") 测试发现, 只能实现异步, 并不走 MQ 发送消息.
@Bean
public Function<String, String> toUpperCase() {
return String::toUpperCase;
}
@Bean
public Consumer<String> sout() {
return s -> {
//消息处理者
System.out.println("接收到的消息切转换大写的结果为: " + s);
};
}
- 注意: 多个方法之间可以使用 “|” 间隔, 但是绑定时 多个需要按顺序写. 其中 -in-0 是一种约定
3. 实践
首先, 说一下, 我的理解, spring 更多的是往响应式异步编程的方式发展.
spring 文档中 大部分的 demo 都是采用 FLux 和 Mono 这样的响应式异步编程实现的.
3.1 消息发送者隐式绑定存在的问题
-
Q: 通过 2.1.2 发现, 消息的发送是通过注册 Bean 的方式, 那是不是在服务启动的时候,
就已经必须编辑好要发送的内容进行发送, 跟现实使用不符. -
A: 可以通过 Flux 异步将 FluxSink 暴露出去使用. (不知道怎么描述, 直接看例子吧)
3.2 场景
首先, 基础框架是我之前写的一个 IM 及时聊天框架, 所以, 场景就借助这个框架实现.
主要关注就是如何发送消息, 和如何接收消息.
3.2.1 消息发送者
- maven 依赖
<properties>
<cloud-cli.version>1.0.0</cloud-cli.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>17</java.version>
<!--版本用最新版的上一个版本 或 使用数量最多的一个版本-->
<guava.version>31.1-jre</guava.version>
<mybatis-plus.version>3.5.3.1</mybatis-plus.version>
<knife4j.version>3.0.3</knife4j.version>
<org.mapstruct.version>1.5.3.Final</org.mapstruct.version>
<lombok-mapstruct-binding.version>0.2.0</lombok-mapstruct-binding.version>
<redisson.version>3.19.3</redisson.version>
<spring-cloud.version>2022.0.1</spring-cloud.version>
<spring-cloud-alibaba.version>2022.0.0.0-RC1</spring-cloud-alibaba.version>
<spring-boot.version>3.0.2</spring-boot.version>
<transmittable-thread-local.version>2.14.2</transmittable-thread-local.version>
<dynamic-ds.version>3.6.1</dynamic-ds.version>
<jjwt.version>0.11.5</jjwt.version>
</properties>
<dependencies>
<!--web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>com.lvsx</groupId>
<artifactId>cloud-core</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
</dependencies>
- 注册 Bean
/**
* MQ 发送助手
* 通过 AtomicReference 将 FluxSink 暴露出去. 通过 FluxSink.next 发送消息
*
* @return MQ
*/
@Bean
public AtomicReference<FluxSink<Message<String>>> mqSender() {
return new AtomicReference<>();
}
@Bean
//消息发送者
public Supplier<Flux<Message<String>>> im(AtomicReference<FluxSink<Message<String>>> mqSender) {
return () -> Flux.create(mqSender::set);
}
- 发送消息 方式一
@Resource
private AtomicReference<FluxSink<Message<String>>> mqSender;
FluxSink<Message<String>> messageFluxSink = mqSender.get();
try {
// message 即消息体
// Message<String> 验证过 直接发送 消息体对象, 是有问题的, 可能跟序列化有关.
String s = objectMapper.writeValueAsString(message);
Message<String> build = MessageBuilder.withPayload(s)
.setHeader(MessageConst.PROPERTY_TAGS, message.target())
.build();
messageFluxSink.next(build);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
- 发送消息 方式二
@Resource
private StreamBridge streamBridge;
String s = objectMapper.writeValueAsString(message);
Message<String> build = MessageBuilder.withPayload(s)
.setHeader(MessageConst.PROPERTY_TAGS, message.target())
.build();
// 注意 MessageBuilder.withPayload 如果直接放对象实体,
// 在 send 时, 对象实体转 byte 数组会造成数据丢失,
// 怀疑是序列化的问题(只验证过问题, 还没找解决方式, 后续会继续排查)
boolean asd = streamBridge.send("im-out", build);
- yml 配置
spring:
application:
name: im
cloud:
function:
definition: im
stream:
function:
bindings:
im-out-0: im-out
bindings:
# 消息生产者
im-out:
destination: 'im-group-topic'
content-type: application/json
group: 'im-group'
binder: rocketmq
rocketmq:
binder:
name-server: localhost:9876
group: im-group
3.2.2 消息消费者.
- 注册 Bean
@Bean
public Consumer<Flux<Message<String>>> im() {
return messageFlux -> messageFlux.subscribe(stringMessage -> System.out.println(stringMessage.getPayload()));
}
- yml 配置
spring:
application:
name: test
cloud:
function:
definition: im
stream:
function:
bindings:
im-in-0: im-in
bindings:
# 消息消费者
im-in:
destination: 'im-group-topic'
content-type: application/json
group: 'im-group'
binder: rocketmq
rocketmq:
binder:
name-server: localhost:9876
group: im-group
3.3 验证
文章来源:https://www.toymoban.com/news/detail-775641.html
文章来源地址https://www.toymoban.com/news/detail-775641.html
3.4 其他
我也不确定自己的使用方式是不是正确, 如果有大佬有标准的使用方式, 请及时评论, 大家一起学习, 感谢!
到了这里,关于【微服务学习】spring-cloud-starter-stream 4.x 版本的使用(rocketmq 版)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!