【RocketMQ】RocketMQ标签、过滤及消息重复消费

这篇具有很好参考价值的文章主要介绍了【RocketMQ】RocketMQ标签、过滤及消息重复消费。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

【RocketMQ】RocketMQ标签、过滤及消息重复消费


参考文档: 官方文档

1. 标签(Tag)

Topic 与 Tag 都是业务上用来归类的标识,区别在于 Topic 是一级分类,而 Tag 可以理解为是二级分类。使用 Tag 可以实现对 Topic 中的消息进行过滤。

注:

  • Topic:消息主题,通过 Topic 对不同的业务消息进行分类。
  • Tag:消息标签,用来进一步区分某个 Topic 下的消息分类,消息从生产者发出即带上的属性。

【RocketMQ】RocketMQ标签、过滤及消息重复消费

什么时候应该用Topic,什么时候该用Tag?

可以从以下几个方面进行判断:

  • 消息类型是否一致:如普通消息、事务消息、定时(延时)消息、顺序消息,不同的消息类型使用不同的 Topic,无法通过 Tag 进行区分。
  • 业务是否相关联:没有直接关联的消息,如淘宝交易消息,京东物流消息使用不同的 Topic 进行区分;而同样是天猫交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用 Tag 进行区分。
  • 消息优先级是否一致:如同样是物流消息,盒马必须小时内送达,天猫超市 24 小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的 Topic 进行区分。
  • 消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级的消息使用同一个 Topic,则有可能会因为过长的等待时间而“饿死”,此时需要将不同量级的消息进行拆分,使用不同的 Topic。

总的来说,针对消息分类,您可以选择创建多个 Topic,或者在同一个 Topic 下创建多个 Tag。但通常情况下,不同的 Topic 之间的消息没有必然的联系,而 Tag 则用来区分同一个 Topic 下相互关联的消息,例如全集和子集的关系、流程先后的关系。


1.1 示例

生产者发送含tag的消息到broker:

@Test
public void tagProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("tag-producer-group");
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    producer.start();
    //tag为vip1
    Message message = new Message("tagTopic", "vip1", "我是vip1".getBytes());
    //tag为vip2
    Message message2 = new Message("tagTopic", "vip2", "我是vip2".getBytes());
    producer.send(message);
    producer.send(message2);
    System.out.println("发送成功");
    producer.shutdown();
}

消费者1接收 tag为vip1 的消息:

