Redis核心技术与实战-学习笔记(十五):消息队列(Redis的解决方案)

这篇具有很好参考价值的文章主要介绍了Redis核心技术与实战-学习笔记(十五):消息队列(Redis的解决方案)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一.消息队列

消息队列:分布式系统必备的一个基础软件,能支持组件通信消息快速读写

Redis本身支持数据的快速访问,满足消息队列的读写性能需求

二.Redis适合做消息队列吗?

消息队列的消息存取需求

消息队列存取消息的过程

  • 在分布式系统中,两个组件要基于消息队列进行通信,一个组件就会把要处理的数据以消息的形式传递给消息队列,然后这个组件就可以继续执行其他操作;
  • 远端的另一个组件从消息队列中把消息读取出来,在本地进行处理。

需求:

  • 组件1需要对采集到的数据进行求和计算,并写入数据库
  • 消息到达速度很快,组件1没有办法及时既做采集又做计算,并写入数据库

解决方案:

消息队列:

  • 组件1把数据x和y保存为JSON格式的消息,再把它发送到消息队列,这样就可以继续接受新的数据。
  • 组件2从消息队列中 把数据读取出来在服务器2上进行求和计算上,再写入数据库。

Redis核心技术与实战-学习笔记(十五):消息队列(Redis的解决方案)

通用的消息队列的架构模型:

      Redis核心技术与实战-学习笔记(十五):消息队列(Redis的解决方案)

消息队列存取消息时候,必须要满足的三个需求:

  •     消息顺序性 
  •     消息幂等性
  •     保证消息的可靠性

消息的顺序性

     消息顺序被消费者异步处理,但是消费者仍然按照生产者发送消息的顺序来处理消息,避免后被发送的消息先被处理了。

     需求:对于消息顺序性的场景来看,一旦出现消息乱序处理时,会导致业务逻辑被错误执行,给业务方造成损失。

重复消息处理

     消费者从 消息队列读取消息时,有时候会因为网络堵塞出现消息重传的情况。此时,消费者可能会收到多条重复消息。对于重复消息,消费者如果多次处理的话,可能造成一个业务逻辑被多次执行,如果业务逻辑正好要修改数据,就会出现数据被多次修改的问题。

消息可靠性

      消费者在处理消息的时候,可能出现因为故障 或者宕机导致消息没有处理完就丢失的情况。当消费者重启时候,可以重新读取消息再次进行处理,否则就会 出现消息漏处理的问题。

Redis如何实现消息队列的需求

     基于List消息队列解决方案

     List本身就是按照先进先出的顺序对数据进行存取,所以如果使用List作为消息队列保存 消息的话,就可以满足消息的顺序性

    生产者使用LPUSH命令要把发送的消息依次写入list,消费者通过RPOP命令从LIST的另一端按照消息的写入顺序,依次读取消息并处理。   

   存在问题:

     生产者往list写入数据时,List并不会主动通知消费者有新消息写入,如果消费者想要及时处理消息,就需要程序不断调用RPOP命令(比如使用一个while(1)循环),如果新消息写入,RPOP就会返回结果,否则,RPOP命令返回空值,再继续循环

     危害:

        没有新消息写入LIST消费者也要不停的调用RPOP命令,这就会导致消费者程序cpu一直消耗在执行RPOP命令上,带来不必要的性能损失

    解决:

         Redis提供了BRPOP命令。BRPOP命令,也称为阻塞式读取客户端在没有读取到队列数据时,自动阻塞,知道有新的数据写入队列,再开始读取新数据,和消费者程序在自己不停调用RPOP命令相比,这种方式能节省CPU开销。

        

重复消息的处理:消息的幂等性

       消费者程序本身可以对重复消息进行判断

      消息队列要能给每个消息提供全局唯一的ID号;另一方面,消费者程序要把已经处理过的消息ID记录下来。当收到一条消息后,消费者程序可以对比收到的消息ID和记录处理过的消息ID。来判断当前收到的消息有么有经过处理。

     如果已经处理 过了就不再处理了。这种处理特性被称为消息 幂等性。

     幂等性:对于同一消息,消费者收到生成一次的处理结果和收到多次的处理结果是一致的。

