RocketMQ使用

这篇具有很好参考价值的文章主要介绍了RocketMQ使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RocketMQ角色
RocketMQ使用,学习笔记,rocketmq

RocketMQ的基本概念
RocketMQ使用,学习笔记,rocketmq

同步发送消息

/**
 * @author
 * @create 2023-04-08 17:24
 * 发送同步消息
 * 适用于重要的消息 例如消息通知,短信通知
 */
public class SyncProducer {
    public static void main(String[] args) throws Exception {
        String topic="testRocketMQ";
        String tag="Tag1";
        String body="Hello RocketMQ";
        //创建消息生产者Producer,并制订生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //指定NameServer地址
        producer.setNamesrvAddr("localhost:9876");
        //启动producer
        producer.start();
        //创建消息对象,指定主题topic,tag和消息体
        for (int i = 0; i < 10; i++) {
            /**
             * 主题 topic
             * 消息的tag
             * 消息内容
             */
            Message message = new Message("testRocketMQ","Tag1",("Hello RocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            //发送消息
            SendResult result = producer.send(message,1000);
            //发送状态
            SendStatus status = result.getSendStatus();
            System.out.println("发送状态->"+status);
            String msgId = result.getMsgId();
            System.out.println("消息id->"+msgId);
            int queueId = result.getMessageQueue().getQueueId();
            System.out.println("消息接收队列id"+queueId);
            TimeUnit.SECONDS.sleep(1);//线程休息1s
        }
        //关闭生产者producer
        producer.shutdown();
    }
}

异步发送

/**
 * @author
 * @create 2023-04-08 19:52
 * 发送异步消息,异步监听,通过回调接口的方式接受服务端的响应
 * 适用于数据量较大,不能容忍阻塞场景的
 */
public class ASyncProducer {
    public static void main(String[] args) throws Exception {
//创建消息生产者Producer,并制订生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //指定NameServer地址
        producer.setNamesrvAddr("localhost:9876");
        //启动producer
        producer.start();
        //创建消息对象,指定主题topic,tag和消息体
        for (int i = 0; i < 10; i++) {
            /**
             * 主题 topic
             * 消息的tag
             * 消息内容
             */
            Message message = new Message("testRocketMQ","Tag1",("Hello RocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));

            int index=i;
            //发送消息
             producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.printf("%-10d OK %s %n", index,
                            sendResult.getMsgId());
                    System.out.println("发送成功"+sendResult);
                }

                @Override
                public void onException(Throwable throwable) {
                    System.out.printf("%-10d Exception %s %n", index, throwable);
                }
            });

            TimeUnit.SECONDS.sleep(1);//线程休息1s
        }
        //关闭生产者producer
        producer.shutdown();
    }

}

单向发送

/**
 * 单向发送,只管发送,不管响应
 * 例如日志收集场景
 * @author
 * @create 2023-04-08 20:22
 */
public class OneWayProducer {
    public static void main(String[] args) throws Exception {
        //创建消息生产者Producer,并制订生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //指定NameServer地址
        producer.setNamesrvAddr("localhost:9876");
        //启动producer
        producer.start();
        //创建消息对象,指定主题topic,tag和消息体
        for (int i = 0; i < 10; i++) {
            /**
             * 主题 topic
             * 消息的tag
             * 消息内容
             */
            Message message = new Message("testRocketMQ","Tag1",("Hello RocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));

            int index=i;
            //发送消息
            producer.sendOneway(message);

            TimeUnit.SECONDS.sleep(1);//线程休息1s
        }
        //关闭生产者producer
        producer.shutdown();
    }
}

集群消费 /负载均衡模式消费

/**
 * @author
 * @create 2023-04-08 20:29
 */
public class Consumer {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者,指定组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        // 指定Namesrv地址信息.
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅Topic
        consumer.subscribe("testRocketMQ","*");
//        consumer.subscribe("base","Tag1 || Tag2 || Tag3");
//        consumer.subscribe("testRocketMQ","Tag1");
        //负载均衡模式消费  集群模式消费
        consumer.setMessageModel(MessageModel.CLUSTERING);
//        consumer.setMessageModel(MessageModel.BROADCASTING);//广播模式
        // 注册回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt ext : list) {
                    String str = new String(ext.getBody());
                    System.out.println(str);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

广播模式

        consumer.setMessageModel(MessageModel.BROADCASTING);//广播模式

顺序消息的生产与消费

/**
 * 顺序消息
 */
public class Producer {
    public static void main(String[] args) throws Exception {
//        1.创建消息生产者producer ,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
//        2.制定NameServer地址
        producer.setNamesrvAddr("192.168.40.147:9876");
//        producer.setSendMsgTimeout(10000);
//        3.启动Producer
        producer.start();

        List<OrderStep> orderSteps = OrderStep.buildOrders();
        for (int i=0;i<=orderSteps.size()+11;i++) {
            //4.创建消息
            Message message = new Message("OrderTopic","Order",
                    String.valueOf(orderSteps.get(i)).getBytes());

            /**
             * 参数一 消息对象
             * 参数二 消息队列的选择器
             * 参数三 选择队列的业务标识(订单ID)
             */
            //5.发送消息
            SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                /**
                 * @param list  队列集合
                 * @param message  消息对象
                 * @param obj  业务标识参数
                 * @return
                 */
                @Override
                public MessageQueue select(List<MessageQueue> list, Message message, Object obj) {
                    long orderId = (long) obj;
                    int size = list.size();
                    int index = (int) (orderId % size);
                    return list.get(index);
                }
            }, orderSteps.get(i).getOrderId());  //orderSteps.get(i).getOrderId() 订单ID
        }
//        6.关闭生产者Producer
        producer.shutdown();
    }
}
/**
 * 顺序消息
 */
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        //1.创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.指定Nameserver地址
        consumer.setNamesrvAddr("192.168.40.147:9876");
        //3.订阅主题Topic和Tag
        consumer.subscribe("OrderTopic","*");
        //4.注册消息监听器
        consumer.registerMessageListener(new MessageListenerOrderly() {
            //使用监听器消费消息
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                for (MessageExt message : list) {
                    System.out.println("消息内容"+new String(message.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        //5.启动消费者
        consumer.start();
        System.out.println("消费者启动");
    }
}
/**
 * 订单构建者
 */
public class OrderStep {
    private long orderId;
    private String desc;

    public long getOrderId() {
        return orderId;
    }

    public void setOrderId(long orderId) {
        this.orderId = orderId;
    }

    public String getDesc() {
        return desc;
    }

    public void setDesc(String desc) {
        this.desc = desc;
    }

    @Override
    public String toString() {
        return "OrderStep{" +
                "orderId=" + orderId +
                ", desc='" + desc + '\'' +
                '}';
    }
    public static List<OrderStep> buildOrders() {
        //  1039L   : 创建    付款 推送 完成
        //  1065L   : 创建   付款
        //  7235L   :创建    付款
        List<OrderStep> orderList = new ArrayList<OrderStep>();
        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(1039L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(1065L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(1039L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(7235L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(1065L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(7235L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(1065L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(1039L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(7235L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(1039L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);
        return orderList;
    }
}

延时消息

/**
 * 延时消息
 * @author
 * @create 2022-02-19 23:46
 */
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        //1.创建消费者Consumer,制定消费者组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.指定NameServer地址
        consumer.setNamesrvAddr("192.168.40.147:9876");
        //3.订阅主题Topic和Tag
        consumer.subscribe("DelayTopic","*");

        //4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            //接受消息内容
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt message : list) {
//                    System.out.println(new String(message.getBody()));
                    // Print approximate delay time period
                    System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");

                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动消费者consumer
        consumer.start();
        System.out.println("消费者启动");
    }
}
/**
 * 延时消息
 * @author
 * @create 2022-02-19 22:52
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
//        1.创建消息生产者producer ,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
//        2.制定NameServer地址
        producer.setNamesrvAddr("192.168.40.147:9876");
//        3.启动Producer
        producer.start();
//        4.创建消息对象,指定主题Topic、Tag和消息体
        for (int i = 0; i < 10; i++) {
            /**
             * 消息主题 topic
             * 消息tag
             * 消息内容
             */
            Message message = new Message("DelayTopic","Tag2",("渔阳+"+i).getBytes());
            //延时级别   设置延时时间   //从1到18 依次设定
            message.setDelayTimeLevel(2);
            
//        5.发送消息
            producer.send(message);
            System.out.println("=====================");
//            System.out.printf("%s%n", result);
            TimeUnit.SECONDS.sleep(1);  //线程休眠
        }
//        6.关闭生产者Producer
        producer.shutdown();
    }
}

批量消息

/**
 * 批量消息
 * @author
 * @create 2022-02-19 23:46
 */
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        //1.创建消费者Consumer,制定消费者组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.指定NameServer地址
        consumer.setNamesrvAddr("192.168.40.147:9876");
        //3.订阅主题Topic和Tag
        consumer.subscribe("BatchTest","*");
        //4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            //接受消息内容
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt message : list) {
//                    System.out.println(new String(message.getBody()));
                    // Print approximate delay time period
                    System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动消费者consumer
        consumer.start();
        System.out.println("消费者启动");
    }
}
/**
 * 批量消息
 * 注意点:每次只发送不超过4MB的消息
 * 如果消息的总长度可能超过4MB时,这时候最好把消息进行分割
 * @author
 * @create 2022-02-19 22:52
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
//        1.创建消息生产者producer ,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
//        2.制定NameServer地址
        producer.setNamesrvAddr("192.168.40.147:9876");
//        3.启动Producer
        producer.start();
//        4.创建消息对象,指定主题Topic、Tag和消息体
        String topic = "BatchTest";
        List<Message> messageList = new ArrayList<>();
        messageList.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
        messageList.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
        messageList.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
        try {
            //5.发生消息   //批量
            producer.send(messageList);
        } catch (Exception e) {
            e.printStackTrace();
            //处理error
        }
//        6.关闭生产者Producer
        producer.shutdown();
        System.out.println("over");
    }
}

过滤消息
sql模式

/**
 * 使用tag  过滤  本质就是  消费者的tag要和生产者的tag一致
 * 可以使用 "*"  代表所有tag
 * @author
 * @create 2022-02-20 14:52
 */
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        //1.创建消费者Consumer,制定消费者组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.指定NameServer地址
        consumer.setNamesrvAddr("192.168.40.147:9876");
        //3.订阅主题Topic和Tag
        consumer.subscribe("FilterSQLTopic", MessageSelector.bySql("i>5"));
        //4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            //接受消息内容
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动消费者consumer
        consumer.start();
        System.out.println("消费者启动");
    }
}
/**
 * 过滤消息
 * 使用tag  过滤  本质就是  消费者的tag要和生产者的tag一致
 * @author
 * @create 2022-02-19 20:26
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
//        1.创建消息生产者producer ,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
//        2.制定NameServer地址
        producer.setNamesrvAddr("192.168.40.147:9876");
//        producer.setSendMsgTimeout(10000);
//        3.启动Producer
        producer.start();
//        4.创建消息对象,指定主题Topic、Tag和消息体
        for (int i = 0; i < 10; i++) {
            /**
             * 参数一:消息主题Topic
             * 参数二:消息Tag
             * 参数三:消息内容
             */
            Message message = new Message("FilterSQLTopic","Tag1",("渔阳+"+i).getBytes());
            message.putUserProperty("i",String.valueOf(i));
            //5.发送消息
            SendResult result = producer.send(message);
            //发送状态
            SendStatus status = result.getSendStatus();
            System.out.println("发送结果:" + result);
            //线程睡1秒
            TimeUnit.SECONDS.sleep(2);
        }
//        6.关闭生产者Producer
        producer.shutdown();
    }

}

tag模式

/**
 * 使用tag  过滤  本质就是  消费者的tag要和生产者的tag一致
 * 可以使用 "*"  代表所有tag
 * @author
 * @create 2022-02-20 14:52
 */
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        //1.创建消费者Consumer,制定消费者组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.指定NameServer地址
        consumer.setNamesrvAddr("192.168.40.147:9876");
        //3.订阅主题Topic和Tag
        consumer.subscribe("FilterTagTopic","Tag1  || Tag2");
        //4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            //接受消息内容
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt message : list) {
                    System.out.println(new String(message.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动消费者consumer
        consumer.start();
        System.out.println("消费者启动");
    }
}
/**
 * 过滤消息
 * 使用tag  过滤  本质就是  消费者的tag要和生产者的tag一致
 * @author
 * @create 2022-02-19 20:26
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
//        1.创建消息生产者producer ,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
//        2.制定NameServer地址
        producer.setNamesrvAddr("192.168.40.147:9876");
//        producer.setSendMsgTimeout(10000);
//        3.启动Producer
        producer.start();

        //String [] tags = {"Tag1","Tag2","Tag3"};
//        4.创建消息对象,指定主题Topic、Tag和消息体
        for (int i = 0; i < 10; i++) {

            Message message = new Message("FilterTagTopic","Tag1",("渔阳+"+i).getBytes());
            //可以写正则表达式
			//Message message = new Message("FilterTagTopic",tags[i%tags.length],("渔阳+"+i).getBytes());
//        5.发送消息
            SendResult result = producer.send(message);
        //发送状态
            SendStatus status = result.getSendStatus();

            System.out.println("发送结果:"+result);
            System.out.println("=====================");
//            System.out.printf("%s%n", result);
        }
//        6.关闭生产者Producer
        producer.shutdown();
    }
}

事务消息

/**
 * 过滤消息
 * 使用tag  过滤  本质就是  消费者的tag要和生产者的tag一致
 * @author
 * @create 2022-02-19 20:26
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
//        1.创建消息生产者producer ,并制定生产者组名
        TransactionMQProducer producer = new TransactionMQProducer("group5");
//        2.制定NameServer地址
        producer.setNamesrvAddr("192.168.40.147:9876");

        //添加事务监听器
        producer.setTransactionListener(new TransactionListener() {
            /**
             * 在该方法中执行本地事务
             * @param message
             * @param o
             * @return
             */
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
//                if (message.getTags().equals("TAGA")){
                if (StringUtils.equals("TAGA", message.getTags())) {
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else if (StringUtils.equals("TAGB", message.getTags())) {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                } else if (StringUtils.equals("TAGC", message.getTags())) {
                    return LocalTransactionState.UNKNOW;
                }
                return LocalTransactionState.UNKNOW;
            }

            /**
             * LocalTransactionState.UNKNOW
             * 该方法时MQ进行消息事务状态回查
             * @param messageExt
             * @return
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                System.out.println("消息的Tag:" + messageExt.getTags());
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

//        3.启动Producer
        producer.start();

        String[] tags={"TAGA","TAGB","TAGC"};
//        4.创建消息对象,指定主题Topic、Tag和消息体
        /**
         * 参数一:消息主题Topic
         * 参数二:消息Tag
         * 参数三:消息内容
         */
        for (int i = 0; i < 3; i++) {

            Message message = new Message("TransactionTopic",tags[i],("渔阳+"+i).getBytes());

//        5.发送消息
            SendResult result = producer.sendMessageInTransaction(message,null);
        //发送状态
            SendStatus status = result.getSendStatus();

            System.out.println("发送结果:"+result);
            System.out.println("=====================");
//            System.out.printf("%s%n", result);


        }
        //6.关闭生产者producer
        producer.shutdown();
    }
}
/**
 * 使用tag  过滤  本质就是  消费者的tag要和生产者的tag一致
 * 可以使用 "*"  代表所有tag
 * @author
 * @create 2022-02-20 14:52
 */
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        //1.创建消费者Consumer,制定消费者组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.指定NameServer地址
        consumer.setNamesrvAddr("192.168.40.147:9876");
        //3.订阅主题Topic和Tag
        consumer.subscribe("TransactionTopic","*");

        //4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            //接受消息内容
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt message : list) {
                    System.out.println(new String(message.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动消费者consumer
        consumer.start();
        System.out.println("消费者启动");
    }
}

刷盘机制
RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复, 又可以让存储的消息量超出内存的限制。RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时 候,有两种写磁盘方式,分布式同步刷盘和异步刷盘。
RocketMQ使用,学习笔记,rocketmq

1)同步刷盘

