celery笔记五之消息队列的介绍

这篇具有很好参考价值的文章主要介绍了celery笔记五之消息队列的介绍。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

本文首发于公众号:Hunter后端
原文链接:celery笔记五之消息队列的介绍

前面我们介绍过 task 的处理方式,将 task 发送到队列 queue,然后 worker 从 queue 中一个个的获取 task 进行处理。

task 的队列 queue 可以是多个,处理 task 的 worker 也可以是多个,worker 可以处理任意 queue 的 task,也可以处理指定 queue 的 task,这个我们在介绍 queue 的时候再做介绍。

这一篇我们来介绍一下存储 task 的队列 queue。

  1. 默认队列 task_default_queue
  2. 定义队列
  3. 将 task 指定到队列 queue 消费

以下的操作都是在 Django 系统的配置中使用。

1、默认队列 task_default_queue

当我们运行一个最简单的延时任务比如 add.delay(1, 2) 时,并没有设置一个消息队列,因为如果我们没有指定,系统会为我们创建一个默认队列。

这个默认的队列被命名为 celery,值在 app.conf.task_default_queue,我们可以查看一下:

from hunter.celery import app
app.conf.task_default_queue

# 输出为 'celery'

2、定义队列

我们可以设想一下这个场景,我们只有一个 worker 处理 task,每个 task 需要处理的时间很长,因为 worker 被占用,这样在我们的任务队列里就会积压很多的 task。

有一些需要即时处理的任务则会被推迟处理,这样的情况下,我们理想的设计是设置多个 worker,多个 worker 分别处理指定队列的 task。

关于 worker 的设置,比如添加多个 worker,给 worker 消费指定队列的 task,我们在 worker 的笔记中再介绍,这里我们介绍一下如何定义队列。

任务队列的定义如下:

# hunter/celery.py

from kombu import Queue

app.conf.task_queues = (
    Queue('blog_tasks', ),
)

当我们定义了任务队列之后,我们可以将 task 指定输出到对应的 queue,假设 blog/tasks.py 下有这样一个 task:

# blog/tasks.py
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

接下来我们调用这个 task 的时候,需要指定队列:

from blog.tasks import add
add.apply_async((1, 2), queue='blog_tasks')

如果我们就这样配置 celery,这个时候如果我们直接再调用 delay() 函数,也就是不指定 queue 的话,会发现我们发出的 task 是不能被 worker 处理的。

也就是说,下面的操作是不起作用的:

from blog.tasks import add
add.delay(1, 2)  # 此时,我们的调用不会被队列接收到

如果需要在调用 task 的时候不指定队列,使用系统默认的队列,这个时候我们需要额外来指定一个 task_default_queue,celery 的配置如下:

# hunter/celery.py

app.conf.task_queues = (
    Queue('blog_tasks'),
    Queue('default_queue'),
)
app.conf.task_default_queue = 'default_queue'

这样,我们在使用延时任务的时候,就不需要指定 queue 参数了,都会走我们的默认 task 队列:

from blog.tasks import add
add.delay(1, 2)  # 队列会被 default_queue 接收到

而如果我们想实现 add 的延时任务走的是 blog_tasks 这个队列,但是我们在调用的时候不想那么麻烦每次都指定 queue 参数,这个就需要用到 task_routes 配置项了。

3、将 task 指定到队列 queue 消费

如果我们想某些函数使用指定的 queue,我们可以使用 task_routes 配置项来操作。

现在我们有两个 application,blog 和 polls,这两个 application 下都有各自的 tasks,文件的内容如下:

# blog/tasks.py
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

@shared_task
def minus(x, y):
    return x - y
# polls/tasks.py
from celery import shared_task

@shared_task
def multi(x, y):
    return x * y

我们想要实现的最终的目的是在调用延时任务的时候,可以直接使用 delay() 的方式,不需要使用 apply_async(queue='xx')。

我们想要实现的功能是,polls/tasks.py 下的所有的延时任务以及 blog/tasks.py 下的 add() 函数进入 queue_1 队列

blog 下的 minus() 函数进入 queue_2 队列

其他所有的 task 都走默认的队列,default_queue。

我们可以如下配置:

app.conf.task_queues = (
    Queue('queue_1'),
    Queue('queue_2'),
    Queue('default_queue'),
)

