RabbitMQ入门实战

这篇具有很好参考价值的文章主要介绍了RabbitMQ入门实战。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RabbitMQ 是一个开源的消息中间件,实现了高级消息队列协议(AMQP),用于在分布式系统中进行消息传递。它能够在应用之间传递消息,解耦应用组件,提高系统的可伸缩性和可维护性。RabbitMQ 使用高级消息队列协议(AMQP),这是一种开放的、标准化的协议,定义了消息格式、交换方式、队列管理等规范。拥有强大的社区支持,提供了广泛的文档和示例。它还支持插件机制,可以根据实际需求进行扩展。下面就对rabbitMQ进行实战。

官网:RabbitMQ: easy to use, flexible messaging and streaming — RabbitMQ

RabbitMQ入门实战,rabbitmq,分布式 

1. 基本概念

  • 消息队列:消息队列是一种在不同应用之间传递数据的机制。在RabbitMQ中,消息队列是通过Exchange(交换机)来进行消息路由的。
  • Exchange:Exchange负责将消息路由到一个或多个队列。RabbitMQ支持不同类型的Exchange,包括Direct、Fanout、Topic等。
  • Queue:Queue是消息的容器,消息在发送到Exchange后通过Routing Key被路由到一个或多个Queue中。

2. 使用场景

消息中间件经常被用来处理异步、削峰填谷,和多个组件之间进行解耦的作用。

  • 异步任务处理:RabbitMQ可以作为任务队列,将任务发布到队列中,然后由后台工作者异步处理。这在分布式系统中很常见,可以提高系统的响应速度。
  • 事件驱动架构:RabbitMQ可以用于实现事件驱动的架构,不同组件之间通过消息进行通信。当某个事件发生时,可以将消息发送到队列中,由其他组件进行消费。
  • 日志收集:RabbitMQ可以用于日志收集系统,应用程序将日志消息发送到队列中,日志收集器订阅队列并将日志保存到数据库或其他存储中。

3. RabbitMQ实战

3.1 rabbitMQ安装

这里演示使用docker-compose方式安装,创建一个docker-compose.yml文件并写入以下内容

version: '3'

services:
  rabbitmq:
    image: "rabbitmq:management"
    container_name: "rabbitmq-container"
    ports:
      - "5672:5672"    # RabbitMQ 默认端口
      - "15672:15672"  # RabbitMQ 管理界面端口
    volumes:
      - "./rabbitmq-data:/var/lib/rabbitmq"  # 数据文件挂载
    environment:
      RABBITMQ_DEFAULT_USER: "rabbit"
      RABBITMQ_DEFAULT_PASS: "rabbit1qz"

执行命令拉取并启动容器

docker-compose up -d

RabbitMQ入门实战,rabbitmq,分布式 执行命令查看docker容器是否正常

docker ps | grep rabbit

RabbitMQ入门实战,rabbitmq,分布式

 然后地址栏http://localhost:15672/访问rabbitMQ管理界面

RabbitMQ入门实战,rabbitmq,分布式输入用户名密码可以看到如下界面,在这个页面上可以创建Exchanges和Queue,这里就不赘述了,大家想了解的可以参考官方文档。

RabbitMQ入门实战,rabbitmq,分布式 

3.1 代码集成

上面安装完MQ组件之后,就可以用java代码进行连接测试了。使用Maven添加RabbitMQ的Java客户端库到项目里:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.14.0</version> <!-- 替换为最新版本 -->
</dependency>

 文章来源地址https://www.toymoban.com/news/detail-814280.html

3.2 发送消息

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Channel;

public class MessageSender {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // RabbitMQ服务器地址

        // 创建连接
        try (Connection connection = factory.newConnection(); 
             Channel channel = connection.createChannel()) {
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // 发送消息
            String message = "Hello, RabbitMQ!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

 

3.3 接收消息

import com.rabbitmq.client.*;

public class MessageReceiver {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // RabbitMQ服务器地址

        // 创建连接
        try (Connection connection = factory.newConnection(); 
             Channel channel = connection.createChannel()) {
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // 定义消息处理器
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };

            // 监听队列,接收消息
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
            
            // 持续监听队列,不会退出
            System.out.println(" [*] Waiting for messages. To exit press Ctrl+C");
            Thread.sleep(Long.MAX_VALUE);
        }
    }
}

 

以上示例中,MessageSender类用于发送消息到名为"hello"的队列,而MessageReceiver类用于监听该队列并接收消息。

4. RabbitMQ高级特性

RabbitMQ 提供了许多高级特性,包括持久化、消息确认、事务、死信队列等。下面将结合 Java 完整代码进行说明这些高级特性。

