flask整合rabbitMQ插件的方式

这篇具有很好参考价值的文章主要介绍了flask整合rabbitMQ插件的方式。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

二、Python-flask-rabbitMQ-插件方式整合

引言

当今互联网应用的高并发场景下,如何保证可靠的消息传递和高效的处理成为了一项重要的挑战。在这种情况下,RabbitMQ作为一种可靠的消息队列系统,被广泛应用于各个领域。

本文将介绍如何使用RabbitMQ插件整合Flask框架,实现并发性生产者和消费者的兼容。通过这种方式,我们可以利用RabbitMQ的优势来确保消息的可靠传递,并提高应用程序的处理能力。

首先,我们需要在Flask应用程序中引入RabbitMQ插件。可以使用Pika库来连接和操作RabbitMQ。通过创建一个连接池管理连接对象,我们可以避免频繁地创建和销毁连接,提高效率。

同时,为了处理高并发的生产者,我们可以使用批量发送消息的方式来减少通道创建和消息发布的开销。通过设置缓冲区来收集一定数量或一定时间间隔内的消息,然后批量发送,可以更有效地利用资源。

此外,对于高并发的消费者,我们可以考虑使用异步的方式来处理消息。通过将消息发送任务交给后台线程或异步任务队列处理,可以避免请求的堵塞,提高应用程序的并发能力。

通过以上的优化方案,我们可以在Flask应用程序中充分利用RabbitMQ的功能,并且兼容高并发的生产者和消费者。这将帮助我们构建更可靠、高效的消息队列系统,应对日益增长的并发访问需求。

总之,使用RabbitMQ插件整合Flask框架,并采用优化方案来兼容并发性生产者和消费者,是构建可靠、高效消息传递系统的关键一步。通过这种方式,我们能够更好地应对高并发场景下的挑战,提升应用程序的性能和稳定性。

具体步骤

1 安装依赖:

使用pip安装pika库:

pip install pika

2 编写实体类:

from queue import Queue
from threading import Lock

import pika


# 定义交换机类型的枚举值
class ExchangeType:
    DEFAULT = 'default'
    DIRECT = "direct"
    FANOUT = "fanout"
    TOPIC = 'topic'


class RabbitMQ:
    def __init__(self, host='localhost', port=5672, username='guest', password='guest', pool_size=10,heartbeat=0):
        self.credentials = pika.PlainCredentials(username, password)
        self.parameters = pika.ConnectionParameters(host=host, port=port, credentials=self.credentials,
                                                    heartbeat=heartbeat)
        self.connection_pool = Queue(pool_size)  # 连接池,存储连接和信道
        self.lock = Lock()  # 互斥锁,用于对连接池的访问进行同步

        for _ in range(pool_size):
            connection = self._create_connection()
            channel = connection.channel()
            self.connection_pool.put((connection, channel))

    def _create_connection(self):
        return pika.BlockingConnection(self.parameters)

    def get_channel(self):
        with self.lock:
            connection, channel = self.connection_pool.get()  # 从连接池获取连接和信道
        return connection, channel

    def release_channel(self, connection, channel):
        with self.lock:
            self.connection_pool.put((connection, channel))  # 将连接和信道放回连接池

    def send_message(self, exchange, routing_key, message, exchange_type=ExchangeType.DIRECT):
        connection, channel = self.get_channel()
        try:
            # channel.exchange_declare(exchange=exchange, exchange_type=exchange_type)  # 声明交换机并指定类型
            channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message)  # 发布消息
        finally:
            self.release_channel(connection, channel)

    def receive_messages(self, queue, callback):
        connection, channel = self.get_channel()
        try:
            channel.queue_declare(queue=queue, durable=True)  # 声明队列并标记为持久化
            # channel.queue_purge(queue=queue)  # 清空队列,以防之前的非持久化消息残留
            channel.basic_qos(prefetch_count=5)  # 每次从 RabbitMQ 获取 10 条消息
            channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=False)  # 消费消息并设置回调函数
            channel.start_consuming()  # 开始消费消息
        finally:
            self.release_channel(connection, channel)



3 编写消费者和生产者:

这里解释一下:在实际开发过程中我们发现需要使用heartbeat,在生产者和消费者中heartbeat是有细微差别的

在生产者的角色中heartbeat代表如果在heartbeat时间内没有发送(生产)消息到队列里,这个通道将会被关闭,和mq断开,所以这个处理的方法是heartbeat=0(禁止心跳),这样,不论什么时候发送发送信息到队列都可以,不会因为没有发送信息断开通道

