Rocket重试机制,消息模式,刷盘方式

这篇具有很好参考价值的文章主要介绍了Rocket重试机制,消息模式,刷盘方式。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、Consumer 批量消费(推模式)

Consumer端先启动 

Consumer端后启动. 正常情况下:应该是Consumer需要先启动

consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10

package quickstart;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

import com.alibaba.rocketmq.common.message.MessageExt;

/**

* Consumer,订阅消息

*/

public class Consumer {

public static void main(String[] args) throws InterruptedException, MQClientException {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");

consumer.setConsumeMessageBatchMaxSize(10);

/**

* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>

* 如果非第一次启动,那么按照上次消费的位置继续消费 ,(消费顺序消息的时候设置)

*/

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTest", "*");

consumer.registerMessageListener(new MessageListenerConcurrently() {

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

try {

System.out.println("msgs的长度" + msgs.size());

System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);

} catch (Exception e) {

e.printStackTrace();

return ConsumeConcurrentlyStatus.RECONSUME_LATER;

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

});由于这里是Consumer先启动,所以他回去轮询MQ上是否有订阅队列的消息,由于每次producer插入一条,Consumer就拿一条所以测试结果如下(每次size都是1

2Consumer端后启动,也就是Producer先启动

由于这里是Consumer后启动,所以MQ上也就堆积了一堆数据,Consumer

1Producer端重试

也就是ProducerMQ上发消息没有发送成功,我们可以设置发送失败重试的次数,发送并触发回调函数

2Consumer端重试

2.1exception的情况,一般重复16 10s30s1分钟、2分钟、3分钟等等

上面的代码中消费异常的情况返回

return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试

正常则返回:

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功

二、消息重试机制:消息重试分为2

1Producer端重试

2Consumer端重试

consumer.start();

System.out.println("Consumer Started.");

}

}

consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10

//设置重试的次数

producer.setRetryTimesWhenSendFailed(3);

//开启生产者

producer.start();

//创建一条消息

Message msg = new Message("PushTopic", "push", "1", "我是一条普通消息".getBytes());

//发送消息

SendResult result = producer.send(msg);

//发送,并触发回调函数

producer.send(msg, new SendCallback() {

@Override

//成功的回调函数

public void onSuccess(SendResult sendResult) {

System.out.println(sendResult.getSendStatus());

System.out.println("成功了");

}

@Override

//出现异常的回调函数

public void onException(Throwable e) {

System.out.println("失败了"+e.getMessage());

}

});

package quickstart;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

import com.alibaba.rocketmq.common.message.MessageExt;

/**

* Consumer,订阅消息

*/

public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");

consumer.setConsumeMessageBatchMaxSize(10);

/**

* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>

* 如果非第一次启动,那么按照上次消费的位置继续消费

*/

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTest", "*");

consumer.registerMessageListener(new MessageListenerConcurrently() {

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

try {

// System.out.println("msgs的长度" + msgs.size());

System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);

for (MessageExt msg : msgs) {

String msgbody = new String(msg.getBody(), "utf-8");

if (msgbody.equals("Hello RocketMQ 4")) {

System.out.println("======错误=======");

int a = 1 / 0;

}

}

} catch (Exception e) {

e.printStackTrace();

if(msgs.get(0).getReconsumeTimes()==3){

//记录日志

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功

}else{

return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试

}

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功

}

});

consumer.start();

System.out.println("Consumer Started.");

}

}

假如超过了多少次之后我们可以让他不再重试记录 日志。

if(msgs.get(0).getReconsumeTimes()==3){

//记录日志

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功

}

2.2超时的情况,这种情况MQ会无限制的发送给消费端。

就是由于网络的情况,MQ发送数据之后,Consumer端并没有收到导致超时。也就是消费端没有给我返回return 任何状态,这样的就认为没有到达Consumer端。

这里模拟Producer只发送一条数据。consumer端暂停1分钟并且不发送接收状态给MQ

package model;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

import com.alibaba.rocketmq.common.message.MessageExt;

/**

* Consumer,订阅消息

*/

public class Consumer {

public static void main(String[] args) throws InterruptedException, MQClientException {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");

consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");1、集群消费

2、广播消费

rocketMQ默认是集群消费,我们可以通过在Consumer来支持广播消费

三、消费模式

consumer.setConsumeMessageBatchMaxSize(10);

/**

* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>

* 如果非第一次启动,那么按照上次消费的位置继续消费

*/

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTest", "*");

consumer.registerMessageListener(new MessageListenerConcurrently() {

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

try {

// 表示业务处理时间

System.out.println("=========开始暂停===============");

Thread.sleep(60000);

for (MessageExt msg : msgs) {

System.out.println(" Receive New Messages: " + msg);

}

} catch (Exception e) {

e.printStackTrace();

return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功

}

});

consumer.start();

System.out.println("Consumer Started.");

}

}

