深入详解高性能消息队列中间件 RabbitMQ

这篇具有很好参考价值的文章主要介绍了深入详解高性能消息队列中间件 RabbitMQ。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

 目录

1、引言

2、什么是 RabbitMQ ?

3、RabbitMQ 优势

4、RabbitMQ 整体架构剖析

4.1、发送消息流程

4.2、消费消息流程

5、RabbitMQ 应用

5.1、广播

5.2、RPC


VC++常用功能开发汇总(专栏文章列表,欢迎订阅,持续更新...)https://blog.csdn.net/chenlycly/article/details/124272585C++软件异常排查从入门到精通系列教程(专栏文章列表,欢迎订阅,持续更新...)https://blog.csdn.net/chenlycly/article/details/125529931C++软件分析工具从入门到精通案例集锦(专栏文章正在更新中...)https://blog.csdn.net/chenlycly/article/details/131405795C/C++基础与进阶(专栏文章,持续更新中...)https://blog.csdn.net/chenlycly/category_11931267.html开源组件及数据库技术(专栏文章,持续更新中...)https://blog.csdn.net/chenlycly/category_12458859.html网络编程与网络问题分享(专栏文章,持续更新中...)https://blog.csdn.net/chenlycly/category_2276111.html

1、引言

       在进行系统设计的时候,各个模块、服务器之间为了实现数据的交互,通常是建立连接通过发送消息来进行。如果将他们一一建立连接,就会出现链路太多,每一条链路都必须感知对端等问题。此场景下消息将非常混乱,后期维护也将非常痛苦。为了解决这个问题,精简系统,引入RabbitMq。各相关模块不在相互发送消息,而将消息都发送给RabbitMQ,由RabbitMQ负责将消息传递出去。

       那么,什么是RabbitMQ?RabbitMQ又是如何实现这些功能的呢?   


       在这里,给大家重点推荐一下我的两个热门畅销专栏:

专栏1:(该专栏订阅量接近350个,有很强的实战参考价值,广受好评!)

C++软件异常排查从入门到精通系列教程(专栏文章列表,欢迎订阅,持续更新...)https://blog.csdn.net/chenlycly/article/details/125529931

本专栏根据近几年C++软件异常排查的项目实践,系统地总结了引发C++软件异常的常见原因以及排查C++软件异常的常用思路与方法,详细讲述了C++软件的调试方法与手段,以图文并茂的方式给出具体的实战问题分析实例,带领大家逐步掌握C++软件调试与异常排查的相关技术,适合基础进阶和想做技术提升的相关C++开发人员!

专栏中的文章都是通过项目实战总结出来的(通过项目实战积累了大量的异常排查素材和案例),有很强的实战参考价值!专栏文章还在持续更新中,预计文章篇数能更新到200篇以上!

专栏2: 

C/C++基础与进阶https://blog.csdn.net/chenlycly/category_11931267.html

以多年的开发实战为基础,总结并讲解一些的C/C++基础与进阶内容,以图文并茂的方式对相关知识点进行详细地展开与阐述!专栏涉及了C/C++领域的多个方面的内容,同时给出C/C++及网络方面的常见笔试面试题,并详细讲述Visual Studio常用调试手段与技巧!


2、什么是 RabbitMQ ?

深入详解高性能消息队列中间件 RabbitMQ,开源组件及数据库技术,高级消息队列协议,AMQP,RabbitMQ,消息队列,消费消息,广播,RPC

       在讲RabbitMQ之前,需要先了解一下AMQP的概念。

       AMQP,即Advanced Message Queuing Protocol(高级消息队列协议),是一个提供统一消息服务的应用层标准高级消息队列协议。AMQP是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件传递消息,不受客户端/中间件不同产品、不同开发语言等条件的限制。该协议是一种二进制协议,提供客户端应用于消息中间件之间异步、安全、高效的交互。相对于我们常见的REST API,AMQP更容易实现,可以降低开销,同时灵活性高,可以轻松的添加负载平衡和高可用性的功能,并保证消息传递,在性能上AMQP协议也相对更好一些。

       RabbitMQ是AMQP的一个开源实现,服务器端用Erlang语言编写,用于在分布式系统中存储转发消息,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、 ActionScript、XMPP、STOMP等,支持AJAX。 MQ(Messages Queue)是一种应用程序与应用程的通信方法。RabbitMQ相当于生产者与消费者的模式,消息发送端(生产者)将消息写入消息队列,消息接收端(消费者)从消息队列中取出消息、消费消息;而消息的发送端无需知道消息接受端的存在,反之亦然。