在消费者的角色中heartbeat代表每隔heartbeat来消费一次,没有也没关系,但是要来,如果我们我们向上面一样禁止heartbeat,那样消费者永远不会来MQ队列消费数据了,所以这里的heartbeat需要设置一个值,来MQ拿数据,即使MQ没有数据也没有关系文章来源地址https://www.toymoban.com/news/detail-768905.html

    # 初始化 RabbitMQ 实例 RABBITMQ_PASSWORD
    rabbitmq = RabbitMQ(host=app.config['RABBITMQ_HOST'], port=app.config['RABBITMQ_PORT'],
                        username=app.config['RABBITMQ_USERNAME'], password=app.config['RABBITMQ_PASSWORD'])

    # 在应用上下文中注册 RabbitMQ 实例
    app.config['RABBITMQ'] = rabbitmq

    # 初始化 RabbitMQ 实例 RABBITMQ_PASSWORD
    rabbitmq_consume = RabbitMQ(host=app.config['RABBITMQ_HOST'], port=app.config['RABBITMQ_PORT'],
                                username=app.config['RABBITMQ_USERNAME'], password=app.config['RABBITMQ_PASSWORD'],
                                pool_size=1, heartbeat=15)

    # 在应用上下文中注册 RabbitMQ 实例
    app.config['RABBITMQ_CONSUME'] = rabbitmq_consume
    # consume_mq(app)
    #
    thread = threading.Thread(target=consume_mq, args=(app,))

    # 启动线程
    thread.start()
    
def consume_mq(app):
    def ack_message(channel, delivery_tag):
        infoLog.info(f'ack_message thread id: {threading.get_ident()}')
        if channel.is_open:
            infoLog.info("处理完成回调")
            channel.basic_ack(delivery_tag)
        else:
            errorLog.error("通道已经关闭")
            # Channel is already closed, so we can't ACK this message;
            # logs and/or do something that makes sense for your app in this case.

    def do_work(channel, delivery_tag, body):
        try:
            print(f"消息队列内容 {body.decode()}")
            # 处理rabbitMQ内容
            to_transcribe(body.decode())
        except Exception as e:
            traceback.print_exc()
            errorLog.error(str(e))
            errorLog.exception("An error occurred:")
        finally:
            cb = functools.partial(ack_message, channel, delivery_tag)
            channel.connection.add_callback_threadsafe(cb)

    # 启动消费者程序,开始接收和处理消息
    def callback(channel, method, properties, body):
        try:
            infoLog.info("处理消息消息队列内容")
            delivery_tag = method.delivery_tag
            t = threading.Thread(target=do_work, args=(channel, delivery_tag, body))
            t.start()
        except Exception as e:
            errorLog.error(str(e))
            errorLog.exception("An error occurred:")
        # channel.basic_ack(delivery_tag=method.delivery_tag)

    def on_message(channel, method_frame, header_frame, body):
        infoLog.info(f'on_message thread id: {threading.get_ident()}')

    # 启动消费者程序,开始接收和处理消息
    with app.app_context():
        rabbitmq = current_app.config['RABBITMQ_CONSUME']
        rabbitmq.receive_messages('qwen_queue', callback)



4 初始化消费者和生产者:

def create_app():
    app = Flask(__name__)
    connect_mq_v1(app)

5 其他地方使用生产者


class MessageHandler:
    """处理存放音频,将所有的任务都放在MQ里面"""

    def __init__(self, dir_name,uuid_str, back_url, file_url, request_type, file_name, *args, **kwargs):

        # 文件夹名称
        self.dir_name = dir_name
        # 文件名称
        self.file_name = file_name
        # 文件上传类型
        self.request_type = request_type
        # 文件存储位置
        self.file_url = file_url
        # 客户端回调地址
        self.back_url = back_url
        # 唯一标识
        self.uuid_str = uuid_str

    def send(self):
        """
        :param content_type:队列类型
        :param rpc:MQ对象
        :return:
        """
        try:
            # 发送消息队列
            # rpc.send_expire(body=json.dumps(self.to_json()), exchange='audio_queue', key='audio_queue')
            rabbitmq = current_app.config['RABBITMQ']
            rabbitmq.send_message('audio_queue', 'audio_queue', json.dumps(self.to_json()))
            print("发送消息到mq成功,用于存放音频信息")
        except Exception as e:
            print(f"发送消息到mq服务失败,请检查, {e}")

    def to_json(self):
        _dict = self.__dict__
        return _dict
# 将请求体和uuid放到rabbitMQ中
MessageHandler(**dates).send()

