前言
本文主要有以下内容:
- 简单消息的发送
- 顺序消息的发送
- RocketMQTemplate的API介绍
环境搭建:
RocketMQ的安装教程:在官网上下载bin文件,解压到本地,并配置环境变量,如下图所示:
在 Spring boot 项目中引入 RocketMQ 依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
在application.yml增加相关配置:
server:
port: 10001
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: springboot_produce_group # 必须指定group
send-message-timeout: 3000 # 消息发送超时时长,默认3s
retry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2
retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2
consumer:
group: springboot_consumer_group
在 Spring Boot 中使用RocketMQ很简单直接注入RocketMQTemplate对象即可:
@Resource
private RocketMQTemplate rocketMQTemplate;
基本概念
消息和主题相关
消息 message:通信交互的载体,分为事务消息,半事务消息,延迟消息,顺序消息等。
主题 topic:一类消息的集合,逻辑概念。
队列 queue:主题由一个队列或者多个队列构成,当消息发送到某一个主题时,需要选择某一个队列。
偏移量 offset:消息追加到主题的队列后会分配一个数值,表示该队列的几条消息。
消费者相关:
消费组 consume group:消费组用于订阅主题消费消息,可以订阅多个主题,一个消费组可以有多个消费者。
广播模式:同一个消费组内的所有消费者都会消费订阅主题的所有消息。即一条消息会被该消费者组的所有消费者消费。
集群模式:同一个消费组内的所有消费者只消费订阅主题的一部分消息,即一条消息只会被改消费组的一个消费者消费。
并发消费:同一个队列的消息由多线程消费且不保证消息的顺序。
顺序消费:保证同一队列的消息按顺序消费。
发送普通消息
创建MsgController,代码如下:
@RestController
@RequestMapping("send/")
@CrossOrigin(allowedHeaders = "*", origins = "*")
@Slf4j
public class MsgController {
@Resource
private RocketMQTemplate rocketMQTemplate;
@GetMapping("normal")
public void sendNormalMsg() {
Message<String> msg = MessageBuilder.withPayload("Hello,RocketMQ Normal_msg").build();
rocketMQTemplate.send("normal_msg", msg);
}
}
创建消息的消费者,只需要实现RocketMQListener接口中的方法即可,代码如下:
@Component
@RocketMQMessageListener(topic = "normal_msg", consumerGroup = "consumer_normal")
@Slf4j
public class NormalMsgConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("Receive Normal Msg: {}",message);
}
}
@RocketMQMessageListener注解用在消费者类上,指定当前类消费的主题。
topic:指定消费者的主题 comsumerGroup:指定消费者组(Consumer Group)名称,用于区分不同的消费者。
启动项目,运行结果如下图所示:
发送顺序消息
顺序消息:保证同一队列的消息按顺序消费。
在MsgController 中添加如下代码:
@GetMapping("order")
public void sendOrderMsg(){
log.info("开始发送顺序消息");
for (int j = 0; j < 10; j++) {
Message<String> sendOrderMsg = MessageBuilder.withPayload("Send Order Msg = " + j + " time: "+ LocalDateTime.now()).build();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
rocketMQTemplate.convertAndSend("msg:order", sendOrderMsg);
}
log.info("顺序消息发送结束");
}
创建对应topic消息的消费者,代码如下所示:
@Component
@RocketMQMessageListener(topic = "msg",
consumerGroup = "consumer_order_group",
selectorExpression = "order",
messageModel = MessageModel.CLUSTERING,
selectorType = SelectorType.TAG)
@Slf4j
public class OrderMsgConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("Receive Order Msg: {}",message);
}
}
@RocketMQMessageListener其他属性介绍:
- selectorExpression: 消息选择表达式,用于过滤消息,只有满足表达式条件的消息才会被消费。默认值为 *,表示订阅所有消息。
全匹配:*,默认值。
属性匹配:指定tag = ‘tagName’,上面的代码就可以改写为"tag = ‘order’"
表达式匹配:需要指定selectType = SelectorType.SQL92,见下面。
- selectorType:指明了消息选择通过tag的方式,默认值SelectorType.TAG。可选值有SelectorType.SQL92
TAG:支持"tagName"的方式配置,如果有多个标签则用||进行连接
SQL92:关键字有AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL。支持的数据类型有Boolean, String, Decimal, Float number等。使用方式如(a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE)
- messageModel:消息模式,可选值为 MessageModel.CLUSTERING(默认)或 MessageModel.BROADCASTING,分别表示集群模式和广播模式。
重新启动项目,运行结果如下图所示:
RocketMQTemplate的API介绍
在上面的api使用中,都没有去关注是否消息发送的状态,如是否成功,发送到了哪一个队列等。接下来就介绍一下相关API的使用
带返回值的发送普通消息SendResult syncSend(String destination, Message<?> message);
在MsgController添加如下代码:
@GetMapping("normal_result")
public void sendNormalResultMsg() {
Message<String> msg = MessageBuilder.withPayload("normal_return_result").build();
SendResult normalMsg = rocketMQTemplate.syncSend("normal_msg", msg);
log.info("normalMsg = {}",normalMsg);
}
如log所示,可以看到发送状态等信息。
发送异步消息,在MsgController中添加如下代码:
@GetMapping("callback")
public void sendNormalResultMsgWithCallback(){
Message<String> msg = MessageBuilder.withPayload("normal_return_result").build();
rocketMQTemplate.asyncSend("normal_msg", msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("success");
}
@Override
public void onException(Throwable throwable) {
log.info("error");
}
});
}
运行结果如下所示:
发送顺序消息:在第二部分以及展示过了也可以用如下代码替换
rocketMQTemplate.convertAndSend("msg:order", sendOrderMsg);
// 替换为
rocketMQTemplate.syncSendOrderly("msg:order", sendOrderMsg,String.valueOf(j));
发送单向消息
@GetMapping("oneway")
public void sendOneWay(){
Message<String> oneWay = MessageBuilder.withPayload("Send Order Msg = " + " time: "+ LocalDateTime.now()).build();
rocketMQTemplate.sendOneWay("normal_msg",oneWay);
}
运行结果如下图所示:
文章来源:https://www.toymoban.com/news/detail-631383.html
发送事务消息:暂不举例,后续补充
发送事务消息带回调:和syncSend()类似,后续补充相关用法。文章来源地址https://www.toymoban.com/news/detail-631383.html
参考资料:
- 《RocketMQ 实战》
到了这里,关于【Spring Boot】Spring Boot 集成 RocketMQ 实现简单的消息发送和消费的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!