RabbitMQ中的限流、return机制、死信队列

这篇具有很好参考价值的文章主要介绍了RabbitMQ中的限流、return机制、死信队列。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

优点

缺点

1、限流

2、return机制

3、死信队列


优点

  1. 高可用性: RabbitMQ支持集群和镜像队列等多种方式实现高可用性,保证系统稳定运行。

  2. 可靠性强: RabbitMQ使用AMQP协议作为消息传递的标准,能够确保消息传递的可靠性和有序性。

  3. 灵活性强: RabbitMQ支持多种消息模式,包括点对点、发布订阅、路由、RPC等,可以根据业务需求选择合适的模式。

  4. 性能优异: RabbitMQ采用基于Erlang语言开发,具有高并发、低延迟等特点,支持快速处理大量数据和消息。

  5. 易于部署和管理: RabbitMQ提供了丰富的管理工具和API接口,可以方便地进行配置、监控和管理。同时也提供了可视化界面使得操作更加简单易懂。

  6. 开源免费:RabbitMQ是一款开源软件,并且完全免费使用。可以在任何环境下轻松部署和使用。

缺点

  1. 学习成本高: RabbitMQ涉及到很多概念和技术,对于初学者而言,学习起来可能会比较困难。

  2. 配置复杂: RabbitMQ的配置比较复杂,需要根据实际需求进行相应的设置。如果配置不当,可能会影响系统的性能和稳定性。

  3. 资源占用高: RabbitMQ使用Erlang语言开发,需要占用大量内存和CPU资源,如果部署在低配服务器上可能会导致系统出现延迟等问题。

  4. 单点故障: 尽管RabbitMQ支持集群模式,但是在某些情况下仍然存在单点故障的问题。

  5. 消息堆积: 如果消费者处理消息速度过慢或者出现异常情况时,RabbitMQ可能会导致消息堆积、内存溢出等问题。

1、限流

所谓限流,就是限制该消费者一次最多消费多少条消息

// 每次只消费1000条消息,RabbitMQ放过来的这1000条消息没有处理完,就不会再放进来新的消息
$channel->basic_qos(null, 1000, null);

该设置在平时用不上,但是在消息积压比较严重时会用到,如果在消息积压严重时不设置该值,则很可能会导致这个消费者挂掉。

2、return机制

return Listener 用于处理一些不可路由的消息!

生产者通过指定一个 exchange 和 routingkey 把消息送达到某个队列中去,然后消费者监听队列,进行消费处理。但是在某些情况下,如果我们在发送消息时,当前的 exchange 不存在或者指定的 routingkey 路由不到,这个时候如果要监听这种不可达的消息,就要使用 return Listener。

# 虽然定义了交换机,但是指定了一个没有绑定队列的路由键
<?php
declare(strict_types = 1);

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

require __DIR__ . '/../vendor/autoload.php';

try {
    $connection = new AMQPStreamConnection('192.168.78.200', 5672, 'root', '123456');
    $channel = $connection->channel();
    // 定义一个交换机
    $channel->exchange_declare('test_return_exchange', AMQPExchangeType::DIRECT);
    $message = new AMQPMessage("Hello World!");

    // 这里我们指定了一个没有绑定队列的路由键,会导致rabbitmq找到不队列
    $channel->basic_publish($message, 'test_return_exchange', 'xxxxxx');

    while (true) {
        $channel->wait();
    }

    $channel->close();
    $connection->close();
} catch (Exception $e) {
    print_r($e->getMessage());
}

该代码虽然执行成功了,并且创建了我们声明的交换机,但是因为我们指定的路由键找不到与之绑定的队列,所以消息并不会推送进rabbitmq,但是因为rabbitmq并没有报错,所以我们会误以为推送成功了。

即使我们使用confirm机制来监听推送状态也没用,因为confirm监听不到因为没有找到交换机或者路由不到队列的情况,这里我们只能使用rabbitmq的另一个高级特性return机制,下面代码就引入了return机制:

<?php
declare(strict_types = 1);

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

require __DIR__ . '/../vendor/autoload.php';

try {
    $connection = new AMQPStreamConnection('192.168.78.200', 5672, 'root', '123456');
    $channel = $connection->channel();
    // 定义一个交换机
    $channel->exchange_declare('test_return_exchange', AMQPExchangeType::DIRECT);
    $message = new AMQPMessage("Hello World!");

    // 指定第三个参数mandatory为true,表示当rabbitmq无法找到路由或队列时,将消息返回给生产者
    // 这里我们指定了一个没有绑定队列的路由键,会导致rabbitmq找到不队列,进而触发return
    $channel->basic_publish($message, 'test_return_exchange', 'xxxxxx', true);

    // 监听消息未找到交换机或者未找到路由键对应的路由
    $channel->set_return_listener(function ($replyCode, $replyText, $exchange, $routingKey, $message) {
        $msg  = 'oh hoo!发生错误了'.PHP_EOL;
        $msg .= '错误码:'.$replyCode.PHP_EOL;
        $msg .= '错误信息:'.$replyText.PHP_EOL;
        $msg .= '指定的交换机:'.$exchange.PHP_EOL;
        $msg .= '指定的路由键:'.$routingKey.PHP_EOL;
        $msg .= '投递的消息:'.$message->body.PHP_EOL;
        print_r($msg);
    });

    while (true) {
        $channel->wait();
    }

    $channel->close();
    $connection->close();
} catch (Exception $e) {
    print_r($e->getMessage());
}

