Yii2-Queue实现轻量级消息队列

这篇具有很好参考价值的文章主要介绍了Yii2-Queue实现轻量级消息队列。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

概述

Yii2-Queue是Yii2官方制作的一个消息队列,提供多个缺点:Syncronous, File, DB, Redis, RabbitMQ, AMQP Interop, Beanstalk, Gearman等,使用Yii2开发的时候使用该扩展比较合适.

驱动配置:

Syncronous

如果打开 handle 属性,则在使用过程中同步执行任务,开发和调试阶段使用.

return [
    'components' => [
        'queue' => [
            'class' => \yii\queue\sync\Queue::class,
            'handle' => false, // 任务是否立即执行
        ],
    ],
];

File

以文件的方式来存储消息队列

return [
    'bootstrap' => [
        'queue', // 把这个组件注册到控制台
    ],
    'components' => [
        'queue' => [
            'class' => \yii\queue\<driver>\Queue::class,
            'as log' => \yii\queue\LogBehavior::class,
            // 驱动的其他选项
        ],
    ],
];

DB

使用数据库来存储消息队列

return [
    'bootstrap' => [
        'queue', // 把这个组件注册到控制台
    ],
    'components' => [
        'db' => [
            'class' => \yii\db\Connection::class, 
            // ...
        ],
        'queue' => [
            'class' => \yii\queue\db\Queue::class,
            'db' => 'db', // DB 连接组件或它的配置
            'tableName' => "{{queue}}", // 表名
            'channel' => 'default', // Queue channel key
            'mutex' => \yii\mutex\MysqlMutex::class, // Mutex that used to sync queries
        ],
    ],
];
  • 手动创建数据表

首先在数据库中先添加数据表,假设表名是queue,以下是创建数据表的Sql语句.

CREATE TABLE `queue` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `channel` varchar(255) NOT NULL,
  `job` blob NOT NULL,
  `pushed_at` int(11) NOT NULL,
  `ttr` int(11) NOT NULL,
  `delay` int(11) NOT NULL DEFAULT 0,
  `priority` int(11) unsigned NOT NULL DEFAULT 1024,
  `reserved_at` int(11) DEFAULT NULL,
  `attempt` int(11) DEFAULT NULL,
  `done_at` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `channel` (`channel`),
  KEY `reserved_at` (`reserved_at`),
  KEY `priority` (`priority`)
) ENGINE=InnoDB
  • 使用数据迁移

也可以使用数据迁移来创建表,迁移文件存储目录是 path/to/extension/src/drivers/db/migrations.,使用迁移文件的话需要在配置中增加如下配置:

'controllerMap' => [
    // ...
    'migrate' => [
        'class' => 'yii\console\controllers\MigrateController',
        'migrationPath' => null,
        'migrationNamespaces' => [
            // ...
            'yii\queue\db\migrations',
        ],
    ],
],

然后使用数据迁移命令:

php yii migrate/up

Redis

使用Redis来存储消息队列,该驱动需要安装yiisoft/yii2-redis 来配合使用,配置如下:

return [
    'bootstrap' => [
        'queue', // 把这个组件注册到控制台
    ],
    'components' => [
        'redis' => [
            'class' => \yii\redis\Connection::class,
            // ...
        ],
        'queue' => [
            'class' => \yii\queue\redis\Queue::class,
            'redis' => 'redis', // 连接组件或它的配置
            'channel' => 'queue', // Queue channel key
        ],
    ],
];

RabbitMQ

这个驱动自2.0.2版本已被弃用,并且将会在2.1版本被移除,官方建议使用AMQP Interop驱动.

AMQP Interop

基于AMQP协议的驱动,配置如下:

return [
    'bootstrap' => [
        'queue', // The component registers own console commands
    ],
    'components' => [
        'queue' => [
            'class' => \yii\queue\amqp_interop\Queue::class,
            'port' => 5672,
            'user' => 'guest',
            'password' => 'guest',
            'queueName' => 'queue',
            'driver' => yii\queue\amqp_interop\Queue::ENQUEUE_AMQP_LIB,
            
            // or
            'dsn' => 'amqp://guest:guest@localhost:5672/%2F',
            
            // or, same as above
            'dsn' => 'amqp:',
        ],
    ],
];

Beanstalk

队列存放于Beanstalk,配置如下:

return [
    'bootstrap' => [
        'queue', // 把这个组件注册到控制台
    ],
    'components' => [
        'queue' => [
            'class' => \yii\queue\beanstalk\Queue::class,
            'host' => 'localhost',
            'port' => 11300,
            'tube' => 'queue',
        ],
    ],
];

Gearman

队列存放于Gearman,配置如下:

return [
    'bootstrap' => [
        'queue', // 把这个组件注册到控制台
    ],
    'components' => [
        'queue' => [
            'class' => \yii\queue\gearman\Queue::class,
            'host' => 'localhost',
            'port' => 4730,
            'channel' => 'queue',
        ],
    ],
];

工作流程

Job

每个任务都是一个单独的类,需要实现接口\yii\queue\JobInterface,并且实现execute函数.

任务被放入队列中,并且在不同进程中从队列中获取,如果不确定在worker的作业环境中是否可用,则应在执行任务时避免外部依赖。所有处理任务的数据都应该放到作业对象的属性中,并连同它一起发送到队列中.

如果需要处理 ActiveRecord ,那么发送它的ID而不是对象本身。在处理时必须从DB提取它。

Queue

配置好驱动后,可以使用queue组件来操作队列

具体的组件名以实际配置为准,有可能项目中需要配置多个队列

加入队列

// 将作业推送到队列并获得其ID
$id = Yii::$app->queue->push(new SomeJob());

延迟队列

$id = Yii::$app->queue->delay(5 * 60)->push(new SomeJob());

任务状态

// 这个作业等待执行。
Yii::$app->queue->isWaiting($id);

// Worker 从队列获取作业,并执行它。
Yii::$app->queue->isReserved($id);

// Worker 作业执行完成。
Yii::$app->queue->isDone($id);

记录日志

此组件提供了使用日志 LogBehavior 记录队列事件,使用时需要在配置中增加如下配置:

return [
    'components' => [
        'queue' => [
            'class' => \yii\queue\redis\Queue::class,
            'as log' => \yii\queue\LogBehavior::class
        ],
    ],
];

事件监听

队列可以触发以下事件:

Event name Event class Triggered on
Queue::EVENT_BEFORE_PUSH PushEvent Adding job to queue using Queue::push() method
Queue::EVENT_AFTER_PUSH PushEvent Adding job to queue using Queue::push() method
Queue::EVENT_BEFORE_EXEC ExecEvent Before each job execution
Queue::EVENT_AFTER_EXEC ExecEvent After each success job execution
Queue::EVENT_AFTER_ERROR ExecEvent When uncaught exception occurred during the job execution
cli\Queue:EVENT_WORKER_START WorkerEvent When worker has been started
cli\Queue:EVENT_WORKER_LOOP WorkerEvent Each iteration between requests to queue
cli\Queue:EVENT_WORKER_STOP WorkerEvent When worker has been stopped

使用示例:

Yii::$app->queue->on(Queue::EVENT_AFTER_ERROR, function ($event) {
    if ($event->error instanceof TemporaryUnprocessableJobException) {
        $queue = $event->sender;
        $queue->delay(7200)->push($event->job);    
    }
});

Worker

  • yii queue/listen [timeout]

启动一个守护进程,它可以无限查询队列.如果有新的任务,他们立即得到并执行.timeout 是下一次查询队列的时间 当命令正确地通过supervisor来实现时,这种方法是最有效的.可选参数有三个:

–verbose,-v: 将执行状态输出到控制台

–isolate: 详细模式执行作业,如果启用,将打印每个作业的执行结果

–color: 高亮显示输出结果

  • yii queue/run

获取并执行循环中的任务,直到队列为空,适合于cron搭配

  • yii queue/info

打印关于队列状态的信息

  • yii queue/clear

清除一个队列

  • yii queue/remove [id]

移除一个任务

Worker启动管理

Supervisor

Supervisor 是Linux的进程监视器,它会自动启动您的控制台进程,它的配置文件存于目录 /etc/supervisor/conf.d 下,配置示例:

[program:yii-queue-worker]
process_name=%(program_name)s_%(process_num)02d
command=/usr/bin/php7.2 /home/vagrant/code/yii2-queue-mq/yii queue/listen --verbose=1 --color=0
autostart=true
autorestart=true
user=homestead
numprocs=4
redirect_stderr=true
stdout_logfile=/home/vagrant/code/yii2-queue-mq/log/yii-queue-worker.log

上面的配置会启动4个 queue/listen Worker 并记录日志,启动 Supervisor:

sudo /etc/init.d/supervisor restart

Cron

使用系统自带的计划任务 crontab 来配合命令来处理,配置示例:

* * * * * /usr/bin/php7.2 /home/vagrant/code/yii2-queue-mq/yii queue/run

上面的配置 cron 将每分钟启动一次命令

错误与重试

任务执行过程中有可能会失败,可以通过三种方法重试.

Sync 驱动不会重试失败的工作, Gearman 不支持重试

选项配置