在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写 成功的状态。

2)异步刷盘

在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。

####3)配置

同步刷盘还是异步刷盘,都是通过Broker配置文件里的flushDiskType 参数设置的,这个参数被配置成SYNC_FLUSH、ASYNC_FLUSH中的 一个。

RocketMQ生产者,消费者,Broker,NameServer高可用机制
Broker 高可用性:

主从复制: 每个 Broker 可以配置多个副本(Replica),其中一个副本为主副本(Master),其余的为从副本(Slave)。主副本负责处理消息的写入和读取请求,从副本通过复制主副本的数据来实现数据的冗余备份。
数据持久化: RocketMQ 使用磁盘持久化消息数据,确保即使在 Broker 重启或故障发生时,数据也能够安全地恢复。

NameServer 高可用性:

集群部署: RocketMQ 支持多个 NameServer 的集群部署,客户端可以连接到任意一个可用的 NameServer,从而实现故障转移和负载均衡。

心跳检测: NameServer 会定期发送心跳检测给 Broker 和其他 NameServer,以检测它们的健康状态。

消费者 天生就是一个高可用的保证
生产者 通过将broker搭建成双主双重的方式(投递消息时保证一定能投递成功)

消息存储(持久化机制)
存储介质
关系型数据库 DB
Apache下的ActiveMQ采用的就是JDBC的方式
文件系统
RocketMQ/Kafka/RabbitMQ均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘分为同步刷盘和异步刷盘两种方式)