不过List本身不会为每个消息生成ID号的,所以,消息的全局唯一ID号就需要生产者程序发送消息前自行生成,生成之后,我们在用LPUSH命令把消息插入List中,需要在消息中包含这个全局唯一ID。

消息可靠性:

      List 类型是如何保证消息可靠性--- 备份

     背景:  消费者List中读取一条消息后,List就不会存留这条消息,所以如果消费者程序在处理消息的过程中出现了故障或者宕机,就会导致消息没有处理完成,那么消费者程序再次启动就会导致消息丢失。

    解决方案:为了存留消息,list提供了BRPOPLUSH命令,这个命令的作用就是让消费者从一个List中读取消息,同时Redis会把这个消息再插入到另一个List(可以叫作备份 List)留存。

      如果消费者程序读取了消息但是没能正常处理,等它重启以后就可以从备份List中重新读取消息并进行处理。

Redis核心技术与实战-学习笔记(十五):消息队列(Redis的解决方案)

      生产者消息发送很快,而消费者处理消息的速度缓慢,这就导致List中消息堆积的很多,给Redis内存带来压力

     启动多个消费者程序组成消费组,一起分担处理 List中消息的消息。但是List类型并不支持消费组的实现。

基于Stream消息队列解决方案

streams是Redis专门为消息队列设计 的数据类型:

  • XADD插入消息,保证有序,可以自动生成全局唯一ID;
  • XREAD用于读取消息,可以按ID读取数据;
  • XREADGROUP按消费组的形式读取消息;
  • XPENDING和XACK: XPENDING查询每个消费组内所有消费者已读取但是尚未确认消息,ASCK命令用于向消息队列确认消息处理已经完成。

XADD命令

可以往消息队列中插入新消息,消息的格式 是键-值对形式。对于插入的每一条消息,Streams可以自动为其生成一个全局唯一ID。

XADD mqstream * repo 5
"1599203861727-0"

可以往名称为mqstream的消息队列插入一条消息,消息的键为 repo, 值为5;

消息队列中的* ,表示让Redis为插入数据自动生成一个全局唯一的ID,例如"1599203861727-0"

也可以自行设定一个ID号,保证这个ID号是全局唯一的就行。不过使用*号会更加方便高效。

消息的全局唯一ID由两部分组成

  •    第一部分"1599203861727"是指当前时间戳 毫秒级
  •    第二部分表示插入消息在当前毫秒内的消息序列,这是从0开始编号的,
  •   “1599203861727-0”就表示在“1599203861727”毫秒内的第 1 条消息。

XREAD 命令

       使用XREAD命令从消息队列读取

        XREAD在读取消息时候,可以指定一个消息ID,并从这个消息ID的下一条消息开始进行读取。例如我们可以执行下面的命令,从ID号为 1599203861727-0 的消息开始,读取后续的所有消息:

XREAD BLOCK 100 STREAMS  mqstream 1599203861727-0
1) 1) "mqstream"
   2) 1) 1) "1599274912765-0"
         2) 1) "repo"
            2) "3"
      2) 1) "1599274925823-0"
         2) 1) "repo"
            2) "2"
      3) 1) "1599274927910-0"
         2) 1) "repo"
            2) "1"

消息者也可以在调用XREAD时设定block配置项,实现类似于BRPOP的阻塞读取操作。

当消息队列中没有消息时,一旦设置了block配置项,XREAD就会阻塞;

阻塞的时长可以在block配置项进行设置。

XREAD block 10000 streams mqstream $
(nil)
(10.00s)

       ,命令最后的$符号表示读取最新消息,同时设置block 10000配置项,1000的单位是毫秒,表示XREAD 在读取最新消息时,如果没有消息到来,XREAD 将阻塞 10000 毫秒(即 10 秒),然后再返回。上面命令中XREAD执行后,消息队列命令中mqstream 中一直没有消息XREAD 在 10 秒后返回空值(nil)。

     

消费组

      Stream本身可以使用XGROUP创建消费组,创建消费组后,Stream可以使用XREADGROUP命令让消费组内的消费者读取消息

      

XGROUP create mqstream group1 0
ok

   我们再执行一段命令,让GROUP1消费组中的消费者consumer1 从 mqstream 中读取所有消息

