RocketMQ如何安全的批量发送消息❓

这篇具有很好参考价值的文章主要介绍了RocketMQ如何安全的批量发送消息❓。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

优点:

批量发送消息可以提高rocketmq的生产者性能和吞吐量。

使用场景:

  1. 发送大量小型消息时;
  2. 需要降低消息发送延迟时;
  3. 需要提高生产者性能时;

注意事项:

  1. 消息列表的大小不能超过broker设置的最大消息大小;
  2. 消息列表的大小不能超过生产证设置的maxMessageSize 参数,此参数默认为 4MB;
  3. 批量发送消息不支持消息事务;
  4. 如果代码在发送消息列表时发生异常,则可能会发生部分消息发送成功,部分消息发送失败的情况。如果要确保所有消息都已成功发送,则需要增加错误处理逻辑和消息重试机制;

批量发送消息为什么要限制maxMessageSize❓

消息列表的大小不能超过生产者设置的maxMessageSize参数,主要是为了避免消息发送延迟和消息过大导致broker出现性能问题。如果尝试发送大于maxMessageSize的消息,RocketMQ会抛出MessageTooLargeException异常,并且消息不会被发送到broker。

如果开发者在开发时遇到了消息列表大小超过maxMessageSize的情况,可以考虑以下几种处理方式:文章来源地址https://www.toymoban.com/news/detail-773690.html

    1. 提升maxMessageSize参数的大小,这样可以容纳更大的消息列表。但是,需要注意在提升参数大小时,要考虑到RocketMQ broker的性能和网络带宽等因素。
    2. 考虑将消息列表进行拆分,然后分批发送。这样可以避免一次发送过多的消息。
    3. 计算消息的大小并进行压缩。可以使用一些压缩算法,如 LZ4、Snappy 等,对消息进行压缩,以减小消息的大小。
    4. 对超过 maxMessageSize 的消息进行过滤或其他处理。可以通过业务逻辑对消息进行分组或分类,对超过 maxMessageSize 的消息进行过滤或其他处理,以避免发送超出限制的消息。

代码实现

package com.resource.sync.rocketmq;


import java.util.Iterator;
import java.util.List;

/**
 * @description:消息分割,在rocketmq中,一次性发送消息的长度不可超过4mb,此时我们需要进行切割,确保消息长度小于4mb
 **/
public class ListSplitter<T> implements Iterator<List<T>> {
    /**
     * 分割数据大小
     */
    private int sizeLimit;

    /**
     * 分割数据列表
     */
    private final List<T> messages;

    /**
     * 分割索引
     */
    private int currIndex;

    public ListSplitter(int sizeLimit, List<T> messages) {
        this.sizeLimit = sizeLimit;
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List<T> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            T t = messages.get(nextIndex);
            totalSize = totalSize + t.toString().length();
            if (totalSize > sizeLimit) {
                break;
            }
        }
        List<T> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}
    private final int maxMessageSize = 1024 * 1024 * 4;

	/**
     * 消息分割(批量发送)
     */
    private void bulkSendMsg(List<Message<String>> messageList) {
        // 限制数据大小
        ListSplitter splitter = new ListSplitter(maxMessageSize, messageList);
        while (splitter.hasNext()) {
            List<Message> nextList = splitter.next();
            syncBulkSendMessage("topic", nextList);
        }
    }


    /**
     * @param topic
     * @param list
     * @description:发送实时消息(批量)
     */
    public void syncBulkSendMessage(String topic, List<Message> list) {
        SendResult sendResult = null;
        try {
            sendResult = rocketMQTemplate.syncSend(topic, list);
            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                log.error("BULK_ROCKET_MQ_DISTRIBUTION_ERROR.RESULT_STATUS:{},MSG_ID:{}", sendResult.getSendStatus(), sendResult.getMsgId());
            }
            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
                log.info("BULK_SEND_MSG_SUCCESS.MSG_ID:{}", sendResult.getMsgId());
            }

        } catch (Exception e) {
            log.error("BULK_ROCKET_MQ_DISTRIBUTION_ERROR:{}", e);
        }
    }

