如何在 Ubuntu VPS 上使用 Celery 与 RabbitMQ 来做队列

这篇具有很好参考价值的文章主要介绍了如何在 Ubuntu VPS 上使用 Celery 与 RabbitMQ 来做队列。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

简介


异步或非阻塞处理是一种将某些任务的执行与程序的主要流程分离的方法。这为您提供了几个优势,包括允许用户界面代码在没有中断的情况下运行。

消息传递是程序组件用来通信和交换信息的一种方法。它可以同步或异步实现,并且可以允许离散进程进行无问题的通信。消息传递通常作为传统数据库的替代实现,因为消息队列通常实现了额外的功能,提供了增加的性能,并且可以完全驻留在内存中。

Celery 是建立在异步消息传递系统上的任务队列。它可以用作编程任务可以被倾倒的桶。传递任务的程序可以继续执行和响应功能,然后稍后它可以轮询 celery 来查看计算是否完成并检索数据。

虽然 celery 是用 Python 编写的,但它的协议可以在任何语言中实现。它甚至可以通过 webhooks 与其他语言一起使用。

通过在程序环境中实现作业队列,您可以轻松卸载任务并继续处理用户的交互。这是增加应用程序响应性的简单方法,并且在执行长时间运行的计算时不会被锁定。

在本指南中,我们将在 Ubuntu 12.04 VPS 上安装和实现使用 RabbitMQ 作为消息系统的 celery 作业队列。

安装组件


安装 Celery


Celery 是用 Python 编写的,因此可以像处理常规 Python 包一样轻松安装。

我们将按照处理 Python 包的推荐程序,通过创建虚拟环境来安装我们的消息系统。这有助于我们保持环境稳定,不会影响更大的系统。

从 Ubuntu 的默认存储库安装 Python 虚拟环境包:

sudo apt-get update
sudo apt-get install python-virtualenv

我们将创建一个消息目录,在这里我们将实现我们的系统:

mkdir ~/messaging
cd ~/messaging

现在我们可以创建一个虚拟环境,通过以下命令安装 celery:

virtualenv --no-site-packages venv

配置好虚拟环境后,可以通过输入以下命令激活它:

source venv/bin/activate

您的提示符将更改以反映您现在正在使用我们上面创建的虚拟环境。这将确保我们的 Python 包安装在本地而不是全局。

如果在任何时候我们需要停用环境(现在不需要),可以输入:

deactivate

现在我们已经激活了环境,可以使用 pip 安装 celery:

pip install celery

安装 RabbitMQ


Celery 需要一个消息代理来处理来自外部源的请求。这个代理被称为“broker”。

有很多可供选择的代理选项,包括关系型数据库、NoSQL 数据库、键值存储和实际消息系统。

我们将配置 celery 使用 RabbitMQ 消息系统,因为它提供了强大、稳定的性能,并且与 celery 交互良好。这是一个很好的解决方案,因为它包含了与我们预期使用的功能很好契合的特性。

我们可以通过 Ubuntu 的存储库安装 RabbitMQ:

sudo apt-get install rabbitmq-server

RabbitMQ 服务在安装后会自动启动在我们的服务器上。

创建 Celery 实例


为了使用 celery 的任务排队功能,安装后的第一步是创建一个 celery 实例。这是一个简单的过程,导入包,创建一个“app”,然后设置 celery 能够在后台执行的任务。

让我们在我们的消息目录内创建一个名为 tasks.py 的 Python 脚本,我们可以在其中定义我们的工作人员可以执行的任务。

sudo nano ~/messaging/tasks.py

我们应该做的第一件事是从 celery 包中导入 Celery 函数:

from celery import Celery

之后,我们可以创建一个连接到默认 RabbitMQ 服务的 celery 应用程序实例:

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://')

Celery 函数的第一个参数是将用于标识任务的前缀名称。

backend 参数是一个可选参数,如果您希望查询后台任务的状态或检索其结果,则是必需的。

如果您的任务只是执行一些工作然后退出,而不返回在程序中使用的有用值,您可以将此参数省略。如果只有一些任务需要此功能,请在此启用它,我们可以在后面逐个案例地禁用它。

broker 参数指定连接到我们代理所需的 URL。在我们的情况下,这是运行在我们服务器上的 RabbitMQ 服务。RabbitMQ 使用一种称为“amqp”的协议运行。如果 RabbitMQ 在其默认配置下运行,celery 可以连接而无需其他信息,只需 amqp:// 方案。

构建 Celery 任务


在这个文件中,我们现在需要添加我们的任务。

每个 Celery 任务都必须使用装饰器 @app.task 来引入。这允许 Celery 识别可以添加其排队功能的函数。在每个装饰器之后,我们只需创建一个我们的工作进程可以运行的函数。

我们的第一个任务将是一个简单的函数,它将一个字符串打印到控制台。

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://')