通过在组件中增加 ttr 和 attempts来设置,这种设置作用范围是全局

  • ttr: 设置在队列中保留工作的时间,如果一份作业在这段时间没有执行,它将返回队列进行重试
  • attempts 选项设置了最大的尝试次数,如果尝试已经结束,作业作还没有完成,它将从队列中移除
'components' => [
    'queue' => [
        'class' => \yii\queue\<driver>\Queue::class,
        'ttr' => 5 * 60, // Max time for anything job handling 
        'attempts' => 3, // Max number of attempts
    ],
]

实现重试接口

可以通过为Job实现 RetryableJobInterface 的 getTtr() 和 canRetry() 接口来配置任务保留时间和最大尝试次数.

class SomeJob extends BaseObject implements RetryableJobInterface
{
    public function execute($queue)
    {
        //...
    }

    public function getTtr()
    {
        return 15 * 60;
    }

    public function canRetry($attempt, $error)
    {
        return ($attempt < 5) && ($error instanceof TemporaryException);
    }
}

监听事件

通过监听 Queue::EVENT_BEFORE_PUSH 和 Queue::EVENT_AFTER_ERROR 事件来分别设置 TTR 和 是否可以重试.

事件处理程序在 RetryableJobInterface 方法之后执行,因此具有最高优先级

  • Queue::EVENT_BEFORE_PUSH: 设置TTR
Yii::$app->queue->on(Queue::EVENT_BEFORE_PUSH, function (PushEvent $event) {
    if ($event->job instanceof SomeJob) {
        $event->ttr = 300;
    }
});
  • Queue::EVENT_AFTER_ERROR: 设置是否可以重试
Yii::$app->queue->on(Queue::EVENT_AFTER_ERROR, function (ExecEvent $event) {
    if ($event->job instanceof SomeJob) {
        $event->retry = ($event->attempt < 5) && ($event->error instanceof TemporaryException);
    }
});

调试

通过向 Yii2 调试模块中增加个面板,面板显示计数器和队列任务列表,需要安装 yiisoft/yii2-debug.

return [
    'modules' => [
        'debug' => [
            'class' => \yii\debug\Module::class,
            'panels' => [
                'queue' => \yii\queue\debug\Panel::class,
            ],
        ],
    ],
];

代码中使用

本次以Redis驱动为例,使用这个扩展,完成后台批量为用户发送推广邮件的功能.

安装所需扩展

require --prefer-dist yiisoft/yii2-queue

编写EmailJob

EmailJob继承了BaseJob,而BaseJob实现了JobInterfaceRetryableJobInterface接口,并覆盖了execute,getTtr,canRetry三个函数.

为了完整看到测试日志,EmailJob在发送邮件前sleep了5秒.

<?php

namespace common\jobs;

use yii\base\ViewNotFoundException;

/**
 * Class EmailJob
 * @property array $args
 * @package common\jobs
 */
class EmailJob extends BaseJob
{

    public $view, $viewParams, $to, $subject, $body;

    public function execute($queue)
    {
        parent::execute($queue);
        sleep(5);
        $response = \Yii::$app->mailer->compose($this->view, $this->viewParams)
            ->setTo($this->to)
            ->setSubject($this->subject)
            ->send();
        $this->stdout($response);
    }

    public function getTtr()
    {
        return 15 * 60;
    }

    public function canRetry($attempt, $error)
    {
        $this->stdout($attempt.'---'.$error);
        return ($attempt < 5) && ($error instanceof ViewNotFoundException);
    }
}

模拟添加队列

读取用户表中的所有用户,并未他们都发送一封测试邮件

$queue = \Yii::$app->queue;
$users = User::find()->asArray()->all();
foreach ($users as $user){
    $config = [
        'view' => 'mqTest',
        'viewParams' => [
            'username' => explode('@', $user->email)[0],
        ],
        'to' => $user->email,
        'subject' => '测试邮件',
    ];
    $jobID = $queue->push(new EmailJob($config));
    $this->log($jobID);
}

完成代码见jormin/yii2-queue-mq文章来源地址https://www.toymoban.com/news/detail-457049.html

测试运行

  • 启动测试脚本,像队列中添加EmailJob
vagrant@homestead:~/code/yii2-queue-mq$ php yii mq
  • 每隔一秒查询下Job的状态
vagrant@homestead:~/code/yii2-queue-mq$ php yii mq/status 1
  • 查看队列当前信息
vagrant@homestead:~/code/yii2-queue-mq$ php yii queue/info
  • 开始执行队列,并注意队列中Job的执行状态
vagrant@homestead:~/code/yii2-queue-mq$ php yii queue/run --verbose --isolate --color

