RocketMQ基础API使用以及基本原理探究

这篇具有很好参考价值的文章主要介绍了RocketMQ基础API使用以及基本原理探究。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

同步发送

等待消息返回后再继续进行下面的操作

适合可靠性要求较高,数据量小,实时响应

消费者:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/9
 */
public class SyncConsumer {
    public static void main(String[] args) throws MQClientException {
        // 推模式,消费者等到Broker把消息推过来
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushComsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("Sync","*");

        // 并行消费消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for(int i=0;i<list.size();i++){
                    System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("消费者启动成功");
    }
}

生产者:

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/9
 */
public class SyncProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        System.out.println("生产者启动成功");

        for(int i=0;i<2;i++){
            Message message = new Message("Sync", "SyncTags", (i+"_SyncProducer").getBytes(StandardCharsets.UTF_8));
            // 同步发送
            SendResult sendResult = producer.send(message);
            System.out.println(i+"_消息发送成功"+sendResult);
        }

        producer.shutdown();

    }
}

其中消费者的输出为:

0_消息消费成功_0_SyncProducer
0_消息消费成功_1_SyncProducer

前面都是0说明这种推模式broker是分两次推送的

异步发送

不等待消息返回直接进入后续的流程

消费者:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/9
 */
public class AsyncConsumer {
    public static void main(String[] args) throws MQClientException {
        // 推模式,消费者等到Broker把消息推过来
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushComsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("Async","*");

        // 并行消费消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for(int i=0;i<list.size();i++){
                    System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("消费者启动成功");
    }
}

生产者:

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/10
 */
public class AsyncProducer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("AsyncProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        System.out.println("生产者启动成功");
        CountDownLatch countDownLatch = new CountDownLatch(100);

        for(int i=0;i<100;i++){
            final int index= i;

            Message message = new Message("Async", "AsyncTags", (i+"_AsyncProducer").getBytes(StandardCharsets.UTF_8));
            // 异步发送,回调方法由broker调用
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(index +"_消息发送成功_"+sendResult);
                    countDownLatch.countDown();
                }

                @Override
                public void onException(Throwable throwable) {
                    System.out.println(index +"_消息发送失败_"+throwable.getStackTrace());
                    countDownLatch.countDown();
                }
            });
        }

        // 主线程等待异步发送结束
        countDownLatch.await();
        System.out.println("发送结束");
        producer.shutdown();
    }
}

如果没有countDownLatch.await(),会出现下面的情况:

生产者启动成功
发送结束
0_消息发送失败_[Ljava.lang.StackTraceElement;@10c89bd0
10_消息发送失败_[Ljava.lang.StackTraceElement;@3d181392
11_消息发送失败_[Ljava.lang.StackTraceElement;@69da09fa
....

就是producer主线程先关闭了,但是异步发送还没结束,深入源码看这个send是用ThreadPoolExecutor异步线程池

其中消费者的输出为:

....
0_消息消费成功_78_AsyncProducer
0_消息消费成功_98_AsyncProducer
0_消息消费成功_99_AsyncProducer
0_消息消费成功_86_AsyncProducer
0_消息消费成功_87_AsyncProducer
0_消息消费成功_69_AsyncProducer
0_消息消费成功_9_AsyncProduce

单向发送

只负责发送,不管消息是否发送成功

适合收集日志,可靠性要求不高的场景

消费者:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/9
 */
public class OnewayConsumer {
    public static void main(String[] args) throws MQClientException {
        // 推模式,消费者等到Broker把消息推过来
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushComsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("Oneway","*");

        // 并行消费消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for(int i=0;i<list.size();i++){
                    System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("消费者启动成功");
    }
}

发送者:

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/9
 */
public class OnewayProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        System.out.println("生产者启动成功");

        for(int i=0;i<2;i++){
            Message message = new Message("Oneway", "OnewayTags", (i+"_OnewayProducer").getBytes(StandardCharsets.UTF_8));
            // 单向发送
            producer.send(message);
            System.out.println(i+"_消息发送成功");
        }

        producer.shutdown();

    }
}

消费者输出:

消费者启动成功
0_消息消费成功_0_OnewayProducer
0_消息消费成功_1_OnewayProduce

拉模式

随机获取一个queue的消息

生产者:

随便指定一种发送模式,这里选了异步发送

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/11
 */
public class LitePullProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException {
        DefaultMQProducer producer = new DefaultMQProducer("AsyncProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        System.out.println("生产者启动成功");
        CountDownLatch countDownLatch = new CountDownLatch(100);

        for(int i=0;i<100;i++){
            final int index= i;

            Message message = new Message("LitePull", "LitePullTags", (i+"_LitePullProducer").getBytes(StandardCharsets.UTF_8));
            // 异步发送,回调方法由broker调用
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(index +"_消息发送成功_"+sendResult);
                    countDownLatch.countDown();
                }

                @Override
                public void onException(Throwable throwable) {
                    System.out.println(index +"_消息发送失败_"+throwable.getStackTrace());
                    countDownLatch.countDown();
                }
            });
        }

        // 主线程等待异步发送结束
        countDownLatch.await();
        System.out.println("发送结束");
        producer.shutdown();
    }
}

消费者:

import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @Author jiangxuzhao
 * @Description 拉模式-随机获取一个queue的消息
 * @Date 2023/6/11
 */