@app.task
def print_hello():
    print 'hello there'

因为这个函数不返回任何有用的信息(它将其打印到控制台),我们可以告诉 Celery 不使用后端来存储关于此任务的状态信息。这在内部更简单,需要更少的资源。

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://')

@app.task(ignore_result=True)
def print_hello():
    print 'hello there'

接下来,我们将添加另一个函数,它将生成素数(取自 RosettaCode)。这可能是一个长时间运行的过程,因此这是一个很好的例子,说明我们在等待结果时如何处理异步工作进程。

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://')

@app.task(ignore_result=True)
def print_hello():
    print 'hello there'

@app.task
def gen_prime(x):
    multiples = []
    results = []
    for i in xrange(2, x+1):
        if i not in multiples:
            results.append(i)
            for j in xrange(i*i, x+1, i):
                multiples.append(j)
    return results

因为我们关心这个函数的返回值,并且我们想知道它何时完成(以便我们可以使用结果等),所以我们不会向这个第二个任务添加 ignore_result 参数。

保存并关闭文件。

启动 Celery 工作进程


我们现在可以启动一个工作进程,它将能够接受来自应用程序的连接。它将使用我们刚刚创建的文件来了解它可以执行的任务。

启动一个工作实例就像使用 celery 命令调用应用程序名称一样简单。我们将在字符串末尾包含一个 “&” 字符,将我们的工作进程放入后台:

celery worker -A tasks &

这将启动一个应用程序,然后将其从终端分离,允许您继续使用它进行其他任务。

如果您想要启动多个工作进程,可以使用 -n 参数为每个工作进程命名:

celery worker -A tasks -n one.%h &
celery worker -A tasks -n two.%h &

当工作进程被命名时,%h 将被主机名替换。

要停止工作进程,您可以使用 kill 命令。我们可以查询进程 ID,然后根据这些信息消除工作进程。

ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill

这将允许工作进程在退出之前完成其当前任务。

如果您希望关闭所有工作进程而不等待它们完成任务,可以执行:

ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9

使用队列处理工作


我们可以使用我们生成的工作进程在后台为我们的程序完成工作。

我们将在 Python 解释器中探索不同的选项,而不是创建一个完整的程序来演示它是如何工作的:

python

在提示符下,我们可以将我们的函数导入到环境中:

from tasks import print_hello
from tasks import gen_prime

如果您测试这些函数,它们似乎没有任何特殊功能。第一个函数按预期打印一行:

print_hello()

hello there

第二个函数返回一个素数列表:

primes = gen_prime(1000)
print primes

如果我们给第二个函数一个更大的数字范围来检查,执行将挂起,因为它在计算中:

primes = gen_prime(50000)

通过输入 “CTRL-C” 来停止执行。这个过程显然没有在后台计算。

要访问后台工作进程,我们需要使用 .delay 方法。Celery 为我们的函数添加了额外的功能。这个方法用于将函数传递给工作进程执行。它应该立即返回:

primes = gen_prime.delay(50000)

这个任务现在正在由我们之前启动的工作进程执行。因为我们为应用程序配置了 backend 参数,我们可以检查计算的状态并访问结果。

要检查任务是否完成,我们可以使用 .ready 方法:

primes.ready()

False

“False” 的值意味着任务仍在运行,结果尚不可用。当我们得到 “True” 的值时,我们可以对答案做些什么。

primes.ready()

True

我们可以使用 .get 方法获取值。

如果我们已经使用 .ready 方法验证了值是否计算出来,那么我们可以像这样使用该方法:

print primes.get()

