RocketMQ的简单使用

这篇具有很好参考价值的文章主要介绍了RocketMQ的简单使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

大家好,我是Leo!今天来和大家分享RocketMQ的一些用法。

领域模型介绍

RocketMQ的简单使用

Producer: 用于生产消息的运行实体。

Topic: 主题,用于消息传输和存储的分组容器。

MessageQueue: 消息传输和存储的实际单元容器。

Message: 消息传输的最小单元。

ConsumerGroup: 消费者组。

Consumer: 消费者。

Subscription: 订阅关系,发布订阅模式中消息过滤、重试、消费进度的规则配置。

MQ的优势

MQ的明显优势有3个。

应用解耦: 以多服务为例,用户下单,需要通知订单服务和库存服务,我们可以通过MQ消息来解除下单和库存系统的耦合。

异步提速: 以秒杀为例,我们可以先返回秒杀结果,后续再通过MQ异步消息去插入记录和扣减库存等,减少调用的链路长度。

削峰填谷: 将某一时间内的请求量分摊到更多时间处理,比如系统A一秒只能处理10000个请求,但是我有100000个请求需要处理,我可以将请求发到MQ中,再分成10秒去消费这些请求。

当然MQ也有劣势系统可用性降低系统复杂度提高一致性问题

RocketMQ的主要角色

主要包括Producer、Broker、Consumer、NameServer Cluster。

RocketMQ的简单使用

一对多

可以通过设置不同的消费者组

不同组通过不同的消费者组既可以实现同时收到一样数量的消息,那同一个消费者组需要怎样才能收到同样数量的消息呢?

// 消费者消费模式
consumer.setMessageModel(MessageModel.BROADCASTING);

默认是集群模式CLUSTERING,设置成广播模式

既可以实现一对多的发送。

同步消息(普通消息)

同步消息需要阻塞等待消息发送结果的返回

public class ProducerDemo {

    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("group1");

        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        Message message = new Message();
        message.setTopic("MQLearn");
        message.setTags("1.0.0");
        message.setBody("Hello MQ!".getBytes(StandardCharsets.UTF_8));
        SendResult result = producer.send(message);
        if (result.getSendStatus().equals(SendStatus.SEND_OK)) {
            System.out.println(result);
            System.out.println("发送成功:" + message);
        }

        producer.shutdown();
    }
}

异步消息

异步消息需要实现发送成功和失败的回调函数。

public class Producer {

    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("group1");

        producer.setNamesrvAddr("192.168.246.140:9876");
        producer.start();
        // 异步消息
        for (int i = 0; i < 10; i++) {
            Message message = new Message();
            message.setTopic("topic7");
            message.setTags("1.0.0");

            message.setBody(("Hello World !" + i).getBytes(StandardCharsets.UTF_8));
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    // 发送成功的回调方法
                    System.out.println(sendResult);
                }

                @Override
                public void onException(Throwable e) {
                    // 发送失败的回调方法
                    System.out.println(e);
                }
            });
        }
        TimeUnit.SECONDS.sleep(10);
        System.out.println("异步发送完成!");
    }
}

单向消息

单向消息就类似UDP,只顾单向发送,不管是否发送成功,常用于日志收集等场景。

public class SingleDirectionProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");

        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        // 单向消息
        for (int i = 0; i < 10; i++) {
            Message message = new Message();
            message.setTopic("topic8");
            message.setTags("1.0.0");
            message.setBody(("Hello World !" + i).getBytes(StandardCharsets.UTF_8));
            producer.sendOneway(message);
        }
        System.out.println("带向发送完成!");
    }
}

延时(定时)消息

RocketMQ提供的定时消息并不能指定在什么时间点去投递消息。而是根据设定的等待时间,起到延时到达的缓冲作用在RocketMQ中,延时消息的delayTimeLevel支持以下级别:

1 1s 2 5s 3 10s 4 30s 5 1m 6 2m 7 3m 8 4m 9 5m 10 6m 11 7m 12 8m 13 9m 14 10m 15 20m 16 30m 17 1h 18 2h

// 设置消息延时级别
message.setDelayTimeLevel(3);

批量消息

批量消息支持一次发送多条消息。

注意:

  • 批量消息需要有相同的topic

  • 不能是延时消息

  • 消息内容不能超过4M,可以通过producer.setMaxMessageSize()和broker进行设置设置(可以通过拆分多次发送)

public class BatchProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");

        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        List<Message> list = new ArrayList<>();
        // 批量消息
        for (int i = 0; i < 10; i++) {
            Message message = new Message();
            message.setTopic("topic10");
            message.setTags("1.0.0");
            message.setBody(("Hello World !" + i).getBytes(StandardCharsets.UTF_8));
            list.add(message);
        }
        SendResult result = producer.send(list);
        System.out.println(result);
        TimeUnit.SECONDS.sleep(2);
        System.out.println("发送完成!");
    }
}