3、RabbitMQ 优势

        RabbitMQ主要有以下几个优势:

  • 可靠性(Reliablity):使用了一些机制来保证可靠性,比如持久化、传输确认、发布确认。
  • 灵活的路由(Flexible Routing):在消息进入队列之前,通过Exchange来路由消息。对于典型的路由功能,Rabbit已经提供了一些内置的Exchange来实现。针对更复杂的路由功能,可以将多个Exchange绑定在一起,也通过插件机制实现自己的Exchange。
  • 消息集群(Clustering):多个RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
  • 高可用(Highly Avaliable Queues):队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
  • 多种协议(Multi-protocol):支持多种消息队列协议,如STOMP、MQTT等。
  • 多种语言客户端(Many Clients):几乎支持所有常用语言,比如Java、.NET、Ruby等。
  • 管理界面(Management UI):提供了易用的用户界面,使得用户可以监控和管理消息Broker的许多方面。
  • 跟踪机制(Tracing):如果消息异常,RabbitMQ提供了消息的跟踪机制,使用者可以找出发生了什么。
  • 插件机制(Plugin System):提供了许多插件,来从多方面进行扩展,也可以编辑自己的插件。

4、RabbitMQ整体架构剖析

        在详细介绍RabbitMQ之前,先介绍几个重要的概念:

  • Queue:消息队列
  • Exchange:交换机,它会按照路由规则来投递消息
  • Routing key:路由关键字,exchange会根据它来进行消息投递
  • Bind:绑定了queue和exchange,根据路由规则将消息会投递到对应的消息队列中去。
  • Producer:消息生产者
  • Consumer:消息的消费者

       RabbitMQ的整体架构图如下所示:

深入详解高性能消息队列中间件 RabbitMQ,开源组件及数据库技术,高级消息队列协议,AMQP,RabbitMQ,消息队列,消费消息,广播,RPC

P(Producer,消息生产者)负责发送,C(Consumer,消息消费者)负责消费消息。其中交换机exchange、队列Queue的定义、exchange与Queue的绑定既可以放在发送端,也可以放在消费端,但是不管放在何处定义,要在使用前定义,否则会出错。本文统一将exchange放在生产者端来定义,而将queue的定义,queue与exchange的绑定放在消费端来处理。另外,为了防止第一次使用exchange是在消费端,可以在消费端也同时定义exchange。本文不考虑这种情况,默认在消费端使用exchange的时候已定义过。

4.1、发送消息流程

      P端发送消息的基本过程是:

1)连接服务器;
2)声明exchange,并设置其相关属性;
3)将消息发送到exchange。

其中,exchange有3种类型:fanout、routing、topic:

1)fanout不处理路由键,为空即可,只要简单的将队列绑定到交换机上,那么发送到交换机上的消息都会被转发到与该交换机绑定的所有队列上。
2)Routing处理路由键,需要将一个队列绑定到交换机上,要求消息与一个特定的路由键完全匹配。
3)Topic将路由键与某模式进行匹配,此时队列需要绑定到一个模式上。匹配的规格是”#”匹配一个或多个词,”*”匹配一个词。

4.2、消费消息流程

      C消费消息的基本过程是:

1)连接服务器;
2)声明队列queue及其属性(持久化、无消费者时是否自动删除队列等等);
3)设置routingkey,并且通过routingkey将queue与exchange绑定到一起;
4)等待消息,消费消息。

其中,queue可以设置的属性有:Exclusive、auto_delete、durable。

1)Exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。
2)Auto_delete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
3)Durable:服务器重启后,队列不会丢失。 

      对上述的exchange、queue、binding的一个例子:

Mq.queue_bind(“QueueTest”, “ExchangeTest”, “Test”)

这个绑定的意思是:任何发送到交换机ExchangeTest的具有路由键Test的消息都会被路由到名为QueueTest的队列中。

5、RabbitMQ 应用

       一般平台的消息大致分为两种类型:notif和req-ack-notif。对应于rabbitmq正好有两种模型:publish/subscribe和rpc。下面根据实际应用来讲解这两个模型。

5.1、广播

      假设应用服务器收到了一条消息A,需要广播给其他多个业务服务器。按照图一中rabbitmq的基本结构我们应该能想到两种方式:

深入详解高性能消息队列中间件 RabbitMQ,开源组件及数据库技术,高级消息队列协议,AMQP,RabbitMQ,消息队列,消费消息,广播,RPC

Method1

深入详解高性能消息队列中间件 RabbitMQ,开源组件及数据库技术,高级消息队列协议,AMQP,RabbitMQ,消息队列,消费消息,广播,RPC

Method2