RocketMQ写操作利用了顺序写的方式保证了消息存储的速度
读操作利用零拷贝 提高消息存盘和网络发送的速度

消息重试

顺序消息的重试

对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。

无序消息的重试

对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果。

无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

1)重试次数

消息队列 RocketMQ 默认允许每条消息最多重试 16 次,每次重试的间隔时间如下:

死信队列

当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。

死信特性

死信消息具有以下特性

  • 不会再被消费者正常消费。
  • 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。

死信队列具有以下特性:

  • 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
  • 如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
  • 一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。

消费幂等

消息队列 RocketMQ 消费者在接收到消息以后,有必要根据业务上的唯一 Key 对消息做幂等处理的必要性。

消费幂等的必要性

在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单可以概括为以下情况:

  • 发送时消息重复

    当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

  • 投递时消息重复

    消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

  • 负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)

    当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。

处理方式

因为 Message ID 有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以 Message ID 作为处理依据。 最好的方式是以业务唯一标识作为幂等处理的关键依据

ROCKETMQ事务消息

事务消息默认补偿15次

事务消息大致流程
生产者发送一个半消息,然后执行本地事务,成功进行提交让消费者消费,反之回滚

SpringBoot整合RocketMQ

将rocketmq-spring安装到本地仓库

mvn install -Dmaven.skip.test=true
        <!--RocketMQ-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>${rocketmq-spring-boot-starter-version}</version>
        </dependency>

