Java RabbitMQ消息队列简单使用

这篇具有很好参考价值的文章主要介绍了Java RabbitMQ消息队列简单使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、消息队列

什么是消息队列

消息队列,即MQ,Message Queue。

java rabbitmq发送消息,Java,中间件,java-rabbitmq,rabbitmq,java

消息队列是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。

二、RabbitMQ

RabbitMQ是基于AMQP的一款消息管理系统。

支持主流的操作系统,Linux、Windows、MacOX等。

支持多种开发语言,Java、Python、Ruby、.NET、PHP、C/C++、node.js等。

官网: Messaging that just works — RabbitMQ

官方教程:RabbitMQ Tutorials — RabbitMQ

RabbitMQ 基本概念

java rabbitmq发送消息,Java,中间件,java-rabbitmq,rabbitmq,java

Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。

Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

Connection
网络连接,比如一个TCP连接。无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费。

Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。

Broker
表示消息队列服务器实体。

 三、简单消息模型使用

RabbitMQ是一个消息代理:它接受和转发消息。 你可以把它想象成一个邮局:当你把邮件放在邮箱里时,你可以确定邮差先生最终会把邮件发送给你的收件人。 在这个比喻中,RabbitMQ是邮政信箱,邮局和邮递员。

RabbitMQ与邮局的主要区别是它不处理纸张,而是接受,存储和转发数据消息的二进制数据块。

java rabbitmq发送消息,Java,中间件,java-rabbitmq,rabbitmq,java

P(producer/ publisher):生产者,一个发送消息的用户应用程序。

C(consumer):消费者,消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序

队列(红色区域):rabbitmq内部类似于邮箱的一个概念。虽然消息流经rabbitmq和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。

总之:生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区。

代码演示

引入依赖

 <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-amqp</artifactId>
       <version>2.0.6.RELEASE</version>
</dependency>

 获取连接

package com.util;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtil {
    /**
     * 建立与RabbitMQ的连接
     * @return
     * @throws Exception
     */
    public static Connection getConnection() throws Exception {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost("192.168.33.88");
        //TCP端口
        factory.setPort(5672);
        //设置账号信息,用户名、密码、vhost
        factory.setVirtualHost("vhost_NetDataGather");
        factory.setUsername("admin");
        factory.setPassword("123456");
        // 通过工程获取连接
        Connection connection = factory.newConnection();
        return connection;
    }
}

生产者

import com.util.ConnectionUtil;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
 * 生产者
 */
public class Send {
     //声明队列名称
    private final static String QUEUE_NAME = "simple_queue";
 
    public static void main(String[] argv) throws Exception {
        // 生产者和Broker建立TCP连接。
        Connection connection = ConnectionUtil.getConnection();
        // 生产者和Broker建立通道。
        Channel channel = connection.createChannel();
        // 声明(创建)队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 消息内容
        String message = "Hello World!";
        for (int i = 0; i < 10; i++) {
            // 向指定的队列中发送消息
            message=message+i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        }
        //关闭通道和连接
        channel.close();
        connection.close();
    }
}

消费者

import java.io.IOException;
 
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
 
import com.util.ConnectionUtil;
 
/**
 * 消费者
 */
public class Recv {
    private final static String QUEUE_NAME = "simple_queue";
 
    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 创建通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body 即消息体
                String msg = new String(body);
            }
        };
        // 监听队列,第二个参数:是否自动进行消息确认。
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

上述代码中:消息一旦被消费者接收,队列中的消息就会被删除。

如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ无从得知,这样消息就丢失了!

因此,RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:

自动ACK:消息一旦被接收,消费者自动发送ACK。

手动ACK:消息接收后,不会发送ACK,需要手动调用。

手动ACK

import java.io.IOException;
 
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
 
import com.util.ConnectionUtil;
 
/**
 * 消费者,手动进行ACK
 */
public class Recv2 {
    private final static String QUEUE_NAME = "simple_queue";
 
    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 创建通道
        final Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body 即消息体
                String msg = new String(body);
                // 手动进行ACK
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 监听队列,第二个参数false,手动进行ACK
        // 如果第二个参数为true,则会自动进行ACK;如果为false,则需要手动ACK。方法的声明:
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

以上就是RabbitMQ的简单使用讲解,更多详细使用内容,请再多多学习。文章来源地址https://www.toymoban.com/news/detail-659799.html

到了这里,关于Java RabbitMQ消息队列简单使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • TP5简单使用RabbitMQ实现消息队列

