一.什么是消息队列
1.简介
在介绍消息队列之前,应该先了解什么是 AMQP(Advanced Message Queuing Protocol, 高级消息队列协议,点击查看)
消息(Message)是指在应用间 传送的数据,消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象;而 消息队列(Message Queue)是一种 应用间的 通信方式,消息发送后可以 立即返回,由 消息系统来确保消息的 可靠传递, 消息发布者只管把消息发布到 MQ 中而不用管谁来取, 消息使用者只管从 MQ 中取消息而不管是谁发布的,这样发布者和使用者都不用知道对方的存在,它是典型的 生产者-消费者模型,生产者不断向消息队列生产消息,消费者不断的从队列获取消息。因为消息的生产和消费都是 异步的,并且只关心消息的发送和接收,没有业务逻辑的浸入,这样就实现了生产者和消费者的 解耦
2.总结
(1).消息队列是队列结构的 中间件
(2).消息发送后,不需要立即处理,而是由消息系统来处理
(3).消息处理是 消息使用者(消费者) 按顺序处理的
3.结构图
二.为什么要使用消息队列
消息队列是一种应用间的异步协作机制,是分布式系统中的重要的组件,主要目的是为了解决应用 藕合, 异步消息, 流城削锋, 冗余,扩展性,排序保证等问题,实现 高性能, 高并发, 可伸缩和 最终一致性架构,下面举例说明
业务解耦
以常见的订单系统为例,用户点击【下单】按钮之后的业务逻辑可能包括:扣减库存、生成相应单据、发红包、发短信通知,商品配送等业务;在业务发展初期这些逻辑可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这时可以将一些不需要立即生效的操作拆分出来异步执行,或者单独拆分出来作为一个独立的系统,比如生成相应单据为订单系统,扣减库存为库存系统,发放红包独立为红包系统、发短信通知为短信系统,商品配送为配送系统等。这种场景下就可以用 MQ ,在下单的主流程(比如扣减库存、生成相应单据)完成之后发送一条消息到 MQ 让主流程快速完结,而由另外的单独线程拉取MQ的消息(或者由 MQ 推送消息),当发现 MQ 中有发红包或发短信或商品配送之类的消息时,执行相应的业务系统逻辑,这样各个业务系统相互独立,就很方便进行分离部署,防止某一系统故障引起的连锁故障
流量削峰
流量削峰一般在秒杀或者团抢活动中广泛使用
(1).由来
主要是还是来自于互联网的业务场景,例如:春节火车票抢购,大量的用户需要同一时间去抢购;以及双11秒杀, 短时间上亿的用户涌入,瞬间流量巨大(高并发),比如:200万人准备在凌晨12:00准备抢购一件商品,但是商品的数量缺是有限的100-500件左右。这样真实能购买到该件商品的用户也只有几百人左右, 但是从业务上来说,秒杀活动是希望更多的人来参与,也就是抢购之前希望有越来越多的人来看购买商品。但是,在抢购时间达到后,用户开始真正下单时,秒杀的服务器后端却不希望同时有几百万人同时发起抢购请求。因为服务器的处理资源是有限的,所以出现峰值的时候,很容易导致服务器宕机,用户无法访问的情况出现。这就好比出行的时候存在早高峰和晚高峰的问题,为了解决这个问题,出行就有了错峰限行的解决方案。同理,在线上的秒杀等业务场景,也需要类似的解决方案,需要平安度过同时抢购带来的流量峰值的问题,这就是流量削峰的由来。
(2).怎样来实现流量削峰方案
削峰从本质上来说就是更多地延缓用户请求,以及层层过滤用户的访问需求,遵从“最后落地到数据库的请求数要尽量少”的原则。
1).消息队列解决削峰
要对流量进行削峰,最容易想到的解决方案就是用消息队列来缓冲瞬时流量,把同步的直接调用转换成异步的间接推送,中间通过一个队列在一端承接瞬时的流量洪峰,在另一端平滑地将消息推送出去。
消息队列中间件主要解决应用耦合,异步消息, 流量削锋等问题;常用消息队列系统:目前在生产环境,使用较多的消息队列有 ActiveMQ、RabbitMQ、 ZeroMQ、Kafka、MetaMQ、RocketMQ 等。
在这里,消息队列就像“水库”一样,拦蓄上游的洪水,削减进入下游河道的洪峰流量,从而达到减免洪水灾害的目的。
2).流量削峰漏斗:层层削峰
针对秒杀场景还有一种方法,就是对请求进行分层过滤,从而过滤掉一些无效的请求。分层过滤其实就是采用“漏斗”式设计来处理请求的,如下图所示:
这样就像漏斗一样,尽量把数据量和请求量一层一层地过滤和减少了
I.分层过滤的核心思想
通过在不同的层次尽可能地过滤掉无效请求
通过CDN过滤掉大量的图片,静态资源的请求
再通过类似Redis这样的分布式缓存,过滤请求等就是典型的在上游拦截读请求
II.分层过滤的基本原则
对写数据进行基于时间的合理分片,过滤掉过期的失效请求
对写请求做限流保护,将超出系统承载能力的请求过滤掉
涉及到的读数据不做强一致性校验,减少因为一致性校验产生瓶颈的问题
对写数据进行强一致性校验,只保留最后有效的数据
最终,让“漏斗”最末端(数据库)的才是有效请求,例如:当用户真实达到订单和支付的流程,这个是需要数据强一致性的。
(3).总结
1).对于秒杀这样的高并发场景业务,最基本的原则就是将请求拦截在系统上游,降低下游压力。如果不在前端拦截很可能造成数据库(mysql、oracle等)读写锁冲突,甚至导致死锁,最终还有可能出现雪崩等场景。
2).划分好动静资源,静态资源使用CDN进行服务分发
3).充分利用缓存(redis等),增加QPS,从而加大整个集群的吞吐量
4).高峰值流量是压垮系统很重要的原因,所以需要RabbitMQ等消息队列在一端承接瞬时的流量洪峰,在另一端平滑地将消息推送出去
在
异步处理
用户注册后,需要发送注册邮件和注册短信
三.RabbitMQ介绍
RabbitMQ是一个由 erlang语言开发的,实现了AMQP协议的标准的开源消息代理和队列服务器( 消息队列中间件)
1.常见的消息队列中间件
2.RabbitMQ特性
可靠性(Reliability)
RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认
灵活的路由(Flexible Routing)
在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange
消息集群(Clustering)
多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker
高可用(Highly Available Queues)
队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用
多种协议(Multi-protocol)
RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等
多语言客户端(Many Clients)
RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby ,PHP等
管理界面(Management UI)
RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面
跟踪机制(Tracing)
如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么
插件机制(Plugin System)
RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件
3.RabbitMQ工作原理
内部实际上也是 AMQP 中的基本概念
上图各个模块的说明:
Message: 消息,消息是不具名的,它由消息头和消息体组成,消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等
Publisher: 消息的生产者,也是一个向交换器发布消息的客户端应用程序
Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序
Broker: 接收和分发消息的应用,表示消息队列服务器实体,RabbitMQ Server就是Message Broker
Virtual host: 虚拟主机(共享相同的身份认证和加密环境的独立服务器域),表示一批交换器、消息队列和相关对象,类似于mysql的数据库,当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange、queue等.每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制,vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 /
Connection: publisher、consumer和broker之间的网络连接,比如:TCP连接,断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障或broker服务出现问题
Channel: 管道,多路复用连接中的一条独立的双向数据流通道,信道是建立在真实的TCP连接内地虚拟连接(逻辑连接),如果应用程序支持多线程,通常每个多线程创建单独的channel进行通讯, 因为AMQP 方法中包含了channel id帮助客户端和broker识别channel,所以channel之间是完全隔离的,AMQP 命令都是通过管道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过管道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低,所以引入了管道的概念,目的是为了减少操作系统建立TCP 连接的开销,以复用一条 TCP 连接
Exchange: 交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列,message到达broker的第一站,根据分发规则,匹配查询表中的路由键(routing key),分发消息到queue中去,常用的类型有:直连交换机-direct (point-to-point), 主题交换机-topic (publish-subscribe),扇型交换机-fanout (multicast), 头交换机-headers(amq.match (and amq.headers in RabbitMQ))
Queue: 消息队列,用来保存消息直到发送给消费者,它是消息的容器,也是消息的终点,一个消息可投入一个或多个队列,消息最终被送到这里等待消费者连接到这个队列将其取走
Binding: 绑定,消息队列(queue)和交换器(exchange)之间的虚拟连接, binding中可以包含routing key, Binding信息被保存到exchange中的查询表中,用于message的分发依据,一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表
四.RabbitMQ安装与启动
见linux下,docker-compose 安装nignx,php,mysql,redis,rabbitmq,mongo
端口说明:安装完rabbitmq后,有几个常见端口:
4369: epmd(Erlang Port Mapper Daemon),erlang服务端口
5672: client端通信端口
15672:http Api客户端,管理UI(仅在启用了管理插件的情况下使用)
25672:用于节点间通信(Erlang分发服务器端口)
五.RabbitMQ几个重要特性概念讲解
队列模式-简单队列模式,工作队列模式
ACK&NACK消费确认机制&重回队列机制
消息持久化
公平调度(限流机制)
幂等性
return机制
消息的可靠性投递
下面是RabbitMQ和消息所涉及到的一些 术语
生产(Producing)的意思就是发送:发送消息的程序就是一个生产者(producer),一般用"P"来表示:
队列(queue)就是存在于RabbitMQ中邮箱的名称:虽然消息的传输经过了RabbitMQ和你的应用程序,但是它只能被存储于队列当中。实质上队列就是个巨大的消息缓冲区,它的大小只受主机内存和硬盘限制。多个生产者(producers)可以把消息发送给同一个队列,同样,多个消费者(consumers)也能够从同一个队列(queue)中获取数据。队列可以绘制成这样(图上是队列的名称):
消费(Consuming)和接收(receiving)是同一个意思:一个消费者(consumer)就是一个等待获取消息的程序。把它绘制为"C":
以php框架yii2为参照
简单队列模式(simple queue)
发送 单个消息的生产者,以及 接收消息并将其 打印出来的消费者。将忽略RabbitMQ API中的一些细节。 在下图中,“P”是生产者,“C”是消费者。中间的框是一个队列(保存消息的地方)
(1).生产者发布消息步骤
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// 创建连接
$connection = new AMQPStreamConnection($host, $port, $user, $pass, $v_host="/");
// 创建channel
$channel = $connection->channel();
// 初始化队列,并持久化(声明队列)
$channel->queue_declare($queue_name, false, true, false, false);
//消息内容
$data = "this is message2";
// 声明消息,并持久化(创建消息)
$mes = new AMQPMessage($data, ["delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
// 把消息推到队列里(发布消息)
$channel->basic_publish($mes, '', $queue_name);
//关闭通道和连接
$channel->close();
$connection->close();
上面声明队列方法queue_declare()参数详解
(2).消费者消费消息步骤
//核心代码
basic_consume($queue = '', $consumer_tag = '', $no_local = false,$no_ack = false,$exclusive = false,$nowait = false,$callback = null,$ticket = null,$arguments = array())
上面消费消息方法basic_consume()参数详解
(3).具体代码展示
rabbitmq配置
"rabbitMq" => [
"base" => [
'host' => '192.168.0.5', // host地址
'port' => 5672, // 端口
"user" => "user", // 账户
'pass' => 123456, // 密码
"v_host" => "order", // 对应Virtual Hosts
],
"queue_name" => [
"name1" => "goods", // 队列名称
],
]
生产者代码
<?php
/**
* 生产者生产消息
*/
namespace console\controllers\simple;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class PublisherController extends Controller
{
public $enableCsrfValidation=false;
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$queue_name = $rabbitMqConfig["queue_name"]["name1"]; // 队列名称
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 初始化队列,并持久化
$channel->queue_declare($queue_name, false, true, false, false);
//消息
$data = "this is message2";
// 声明消息,并持久化
$mes = new AMQPMessage($data, ["delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
// 把消息推到队列里
$channel->basic_publish($mes, '', $queue_name);
//关闭通道和连接
$channel->close();
$connection->close();
}
}
消费者代码
<?php
/**
* 消费者消费消息
*/
namespace console\controllers\simple;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class ConsumerController extends Controller
{
public $enableCsrfValidation=false;
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$queue_name = $rabbitMqConfig["queue_name"]["name1"]; // 队列名称
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 初始化队列,并持久化
$channel->queue_declare($queue_name, false, true, false, false);
// 消费消息
$callback = function ($msg) {
echo "reviced: " . $msg->body . "\n";
};
$channel->basic_consume($queue_name, "", false, true, false, false,$callback);
// 监控
while ($channel->is_open()){
$channel->wait();
}
//关闭通道和连接
$channel->close();
$connection->close();
}
}
工作队列模式(worker queue)
创建一个 工作队列(Work Queue),它会发送一些 耗时的任务给 多个工作者(Worker),工作队列(又称: 任务队列——Task Queues)是为了 避免等待一些占用大量资源、时间的操作,当把任务(Task)当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理,当运行多个工作者(workers),任务就会在它们之间 共享。这个概念在网络应用中是非常有用的,它可以在短暂的HTTP请求中处理一些复杂的任务,使用工作队列的一个 好处就是它能够 并行的处理队列。如果堆积了很多任务,只需要添加 更多的工作者(workers)就可以了,这就是所谓的 循环调度,扩展很简单
(1).具体代码展示
rabbitmq配置
"rabbitMq" => [
"base" => [
'host' => '192.168.0.5', // host地址
'port' => 5672, // 端口
"user" => "user", // 账户
'pass' => 123456, // 密码
"v_host" => "order", // 对应Virtual Hosts
],
"queue_name" => [
"name1" => "goods", // 队列名称
"name2" => "task_queue", // 队列名称
],
]
生产者代码
<?php
/**
* 生产者生产消息
*/
namespace console\controllers\worker;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class PublisherController extends Controller
{
public $enableCsrfValidation=false;
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$queue_name = $rabbitMqConfig["queue_name"]["name2"]; // 队列名称
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 初始化队列,并持久化
$channel->queue_declare($queue_name, false, true, false, false);
// 生产多条消息
for ($i = 0; $i <= 10; ++$i) {
//消息
$data = "this is " . $i. " message";
// 声明消息,并持久化
$mes = new AMQPMessage($data, ["delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
// 把消息推到队列里
$channel->basic_publish($mes, '', $queue_name);
}
//关闭通道和连接
$channel->close();
$connection->close();
}
}
消费者代码
当生产者生产了多条费时的消息时,一个消费者不能满足需要,可以添加多个消费者处理生产者的消息,多个消费者之间采用 轮询的方式获取队列的消息,并把该消息发送给对应的用户
比如:可以对一个队列的消息开多个消费者,这里我们开了两个消费者,里面的代码都是一致的
<?php
/**
* 消费者消费消息
*/
namespace console\controllers\worker;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class ConsumerController extends Controller
{
public $enableCsrfValidation=false;
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$queue_name = $rabbitMqConfig["queue_name"]["name2"]; // 队列名称
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 初始化队列,并持久化
$channel->queue_declare($queue_name, false, true, false, false);
// 消费消息
$callback = function ($msg) {
echo "reviced: " . $msg->body . "\n";
};
$channel->basic_consume($queue_name, "", false, true, false, false,$callback);
// 监控
while ($channel->is_open()){
$channel->wait();
}
//关闭通道和连接
$channel->close();
$connection->close();
}
}
<?php
/**
* 消费者2消费消息
*/
namespace console\controllers\worker;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class Consumer2Controller extends Controller
{
public $enableCsrfValidation=false;
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$queue_name = $rabbitMqConfig["queue_name"]["name2"]; // 队列名称
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 初始化队列,并持久化
$channel->queue_declare($queue_name, false, true, false, false);
// 消费消息
$callback = function ($msg) {
echo "reviced: " . $msg->body . "\n";
};
$channel->basic_consume($queue_name, "", false, true, false, false,$callback);
// 监控
while ($channel->is_open()){
$channel->wait();
}
//关闭通道和连接
$channel->close();
$connection->close();
}
}
ACK消费确认机制&NACK&重回队列机制
ACK消费确认机制
当处理一个 比较耗时得任务的时候,想知道消费者(consumers) 运行到一半就挂掉时, 正在处理的消息/发送给当前工作者的消息会怎样,当消息在队列中 没有进行持久化操作时,消息被RabbitMQ发送给消费者(consumers)之后,马上就会在内存中 移除。这种情况,只要把一个 工作者(worker)停止,正在处理的消息就会 丢失。同时,所有发送到这个工作者的还没有处理的消息 都会丢失。所以,如果不想消息丢失,当一个工作者(worker)挂掉了,希望任务会重新发送给其他的工作者(worker),RabbitMQ就提供了 消息响应( acknowledgments)。消费者会通过一个 ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ就会 释放并删除这条消息。如果消费者(consumer) 挂掉了, 没有发送响应,RabbitMQ就会认为消息 没有被完全处理,然后 重新发送给 其他消费者(consumer)。这样,即使工作者(workers)偶尔的挂掉,也不会丢失消息。消息是没有超时这个概念的,当工作者与它断开连的时候,RabbitMQ会重新发送消息,这样在处理一个耗时非常长的消息任务的时候就不会出问题了。在该讲解中,将使用手动消息确认,通过为 no_ack参数传递 false,一旦有任务完成,使用d.ack()(false)向RabbitMQ服务器发送消费完成的确认(这个
确认消息是单次传递的)
//核心代码
basic_consume($queue = '', $consumer_tag = '', $no_local = false, $no_ack = false,$exclusive = false,$nowait = false,$callback = null,$ticket = null,$arguments = array())
// 消费消息
$callback = function ($msg) {
echo "reviced: " . $msg->body . "\n";
// 消费ack
$msg->ack();
};
// 第四个参数: 需要ack确认,这里我们在callback手动确认
$channel->basic_consume($queue_name, "", false, false, false, false,$callback);
rabbitmq配置
"rabbitMq" => [
"base" => [
'host' => '192.168.0.5', // host地址
'port' => 5672, // 端口
"user" => "user", // 账户
'pass' => 123456, // 密码
"v_host" => "order", // 对应Virtual Hosts
],
"queue_name" => [
"name1" => "goods", // 队列名称
"name2" => "task_queue", // 队列名称
"name3" => "task_ack", // 队列名称
],
]
生产者代码
<?php
/**
* 生产者生产消息
*/
namespace console\controllers\ack;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class PublisherController extends Controller
{
public $enableCsrfValidation=false;
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$queue_name = $rabbitMqConfig["queue_name"]["name3"]; // 队列名称
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 初始化队列,并持久化
$channel->queue_declare($queue_name, false, true, false, false);
// 生产多条消息
for ($i = 0; $i <= 10; ++$i) {
//消息
$data = "this is " . $i. " message";
// 声明消息,并持久化
$mes = new AMQPMessage($data, ["delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
// 把消息推到队列里
$channel->basic_publish($mes, '', $queue_name);
}
//关闭通道和连接
$channel->close();
$connection->close();
}
}
消费者代码
<?php
/**
* 消费者消费消息
*/
namespace console\controllers\ack;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class ConsumerController extends Controller
{
public $enableCsrfValidation=false;
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$queue_name = $rabbitMqConfig["queue_name"]["name3"]; // 队列名称
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 初始化队列,并持久化
$channel->queue_declare($queue_name, false, true, false, false);
// 消费消息
$callback = function ($msg) {
echo "reviced: " . $msg->body . "\n";
// 消费ack
$msg->ack();
};
// 第二个参数:同一时刻服务器只会发送1条消息给消费者
$channel->basic_qos(null, 1, null);
// 第四个参数: 需要ack确认,这里我们在callback手动确认
$channel->basic_consume($queue_name, "", false, false, false, false,$callback);
// 监控
while ($channel->is_open()){
$channel->wait();
}
//关闭通道和连接
$channel->close();
$connection->close();
}
}
NACK&重回队列机制
当设置了方法 basic_consume中 $no_ack = false 时,使用手工 ACK 方式,除了ACK外,其实还有 NACK 方式,当手工 AcK 时,会发送给Broker( 服务器)一个应答,代表消息处理成功,Broker就可回送响应给生产端 .
NACK 则表示消息处理失败,如果设设置了重回队列, Broker 端就会将没有成功处理的消息重新发送
通俗来讲:
手工ACK:消费成功了,向发起者确认
NACK:消费失败,让生产者重新发
一般在实际应用中,都会关闭重回队列,也就是设置为false
使用方式
消费端消费时.如果由于业务异常,可以手工 NACK 记录日志,然后进行补偿
API :basic_nack($delivery_tag, $multiple = false, $requeue = false)
如果由于服务器宕机等严里问题,就需要手工 ACK 保障消费端消费成功
API :basic_ack($delivery_tag, $multiple = false)
4.消息持久化
如果没有特意告诉RabbitMQ,那么在它退出或者崩溃的时候,将会丢失所有队列和消息,为了确保消息不会丢失,有两个事情是需要注意的:必须把“队列”和“消息”设为持久化首先,为了不让队列消失,需要把队列声明为持久化(durable),但这里面会有一定的问题,它会返回一个错误,可以使用一个快捷的解决方法——用不同名字的队列,例如task_queue
代码如上面生产者/消费者所示:
// 初始化队列,并持久化
$channel->queue_declare($queue_name, false, true, false, false);
消息持久化配置: " delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT
5.公平调度(限流机制)
为什么要限流
当工作者处理消息时,会出现这么一个问题:比如有两个工作者(workers),处理奇数消息的比较繁忙,处理偶数消息的比较轻松,然而RabbitMQ并不知道这些,它仍然一如既往的派发消息,这时因为RabbitMQ只管分发进入队列的消息,不会关心有多少消费者(consumer)没有作出响应,它盲目的把第n-th条消息发给第n-th个消费者
假设还有这样的场景:RabbitMQ服务器有上万条未处理的消息,随便打开一个消费者Client ,会造成巨量的消息瞬间全部推送过来,然而单个客户端无法同时处理这么多数据,此时很有可能导致服务器崩溃,严重的可能导致线上的故障
还有一些其他的场景:比如说单个 生产端一分钟产生了几百条数据,但是单个消费端 一分钟可能只能处理 60 条,这个时候生产端-消费端肯定是不平衡的,通常生产端是没办法做限制的,所以消费端肯定需要做一些限流措施,否则如果超出最大负载,可能导致 消费端 性能下降,服务器卡顿甚至崩溃等一系列严重后果
RabbitMQ 提供了一种 qos (服务质质量保证)功能,即在 非自动确认消息的前提下,如果一定数目的消息 ( 通过基于 生产者或者 channel设置 Qos 的值) 未被确认前,不消费新的消息
不能设置自动签收功能( auto_ack = false ),如果消息未被确认,就不会到达 消费端 ,目的就是给 生产端 减压
这是可以设置预取计数值为1,告诉RabbitMQ一次只向一个worker发送一条消息,换句话说,在处理并确认前一个消息之前,不要向工作人员发送新消息
如上面ACK消费确认机制中消费者代码:
// 第二个参数:同一时刻服务器只会发送1条消息给消费者
//basic_qos($prefetch_size, $prefetch_count, $a_global)
$channel->basic_qos(null, 1, null);
参数说明:
$prefetch_size:单条消息的大小限制, 通常设置为 0 ,表示不做限制
$prefetch_count:一次最多能处理多少条消息
$a_global:是否将上面设置: true 应用于 channel 级别, false 代表消费者级别
$prefetch_size,$a_global这两项, RabbitMQ 没有实现,暂且不研究.$prefetch_count 在auto_ack = f alse 的情况下生效,即在自动应答的情况下该值无效
6.幂等性概念
一句话概括: 用户对于同一操作发起的一次请求或者多次请求的结果是一致的
比如:对一个SQL执行100次1000次,我们可以借鉴数据库的乐观锁机制:比如我们执行一条更新库存的SQL语句:update T_reps set count = count -1, version = version +1 where version = 1,数据库的乐观锁在执行更新操作前一先去数据库查询此version ,然后执行更新语句,以此version作为条件,如果执行更新时有其他人先更新了这张表的数据,那么这个条件就不生效了,也就不会执行操作了,通过这种 乐观锁的机制来保障幕等性
消费端 - 幂等性保障
在海量订单产生的业务高峰期,如何避免 消息的重复消费问题?
在业务高峰期:容易产生 消息重复消费问题,当消费端消费完消息时,在给生产者端返回ack时由于 网络中断,导致生产端 未收到确认信息,该条消息就会 重新发送并 被消费端消费,但实际上该消费端已成功消费了该条消息,这就造成了重复消费.而 消费端实现 幂等性,就意味着:消息不会被多次消费,即使收到了很多一样的消息
业界主流的幂等性操作解决方案:
(1)唯一Id + 指纹码 机制,核心:利用数据库主键去重
唯一Id: 业务表主键
指纹码: 为了区别每次正常操作的码,每次操作时生成指纹码;可以用时间截+业务编号或者标志位(具体视业务场景而定 )
select count(1) from t_order where id = 唯一Id + 指纹码
优势:实现简单
弊端:高并发下有数据库写入的性能瓶颈
解决方案:根据ID进行分库分表进行算法路由
(2)利用Redis的原子性去实现
第一:是否要进行数据落库,如果落库的话,关键解决的问题是数据库和缓存如何做到原子性?
第二:如果不进行落库,那么都存储到缓存中,如何设置定时同步的策略?
7.return机制
Return Listener用于处理一些不可路由的消息,也是生产阶段添加的一个监听
消息生产者通过指定一个Exchange和Routing Key,把消息送达到某一个队列中去,然后消费者监听队列,进行消费处理操作
但是在某些情况下,如果在发送消息的时候,当前的Exchange不存在或者指定的路由key路由不到,这个时候如果需要监听这种不可达的消息,就要使用Return Listener
在API中有一个关键的配置项 Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么Broker(服务器)会自动删除该消息
8.消息的可靠性投递
(1).什么是生产端的可靠性投递?
保障消息的成功发出
保障MQ节点的成功接收
发送端收到MQ节点(Broker)确认应答
完善的消息进行补偿机制(在大厂一般都不会加事务,都是进行补偿操作)
在实际生产中,很难保障前三点的完全可靠,比如在 极端的环境中,生产者发送消息失败了,发送端在接受确认应答时突然发生网络闪断等等情况,很难保障可靠性投递,所以就需要有第四点完善的 消息补偿机制
(2).解决方案
方案一 消息信息落库,对消息状态进行打标(常见方案)
将消息持久化到 DB中并设置状态值,收到 Consumer 的应答就改变当前记录的状态
再轮询重新发送没接收到应答的消息,注意这里要设置重试次数
方案实现流程
比如下单成功
步骤1
对订单数据入ORDER_DB 订单库,并对因此生成的业务消息入 MSG_DB 消息库,此处由于采用了两个数据库,需要两次持久化操作,为了保证数据的一致性,有人可能就想采用分布式事务,但在大厂实践中,基本都是采用补偿机制
这里一定要保证步骤1中消息都存储成功了,没有出现任问异常情况,然后生产端再进行消息发送.如果失败了就进行快速失败机制
对业务数据和消息入库完毕就进入步骤2
步骤2
发送消息到MQ服务上,如果一切正常无误消费者监听到该消息,进入步骤3
步骤3
生产端有一个 confi rm Listener ,异步监听 Broker(服务端) 回送的响应,从而判断消息是否投递成功
步骤4
如果成功,去数据库查询该消息.并将消息状态更新为 1
步骤5
如果出现意外情况,消费者未接收到或者Listener 接收确认时发生网络闪断,导致生产端的Listener 就永远收不到这条消息的 confi rm应答了,也就是说这条消息的状态就一直为0 了,这时候就需要用到分布式定时任务来从 MSG_DB 数据库抓取那些超时了还未被消费的消息,重新发送一遍。此时需要设置一个规则,比如说消息在入库时候设置一个临界值 timeout , 5 分钟之后如果还是0的状态那就需要把消息抽取出来。这里使用的是分布式定时任务,去定时抓取 MSG_DB中距离消息创建时间超过 5 分钟的且状态为0 的消息
步骤6
把抓取出来的消息进行重新投递( Retry Send ) ,也就是从第二步开始继续往下走
步骤7
当然有些消息可能就是由于一些实际的问题无法路由到 Broker ,比如Routing Key设置不对,对应的队列被误删除了,那么这种消息即使重试多次也仍然无法投递成功,所以需要对重复次数做限制,比如限制 3 次,如果投递次数大于3次,那么就将消息状态更新为 2 ,表示这个消息最终投递失败,然后通过补偿机制,人工去处理,实际生产中.这种情况还是比较少的,但是不能没有这个补偿机制,要不然就做不到可靠性了
缺点
在第一步需要更新或者插入操作数据库2次
优化
不需要消息进行持久化 只需要业务持久化
方案二 消息的延迟投递,做二次确认,回调检查(不常用,大厂在用的高并发方案)
方案实现流程
步骤1
(上游服务: Upstream service )业务入库,然后send 消息到broker,这两步是有先后顺序的
步骤2
进行消息延迟发送到新的队列(延迟时间为 5 分钟:业务决定)
步骤3
(下游服务: Downstream service )监听到消息然后处理消息
步骤4
下游服务 send confirm生成新的消息到 broker (这里是一个新的队列 )
步骤5
callback service 去监听这个消息,并且入库,如果监听到,表示这个消息已经消费成功
步骤6
callback service 去检查 步骤2投递的延迟消息是否 在msgDB里面是否消费成功,不存在或者消费失败就会 Resend command
如果在第 1 , 2 , 4 步失败,如果成功broker 会给一个 confirm ,失败当然没有,这是消息可靠性投递的里要保障
9.注意
关于队列大小: 如果所有的工作者都处理繁忙状态,队列就会被填满,需要留意这个问题,要么添加更多的工作者(workers),要么使用其他策略
六.RabbitMQ几种常见的交换器模式
1.消息模型基本介绍
前面的教程中,讲的是 发送消息到队列并从中取出消息,现在介绍RabbitMQ中 完整的消息模型
简单的概括一下之前讲的:
发布者(producer)是发布消息的应用程序
队列(queue)用于消息存储的缓冲
消费者(consumer)是接收消息的应用程序
RabbitMQ消息模型的 核心理念是:发布者(producer) 不会直接发送任何消息给 队列,事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。发布者(producer)只需要 把消息发送给一个交换机(exchange),交换机非常简单,它一边从发布者方 接收消息,一边把消息 推送到队列,交换机 必须知道如何处理它接收到的消息,是应该 路由到 指定的队列还是是 多个队列,或者是直接 忽略消息,这些规则是通过交换机类型(exchange type)来定义的
有几个可供选择的交换器类型:direct, topic, headers和fanout
direct(直连/定向交换器) |
消息与一个特定的路由键完全匹配 |
topic(主题交换器) |
使用通配符*,#,让路由键和某种模式进行匹配 |
headers(头交换器) |
不处理路由键,而是根据发送的消息内容中的 headers 属性进行匹配 |
fanout(扇型交换器) |
发布/ 订阅模式可以理解为广播模式:即exchange会将消息转发到所有绑定到这个exchange的队列上,这种类型在发送消息,queue bind时,都将忽略route key,也就是说不需要设置 route key |
Routing Key(路由键): 生产者将消息发送给交换器,一般都会指定一个Routing Key,用来指定这个消息的路由规则,而这个Routing Key需要与交换器类型和绑定键(Binding Key)联合使用才能生效
Binding(绑定):它是Exchange与Queue之间的虚拟连接,通俗的讲就是交换器和队列之间的联系(这个队列(Queue)对这个交换机(Exchange)的消息感兴趣),实现了根据不同的Routing Key(路由规则),交换机将消息路由(发送)到对应的Queue上
2.交换器核心方法
//试探性申请一个交换器,若该交换器存在,则跳过,不存在则创建
exchange_declare($exchange,$type,$passive = false,$durable = false,$auto_delete = true,$internal = false,$nowait = false,$arguments = array(),$ticket = null)
参数名 |
默认值 |
解释 |
$exchange |
交换器名称 |
|
$type |
交换器类型: ’’ 默认交换机 匿名交换器 未显示声明类型都是该类型 fanout 扇形交换器 会发送消息到它所知道的所有队列,每个消费者获取的消息都是一致的 headers 头部交换器 direct 直连交换器,该交换机将会对绑定键(binding key)和路由键(routing key)进行精确匹配 topic 主题交换器 该交换机会对路由键正则匹配,必须是*(一个单词)、#(多个单词,以.分割) ,eg:user.key .abc.* 类型的key |
|
$passive |
false |
只判断不创建(一般用于判断交换器是否存在) true: 1.如果exchange已存在则直接连接并且不检查配置比如已存在的exchange是fanout,新需要建立的是direct,也不会报错; 2.如果exchange不存在则直接报错 false: 1.如果exchange不存在则创建新的exchange 2.如果exchange已存在则判断配置是否相同,如果配置不相同则直接报错,比如已存在的exchange是fanout,新需要建立的是direct,会报错。 |
$durable |
false |
设置是否持久化,设置为true,表示持久化,反之非持久化,持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息 |
$auto_delete |
true |
设置是否自动删除,设置为true时,表示自动删除。自动删除的前提:至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑 |
$internal |
false |
设置是否为内置的,设置为true表示是内置的交换器,客户端程序无法直接发送消息到这个交换器,只能通过交换器路由到这个交换器 |
$nowait |
false |
如果为true,表示不等待服务器回执信息,函数将返回null,可以提高访问速度 |
$arguments |
array() |
其他结构化参数 |
$ticket |
null |
3.fanout模式(广播模式)
广播模式可以理解为:发布/订阅模式,即exchange会将消息转发到所有绑定到这个exchange的队列上。针对这种广播模式,在发送消息,queue bind时,都将忽略route key,也就是说不需要设置 route key
案例一
一个生产者生产消息并发布消息到交换器上,多个消费者订阅该交换器,并与之队列绑定,消费消息
rabbitmq配置
"rabbitMq" => [
"base" => [
'host' => '192.168.0.5', // host地址
'port' => 5672, // 端口
"user" => "user", // 账户
'pass' => 123456, // 密码
"v_host" => "order", // 对应Virtual Hosts
],
"exchange_name" => [
"name1" => "exch", // 交换器名称
],
]
生产者
<?php
/**
* 交换器fanout(广播)模式: 生产者生产消息
*/
namespace console\controllers\exchange\fanout;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class PublisherController extends Controller
{
public $enableCsrfValidation=false;
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$exchangeName = $rabbitMqConfig["exchange_name"]["name1"];
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 声明并初始化交换器
$channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, false, false);
// 声明一个数据
$data = "this is a exchange message";
// 初始化消息并持久化
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
// 发布消息到交换器
$channel->basic_publish($msg, $exchangeName);
//关闭通道和连接
$channel->close();
$connection->close();
}
}
消费者(多个)
消费者1
<?php
/**
* 交换器fanout(广播)模式: 消费者消费消息
*/
namespace console\controllers\exchange\fanout;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class ConsumerController extends Controller
{
public $enableCsrfValidation = false;
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
// $queueName = $rabbitMqConfig["queue_name"]["name4"];
$exchangeName = $rabbitMqConfig["exchange_name"]["name1"];
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 声明对应的交换器
$channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, false, false);
// 声明队列
list($queueName) = $channel->queue_declare('', false, false, true, false);
// 交换机与队列绑定
$channel->queue_bind($queueName, $exchangeName);
// 消息回调处理
$callback = function ($meg) {
echo "revince: " . $meg->body. "\n";
$meg->ack();
};
// 设置消费者处理消息限制,第二个参数:同一时刻服务器只会发送1条消息给消费者消费
$channel->basic_qos(null, 1, null);
// 消费者消费消息: 第四个参数: 需要ack确认
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
// 监控
while ($channel->is_open()) {
$channel->wait();
}
//关闭通道和连接
$channel->close();
$connection->close();
}
}
消费者2
<?php
/**
* 交换器fanout(广播)模式: 消费者消费消息
*/
namespace console\controllers\exchange\fanout;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class Consumer2Controller extends Controller
{
public $enableCsrfValidation = false;
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
// $queueName = $rabbitMqConfig["queue_name"]["name4"];
$exchangeName = $rabbitMqConfig["exchange_name"]["name1"];
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 声明对应的交换器
$channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, false, false);
// 声明队列
list($queueName) = $channel->queue_declare('', false, false, true, false);
// 交换机与队列绑定
$channel->queue_bind($queueName, $exchangeName);
// 消息回调处理
$callback = function ($meg) {
echo "revince: " . $meg->body. "\n";
$meg->ack();
};
// 设置消费者处理消息限制,第二个参数:同一时刻服务器只会发送1条消息给消费者消费
$channel->basic_qos(null, 1, null);
// 消费者消费消息: 第四个参数: 需要ack确认
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
// 监控
while ($channel->is_open()) {
$channel->wait();
}
//关闭通道和连接
$channel->close();
$connection->close();
}
}
案例二
举个实际应用的场景:比方说用户注册(注销,更改姓名等)新浪,同时需要开通微博、博客、邮箱等,如果不采用队列,按照常规的线性处理,可能注册用户会特别的慢,因为在注册的时候,需要调各种其他服务器接口,如果服务很多的话,可能客户端就超时了。如果采用普通的队列,可能在处理上也会特别的慢(不是最佳方案),如果采用订阅模式,则是最优的选择
处理过程如下:
用户提交username、pwd…等之类的基本信息,将数据提交register.php中
register.php对数据进行校验,符合注册要求,生成uid,并将和基本信息json后,发布一条消息到对应的交换器中,同时直接显示用户注册成功
exchange中的多个队列,如(queue.process、queue.boke、queue.weibo、queue.email)都订阅了这个消息,根据各业务自身的逻辑来处理
总结:
1.不申明队列,因为发布/订阅模式下,是可以随时添加新的订阅队列
2.exchange的Type指定为fanout(广播模式)
3.队列不需要指定route key,绑定exchange
代码如下:
生产者
<?php
/**
* 交换器fanout(广播)模式: 生产者生产消息
*/
namespace console\controllers\exchange\fanout;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class PublisherController extends Controller
{
public function actionIndex()
{
/*
用户注册逻辑
*/
//发送消息逻辑
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$exchangeName = "register";
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 声明并初始化交换器
$channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, false, false);
// 声明一个数据
$data = "{uid:xxx,reg_time:xxx}";
// 初始化消息并持久化
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
// 发布消息到交换器
$channel->basic_publish($msg, $exchangeName);
//关闭通道和连接
$channel->close();
$connection->close();
}
}
消费者(多个)
可以创建多个不同类型的消费者(开通微博、博客、邮箱)等逻辑功能的消费者
<?php
/**
* 交换器fanout(广播)模式: 消费者消费消息
*/
namespace console\controllers\exchange\fanout;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class ConsumerController extends Controller
{
public $enableCsrfValidation = false;
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$exchangeName = "register";
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 声明对应的交换器
$channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, false, false);
// 声明一个匿名队列
list($queueName) = $channel->queue_declare('', false, false, true, false);
// 交换机与队列绑定
$channel->queue_bind($queueName, $exchangeName);
// 消息回调处理
$callback = function ($meg) {
//处理逻辑
echo "revince: " . $meg->body. "\n";
$meg->ack();
};
// 设置消费者处理消息限制,第二个参数:同一时刻服务器只会发送1条消息给消费者消费
$channel->basic_qos(null, 1, null);
// 消费者消费消息: 第四个参数: 需要ack确认
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
// 监控
while ($channel->is_open()) {
$channel->wait();
}
//关闭通道和连接
$channel->close();
$connection->close();
}
}
4.Direct模式
Direct交换器将消息投递到路由参数 完全匹配的队列中
直接上代码
rabbitmq配置
"rabbitMq" => [
"base" => [
'host' => '192.168.0.5', // host地址
'port' => 5672, // 端口
"user" => "user", // 账户
'pass' => 123456, // 密码
"v_host" => "order", // 对应Virtual Hosts
],
"queue_name" => [
"name1" => "goods", // 队列名称
"name2" => "task_queue", // 队列名称
"name3" => "task_ack", // 队列名称
"name4" => "exchange_fanout_1", // 队列名称
],
"exchange_name" => [
"name1" => "exch", // 交换器名称
"name2" => "exch_direct_log", // 交换器名称
],
"routing_key" => [
"info_key" => "info", // 路由键
"error_key" => "error", // 路由键
"warn_key" => "warn", // 路由键
],
】
生产者
<?php
/**
* 交换器direct(routing_key-更详细的bind模式)模式: 生产者生产消息
*/
namespace console\controllers\exchange\direct;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class PublisherController extends Controller
{
public $enableCsrfValidation=false;
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$exchangeName = $rabbitMqConfig["exchange_name"]["name2"];
$routingKey = $rabbitMqConfig["routing_key"]["error_key"];
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 声明并初始化交换器
$channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, false, false);
// 声明一个数据
$data = "this is a ". $routingKey . " message";
// 初始化消息并持久化
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
// 发布消息到交换器, 并和路由键匹配
$channel->basic_publish($msg, $exchangeName, $routingKey);
//关闭通道和连接
$channel->close();
$connection->close();
}
}
消费者
<?php
/**
* 交换器direct(routing_key-更详细的bind模式)模式: 消费者消费消息
*/
namespace console\controllers\exchange\direct;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class ConsumerWarnController extends Controller
{
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$exchangeName = $rabbitMqConfig["exchange_name"]["name2"];
$routingKey = $rabbitMqConfig["routing_key"]["warn_key"]; //路由键可以修改为其他key,与生产者bind的关联
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 声明对应的交换器
$channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, false, false);
// 声明一个匿名队列
list($queueName) = $channel->queue_declare('', false, false, true, false);
// 交换机与队列绑定,并指定routing_key
$channel->queue_bind($queueName, $exchangeName, $routingKey);
// 消息回调处理
$callback = function ($meg) {
echo "revince: " . $meg->body. "\n";
$meg->ack();
};
// 设置消费者处理消息限制,第二个参数:同一时刻服务器只会发送1条消息给消费者消费
$channel->basic_qos(null, 1, null);
// 消费者消费消息: 第四个参数: 需要ack确认
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
// 监控
while ($channel->is_open()) {
$channel->wait();
}
//关闭通道和连接
$channel->close();
$connection->close();
}
}
5.topic模式
发送到topic交换器的消息不可以携带随意routing_key,它的routing_key必须是一个由 .分隔开的 词语列表,这些单词随便是什么都可以,但是最好是跟携带它们的消息有关系的词汇,以下是几个推荐的例子:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit",词语的个数可以随意,但是 不要超过255字节。binding key也必须拥有同样的格式,topic交换器背后的逻辑跟direct交换机很相似 : 一个携带着特定routing_key的消息会被topic交换机投递给绑定键与之想匹配的队列,但是它的binding key和routing_key有两个特殊应用方式:
* (星号) 用来表示一个单词
# (井号) 用来表示任意数量(零个或多个)单词
下边用图说明:
这个例子里,发送的所有消息都是用来描述小动物的,发送的消息所携带的路由键是由三个单词所组成的,这三个单词被两个.分割开,路由键里的第一个单词
描述的是动物的手脚的利索程度,第二个单词是动物的颜色,第三个是动物的种类,所以它看起来是这样的: <celerity>.<colour>.<species>。
创建了三个绑定:Q1的绑定键为 *.orange.*,Q2的绑定键为 *.*.rabbit 和 lazy.# 。
Q1-->绑定的是
中间带 orange 带 3 个单词的字符串 (*.orange.*)
Q2-->绑定的是
最后一个单词是 rabbit 的 3 个单词 (*.*.rabbit)
第一个单词是 lazy 的多个单词 (lazy.#)
这三个绑定键被可以总结为:
Q1 对所有的桔黄色动物都感兴趣
Q2 则是对所有的兔子和所有懒惰的动物感兴趣
一个携带有 quick.orange.rabbit 的消息将会被分别投递给这两个队列,携带着 lazy.orange.elephant 的消息同样也会给两个队列都投递过去。另一方面携带有 quick.orange.fox 的消息会投递给第一个队列,携带有 lazy.brown.fox 的消息会投递给第二个队列。携带有 lazy.pink.rabbit 的消息只会被投递给第二个队列一次,即使它同时匹配第二个队列的两个绑定。携带着 quick.brown.fox 的消息不会投递给任何一个队列。
注意:
如果违反约定,发送了一个携带有一个单词或者四个单词("orange" or "quick.orange.male.rabbit")的消息时,发送的消息不会投递给任何一个队列,而且会丢 失掉
但是另一方面,即使 "lazy.orange.male.rabbit" 有四个单词,他还是会匹配最后一个绑定,并且被投递到第二个队列中。
topic交换机是很强大的,它可以表现出跟其他交换机类似的行为 当一个队列的binding key为 "#"(井号) 的时候,这个队列将会无视消息的routing key,接收所有的消息。 当 * (星号) 和 # (井号) 这两个特殊字符都未在binding key中出现的时候,此时Topic交换机就拥有的direct交换机的行为
代码如下:
rabbitmq配置
"rabbitMq" => [
"base" => [
'host' => '192.168.0.5', // host地址
'port' => 5672, // 端口
"user" => "user", // 账户
'pass' => 123456, // 密码
"v_host" => "order", // 对应Virtual Hosts
],
"queue_name" => [
"name1" => "goods", // 队列名称
"name2" => "task_queue", // 队列名称
"name3" => "task_ack", // 队列名称
"name4" => "exchange_fanout_1", // 队列名称
],
"exchange_name" => [
"name1" => "exch", // 交换器名称
"name2" => "exch_direct_log", // 交换器名称
"name3" => "exch_topic_log", // 交换器名称
],
"routing_key" => [
"info_key" => "info", // 路由键
"error_key" => "error", // 路由键
"warn_key" => "warn", // 路由键
"all_key" => "#", // 所有的路由键
"user_info" => "user.info", // 路由键
"user_warn" => "user.warn", // 路由键
"user_all" => "user.*", // 匹配以user.开头的路由键
],
]
】
生产者
<?php
/**
* 交换器topic(通配符)模式: 生产者生产消息
*/
namespace console\controllers\exchange\topic;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class PublisherController extends Controller
{
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$exchangeName = $rabbitMqConfig["exchange_name"]["name3"];
$routingKey = $rabbitMqConfig["routing_key"]["user_info"];
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 声明并初始化交换器
$channel->exchange_declare($exchangeName, AMQPExchangeType::TOPIC, false, false, false);
// 声明一个数据
$data = "this is a ". $routingKey . " message";
// 初始化消息并持久化
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
// 发布消息到交换器, 并和路由键匹配
$channel->basic_publish($msg, $exchangeName, $routingKey);
//关闭通道和连接
$channel->close();
$connection->close();
}
}
消费者
<?php
/**
* 交换器topic(通配符)模式: 消费者消费消息
*
*/
namespace console\controllers\exchange\topic;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class ConsumerController extends Controller
{
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$exchangeName = $rabbitMqConfig["exchange_name"]["name3"];
$routingKey = $rabbitMqConfig["routing_key"]["user_all"];
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 声明对应的交换器
$channel->exchange_declare($exchangeName, AMQPExchangeType::TOPIC, false, false, false);
// 声明队列
list($queueName) = $channel->queue_declare('', false, false, true, false);
// 交换机与队列绑定,并指定routing_key
$channel->queue_bind($queueName, $exchangeName, $routingKey);
// 消息回调处理
$callback = function ($meg) {
echo "revince: " . $meg->body. "\n";
$meg->ack();
};
// 设置消费者处理消息限制,第二个参数:同一时刻服务器只会发送1条消息给消费者消费
$channel->basic_qos(null, 1, null);
// 消费者消费消息: 第四个参数: 需要ack确认
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
// 监控
while ($channel->is_open()) {
$channel->wait();
}
//关闭通道和连接
$channel->close();
$connection->close();
}
}
七.死信队列,延时队列
1.死信队列
死信( Dead Letter )是RabbitMQ 中的一种 消息机制,当在消费消息时,如果队列里的消息出现以下情况:
消息被拒绝
消息在队列的存活时间超过设置的 TTL 时间
TTL (Time To Live),即生存时间
RabbitMQ支持消息的过期时间,在消息发送时可以进行指定
RabbitMQ 支持为每个队列设号消息的超时时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会被自动清除
消息队列的消息数量已经超过最大队列长度
那么该消息将成为“死信”,“死信”消息会被 RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃
RabbitMQ 中有一种交换器叫 DLX,全称为 Dead 一 Letter 一 Exchange ,可以称之为 死信交换器,当消息在一个队列中变成死信( dead message )消息之后,它会被重新发送到另外一个交换器中,这个交换器就是DLX , 绑定在 DLX上的队列就称之为死信队列, 程序就可以监听这个队列中的消息,并做相应的处理,该特性 可以弥补RabbitMQ3.0以前支持的immediate参数的功能
消息变成死信有以下几种情况:
消息被拒绝消息
TTL 过期(延迟队列)
队列达到最大长度
2.延时队列
延时队列就是用来存放需要在 指定时间被处理的元素的队列.
一般可以利用 死信队列的特性来 实现延迟队列:只要给消息设置一个过期时间,消息过期就会自动进入死信队列,消费者只要监听死信队列就可以实现延迟队列了
应用场景
订单在十分钟之内未支付则自动取消
账单在一周内未支付,则自动结算
某个时间下发一条通知
用户注册成功后,如果三天内没有登陆则进行短信提醒
用户发起退款,如果三天内没有得到处理则通知相关运营人员
下面通过一个案例来更进一步了解死信队列,延时队列
案例1
订单在一段时间内未支付则自动取消,步骤:
(1).创建订单操作
(2).订单创建成功后,订单相关数据json处理
(3).构建rabbitmq消息队列,并设置消息过期时间为60秒,把订单相关json数据发布到交换器, 并和路由键匹配,生产者生产消息60秒之后,消息会进入到死信队列,消费者监听死信队列,处理订单
rabbitmq配置
"rabbitMq" => [
"base" => [
'host' => '192.168.0.5', // host地址
'port' => 5672, // 端口
"user" => "user", // 账户
'pass' => 123456, // 密码
"v_host" => "order", // 对应Virtual Hosts
],
"queue_name" => [
"name1" => "goods", // 队列名称
"name2" => "task_queue", // 队列名称
"name3" => "task_ack", // 队列名称
"name4" => "exchange_fanout_1", // 队列名称
"name5" => "queue_pay", // 订单支付队列
],
"exchange_name" => [
"name1" => "exch", // 交换器名称
"name2" => "exch_direct_log", // 交换器名称
"name3" => "exch_topic_log", // 交换器名称
"name4" => "exch_pay", // 订单支付, 交换器名称
],
"routing_key" => [
"info_key" => "info", // 路由键
"error_key" => "error", // 路由键
"warn_key" => "warn", // 路由键
"order_key" => "order_pay", // 订单支付
"all_key" => "#", // 所有的路由键
"user_info" => "user.info", // 路由键
"user_warn" => "user.warn", // 路由键
"user_all" => "user.*", // 匹配以user.开头的路由键
],
"dead_letter" => [ // 死信队列
"exchange_name" => [ // 死信队列交换机名称
"pay" => "dead_exch_pay"
],
"queue_name" => [ // 死信队列名称
"pay" => "dead_queue_pay"
],
"routing_key" => [ // 死信队列routing名称
"pay" => "dead_routing_key_pay"
]
]
]
生产者
<?php
/**
* 死信队列,延时队列: 生产者推送消息到队列,模拟订单支付
*/
namespace console\controllers\exchange\dead;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class PublisherController extends Controller
{
public $enableCsrfValidation = false;
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$deadConfig = Yii::$app->params["rabbitMq"]["dead_letter"];
$config = $rabbitMqConfig["base"];
$exchangeName = $rabbitMqConfig["exchange_name"]["name4"];
$queueName = $rabbitMqConfig["queue_name"]["name5"];
$routingKey = $rabbitMqConfig["routing_key"]["order_key"];
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 声明并初始化交换器, 交换机类型: routing_key-更详细的bind模式
$channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, false, false);
// 消息队列的额外参数
$args = new AMQPTable([
'x-message-ttl' => 2000, // 消息的过期时间
"x-dead-letter-exchange" => $deadConfig["exchange_name"]["pay"], // 死信队列交换机名称
"x-dead-letter-routing-key" => $deadConfig["routing_key"]["pay"] // 死信队列routing名称
]);
// 声明队列
$channel->queue_declare($queueName, false, true, false, false, false, $args);
// 交换机和队列绑定
$channel->queue_bind($queueName, $exchangeName, $routingKey);
// 声明死信交换机
$channel->exchange_declare($deadConfig["exchange_name"]["pay"], AMQPExchangeType::DIRECT, false, false, false);
// 声明死信队列
$channel->queue_declare($deadConfig["queue_name"]["pay"], false, true, false, false, false);
// 死信交换机和队列绑定
$channel->queue_bind($deadConfig["queue_name"]["pay"], $deadConfig["exchange_name"]["pay"], $deadConfig["routing_key"]["pay"]);
// 声明一个数据:里面可以是用户订单相关json数据
$data = "this is a dead message";
// 初始化消息并持久化
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
// 发布消息到交换器, 并和路由键匹配
$channel->basic_publish($msg, $exchangeName, $routingKey);
//关闭通道和连接
$channel->close();
$connection->close();
}
}
消费者
<?php
/**
* 死信队列,延时队列: 模拟订单支付,消费者消费消息
*
*/
namespace console\controllers\exchange\dead;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class ConsumerController extends Controller
{
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$exchangeName = $rabbitMqConfig["dead_letter"]["exchange_name"]["pay"];
$queueName = $rabbitMqConfig["dead_letter"]["queue_name"]["pay"];
$routingKey = $rabbitMqConfig["dead_letter"]["routing_key"]["pay"];
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 声明对应的交换器
$channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, false, false);
// 交换机与队列绑定,并指定routing_key
$channel->queue_bind($queueName, $exchangeName, $routingKey);
// 消息回调处理
$callback = function ($meg) {
//处理订单相关数据
echo "revince: " . $meg->body. "\n";
$meg->ack();
};
// 设置消费者处理消息限制,第二个参数:同一时刻服务器只会发送1条消息给消费者消费
$channel->basic_qos(null, 1, null);
// 消费者消费消息: 第四个参数: 需要ack确认
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
// 监控
while ($channel->is_open()) {
$channel->wait();
}
//关闭通道和连接
$channel->close();
$connection->close();
}
}
问题
通过上面的案例就可以实现死信队列,延时队列操作,上面看上去似乎没什么问题,实测一下就会发现 消息不会“如期死亡 ”。当先生产一个TTL为60s的消息,再生产一个TTL为5s的消息,第二个消息并不会再5s后过期进入死信队列,而是需要等到第一个消息TTL到期后,与第一个消息一同进入死信队列, 这是因为RabbitMQ 只会判断队列中的第一个消息是否过期
那么怎么来解决这个问题呢?
通过 RabbitMQ 的 rabbitmq_delayed_message_exchange 插件来解决
此插件的原理是将消息在交换机处暂存储在mnesia(一个分布式数据系统)表中,延迟投递到队列中,等到消息到期再投递到队列当中
rabbitmq_delayed_message_exchange插件安装
(1).下载地址:https://www.rabbitmq.com/community-plugins.html
注意: 要下载与rabbitmq相对应的版本
(2).把下载的插件放到指定位置
下载的文件为zip格式,将zip格式解压,插件格式为ez,将文件复制到插件目录:
Linux
/usr/lib/rabbitmq/lib/rabbitmq_server-xxx/plugins
rabbitmq-plugins list
Windows
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.17\plugins
(3).启动插件
Linux
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Windows文章来源:https://www.toymoban.com/news/detail-430819.html
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
启动信息:文章来源地址https://www.toymoban.com/news/detail-430819.html
(4).查看
进入:http://localhost:15672/#/exchanges
重构代码
生产者
生产者实现的关键点:
1.在声明交换机时不在是direct类型,而是x-delayed-message类型,这是由插件提供的类型
2.交换机要增加"x-delayed-type": "direct"参数设置
3.发布消息时,要在 Headers 中设置x-delay参数,来控制消息从交换机过期时间
<?php
/**
* 死信队列,延时队列插件使用: 模拟订单支付
*/
namespace console\controllers\exchange\delay;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class PublisherController extends Controller
{
public $enableCsrfValidation = false;
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$delayConfig = Yii::$app->params["rabbitMq"]["delay"];
$config = $rabbitMqConfig["base"];
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 声明并初始化交换器, 交换机类型: 延时插件名称(x-delayed-message)
$channel->exchange_declare($delayConfig["exchange_name"]["pay"], "x-delayed-message", false, true, false);
// 消息队列的额外参数
$args = new AMQPTable(["x-delayed-type" => "direct"]);
// 声明队列
$channel->queue_declare($delayConfig["queue_name"]["pay"], false, true, false, false, false, $args);
// 交换机和队列绑定
$channel->queue_bind($delayConfig["queue_name"]["pay"], $delayConfig["exchange_name"]["pay"], $delayConfig["routing_key"]["pay"]);
// 声明一个数据
$data = "this is a dead message";
// 初始化消息并持久化
$arr = [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT,
"application_headers" => new AMQPTable([
"x-delayed" => 2000 // 过期时间
])
];
$msg = new AMQPMessage($data, $arr);
// 发布消息到交换器, 并和路由键匹配
$channel->basic_publish($msg, $delayConfig["exchange_name"]["pay"], $delayConfig["routing_key"]["pay"]);
//关闭通道和连接
$channel->close();
$connection->close();
}
}
消费者
没有啥特别的修改
<?php
/**
* 死信队列,延时队列插件使用: 模拟订单支付
*
*/
namespace console\controllers\exchange\delay;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class ConsumerController extends Controller
{
public $enableCsrfValidation = false;
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$delayConfig = Yii::$app->params["rabbitMq"]["delay"];
$config = $rabbitMqConfig["base"];// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 声明并初始化交换器, 交换机类型: 延时插件名称(x-delayed-message)
$channel->exchange_declare($delayConfig["exchange_name"]["pay"], "x-delayed-message", false, true, false);
// 交换机和队列绑定
$channel->queue_bind($delayConfig["queue_name"]["pay"], $delayConfig["exchange_name"]["pay"], $delayConfig["routing_key"]["pay"]);
// 消息回调处理
$callback = function ($meg) {
echo "revince: " . $meg->body. "\n";
$meg->ack();
};
// 设置消费者处理消息限制,第二个参数:同一时刻服务器只会发送1条消息给消费者消费
$channel->basic_qos(null, 1, null);
// 消费者消费消息: 第四个参数: 需要ack确认
$channel->basic_consume($delayConfig["queue_name"]["pay"], '', false, false, false, false, $callback);
// 监控
while ($channel->is_open()) {
$channel->wait();
}
//关闭通道和连接
$channel->close();
$connection->close();
}
}
到了这里,关于一文读懂RabbitMQ消息队列的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!