生产者配置文件

# application.yaml

rocketmq:
    consumer:
        group: springboot_consumer_group
        # 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值
        pull-batch-size: 10
    name-server: 192.175.25.135:9876;192.175.25.138:9876
    producer:
        # 发送同一类消息的设置为同一个group,保证唯一
        group: my-group
        # 发送消息超时时间,默认3000
        sendMessageTimeout: 10000
        # 发送消息失败重试次数,默认2
        retryTimesWhenSendFailed: 2
        # 异步消息重试此处,默认2
        retryTimesWhenSendAsyncFailed: 2
        # 消息最大长度,默认1024 * 1024 * 4(默认4M)
        maxMessageSize: 4096
        # 压缩消息阈值,默认4k(1024 * 4)
        compressMessageBodyThreshold: 4096
        # 是否在内部发送失败时重试另一个broker,默认false
        retryNextServer: false


测试类

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Test
    public void test1(){
        rocketMQTemplate.convertAndSend("springboot-mq","hello springboot rocketmq");
        //Message<String> msg = MessageBuilder.withPayload("Hello,RocketMQ").build();
	    //SendResult sendResult = rocketMQTemplate.send(topic, msg);
    }

消费者同消息生产者
消费者同消息生产者配置文件
测试类

@Slf4j
@Component
@RocketMQMessageListener(topic = "springboot-mq",consumerGroup = "springboot-mq-consumer-1")
public class Consumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info("Receive message:"+message);
    }
}

