redis【stream】:对redis流数据类型的详细介绍

这篇具有很好参考价值的文章主要介绍了redis【stream】:对redis流数据类型的详细介绍。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

stream产生原因

stream的概念

stream底层实现

stream的常用指令

常用命令一览:

xadd命令

xread命令

xlen命令

xrange命令

xrevrange命令

xtrim命令

xdel命令

xgroup命令

xinfo命令

xpending命令

xreadgroup命令

xack命令

xclaim命令


stream产生原因

redis在设计之初,就试图在保证自身缓存作用在市场上占优的基础上开发与MQ类似的消息队列,以增强自己在市场中的竞争优势,在redis1.0时,我们使用list就能模拟实现一个简单的消息队列,按照插入顺序排序,你可以添加一个元素到列表的头部(左边)或者尾部(右边)。所以常用来做异步队列使用,将需要延后处理的任务结构体序列化成字符串塞进 Redis 的列表,另一个线程从这个列表中轮询数据进行处理。具体如下图:
redis【stream】:对redis流数据类型的详细介绍

但是使用list模拟消息队列存在很大的痛点:类似于lpush和lpop这样的指令只能实现点对点的模式,一旦涉及(pub/sub)这样的发布订阅的场景(一对多),list无法满足;除此之外,list存在的一个很大的痛点问题是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。而且也没有 Ack 机制来保证数据的可靠性,假设一个消费者都没有,那消息就直接被丢弃了。所以在redis5.0之后,stream这一更强大的数据结构应运而生

redis【stream】:对redis流数据类型的详细介绍

stream的概念

stream是redis5.0之后引入的数据类型,用一句话说,stream是redis版本的MQ中间件+阻塞队列,Stream流实现消息队列,它支持消息的持久化、支持自动生成全局唯一 ID、支持ack确认消息的模式、支持消费组模式等,让消息队列更加的稳定和可靠

stream底层实现

我们通过一张图和一个表格对redis的底层实现进行说明 :

redis【stream】:对redis流数据类型的详细介绍

一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容

1
Message Content
消息内容
2
Consumer group
消费组,通过XGROUP CREATE 命令创建,同一个消费组可以有多个消费者
3
Last_delivered_id
游标,每个消费组会有个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。
4
Consumer
消费者,消费组中的消费者
5
Pending_ids
消费者会有一个状态变量,用于记录被当前消费已读取但未ack的消息Id,如果客户端没有ack,这个变量里面的消息ID会越来越多,一旦某个消息被ack它就开始减少。这个pending_ids变量在Redis官方被称之为 PEL(Pending Entries List),记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符),它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理

stream的常用指令

常用命令一览:

队列相关指令
redis【stream】:对redis流数据类型的详细介绍

 消费者相关指令redis【stream】:对redis流数据类型的详细介绍

 四个特殊符号:
redis【stream】:对redis流数据类型的详细介绍

xadd命令

语法格式为:

XADD key ID field value [field value ...]

  • key,用来指定 stream 的名字
  • ID,用来指定 ID 值,最常用的是 *
  • field value [field value ...],key-value类型数据

xadd 用来在指定的 key 中添加消息,如果 key 不存在,则自动创建。添加的消息为 key-value 类型,可以一次添加多个消息。

指定 ID,最常用的是 *,表示由 redis 自动生成 ID,自动生成的 ID 为 1526919030474-55 格式,由毫秒时间戳和序列号组成,序列号用于区分同一毫秒内生成的消息,保证 ID 始终是递增的。如果由于一些其他原因系统时钟慢了,导致生成的时间戳小于了 redis 中记录的值,则会取系统中记录的最大值继续递增,保证 ID 的递增状态。

1
2
3
4

127.0.0.1:6379> xadd message 1 key1 value1 key2 value2
"1-0"
127.0.0.1:6379> xadd message 1-2 key1 value1 key2 value2
"1-2"

ID 在一般情况下是由 redis 自动指定的,但其实 ID 也是可以自定义的,为了保证 ID 的自增状态,手动指定的 ID 必须要大于系统中存在的 ID,只不过一般不这么做。

1
2

127.0.0.1:6379> xadd message * key1 value1 key2 value2
"1604475735664-0"

以上命令添加了 key1-value 和 key2-value2 两条消息到 message 这个 key 中,返回值为当前消息的 ID,由 redis 自动生成,此时消息队列中就有一条消息可以被读取了。