public class LitePullConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("LitePullConsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("LitePull","*");
        consumer.start();

        while(true){
            List<MessageExt> messageExts = consumer.poll();
            System.out.println("消息接收成功");
            messageExts.forEach(n->{
                System.out.println("消息消费成功_"+n);
            });
        }

    }
}

消费者输出:

可以看出一次拉出来的消息queueId都是一样的。

消息接收成功
消息接收成功
消息接收成功
消息消费成功_MessageExt [brokerName=broker-a, queueId=2, storeSize=199, queueOffset=0, sysFlag=0, bornTimestamp=1686479335622, bornHost=/127.0.0.1:53451, storeTimestamp=1686479335660, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000004DB87, commitLogOffset=318343, bodyCRC=44743681, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePull', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26, UNIQ_KEY=7F5E0001F19018B4AAC23776D8C40001, CLUSTER=DefaultCluster, TAGS=LitePullTags}, body=[54, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=2, storeSize=199, queueOffset=1, sysFlag=0, bornTimestamp=1686479335622, bornHost=/127.0.0.1:53451, storeTimestamp=1686479335661, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000004DEA3, commitLogOffset=319139, bodyCRC=1669641829, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePull', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26, UNIQ_KEY=7F5E0001F19018B4AAC23776D8C40007, CLUSTER=DefaultCluster, TAGS=LitePullTags}, body=[49, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=2, storeSize=200, queueOffset=2, sysFlag=0, bornTimestamp=1686479335637, bornHost=/127.0.0.1:53451, storeTimestamp=1686479335661, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000004DF6A, commitLogOffset=319338, bodyCRC=1570600805, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePull', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26, UNIQ_KEY=7F5E0001F19018B4AAC23776D8D5000F, CLUSTER=DefaultCluster, TAGS=LitePullTags}, body=[49, 54, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=2, storeSize=200, queueOffset=3, sysFlag=0, bornTimestamp=1686479335637, bornHost=/127.0.0.1:53451, storeTimestamp=1686479335661, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000004E032, commitLogOffset=319538, bodyCRC=852476292, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePull', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26, UNIQ_KEY=7F5E0001F19018B4AAC23776D8D50010, CLUSTER=DefaultCluster, TAGS=LitePullTags}, body=[49, 55, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=2, storeSize=200, queueOffset=4, sysFlag=0, bornTimestamp=1686479335637, bornHost=/127.0.0.1:53451, storeTimestamp=1686479335663, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000004E4E1, commitLogOffset=320737, bodyCRC=924559879, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePull', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26, UNIQ_KEY=7F5E0001F19018B4AAC23776D8D5000B, CLUSTER=DefaultCluster, TAGS=LitePullTags}, body=[49, 53, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=2, storeSize=200, queueOffset=5, sysFlag=0, bornTimestamp=1686479335637, bornHost=/127.0.0.1:53451, storeTimestamp=1686479335666, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000004E801, commitLogOffset=321537, bodyCRC=1481186534, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePull', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26, UNIQ_KEY=7F5E0001F19018B4AAC23776D8D50017, CLUSTER=DefaultCluster, TAGS=LitePullTags}, body=[49, 52, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=2, storeSize=200, queueOffset=6, sysFlag=0, bornTimestamp=1686479335637, bornHost=/127.0.0.1:53451, storeTimestamp=1686479335666, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000004E991, commitLogOffset=321937, bodyCRC=1743184709, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePull', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26, UNIQ_KEY=7F5E0001F19018B4AAC23776D8D5001A, CLUSTER=DefaultCluster, TAGS=LitePullTags}, body=[50, 56, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=2, storeSize=200, queueOffset=7, sysFlag=0, bornTimestamp=1686479335636, bornHost=/127.0.0.1:53451, storeTimestamp=1686479335667, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000004EBE9, commitLogOffset=322537, bodyCRC=1446355043, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePull', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26, UNIQ_KEY=7F5E0001F19018B4AAC23776D8D4000A, CLUSTER=DefaultCluster, TAGS=LitePullTags}, body=[49, 50, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=2, storeSize=200, queueOffset=8, sysFlag=0, bornTimestamp=1686479335637, bornHost=/127.0.0.1:53451, storeTimestamp=1686479335667, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000004F099, commitLogOffset=323737, bodyCRC=535165864, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePull', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26, UNIQ_KEY=7F5E0001F19018B4AAC23776D8D50014, CLUSTER=DefaultCluster, TAGS=LitePullTags}, body=[50, 49, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=2, storeSize=200, queueOffset=9, sysFlag=0, bornTimestamp=1686479335640, bornHost=/127.0.0.1:53451, storeTimestamp=1686479335672, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000004F869, commitLogOffset=325737, bodyCRC=1182517109, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePull', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26, UNIQ_KEY=7F5E0001F19018B4AAC23776D8D80023, CLUSTER=DefaultCluster, TAGS=LitePullTags}, body=[51, 55, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114], transactionId='null'}]
消息接收成功
消息消费成功_MessageExt [brokerName=broker-a, queueId=1, storeSize=199, queueOffset=0, sysFlag=0, bornTimestamp=1686479335624, bornHost=/127.0.0.1:53451, storeTimestamp=1686479335655, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000004D86B, commitLogOffset=317547, bodyCRC=461876872, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePull', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=25, UNIQ_KEY=7F5E0001F19018B4AAC23776D8C80008, CLUSTER=DefaultCluster, TAGS=LitePullTags}, body=[56, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114], transactionId='null'}]
......

指定一个queue的消息

生产者:

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/11
 */
public class LitePullProducerAssign {
    public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException {
        DefaultMQProducer producer = new DefaultMQProducer("AsyncProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        System.out.println("生产者启动成功");
        CountDownLatch countDownLatch = new CountDownLatch(100);

        for(int i=0;i<100;i++){
            final int index= i;

            Message message = new Message("LitePullAssign", "LitePullAssignTags", (i+"_LitePullProducerAssign").getBytes(StandardCharsets.UTF_8));
            // 异步发送,回调方法由broker调用
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(index +"_消息发送成功_"+sendResult);
                    countDownLatch.countDown();
                }

                @Override
                public void onException(Throwable throwable) {
                    System.out.println(index +"_消息发送失败_"+throwable.getStackTrace());
                    countDownLatch.countDown();
                }
            });
        }

        // 主线程等待异步发送结束
        countDownLatch.await();
        System.out.println("发送结束");
        producer.shutdown();
    }
}