这时再运行程序,就会进行报错,我们这里是输出了错误,实际生产中应该是将错误记录到指定的日志数据表中

# 程序返回
root@204d5054860c:/data/test-yii/web# php testReturn.php 
oh hoo!发生错误了
错误码:312
错误信息:NO_ROUTE
指定的交换机:test_return_exchange
指定的路由键:xxxxxx
投递的消息:Hello World!

3、死信队列

死信队列其实只是绑定在死信交换机上的普通队列,而死信交换机也只是一个普通的交换机,不过是用来专门处理死信的交换机。

死信消息是RabbitMQ为我们做的一层保证,其实我们也可以不使用死信队列,而是在消息消费异常时,将消息主动投递到另一个交换机中或记录到日志表中等待后续专门处理。

一个消息在下列场景下会被rabbitmq判定为死信,然后投入到死信队列中。

  • 消息被拒绝

  • 消息过期

  • 队列超载

下面模拟一条消息显示被投入普通队列,这条消息被设置过期时间是10秒,在这10秒内没有消费者来处理,因此这条消息就过期了,变成了死信,这时,RabbitMQ会将它放到死信队列里,也就是我们在代码中声明的死信队列。

注意:rabbitmq中不是只可以有一个死信队列,而是可以有多个死信队列,如果我们没有指定死信队列,过期的消息将被rabbitmq遗弃。文章来源地址https://www.toymoban.com/news/detail-472773.html

<?php
declare(strict_types = 1);

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;

require __DIR__ . '/../vendor/autoload.php';

try {
    $connection = new AMQPStreamConnection('192.168.78.200', 5672, 'root', '123456');

    // 设置交换机名称
    $deadLetterExchangeNameName = 'dead_letter_exchange';
    $normalExchangeName = 'exchange_6';
    // 设置队列名称
    $deadLetterQueueName = 'dead_letter_queue';
    $normalQueueName = 'queue_6';
    // 设置路由键
    $deadLetterRoutingKey = 'dead_letter_key';
    $normalRoutingKey = 'route_6';

    // 创建通道
    $channel = $connection->channel();
    // 声明交换器,作为死信交换器
    $channel->exchange_declare($deadLetterExchangeNameName, AMQPExchangeType::DIRECT, false, true);
    // 声明交换器,作为普通交换器
    $channel->exchange_declare($normalExchangeName, AMQPExchangeType::DIRECT, false, true);

    $args = new AMQPTable();
    // 设置消息过期时间为25s,25秒后消息会进入死信队列
    $args->set('x-message-ttl', 25000);
    // 设置死信交换器
    $args->set('x-dead-letter-exchange', $deadLetterExchangeNameName);
    // 设置死信路由键
    $args->set('x-dead-letter-routing-key', $deadLetterRoutingKey);

    // 声明队列,作为普通队列,同时将死信队列相关参数传入
    $channel->queue_declare($normalQueueName, false, true, false, false, false, $args);
    // 声明死信队列
    $channel->queue_declare($deadLetterQueueName, false, true, false, false);

    // 将普通队列与普通交换机绑定
    $channel->queue_bind($normalQueueName, $normalExchangeName, $normalRoutingKey);
    // 将死信队列与死信交换机绑定,同时指定死信路由键
    $channel->queue_bind($deadLetterQueueName, $deadLetterExchangeNameName, $deadLetterRoutingKey);
    // 设置传递的消息
    $message = new AMQPMessage('Hello DLX Message queue_6');

    // confirm监听消息投递【与死信队列无关,仅用于排错】
    $channel->confirm_select();
    $channel->set_ack_handler(function ($message) {
        print_r('投递成功啦,消息是'.$message->body);
    }) ;
    $channel->set_nack_handler(function ($message) {
        print_r('投递失败啦,消息是'.$message->body);
    }) ;

    // return监听未找到交换机或路由问题【与死信队列无关,仅用于排错】
    $channel->set_return_listener(function ($replyCode, $replyText, $exchange, $routingKey, $message) {
        $msg  = 'oh hoo!发生错误了'.PHP_EOL;
        $msg .= '错误码:'.$replyCode.PHP_EOL;
        $msg .= '错误信息:'.$replyText.PHP_EOL;
        $msg .= '指定的交换机:'.$exchange.PHP_EOL;
        $msg .= '指定的路由键:'.$routingKey.PHP_EOL;
        $msg .= '投递的消息:'.$message->body.PHP_EOL;
        print_r($msg);
    });

    // 发送消息到RabbitMQ
    $channel->basic_publish($message, $normalExchangeName, $normalRoutingKey, true);

    while (true) {
        $channel->wait();
    }

    // 关闭通道和连接
    $channel->close();
    $connection->close();

} catch (Exception $e) {
    print_r($e->getMessage());
}