1
2

127.0.0.1:6379> xadd message maxlen 10 * key3 value3
"1604476672762-0"

maxlen 参数用来限制 key 中消息的最大数量,但是精确限制 key 中消息的数量是低效的,可以使用 ~ 符号粗略的限制 key 中消息的数量,redis 会在可以删除整个宏结点时才去删除多余的消息,实际数量可能会比限制数量多几十个,这是正常的,但是不会少于限制的数量。

xread命令

语法格式为:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]

  • [COUNT count],用来获取消息的数量
  • [BLOCK milliseconds],用来设置阻塞模式和阻塞超时时间,默认为非阻塞
  • id [id ...],用来设置读取的起始 ID,相当于 where id >来获取最新的消息 ID,非阻塞模式下无意义。
  • key,指定 stream 的名字

xread 命令用于从一个或多个 key 中读取消息,仅返回 ID 值大于参数中传入的 ID 的消息。此命令有阻塞用法和非阻塞用法,如果 key 中没有消息,则返回空。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

127.0.0.1:6379> xread streams message 0
1) 1) "message"
   2) 1) 1) "1-0"
         2) 1) "key1"
            2) "value1"
            3) "key2"
            4) "value2"
      2) 1) "1-2"
         2) 1) "key1"
            2) "value1"
            3) "key2"
            4) "value2"
      3) 1) "1604476672762-0"
         2) 1) "key3"
            2) "value3"

以上命令在非阻塞模式输出了所有的消息,因为不存在 ID 比 0 还小的消息,所以输出了所有的消息。

阻塞模式中,可以使用 $ 符号来获取最新的消息。如果在指定超时时间内没有新的消息,则返回空。

1
2
3
4
5
6
7
8
9

127.0.0.1:6379> xread count 10 block 10000 streams message $
(nil)
(10.02s)
127.0.0.1:6379> xread count 10 block 10000 streams message $
1) 1) "message"
   2) 1) 1) "1604478070071-0"
         2) 1) "keyblock2"
            2) "value"
(5.00s)

输入命令后,可以观察到命令没有任何输出,此时新开一个 redis-cli,输入 xadd message * keyblock2 value 将一条新的消息添加到 key 中,可以看到上面的命令返回了刚才添加的值和阻塞时间。

xlen命令

语法格式:

XLEN key

返回 key 中消息的数量,如果 key 不存在,则会返回 0。即使 key 中消息的数量为 0,key 也不会被自动删除,因为可能还存在和 key 关联的消费者组。

1
2

127.0.0.1:6379> xlen message
(integer) 5

返回 message 中所有消息的数量。

xrange命令

语法如下:

XRANGE key start end [COUNT count]

  • key,指定 stream 的名字
  • start,起始 ID
  • end,终止 ID
  • [COUNT count],读取的数量

该命令返回与给定的 ID 范围相匹配的消息。ID 的范围由 start 和 end 参数来指定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

127.0.0.1:6379> xrange message - +
1) 1) "1-0"
   2) 1) "key1"
      2) "value1"
      3) "key2"
      4) "value2"
2) 1) "1-2"
   2) 1) "key1"
      2) "value1"
      3) "key2"
      4) "value2"
3) 1) "1604476672762-0"
   2) 1) "key3"
      2) "value3"
4) 1) "1604478059261-0"
   2) 1) "keyblock"
      2) "value"
5) 1) "1604478070071-0"
   2) 1) "keyblock2"
      2) "value"

此命令由两个特殊的 ID,使用 - 表示最小的 ID 值,使用 + 表示最大的 ID 值,可以查询所有的消息。

1
2
3
4
5
6
7
8
9
10
11

127.0.0.1:6379> xrange message 1 1
1) 1) "1-0"
   2) 1) "key1"
      2) "value1"
      3) "key2"
      4) "value2"
2) 1) "1-2"
   2) 1) "key1"
      2) "value1"
      3) "key2"
      4) "value2"

即使 ID 不完整也可以,只输入 ID 值会输出所有拥有相同 ID 不同序列号的消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14

127.0.0.1:6379> xrange message - + count 3
1) 1) "1-0"
   2) 1) "key1"
      2) "value1"
      3) "key2"
      4) "value2"