实际生产中

	@Override
    public Result confirmOrder(TradeOrder order) {
        //1.校验订单

        //2.生成预订单
        
        try {
            //3.扣减库存
            
            //4.扣减优惠券

            //5.使用余额

            //6.确认订单

            //7.返回成功状态
            return new Result(Success,Code);
        } catch (Exception e) {
            //1.确认订单失败,发送消息
            MQEntity mqEntity = new MQEntity();
            mqEntity.setOrderId(orderId);
            mqEntity.setUserId(order.getUserId());
            mqEntity.setUserMoney(order.getMoneyPaid());
            mqEntity.setGoodsId(order.getGoodsId());
            mqEntity.setGoodsNum(order.getGoodsNumber());
            mqEntity.setCouponId(order.getCouponId());

            //2.返回订单确认失败消息
            try {
                sendCancelOrder(topic,tag,order.getOrderId().toString(), JSON.toJSONString(mqEntity));
            } catch (Exception e1) {
                e1.printStackTrace();
            }
            return new Result(Fail,Code);
        }
    }

	public class MQEntity {
	
	    private Long orderId;
	    private Long couponId;
	    private Long userId;
	    private BigDecimal userMoney;
	    private Long goodsId;
	    private Integer goodsNum;
		//set  get
	}

	/**
     * 发送订单确认失败消息
     * @param topic
     * @param tag
     * @param keys
     * @param body
     */
    private void sendCancelOrder(String topic, String tag, String keys, String body) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        //判断Topic是否为空
        if (StringUtils.isEmpty(topic)) {
            CastException.cast(ShopCode.SHOP_MQ_TOPIC_IS_EMPTY);
        }
        //判断消息内容是否为空
        if (StringUtils.isEmpty(body)) {
            CastException.cast(ShopCode.SHOP_MQ_MESSAGE_BODY_IS_EMPTY);
        }
        //消息体
        Message message = new Message(topic,tag,keys,body.getBytes());
        rocketMQTemplate.getProducer().send(message);
    }

