RabbitMQ入门(详细)

这篇具有很好参考价值的文章主要介绍了RabbitMQ入门(详细)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

初始消息队列

消息队列初识

  • 消息队列∶接收并转发消息。类似于“快递公司”
  • producer :消息的发送者、生产者
  • consumer :消息的消费者,从队列获取消息,并且使用
  • queue∶先进先出的消息队列,一个queue可以对应多个 consumer

为什么要使用消息队列?

  • 代码解耦,提高系统稳定性。
  • 应对流量高峰,降低流量冲击。
  • 异步执行,提高系统响应速度。

消息队列的特性

  • 性能好。
  • 基础组件。(类似于mysql,是通用的系统)
  • 支持消息确认。(当断电了重启之后,可以对消息进行重新处理)保持了一致性。
  • 削峰(请求峰值)

RabbitMQ介绍

  • 官网:https://rabbitmq.com/

特点

  • 路由能力灵活强大
  • 开源免费
  • 支持编程语言多
  • 应用广泛,社区活跃
  • 有开箱即用的监控和管理后台

核心概念

RabbitMQ入门(详细)

Linux(CentOs7) 下安装:

官方安装指南:https://www.rabbitmq.com/install-rpm.html
我们将要安装的RabbitMQ的版本是3.8.2
https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.8.2-1.el7.noarch.rpm
不需要单独安装Erlang环境。

安装前配置:

  • 前提:在一个新建的阿里云的Cent OS 7.6上安装,不要对yum换源,否则可能会安装失败。
  • echo “export LC_ALL=en_US.UTF-8” >> /etc/profile
  • source /etc/profile
    • 将编码格式设置为 UTF-8

Erlang下载安装

  • wget --content-disposition https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-22.3.4.12-1.el7.x86_64.rpm/download.rpm

    • 先下载安装Erlang环境
  • 安装已下载的rpm包(可根据刚才自己选择的版本修改下面的版本号)

    • yum localinstall erlang-22.3.4.12-1.el7.x86_64.rpm
  • sudo yum install rabbitmq-server-3.8.2-1.el7.noarch
    RabbitMQ入门(详细)

RabbitMQ下载安装

  • wget --content-disposition https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.8.13-1.el7.noarch.rpm/download.rpm
  • rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
    • 需要运行命令来将 Key 导入
  • 最后,使用 yum 进行本地安装(可根据自己选择的版本修改下面的版本号)
    • yum localinstall rabbitmq-server-3.8.13-1.el7.noarch.rpm
      RabbitMQ入门(详细)

    • 若报错:rabbitmq Unregistered Authentication Agent for unix- process:6485:746263

    • 执行以下命令:rpm -ivh --nodeps rabbitmq-server-3.8.13-1.el7.noarch.rpm

    • 再重试

  • journalctl -xe:可以查看 rabbitmq-server 日志
  • systemctl start rabbitmq-server:启动 rabbitmq 服务器
  • rabbitmqctl status:查看 状态信息。
  • systemctl enable rabbitmq-server:设置开机自启动

linux下安装rabbitmq可能会遇到的问题

相关博客:https://blog.csdn.net/m0_67402914/article/details/123972575

Linux 常用命令

  • 开启 web 管理界面:rabbitmq-plugins enable rabbitmq_management
  • 启动RabbitMQ:systemctl start rabbitmq-server
  • 设置开机启动:systemctl enable rabbitmq-server
  • 停止RabbitMQ:rabbitmqctl stop
  • 查看状态信息:rabbitmqctl status
  • 检查 RabbitMQ 服务器的状态:systemctl status rabbitmq-server

RabbitMQ Web 界面管理

  • 默认情况下,是没有安装web端的客户端插件,需要安装才可以生效。执行命令:
    • rabbitmq-plugins enable rabbitmq_management
  • 安装完毕以后,重启服务即可,执行命令:
    • systemctl restart rabbitmq-server
  • 在服务器上开放 15672 端口。
  • rabbitmq 有一个默认账号和密码是: guest
    • 默认情况只能在 localhost 本机下访问,因此想要远程访问就需要新增一个远程登录的用户。

新增用户

  • 将账号密码都设置为 admin:

    • rabbitmqctl add_user 账号 密码
  • 设置用户分配操作权限:

    • rabbitmqctl set_user_tags admin administrator:设置为管理员
  • 进入虚拟主机,添加允许访问的用户。
    RabbitMQ入门(详细)
    RabbitMQ入门(详细)

  • 访问 http://IP地址:15672 ,输入新增的用户
    RabbitMQ入门(详细)

    • 如果访问不了,看是否开放了防火墙,或阿里云服务器是否开启了安全组。