1. 持久化

持久化确保在 RabbitMQ 服务器重启时,队列和消息不会丢失。

代码示例

// 发布者代码
public class DurableProducer {
    // ...初始化 RabbitMQ 连接等代码...

    public void publishPersistentMessage(String message) {
        channel.basicPublish("", "durable_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
    }
}

// 订阅者代码
public class DurableSubscriber {
    // ...初始化 RabbitMQ 连接等代码...

    public void subscribeToPersistentMessages() {
        channel.queueDeclare("durable_queue", true, false, false, null);

        channel.basicConsume("durable_queue", true, (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
            // 处理消息的逻辑...
        }, consumerTag -> {});
    }
}

 

2. 消息确认

消息确认确保消息已经被消费者成功处理。

代码示例

// 发布者代码
public class AckProducer {
    // ...初始化 RabbitMQ 连接等代码...

    public void publishAckMessage(String message) {
        channel.basicPublish("", "ack_queue", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
    }
}

// 订阅者代码
public class AckSubscriber {
    // ...初始化 RabbitMQ 连接等代码...

    public void subscribeToAckMessages() {
        channel.queueDeclare("ack_queue", false, false, false, null);

        channel.basicConsume("ack_queue", false, (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
            // 处理消息的逻辑...

            // 手动发送消息确认
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }, consumerTag -> {});
    }
}

 

3. 事务

RabbitMQ 支持事务,但由于性能问题,通常建议使用消息确认代替。

代码示例

// 发布者代码
public class TransactionalProducer {
    // ...初始化 RabbitMQ 连接等代码...

    public void publishTransactionalMessage(String message) throws IOException {
        try {
            channel.txSelect(); // 开启事务
            channel.basicPublish("", "transactional_queue", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
            channel.txCommit(); // 提交事务
        } catch (IOException e) {
            channel.txRollback(); // 回滚事务
            e.printStackTrace();
        }
    }
}

// 订阅者代码
public class TransactionalSubscriber {
    // ...初始化 RabbitMQ 连接等代码...