消费者:

import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/**
 * @Author jiangxuzhao
 * @Description 拉模式-指定一个queue的消息
 * @Date 2023/6/11
 */
public class LitePullConsumerAssign {
    public static void main(String[] args) throws MQClientException {
        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("LitePullConsumerAssign");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.start();
        Collection<MessageQueue> messageQueues = consumer.fetchMessageQueues("LitePullAssign");
        ArrayList<MessageQueue> messageQueues1 = new ArrayList<>(messageQueues);

        consumer.assign(messageQueues1);
        consumer.seek(messageQueues1.get(0),10);
        while(true){
            List<MessageExt> messageExts = consumer.poll();
            System.out.println("消息接收成功");
            messageExts.forEach(n->{
                System.out.println("消息消费成功_"+n);
            });
        }

    }
}

在检验的时候,需要生产者先把消息写到broker里面去,consumer才能拉取到,消费者输出如下:

消息消费成功_MessageExt [brokerName=broker-a, queueId=3, storeSize=218, queueOffset=57, sysFlag=0, bornTimestamp=1686486526339, bornHost=/127.0.0.1:54222, storeTimestamp=1686486526371, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000005F037, commitLogOffset=389175, bodyCRC=1241947329, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePullAssign', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=72, UNIQ_KEY=7F5E0001F35618B4AAC237E491830029, CLUSTER=DefaultCluster, TAGS=LitePullAssignTags}, body=[52, 52, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114, 65, 115, 115, 105, 103, 110], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=3, storeSize=218, queueOffset=58, sysFlag=0, bornTimestamp=1686486526339, bornHost=/127.0.0.1:54222, storeTimestamp=1686486526371, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000005F111, commitLogOffset=389393, bodyCRC=2065607606, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePullAssign', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=72, UNIQ_KEY=7F5E0001F35618B4AAC237E49183002A, CLUSTER=DefaultCluster, TAGS=LitePullAssignTags}, body=[52, 51, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114, 65, 115, 115, 105, 103, 110], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=3, storeSize=218, queueOffset=59, sysFlag=0, bornTimestamp=1686486526339, bornHost=/127.0.0.1:54222, storeTimestamp=1686486526371, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000005F2C5, commitLogOffset=389829, bodyCRC=1548273985, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePullAssign', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=72, UNIQ_KEY=7F5E0001F35618B4AAC237E491830023, CLUSTER=DefaultCluster, TAGS=LitePullAssignTags}, body=[51, 55, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114, 65, 115, 115, 105, 103, 110], transactionId='null'}]
消息接收成功
消息消费成功_MessageExt [brokerName=broker-a, queueId=3, storeSize=218, queueOffset=60, sysFlag=0, bornTimestamp=1686486526339, bornHost=/127.0.0.1:54222, storeTimestamp=1686486526371, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000005F39F, commitLogOffset=390047, bodyCRC=322785486, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePullAssign', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=72, UNIQ_KEY=7F5E0001F35618B4AAC237E49183002C, CLUSTER=DefaultCluster, TAGS=LitePullAssignTags}, body=[52, 54, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114, 65, 115, 115, 105, 103, 110], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=3, storeSize=218, queueOffset=61, sysFlag=0, bornTimestamp=1686486526339, bornHost=/127.0.0.1:54222, storeTimestamp=1686486526371, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000005F479, commitLogOffset=390265, bodyCRC=1881692935, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePullAssign', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=72, UNIQ_KEY=7F5E0001F35618B4AAC237E49183002E, CLUSTER=DefaultCluster, TAGS=LitePullAssignTags}, body=[52, 57, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114, 65, 115, 115, 105, 103, 110], transactionId='null'}]

顺序消息

生产者局部有序地发送到一个queue中,但多个queue之间是全局无序的。

生产者:通过MessageQueueSelector将消息有序地发送到同一个queue中。

package order;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/11
 */