上述两种方法哪一种能实现我们的目的?答案是Method1,如果采用Method2的话,queue会将消息依次分发给两个消费端,例如客户端C1收到消息1,3,5…,客户端C2收到消息2,4,6…。

       虽然此种方法不能实现我们的目的,但在此处插入一点,及每条消息的处理量可能而且几乎肯定是不同的,所以有时会出现客户端C1处理完了N条消息,但客户端C2一条还没处理完,为了解决这个问题,rabbitmq提供了公平调度的概念即Fair dispatch:Rabbitmq不会在同一时间给工作者分配多个任务,只有在工作者完成任务之后,才会再次接收到任务。

       回到刚才讨论的地方,我们已经确立了使用Method1来完成该功能,现在根据该方法进行一些简单的编码验证(注:验证语言为python)。publish/subscribe模型之P客户端代码如下:

import pika

#建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

#声明交换机
channel.exchange_declare(exchange='exchangeTest', type='fanout')

#发送消息
channel.basic_publish(exchange='exchangeTest', routing_key='', body='Hello World!')
connection.close()

publish/subscribe模型之C客户端代码:

import pika

#建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

#创建queue
channel.queue_declare(queue=’QueueTest’)

#绑定
channel.queue_bind(exchange=’exchangeTest’, queue=’QueueTest’)
def callback(ch, method, properties, body):    
print “ [x] Received %r” %(body, )    
channel.basic_consume(callback, queue =’QueueTest’, no_ack=True)
channel.start_consuming()

       AMQP支持在一个TCP连接上启用多个MQ通信channel,每个channel都可以被应用作为通信流, 被分配了一个整数标识,自动由Connection()类的.channel()方法维护。每个AMQP程序至少要有一个连接和一个channel。

5.2、RPC

       对于大部分消息我们不仅仅是通知,更多的是需要对方在接收到消息后给我们回复的。此时,
我们就需要rabbitmq提供的RPC模型,如下图所示:

深入详解高性能消息队列中间件 RabbitMQ,开源组件及数据库技术,高级消息队列协议,AMQP,RabbitMQ,消息队列,消费消息,广播,RPC

       RPC模型与广播模型相比,最大的区别是消费者客户端在接收到消息的时候,需要给发送者P回复消息。而同样的,消息生产者P也不仅仅是做为发送端了,他还需要接收来自消费端C回复的消息。

       由P到C我们知道直接将Queue1绑定到exchange上就OK了,那么C回复消息的时候通过什么回给P呢?为此,rabbitmq在P发送消息的时候,提供设置回调队列及关联ID,C在给P回复消息的时候,通过回调队列即可。提供关联ID的目的是即使P端收到Queue2的消息,也要验证Correlation_Id是否匹配,不匹配的话,直接忽略。

       使用如下的代码进行验证(注:验证语言为python),RPC模型之P端的代码如下:

import pika
class Center(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters( 
host='localhost'))
        self.channel = self.connection.channel()      
        #定义接收返回消息的队列,此处为一随机生成的队列
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue
        #等待接收消息
        self.channel.basic_consume(self.on_response, no_ack=True,queue=self.callback_queue)

    #定义接收到返回消息的处理方法
    def on_response(self, ch, method, props, body):
        self.response = body
    def request(self, n):
        self.response = None

#发送计算请求        
self.channel.basic_publish(exchange='',
 routing_key='compute_queue', properties=pika.BasicProperties
(reply_to = self.callback_queue,), body=str(n))
        #接收返回的数据
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)
center = Center()
response = center.request(30)
print " [.] Got %r" % (response,)

RPC模型之C端代码:文章来源地址https://www.toymoban.com/news/detail-753933.html

import pika

class Center(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters( 
host='localhost'))
        self.channel = self.connection.channel()      
        #定义接收返回消息的队列,此处为一随机生成的队列
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        #等待接收消息
        self.channel.basic_consume(self.on_response, no_ack=True,queue=self.callback_queue)
    #定义接收到返回消息的处理方法
    def on_response(self, ch, method, props, body):
        self.response = body
    def request(self, n):
        self.response = None

#发送计算请求        
self.channel.basic_publish(exchange='',
 routing_key='compute_queue', properties=pika.BasicProperties
(reply_to = self.callback_queue,),body=str(n))
        #接收返回的数据
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)
center = Center()
response = center.request(30)
print " [.] Got %r" % (response,)