消费者

rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
mq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.pay.topic}",
        consumerGroup = "${mq.pay.consumer.group.name}",
        messageModel = MessageModel.BROADCASTING)
public class CancelOrderConsumer implements RocketMQListener<MessageExt>{

    @Override
    public void onMessage(MessageExt messageExt) {

        try {
            //1.解析消息内容
            String body = new String(messageExt.getBody(),"UTF-8");
            //TradePay tradePay = JSON.parseObject(body,TradePay.class);
            MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);
            //2.业务内容

        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }

    }
}

线程池方式优化

 executorService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        SendResult sendResult = sendMessage(topic, 
                                                            tag, 
                                                            finalTradePay.getPayId(), 
                                                            JSON.toJSONString(finalTradePay));
                        log.info(JSON.toJSONString(sendResult));
                        if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
                            mqProducerTempMapper.deleteByPrimaryKey(mqProducerTemp.getId());
                            System.out.println("删除消息表成功");
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        } else {
            CastException.cast(ShopCode.SHOP_PAYMENT_IS_PAID);
        }
  }
@Bean
public ThreadPoolTaskExecutor getThreadPool() {

    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

    executor.setCorePoolSize(4);

    executor.setMaxPoolSize(8);

    executor.setQueueCapacity(100);

    executor.setKeepAliveSeconds(60);

    executor.setThreadNamePrefix("Pool-A");

    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

    executor.initialize();

    return executor;

}

雪花算法

时间搓+机器号+序列号
时间戳: 雪花算法的64位ID中,首先占用了41位用于存储时间戳。这个时间戳记录了生成ID的时间,精确到毫秒级。

机器ID: 接下来的10位用于存储机器的唯一ID。在分布式系统中,不同机器需要有唯一的标识符,以避免产生重复的ID。

序列号: 最后的13位用于存储序列号。在同一毫秒内,可以生成多个ID。序列号是递增的,可以用来解决同一毫秒内生成多个ID时的冲突问题。

雪花算法的特点:

可以在分布式环境中生成唯一ID,避免了重复ID的问题。
生成的ID是递增的,有序的,方便数据库索引和查询。
生成ID的速度很快,基本上可以达到每毫秒生成数万个ID。
雪花算法的实现相对简单,不需要依赖其他外部组件。
需要注意的是,雪花算法并不保证绝对的全局唯一性,因为时钟回拨、机器ID重用等问题都可能导致ID重复。在实际使用中,应该根据具体情况来进行适当的配置和处理,以确保ID的唯一性。另外,随着分布式系统的扩展,机器ID的分配和管理也需要注意,避免冲突和重复。文章来源地址https://www.toymoban.com/news/detail-633412.html

