消息中间件-RocketMQ

这篇具有很好参考价值的文章主要介绍了消息中间件-RocketMQ。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

        RocketMQ是阿里巴巴开源的消息分布中间件,在阿里内部使用非常更广泛,已经经过了“双11”这种万亿级的应用场景考验。

1.安装

        下载地址:http://rocketmq.apache.org/release_notes/release-notes-4.4.0/

        下载完成后解压缩安装包到指定目录。

2.配置

           (1)环境变量:ROCKETMQ_HOME

              消息中间件-RocketMQ,rocketmq,java,java-rocketmq

           (2)Path

              消息中间件-RocketMQ,rocketmq,java,java-rocketmq

3.启动

          切换到当前下载RocketMQ的bin目录下

        (1)启动NameServer ---> start mqnamesrv.cmd

消息中间件-RocketMQ,rocketmq,java,java-rocketmq

              消息中间件-RocketMQ,rocketmq,java,java-rocketmq

        (2)启动Broker --->   start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

消息中间件-RocketMQ,rocketmq,java,java-rocketmq

4.RocketMQ的架构及概念

        看下面这张图

        消息中间件-RocketMQ,rocketmq,java,java-rocketmq

 文章来源地址https://www.toymoban.com/news/detail-708056.html

        RocketMQ整体分为4个角色,NameServer、Broker、Producer、Consumer、

        Broker:为RocketMQ的核心,负责消息的接受,存储,投递等功能。

        NameServer:消息队列的协商者,Broker向它注册路由信息,同时Producer和Consumer向其获取路由信息。

        Producer:消息的生产者,需要从NameServer获取Borker信息,然后与Broker建立连接,向Broker发送消息。

        Consumer:消息的消费者,需要从NameServer获取Broker信息,然后与Broker建立连接,从Broker获取消息。

        另外还包括别的组件,

        Topic:用来区分不同的消息类型,发送和接收消息前都要先创建Topic,针对Topic来发送和接收消息。

        Message Queue:消息队列,一个Topic可以设置一个或多个MessageQueue,这样消息就可以并行往各个Message Queue发送消息,消费者也可以并行的从多个Message Queue读取消息,提高性能和吞吐量。

        Message:消息的载体。

5.消息发送和接收(应用)

       搭建好SpringBoot项目后,先导入RocketMQ所需的依赖:

 <!--MQ-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>

5.1 同步发送        

        同步发送方式比较可靠,应用也比较广泛,比如:重要的消息通知,短信通知。

消息发送方:

//发送同步消息
public class RocketMQSendTest1 {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        //1.创建消息生产者,指定生产者所属的组名
        DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
        //2.指定Nameserver地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        //3.启动生产者
        producer.start();
        for (int i = 0; i < 10; i++) {
            Message message = new Message("myTopic", "myTag", ("十行代码九个错误八个警告竟敢说七日精通六天学会五湖四海也不见如此三心二意之程序简直一等下流" + i).getBytes());
            //发送消息
            SendResult sendResult = producer.send(message);
            System.out.println(sendResult);
        }

        //关闭生产者
        producer.shutdown();
    }
}

消息接收方:

//接收消息
public class RocketMQReceiveTest1 {
    public static void main(String[] args) throws MQClientException {
        //创建消息消费者。指定消费者所属的组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group");
        //指定Nameserver地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        //指定消费者订阅的主题和标签
        consumer.subscribe("myTopic", "*");

        //设置回调函数,编写处理消息的方法
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.println("获取到的消费数据:" + list);
                System.out.println(new String(list.get(0).getBody()));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //启动消息消费者
        consumer.start();
        System.out.println("Consumer Starting.");
    }
}

        启动时,切记先启动消费者(接收方),再启动生产者(发送方)。

        启动测试,同步发送消息结果: 

        消息中间件-RocketMQ,rocketmq,java,java-rocketmq

5.2 异步消息

        异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

发送方:

//发送异步消息
//异步消息比较浪费性能,经常会失败,所以多发送几次并且让线程休眠几秒
public class RocketMQSendTest2 {

    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
        //1.创建消息生产者,指定生产者所属的组名
        DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
        //2.设置NameServer地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        //3.启动生产者
        producer.start();
        for (int i = 0; i < 10; i++) {
            //4.创建消息对象,制定主题、标签、消息体
            Message message = new Message("myTopic", "myTag2", ("十行代码九个错误八个警告竟敢说七日精通六天学会五湖四海也不见如此三心二意之程序简直一等下流").getBytes());
            //5.发送消息
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("发送成功:" + sendResult);
                }

                @Override
                public void onException(Throwable e) {
                    System.out.println("发送异常:" + e);
                }
            });

            //休眠
            TimeUnit.SECONDS.sleep(3);
        }

        //6.关闭生产者
        producer.shutdown();

    }
}

接收方:

public class RocketMQReceiveTest2 {
    public static void main(String[] args) throws MQClientException {
        //1.创建消息消费者,指定消费者所属的组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group");

        //2.指定Nameserver地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        //3.指定消费者订阅的主题和标签
        consumer.subscribe("myTopic", "*");
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //4.设置回调函数,编写处理请求的方法
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {

                System.out.println(new String(list.get(0).getBody()));
                //返回消费状态
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动消息消费者
        consumer.start();
        System.out.println("Consumer Starting.");

    }
}

启动测试,异步发送消息结果: 

消息中间件-RocketMQ,rocketmq,java,java-rocketmq

5.3 单行发送消息