顺序消息

顺序消息支持按照消息的发送消息先后获取消息。

比如:我的一笔订单有多个流程需要处理,比如创建->付款->推送->完成。

通过同一笔订单放到一个队列中,这样就可以解决消费的无序问题。

通过实现MessageQueueSelector来选择一个队列。

public class Producer {

    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("group1");

        producer.setNamesrvAddr("localhost:9876");
        producer.start();

            Message message = new Message();
            // 模拟业务ID
            int step = 10;
            message.setTopic("topic12");
            message.setTags("1.0.0");
            message.setBody(("Hello World !").getBytes(StandardCharsets.UTF_8));
            producer.send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    // 队列数
                    int size = mqs.size();
                    // 取模
                    int orderId = step;
                    return mqs.get(orderId % size);
                }
            }, null);

        System.out.println("发送完成!");
    }
}
public class Consumer {

    public static void main(String[] args) throws Exception{

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
        consumer.setConsumerGroup("group1");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("topic12", "*");
        // 消费者,起一个顺序监听,一个线程,只监听一个队列
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(msg);
                    byte[] body = msg.getBody();
                    System.out.println(new String(body));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.println("消费者启动了!");

    }
}

事务消息

RocketMQ中的事务消息支持在分布式场景下消息生产和本地事务的最终一致性。

RocketMQ的简单使用

大致流程为,

  1. 生产者先将消息发送至RocketMQ。

  2. RocketMQBroker将消息持久化成功后,向生产者返回ACK消息确认已经返回成功,消息状态为暂时不能投递状态。

  3. 执行本地事务逻辑。

  4. 生产者根据事务执行结果向Broker提交commit或者rollback结果。

  5. 如果在断网或者重启情况下,未收到4的结果,或者返回Unknown未知状态,在固定时间对消息进行回查。

  6. 生产者收到消息回查后,需要本地事务执行的最终结果。

  7. 生产者对本地事务状态进行二次提交或确认。

public class Producer {

    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        TransactionMQProducer producer = new TransactionMQProducer("group1");
        producer.setNamesrvAddr("localhost:9876");
        // 设置事务监听
        producer.setTransactionListener(new TransactionListener() {
            // 正常事务监听
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // 把消息保存到mysql数据库
                boolean ok = false;

                if (ok) {
                    System.out.println("正常执行事务过程");
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else {
                    System.out.println("事务补偿过程");
                    return LocalTransactionState.UNKNOW;
                    //return LocalTransactionState.ROLLBACK_MESSAGE;
                }

            }
            // 事务补偿事务
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                System.out.println("事务补偿过程");
                // sql select
                if (true) {

                } else {

                }
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        producer.start();
        String msg = "Hello Transaction Message!";
        Message message = new Message("topic13", "tag", msg.getBytes(StandardCharsets.UTF_8));
        TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(message, null);
        TimeUnit.SECONDS.sleep(2);
        System.out.println(transactionSendResult);
        System.out.println("发送完成!");
    }
}

消息的过滤

在RocketMQ中的消息过滤功能能通过生产者和消费者对消息的属性和Tag进行定义,在消费端可以根据过滤条件进行筛选匹配,将符合条件的消息投递给消费者进行消费。

支持两种方式:Tag标签过滤和SQL属性过滤。

Message message = new Message();
message.setTopic("topic11");
message.setTags("tag");
message.setBody(("Hello World !" + "tag").getBytes(StandardCharsets.UTF_8));
message.putUserProperty("name", "zhangsan");
message.putUserProperty("age", "16");
SendResult result = producer.send(message);

subscribe方法subExpression参数也支持Tag过滤

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
consumer.setConsumerGroup("group1");
consumer.setNamesrvAddr("112.74.125.184:9876");
consumer.subscribe("topic11", MessageSelector.bySql("age > 16"));

SpringBoot整合RocketMQ的使用

在SpringBoot项目中主要通过RocketMQTemplate进行消息的发送。

// 普通消息
rocketMQTemplate.convertAndSend("topic10", user);
rocketMQTemplate.send("topic10", MessageBuilder.withPayload(user).
SendResult result = rocketMQTemplate.syncSend("topic10", user);
// 异步消息
rocketMQTemplate.asyncSend("topic10", user, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.println("成功!");
    }
    @Override
    public void onException(Throwable e) {
        System.out.println(e);
    }
}, 1000L);
// 单向消息
rocketMQTemplate.sendOneWay("topic10", user);
// 延时消息
rocketMQTemplate.syncSend("topic10", MessageBuilder.withPayload(user).build(), 2000L, 3); 
// 批量消息
rocketMQTemplate.syncSend("topic10", list, 1000);

消费者:在注解中可以实现根据Tag和SQL进行属性的过滤。