2) 1) "1-2"
   2) 1) "key1"
      2) "value1"
      3) "key2"
      4) "value2"
3) 1) "1604476672762-0"
   2) 1) "key3"
      2) "value3"

使用 count 参数可以限制输出消息的数量。

通过简单的循环可以使用少量内存迭代一个 key 中所有的值,只需要将上次迭代的结果中最大的 ID 作为下一次迭代的起始 ID 即可。

xrevrange命令

语法说明:

XREVRANGE key end start [COUNT count]

xrevrange 命令和 xrange 命令语法完全相同,只有一点不同,xrevrange 是反向遍历的,不再赘述。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

127.0.0.1:6379> xrevrange message + -
1) 1) "1604478070071-0"
   2) 1) "keyblock2"
      2) "value"
2) 1) "1604478059261-0"
   2) 1) "keyblock"
      2) "value"
3) 1) "1604476672762-0"
   2) 1) "key3"
      2) "value3"
4) 1) "1-2"
   2) 1) "key1"
      2) "value1"
      3) "key2"
      4) "value2"
5) 1) "1-0"
   2) 1) "key1"
      2) "value1"
      3) "key2"
      4) "value2"

xtrim命令

语法说明:

XTRIM key MAXLEN [~] count

  • key,指定 stream 的名字
  • maxlen,指定修剪策略,当前只实现了这一种
  • [~],是否近似修剪
  • count,修剪后的数量

xtrim 命令会从 ID 值比较小的消息开始丢弃。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

127.0.0.1:6379> xread streams message 0
1) 1) "message"
   2) 1) 1) "1-0"
         2) 1) "key1"
            2) "value1"
            3) "key2"
            4) "value2"
      2) 1) "1-2"
         2) 1) "key1"
            2) "value1"
            3) "key2"
            4) "value2"
      3) 1) "1604476672762-0"
         2) 1) "key3"
            2) "value3"
      4) 1) "1604478059261-0"
         2) 1) "keyblock"
            2) "value"
      5) 1) "1604478070071-0"
         2) 1) "keyblock2"
            2) "value"
127.0.0.1:6379> xtrim message maxlen 4
(integer) 1

xtrim 命令的返回值是修剪掉的 ID 的数量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

127.0.0.1:6379> xrange message - +
1) 1) "1-2"
   2) 1) "key1"
      2) "value1"
      3) "key2"
      4) "value2"
2) 1) "1604476672762-0"
   2) 1) "key3"
      2) "value3"
3) 1) "1604478059261-0"
   2) 1) "keyblock"
      2) "value"
4) 1) "1604478070071-0"
   2) 1) "keyblock2"
      2) "value"

再次查看可以看到 key 中的消息数量被修剪掉了一个,只剩下了四个。

1
2

127.0.0.1:6379> xtrim message maxlen ~ 2
(integer) 0

如果使用了 ~ 参数,则可能不会进行修剪。此参数告诉 redis 在能够删除整个宏节点时才执行修剪,这样做效率更高,并且可以保证消息的数量不小于所需要的数量。

xdel命令

语法说明:

XDEL key ID [ID ...]

  • key,指定 stream 的名字
  • ID [ID ...],需要删除的 ID 值

xdel 命令用于从 key 中删除指定 ID 的消息,当 ID 不存在时,返回的数目可能和删除的数目不一致。在执行 xdel 命令时,redis 并不会在内存中删除对应的消息,而只会把它标记为删除,在所有节点都被删除之后整个节点被销毁,内存被回收。

1
2
3
4
5
6
7
8
9
10
11
12

127.0.0.1:6379> xdel message 1-2
(integer) 1
127.0.0.1:6379> xrange message - +
1) 1) "1604476672762-0"
   2) 1) "key3"
      2) "value3"
2) 1) "1604478059261-0"
   2) 1) "keyblock"
      2) "value"
3) 1) "1604478070071-0"
   2) 1) "keyblock2"
      2) "value"

删除了 ID 为 1-2 的消息。

xgroup命令

语法说明:

XGROUP [CREATE key groupname id-or-] [DESTROY key groupname] [CREATECONSUMER key groupname consumername] [DELCONSUMER key groupname consumername]

  • [CREATE key groupname id-or-,分组将可以读取指定 key 的新消息,将不能读取历史消息。也可以指定任意的开始 ID。
  • [SETID key groupname id-or-$],重新给已存在的分组设置消息读取的起点。例如将起点设置为 0就可以重新读取所有的历史消息
  • [DESTROY key groupname],销毁指定 key 中的一个分组
  • [CREATECONSUMER key groupname consumername],在指定的 key 和指定的分组中创建一个消费者。当某个命令提及了新的消费者名称时,也会自动创建新的消费者。
  • [DELCONSUMER key groupname consumername],在指定的 key 和指定的分组中销毁一个消费者。

xgroup 是一个命令组,可以通过不同的关键字执行不同的命令。

1
2
3
4
5
6
7
8
9
10
11

127.0.0.1:6379> xgroup create message read_group $
OK
127.0.0.1:6379> xreadgroup group read_group read streams message >
(nil)
127.0.0.1:6379> xadd message * readkey readvalue
"1604494179721-0"
127.0.0.1:6379> xreadgroup group read_group read streams message >
1) 1) "message"
   2) 1) 1) "1604494179721-0"
         2) 1) "readkey"
            2) "readvalue"

使用 creat 命令创建一个 read_group 分组,指定 ID 的起点为最后一个 ID,直接读取的话是空值(xreadgroup命令下面说)。

使用 xadd 命令添加一条新的消息,再次读取发现可以正常读取到新加入的消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

127.0.0.1:6379> xgroup setid message read_group 0
OK
127.0.0.1:6379> xreadgroup group read_group read streams message >
1) 1) "message"
   2) 1) 1) "1604476672762-0"
         2) 1) "key3"
            2) "value3"
      2) 1) "1604478059261-0"
         2) 1) "keyblock"
            2) "value"
      3) 1) "1604478070071-0"
         2) 1) "keyblock2"
            2) "value"
      4) 1) "1604494179721-0"
         2) 1) "readkey"
            2) "readvalue"

使用 setid 命令重新设置读取 ID 的起点,可以读取到所有的历史消息。

1
2
3
4
5
6
7
8
9

127.0.0.1:6379> xinfo groups message
1) 1) "name"
   2) "read_group"
   3) "consumers"
   4) (integer) 1
   5) "pending"
   6) (integer) 4
   7) "last-delivered-id"
   8) "1604494179721-0"

使用 xinfo 命令查询新创建的分组信息,可以看到分组名字,消费者数量,最后加入的消息的 ID 值(xinfo 命令下边说)。

1
2
3
4
5
6
7

127.0.0.1:6379> xinfo consumers message read_group
1) 1) "name"
   2) "read"
   3) "pending"
   4) (integer) 4
   5) "idle"
   6) (integer) 476449

使用 xinfo 命令查看新加入的消费者的信息,可以看到消费者名字,处于 pending(待处理) 状态的消息数量(pending 状态下边说)。

1
2
3
4

127.0.0.1:6379> xgroup delconsumer message read_group read
(integer) 4
127.0.0.1:6379> xinfo consumers message read_group
(empty array)

使用 delconsumer 命令删除分组中的消费者,使用 xinfo 命令查看分组中的消费者,返回一个空数组,说明删除成功。delconsumer 命令的返回值为当前消费者所拥有的 pending(待处理) 状态的消息数量。

1
2
3
4

127.0.0.1:6379> xgroup destroy message read_group
(integer) 1
127.0.0.1:6379> xinfo groups message
(empty array)

使用 destroy 命令删除 message 中的分组,使用 xinfo 命令查看,返回一个空数组,说明删除成功。destroy 命令的返回值为删除成功的分组数量。注意:即使由活跃的消费者和 pending(待处理) 状态的消息,分组仍然会被删除,需要确保在需要时才执行此命令。

xinfo命令

语法说明:

XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]

  • [CONSUMERS key groupname],查询指定 key 和指定分组中的消费者信息。
  • [GROUPS key],查询指定 key 中的分组信息。
  • [STREAM key],查询指定 key 中所有的信息。

1
2
3
4
5
6
7

127.0.0.1:6379> xinfo consumers message read_group
1) 1) "name"
   2) "read"
   3) "pending"
   4) (integer) 4
   5) "idle"
   6) (integer) 206796

读取 message 的 read_group 分组中所有的消费者信息。

1
2
3
4
5
6
7
8
9