public class OrderProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        // i的消息都是按顺序发送的
        for(int i=0;i<5;i++){
            for(int j=0;j<10;j++){
                Message message = new Message("Order","OrderTag",("order_"+i+"_step_"+j).getBytes(StandardCharsets.UTF_8));
                // MessageQueue分配
                SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id=(Integer) arg;
                        int index= id%mqs.size();
                        return mqs.get(index); // 返回要发送到的MessageQueue
                    }
                }, i); // 传入的就是arg
                System.out.println("消息发送成功_"+sendResult);
            }
        }
        producer.shutdown();
    }
}

消费者:通过MessageListenerOrderly消费者每次读取消息都只从一个queue中获取。

package order;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/11
 */
public class OrderConsumer {
    public static void main(String[] args) throws MQClientException {
        // 推模式,消费者等到Broker把消息推过来
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushComsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("Order","*");

        // 顺序消费
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for(int i=0;i<msgs.size();i++){
                    System.out.println(i+"_消息消费成功_"+new String(msgs.get(i).getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
        System.out.println("消费者启动成功");
    }
}

消费者输出:

外循环都是为3的时候,内循环按照顺序消费

消费者启动成功
0_消息消费成功_order_3_step_0
0_消息消费成功_order_3_step_1
0_消息消费成功_order_3_step_2
0_消息消费成功_order_3_step_3
0_消息消费成功_order_3_step_4
0_消息消费成功_order_3_step_5
0_消息消费成功_order_3_step_6
0_消息消费成功_order_3_step_7
0_消息消费成功_order_3_step_8
0_消息消费成功_order_3_step_9

如果改成并发消费,输出就乱了

消费者启动成功
0_消息消费成功_order_3_step_7
0_消息消费成功_order_3_step_9
0_消息消费成功_order_3_step_4
0_消息消费成功_order_3_step_8
0_消息消费成功_order_3_step_2
0_消息消费成功_order_3_step_5
0_消息消费成功_order_3_step_0
0_消息消费成功_order_3_step_6
0_消息消费成功_order_3_step_1
0_消息消费成功_order_3_step_3

广播消息

广播消息并没有特定的消息消费者样例,这是因为这涉及到消费者的集群消费横式。

  • MessageModel.BROADCASTING:广播消息。一条消息会发给所有订阅了对应主题的消费者,不管消费者是不是同一个消费者组。
  • MessageModel.CLUSTERING:集群消息。每一条消息只会被同一个消费者组中的一个实例消费。

生产者:

选择了同步发送

package broadcast;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/9
 */
public class BroadcastProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        System.out.println("生产者启动成功");

        for(int i=0;i<2;i++){
            Message message = new Message("Broadcast", "BroadcastTags", (i+"_BroadcastProducer").getBytes(StandardCharsets.UTF_8));
            // 同步发送
            SendResult sendResult = producer.send(message);
            System.out.println(i+"_消息发送成功"+sendResult);
        }

        producer.shutdown();

    }
}

消费者:

MessageModel.BROADCASTING:广播消息

package broadcast;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/9
 */
public class BroadcastConsumer {
    public static void main(String[] args) throws MQClientException {
        // 推模式,消费者等到Broker把消息推过来
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushComsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("Broadcast","*");
        consumer.setMessageModel(MessageModel.BROADCASTING);

        // 并行消费消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for(int i=0;i<list.size();i++){
                    System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("消费者启动成功");
    }
}

我这里开启了两个消费者

BroadcastConsumer输出:

消费者启动成功
0_消息消费成功_3_BroadcastProducer
0_消息消费成功_0_BroadcastProducer
0_消息消费成功_2_BroadcastProducer
0_消息消费成功_1_BroadcastProducer
0_消息消费成功_4_BroadcastProducer
0_消息消费成功_7_BroadcastProducer
0_消息消费成功_5_BroadcastProducer
0_消息消费成功_6_BroadcastProducer
0_消息消费成功_8_BroadcastProducer
0_消息消费成功_9_BroadcastProducer

BroadcastConsumer2输出:

消费者启动成功
0_消息消费成功_0_BroadcastProducer
0_消息消费成功_2_BroadcastProducer
0_消息消费成功_3_BroadcastProducer
0_消息消费成功_1_BroadcastProducer
0_消息消费成功_5_BroadcastProducer
0_消息消费成功_4_BroadcastProducer
0_消息消费成功_6_BroadcastProducer
0_消息消费成功_7_BroadcastProducer
0_消息消费成功_8_BroadcastProducer
0_消息消费成功_9_BroadcastProducer

MessageModel.CLUSTERING:集群消息

消费者改下:

package broadcast;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/9
 */
public class BroadcastConsumer {
    public static void main(String[] args) throws MQClientException {
        // 推模式,消费者等到Broker把消息推过来
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushComsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("Broadcast","*");
        consumer.setMessageModel(MessageModel.CLUSTERING);

        // 并行消费消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for(int i=0;i<list.size();i++){
                    System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("消费者启动成功");
    }
}

我这里开启了两个消费者

BroadcastConsumer输出:

消费者启动成功
0_消息消费成功_0_BroadcastProducer
0_消息消费成功_4_BroadcastProducer
0_消息消费成功_8_BroadcastProducer
0_消息消费成功_3_BroadcastProducer
0_消息消费成功_7_BroadcastProducer

BroadcastConsumer2输出:

消费者启动成功
0_消息消费成功_0_BroadcastProducer
0_消息消费成功_4_BroadcastProducer
0_消息消费成功_8_BroadcastProducer

每个消费者消费了部分

延迟消息

消息在producer.send以后不会立刻发出去,而是会在producer端延迟一段时间

生产者:

package schedule;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;
import java.time.LocalTime;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/12
 */
public class ScheduleProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        System.out.println("生产者启动成功");