管理界面

  • amqp:5673:用于客户端连接

  • clustering:25672:用于集群

  • http:15672:用于http协议 登录管理后台
    RabbitMQ入门(详细)

  • exchanges:交换机,默认会有 7 个
    RabbitMQ入门(详细)

    • 每个交换机点进去,可以绑定队列、消息发送 等操作
  • admin:用户管理,可以用户进行 CURD 操作等。

  • 可以在右侧进入虚拟主机的管理页面
    RabbitMQ入门(详细)

    • 进入虚拟主机,可以进行 CURD 操作,添加允许访问的用户等

RabbitMQ入门(详细)

Java 基础应用

Maven:
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.17.0</version>
</dependency>
<!-- rabbitmq要求的内部用于记录日志的 -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-nop</artifactId>
    <version>2.0.7</version>
</dependency>

创建连接工厂

  • ConnectionFactory factory = new ConnectionFactory();

设置 RabbitMQ 地址

  • factory.setHost(String ip):设置发送对象 RabbitMQ服务器的 ip
  • factory.setUsername(String username):设置登录的用户
  • factory.setPassword(String password):密码
  • 需要在云服务器设置安全组,开启 5672 端口,授权对象为 0.0.0.0/0
    RabbitMQ入门(详细)
    RabbitMQ入门(详细)
    RabbitMQ入门(详细)

建立连接

  • Connection connection = factory.newConnection();
  • 会抛出 IOException, TimeoutException

获得信道

  • Channel channel = connection.createChannel();
  • 会抛出 IOException

声明队列

  • channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
    Map<String, Object> arguments) throws IOException
    • queue:队列名
    • durable:是否声明持久队列(该队列将在服务器重启后继续存在)。
    • exclusive:是否声明一个独占队列(仅限于此连接)。
    • autoDelete:是否声明一个自动删除队列(服务器将在不再使用时删除它)。
    • Map<String, Object> arguments:队列的其他属性(构造参数),没有则为 null。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

发布 或 消费(接收)消息

  • 消息发送之后会存储在队列里,当有其他消费者进行接收时,就会出队。
    RabbitMQ入门(详细)

发布消息:

  • channel.basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body):发布消息
    • 发布到不存在的交换将导致通道级协议异常,从而关闭通道。如果资源驱动的警报生效,Channel#basicPublish 的调用将最终阻塞。
    • exchange:要将消息发布到的交换机。
      • 可置为空串 “”,代表默认的交换机,此时会使用 routingKey 进行 查找
    • routingKey:路由键,可以是队列名
    • props:消息的其他属性配置-路由头等,没有则置为 null
    • body:消息主体,需要声明编码方式;例如: msg.getBytes(StandardCharsets.UTF_8)
String msg = "Hello World";
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));

消费(接收)消息:

  • channel.basicConsume(String queue, boolean autoAck, Consumer callback):启动一个非本地的、非独占的消费者,一个服务器生成的消费者标签。
    • queue:队列名
    • autoAck:设置服务器是否 自动确认消息处理完毕。
      • 若为false,则需要在 callback 函数里,当消息处理完毕时使用 basicAck() 函数来确认。
      • channel.basicAck(long deliveryTag, boolean multiple):确认一个或多个接收到的消息。
        • deliveryTag:envelope.getDeliveryTag()。接收到的AMQP.Basic.GetOk或AMQP.Basic.Deliver的标签。
        • multiple:true确认所有到并包括所提供的交付标记的消息;false表示只确认所提供的交付标签。
    • callback:使用者处理对象的接口
      • 一般就是一个 DefaultConsumer 的匿名内部类,重写 handleDelivery() 方法
      • handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
        byte[] body):当接收到此使用者的 basic.deliver 时调用。
        • consumerTag:与消费者相关联的消费者标签。
        • envelope:打包消息的数据。
        • properties:消息的内容头数据
        • body:消息体(不透明的、特定于客户端的字节数组),转换为字符串时需要指定编码
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        String msg = new String(body, "UTF-8");
        System.out.println("收到消息:" + msg);
    }
});

关闭连接

  • 接收消息时可以不用关闭,它就能一直处于待接收状态。
  • 秉承 先开后关 原则。
channel.close();
connection.close();

交换机工作模式