[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199, 211, 223, 227, 229, 233, 239, 241, 251, 257, 263, 269, 271, 277, 281, 283, 293, 307, 311, 313, 317, 331, 337, 347, 349, 353, 359, 367, 373, 379, 383, 389, 397, 401, 409, 419, 421, 431, 433, 439, 443, 449, 457, 461, 463, 467, 479, 487, 491, 499, 503, 509, 521, 523,
. . .

然而,如果您在调用 .get 之前没有使用 .ready 方法,您很可能希望添加一个 “timeout” 选项,以便您的程序不必等待结果,这将违反我们的实现目的:

print primes.get(timeout=2)

如果超时,这将引发异常,您可以在程序中处理它。

结论


虽然这些信息足以让你开始在程序中使用 celery,但这只是揭开了这个库的全部功能的一角。Celery允许你将后台任务串联在一起,对任务进行分组,并以有趣的方式组合函数。

虽然 celery 是用 Python 编写的,但可以通过 Webhooks 与其他语言一起使用。这使得它非常灵活,可以将任务移到后台,而不受所选择的语言的限制。文章来源地址https://www.toymoban.com/news/detail-830220.html

到了这里,关于如何在 Ubuntu VPS 上使用 Celery 与 RabbitMQ 来做队列的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Java中如何使用消息队列实现异步(ActiveMQ,RabbitMQ,Kafka)

    在 Java 中,可以使用消息队列实现异步处理。下面是一个简单的示例代码,用于说明如何使用 ActiveMQ 实现消息队列异步处理: 添加 ActiveMQ 依赖 在 pom.xml 文件中添加以下依赖: 创建消息队列 创建一个名为 “TestQueue” 的消息队列,并配置 ActiveMQ 连接信息: 创建消息消费者

    2024年02月16日
    浏览(57)
  • 消息队列之六脉神剑:RabbitMQ、Kafka、ActiveMQ 、Redis、 ZeroMQ、Apache Pulsar对比和如何使用

    消息队列(Message Queue)是一种异步通信机制,它将消息发送者和接收者解耦,从而提高了应用程序的性能、可扩展性和可靠性。在分布式系统中,消息队列经常被用于处理高并发、异步处理、应用解耦等场景。 本篇回答将分析比较常见的六种消息队列:RabbitMQ、Kafka、Active

    2024年02月14日
    浏览(46)
  • celery笔记五之消息队列的介绍

    本文首发于公众号:Hunter后端 原文链接:celery笔记五之消息队列的介绍 前面我们介绍过 task 的处理方式,将 task 发送到队列 queue,然后 worker 从 queue 中一个个的获取 task 进行处理。 task 的队列 queue 可以是多个,处理 task 的 worker 也可以是多个,worker 可以处理任意 queue 的 t

    2024年02月09日
    浏览(43)
  • celery分布式异步任务队列-4.4.7

    version 4.4.7 学习总结 python实现、开源、遵循BSD许可的分布式任务队列; 可以处理大量消息,简单、灵活、可靠的分布式系统,专注任务的 实时处理 和 定时调度 处理; 它是线程、进程分配任务的一种机制,官方仅做支持linux开发。 五大部分: task,任务 beat,定时调度管理器

    2024年02月07日
    浏览(46)
  • docker容器内的django启动celery任务队列

    celery任务队列一般要使用redis,但是容器内的django要访问本机的redis是十分麻烦的 在容器内安装redis,或者单独启动一个redis的容器,我是单独启动一个redis容器 安装redis镜像 docker pull redis 启动redis容器 docker run -d --name redis_container redis 查看redis的IP `docker inspect -f “{{range .Netwo

    2024年02月14日
    浏览(36)
  • Django 如何使用 Celery 完成异步任务或定时任务

    以前版本的 Celery 需要一个单独的库(django-celery)才能与 Django 一起工作, 但从 Celery 3.1 开始,情况便不再如此,我们可以直接通过 Celery 库来完成在 Django 中的任务。 以 Docker 安装为例,安装一个密码为 mypassword 的 Redis 服务端 在 Django 项目中创建一个 celery.py 文件,并配置

    2023年04月25日
    浏览(50)
  • RabbitMQ如何实现延时队列

    RabbitMQ是目前最为流行的消息队列之一,它的高可靠性、高可用性和高性能使得它成为众多应用场景下的首选。在实际应用中,我们经常需要实现延时队列来解决一些业务问题,比如订单超时未支付自动取消等。本文将介绍如何使用RabbitMQ实现延时队列。 1. 延时队列的概念 延

    2024年02月16日
    浏览(39)
  • 在Windows下设置分布式队列Celery的心跳轮询

    目录 引言 一、Celery基础知识 二、Windows环境准备 三、心跳轮询机制详解 四、Celery心跳轮询设置 五、测试与验证 六、常见问题与解决方案 七、结论 随着微服务架构的普及,分布式系统在各种应用中扮演着越来越重要的角色。Celery作为一个分布式任务队列,能够帮助我们在分

    2024年01月19日
    浏览(32)
  • RabbitMQ实现延迟消息,RabbitMQ使用死信队列实现延迟消息,RabbitMQ延时队列插件

    假设有一个业务场景:超过30分钟未付款的订单自动关闭,这个功能应该怎么实现? RabbitMQ使用死信队列,可以实现消息的延迟接收。 队列有一个消息过期属性。就像丰巢超过24小时就收费一样,通过设置这个属性,超过了指定事件的消息将会被丢弃。 这个属性交:x-message

    2024年02月13日
    浏览(79)
  • 消息队列如何选择?Kafka、Pulsar、RabbitMQ还是...

    公众号: MCNU云原生 ,欢迎搜索关注,更多干货,第一时间掌握! 消息队列是当代分布式系统架构中非常重要的一部分,在应用解耦、流量削峰、异步通信等方面有非常多的应用场景。目前最为我们所熟知的消息队列有:ActiveMQ、Kafka、RabbitMQ、Pulsar和RocketMQ,他们都有哪些优

    2024年02月04日
    浏览(50)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包