127.0.0.1:6379> xinfo groups message
1) 1) "name"
   2) "read_group"
   3) "consumers"
   4) (integer) 1
   5) "pending"
   6) (integer) 4
   7) "last-delivered-id"
   8) "1604494179721-0"

读取 message 中所有的分组信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

127.0.0.1:6379> xinfo stream message
 1) "length"
 2) (integer) 4
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "last-generated-id"
 8) "1604494179721-0"
 9) "groups"
10) (integer) 1
11) "first-entry"
12) 1) "1604476672762-0"
    2) 1) "key3"
       2) "value3"
13) "last-entry"
14) 1) "1604494179721-0"
    2) 1) "readkey"
       2) "readvalue"

读取 message 所有的信息。

xpending命令

语法说明:

XPENDING key group [start end count] [consumer]

  • key,指定的 key
  • group,指定的分组
  • [start end count],起始 ID 和结束 ID 还有数量
  • consumer,消费者名字

1
2
3
4
5
6

127.0.0.1:6379> xpending message readgroup
1) (integer) 2
2) "1604496633846-0"
3) "1604496640734-0"
4) 1) 1) "read"
      2) "2"

xpending 命令可以查看对应分组中未确认的消息的数量和其所对应的消费者的名字还有起始和终止 ID。

1
2
3
4
5
6
7
8
9

127.0.0.1:6379> xpending message readgroup - + 10 read
1) 1) "1604496633846-0"
   2) "read"
   3) (integer) 513557
   4) (integer) 4
2) 1) "1604496640734-0"
   2) "read"
   3) (integer) 482927
   4) (integer) 1

使用 xpending 命令可以查看处于未确认状态的消息的具体信息。

xreadgroup命令

语法说明:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

  • GROUP,固定
  • group,分组名
  • consumer,消费者名
  • [COUNT count],每次获取消息的数量
  • [BLOCK milliseconds],阻塞模式和超时时间
  • [NOACK],不需要确认消息,适用于不怎么重要的可以丢失的消息
  • STREAMS,固定
  • key [key ...],指定的 key
  • ID [ID ...],指定的消息 ID,> 指定读取所有未消费的消息,其他值指定被挂起的消息

xreadgroup 命令通过与消费者组和消费者的结合可以做到消息的读取与确认,在 xread 的基础上细化了读取消息操作。

从语法上来看,xreadgroup 和 xread 命令几乎相同,xreadgroup 命令多了一个强制性的参数:GROUP groupname consumername。

当多个消费者同时消费同一个消息队列时,会重复消费相同的消息,每条消息都会被每个消费者消费一遍。但是如果想要多个消费者协作消费同一个消息队列时,就需要用到消费者组。

例如推送系统,肯定是不可以重复推送的,也就是说每条消息只可以被消费一遍,这时就可以使用多个消费者来消费同一个推送队列,降低每个消费者系统的压力。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

127.0.0.1:6379> flushdb
OK
127.0.0.1:6379> xadd message * key1 value1
"1604496633846-0"
127.0.0.1:6379> xadd message * key2 value2
"1604496640734-0"
127.0.0.1:6379> xgroup create message readgroup 0
OK
127.0.0.1:6379> xadd message * key3 value3
"1604496696501-0"
127.0.0.1:6379> xadd message * key4 value4
"1604496704823-0"
127.0.0.1:6379> xreadgroup group readgroup read count 1 streams message >
1) 1) "message"
   2) 1) 1) "1604496633846-0"
         2) 1) "key1"
            2) "value1"
127.0.0.1:6379> xreadgroup group readgroup read count 1 streams message >
1) 1) "message"
   2) 1) 1) "1604496640734-0"
         2) 1) "key2"
            2) "value2"

从头开始,清空数据库,重新给 message 添加消息,创建分组,使用 readgroup 命令读取最新的消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

127.0.0.1:6379> xinfo consumers message readgroup
1) 1) "name"
   2) "read"
   3) "pending"
   4) (integer) 2
   5) "idle"
   6) (integer) 53146
127.0.0.1:6379> xpending message readgroup - + 10 read
1) 1) "1604496633846-0"
   2) "read"
   3) (integer) 513557
   4) (integer) 4
2) 1) "1604496640734-0"
   2) "read"
   3) (integer) 482927
   4) (integer) 1

读取了两条消息,使用 xinfo 命令查看,可以看到有两条消息处于 pending(待处理)状态,使用 xpending 命令可以查看处于未确认状态的消息的具体信息。

