RocketMQ 发送批量消息、过滤消息和事务消息

这篇具有很好参考价值的文章主要介绍了RocketMQ 发送批量消息、过滤消息和事务消息。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前面我们知道RocketMQ 发送延时消息与顺序消息,现在我们看下怎么发送批量消息、过滤消息和事务消息。

发送批量消息

限制是这些批量消息应该有相同的 topic,相同的 waitStoreMsgOK,而且不能是延时消息。 此外,这一批消息的总大小不应超过4MB。

消息的生产者

package com.demo.rocketmq.batch;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.util.ArrayList;
import java.util.List;

/**
 * 批量发送消息
 *
 * 限制是这些批量消息应该有相同的 topic,相同的 waitStoreMsgOK,而且不能是延时消息。 此外,这一批消息的总大小不应超过4MB。
 */
public class BatchProducer {

    public static void main(String[] args) throws Exception {
        // 1:实例化消息生产者 Producer,  指定生产组名称
        DefaultMQProducer producer = new DefaultMQProducer("produceGroup");
        // 2:设置NameServer的地址
        producer.setNamesrvAddr("192.168.152.130:9876");
        // 3:启动Producer实例
        producer.start();


        // 4:创建消息
        String topic = "BatchTest";
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
        messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
        messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));

        try {
            SendResult sendResult = producer.send(messages);

            // 5:通过 sendResult 返回消息是否成功送达
            System.out.printf("%s%n", sendResult);
            System.out.println("发送状态:"+ sendResult.getSendStatus() + ", 消息ID" + sendResult.getMsgId());
        } catch (Exception e) {
            e.printStackTrace();
            //处理error
        }

        // 6:如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

消息的消费者

package com.demo.rocketmq.batch;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class BatchConsumer {

    public static void main(String[] args) throws Exception {
        // 1. 创建消费者,指定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("produceGroup");

        // 2. 指定 NameSever 地址
        consumer.setNamesrvAddr("192.168.152.130:9876");

        // 3. 订阅主题 Topic 和 tag
        consumer.subscribe("BatchTest", "TagA");

        // 4. 设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                list.forEach( messageExt -> System.out.println(new String(messageExt.getBody())));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();
    }
}

消息分割

如果消息的总长度可能大于4MB时,这时候最好把消息进行分割

public class ListSplitter implements Iterator<List<Message>> {
   private final int SIZE_LIMIT = 1024 * 1024 * 4;
   private final List<Message> messages;
   private int currIndex;
   public ListSplitter(List<Message> messages) {
           this.messages = messages;
   }
    @Override 
    public boolean hasNext() {
       return currIndex < messages.size();
   }
       @Override 
    public List<Message> next() {
       int nextIndex = currIndex;
       int totalSize = 0;
       for (; nextIndex < messages.size(); nextIndex++) {
           Message message = messages.get(nextIndex);
           int tmpSize = message.getTopic().length() + message.getBody().length;
           Map<String, String> properties = message.getProperties();
           for (Map.Entry<String, String> entry : properties.entrySet()) {
               tmpSize += entry.getKey().length() + entry.getValue().length();
           }
           tmpSize = tmpSize + 20; // 增加日志的开销20字节
           if (tmpSize > SIZE_LIMIT) {
               //单个消息超过了最大的限制
               //忽略,否则会阻塞分裂的进程
               if (nextIndex - currIndex == 0) {
                  //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
                  nextIndex++;
               }
               break;
           }
           if (tmpSize + totalSize > SIZE_LIMIT) {
               break;
           } else {
               totalSize += tmpSize;
           }

       }
       List<Message> subList = messages.subList(currIndex, nextIndex);
       currIndex = nextIndex;
       return subList;
   }
}

 
//把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
  try {
      List<Message>  listItem = splitter.next();
      producer.send(listItem);
  } catch (Exception e) {
      e.printStackTrace();
      //处理error
  }
}

过滤消息

RocketMQ 的消息过滤功能通过生产者和消费者对消息的属性、标签进行定义,并在 RocketMQ 服务端根据过滤条件进行筛选匹配,将符合条件的消息投递给消费者进行消费。

消息过滤主要通过以下几个关键流程实现:

  • 生产者:生产者在初始化消息时预先为消息设置一些属性和标签,用于后续消费时指定过滤目标。

  • 消费者:消费者在初始化及后续消费流程中通过调用订阅关系注册接口,向服务端上报需要订阅指定主题的哪些消息,即过滤条件。

  • 服务端:消费者获取消息时会触发服务端的动态过滤计算,Apache RocketMQ 服务端根据消费者上报的过滤条件的表达式进行匹配,并将符合条件的消息投递给消费者。

详细的可以看下官网
这里使用 Tag 过滤,在大多数情况下,TAG是一个简单而有用的设计,其可以来选择想要的消息。例如:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