到了这里,关于深入详解高性能消息队列中间件 RabbitMQ的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RocketMQ on openEuler 提供高性能消息队列的稳定性解决方案

    RocketMQ on openEuler,是一种将 RocketMQ 消息中间件通过容器化的方式部署在 openEuler 操作系统上运行,借助 openEuler 系统对于 OS 缓存回收效率增强的内核特性,提升消息中间件在面向超大规模高并发、高吞吐量、低延迟场景下稳定性和可靠性的软件解决方案。 移动云 RocketMQ 消息

    2024年02月11日
    浏览(54)
  • 【中间件-Openjob】高性能任务调度框架Openjob简介及快速搭建

    一款分布式高性能任务调度框架,支持多种定时任务、延时任务、工作流设计、轻量级分布式计算、无限水平扩容,并具有较高的可伸缩性和容错性,以及完善权限管理、强大的告警监控、原生支持多语言。 基础信息 中文官网 :https://openjob.io/zh-Hans/ 开源地址 :https://githu

    2024年02月12日
    浏览(63)
  • System.Threading.Channels 高性能异步队列

    System.Threading.Channels 是.NET Core 3.0 后推出的新的集合类型, 具有异步API,高性能,线程安全等特点,它提供一个异步数据集合,可用于生产者和消费者之前的数据异步传递。 它提供如下方法: BoundedChannelOptions Provides options that control the behavior of bounded ChannelT instances. 提供通道的行

    2024年01月24日
    浏览(49)
  • [RDMA] 高性能异步的消息传递和RPC :Accelio

    1. Introduce Accelio是一个高性能异步的可靠消息传递和RPC库,能优化硬件加速。 RDMA和TCP / IP传输被实现,并且其他的传输也能被实现,如共享存储器可以利用这个高效和方便的API的优点。Accelio 是 Mellanox 公司的RDMA中间件,用于高性能异步的可靠消息传递和RPC库。 Accelio提供了一

    2024年02月12日
    浏览(42)
  • SambaNova 芯片:深入解析其架构和高性能秘诀

    原创 AI苏妲己  SambaNova——一家总部位于帕洛阿尔托的公司已经筹集了超过10亿美元的风险投资,不会直接向公司出售芯片。相反,它出售其定制技术堆栈的访问权限,该堆栈具有专门为运行最大的人工智能模型而设计的专有硬件和软件。 最近,SambaNova宣布推出了其新型SN

    2024年04月10日
    浏览(51)
  • 基于 Redis 实现高性能、低延迟的延时消息的方案演进

    🎉欢迎来系统设计专栏:基于 Redis 实现高性能、低延迟的延时消息的方案演进 📜其他专栏:java面试 数据结构 源码解读 故障分析 🎬作者简介:大家好,我是小徐🥇 ☁️博客首页:CSDN主页 小徐的博客 🌄每日一句: 好学而不勤非真好学者 📜 欢迎大家关注! ❤️ 随着

    2024年01月22日
    浏览(70)
  • Kafka 最佳实践:构建可靠、高性能的分布式消息系统

    Apache Kafka 是一个强大的分布式消息系统,被广泛应用于实时数据流处理和事件驱动架构。为了充分发挥 Kafka 的优势,需要遵循一些最佳实践,确保系统在高负载下稳定运行,数据可靠传递。本文将深入探讨 Kafka 的一些最佳实践,并提供丰富的示例代码,帮助读者更好地应用

    2024年02月03日
    浏览(63)
  • “深入理解Redis:高性能缓存和数据存储技术解析“

    标题:深入理解Redis:高性能缓存和数据存储技术解析 摘要:本文将深入探讨Redis作为一种高性能缓存和数据存储技术的原理和用法。我们将从Redis的基本特性入手,介绍其在缓存和数据存储方面的优势,并通过实际示例代码展示如何使用Redis提升应用程序的性能和可靠性。

    2024年02月16日
    浏览(53)
  • “深入理解Redis:高性能缓存与数据存储的秘密“

    标题:深入理解Redis:高性能缓存与数据存储的秘密 在现代应用程序的开发中,缓存和数据存储是非常重要的组成部分。它们不仅可以提高应用程序的性能,还可以减轻数据库和网络的负载。其中,Redis作为一种高性能的内存数据存储系统,因其出色的性能和灵活的特性而备

    2024年02月16日
    浏览(46)
  • “深入解析Redis:高性能缓存与分布式数据存储“

    标题:深入解析Redis:高性能缓存与分布式数据存储 摘要:本文将深入解析Redis,介绍其作为高性能缓存和分布式数据存储的特点和功能,并提供示例代码展示其使用方法。 正文: 一、引言 Redis是一个开源的内存数据结构存储系统,它以其高性能、灵活的数据结构以及丰富的

    2024年02月17日
    浏览(59)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包