        for(int i=0;i<2;i++){
            Message message = new Message("Schedule", "ScheduleTags", (i+"_ScheduleProducer").getBytes(StandardCharsets.UTF_8));
            // 延迟发送,单向发送
            message.setDelayTimeLevel(2); // 延迟5s
            producer.send(message);
            System.out.println(i+"_消息发送成功"+ LocalTime.now());
        }

        producer.shutdown();

    }
}

消费者:

package schedule;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.time.LocalTime;
import java.util.List;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/12
 */
public class ScheduleConsumer {
    public static void main(String[] args) throws MQClientException {
        // 推模式,消费者等到Broker把消息推过来
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ScheduleConsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("Schedule","*");

        // 并行消费消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for(int i=0;i<list.size();i++){
                    System.out.println(i+"_消息消费成功_"+ LocalTime.now());
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("消费者启动成功");
    }
}

假设消费者可以立刻接收到生产者已经发出的消息

生产者输出:

生产者启动成功
0_消息发送成功22:41:44.096
1_消息发送成功22:41:44.102
22:41:44.148 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
22:41:44.151 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:10911] result: true

消费者输出:

消费者启动成功
0_消息消费成功_22:41:49.165
0_消息消费成功_22:41:49.165

批量消息

批量消息是指将多条消息合井成一个批量消息,一次发送出去。这样的好处是可以减少网络IO,提升吞吐量。

批量消息的使用限制:

  • 消息大小不能超过4M,虽然源码注释不能超1M,但是实际使用不超过4M即可。平衡整体的性能,建议保持1M左右。
  • 相同的Topic
  • 相同的waitStoreMsgOK
  • 不能是延迟消息、事务消息等

生产者:

其实就是producer.send一个ArrayList

package batch;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/9
 */
public class BatchProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        System.out.println("生产者启动成功");

        ArrayList<Message> list=new ArrayList<>();
        for(int i=0;i<10;i++){
            Message message = new Message("Batch", "BatchTags", (i+"_BatchProducer").getBytes(StandardCharsets.UTF_8));
            list.add(message);
        }
        // 同步发送,批量发送
        SendResult sendResult=producer.send(list);
        System.out.println("消息发送成功_"+sendResult);
        producer.shutdown();

    }
}

消费者:

package batch;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/9
 */
public class BatchConsumer {
    public static void main(String[] args) throws MQClientException {
        // 推模式,消费者等到Broker把消息推过来
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushComsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("Batch","*");

        // 并行消费消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for(int i=0;i<list.size();i++){
                    System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("消费者启动成功");
    }
}

消费者输出:

消费者启动成功
0_消息消费成功_0_BatchProducer
0_消息消费成功_2_BatchProducer
0_消息消费成功_1_BatchProducer
0_消息消费成功_3_BatchProducer
0_消息消费成功_4_BatchProducer
0_消息消费成功_5_BatchProducer
0_消息消费成功_6_BatchProducer
0_消息消费成功_7_BatchProducer
0_消息消费成功_8_BatchProducer
0_消息消费成功_9_BatchProducer

过滤消息

  • 过滤消息是在broker端进行的,broker服务会比较繁忙,consumer是将过滤条件推给broker端的。
  • 只有推模式可以使用SQL过滤

Tag过滤

消费者:

package filter;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @Author jiangxuzhao
 * @Description 过滤消息消费者-Tag方式
 * @Date 2023/6/13
 */
public class FilterTagConsumer {
    public static void main(String[] args) throws MQClientException {
        // 推模式,消费者等到Broker把消息推过来
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushComsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 订阅指定Tag进行消息过滤
        consumer.subscribe("Filter","TagA || TagC");

        // 并行消费消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for(int i=0;i<list.size();i++){
                    System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("消费者启动成功");
    }
}

生产者:

package filter;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;

/**
 * @Author jiangxuzhao
 * @Description 过滤消息生产者-Tag方式
 * @Date 2023/6/13
 */
public class FilterTagProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        System.out.println("生产者启动成功");
        String[] Tags=new String[]{"TagA","TagB","TagC"};

        for(int i=0;i<10;i++){
            // 指定tag
            Message message = new Message("Filter", Tags[i%Tags.length], (Tags[i%Tags.length]+"_FilterTagProducer").getBytes(StandardCharsets.UTF_8));
            // 同步发送
            SendResult sendResult = producer.send(message);
            System.out.println(i+"_消息发送成功"+sendResult);
        }

        producer.shutdown();

    }
}

消费者输出:

接收地异常慢

消费者启动成功
0_消息消费成功_TagA_FilterTagProducer
.....

sql过滤

消费者:

package filter;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @Author jiangxuzhao
 * @Description 过滤消息消费者-Sql方式
 * @Date 2023/6/13
 */