发送消息的时候,还是按照正常的方式发送,在消费消息的时候,修改下对应的 tag 表达式就好

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class TagConsumer {

    public static void main(String[] args) throws Exception{
        // 1. 创建消费者,指定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("produceGroup");

        // 2. 指定 NameSever 地址
        consumer.setNamesrvAddr("192.168.152.130:9876");

        // 3. 订阅主题 Topic 和 tag
        consumer.subscribe("TagFilterTopicTest", "TagA || TagB");    // 这块做了修改

        // 4. 设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                list.forEach( messageExt -> System.out.println(new String(messageExt.getBody())));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();
    }
}

事务消息

这里是消息支持事务操作,如果发送消息失败,就可以回滚当前的操作。

整个事务消息的详细交互流程如下图所示:
RocketMQ 发送批量消息、过滤消息和事务消息

生产者

对应的生产者需要添加 事务监听器 ,如果返回的状态值为 LocalTransactionState.UNKNOW; 就会进行回查消息

 producer.setTransactionListener(new TransactionListener() {

    /**
     * 在该方法中执行本地事务
     * @param message
     * @param o
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        System.out.println(o);

        // 如果是 TagA 就执行提交
        if (StringUtils.equals("TagA", message.getTags())) {
            return LocalTransactionState.COMMIT_MESSAGE;
        } else if (StringUtils.equals("TagB", message.getTags())) {
            // 如果是 TagB 就执行回滚
            return LocalTransactionState.ROLLBACK_MESSAGE;
        } else if (StringUtils.equals("TagC",  message.getTags())) {
            // 如果是 TagB 就返回 UNKNOW,  返回 UNKNOW 的时候,调用下面的回查方法
            return LocalTransactionState.UNKNOW;
        } else {
            return LocalTransactionState.UNKNOW;
        }
    }

    /**
     * 该方法是 MQ 进行消息事务状态回查
     * @param messageExt
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        System.out.println("回查的消息 Tag:" + messageExt.getTags() + ", 消息内容:" + new String(messageExt.getBody()));
        return LocalTransactionState.COMMIT_MESSAGE;
    }
});

完整代码

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * 发送事务消息
 * @author wuq
 * @Time 2023-1-5 17:46
 * @Description
 */
public class TransactionProducer {

    public static void main(String[] args) throws Exception {
        // 1:实例化消息生产者 Producer,  指定生产组名称
        TransactionMQProducer producer = new TransactionMQProducer("produceGroup");
        // 2:设置NameServer的地址
        producer.setNamesrvAddr("192.168.220.129:9876");

        // 3: 添加监听器
        producer.setTransactionListener(new TransactionListener() {

            /**
             * 在该方法中执行本地事务
             * @param message
             * @param o
             * @return
             */
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                System.out.println(o);

                // 如果是 TagA 就执行提交
                if (StringUtils.equals("TagA", message.getTags())) {
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else if (StringUtils.equals("TagB", message.getTags())) {
                    // 如果是 TagB 就执行回滚
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                } else if (StringUtils.equals("TagC",  message.getTags())) {
                    // 如果是 TagB 就返回 UNKNOW,  返回 UNKNOW 的时候,调用下面的回查方法
                    return LocalTransactionState.UNKNOW;
                } else {
                    return LocalTransactionState.UNKNOW;
                }
            }

            /**
             * 该方法是 MQ 进行消息事务状态回查
             * @param messageExt
             * @return
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                System.out.println("回查的消息 Tag:" + messageExt.getTags() + ", 消息内容:" + new String(messageExt.getBody()));
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        // 启动生成者
        producer.start();


        String[] tags = {"TagA", "TagB", "TagC"};

        for (int i = 0; i < 3; i++) {
            // 4:创建消息,并指定 Topic,Tag 和 消息体
            Message msg = new Message("TransactionTopic", tags[i], ("RocketMQ Sync Msg " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

            // 5:发送消息到一个Broker, 第二个参数:执行本地事务的回调参数
            SendResult sendResult = producer.sendMessageInTransaction(msg, Map.of("callback", "true"));

            // 通过sendResult返回消息是否成功送达
            System.out.printf("%s%n", sendResult);
            System.out.println("发送状态:"+ sendResult.getSendStatus() + ", 消息ID" + sendResult.getMsgId());

            TimeUnit.SECONDS.sleep(3);
        }

        // 6:如果不再发送消息,关闭Producer实例。
//        producer.shutdown();
    }
}

对应的消费者

在测试的时候发现,会出现 tagC 两次被回查的情况,这里可能是需要做幂等控制文章来源地址https://www.toymoban.com/news/detail-420290.html

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

public class TransactionConsumer {

    public static void main(String[] args) throws Exception {
        // 1. 创建消费者,指定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("produceGroup");

        // 2. 指定 NameSever 地址
        consumer.setNamesrvAddr("192.168.220.129:9876");

        // 3. 订阅主题 Topic 和 tag
        consumer.subscribe("TransactionTopic", "*");

        // 4. 设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                list.forEach(messageExt -> {
                    String body = new String(messageExt.getBody());
                    String tags = messageExt.getTags();
                    System.out.println("tag:" + tags + ", body: "+ body);
                });
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();
    }
}

到了这里,关于RocketMQ 发送批量消息、过滤消息和事务消息的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RocketMQ13-事务消息的理解

    RocketMQ采用了2PC(二阶段提交)的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,如下图所示。     根据上图可以有一个大概的理解,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。 2.1事务消息发送及提交 : 生

    2024年01月17日
    浏览(35)
  • RocketMQ教程-(5)-功能特性-事务消息

    事务消息为 Apache RocketMQ 中的高级特性消息,本文为您介绍事务消息的应用场景、功能原理、使用限制、使用方法和使用建议。 事务消息为 Apache RocketMQ 中的高级特性消息,本文为您介绍事务消息的应用场景、功能原理、使用限制、使用方法和使用建议。 以电商交易场景为例

    2024年02月15日
    浏览(45)
  • 【BUG事务内消息发送】事务内消息发送,事务还未结束,消息发送已被消费,查无数据怎么解决?

    在一个事务内完成插入操作,通过MQ异步通知其他微服务进行事件处理。 由于是在事务内发送,其他服务消费消息,查询数据时还不存在如何解决呢? 通过spring-tx包的TransactionSynchronizationManager事务管理器解决。 Rocketmq方法封装,通过TransactionSynchronizationManager.isSynchronizationAc

    2024年02月11日
    浏览(56)
  • RocketMQ发送消息超时异常

    说明:在使用RocketMQ发送消息时,出现下面这个异常(org.springframework.messging.MessgingException:sendDefaultImpl call timeout……); 解决:修改RocketMQ中broke.conf配置,添加下面这两行配置,重启服务后再试就可以了; 启动时,注意使用下面的命令,带上配置文件

    2024年02月13日
    浏览(63)
  • RocketMQ发送消息失败排查

    错误信息: 错误截图: 查看结果: 说明:发现对应的订阅组已经离线(查看对应的项目MQ地址和配置都是正确的),然后从服务日志中也看不出更多的问题 说明:调整服务日志级别到info,通过详细的日志信息定位发送失败的原因 日志截图: 说明:日志不断打印 closeChanne

    2024年02月04日
    浏览(36)
  • RocketMQ发送消息

    目录 一.消费模式​编辑 二.发送消息 1.普通消息 同步消息(***)  异步消息(***) 单向消息(*) 日志服务的编写思路 2.延迟消息(***) 延迟等级  3.批量消息 4.顺序消息(*) 三.Tag过滤 订阅关系的一致性 ①订阅一个Topic且订阅一个Tag ②订阅一个Topic且订阅多个Tag ③订阅多个Topic且订阅多

    2024年02月11日
    浏览(37)
  • RocketMQ 事务消息 原理及使用方法解析

    🍊 Java学习:Java从入门到精通总结 🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想 🍊 绝对不一样的职场干货:大厂最佳实践经验指南 📆 最近更新:2023年3月24日 🍊 个人简介:通信工程本硕 for NJU💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只

    2024年01月25日
    浏览(37)
  • rocketMQ-console 发送消息

    rocketMQ-console是一款非常使用的rocketMQ扩展工具 工具代码仓 mirrors / apache / rocketmq-externals · GitCode 安装详细教程 ​​​​​​rocketMQ学习笔记二:RocketMQ-Console安装、使用详解_麦田里的码农-CSDN博客_rocketmq-consoled 直接来到工具页面 ,右上角可以切换语言 发送消息流程 1.点击 最

    2024年02月14日
    浏览(36)
  • 13.RocketMQ之消息的存储与发送

    分布式队列因为有高可靠性的要求,所以数据要进行持久化存储。 消息生成者发送消息 Broker收到消息,将消息进行持久化,在存储中新增一条记录 返回ACK给生产者 Broker消息给对应的消费者,然后等待消费者返回ACK 如果消息消费者在指定时间内成功返回ack,那么MQ认为消息消

    2024年02月11日
    浏览(44)
  • [RocketMQ] Producer发送消息的总体流程 (七)

    单向发送: 把消息发向Broker服务器, 不管Broker是否接收, 只管发, 不管结果。 同步发送: 把消息发向Broker服务器, 如果Broker成功接收, 可以得到Broker的响应。 异步发送: 把消息发向Broker服务器, 如果Broker成功接收, 可以得到Broker的响应。异步所以发送消息后, 不用等待, 等到Broker服

    2024年02月11日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包