到了这里,关于RabbitMQ中的限流、return机制、死信队列的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ-死信交换机和死信队列

    DLX: Dead-Letter-Exchange 死信交换器,死信邮箱 当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。 如下图所示: 其实死信队列就是一个普通的交换机,有些队列的消息成为死信后,(比如过期了或者队列满了)这些死信一般情况下是会被 RabbitMQ 清理

    2024年02月08日
    浏览(36)
  • RabbitMQ延迟队列,死信队列配置

    延迟和死信队列的配置 延迟队列有效期一分钟,后进入死信队列,如果异常就进入异常队列 异常队列配置类

    2024年02月14日
    浏览(41)
  • Rabbitmq死信队列及延时队列实现

    问题:什么是延迟队列 我们常说的延迟队列是指消息进入队列后不会被立即消费,只有达到指定时间后才能被消费。 但RabbitMq中并 没有提供延迟队列功能 。那么RabbitMQ如何实现延迟队列 通过:死信队列 + RabbitMQ的TTL特性实现。 实现原理 给一个普通带有过期功能的队列绑定一

    2024年02月15日
    浏览(34)
  • 【RabbitMQ笔记10】消息队列RabbitMQ之死信队列的介绍

    这篇文章,主要介绍消息队列RabbitMQ之死信队列。 目录 一、RabbitMQ死信队列 1.1、什么是死信队列 1.2、设置过期时间TTL 1.3、配置死信交换机和死信队列(代码配置) (1)设置队列过期时间 (2)设置单条消息过期时间 (3)队列设置死信交换机 (4)配置的基本思路 1.4、配置

    2024年02月16日
    浏览(49)
  • 【RabbitMQ学习日记】——死信队列与延迟队列

    死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说, producer 将消息投递到 broker 或者直接到 queue 里了, consumer 从 queue 取出消息进行消费,但某些时候 由于特定的原因导致 queue 中的某些消息无法被消费 ,这样的消息如果没有后续的处理,就变成了死

    2024年02月06日
    浏览(44)
  • 【RabbitMQ教程】第五章 —— RabbitMQ - 死信队列

                                                                       💧 【 R a b b i t M Q 教 程 】 第 五 章 — — R a b b i t M Q − 死 信 队 列 color{#FF1493}{【RabbitMQ教程】第五章 —— RabbitMQ - 死信队列} 【 R a b b i t M Q 教 程 】 第 五 章 — — R a

    2024年02月09日
    浏览(29)
  • 死信是什么,如何运用RabbitMQ的死信机制?

    手把手教你,本地RabbitMQ服务搭建(windows) 消息队列选型——为什么选择RabbitMQ RabbitMQ 五种消息模型 RabbitMQ 能保证消息可靠性吗 推或拉? RabbitMQ 消费模式该如何选择 我们在上次讨论RabbitMQ的消息可靠性时,已经提到了死信队列(详见系列文章《RabbitMQ 能保证消息可靠性吗》

    2024年02月11日
    浏览(19)
  • RabbitMQ进阶——死信队列

    在消息队列中,执行异步任务时,通常是将消息生产者发布的消息存储在队列中,由消费者从队列中获取并处理这些消息。但是,在某些情况下,消息可能无法正常地被处理和消耗,例如:格式错误、设备故障等,这些未成功处理的消息就被称为“死信”。 为了避免这些未成

    2024年04月13日
    浏览(23)
  • RabbitMQ: 死信队列

    其实就是一个普通的队列,绑定号私信交换机,不给ttl,给上匹配的路由,等待交换机发送消息。 1.在消费者里的RabbitMQConfig配置类里,创建队列,给它加参数 第四个参数,就是放入这个队列,的一些属性参数 也就是这两个位置 对应Java代码里好像少个参数,排他性,是指,

    2024年02月09日
    浏览(33)
  • rabbitMQ引入死信队列

            指的是,从队列当中取出来的消息,到达消费方后,因为某些原因导致消息并没有被正常消费掉,这些没有被后续处理的消息就是“死信”,而保存死信的队列,就是死信队列。         为了保证订单业务的消息数据不丢失,需要使用死信队列机制,在消息消费发生

    2024年02月02日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包