public class FilterSqlConsumer {
    public static void main(String[] args) throws MQClientException {
        // 推模式,消费者等到Broker把消息推过来
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushComsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // MessageSelector.bySql过滤消息
        consumer.subscribe("Filter", MessageSelector.bySql("TAGS is not null and TAGS in ('TagA', 'TagC')"+
                "and jiangxuzhao is not null and jiangxuzhao between 0 and 3"));

        // 并行消费消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for(int i=0;i<list.size();i++){
                    System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("消费者启动成功");
    }
}

生产者:

package filter;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;

/**
 * @Author jiangxuzhao
 * @Description 过滤消息生产者-Sql方式
 * @Date 2023/6/13
 */
public class FilterSqlProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        System.out.println("生产者启动成功");
        String[] Tags=new String[]{"TagA","TagB","TagC"};

        for(int i=0;i<10;i++){
            // 指定tag
            Message message = new Message("Filter", Tags[i%Tags.length], (Tags[i%Tags.length]+"_"+i+"_FilterTagProducer").getBytes(StandardCharsets.UTF_8));
            // 在消息中放入过滤属性
            message.putUserProperty("jiangxuzhao",String.valueOf(i));
            // 同步发送
            SendResult sendResult = producer.send(message);
            System.out.println(Tags[i%Tags.length]+"_jiangxuzhao:"+i+"_消息发送成功"+sendResult);
        }

        producer.shutdown();

    }
}

消费者输出:还是很慢

消费者启动成功
0_消息消费成功_TagA_0_FilterTagProducer
0_消息消费成功_TagA_3_FilterTagProducer

事务消息

主要和生产者有关系。

事务消息是在分布式系统中保证最终一致性的两阶段提交的消息实现。他可以保证本地事务执行与消息发送两个操作的原子性,也就是这两个操作一起成功或者一起失败。

RocketMQ基础API使用以及基本原理探究

事务消息机制的关键是在发送消息时会将消息转为一个half半消息,井存入RocketMQ内部的一个Topic(RMQ_ SYS_ TRANS_ HALF_ TOPIC),这个Topic对消费者是不可见的。再经过一系列事务检查通过后,再将消息转存到目标Topic,这样对消费者就可见了。

  • 事务消息不支持延迟消息和批量消息。
  • 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的transactionCheckMax参数来修改此限制,如果已经检查某条消息超过N次的话 (N = transactionCheckMax)则 Broker将丢弃此消息,井在默认情況下同时打印错误日志。可以通过重写AbstractTransactionCheckListener类来修改这个行为。
  • 事务性消息可能不止一次被检查或消费。

生产者:

package transaction;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/14
 */
public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        // 事务消息生产者
        TransactionMQProducer producer = new TransactionMQProducer("Transaction");
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 异步提交事务状态,提升性能
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("thread-jiangxuzhao");
                return thread;
            }
        });
        producer.setExecutorService(pool);
        // 本地事件监听器,与想要发送的Tag挂钩
        producer.setTransactionListener(new TransactionListenerImpl());
        producer.start();
        System.out.println("生产者启动成功");

        String[] Tags=new String[]{"TagA","TagB","TagC","TagD","TagE"};
        for(int i=0;i<10;i++){
            // 指定tag
            Message message = new Message("Transaction", Tags[i%Tags.length], (Tags[i%Tags.length]+"_TransactionProducer").getBytes(StandardCharsets.UTF_8));
            // 事务消息发送
            TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(message, null);
            System.out.println(i+"_消息发送成功"+transactionSendResult);
        }

        // 主线程等待异步的线程回查都结束
        Thread.sleep(100000);

        producer.shutdown();
    }
}

消费者:

package transaction;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/14
 */
public class TransactionConsumer {
    public static void main(String[] args) throws MQClientException {
        // 推模式,消费者等到Broker把消息推过来
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushComsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("Transaction","*");

        // 并行消费消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for(int i=0;i<list.size();i++){
                    System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("消费者启动成功");
    }
}

本地事件监听者:

package transaction;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * @Author jiangxuzhao
 * @Description
 * @Date 2023/6/14
 */
public class TransactionListenerImpl implements TransactionListener {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        String tag = msg.getTags();
        if (StringUtils.contains("TagA",tag)) {
            return LocalTransactionState.COMMIT_MESSAGE; // 直接提交
        }
        if (StringUtils.contains("TagB",tag)) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }else {
            return LocalTransactionState.UNKNOW;
        }
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        String tag = msg.getTags();
        if (StringUtils.contains("TagC",tag)) {
            return LocalTransactionState.COMMIT_MESSAGE; // 直接提交
        }
        if (StringUtils.contains("TagD",tag)) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }else {
            return LocalTransactionState.UNKNOW;
        }
    }
}

消费者输出:

TagA、TagC的消息是消费成功的

消费者启动成功
0_消息消费成功_TagA_TransactionProducer
0_消息消费成功_TagA_TransactionProducer
0_消息消费成功_TagC_TransactionProducer
0_消息消费成功_TagC_TransactionProducer
0_消息消费成功_TagC_TransactionProducer

RocketMQ常见问题

RocketMQ如何保证消息不丢失?

RocketMQ基础API使用以及基本原理探究
RocketMQ基础API使用以及基本原理探究