@Test
public void tagConsumer1() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-a");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    //tag为vip1
    consumer.subscribe("tagTopic", "vip1");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            System.out.println("我是VIP1的消费者,我正在消费消息:" + new String(list.get(0).getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

消费者2接收 tag为vip1和vip2 的消息:

@Test
public void tagConsumer2() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-b");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    consumer.subscribe("tagTopic", "vip1 || vip2");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            System.out.println("我是VIP1和VIP2的消费者,我正在消费消息:" + new String(list.get(0).getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

2. 键(Keys)

RocketMQ 每个消息可以在业务层面的设置唯一标识码 keys 字段,方便将来定位消息丢失问题。 Broker 端会为每个消息创建索引(哈希索引),应用可以通过 topic、key 来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。

注:msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。


2.1 示例

生产者发送一条 tag为key1 ,keys为key 的消息:

@Test
public void keyProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("key-producer-group");
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    producer.start();
    String key = UUID.randomUUID().toString();
    System.out.println(key);
    Message message = new Message("keyTopic", "key1", key, "我是key1".getBytes());
    producer.send(message);
    System.out.println("发送成功");
    producer.shutdown();
}

消费者接收消息,得到keys:

@Test
public void keyConsumer1() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("key-consumer-group-a");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    consumer.subscribe("keyTopic", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            MessageExt messageExt = list.get(0);
            System.out.println("我是VIP1的消费者,我正在消费消息:" + new String(messageExt.getBody()));
            System.out.println(messageExt.getMsgId());
            System.out.println("我们业务的标识:" + messageExt.getKeys());
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

3. 消息重复消费

消息为什么会重复消费?

  1. 生产者重复投递消息
  2. 消费者扩容导致重平衡

如何解决:

  1. 数据库去重表,保证幂等(数据库唯一索引约束)
  2. redis setnx
  3. 布隆过滤器

3.1 示例

生产者发送两条相同的消息:

@org.junit.jupiter.api.Test
public void repeatProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("repeat-producer-group");
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    producer.start();

    String key = UUID.randomUUID().toString();
    System.out.println(key);
    //发送两条相同的消息,制造重复消费场景
    Message m1 = new Message("repeatTopic", null, key, "扣减库存".getBytes());
    Message m1Repeat = new Message("repeatTopic", null, key, "扣减库存".getBytes());
    producer.send(m1);
    producer.send(m1Repeat);
    System.out.println("发送成功");
    producer.shutdown();
}

消费者解决重复消费消息:

@org.junit.jupiter.api.Test
void repeatConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("repeat-consumer-group");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    consumer.subscribe("repeatTopic", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            // 先拿key
            MessageExt messageExt = msgs.get(0);
            String keys = messageExt.getKeys();
            // 原生方式操作
            Connection connection = null;
            try {
                connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2B8&useSSL=false", "root", "123456");
            } catch (SQLException e) {
                e.printStackTrace();
            }
            PreparedStatement statement = null;

            try {
                // 插入数据库 因为我们 key做了唯一索引
                statement = connection.prepareStatement("insert into order_log(`type`, `order_sn`, `user`) values (1,'" + keys + "','123')");
            } catch (SQLException e) {
                e.printStackTrace();
            }

            try {
                // 新增 要么成功 要么报错   修改 要么成功,要么返回0 要么报错
                statement.executeUpdate();
            } catch (SQLException e) {
                System.out.println("executeUpdate");
                if (e instanceof SQLIntegrityConstraintViolationException) {
                    // 唯一索引冲突异常
                    // 说明消息来过了
                    System.out.println("该消息来过了");
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                e.printStackTrace();
            }

            // 处理业务逻辑
            // 如果业务报错 则删除掉这个去重表记录 delete order_log where order_sn = keys;
            System.out.println(new String(messageExt.getBody()));
            System.out.println(keys);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

消费者接收到消息,直接插入数据库,由数据库根据唯一索引约束对key进行判断,如果成功插入,则继续执行业务逻辑;如果失败,则直接返回。文章来源地址https://www.toymoban.com/news/detail-467643.html

到了这里,关于【RocketMQ】RocketMQ标签、过滤及消息重复消费的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 如何避免重复消费消息

    博主介绍: ✌全网粉丝3W+,全栈开发工程师,从事多年软件开发,在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战,博主也曾写过优秀论文,查重率极低,在这方面有丰富的经验✌ 博主作品: 《Java项目案例》主要基于SpringBoot+MyBatis/MyBatis

    2024年02月10日
    浏览(79)
  • 【消息队列】聊一下如何避免消息的重复消费

    一条消息在传输过程中,为了保证消息的不丢失,可能会多少量的消息进行重试,这样就可能导致Broker接受到的消息出现重复,如果说下游系统没有针对业务上的处理,那么可能导致同一笔借款或者支付订单出现重复扣款或者重复还款的情况。业务上是不允许出现的。 在MQ

    2024年02月10日
    浏览(30)
  • RocketMQ 消息消费 轮询机制 PullRequestHoldService

    先来看看 RocketMQ 消费过程中的轮询机制是啥。首先需要补充一点消费相关的前置知识。 RocketMQ 支持多种消费方式,包括 Push 模式和 Pull 模式 Pull 模式:用户自己进行消息的拉取和消费进度的更新 Push 模式:Broker 将新的消息自动发送给用户进行消费 我们一般使用 RocketMQ 时用

    2024年02月13日
    浏览(29)
  • RocketMQ的消费模式和消息流控

    消费模式 1、Push模式--PushConsumer 消费端SDK内置了一个长轮询线程,先将消息异步拉取到SDK内置的缓存队列中,再分别提交到消费线程中,触发监听器执行本地消费逻辑。PushConsumer 消费者类型中,客户端SDK和消费逻辑的唯一边界是消费监听器接口。客户端SDK严格按照监听器的返

    2024年02月08日
    浏览(39)
  • kafka如何避免消息重复消费

    Kafka 避免消息重复消费通常依赖于以下策略和机制: Kafka使用Consumer Group ID来跟踪每个消费者所读取的消息。确保每个消费者都具有唯一的Group ID。如果多个消费者属于同一个Group ID,那么它们将共享消息,但每个分区的消息只能由一个消费者处理。 Kafka会记录每个消费者组消

    2024年01月15日
    浏览(29)
  • mq常见问题:消息丢失、消息重复消费、消息保证顺序

    mq常见问题:消息丢失、消息重复消费、消息保证顺序 消息丢失问题 拿rabbitmq举例来说,出现消息丢失的场景如下图 从图中可以看到一共有以下三种可能出现消息丢失的情况: 1 生产者丢消息 生产者在将数据发送到MQ的时候,可能由于网络等原因造成消息投递失败 2MQ自身丢

    2024年02月09日
    浏览(44)
  • RocketMQ消息ACK机制及消费进度管理

    consumer的每个实例是靠队列分配来决定如何消费消息的。那么消费进度具体是如何管理的,又是如何保证消息成功消费的(RocketMQ有保证消息肯定消费成功的特性(失败则重试)? 本文将详细解析消息具体是如何ack的,又是如何保证消费肯定成功的。 由于以上工作所有的机制

    2023年04月08日
    浏览(59)
  • 如何保证消息不被重复消费?

           在Java中,可以使用消息队列来实现消息的异步处理,其中常用的消息队列有 RabbitMQ、ActiveMQ、Kafka 等。 为了避免消息被重复消费,可以使用以下几种方法: 常见的消息队列如 Kafka、RocketMQ等提供了幂等性机制,能够确保同一条消息被消费多次时只会产生一次影响。在

    2024年02月16日
    浏览(18)
  • RocketMQ是是如何管理消费进度的?又是如何保证消息成功消费的?

    consumer的每个实例是靠队列分配来决定如何消费消息的。那么消费进度具体是如何管理的,又是如何保证消息成功消费的?(RocketMQ有保证消息肯定消费成功的特性,失败则重试) 什么是ACK 消息确认机制 在实际使用RocketMQ的时候我们并不能保证每次发送的消息都刚好能被消费者

    2023年04月13日
    浏览(36)
  • RabbitMQ防止消息重复消费、保证异步消息的幂等性

    一、rabbitmq出现消息重复的场景 1、消费成功,没有进行ack,这时 Broker 会重新发送 2、不确认(unack)或 reject 之后,重新排队,Broker 会重新发送 3、消费成功,ack时宕机,没有ack成功,消息由unack变为ready,Broker又重新发送 4、总的来说就是 Broker 发送消息后,消费端收到消息

    2024年02月13日
    浏览(31)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包