RocketMQ角色
RocketMQ的基本概念
同步发送消息
/**
* @author
* @create 2023-04-08 17:24
* 发送同步消息
* 适用于重要的消息 例如消息通知,短信通知
*/
public class SyncProducer {
public static void main(String[] args) throws Exception {
String topic="testRocketMQ";
String tag="Tag1";
String body="Hello RocketMQ";
//创建消息生产者Producer,并制订生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
//指定NameServer地址
producer.setNamesrvAddr("localhost:9876");
//启动producer
producer.start();
//创建消息对象,指定主题topic,tag和消息体
for (int i = 0; i < 10; i++) {
/**
* 主题 topic
* 消息的tag
* 消息内容
*/
Message message = new Message("testRocketMQ","Tag1",("Hello RocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//发送消息
SendResult result = producer.send(message,1000);
//发送状态
SendStatus status = result.getSendStatus();
System.out.println("发送状态->"+status);
String msgId = result.getMsgId();
System.out.println("消息id->"+msgId);
int queueId = result.getMessageQueue().getQueueId();
System.out.println("消息接收队列id"+queueId);
TimeUnit.SECONDS.sleep(1);//线程休息1s
}
//关闭生产者producer
producer.shutdown();
}
}
异步发送
/**
* @author
* @create 2023-04-08 19:52
* 发送异步消息,异步监听,通过回调接口的方式接受服务端的响应
* 适用于数据量较大,不能容忍阻塞场景的
*/
public class ASyncProducer {
public static void main(String[] args) throws Exception {
//创建消息生产者Producer,并制订生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
//指定NameServer地址
producer.setNamesrvAddr("localhost:9876");
//启动producer
producer.start();
//创建消息对象,指定主题topic,tag和消息体
for (int i = 0; i < 10; i++) {
/**
* 主题 topic
* 消息的tag
* 消息内容
*/
Message message = new Message("testRocketMQ","Tag1",("Hello RocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
int index=i;
//发送消息
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
System.out.println("发送成功"+sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.printf("%-10d Exception %s %n", index, throwable);
}
});
TimeUnit.SECONDS.sleep(1);//线程休息1s
}
//关闭生产者producer
producer.shutdown();
}
}
单向发送
/**
* 单向发送,只管发送,不管响应
* 例如日志收集场景
* @author
* @create 2023-04-08 20:22
*/
public class OneWayProducer {
public static void main(String[] args) throws Exception {
//创建消息生产者Producer,并制订生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
//指定NameServer地址
producer.setNamesrvAddr("localhost:9876");
//启动producer
producer.start();
//创建消息对象,指定主题topic,tag和消息体
for (int i = 0; i < 10; i++) {
/**
* 主题 topic
* 消息的tag
* 消息内容
*/
Message message = new Message("testRocketMQ","Tag1",("Hello RocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
int index=i;
//发送消息
producer.sendOneway(message);
TimeUnit.SECONDS.sleep(1);//线程休息1s
}
//关闭生产者producer
producer.shutdown();
}
}
集群消费 /负载均衡模式消费
/**
* @author
* @create 2023-04-08 20:29
*/
public class Consumer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者,指定组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topic
consumer.subscribe("testRocketMQ","*");
// consumer.subscribe("base","Tag1 || Tag2 || Tag3");
// consumer.subscribe("testRocketMQ","Tag1");
//负载均衡模式消费 集群模式消费
consumer.setMessageModel(MessageModel.CLUSTERING);
// consumer.setMessageModel(MessageModel.BROADCASTING);//广播模式
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt ext : list) {
String str = new String(ext.getBody());
System.out.println(str);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);//广播模式
顺序消息的生产与消费
/**
* 顺序消息
*/
public class Producer {
public static void main(String[] args) throws Exception {
// 1.创建消息生产者producer ,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 2.制定NameServer地址
producer.setNamesrvAddr("192.168.40.147:9876");
// producer.setSendMsgTimeout(10000);
// 3.启动Producer
producer.start();
List<OrderStep> orderSteps = OrderStep.buildOrders();
for (int i=0;i<=orderSteps.size()+11;i++) {
//4.创建消息
Message message = new Message("OrderTopic","Order",
String.valueOf(orderSteps.get(i)).getBytes());
/**
* 参数一 消息对象
* 参数二 消息队列的选择器
* 参数三 选择队列的业务标识(订单ID)
*/
//5.发送消息
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
/**
* @param list 队列集合
* @param message 消息对象
* @param obj 业务标识参数
* @return
*/
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object obj) {
long orderId = (long) obj;
int size = list.size();
int index = (int) (orderId % size);
return list.get(index);
}
}, orderSteps.get(i).getOrderId()); //orderSteps.get(i).getOrderId() 订单ID
}
// 6.关闭生产者Producer
producer.shutdown();
}
}
/**
* 顺序消息
*/
public class Consumer {
public static void main(String[] args) throws MQClientException {
//1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.指定Nameserver地址
consumer.setNamesrvAddr("192.168.40.147:9876");
//3.订阅主题Topic和Tag
consumer.subscribe("OrderTopic","*");
//4.注册消息监听器
consumer.registerMessageListener(new MessageListenerOrderly() {
//使用监听器消费消息
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
for (MessageExt message : list) {
System.out.println("消息内容"+new String(message.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
//5.启动消费者
consumer.start();
System.out.println("消费者启动");
}
}
/**
* 订单构建者
*/
public class OrderStep {
private long orderId;
private String desc;
public long getOrderId() {
return orderId;
}
public void setOrderId(long orderId) {
this.orderId = orderId;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return "OrderStep{" +
"orderId=" + orderId +
", desc='" + desc + '\'' +
'}';
}
public static List<OrderStep> buildOrders() {
// 1039L : 创建 付款 推送 完成
// 1065L : 创建 付款
// 7235L :创建 付款
List<OrderStep> orderList = new ArrayList<OrderStep>();
OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(1039L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(1065L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(1039L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(7235L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(1065L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(7235L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(1065L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(1039L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(7235L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(1039L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
return orderList;
}
}
延时消息
/**
* 延时消息
* @author
* @create 2022-02-19 23:46
*/
public class Consumer {
public static void main(String[] args) throws MQClientException {
//1.创建消费者Consumer,制定消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.指定NameServer地址
consumer.setNamesrvAddr("192.168.40.147:9876");
//3.订阅主题Topic和Tag
consumer.subscribe("DelayTopic","*");
//4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
//接受消息内容
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt message : list) {
// System.out.println(new String(message.getBody()));
// Print approximate delay time period
System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.启动消费者consumer
consumer.start();
System.out.println("消费者启动");
}
}
/**
* 延时消息
* @author
* @create 2022-02-19 22:52
*/
public class Producer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
// 1.创建消息生产者producer ,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 2.制定NameServer地址
producer.setNamesrvAddr("192.168.40.147:9876");
// 3.启动Producer
producer.start();
// 4.创建消息对象,指定主题Topic、Tag和消息体
for (int i = 0; i < 10; i++) {
/**
* 消息主题 topic
* 消息tag
* 消息内容
*/
Message message = new Message("DelayTopic","Tag2",("渔阳+"+i).getBytes());
//延时级别 设置延时时间 //从1到18 依次设定
message.setDelayTimeLevel(2);
// 5.发送消息
producer.send(message);
System.out.println("=====================");
// System.out.printf("%s%n", result);
TimeUnit.SECONDS.sleep(1); //线程休眠
}
// 6.关闭生产者Producer
producer.shutdown();
}
}
批量消息
/**
* 批量消息
* @author
* @create 2022-02-19 23:46
*/
public class Consumer {
public static void main(String[] args) throws MQClientException {
//1.创建消费者Consumer,制定消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.指定NameServer地址
consumer.setNamesrvAddr("192.168.40.147:9876");
//3.订阅主题Topic和Tag
consumer.subscribe("BatchTest","*");
//4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
//接受消息内容
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt message : list) {
// System.out.println(new String(message.getBody()));
// Print approximate delay time period
System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.启动消费者consumer
consumer.start();
System.out.println("消费者启动");
}
}
/**
* 批量消息
* 注意点:每次只发送不超过4MB的消息
* 如果消息的总长度可能超过4MB时,这时候最好把消息进行分割
* @author
* @create 2022-02-19 22:52
*/
public class Producer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
// 1.创建消息生产者producer ,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 2.制定NameServer地址
producer.setNamesrvAddr("192.168.40.147:9876");
// 3.启动Producer
producer.start();
// 4.创建消息对象,指定主题Topic、Tag和消息体
String topic = "BatchTest";
List<Message> messageList = new ArrayList<>();
messageList.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messageList.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messageList.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
//5.发生消息 //批量
producer.send(messageList);
} catch (Exception e) {
e.printStackTrace();
//处理error
}
// 6.关闭生产者Producer
producer.shutdown();
System.out.println("over");
}
}
过滤消息
sql模式
/**
* 使用tag 过滤 本质就是 消费者的tag要和生产者的tag一致
* 可以使用 "*" 代表所有tag
* @author
* @create 2022-02-20 14:52
*/
public class Consumer {
public static void main(String[] args) throws MQClientException {
//1.创建消费者Consumer,制定消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.指定NameServer地址
consumer.setNamesrvAddr("192.168.40.147:9876");
//3.订阅主题Topic和Tag
consumer.subscribe("FilterSQLTopic", MessageSelector.bySql("i>5"));
//4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
//接受消息内容
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.启动消费者consumer
consumer.start();
System.out.println("消费者启动");
}
}
/**
* 过滤消息
* 使用tag 过滤 本质就是 消费者的tag要和生产者的tag一致
* @author
* @create 2022-02-19 20:26
*/
public class Producer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
// 1.创建消息生产者producer ,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 2.制定NameServer地址
producer.setNamesrvAddr("192.168.40.147:9876");
// producer.setSendMsgTimeout(10000);
// 3.启动Producer
producer.start();
// 4.创建消息对象,指定主题Topic、Tag和消息体
for (int i = 0; i < 10; i++) {
/**
* 参数一:消息主题Topic
* 参数二:消息Tag
* 参数三:消息内容
*/
Message message = new Message("FilterSQLTopic","Tag1",("渔阳+"+i).getBytes());
message.putUserProperty("i",String.valueOf(i));
//5.发送消息
SendResult result = producer.send(message);
//发送状态
SendStatus status = result.getSendStatus();
System.out.println("发送结果:" + result);
//线程睡1秒
TimeUnit.SECONDS.sleep(2);
}
// 6.关闭生产者Producer
producer.shutdown();
}
}
tag模式
/**
* 使用tag 过滤 本质就是 消费者的tag要和生产者的tag一致
* 可以使用 "*" 代表所有tag
* @author
* @create 2022-02-20 14:52
*/
public class Consumer {
public static void main(String[] args) throws MQClientException {
//1.创建消费者Consumer,制定消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.指定NameServer地址
consumer.setNamesrvAddr("192.168.40.147:9876");
//3.订阅主题Topic和Tag
consumer.subscribe("FilterTagTopic","Tag1 || Tag2");
//4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
//接受消息内容
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt message : list) {
System.out.println(new String(message.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.启动消费者consumer
consumer.start();
System.out.println("消费者启动");
}
}
/**
* 过滤消息
* 使用tag 过滤 本质就是 消费者的tag要和生产者的tag一致
* @author
* @create 2022-02-19 20:26
*/
public class Producer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
// 1.创建消息生产者producer ,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 2.制定NameServer地址
producer.setNamesrvAddr("192.168.40.147:9876");
// producer.setSendMsgTimeout(10000);
// 3.启动Producer
producer.start();
//String [] tags = {"Tag1","Tag2","Tag3"};
// 4.创建消息对象,指定主题Topic、Tag和消息体
for (int i = 0; i < 10; i++) {
Message message = new Message("FilterTagTopic","Tag1",("渔阳+"+i).getBytes());
//可以写正则表达式
//Message message = new Message("FilterTagTopic",tags[i%tags.length],("渔阳+"+i).getBytes());
// 5.发送消息
SendResult result = producer.send(message);
//发送状态
SendStatus status = result.getSendStatus();
System.out.println("发送结果:"+result);
System.out.println("=====================");
// System.out.printf("%s%n", result);
}
// 6.关闭生产者Producer
producer.shutdown();
}
}
事务消息
/**
* 过滤消息
* 使用tag 过滤 本质就是 消费者的tag要和生产者的tag一致
* @author
* @create 2022-02-19 20:26
*/
public class Producer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
// 1.创建消息生产者producer ,并制定生产者组名
TransactionMQProducer producer = new TransactionMQProducer("group5");
// 2.制定NameServer地址
producer.setNamesrvAddr("192.168.40.147:9876");
//添加事务监听器
producer.setTransactionListener(new TransactionListener() {
/**
* 在该方法中执行本地事务
* @param message
* @param o
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
// if (message.getTags().equals("TAGA")){
if (StringUtils.equals("TAGA", message.getTags())) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.equals("TAGB", message.getTags())) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if (StringUtils.equals("TAGC", message.getTags())) {
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.UNKNOW;
}
/**
* LocalTransactionState.UNKNOW
* 该方法时MQ进行消息事务状态回查
* @param messageExt
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("消息的Tag:" + messageExt.getTags());
return LocalTransactionState.COMMIT_MESSAGE;
}
});
// 3.启动Producer
producer.start();
String[] tags={"TAGA","TAGB","TAGC"};
// 4.创建消息对象,指定主题Topic、Tag和消息体
/**
* 参数一:消息主题Topic
* 参数二:消息Tag
* 参数三:消息内容
*/
for (int i = 0; i < 3; i++) {
Message message = new Message("TransactionTopic",tags[i],("渔阳+"+i).getBytes());
// 5.发送消息
SendResult result = producer.sendMessageInTransaction(message,null);
//发送状态
SendStatus status = result.getSendStatus();
System.out.println("发送结果:"+result);
System.out.println("=====================");
// System.out.printf("%s%n", result);
}
//6.关闭生产者producer
producer.shutdown();
}
}
/**
* 使用tag 过滤 本质就是 消费者的tag要和生产者的tag一致
* 可以使用 "*" 代表所有tag
* @author
* @create 2022-02-20 14:52
*/
public class Consumer {
public static void main(String[] args) throws MQClientException {
//1.创建消费者Consumer,制定消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.指定NameServer地址
consumer.setNamesrvAddr("192.168.40.147:9876");
//3.订阅主题Topic和Tag
consumer.subscribe("TransactionTopic","*");
//4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
//接受消息内容
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt message : list) {
System.out.println(new String(message.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.启动消费者consumer
consumer.start();
System.out.println("消费者启动");
}
}
刷盘机制
RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复, 又可以让存储的消息量超出内存的限制。RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时 候,有两种写磁盘方式,分布式同步刷盘和异步刷盘。
1)同步刷盘
在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写 成功的状态。
2)异步刷盘
在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。
####3)配置
同步刷盘还是异步刷盘,都是通过Broker配置文件里的flushDiskType 参数设置的,这个参数被配置成SYNC_FLUSH、ASYNC_FLUSH中的 一个。
RocketMQ生产者,消费者,Broker,NameServer高可用机制
Broker 高可用性:
主从复制
: 每个 Broker 可以配置多个副本(Replica),其中一个副本为主副本(Master),其余的为从副本(Slave)。主副本负责处理消息的写入和读取请求,从副本通过复制主副本的数据来实现数据的冗余备份。数据持久化
: RocketMQ 使用磁盘持久化消息数据,确保即使在 Broker 重启或故障发生时,数据也能够安全地恢复。
NameServer 高可用性:
集群部署
: RocketMQ 支持多个 NameServer 的集群部署,客户端可以连接到任意一个可用的 NameServer,从而实现故障转移和负载均衡。
心跳检测
: NameServer 会定期发送心跳检测给 Broker 和其他 NameServer,以检测它们的健康状态。
消费者 天生就是一个高可用的保证
生产者 通过将broker搭建成双主双重的方式(投递消息时保证一定能投递成功)
消息存储(持久化机制)
存储介质
关系型数据库 DB
Apache下的ActiveMQ采用的就是JDBC的方式
文件系统
RocketMQ/Kafka/RabbitMQ均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘分为同步刷盘和异步刷盘两种方式)
RocketMQ写操作利用了顺序写的方式保证了消息存储的速度
读操作利用零拷贝 提高消息存盘和网络发送的速度
消息重试
顺序消息的重试
对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。
无序消息的重试
对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果。
无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。
1)重试次数
消息队列 RocketMQ 默认允许每条消息最多重试 16 次,每次重试的间隔时间如下:
死信队列
当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。
死信特性
死信消息具有以下特性
- 不会再被消费者正常消费。
- 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。
死信队列具有以下特性:
- 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
- 如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
- 一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。
消费幂等
消息队列 RocketMQ 消费者在接收到消息以后,有必要根据业务上的唯一 Key 对消息做幂等处理的必要性。
消费幂等的必要性
在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单可以概括为以下情况:
-
发送时消息重复
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
-
投递时消息重复
消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
-
负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)
当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。
处理方式
因为 Message ID 有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以 Message ID 作为处理依据。 最好的方式是以业务唯一标识作为幂等处理的关键依据
ROCKETMQ事务消息
事务消息默认补偿15次
事务消息大致流程
生产者发送一个半消息,然后执行本地事务,成功进行提交让消费者消费,反之回滚
SpringBoot整合RocketMQ
将rocketmq-spring安装到本地仓库
mvn install -Dmaven.skip.test=true
<!--RocketMQ-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq-spring-boot-starter-version}</version>
</dependency>
生产者配置文件
# application.yaml
rocketmq:
consumer:
group: springboot_consumer_group
# 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值
pull-batch-size: 10
name-server: 192.175.25.135:9876;192.175.25.138:9876
producer:
# 发送同一类消息的设置为同一个group,保证唯一
group: my-group
# 发送消息超时时间,默认3000
sendMessageTimeout: 10000
# 发送消息失败重试次数,默认2
retryTimesWhenSendFailed: 2
# 异步消息重试此处,默认2
retryTimesWhenSendAsyncFailed: 2
# 消息最大长度,默认1024 * 1024 * 4(默认4M)
maxMessageSize: 4096
# 压缩消息阈值,默认4k(1024 * 4)
compressMessageBodyThreshold: 4096
# 是否在内部发送失败时重试另一个broker,默认false
retryNextServer: false
测试类
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Test
public void test1(){
rocketMQTemplate.convertAndSend("springboot-mq","hello springboot rocketmq");
//Message<String> msg = MessageBuilder.withPayload("Hello,RocketMQ").build();
//SendResult sendResult = rocketMQTemplate.send(topic, msg);
}
消费者同消息生产者
消费者同消息生产者配置文件
测试类
@Slf4j
@Component
@RocketMQMessageListener(topic = "springboot-mq",consumerGroup = "springboot-mq-consumer-1")
public class Consumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("Receive message:"+message);
}
}
实际生产中
@Override
public Result confirmOrder(TradeOrder order) {
//1.校验订单
//2.生成预订单
try {
//3.扣减库存
//4.扣减优惠券
//5.使用余额
//6.确认订单
//7.返回成功状态
return new Result(Success,Code);
} catch (Exception e) {
//1.确认订单失败,发送消息
MQEntity mqEntity = new MQEntity();
mqEntity.setOrderId(orderId);
mqEntity.setUserId(order.getUserId());
mqEntity.setUserMoney(order.getMoneyPaid());
mqEntity.setGoodsId(order.getGoodsId());
mqEntity.setGoodsNum(order.getGoodsNumber());
mqEntity.setCouponId(order.getCouponId());
//2.返回订单确认失败消息
try {
sendCancelOrder(topic,tag,order.getOrderId().toString(), JSON.toJSONString(mqEntity));
} catch (Exception e1) {
e1.printStackTrace();
}
return new Result(Fail,Code);
}
}
public class MQEntity {
private Long orderId;
private Long couponId;
private Long userId;
private BigDecimal userMoney;
private Long goodsId;
private Integer goodsNum;
//set get
}
/**
* 发送订单确认失败消息
* @param topic
* @param tag
* @param keys
* @param body
*/
private void sendCancelOrder(String topic, String tag, String keys, String body) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
//判断Topic是否为空
if (StringUtils.isEmpty(topic)) {
CastException.cast(ShopCode.SHOP_MQ_TOPIC_IS_EMPTY);
}
//判断消息内容是否为空
if (StringUtils.isEmpty(body)) {
CastException.cast(ShopCode.SHOP_MQ_MESSAGE_BODY_IS_EMPTY);
}
//消息体
Message message = new Message(topic,tag,keys,body.getBytes());
rocketMQTemplate.getProducer().send(message);
}
消费者
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
mq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.pay.topic}",
consumerGroup = "${mq.pay.consumer.group.name}",
messageModel = MessageModel.BROADCASTING)
public class CancelOrderConsumer implements RocketMQListener<MessageExt>{
@Override
public void onMessage(MessageExt messageExt) {
try {
//1.解析消息内容
String body = new String(messageExt.getBody(),"UTF-8");
//TradePay tradePay = JSON.parseObject(body,TradePay.class);
MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);
//2.业务内容
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
线程池方式优化
executorService.submit(new Runnable() {
@Override
public void run() {
try {
SendResult sendResult = sendMessage(topic,
tag,
finalTradePay.getPayId(),
JSON.toJSONString(finalTradePay));
log.info(JSON.toJSONString(sendResult));
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
mqProducerTempMapper.deleteByPrimaryKey(mqProducerTemp.getId());
System.out.println("删除消息表成功");
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
} else {
CastException.cast(ShopCode.SHOP_PAYMENT_IS_PAID);
}
}
@Bean
public ThreadPoolTaskExecutor getThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(100);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("Pool-A");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
雪花算法
时间搓+机器号+序列号
时间戳: 雪花算法的64位ID中,首先占用了41位用于存储时间戳。这个时间戳记录了生成ID的时间,精确到毫秒级。
机器ID: 接下来的10位用于存储机器的唯一ID。在分布式系统中,不同机器需要有唯一的标识符,以避免产生重复的ID。
序列号: 最后的13位用于存储序列号。在同一毫秒内,可以生成多个ID。序列号是递增的,可以用来解决同一毫秒内生成多个ID时的冲突问题。
雪花算法的特点:文章来源:https://www.toymoban.com/news/detail-633412.html
可以在分布式环境中生成唯一ID,避免了重复ID的问题。
生成的ID是递增的,有序的,方便数据库索引和查询。
生成ID的速度很快,基本上可以达到每毫秒生成数万个ID。
雪花算法的实现相对简单,不需要依赖其他外部组件。
需要注意的是,雪花算法并不保证绝对的全局唯一性,因为时钟回拨、机器ID重用等问题都可能导致ID重复。在实际使用中,应该根据具体情况来进行适当的配置和处理,以确保ID的唯一性。另外,随着分布式系统的扩展,机器ID的分配和管理也需要注意,避免冲突和重复。文章来源地址https://www.toymoban.com/news/detail-633412.html
到了这里,关于RocketMQ使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!