我们将消息流程分为三大部分,每一部分都有可能会丟失数据。

  • 生产阶段:Producer通过网络将消息发送给Broker,这个发送可能会发生丢失。比如网络延迟不可达等。
  • 存储阶段:Broker肯定是先把消息放到内存的,然后根据刷盘策路持久化到硬盘中,刚收到Producer的消息,放入内存,但是异常宕机了,导致消息丢失。
  • 消费阶段:消费失败。比如先提交ack再消费,消费过程中出现异常,该消息就出现了丢失。

解决方案:

  • 生产阶段:使用同步发送失败重试机制;异步发送重写回调方法检查发送结果;Ack确认机制。
  • 存储阶段:同步刷盘机制;集群模式采用同步复制。
  • 消费阶段:正常消费处理完成才提交ACK;如果处理异常返回重试标识。

RocketMQ的消息持久化机制

RocketMQ的消息持久化机制是指将消息存储在磁盘上,以确保消息能够可靠地存储和检素。RocketMQ 的消息持久化机制涉及到以下三个角色:Commitlog、ConsumeQueue 和 IndexFile.

  • CommitLog:消息真正的存储文件,所有的消息(所有messageQueue的消息)都存在 CommitLog文件中。

RocketMQ默认会将消息数据先存储到内存中的一个缓冲区,每当缓冲区中积累了一定量的消息或者一定时间后,就会将缓冲区中的消息批量写入到磁盘上的CommitLog 文件中(同步刷盘写入cache,继而直接写入磁盘;异步刷盘写入cache,就不管写入磁盘了)。消息在写入 CommitLog 文件后就可以被消费者消费了。

Commitlog文件的大小固定1G,写满之后生成新的文件。

并且采用的是顺序写的方式。

  • ConsumeQueue: 消息消费逻辑队列,类似数据库的索引文件。

RocketMQ 中每个主题下的每个消息队列都会对应一个 ConsumeQueue。 ConsumeQueue存储了消息的offset以及该offset对应的消息在CommitLog文件中的位置信息,便于消费者快速定位井消费消息。

每个ConsumeQueue文件固定由30万个固定大小20byte的数据块组成:据块的内容包括:commitLogOffset(8byte,消息在commitlog文件中的位置)+msgSize(4byte,消息在文件中占用的长度)+msg TagCode(8byte, 消息的tag的Hash值)。

  • IndexFile:消息索引文件,主要存储消息key与offset的对应关系,提升消息检索速度。

如果生产者在发送消息时设置了消息Key,那么RocketMQ会将消息Key值和CommitLog Offset存在IndexFie文件中,这样当消费者需要根据消息Key查询消息时,就可以直接在IndexFile文件中查找对应的CommitLog Offset,然后通过 ConsumeQueue文件快速定位并消费消息。

IndexFile文件大小固定400M,可以保存2000W个索引。

三个角色构成的消息存储结构如下:
RocketMQ基础API使用以及基本原理探究

RocketMQ如何保证消息顺序

RocketMQ架构本身是无法保证消息有序的,但是提供了相应的API保证消息有序消费。RocketMQ APl利用FIFO先进先出的特性,保证生产者消息有序进入同一队列,消费者在同一队列消费就能达到消息的有序消费。

  • 使用MessageQueueSelector编写有序消息生产者

有序消息生产者会按照一定的规则将消息发送到同一个队列中,从而保证同一个队列中的消息是有序的。RocketMQ 并不保证整个主题内所有队列的消息都是按照发送顺序排列的。

  • 使用MessageListenerOrderly进行顺序消费与之对应的MessageListenerConcurrently并行消费 (push模式)

MessageListenerOrderly是RocketMQ 专门提供的一种顺序消费的接口,它可以让消费者按照消息发送的顺序,一个一个地处理消息。这个接口支持按照消息的重试次数进行顺序消费、订单ID等作为消息键来实现顺序消费、批量消费等操作。

通过加锁的方式实现(有超时机制),一个队列同时只有一个消费者;井且存在一个定时任务,每隔一段时间就会延长锁的时间,直到整个消息队列全部消费结束。

  • 消费端自己保证消息顺序消费(pull模式)

  • 消费者井发消费时设置消费线程为1

RocketMQ 的消费者可以开启多个消费线程同时消费同一个队列中的消息,如果要保证消息的顺序,需要将消费线程数设置为1。这样,在同一个队列中,每个消息只会被单个消费者线程消费,从而保证消息的顺序性

RocketMQ事务消息原理

RocketMQ 的事务消息是一种保证消息可靠性的机制。在RocketMQ中,事务消息的实现原理主要是通过两个发送阶段和一个确认阶段来实现的。(个人理解就是分布式事务中的两阶段提交的实现)

  • 发送消息的预处理阶段:在发送事务消息之前,RocketMQ 会将消息的状态设置为 "Preparing”,并将消息存储到消息存储库中。
  • 执行本地事务:当预处理阶段完成后,消息发送者需要执行本地事务,并返回执行结果 (commit 或 rollback)
  • 消息的二次确认阶段:根据本地事务的执行结果,如果是commit,则 RocketMQ 将消息的状态设置为 “Committing”:否则将消息的状态设置为 “Rollback”
  • 完成事务:最后在消息的消费者消费该消息时,RocketMQ 会根据消息的状态来决定是否提交该消息。如果消息的状态是“Commiting”,则直接提交该消息;否则
    忽略该消息。