xack命令

语法说明:

XACK key group ID [ID ...]

  • key,指定的 key
  • group,指定的 group
  • D [ID ...],需要确认的消息的 ID

xack 命令从 pending 队列中删除挂起的消息,也就是确认之前未确认的消息。当使用 xreadgroup 命令读取消息时,消息同时被存储到 PEL 中,等待被确认,调用 xack 命令可以从 PEL 中删除挂起的消息并且释放内存,确保不丢失消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

127.0.0.1:6379> xack message readgroup 1604496633846-0
(integer) 1
127.0.0.1:6379> xpending message readgroup
1) (integer) 1
2) "1604496640734-0"
3) "1604496640734-0"
4) 1) 1) "read"
      2) "1"
127.0.0.1:6379> xinfo consumers message readgroup
1) 1) "name"
   2) "read"
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 418489

使用 xack 命令确认一条消息,再次使用 xpending 命令查看未确认消息的数量,只剩一条未确认消息,使用 xinfo 命令查看处于 pending 状态的消息数量也为 1,确认消息成功。

xclaim命令

语法说明:

XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [FORCE] [JUSTID]

  • key,指定的 key
  • group,指定的分组
  • consumer,指定的消费者
  • min-idle-time,指定消息最小空闲数,指定空闲了多久的消息会被选中
  • ID [ID ...],消息的 ID
  • [IDLE ms],设置消息的空闲时间,如果不提供,默认为 0
  • [TIME ms-unix-time],和IDLE相同,unix 时间戳
  • RETRYCOUNT,设置重试次数,通常 xclaim 不会改变这个值,它通常用于 xpending 命令,用来发现一些长时间未被处理的消息。
  • FORCE,在 PEL 中创建待处理消息,即使指定的 ID 尚未分配给客户端的PEL。
  • JUSTID,只返回认领的消息 ID 数组,不返回实际消息。

xclaim 命令用于更改未确认消息的所有权,如果有消费者在读取了消息之后未处理完成就挂掉了,那么消息会一直在 pending 队列中,占用内存,这时需要使用 xclaim 命令更改此条消息的所属者,让其他的消费者去消费这条消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14

127.0.0.1:6379> xreadgroup group readgroup read2 count 1 streams message >
1) 1) "message"
   2) 1) 1) "1604496696501-0"
         2) 1) "key3"
            2) "value3"
127.0.0.1:6379> xack message readgroup 1604496696501-0
(integer) 1
127.0.0.1:6379> xpending message readgroup - + 10 read
1) 1) "1604496640734-0"
   2) "read"
   3) (integer) 1258517
   4) (integer) 2
127.0.0.1:6379> xpending message readgroup - + 10 read2
(empty array)

新创建一个消费者,读取一条消息,然后将消息确认掉,查看两个消费者中未确认消息的数量,read 有一条,read2 没有未确认的消息。

1
2
3
4
5
6
7
8
9
10
11

127.0.0.1:6379> xclaim message readgroup read2 0 1604496640734-0
1) 1) "1604496640734-0"
   2) 1) "key2"
      2) "value2"
127.0.0.1:6379> xpending message readgroup - + 10 read2
1) 1) "1604496640734-0"
   2) "read2"
   3) (integer) 3724
   4) (integer) 4
127.0.0.1:6379> xpending message readgroup - + 10 read
(empty array)

使用 xclaim 命令将消息所有权转移给 read2 这个消费者,可以看到,消费者 read2 的 pending 队列中有一条未确认消息,消费者 read 的 pending 队列中已经没有消息了。文章来源地址https://www.toymoban.com/news/detail-461670.html