    public void subscribeToTransactionalMessages() {
        try {
            channel.queueDeclare("transactional_queue", false, false, false, null);

            while (true) {
                channel.txSelect(); // 开启事务
                GetResponse response = channel.basicGet("transactional_queue", true);
                if (response != null) {
                    String message = new String(response.getBody(), "UTF-8");
                    System.out.println(" [x] Received '" + message + "'");
                    // 处理消息的逻辑...
                    channel.txCommit(); // 提交事务
                } else {
                    channel.txRollback(); // 回滚事务
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

 

4. 死信队列

死信队列用于处理无法被消费者成功处理的消息。

代码示例

// 发布者代码
public class DeadLetterProducer {
    // ...初始化 RabbitMQ 连接等代码...

    public void publishDeadLetterMessage(String message) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-dead-letter-exchange", "dead_letter_exchange");
        headers.put("x-dead-letter-routing-key", "dl_queue");

        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .headers(headers)
                .build();

        channel.basicPublish("", "original_queue", properties, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
    }
}

// 订阅者代码
public class DeadLetterSubscriber {
    // ...初始化 RabbitMQ 连接等代码...

    public void subscribeToDeadLetterMessages() {
        channel.exchangeDeclare("dead_letter_exchange", BuiltinExchangeType.DIRECT);
        channel.queueDeclare("dl_queue", false, false, false, null);
        channel.queueBind("dl_queue", "dead_letter_exchange", "");

        channel.queueDeclare("original_queue", false, false, false, null);
        channel.queueBind("original_queue", "", "original_queue");

        channel.basicConsume("original_queue", false, (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
            // 处理消息的逻辑...

            // 模拟处理失败,将消息发送到死信队列
            channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
        }, consumerTag -> {});
    }
}

 

这些是 RabbitMQ 的一些高级特性的简单示例。在实际项目中,具体的实现可能会更加复杂,并需要根据场景进行适当的调整。

5. 总结

RabbitMQ作为一款强大的消息中间件,在异步任务处理、事件驱动架构以及日志收集等场景中都有广泛的应用。通过简单的代码实例,我们了解了RabbitMQ的基本概念以及如何在Java中使用RabbitMQ进行消息的发送和接收。

希望本文能够帮助大家入门RabbitMQ,并在实际项目中灵活应用消息队列的机制。

 

到了这里,关于RabbitMQ入门实战的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Python爬虫分布式架构 - Redis/RabbitMQ工作流程介绍

    在大规模数据采集和处理任务中,使用分布式架构可以提高效率和可扩展性。本文将介绍Python爬虫分布式架构中常用的消息队列工具Redis和RabbitMQ的工作流程,帮助你理解分布式爬虫的原理和应用。 为什么需要分布式架构? 在数据采集任务中,单机爬虫可能面临性能瓶颈和资

    2024年02月11日
    浏览(45)
  • 分布式消息队列:Kafka vs RabbitMQ vs ActiveMQ

    在现代分布式系统中,消息队列是一种常见的异步通信模式,它可以帮助系统处理高并发、高可用性以及容错等问题。在这篇文章中,我们将深入探讨三种流行的分布式消息队列:Apache Kafka、RabbitMQ和ActiveMQ。我们将讨论它们的核心概念、算法原理、特点以及使用场景。 随着

    2024年02月02日
    浏览(63)
  • 微服务学习:SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式

    目录 一、高级篇 二、面试篇 ==============实用篇============== day05-Elasticsearch01 1.初识elasticsearch 1.4.安装es、kibana 1.4.1.部署单点es 1.4.2.部署kibana 1.4.3.安装IK分词器 1.4.4.总结 2.索引库操作 2.1.mapping映射属性 2.2.索引库的CRUD 2.2.1.创建索引库和映射 2.2.2.查询索引库 2.2.3.修改索引库 2.

    2024年02月02日
    浏览(59)
  • (黑马出品_07)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式

    [此文档是在心向阳光的天域的博客加了一些有助于自己的知识体系,也欢迎大家关注这个大佬的博客](https://blog.csdn.net/sinat_38316216/category_12263516.html) [是这个视频](https://www.bilibili.com/video/BV1LQ4y127n4/?p=5spm_id_from=pageDrivervd_source=9beb0a2f0cec6f01c2433a881b54152c) 聚合 可以让我们极其方便

    2024年03月12日
    浏览(56)
  • 分布式搜索引擎(Elastic Search)+消息队列(RabbitMQ)部署(商城4)

    1、全文搜索 Elastic search可以用于实现全文搜索功能,例如商城中对商品搜索、搜索、分类搜索、订单搜索、客户搜索等。它支持复杂的查询语句、中文分词、近似搜索等功能,可以快速地搜索并返回匹配的结果。 2、日志分析 Elastic search可以用于实现实时日志分析,例

    2024年02月04日
    浏览(51)
  • 分布式 SpringCloudAlibaba、Feign与RabbitMQ实现MySQL到ES数据同步

    本文参考黑马 分布式Elastic search Elasticsearch是一款非常强大的开源搜索引擎,具备非常多强大功能,可以帮助我们从海量数据中快速找到需要的内容 同步调用 方案一:同步调用 基本步骤如下: hotel-demo对外提供接口,用来修改elasticsearch中的数据 酒店管理服务在完成数据库操

    2024年04月11日
    浏览(46)
  • (黑马出品_高级篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式

    [此文档是在心向阳光的天域的博客加了一些有助于自己的知识体系,也欢迎大家关注这个大佬的博客](https://blog.csdn.net/sinat_38316216/category_12263516.html) [是这个视频](https://www.bilibili.com/video/BV1LQ4y127n4/?p=5spm_id_from=pageDrivervd_source=9beb0a2f0cec6f01c2433a881b54152c) 消息队列在使用过程中,面

    2024年03月19日
    浏览(48)
  • 【103期】RabbitMQ 实现多系统间的分布式事务,保证数据一致性

    org.springframework.boot spring-boot-starter-amqp mysql mysql-connector-java runtime org.projectlombok lombok true org.springframework.boot spring-boot-starter-jdbc com.alibaba fastjson 1.2.17 3.2.1.2配置文件内容: server: port: 8080 spring: datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/test?useUnicode=tru

    2024年04月14日
    浏览(65)
  • SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,系统详解springcloud微服务技术栈

    我们发现在微服务中有一个令人头疼的问题——部署,用Docker去解决这个部署难题 1、项目部署的问题 2、Docker 扔到一台机器上,它们的依赖难道没有干扰吗?不会,docker将打包好的程序放到一个隔离容器去运行,使用沙箱机制,避免互相干扰,之间不可见,这样就解决了混

    2023年04月24日
    浏览(46)
  • RabbitMQ:高效传递消息的魔法棒,一篇带你助力构建可靠的分布式系统(上篇)

    MQ是消息队列( Message Queue )的缩写,是一种在应用程序之间传递消息的技术。通常用于 分布式系统 或 异步通信 中,其中 发送者 将消息放入队列,而 接收者 从队列中获取消息。 这种异步通信模式允许发送者和接收者在不需要实时连接的情况下进行通信,从而提高了应用

    2024年02月15日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包