不会接收到 消费端启动的消息。

  • fanout:广播模式,将消息广播到所有与之绑定的队列,不需要设置路由键。
  • direct:直接模式,根据 RoutingKey 匹配消息路由到指定的队列。
  • topic:主题模式,生产者指定 RoutingKey 消息,根据消费端指定的队列通过模糊匹配的方式进行相应转发。
  • headers:根据发送消息内容中的 headers 属性来匹配(基本不用)。

fanout 广播模式

  • 将消息广播到所有与之绑定的队列,不需要设置路由键。
    RabbitMQ入门(详细)

direct 直接模式

  • 根据 RoutingKey 匹配消息路由到指定的队列。
    • 允许多个队列绑定相同的 RoutingKey。
      RabbitMQ入门(详细)

topic 主题模式:

  • direct 的升级。生产者指定 RoutingKey 消息,根据消费端指定的队列- 通过模糊匹配的方式进行相应转发。
  • * 可以代替一个单词。
  • # 可以替代零个或多个单词。
  • 注意:是单词,不是字符。
    RabbitMQ入门(详细)

应用:

定义交换机:

  • 发送和接收时都需要定义。
  • channel.exchangeDeclare(String exchange, BuiltinExchangeType type):主动声明一个不带额外参数的非自动删除、非持久的交换机。
    • exchange:交换机名字
    • type:交换机类型。
      • 其值是 BuiltinExchangeType 枚举类:DIRECT, FANOUT, TOPIC, HEADERS
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
- 注意:当已存在相同交换机名且类型不一致时,会报出 IOException。描述:received 'direct' but current is 'fanout'

接收时需要绑定队列:

  • channel.queueBind(String queue, String exchange, String routingKey):将队列绑定到交换机,不带额外参数。
    • routingKey:路由键。
    • 一个交换机可以绑定多个queue,甚至queueName也可以一至(同一个queue绑定了多个RoutingKey)
// 获取临时队列名
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
广播模式完整案例:
public class EmitLog {
    private static String EXCHANGE_NAME = "direct-logs";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建工厂,建立连接,获取信道
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.65.128");
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String msg1 = "info: Hello World";
        String msg2 = "warning: Hello World";
        String msg3 = "error: Hello World";
        channel.basicPublish(EXCHANGE_NAME, "info", null, msg1.getBytes(StandardCharsets.UTF_8));
        channel.basicPublish(EXCHANGE_NAME, "warning", null, msg2.getBytes(StandardCharsets.UTF_8));
        channel.basicPublish(EXCHANGE_NAME, "error", null, msg3.getBytes(StandardCharsets.UTF_8));
        System.out.println("发送了消息");
        channel.close();
        connection.close();
    }
}
public class ReceiveLogs {
    private static String EXCHANGE_NAME = "direct-logs";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建工厂,建立连接,获取信道
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.65.128");
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        // 获取临时队列名
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "warning");
        channel.queueBind(queueName, EXCHANGE_NAME, "error");
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                System.out.println("收到消息:" + new String(body, "UTF-8"));
            }
        });
    }
}

其他知识点

消息分配

  • 默认 Rabbitmq 是按照消息的 数量来进行一次性平均分配 的。
  • 比如,有10个消息,2个消费者,则奇数位的消息都分配给第一个消费者,偶数位的消息分配给第二个消费者,而不考虑每个消息所需耗费的执行时间,(尽管可能某一个消费者的消息都是很费时的消息)

按压力进行平均分配(公平派遣):

  • 干完了手头上的工作,再分配得到第二个工作
    RabbitMQ入门(详细)

  • channel.basicQos(int prefetchCount):设置此通道最希望处理的消息数量。

    • 注意预取计数必须在 0到65535 之间(AMQP 0-9-1中的unsigned short)。
    • 在数量 < prefetchCount 之前,不会再接收下一个任务。
  • 关闭自动确认:

    • 置 channel.basicConsume(String queue, boolean autoAck, Consumer callback) 方法的 autoAck 参数为 false。
  • 在 basicConsume() 的 callback 函数里,当消息处理完毕时使用 basicAck() 函数来确认。

  • channel.basicAck(long deliveryTag, boolean multiple):确认一个或多个接收到的消息。文章来源地址https://www.toymoban.com/news/detail-437742.html

    • deliveryTag:envelope.getDeliveryTag()。接收到的AMQP.Basic.GetOk或AMQP.Basic.Deliver的标签。
    • multiple:true确认所有到并包括所提供的交付标记的消息;false表示只确认所提供的交付标签。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
// 接收消息并消费
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        String msg = new String(body, "UTF-8");
        System.out.println("收到消息:" + msg);
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});