app.conf.task_routes = {
    'polls.tasks.*': {
        'queue': 'queue_1',
    },
    'blog.tasks.add': {
        'queue': 'queue_1',
    },
    'blog.tasks.minus': {
        'queue': 'queue_2',
    },
}

app.conf.task_default_queue = 'default_queue'

如果想获取更多后端相关文章,可扫码关注阅读:
celery笔记五之消息队列的介绍文章来源地址https://www.toymoban.com/news/detail-494282.html

到了这里,关于celery笔记五之消息队列的介绍的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 消息队列之RabbitMQ介绍

    提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加 消息队列之RabbitMQ介绍 提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档 提示:这里可以添加本文要记录的大概内容: 在当今的分布式系统和微服务架构中,消息队列扮演着至关

    2024年01月18日
    浏览(24)
  • 消息队列介绍与对比

            消息队列不是什么新鲜玩意了,网上也是一大堆消息队列的介绍。本文只记录自己消息队列的使用过程,和自己总结的消息队列的对比。         消息队列广泛应用主要得益于如下特性:         1、非实时性。一些业务并不需要实时处理;         2、

    2024年01月20日
    浏览(25)
  • 消息队列介绍

    MQ(message queue),本质是个队列,FIFO 先入先出,只不过队列中存放的内容是 message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常 见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不

    2024年01月21日
    浏览(21)
  • 消息队列中间件介绍

    消息队列介绍   消息队列中间件是大型系统中的重要组件,已经逐渐成为企业系统内部通信的核心手段。它具有松耦合、异步消息、流量削峰、可靠投递、广播、流量控制、最终一致性等一系列功能,已经成为异步RPC的主要手段之一。 目前常见的消息中间件有ActiveMQ、Rabbi

    2024年02月04日
    浏览(30)
  • 消息队列-------Rabbitmq介绍和安装

    消息队列就是基础数据结构中的“先进先出”的一种数据机构。想一下,生活中买东西,需要排队,先排的人先买消费,就是典型的“先进先出” MQ是一直存在,不过随着微服务架构的流行,成了解决微服务之间问题的常用工具。  1.应用解耦  单体应用---》分布式应用  

    2024年02月10日
    浏览(31)
  • MQ消息队列(主要介绍RabbitMQ)

    消息队列概念:是在消息的传输过程中保存消息的容器。 作用:异步处理、应用解耦、流量控制..... RabbitMQ:     SpringBoot继承RabbitMQ步骤:         1.加入依赖          2.配置         3.开启(如果不需要监听消息也就是不消费就不需要该注解开启)         4.创建队列、

    2024年02月11日
    浏览(33)
  • 中间件RabbitMQ消息队列介绍

    1.1 什么是 MQ MQ ( message queue ),从字面意思上看,本质是个队列, FIFO 先入先出,只不过队列中存放的内容是 message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中, MQ 是一种非常常 见的上下游 逻辑解耦+物理解耦 的消息通信服务。使用了 MQ 之

    2024年02月13日
    浏览(44)
  • linux系统消息队列的模式和介绍

    点对点模式(生产者消费者模型) P2P模式包含三个角色:消息队列(Queue)、发送者(Sender)、接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到它们被消费或超时。 发布/订阅模式(发布者和订阅者模型) Pub/Sub模式包含三

    2024年02月21日
    浏览(33)
  • 消息队列学习笔记

    异步处理:处理完关键步骤后直接返回结果,后续放入队列慢慢处理 流量控制: 使用消息队列隔离网关和后端服务,以达到流量控制和保护后端服务的目的。能根据下游的处理能力自动调节流量,达到“削峰填谷”的作用 网关在收到请求后,将请求放入请求消息队列; 后端

    2024年02月12日
    浏览(32)
  • 【图解RabbitMQ-3】消息队列RabbitMQ介绍及核心流程

    🧑‍💻作者名称:DaenCode 🎤作者简介:CSDN实力新星,后端开发两年经验,曾担任甲方技术代表,业余独自创办智源恩创网络科技工作室。会点点Java相关技术栈、帆软报表、低代码平台快速开发。技术尚浅,闭关学习中······ 😎人生感悟:尝尽人生百味,方知世间冷暖。

    2024年02月09日
    浏览(29)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包