到了这里,关于redis【stream】:对redis流数据类型的详细介绍的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • redis 的基本介绍以及 五种 数据类型

    一句话:redis 是一个开源的、使用C语言编写的、支持网络交互,基于内存也可持久化的 key-value (非关系型)数据库 redis作者博客,有兴趣的小伙伴可以去逛一逛:http://github.com/antirez 关于key 的命名,不要太长也不要太短,Key太长越占资源,太短可读性又很差,key 之间的单词

    2024年02月11日
    浏览(30)
  • Python数据类型详细介绍

    一、数字(Number) 1. int (整型,长整型) 2. float (浮点型) 3. complex(复数) 二、布尔(Boolean) 三、字符串类型(String)  四、列表 1.  创建列表 2. 列表的索引  3. 列表的切片 4. 列表加法和乘法 5.列表的修改 6. 列表的删除 7. 列表追加、插入和扩展 8. 列表元素查找 9. 列表

    2024年02月05日
    浏览(28)
  • 深入学习 redis - Stream、Geospatial、HyperLogLog、Bitmap、Bitfields 类型扩展

    目录 前言 Stream geospatial HyperLogLog Bitmaps Bitfields redis 中最关键的五个数据类型 String、List、Hash、Set、Zset 应用最广泛,同时 redis 也推出了额外的 5 个数据类型,他们分别是针对特殊场景才进行的应用的. Ps:这几种类型的具体使用不用记,记你也记不住,因为不常用,因此我们

    2024年02月15日
    浏览(51)
  • Redis之数据类型String、List、Hash、Set、Sorted Set(详细)

    一、String数据类型 1、SET/GET/APPEND/STRLEN (1) APPEND (2) SET/STRLEN 2、 INCR/ DECR/INCRBY/DECRBY (1)INCR/ DECR (2) INCRBY/DECRBY INCRBY key increment:key值增加指定的整数DECRBY key decrement:key值减少指定的整数  3、GETSET 4、 SETEX  5、SETNX 6、MSET/MGET/MSETNX  二、List数据类型 1、LPUSH/LPUSHX/

    2024年02月11日
    浏览(28)
  • 本文通过实例介绍了Redis的基础知识、数据类型、数据结构以及典型应用场景 值得一看!

    作者:禅与计算机程序设计艺术 2017年,Redis是基于MIT许可发布的一个开源的高性能键值数据库,其开发语言为C语言。它提供了多种数据类型(strings、hashes、lists、sets、sorted sets等),分布式支持(可横向扩展),内存存储,持久化功能,事务处理功能等。作为一种高性能的

    2024年02月06日
    浏览(46)
  • C语言的数据类型(整型、字符型,浮点型等详细介绍、ASCLL表以及常量、变量的详细介绍)

    C语言的数据类型基本介绍         在C语言中,数据类型可以分为:基本数据类型(整型、浮点型/实型、字符型)、构造数据类型(数组、指针、结构体、共用体、枚举)以及空类型(万能类型void)。常用的有:整形(int)、字符型(char)、短整型(short)、长整型(

    2024年02月07日
    浏览(37)
  • Stream流 - 获取Stream和转换操作(含基本数据类型流)

    Stream流是jdk1.8对集合对象功能的增强,可以通过将集合转换为流模型,通过声明的方式对集合中的每个元素进行一系列并行或者串行的流水线操作。 Stream只要给出对其包含的元素执行什么操作,Stream就会隐式地在内部进行遍历,并给出响应的数据转换。 Stream可以并行化操作

    2024年02月16日
    浏览(21)
  • 【面试题】JDK1.8新特性Stream详细介绍

    Java 8引入了Stream API,它提供了一种函数式编程的方式来处理集合数据。Stream API提供了丰富的操作方法,可以对集合进行筛选、映射、排序等操作。下面是一些常用的Stream集合操作: 筛选(Filter):使用 filter 方法可以根据指定的条件筛选出集合中符合条件的元素。例如: 上

    2024年02月16日
    浏览(26)
  • 博客摘录「 Redis( 缓存篇 ==> 超详细的缓存介绍与数据一致性解决方案 &; 代码实现」

    Redis 旁路缓存 由于高并发原因,先更新数据库和先更新缓存策略都会因为延迟时间而导致数据不一致问题。 两种策略 先删除缓存,再更新数据库; 先更新数据库,再删除缓存。 因为缓存的写入通常要远远快于数据库的写入 ,所以先更新数据库再删缓存,删完缓存,下次访

    2024年02月15日
    浏览(28)
  • Redis学习指南(9)--Redis的列表类型介绍

    Redis的列表(List)数据类型是一种有序的字符串集合,支持从两端添加和移除元素。以下是列表数据类型的特点: 有序性 : 元素按照插入顺序排列。 支持队列和栈操作 : 从两端添加和移除元素。 可存储重复元素 : 允许列表中存在相同的元素。 虽然Redis中的列表和Java语言中的

    2024年02月01日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包