到了这里,关于RocketMQ使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RocketMQ 学习教程——(二)SpringBoot 集成 RocketMQ

    在 Maven 仓库【https://mvnrepository.com/】中搜索 RocketMQ 依赖: 在 SpringBoot 项目的 Pom.xml 文件中添加对应 MQ 版本的依赖: YAML 配置 在 SpringBoot 项目的 yml 配置文件中添加以下配置: 创建监听器 创建一个 MQMsgListener 类用于监听 RocketMQ 的消息,类上标注注解: @Component 、 @RocketMQMe

    2024年02月03日
    浏览(43)
  • 【微服务学习】spring-cloud-starter-stream 4.x 版本的使用(rocketmq 版)

    @[TOC](【微服务学习】spring-cloud-starter-stream 4.x 版本的使用(rocketmq 版)) 2.1 消息发送者 2.1.1 使用 StreamBridge streamBridge; 往指定信道发送消息 2.1.2 通过隐式绑定信道, 注册 Bean 发送消息 2.2 消息接收者 注意: 多个方法之间可以使用 “|” 间隔, 但是绑定时 多个需要按顺序写. 其中

    2024年02月03日
    浏览(38)
  • RocketMQ笔记

    科普: 把数据放到消息队列叫做 生产者 从消息队列里边取数据叫做 消费者   消息队列主要有三大用途 :解耦、异步、削峰,是消息队列最主要的三大作用。   选择中间件的可以从这些维度来考虑:可靠性,性能,功能,可运维行,可拓展性,社区活跃度。目前常用的几个

    2024年02月08日
    浏览(35)
  • 使用 Docker 安装 RocketMQ 使用 docker 安装 rocketmq

    Docker常用命令大全 RocketMQ 是一个分布式的消息中间件,由 NameServer 和Broker两个角色组成,是一种典型的基于发布/订阅模式的消息通信解决方案。 NameServer 是 RocketMQ 的命名服务,可以理解为类似于 DNS 的服务,它主要负责记录 Topic 的路由信息和 Broker 的地址信息。每个 Rocket

    2024年02月13日
    浏览(38)
  • RocketMQ mqadmin java springboot python 调用笔记

    mqadmin命令列表 Python 生产者:producer.py 运行 mqadmin查询topic状态 https://github.com/apache/rocketmq-dashboard 自行编译 启动        consoumer.py 启动python消费者 可以看到my-group1已被消费  再启动一个consumer.py,产生一次消息 可以看到,只有一个consumer消费到了消息,说明默认情况下,消息

    2024年02月11日
    浏览(32)
  • SpringCloudAlibaba:消息驱动之RocketMQ学习

    目录 一、MQ简介 (一)什么是MQ (二)MQ的应用场景 1、异步解耦 2、流量削峰 (三)常见的MQ产品 二、RocketMQ入门 (一)RocketMQ安装部署 1、环境要求 2、下载RocketMQ 3、安装RocketMQ 4、启动RocketMQ 5、测试RocketMQ 6、关闭RocketMQ (二)RocketMQ控制台安装与启动 下载并解压 三、sp

    2024年02月16日
    浏览(27)
  • rocketMq消息队列原生api使用以及rocketMq整合springboot

    使用RocketMQ的原生API开发是最简单也是目前看来最牢靠的方式。这里用SpringBoot来搭建一系列消息生产者和消息消费者,来访问之前搭建的RocketMQ集群。 首先创建一个基于Maven的SpringBoot工程,引入如下依赖: RocketMQ的官网上有很多经典的测试代码,这些代码虽然依赖的版本比较

    2024年02月12日
    浏览(45)
  • Springbootg整合RocketMQ ——使用 rocketmq-spring-boot-starter 来配置发送和消费 RocketMQ 消息

           本文解析将 RocketMQ Client 端集成为 spring-boot-starter 框架的开发细节,然后通过一个简单的示例来一步一步的讲解如何使用这个 spring-boot-starter 工具包来配置,发送和消费 RocketMQ 消息。 添加maven依赖: 修改application.properties 注意: 请将上述示例配置中的 127.0.0.1:9876 替换

    2024年03月22日
    浏览(42)
  • RocketMQ的学习历程(一)----MQ简介

    1.1.MQ简介: MQ,message Queue,是一种提供消息队列服务的中间件,也称消息中间件,是一套提供了信息生产,储存,消费全过程的API软件系统。 1.2.MQ的主要用途: 流量削峰 MQ的流量削峰是指在系统面临高并发请求时,使用MQ来缓冲请求,避免数据库或其他服务承受过大的压力。 例

    2023年04月15日
    浏览(31)
  • RocketMQ的学习历程(5)----broker内部设计

    在首个学习历程中,我们已经了解了,RokctMQ简单的工作流程。 如果想要更深的理解RokcetMQ消息处理的流程,broker内部流程的理解是必要的, 这里我们就了解一下Broker内部的工作流程。 当Consumer消费消息时,需要先读取ConsumeQueue得到offset,再通过offset找到CommitLog对应的消息内

    2024年02月07日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包