需要注意的是,如果在消息发送的过程中出现异常或者网络故降等问题,RocketMQ 会触发消息回查机制。在回查过程中,RocketMQ 会调用消息发送方提供的回查接口来确认事务的提交状态,从而解决消息投递的不确定性。
RocketMQ基础API使用以及基本原理探究文章来源地址https://www.toymoban.com/news/detail-490107.html

到了这里,关于RocketMQ基础API使用以及基本原理探究的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 微信小程序的启动和渲染过程(加组件分类和组件的基本使用以及API分类)

    关于微信小程序知识点一共做了六个博客,涵盖大部分内容,有想学习的可以按照以下顺序查看 1.微信小程序的启动和渲染过程(加组件分类和组件的基本使用以及API分类) 2.微信小程序wxml的数据和事件的绑定,以及条件和列表的渲染 3.微信小程序wxss相关介绍、全局配置和tabbar知识

    2024年02月11日
    浏览(61)
  • 【jvm系列-09】垃圾回收底层原理和算法以及JProfiler的基本使用

    JVM系列整体栏目 内容 链接地址 【一】初识虚拟机与java虚拟机 https://blog.csdn.net/zhenghuishengq/article/details/129544460 【二】jvm的类加载子系统以及jclasslib的基本使用 https://blog.csdn.net/zhenghuishengq/article/details/129610963 【三】运行时私有区域之虚拟机栈、程序计数器、本地方法栈 https

    2023年04月22日
    浏览(58)
  • 【深入浅出RocketMQ原理及实战】「消息队列架构分析」帮你梳理RocketMQ或Kafka的选择理由以及二者PK

    前提背景 大家都知道,市面上有许多开源的MQ,例如,RocketMQ、Kafka、RabbitMQ等等,现在Pulsar也开始发光,今天我们谈谈笔者最常用的RocketMQ和Kafka,想必大家早就知道二者之间的特点以及区别,但是在实际场景中,二者的选取有可能会范迷惑,那么今天笔者就带领大家分析一下

    2024年02月19日
    浏览(54)
  • 爬虫入门到精通_基础篇1(爬虫基本原理讲解, Urllib库基本使用)

    发起请求:通过HTTP库向目标站点发起请求,即发送一个Request,请求可以包含额外的headers等信息,等待服务器响应。 获取响应内容:如果服务器能正常响应,会得到一个Response,Response的内容便是所要获取的页面内容,类型可能有HTML,Json字符串,二进制数据(如图片视频)等类型。

    2024年01月23日
    浏览(40)
  • 使用canal+rocketmq实现将mysql数据同步到es

    实际开发过程中,经常遇到数据库与缓存不一致的问题,造成这种问题的原因有很多,其中缓存数据没有及时更新、缓存中过期的数据没有及时更新,导致缓存中存在失效数据,导致数据库与缓存不一致。而这种问题的出现大部分都是因为同步延迟、缓存失效、过期和错误使

    2024年02月11日
    浏览(49)
  • @ControllerAdvice 注解使用及原理探究

    最近在新项目的开发过程中,遇到了个问题,需要将一些异常的业务流程返回给前端,需要提供给前端不同的响应码,前端再在次基础上做提示语言的国际化适配。这些异常流程涉及业务层和控制层的各个地方,如果每个地方都写一些重复代码显得很冗余。 然后查询解决方案

    2024年02月14日
    浏览(60)
  • 【Git】Git(分布式项目管理工具)在Windows本地/命令行中的基本操作以及在gitee中的操作,使用命令行、图形化界面,进行提交,同步,克隆

    介绍 这里是小编成长之路的历程,也是小编的学习之路。希望和各位大佬们一起成长! 以下为小编最喜欢的两句话: 要有最朴素的生活和最遥远的梦想,即使明天天寒地冻,山高水远,路远马亡。 一个人为什么要努力? 我见过最好的答案就是:因为我喜欢的东西都很贵,

    2024年02月04日
    浏览(66)
  • 关于使用RocketMQ搭建多Master多Slave模式(同步)集群时遇到的问题

    1.1) 操作系统: Linux 虚拟机: VMware Workstation 16 Pro 、WSL   Openjdk Version : 11.0.19   使用 RocketMQ 进行 多 Master 多 Slave 模式(同步)集群的搭建 2)集群配置: # nameserver xxx.xxx.xxx.xxx rocketmq-nameserver1 xxx.xxx.xxx.xxx rocketmq-nameserver2  # broker #  在 VMware Workstation 上启动 xxx.xxx.xxx.xxx:10

    2024年02月16日
    浏览(49)
  • rocketmq-console基本使用

    作用:rocketmq-console是rocketmq的一款可视化工具,提供了mq的使用详情等功能。 下载rocketmq组件 rocketmq :下载地址、github地址 下载地址如下图所示: 下载rocketmq-console插件 ​rocketmq-console是一款rocketmq的可视化工具,可在rocketmq-console的可视化界面查看topic信息、消费者组信息、模

    2024年02月11日
    浏览(35)
  • @ControllerAdvice注解使用及原理探究 | 京东物流技术团队

    最近在新项目的开发过程中,遇到了个问题,需要将一些异常的业务流程返回给前端,需要提供给前端不同的响应码,前端再在次基础上做提示语言的国际化适配。这些异常流程涉及业务层和控制层的各个地方,如果每个地方都写一些重复代码显得很冗余。 然后查询解决方案

    2024年02月14日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包