到了这里,关于Yii2-Queue实现轻量级消息队列的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • [Netty源码] Netty轻量级对象池实现分析 (十三)

    1.对象池技术介绍 对象池其实就是缓存一些对象从而避免大量创建同一个类型的对象, 类似线程池。对象池缓存了一些已经创建好的对象, 避免需要的时候创建。同时限制了实例的个数。 池化技术最终要的就是重复的使用池内已经创建的对象。 创建对象的开销大 会创建大量的

    2023年04月18日
    浏览(44)
  • Spring Boot整合Postgres实现轻量级全文搜索

    有这样一个带有搜索功能的用户界面需求: 搜索流程如下所示: 这个需求涉及两个实体: “评分(Rating)、用户名(Username)”数据与 User 实体相关 “创建日期(create date)、观看次数(number of views)、标题(title)、正文(body)”与 Story 实体相关 需要支持的功能对 User

    2024年02月19日
    浏览(42)
  • 教你使用PHP实现一个轻量级HTML模板引擎

    🏆作者简介,黑夜开发者,全栈领域新星创作者✌,2023年6月csdn上海赛道top4。多年电商行业从业经验,对系统架构,数据分析处理等大规模应用场景有丰富经验。 🏆本文已收录于PHP专栏:PHP进阶实战教程。 🏆另有专栏PHP入门基础教程,希望各位大佬多多支持❤️。 在 W

    2024年02月15日
    浏览(45)
  • OpenHarmony实战开发-如何实现一个轻量级输入法应用。

    ​ 本示例使用inputMethodEngine实现一个轻量级输入法应用kikaInput,支持在运行OpenHarmony OS的智能终端上。 使用说明 1.使用hdc shell aa start ability -a InputMethod -b cn.openharmony.inputmethodchoosedialog命令拉起切换输入法弹窗,点击kikainput切换输入法到当前应用。 2.点击应用中的编辑框,拉起

    2024年04月24日
    浏览(54)
  • 聊聊Cola-StateMachine轻量级状态机的实现

    在分析Seata的saga模式实现时,实在是被其复杂的 json 状态语言定义文件劝退,我是有点没想明白为啥要用这么来实现状态机;盲猜可能是基于可视化的状态机设计器来定制化流程,更方便快捷且上手快吧,毕竟可以通过UI直接操作,设计状态流转图,但我暂时不太能get到。对

    2024年02月08日
    浏览(36)
  • 【HarmonyOS】API6使用storage实现轻量级数据存储

     写在前面 本篇内容基于API6 JS语言进行开发,通过结合轻量级数据存储开发指导的文档,帮助大家完成一个实际的代码案例,通过这个小案例,可以实现简单数据的存储。 参考文档:文档中心 1、页面布局 首先我们编写一个简单的页面布局,页面中只有一个文本和两个按钮

    2024年02月14日
    浏览(38)
  • 轻量级服务器nginix:如何实现Spring项目的负载均衡

    点两下这个package,就会在target目录下生成一个war包 把这个warb包拿出来,放在桌面上备用 数据多的话,选择sql文件,直接运行 数据导入成功了 8080和8081都可以 虚拟机的位置: nginx的位置 /usr/local/nginx/conf 两台虚拟机所在的位置: tomcat Tomcat2 运行状态良好 首先在 修改nginx里的

    2024年02月01日
    浏览(45)
  • 轻量级web开发框架:Flask本地部署及实现公网访问界面

    本篇文章讲解如何在本地安装Flask,以及如何将其web界面发布到公网上并进行远程访问。 Flask是目前十分流行的web框架,采用Python编程语言来实现相关功能。较其他同类型框架更为灵活、轻便、安全且容易上手。它可以很好地结合MVC模式进行开发,开发人员分工合作,小型团

    2024年02月05日
    浏览(59)
  • SimSearch:一个轻量级的springboot项目索引构建工具,实现快速模糊搜索

    大部分项目都会涉及模糊搜索功能,而实现模糊搜索一般分为两个派系: like简约派系 搜索引擎派系 对于较为大型的项目来说,使用Solr、ES或者Milvus之类的引擎是比较流行的选择了(效果只能说优秀),而对于中小型项目,如果考虑这些较为重型的引擎,就意味着开发成本和

    2024年02月02日
    浏览(89)
  • Unity实现camera数据注入RMP推送或轻量级RTSP服务模块

    技术背景 随着技术的不断进步和应用的不断深化,Unity3D VR应用的前景非常广阔,它广泛应用于教育、医疗、军事、工业设计、虚拟数字人等多个领域。 教育领域:Unity3D VR技术可以用来创建虚拟现实教室,让学生能够身临其境地体验课程内容,提高学习效果和兴趣; 医疗领

    2024年02月15日
    浏览(30)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包