一.消息队列
消息队列:分布式系统必备的一个基础软件,能支持组件通信消息的快速读写
Redis本身支持数据的快速访问,满足消息队列的读写性能需求
二.Redis适合做消息队列吗?
消息队列的消息存取需求
消息队列存取消息的过程
- 在分布式系统中,两个组件要基于消息队列进行通信,一个组件就会把要处理的数据以消息的形式传递给消息队列,然后这个组件就可以继续执行其他操作;
- 远端的另一个组件从消息队列中把消息读取出来,在本地进行处理。
需求:
- 组件1需要对采集到的数据进行求和计算,并写入数据库;
- 消息到达速度很快,组件1没有办法及时既做采集又做计算,并写入数据库。
解决方案:
消息队列:
- 组件1把数据x和y保存为JSON格式的消息,再把它发送到消息队列,这样就可以继续接受新的数据。
- 组件2从消息队列中 把数据读取出来在服务器2上进行求和计算上,再写入数据库。
通用的消息队列的架构模型:
消息队列存取消息时候,必须要满足的三个需求:
- 消息顺序性
- 消息幂等性
- 保证消息的可靠性
消息的顺序性
消息顺序被消费者异步处理,但是消费者仍然按照生产者发送消息的顺序来处理消息,避免后被发送的消息先被处理了。
需求:对于消息顺序性的场景来看,一旦出现消息乱序处理时,会导致业务逻辑被错误执行,给业务方造成损失。
重复消息处理
消费者从 消息队列读取消息时,有时候会因为网络堵塞出现消息重传的情况。此时,消费者可能会收到多条重复消息。对于重复消息,消费者如果多次处理的话,可能造成一个业务逻辑被多次执行,如果业务逻辑正好要修改数据,就会出现数据被多次修改的问题。
消息可靠性
消费者在处理消息的时候,可能出现因为故障 或者宕机导致消息没有处理完就丢失的情况。当消费者重启时候,可以重新读取消息再次进行处理,否则就会 出现消息漏处理的问题。
Redis如何实现消息队列的需求
基于List消息队列解决方案
List本身就是按照先进先出的顺序对数据进行存取,所以如果使用List作为消息队列保存 消息的话,就可以满足消息的顺序性。
生产者使用LPUSH命令要把发送的消息依次写入list,消费者通过RPOP命令从LIST的另一端按照消息的写入顺序,依次读取消息并处理。
存在问题:
生产者往list写入数据时,List并不会主动通知消费者有新消息写入,如果消费者想要及时处理消息,就需要程序不断调用RPOP命令(比如使用一个while(1)循环),如果新消息写入,RPOP就会返回结果,否则,RPOP命令返回空值,再继续循环。
危害:
没有新消息写入LIST消费者也要不停的调用RPOP命令,这就会导致消费者程序cpu一直消耗在执行RPOP命令上,带来不必要的性能损失。
解决:
Redis提供了BRPOP命令。BRPOP命令,也称为阻塞式读取,客户端在没有读取到队列数据时,自动阻塞,知道有新的数据写入队列,再开始读取新数据,和消费者程序在自己不停调用RPOP命令相比,这种方式能节省CPU开销。
重复消息的处理:消息的幂等性
消费者程序本身可以对重复消息进行判断。
消息队列要能给每个消息提供全局唯一的ID号;另一方面,消费者程序要把已经处理过的消息ID记录下来。当收到一条消息后,消费者程序可以对比收到的消息ID和记录处理过的消息ID。来判断当前收到的消息有么有经过处理。
如果已经处理 过了就不再处理了。这种处理特性被称为消息 幂等性。
幂等性:对于同一消息,消费者收到生成一次的处理结果和收到多次的处理结果是一致的。
不过List本身不会为每个消息生成ID号的,所以,消息的全局唯一ID号就需要生产者程序发送消息前自行生成,生成之后,我们在用LPUSH命令把消息插入List中,需要在消息中包含这个全局唯一ID。
消息可靠性:
List 类型是如何保证消息可靠性--- 备份
背景: 消费者List中读取一条消息后,List就不会存留这条消息,所以如果消费者程序在处理消息的过程中出现了故障或者宕机,就会导致消息没有处理完成,那么消费者程序再次启动就会导致消息丢失。
解决方案:为了存留消息,list提供了BRPOPLUSH命令,这个命令的作用就是让消费者从一个List中读取消息,同时Redis会把这个消息再插入到另一个List(可以叫作备份 List)留存。
如果消费者程序读取了消息但是没能正常处理,等它重启以后就可以从备份List中重新读取消息并进行处理。
生产者消息发送很快,而消费者处理消息的速度缓慢,这就导致List中消息堆积的很多,给Redis内存带来压力。
启动多个消费者程序组成消费组,一起分担处理 List中消息的消息。但是List类型并不支持消费组的实现。
基于Stream消息队列解决方案
streams是Redis专门为消息队列设计 的数据类型:
- XADD插入消息,保证有序,可以自动生成全局唯一ID;
- XREAD用于读取消息,可以按ID读取数据;
- XREADGROUP按消费组的形式读取消息;
- XPENDING和XACK: XPENDING查询每个消费组内所有消费者已读取但是尚未确认消息,ASCK命令用于向消息队列确认消息处理已经完成。
XADD命令
可以往消息队列中插入新消息,消息的格式 是键-值对形式。对于插入的每一条消息,Streams可以自动为其生成一个全局唯一ID。
XADD mqstream * repo 5
"1599203861727-0"
可以往名称为mqstream的消息队列插入一条消息,消息的键为 repo, 值为5;
消息队列中的* ,表示让Redis为插入数据自动生成一个全局唯一的ID,例如"1599203861727-0"
也可以自行设定一个ID号,保证这个ID号是全局唯一的就行。不过使用*号会更加方便高效。
消息的全局唯一ID由两部分组成
- 第一部分"1599203861727"是指当前时间戳 毫秒级
- 第二部分表示插入消息在当前毫秒内的消息序列,这是从0开始编号的,
- “1599203861727-0”就表示在“1599203861727”毫秒内的第 1 条消息。
XREAD 命令
使用XREAD命令从消息队列读取
XREAD在读取消息时候,可以指定一个消息ID,并从这个消息ID的下一条消息开始进行读取。例如我们可以执行下面的命令,从ID号为 1599203861727-0 的消息开始,读取后续的所有消息:
XREAD BLOCK 100 STREAMS mqstream 1599203861727-0
1) 1) "mqstream"
2) 1) 1) "1599274912765-0"
2) 1) "repo"
2) "3"
2) 1) "1599274925823-0"
2) 1) "repo"
2) "2"
3) 1) "1599274927910-0"
2) 1) "repo"
2) "1"
消息者也可以在调用XREAD时设定block配置项,实现类似于BRPOP的阻塞读取操作。
当消息队列中没有消息时,一旦设置了block配置项,XREAD就会阻塞;
阻塞的时长可以在block配置项进行设置。
XREAD block 10000 streams mqstream $
(nil)
(10.00s)
,命令最后的$符号表示读取最新消息,同时设置block 10000配置项,1000的单位是毫秒,表示XREAD 在读取最新消息时,如果没有消息到来,XREAD 将阻塞 10000 毫秒(即 10 秒),然后再返回。上面命令中XREAD执行后,消息队列命令中mqstream 中一直没有消息XREAD 在 10 秒后返回空值(nil)。
消费组
Stream本身可以使用XGROUP创建消费组,创建消费组后,Stream可以使用XREADGROUP命令让消费组内的消费者读取消息
XGROUP create mqstream group1 0
ok
我们再执行一段命令,让GROUP1消费组中的消费者consumer1 从 mqstream 中读取所有消息
XREADGROUP group group1 cinsumer1 streams mqstream >
1) 1) "mqstream"
2) 1) 1) "1599203861727-0"
2) 1) "repo"
2) "5"
2) 1) "1599274912765-0"
2) 1) "repo"
2) "3"
3) 1) "1599274925823-0"
2) 1) "repo"
2) "2"
4) 1) "1599274927910-0"
2) 1) "repo"
2) "1"
让group1消费组里的消费者consumer1从mqstream中读取所有消息,
命令">"表示从第一天尚未被消费的消息开始读取。
因为在consumer1读取消息前,group1并没有其他消费者读取过消息,所以consumer1就得到了mqstream消息队列中的所有消息。
消息队列中的消息一旦被消费组里的一个消息读取了,就不能再被该消费组内的其他消费者读取。
我们继续执行下面命令
XREADGROUP group group1 consumer2 streams mqstream 0
1) 1) "mqstream"
2) (empty list or set)
比如说,我们执行完刚才的 XREADGROUP 命令后,再执行下面的命令,让 group1 内的 consumer2 读取消息时,consumer2 读到的就是空值,因为消息已经被 consumer1 读取完了
消费组的目的
让组内多个消费者共同分担读取消息,通常会让每个消费者读取部分消息,从而实现让组内的多个消费者共同分担读取消息,实现消息读取负载在多个消费者间是均衡分布的。例如,我们执行下列命令,让 group2 中的 consumer1、2、3 各自读取一条消息。
XREADGROUP group group2 consumer1 count 1 streams mqstream >
1) 1) "mqstream"
2) 1) 1) "1599203861727-0"
2) 1) "repo"
2) "5"
XREADGROUP group group2 consumer2 count 1 streams mqstream >
1) 1) "mqstream"
2) 1) 1) "1599274912765-0"
2) 1) "repo"
2) "3"
XREADGROUP group group2 consumer3 count 1 streams mqstream >
1) 1) "mqstream"
2) 1) 1) "1599274925823-0"
2) 1) "repo"
2) "2"
保证消费者在发生故障或者宕机再次重启时,让可以读取未处理完的消息,stream会自动使用内部队列(PENDING List)留存消费组里 每个消费者读取的消息;
直到消费者使用XACK命令通知Streams消息已经被处理完成。
如果消费者没有成功处理消息,他就不会给Stream发送XACK命令,消息仍然会留存。
此时消费者可以在重启后,用XPENDING 命令查看已读取、但尚未确认处理完成的消息。
XPEBDING mqstream group2
1) (integer) 3
2) "1599203861727-0"
3) "1599274925823-0"
4) 1) 1) "consumer1"
2) "1"
2) 1) "consumer2"
2) "1"
3) 1) "consumer3"
2) "1"
查看group2中各个消费者已读取,但是尚未确认的消息个数。其中,XPENDING返回结果的第二行第三行分别表示group2中所有消费者读取的消息最小ID和最大ID。
XACK mqstream group2 1599274912765-0
(integer) 1
XPENDING mqstream group2 - + 10 consumer2
(empty list or set)
consumer2 就可以使用 XACK 命令通知 Streams,然后这条消息就会被删除。当我们再使用 XPENDING 命令查看时,就可以看到,consumer2 已经没有已读取、但尚未确认处理的消息了。
基于List | 基于Streams | |
消息顺序性 | LPUSH/RPOP | XADD/XREAD |
阻塞读取 | BRPOP | XREAD block |
重复消息处理 | 生产者自行实现全局唯一ID | Streams自动生成全局唯一ID |
消息可靠性 | BRPOPLPUSH | 使用PENDING List自动存留消息,使用XPENDING查看,使XACK确认 |
适用场景 | Redis 5.0前版本 部署环境消息总量小 |
Redis 5.0以后版本文章来源:https://www.toymoban.com/news/detail-407318.html 部署环境消息总量大,需要以消费组的形式读取数据文章来源地址https://www.toymoban.com/news/detail-407318.html |
到了这里,关于Redis核心技术与实战-学习笔记(十五):消息队列(Redis的解决方案)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!