      该方式用在不关注发送结果的场景,比如日志发送。

//单行发送消息
public class RocketMQSendTest3 {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        for (int i = 0; i < 10; i++) {
            Message message = new Message("myTopic", "myTag3", ("芜湖").getBytes());
            producer.sendOneway(message);

            TimeUnit.SECONDS.sleep(3);
        }
        producer.shutdown();
    }

}

启动两个消费者接收10条消息,结果如下:

消息中间件-RocketMQ,rocketmq,java,java-rocketmq

消息中间件-RocketMQ,rocketmq,java,java-rocketmq

存在消息丢失的情况。

以上就是对RocketMQ的初步认识啦!

 

到了这里,关于消息中间件-RocketMQ的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 消息中间件之RocketMQ源码分析(十)

    启动命令 nohup ./bin/mqnamesrv -c ./conf/namesrv.conf dev/null 21 通过脚本配置启动基本参数,比如配置文件路径、JVM参数,调用NamesrvStartup.main()方法,解析命令行的参数,将处理好的参数转化为Java实例,传递给NamesrvController实例 加载命令行传递的配置参数,调用controller.initialize()方法初

    2024年02月20日
    浏览(53)
  • 【消息中间件】RocketMQ消息重复消费场景及解决办法

    消息重复消费是各个MQ都会发生的常见问题之一,在一些比较敏感的场景下,重复消费会造成比较严重的后果,比如重复扣款等。 当系统的调用链路比较长的时候,比如系统A调用系统B,系统B再把消息发送到RocketMQ中,在系统A调用系统B的时候,如果系统B处理成功,但是迟迟

    2024年02月05日
    浏览(44)
  • 分布式消息中间件RocketMQ的应用

    所有代码同步至GitCode:https://gitcode.net/ruozhuliufeng/test-rocketmq.git 普通消息 消息发送分类 ​ Producer对于消息的发送方式也有多种选择,不同的方式会产生不同的系统效果。 同步发送消息 ​ 同步发送消息是指,Producer发出一条消息后,会在收到MQ返回的ACK之后才发下一条消息。

    2024年02月05日
    浏览(82)
  • 【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka

    作者简介 前言 博主之前写过一个完整的MQ系列,包含RabbitMQ、RocketMQ、Kafka,从安装使用到底层机制、原理。专栏地址: https://blog.csdn.net/joker_zjn/category_12142400.html?spm=1001.2014.3001.5482 本文是该系列的清单综述,会拉通来聊一下三大MQ的特点和各种适合的场景。 目录 1.概述 1.1.M

    2024年02月09日
    浏览(50)
  • ActiveMQ、RabbitMQ、Kafka、RocketMQ消息中间件技术选型

    消息中间件是分布式系统中重要的组件之一,用于实现异步通信、解耦系统、提高系统可靠性和扩展性。在做消息中间件技术选型时,需要考虑多个因素,包括可靠性、性能、可扩展性、功能丰富性、社区支持和成本等。本文将五种流行的消息中间件技术:ActiveMQ、RabbitMQ、

    2024年02月11日
    浏览(47)
  • SpringBoot整合消息中间件(ActiveMQ,RabbitMQ,RocketMQ,Kafka)

    消息的发送方:生产者 消息的接收方:消费者 同步消息:发送方发送消息到接收方,接收方有所回应后才能够进行下一次的消息发送 异步消息:不需要接收方回应就可以进行下一步的发送 什么是消息队列? 当此时有很多个用户同时访问服务器,需要服务器进行操作,但此

    2024年04月27日
    浏览(48)
  • 消息中间件(MQ)对比:RabbitMQ、Kafka、ActiveMQ 和 RocketMQ

    前言 在构建分布式系统时,选择适合的消息中间件是至关重要的决策。RabbitMQ、Kafka、ActiveMQ 和 RocketMQ 是当前流行的消息中间件之一,它们各自具有独特的特点和适用场景。本文将对这四种消息中间件进行综合比较,帮助您在项目中作出明智的选择。 1. RabbitMQ 特点: 消息模

    2024年02月20日
    浏览(48)
  • 【Alibaba中间件技术系列】「RocketMQ技术专题」RocketMQ消息发送的全部流程和落盘原理分析

    RocketMQ目前在国内应该是比较流行的MQ 了,目前本人也在公司的项目中进行使用和研究,借着这个机会,分析一下RocketMQ 发送一条消息到存储一条消息的过程,这样会对以后大家分析和研究RocketMQ相关的问题有一定的帮助。 分析的总体技术范围发送到存储,本文的主要目的是

    2024年02月10日
    浏览(43)
  • 【Java中间件】RocketMQ

    Message Queue,是一种提供消息队列服务的中间件。提供了消息生产、存储、消费全过程API的软件系统。 MQ的作用 限流削峰:当用户发送超量请求时,将请求暂存,以便后期慢慢处理。如果不使用MQ暂存直接请求到业务系统中容易引起系统崩溃。 异步解耦:若上游系统和下游系

    2024年02月15日
    浏览(41)
  • Linux系统下消息中间件RocketMQ下载、安装、搭建、配置、控制台rocketmq-dashboard的安装保姆级教程 rocketmq ui

    这里给出我使用的 RocketMQ 版本(5.1.3)、RocketMQ-Dashboard 版本的百度网盘链接: 链接:https://pan.baidu.com/s/1HaKBBDGWZ0WKLGgVwIG9pw 提取码:1234 1、注意:有两种资源下载:Source表示源码、Binary是二进制包(我们下载这个):二进制包是已经编译完成后可以直接运行的,源码包是需要

    2024年02月12日
    浏览(57)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包