到了这里,关于flask整合rabbitMQ插件的方式的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring整合RabbitMQ-配制文件方式-3-消息拉模式

    拉消息的消费者 spring-rabbit.xml 当启动消费者后,便可获取到发送至队列的消息 检查队列的消息的情况: 经过检查确认,发现消息已经被消费了。 至此拉模式的消费者完成。

    2024年02月09日
    浏览(40)
  • Spring整合RabbitMQ-配制文件方式-1-消息生产者

    Spring-amqp是对AMQP的一些概念的一些抽象,Spring-rabbit是对RabbitMQ操作的封装实现。 主要有几个核心类 RabbitAdmin 、 RabbitTemplate 、 SimpleMessageListenerContainer 等 RabbitAdmin 类完成对Exchange、Queue、Binding的操作,在容器中管理 了 RabbitAdmin 类的时候,可以对Exchange、Queue、Binding进行自动声

    2024年02月09日
    浏览(42)
  • Spring整合RabbitMQ-配制文件方式-2-推模式消费者

    推模式的消费者 在推模式中使用可以两种实现: 使用ChannelAwareMessageListener. 除消息外,还提供了Channel这个对象,通过channel可以有更大的灵活性。 使用MessageListener 基本的消息的临时。普通的场景基本够用。 此处以ChannelAwareMessageListener为样例: spring-rabbit.xml 容器启动类 首先

    2024年02月09日
    浏览(43)
  • 使用python的pika链接rabbitMq断裂

    比如我们执行一个很长的任务的时候,执行结束ack确认发现确认失败,mq都断了。 只要是使用pyhon的pika都会出现这个问题,因为pika本身是没有主动发送心跳机制的(你用java的话是没问题的) 解决方式: 在链接中heartbeat=0 credentials = pika.PlainCredentials(\\\'xxx\\\',\\\'xxx\\\') connection = pika.Blo

    2024年02月03日
    浏览(40)
  • Python三方库:Pika(RabbitMQ基础使用)

    Python有多种插件都支持RabbitMQ,本文介绍的是RabbitMQ推荐的Pika插件。使用pip直接安装即可  pip install pika  。 MQ(Message Queue,消息队列),是一个在消息传输过程中保存消息的容器,多用在分布式系统之间进行通信。 MQ优势 应用解耦:提高系统容错性和可维护性。 异步提速:

    2024年04月16日
    浏览(54)
  • RabbitMQ实现延迟消息,RabbitMQ使用死信队列实现延迟消息,RabbitMQ延时队列插件

    假设有一个业务场景:超过30分钟未付款的订单自动关闭,这个功能应该怎么实现? RabbitMQ使用死信队列,可以实现消息的延迟接收。 队列有一个消息过期属性。就像丰巢超过24小时就收费一样,通过设置这个属性,超过了指定事件的消息将会被丢弃。 这个属性交:x-message

    2024年02月13日
    浏览(79)
  • RabbitMQ Stream插件使用详解

    2.4版为RabbitMQ流插件引入了对RabbitMQStream插件Java客户端的初始支持。 RabbitStreamTemplate StreamListener容器 将spring rabbit流依赖项添加到项目中: 您可以使用RabbitAdmin bean,使用QueueBuilder.stream()方法指定队列类型,正常地配置队列。例如: 然而,这仅在您还使用non-stream 组件(如

    2024年04月27日
    浏览(30)
  • rabbitmq整合springboot:ChannelAwareMessageListener和@RabbitListener的使用

    Springboot中使用Rabbimq监听队列中有两种方式,一种是@RabbitListener注解的方式,一种是实现springboot:ChannelAwareMessageListener接口的方式 前者使用如下: 消费者: 生产者: 后者使用方式: 配置文件:

    2024年02月12日
    浏览(40)
  • 查看RabbitMQ日志---trace插件的使用

    我的RabbitMQ是安装在docker里面的  所以我以下的方法都是根据这个路径去操作的   如果RabbitMQ安装在其他地方  请自行百度 1. 显示正在运行的RabbitMQ容器的名称或ID:   这将启动所有正在运行的 Docker 容器,并包含 RabbitMQ 容器的信息。 使用 docker exec 命令来运行 rabbitmq-plugin

    2024年02月16日
    浏览(44)
  • RabbitMQ使用延迟插件,代码量直接减少一半!

    今天介绍一下使用RabbitMQ的延迟插件方便实现延迟消息的方案。 RabbitMQ 是一个由 Erlang 语言开发的 AMQP(高级消息队列协议) 的开源实现。 RabbitMQ 是轻量级且易于部署的,能支持多种消息协议。 RabbitMQ 可以部署在分布式和联合配置中,以满足高规模、高可用性的需求。 具体特

    2024年02月09日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包