前面我们知道RocketMQ 发送延时消息与顺序消息,现在我们看下怎么发送批量消息、过滤消息和事务消息。
发送批量消息
限制是这些批量消息应该有相同的 topic,相同的 waitStoreMsgOK,而且不能是延时消息。 此外,这一批消息的总大小不应超过4MB。
消息的生产者
package com.demo.rocketmq.batch;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.List;
/**
* 批量发送消息
*
* 限制是这些批量消息应该有相同的 topic,相同的 waitStoreMsgOK,而且不能是延时消息。 此外,这一批消息的总大小不应超过4MB。
*/
public class BatchProducer {
public static void main(String[] args) throws Exception {
// 1:实例化消息生产者 Producer, 指定生产组名称
DefaultMQProducer producer = new DefaultMQProducer("produceGroup");
// 2:设置NameServer的地址
producer.setNamesrvAddr("192.168.152.130:9876");
// 3:启动Producer实例
producer.start();
// 4:创建消息
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
SendResult sendResult = producer.send(messages);
// 5:通过 sendResult 返回消息是否成功送达
System.out.printf("%s%n", sendResult);
System.out.println("发送状态:"+ sendResult.getSendStatus() + ", 消息ID" + sendResult.getMsgId());
} catch (Exception e) {
e.printStackTrace();
//处理error
}
// 6:如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
消息的消费者
package com.demo.rocketmq.batch;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class BatchConsumer {
public static void main(String[] args) throws Exception {
// 1. 创建消费者,指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("produceGroup");
// 2. 指定 NameSever 地址
consumer.setNamesrvAddr("192.168.152.130:9876");
// 3. 订阅主题 Topic 和 tag
consumer.subscribe("BatchTest", "TagA");
// 4. 设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
list.forEach( messageExt -> System.out.println(new String(messageExt.getBody())));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
}
}
消息分割
如果消息的总长度可能大于4MB时,这时候最好把消息进行分割
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1024 * 1024 * 4;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // 增加日志的开销20字节
if (tmpSize > SIZE_LIMIT) {
//单个消息超过了最大的限制
//忽略,否则会阻塞分裂的进程
if (nextIndex - currIndex == 0) {
//假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
nextIndex++;
}
break;
}
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}
//把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
//处理error
}
}
过滤消息
RocketMQ 的消息过滤功能通过生产者和消费者对消息的属性、标签进行定义,并在 RocketMQ 服务端根据过滤条件进行筛选匹配,将符合条件的消息投递给消费者进行消费。
消息过滤主要通过以下几个关键流程实现:
-
生产者:生产者在初始化消息时预先为消息设置一些属性和标签,用于后续消费时指定过滤目标。
-
消费者:消费者在初始化及后续消费流程中通过调用订阅关系注册接口,向服务端上报需要订阅指定主题的哪些消息,即过滤条件。
-
服务端:消费者获取消息时会触发服务端的动态过滤计算,Apache RocketMQ 服务端根据消费者上报的过滤条件的表达式进行匹配,并将符合条件的消息投递给消费者。
详细的可以看下官网
这里使用 Tag 过滤,在大多数情况下,TAG是一个简单而有用的设计,其可以来选择想要的消息。例如:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
发送消息的时候,还是按照正常的方式发送,在消费消息的时候,修改下对应的 tag 表达式就好
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class TagConsumer {
public static void main(String[] args) throws Exception{
// 1. 创建消费者,指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("produceGroup");
// 2. 指定 NameSever 地址
consumer.setNamesrvAddr("192.168.152.130:9876");
// 3. 订阅主题 Topic 和 tag
consumer.subscribe("TagFilterTopicTest", "TagA || TagB"); // 这块做了修改
// 4. 设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
list.forEach( messageExt -> System.out.println(new String(messageExt.getBody())));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
}
}
事务消息
这里是消息支持事务操作,如果发送消息失败,就可以回滚当前的操作。
整个事务消息的详细交互流程如下图所示:
生产者
对应的生产者需要添加 事务监听器 ,如果返回的状态值为 LocalTransactionState.UNKNOW
; 就会进行回查消息
producer.setTransactionListener(new TransactionListener() {
/**
* 在该方法中执行本地事务
* @param message
* @param o
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
System.out.println(o);
// 如果是 TagA 就执行提交
if (StringUtils.equals("TagA", message.getTags())) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.equals("TagB", message.getTags())) {
// 如果是 TagB 就执行回滚
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if (StringUtils.equals("TagC", message.getTags())) {
// 如果是 TagB 就返回 UNKNOW, 返回 UNKNOW 的时候,调用下面的回查方法
return LocalTransactionState.UNKNOW;
} else {
return LocalTransactionState.UNKNOW;
}
}
/**
* 该方法是 MQ 进行消息事务状态回查
* @param messageExt
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("回查的消息 Tag:" + messageExt.getTags() + ", 消息内容:" + new String(messageExt.getBody()));
return LocalTransactionState.COMMIT_MESSAGE;
}
});
完整代码文章来源:https://www.toymoban.com/news/detail-420290.html
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* 发送事务消息
* @author wuq
* @Time 2023-1-5 17:46
* @Description
*/
public class TransactionProducer {
public static void main(String[] args) throws Exception {
// 1:实例化消息生产者 Producer, 指定生产组名称
TransactionMQProducer producer = new TransactionMQProducer("produceGroup");
// 2:设置NameServer的地址
producer.setNamesrvAddr("192.168.220.129:9876");
// 3: 添加监听器
producer.setTransactionListener(new TransactionListener() {
/**
* 在该方法中执行本地事务
* @param message
* @param o
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
System.out.println(o);
// 如果是 TagA 就执行提交
if (StringUtils.equals("TagA", message.getTags())) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.equals("TagB", message.getTags())) {
// 如果是 TagB 就执行回滚
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if (StringUtils.equals("TagC", message.getTags())) {
// 如果是 TagB 就返回 UNKNOW, 返回 UNKNOW 的时候,调用下面的回查方法
return LocalTransactionState.UNKNOW;
} else {
return LocalTransactionState.UNKNOW;
}
}
/**
* 该方法是 MQ 进行消息事务状态回查
* @param messageExt
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("回查的消息 Tag:" + messageExt.getTags() + ", 消息内容:" + new String(messageExt.getBody()));
return LocalTransactionState.COMMIT_MESSAGE;
}
});
// 启动生成者
producer.start();
String[] tags = {"TagA", "TagB", "TagC"};
for (int i = 0; i < 3; i++) {
// 4:创建消息,并指定 Topic,Tag 和 消息体
Message msg = new Message("TransactionTopic", tags[i], ("RocketMQ Sync Msg " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 5:发送消息到一个Broker, 第二个参数:执行本地事务的回调参数
SendResult sendResult = producer.sendMessageInTransaction(msg, Map.of("callback", "true"));
// 通过sendResult返回消息是否成功送达
System.out.printf("%s%n", sendResult);
System.out.println("发送状态:"+ sendResult.getSendStatus() + ", 消息ID" + sendResult.getMsgId());
TimeUnit.SECONDS.sleep(3);
}
// 6:如果不再发送消息,关闭Producer实例。
// producer.shutdown();
}
}
对应的消费者
在测试的时候发现,会出现 tagC 两次被回查的情况,这里可能是需要做幂等控制文章来源地址https://www.toymoban.com/news/detail-420290.html
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class TransactionConsumer {
public static void main(String[] args) throws Exception {
// 1. 创建消费者,指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("produceGroup");
// 2. 指定 NameSever 地址
consumer.setNamesrvAddr("192.168.220.129:9876");
// 3. 订阅主题 Topic 和 tag
consumer.subscribe("TransactionTopic", "*");
// 4. 设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
list.forEach(messageExt -> {
String body = new String(messageExt.getBody());
String tags = messageExt.getTags();
System.out.println("tag:" + tags + ", body: "+ body);
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
}
}
到了这里,关于RocketMQ 发送批量消息、过滤消息和事务消息的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!