XREADGROUP group group1 cinsumer1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1599203861727-0"
         2) 1) "repo"
            2) "5"
      2) 1) "1599274912765-0"
         2) 1) "repo"
            2) "3"
      3) 1) "1599274925823-0"
         2) 1) "repo"
            2) "2"
      4) 1) "1599274927910-0"
         2) 1) "repo"
            2) "1"

让group1消费组里的消费者consumer1从mqstream中读取所有消息,

命令">"表示从第一天尚未被消费的消息开始读取。

因为在consumer1读取消息前,group1并没有其他消费者读取过消息,所以consumer1就得到了mqstream消息队列中的所有消息。

消息队列中的消息一旦被消费组里的一个消息读取了,就不能再被该消费组内的其他消费者读取。

我们继续执行下面命令


XREADGROUP group group1 consumer2  streams mqstream 0
1) 1) "mqstream"
   2) (empty list or set)

比如说,我们执行完刚才的 XREADGROUP 命令后,再执行下面的命令,让 group1 内的 consumer2 读取消息时,consumer2 读到的就是空值,因为消息已经被 consumer1 读取完了 

消费组的目的 

    让组内多个消费者共同分担读取消息,通常会让每个消费者读取部分消息,从而实现让组内的多个消费者共同分担读取消息,实现消息读取负载在多个消费者间是均衡分布的。例如,我们执行下列命令,让 group2 中的 consumer1、2、3 各自读取一条消息。

XREADGROUP group group2 consumer1 count 1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1599203861727-0"
         2) 1) "repo"
            2) "5"

XREADGROUP group group2 consumer2 count 1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1599274912765-0"
         2) 1) "repo"
            2) "3"

XREADGROUP group group2 consumer3 count 1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1599274925823-0"
         2) 1) "repo"
            2) "2"

保证消费者在发生故障或者宕机再次重启时,让可以读取未处理完的消息,stream会自动使用内部队列(PENDING List)留存消费组里 每个消费者读取的消息;

直到消费者使用XACK命令通知Streams消息已经被处理完成。

如果消费者没有成功处理消息,他就不会给Stream发送XACK命令,消息仍然会留存。

此时消费者可以在重启后,用XPENDING 命令查看已读取、但尚未确认处理完成的消息。

XPEBDING mqstream group2
1) (integer) 3
2) "1599203861727-0"
3) "1599274925823-0"
4) 1) 1) "consumer1"
      2) "1"
   2) 1) "consumer2"
      2) "1"
   3) 1) "consumer3"
      2) "1"

查看group2中各个消费者已读取,但是尚未确认的消息个数。其中,XPENDING返回结果的第二行第三行分别表示group2中所有消费者读取的消息最小ID和最大ID。

XACK mqstream group2 1599274912765-0
(integer) 1
XPENDING mqstream group2 - + 10 consumer2
(empty list or set)

consumer2 就可以使用 XACK 命令通知 Streams,然后这条消息就会被删除。当我们再使用 XPENDING 命令查看时,就可以看到,consumer2 已经没有已读取、但尚未确认处理的消息了。

基于List 基于Streams
消息顺序性 LPUSH/RPOP XADD/XREAD
阻塞读取 BRPOP XREAD block
重复消息处理 生产者自行实现全局唯一ID Streams自动生成全局唯一ID
消息可靠性 BRPOPLPUSH 使用PENDING List自动存留消息,使用XPENDING查看,使XACK确认
适用场景

Redis 5.0前版本

部署环境消息总量小

Redis 5.0以后版本

部署环境消息总量大,需要以消费组的形式读取数据文章来源地址https://www.toymoban.com/news/detail-407318.html