consumer.setMessageModel(MessageModel.BROADCASTING);// 广播消费

package model;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

import com.alibaba.rocketmq.common.message.MessageExt;

import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;

/**

* Consumer,订阅消息

*/

public class Consumer2 {

public static void main(String[] args) throws InterruptedException, MQClientException {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");

consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");

consumer.setConsumeMessageBatchMaxSize(10);

consumer.setMessageModel(MessageModel.BROADCASTING);// 广播消费

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTest", "*");

consumer.registerMessageListener(new MessageListenerConcurrently() {

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

try {异步复制和同步双写主要是主和从的关系。消息需要实时消费的,就需要采用主从模式部署

异步复制:比如这里有一主一从,我们发送一条消息到主节点之后,这样消息就算从producer端发送成功了,然后通过异步复制的方法将数据复制到从节点

同步双写:比如这里有一主一从,我们发送一条消息到主节点之后,这样消息就并不算从producer端发送成功了,需要通过同步双写的方法将数据同步到从节点后, 才算数据发

送成功。

如果rocketMq才用双master部署,ProducerMQ上写入20条数据 其中Master1中拉取了12条 。Master2中拉取了8 条,这种情况下,Master1宕机,那么我们消费数据的时

候,只能消费到Master2中的8条,Master1中的12条默认持久化,不会丢失消息,需要Master1恢复之后这12条数据才能继续被消费,如果想保证消息实时消费,就才用双

MasterSlave的模式

同步刷盘:在消息到达MQ后,RocketMQ需要将数据持久化,同步刷盘是指数据到达内存之后,必须刷到commitlog日志之后才算成功,然后返回producer数据已经发送成功。

异步刷盘:,同步刷盘是指数据到达内存之后,返回producer说数据已经发送成功。,然后再写入commitlog日志。

commitlog

commitlog就是来存储所有的元信息,包含消息体,类似于MySQLOracleredolog,所以主要有CommitLog在,Consume Queue即使数据丢失,仍然可以恢复出来。

consumequeue:记录数据的位置,以便Consume快速通过consumequeue找到commitlog中的数据

当生产者向Kafka发送消息,且正常得到响应的时候,可以确保生产者不会产生重复的消息。但是,如果生产者发送消息后,遇到网络问题,无法获取响应,生产者就无法判断该

消息是否成功提交给了Kafka。根据生产者的机制,我们知道,当出现异常时,会进行消息重传,这就可能出现“At least one”语义。为了实现“Exactly once”语义,这里提供两个

可选方案:

如果业务数据产生消息可以找到合适的字段作为主键,或是有一个全局ID生成器,可以优先考虑选用第二种方案。

为了实现消费者的“Exactly once”语义,在这里提供一种方案,供读者参考:消费者将关闭自动提交offset的功能且不再手动提交offset,这样就不使用Offsets Topic这个内部

Topic记录其offset,而是由消费者自己保存offset。这里利用事务的原子性来实现“Exactly once”语义,我们将offset和消息处理结果放在一个事务中,事务执行成功则认为此消

息被消费,否则事务回滚需要重新消费。当出现消费者宕机重启或Rebalance操作时,消费者可以从关系型数据库中找到对应的offset,然后调用KafkaConsumer.seek()方法手

动设置消费位置,从此offset处开始继续消费。

ISRIn-SyncReplica)集合表示的是目前可用alive)且消息量与Leader相差不多的副本集合,这是整个副本集合的一个子集。可用相差不多都是很模糊的描述,其实际

含义是ISR集合中的副本必须满足下面两个条件:

四、conf下的配置文件说明

五、刷盘方式

传递保证语义:

At most once:消息可能会丢,但绝不会重复传递。

At least once:消息绝不会丢,但可能会重复传递。

Exactly once: 每条消息只会被传递一次。

生产者的“Exactly once”语义方案

每个分区只有一个生产者写入消息,当出现异常或超时的情况时,生产者就要查询此分区的最后一个消息,用来决定后续操作是消息重传还是继续发送。

为每个消息添加一个全局唯一主键,生产者不做其他特殊处理,按照之前分析方式进行重传,由消费者对消息进行去重,实现“Exactly once”语义。 文章来源地址https://www.toymoban.com/news/detail-465052.html

到了这里,关于Rocket重试机制,消息模式,刷盘方式的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【RabbitMQ】RabbitMQ 消息的可靠性 —— 生产者和消费者消息的确认,消息的持久化以及消费失败的重试机制

    在现代分布式应用程序中,消息队列扮演了至关重要的角色,允许系统中的各个组件之间进行异步通信。这种通信模式提供了高度的灵活性和可伸缩性,但也引入了一系列的挑战,其中最重要的之一是消息的可靠性。 首先让我们来了解一下,在消息队列中,消息从生产者发送

    2024年02月05日
    浏览(40)
  • 【RabbitMQ】RabbitMQ 消息的可靠性 —— 生产者和消费者消息的确认,消息的持久化以及消费失败的重试机制_rabbitmq 生产者消息确认

    先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7 深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前! 因此收集整理了一份《2024年最新大数据全套学习资料》,

    2024年04月26日
    浏览(74)
  • 什么是mq?可靠性、重复消息、重复消费、丢失、发送大文件、延迟、发送机制、重试、死信、幂等、有序、大小、过期、优先级、进了死信队列还能出来吗?

    “MQ” 指的是消息队列(Message Queue),是一种用于异步通信的技术。消息队列是一种中间件,用于在分布式系统中传递消息,使不同组件之间能够进行松散耦合的通信。它的核心思想是生产者将消息发送到队列,而消费者从队列中接收并处理消息。 消息队列的主要优点包括

    2024年02月06日
    浏览(38)
  • RabbitMQ:第一章:6 种工作模式以及消息确认机制

    } System.out.println(“发送数据成功”); channel.close(); connection.close(); } } 消费者一: import com.liao.rabbitmq.utils.RabbitConstant; import com.liao.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.*; import java.io.IOException; /** 消费者1 */ public class ConsumerOne { public static void main(String[] args) throws Exception { Con

    2024年04月12日
    浏览(29)
  • 【rocket-mq】消息发送报错:the broker‘s disk is full

    在本地windows系统上测试rocketmq,根据文档启动nameserver和broker后,编写用例测试消息发送,一直报以下错误,使用rocketmq自带的tools.cmd也是一样: 查看本地所有磁盘发现都还剩余空间,只是c盘最紧张,不过也显示还有10G左右。尝试网上的修改报错阈值: 仍未生效,最后关掉了

    2024年02月16日
    浏览(30)
  • 【重试】Java 中的 7 种重试机制

    随着互联网的发展项目中的业务功能越来越复杂,有一些基础服务我们不可避免的会去调用一些第三方的接口或者公司内其他项目中提供的服务,但是远程服务的健壮性和网络稳定性都是不可控因素。在测试阶段可能没有什么异常情况,但上线后可能会出现调用的接口因为内

    2024年02月16日
    浏览(29)
  • 21 | Kafka Consumer源码分析:消息消费的实现过程

    我们在上节中提到过,用于解决消息队列一些常见问题的知识和原理,最终落地到代码上,都包含在收、发消息这两个流程中。对于消息队列的生产和消费这两个核心流程,在大部分消息队列中,它实现的主要流程都是一样的,所以,通过这两节的学习之后,掌握了这两个流

    2024年02月21日
    浏览(32)
  • RabbitMq 的消息可靠性问题(二)---MQ的消息丢失和consumer消费问题

    RabbitMq 消息可靠性问题(一) — publisher发送时丢失 前面我们从publisher的方向出发解决了发送时丢失的问题,那么我们在发送消息到exchange, 再由exchange转存到queue的过程中。如果MQ宕机了,那么我们的消息是如何确保可靠性的呢?当消息由队列发到对应的消费者处理时,consumer 接

    2024年02月11日
    浏览(35)
  • 【项目实战】Kafka 重平衡 Consumer Group Rebalance 机制

    👉 博主介绍 : 博主从事应用安全和大数据领域,有8年研发经验,5年面试官经验,Java技术专家,WEB架构师,阿里云专家博主,华为云云享专家,51CTO TOP红人 Java知识图谱点击链接: 体系化学习Java(Java面试专题) 💕💕 感兴趣的同学可以收藏关注下 , 不然下次找不到哟

    2024年02月16日
    浏览(31)
  • springboot+RabbitMQ实现一条消息被所有consumer消费

            用户认证中心(Authorization center简称ac)使用jwt实现用户请求身份认证,需要支持多副本部署。系统架构如下:         用户登录后生成jwt,纵向需要通过socket长连接把jwt下发到应用集成层ws,ws再把jwt下发到应用。前端请求各应用时可以在应用的filter中校验jwt是否

    2024年02月04日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包