到了这里,关于RocketMQ如何安全的批量发送消息❓的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RocketMQ发送消息失败排查

    错误信息: 错误截图: 查看结果: 说明:发现对应的订阅组已经离线(查看对应的项目MQ地址和配置都是正确的),然后从服务日志中也看不出更多的问题 说明:调整服务日志级别到info,通过详细的日志信息定位发送失败的原因 日志截图: 说明:日志不断打印 closeChanne

    2024年02月04日
    浏览(35)
  • RocketMQ发送消息超时异常

    说明:在使用RocketMQ发送消息时,出现下面这个异常(org.springframework.messging.MessgingException:sendDefaultImpl call timeout……); 解决:修改RocketMQ中broke.conf配置,添加下面这两行配置,重启服务后再试就可以了; 启动时,注意使用下面的命令,带上配置文件

    2024年02月13日
    浏览(62)
  • rocketMQ-console 发送消息

    rocketMQ-console是一款非常使用的rocketMQ扩展工具 工具代码仓 mirrors / apache / rocketmq-externals · GitCode 安装详细教程 ​​​​​​rocketMQ学习笔记二:RocketMQ-Console安装、使用详解_麦田里的码农-CSDN博客_rocketmq-consoled 直接来到工具页面 ,右上角可以切换语言 发送消息流程 1.点击 最

    2024年02月14日
    浏览(35)
  • 13.RocketMQ之消息的存储与发送

    分布式队列因为有高可靠性的要求,所以数据要进行持久化存储。 消息生成者发送消息 Broker收到消息,将消息进行持久化,在存储中新增一条记录 返回ACK给生产者 Broker消息给对应的消费者,然后等待消费者返回ACK 如果消息消费者在指定时间内成功返回ack,那么MQ认为消息消

    2024年02月11日
    浏览(43)
  • [RocketMQ] Producer发送消息的总体流程 (七)

    单向发送: 把消息发向Broker服务器, 不管Broker是否接收, 只管发, 不管结果。 同步发送: 把消息发向Broker服务器, 如果Broker成功接收, 可以得到Broker的响应。 异步发送: 把消息发向Broker服务器, 如果Broker成功接收, 可以得到Broker的响应。异步所以发送消息后, 不用等待, 等到Broker服

    2024年02月11日
    浏览(41)
  • SpringBoot集成RocketMQ实现三种消息发送方式

    目录 一、pom文件引入依赖 二、application.yml文件添加内容 三、创建producer生产者 四、创建Consumer消费者(创建两个消费者,所属一个Topic) 五、启动项目测试 RocketMQ 支持3 种消息发送方式: 同步 (sync)、异步(async)、单向(oneway)。 同步 :发送者向 MQ 执行发送消息API 时

    2024年02月13日
    浏览(46)
  • Springbootg整合RocketMQ ——使用 rocketmq-spring-boot-starter 来配置发送和消费 RocketMQ 消息

           本文解析将 RocketMQ Client 端集成为 spring-boot-starter 框架的开发细节,然后通过一个简单的示例来一步一步的讲解如何使用这个 spring-boot-starter 工具包来配置,发送和消费 RocketMQ 消息。 添加maven依赖: 修改application.properties 注意: 请将上述示例配置中的 127.0.0.1:9876 替换

    2024年03月22日
    浏览(42)
  • RocketMQ教程-(5)-功能特性-消息发送重试和流控机制

    本文为您介绍 Apache RocketMQ 的消息发送重试机制和消息流控机制。 消息发送重试 Apache RocketM Q的消息发送重试机制主要为您解答如下问题: 部分节点异常是否影响消息发送? 请求重试是否会阻塞业务调用? 请求重试会带来什么不足? 消息流控 Apache RocketMQ 的流控机制主要为

    2024年02月15日
    浏览(39)
  • 如何使用 Java 发送消息到 RabbitMQ 中的队列

    RabbitMQ是一个强大的消息队列中间件,可以实现高效的消息传递和解耦。在实际应用中,我们还可以使用更多高级特性,如消息持久化、消息确认机制、消息路由策略等,以满足复杂的业务需求。本文将介绍如何在Spring Boot应用程序中集成RabbitMQ,并实现一个简单的消息发送和

    2024年03月14日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包