    在使用 RabbitMQ 之前,你要安装好 RabbitMQ 服务,具体安装方法可以参考 windows下安装RabbitMQ 1、安装扩展 进入TP5 更目录下,输入命令安装: composer require php-amqplib/php-amqplib 2、自定义命令 TP5 的自定义命令,这里也简单说下。 第一步: 创建命令类文件,新建 application/api/command

    2024年02月07日
    浏览(39)
  • RabbitMQ可靠性消息发送(java实现)

    本博客属于 《RabbitMQ基础组件封装—整体结构》的子博客 step1:消息落库,业务数据存库的同时,也要将消息记录存入数据库,二者要保证原子性; step2:Producer发送消息到MQ Broker; step3:Producer收到 broker 返回的确认消息; step4:更改消息记录库的状态(定义三种状态:0待确

    2024年02月04日
    浏览(58)
  • Java开发 - 消息队列之RabbitMQ初体验

    目录 前言 RabbitMQ 什么是RabbitMQ RabbitMQ特点 安装启动 RabbitMQ和Kafka的消息收发区别 RabbitMQ使用案例 添加依赖 添加配置 创建RabbitMQ配置类 RabbitMQ消息的发送 RabbitMQ消息的接收 测试 结语 前一篇,我们学习了Kafka的基本使用,这一篇,我们来学习RabbitMQ。他们作为消息队列本身都具

    2024年02月03日
    浏览(32)
  • 【Java】微服务——RabbitMQ消息队列(SpringAMQP实现五种消息模型)

    微服务间通讯有同步和异步两种方式: 同步通讯:就像打电话,需要实时响应。 异步通讯:就像发邮件,不需要马上回复。 两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发送邮件可以同时与多个人收发邮件,但是往往响应会有延迟。 1.

    2024年02月08日
    浏览(42)
  • RabbitMQ实现延迟消息,RabbitMQ使用死信队列实现延迟消息,RabbitMQ延时队列插件

    假设有一个业务场景:超过30分钟未付款的订单自动关闭,这个功能应该怎么实现? RabbitMQ使用死信队列,可以实现消息的延迟接收。 队列有一个消息过期属性。就像丰巢超过24小时就收费一样,通过设置这个属性,超过了指定事件的消息将会被丢弃。 这个属性交:x-message

    2024年02月13日
    浏览(49)
  • 消息队列——RabbitMQ基本概念+容器化部署和简单工作模式程序

    目录 基本概念  MQ 的优势  1.应用解耦  2.异步提速  3.削峰填谷  MQ 的劣势 使用mq的条件  常见MQ产品  RabbitMQ简介 RabbitMQ的六种工作模式   JMS RabbitMQ安装和配置。 RabbitMQ控制台使用。 RabbitMQ快速入门——生产者 需求: RabbitMQ快速入门——消费者 小结  多个系统之间的通信方

    2024年02月16日
    浏览(33)
  • RabbitMQ的消费者处理消息失败后可以重试,重试4次仍然失败发送到死信队列。

    生产者发送消息时采用雪花算法给消息设置唯一的消息id,消费者接收消息处理失败时,根据消息的唯一id统计失败次数,若没有达到失败次数限制,则让消息重回队列(在开启手动签收的前提),此时队列会再次给消费者发送消息;若达到失败次数限制,则让消息不重回队列,

    2024年02月07日
    浏览(36)
  • RabbitMQ系列(5)--使用Java实现RabbitMQ的消费者接收消息

    前言:先简单了解RabbitMQ的工作过程,方便后续开发理清思路 简略: 详细: 1、新建消费者类 效果图: 2、编写消费者消费消息的代码 例: 3、查看代码运行结果 运行代码后如果有输出生产者发送的”Hello World”信息,则证明消费者消费消息成功 4、在web页面上查看队列的消

    2024年02月06日
    浏览(32)
  • 【RabbitMQ】RabbitMQ 消息的堆积问题 —— 使用惰性队列解决消息的堆积问题

    消息的堆积问题是指在消息队列系统中,当生产者以较快的速度发送消息,而消费者处理消息的速度较慢,导致消息在队列中积累并达到队列的存储上限。在这种情况下,最早被发送的消息可能会在队列中滞留较长时间,直到超过队列的容量上限。当队列已满且没有更多的可

    2024年02月05日
    浏览(37)
  • RabbitMQ 消息队列使用

    同步调用优点: 时效性强,立即得到结果 缺点: 耦合度高 新业务新需求到来时,需要修改代码 性能和吞吐能力下降 调用服务的响应时间为所有服务的时间之和 资源浪费 调用链中的服务在等待时不会释放请求占用的资源 级联失败 一个服务执行失败会导致调用链后续所有服务失

    2024年01月21日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包