临时队列

  • 当接收者不存在时,临时队列就会自动删除,以节省空间。
  • channel.queueDeclare().getQueue();
String queueName = channel.queueDeclare().getQueue();

到了这里,关于RabbitMQ入门(详细)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 利用消息中间件RabbitMQ创建队列以及扇出(Fanout)、订阅(Direct)、主题(Topic)交换机来完成消息的发送和监听接收(完整版)

    目录 一、前期项目环境准备 1.1父项目以及子项目 1.2配置pom.xml 1.3配置application.yml 二、扇出(Fanout) 交换机实现消息的发送和接收 2.1编写子项目consumer(消费者,接收消息)的代码实现扇出(Fanout)交换机接收消息 2.1.1consumer子项目结构 2.1.2FanoutConfig类的实现扇出(Fanout)交

    2024年02月05日
    浏览(56)
  • Message queue 消息队列--RabbitMQ 【基础入门】

                                🎉🎉欢迎来到我的CSDN主页!🎉🎉                     🏅我是平顶山大师,一个在CSDN分享笔记的博主。📚📚     🌟推荐给大家我的博客专栏《Message queue 消息队列--RabbitMQ 【基础入门】》。🎯🎯                     🎁如果感觉还

    2024年01月20日
    浏览(60)
  • MQ消息队列,以及RabbitMQ详细(中1)五种rabbitMQ实用模型

    书接上文,展示一下五种模型我使用的是spring could 微服务的框架 文章说明:         本文章我会分享总结5种实用的rabbitMQ的实用模型 1、hello world简单模型 2、work queues工作队列 3、Publish/Subscribe发布订阅模型 4、Routing路由模型 5、Topics 主题模型 (赠送) 6、消息转换器 Rabbi

    2024年02月05日
    浏览(52)
  • RabbitMQ入门 消息队列快速入门 SpringAMQP WorkQueue 队列和交换机 Fanout Direct exchange RAbbitMQ单体部署

    微服务间通讯有同步和异步两种方式: 同步通讯:就像打电话,需要实时响应。 异步通讯:就像发邮件,不需要马上回复。 两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发送邮件可以同时与多个人收发邮件,但是往往响应会有延迟。 1.

    2024年04月08日
    浏览(67)
  • RabbitMQ-同步和异步通讯、安装和入门案例、SpringAMQP(5个消息发送接收Demo,jackson消息转换器)

    微服务间通讯有同步和异步两种方式: 同步通讯:就像打电话,需要实时响应。 异步通讯:就像发邮件,不需要马上回复。 两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发送邮件可以同时与多个人收发邮件,但是往往响应会有延迟。 1.

    2024年02月11日
    浏览(31)
  • Kafka消息队列实现消息的发送和接收

    消息在Kafka消息队列中发送和接收过程如下图所示: 消息生产者Producer产生消息数据,发送到Kafka消息队列中,一台Kafka节点只有一个Broker,消息会存储在Kafka的Topic(主题中),不同类型的消息数据会存储在不同的Topic中,可以利用Topic实现消息的分类,消息消费者Consumer会订阅

    2024年02月11日
    浏览(48)
  • 90、RabbitMQ如何确保消息发送?消息接收?

    信道需要设置为 confirm 模式,则所有在信道上发布的消息都会分配一个唯一 ID。 一旦消息被投递到queue(可持久化的消息需要写入磁盘),信道会发送一个确认给生产者(包含消息唯一 ID) 如果 RabbitMQ 发生内部错误从而导致消息丢失,会发送一条 nack (未确认) 消息给生产者。

    2024年02月16日
    浏览(44)
  • 【初始RabbitMQ】死信队列的实现

    死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息 进行消费,但某些时候由于特定的 原因导致 queue 中的某些消息无法被消费 ,这样的消息如果没有 后续的处理,就变成了死

    2024年02月21日
    浏览(33)
  • 【初始RabbitMQ】工作队列的实现

    工作队列(又称为任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。 相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进 程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理

    2024年02月21日
    浏览(34)
  • RabbitMQ实现延迟消息,RabbitMQ使用死信队列实现延迟消息,RabbitMQ延时队列插件

    假设有一个业务场景:超过30分钟未付款的订单自动关闭,这个功能应该怎么实现? RabbitMQ使用死信队列,可以实现消息的延迟接收。 队列有一个消息过期属性。就像丰巢超过24小时就收费一样,通过设置这个属性,超过了指定事件的消息将会被丢弃。 这个属性交:x-message

    2024年02月13日
    浏览(74)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包