到了这里,关于Redis核心技术与实战-学习笔记(十五):消息队列(Redis的解决方案)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 《Redis 核心技术与实战》课程学习笔记(四)

    一旦服务器宕机,内存中的数据将全部丢失。目前,Redis 的持久化主要有两大机制,即 AOF 日志和 RDB 快照。 AOF 日志是如何实现的? 我们比较熟悉的是数据库的写前日志(Write Ahead Log, WAL),也就是说,在实际写数据前,先把修改的数据记到日志文件中,以便故障时进行恢复

    2024年02月12日
    浏览(33)
  • 项目实战 — 消息队列(2){创建核心类}

    目录  一、创建项目 二、创建核心类 🍅 1、 编写交换机类,Exchange 🍅 2、编写存储消息的队列,MSGQueue 🍅 3、编写绑定类,binding 🍅 4、编写消息,Message   代码解释主要在注释上面,注意一下注释 核心类主要是在服务器模块中的。  主要是以下几个类 * 交换机 exchange *

    2024年02月15日
    浏览(26)
  • 架构核心技术之分布式消息队列

    Java全能学习+面试指南:https://javaxiaobear.cn 今天我们来学习分布式消息队列,分布式消息队列的知识结构如下图。 主要介绍以下内容: 同步架构和异步架构的区别。异步架构的主要组成部分:消息生产者、消息消费者、分布式消息队列。异步架构的两种主要模型:点对点模型

    2024年02月07日
    浏览(31)
  • 《kafka 核心技术与实战》课程学习笔记(七)

    压缩(compression)秉承了用时间去换空间的经典 trade-off 思想,具体来说就是用 CPU 时间去换磁盘空间或网络 I/O 传输量,希望以较小的 CPU 开销带来更少的磁盘占用或更少的网络 I/O 传输。 目前 Kafka 共有两大类消息格式,社区分别称之为 V1 版本和 V2 版本。 不论是哪个版本,

    2024年02月11日
    浏览(38)
  • 《kafka 核心技术与实战》课程学习笔记(八)

    Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证。 第一个核心要素是“已提交的消息”。 当 Kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。 可以选择只要有一个 Broker 成功保存该消息就算是已

    2024年02月16日
    浏览(31)
  • 《kafka 核心技术与实战》课程学习笔记(九)

    拦截器基本思想就是允许应用程序在不修改逻辑的情况下,动态地实现一组可插拔的事件处理逻辑链。 它能够在主业务操作的前后多个时间点上插入对应的“拦截”逻辑。 Spring MVC 拦截器的工作原理: Kafka 拦截器借鉴了这样的设计思路:可以在消息处理的前后多个时点动态

    2024年02月12日
    浏览(39)
  • 《kafka 核心技术与实战》课程学习笔记(五)

    严格来说这些配置并不单单指 Kafka 服务器端的配置,其中既有 Broker 端参数,也有主题级别的参数、JVM 端参数和操作系统级别的参数。 Broker 端参数也被称为静态参数(Static Configs): 所谓 静态参数 ,是指你必须在 Kafka 的配置文件 server.properties 中进行设置的参数 ,不管你

    2024年02月11日
    浏览(49)
  • 《kafka 核心技术与实战》课程学习笔记(十)

    Apache Kafka 的所有通信都是基于 TCP 的,而不是基于 HTTP 或其他协议。 在开发客户端时,能够利用 TCP 本身提供的一些高级功能,比如多路复用请求以及同时轮询多个连接的能力。 除了 TCP 提供的这些高级功能有可能被 Kafka 客户端的开发人员使用之外,目前已知的 HTTP 库在很多

    2024年02月11日
    浏览(51)
  • 【Redis实战】有MQ为啥不用?用Redis作消息队列!?Redis作消息队列使用方法及底层原理高级进阶

     🎉🎉欢迎光临🎉🎉 🏅我是苏泽,一位对技术充满热情的探索者和分享者。🚀🚀 🌟特别推荐给大家我的最新专栏 《Redis实战与进阶》 本专栏纯属为爱发电永久免费!!! 这是苏泽的个人主页可以看到我其他的内容哦👇👇 努力的苏泽 http://suzee.blog.csdn.net/ 我们用的是云

    2024年02月20日
    浏览(36)
  • PHP使用Redis实战实录5:Redis实现消息队列

    PHP使用Redis实战实录系列 PHP使用Redis实战实录1:宝塔环境搭建、6379端口配置、Redis服务启动失败解决方案 PHP使用Redis实战实录2:Redis扩展方法和PHP连接Redis的多种方案 PHP使用Redis实战实录3:数据类型比较、大小限制和性能扩展 PHP使用Redis实战实录4:单例模式和面向过程操作

    2024年02月11日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包