@Service
//@RocketMQMessageListener(
//        consumerGroup = "group1",
//        topic = "topic10",
//        selectorExpression = "tag1 || tag2"
//)
@RocketMQMessageListener(
        consumerGroup = "group1",
        topic = "topic10",
        selectorType = SelectorType.SQL92,
        selectorExpression = "age > 16",
        messageModel = MessageModel.BROADCASTING
)
public class UserConsumer implements RocketMQListener<User> {


    @Override
    public void onMessage(User message) {

    }
}

总结

今天主要分享了一下RocketMQ的一些基础使用,包括各种类型的消息的使用,偏向于代码实现部分,对于原理篇没有过多涉及。文章来源地址https://www.toymoban.com/news/detail-431549.html

到了这里,关于RocketMQ的简单使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 今天跟大家好好介绍一下接口工具(jmeter、postman、swagger等)

    一、接口都有哪些类型? 接口一般分为两种:1.程序内部的接口 2.系统对外的接口 系统对外的接口:比如你要从别的网站或服务器上获取资源或信息,别人肯定不会把 数据库共享给你,他只能给你提供一个他们写好的方法来获取数据,你引用他提供的接口就能使用他写好的

    2024年02月05日
    浏览(51)
  • 今天给大家介绍一篇基于springboot的医院管理系统的设计与实现

    临近学期结束,你还在做java程序网络编程,期末作业,老师的作业要求觉得大了吗?不知道毕业设计该怎么办?网页功能的数量是否太多?没有合适的类型或系统?等等。这里根据疫情当下,你想解决的问题,今天给大家介绍一篇基于springboot的医院管理系统的设计与实现。 随着科

    2023年04月14日
    浏览(76)
  • 免费分享一些Open Ai的key,供大家使用

    谁创造了ChatGPT? ChatGPT由人工智能和研究公司OpenAI创建。该公司于2022年11月30日发布推出了ChatGP OpenAI还负责创建流行的画图模型DALL-E和自动语音识别系统Whisper。 ChatGPT官方的运营成本大概是多少? GPT-3 训练一次的成本约为 140 万美元。 对于一些更大的 LLM(大型语言模型),

    2024年02月05日
    浏览(79)
  • 怎么在树莓派环境上搭建web网站,并发布到外网可访问,今天教给大家

    这非常适合设置您的第一个网站,不仅可以学习管理 wordpress 站点,还可以学习 Linux。您将需要一个树莓派(Raspberry Pi)、几个小时和一台计算机来下载映像。 树莓派(RPI) 是学习这些东西的完美设备。 下载适用于您的操作系统的树莓派镜像工具。它支持大多数操作系统(Window

    2024年02月14日
    浏览(41)
  • 免费使用微软Azure进行文字转语音!分享三个方法给大家!

    作为一个短视频制作的爱好者,使用了非常多的文字转语音工具,其中最接近真人发声的还是微软的文字转语音工具,这也是目前非常多的自媒体制作者在使用的文字转语音工具,目前有21种发声角色可以选择,而且可以自己挑整语速以及音调等功能,所以今天就给大家分享

    2024年02月01日
    浏览(48)
  • 【ChatGPT】程序员的半个老师,今天简单讲讲使用心得

    程序员有个玩笑叫做复制黏贴,原来的百度谷歌只能是碎片层面,有了chatGPT,可以实现更深层次的复制黏贴,直接帮你写好整个方法和实现步骤,不再是简单的碎片。 当然他只能做到教材级的解答,业务层面做不到,如果连业务层面都做到了,那程序员就真失业了!! 程序

    2023年04月20日
    浏览(50)
  • RocketMQ的简单使用

    大家好,我是Leo!今天来和大家分享RocketMQ的一些用法。 Producer: 用于生产消息的运行实体。 Topic: 主题,用于消息传输和存储的分组容器。 MessageQueue: 消息传输和存储的实际单元容器。 Message: 消息传输的最小单元。 ConsumerGroup: 消费者组。 Consumer: 消费者。 Subscription: 订阅关系

    2024年02月02日
    浏览(13)
  • 8年测试工程师分享,我是怎么开展性能测试的(基础篇)

    性能测试的工作是基于系统功能已经完备或者已经趋于完备之上的,在功能还不够完备的情况下没有多大的意义(后期功能完善上会对系统的性能有影响,过早进入性能测试会出现测试结果不准确、浪费测试资源);因此,性能测试首先是基于功能测试的,你必须了解其功能

    2024年02月05日
    浏览(42)
  • 分享一个403界面给大家

    先看效果图(说明:小鬼影会飘来飘去,长时间停留会有小惊喜,具体大家跑一下就知道): 代码如下: PS:发现我用文字写太生硬了,干的噎嗓子,干脆在代码里加注释了。

    2024年02月06日
    浏览(45)
  • 分享一个500页面给大家

    先看效果: 再看代码:

    2024年02月06日
    浏览(52)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包