基于redis stream实现一个可靠的消息队列

这篇具有很好参考价值的文章主要介绍了基于redis stream实现一个可靠的消息队列。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

我们使用的库为redisson。
添加元素到队列很简单,用RStream.add方法即可。

如何从队列获取元素?由于我们打算实现kafka那样的consumer group机制,所以,读操作要用RStream.readGroup函数(XREADGROUP命令),该命令有阻塞和非阻塞版本,简单起见,我们使用非阻塞版本(不带BLOCK参数),由应用层来定时轮询。Id参数我们设置为StreamReadGroupArgs.neverDelivered(),相当于redis命令里的>,每次只取最新的消息。相关的代码样例如下:

public List<Record> poll(String groupName, String consumerName) {
    return toConsumerRecords(redisMsgQueue.readGroup(groupName, consumerName, StreamReadGroupArgs.neverDelivered()));
}

追求可靠性,则须封装XACK命令。引入ACK之后,自然要考虑对那些ACK超时的消息如何补救。为此,我们要结合XPENDING和XCLAIM两条命令,利用前者查出目前尚未ACK的消息集M(术语叫PEL,pending entry list),利用后者将M中已超时的那部分(通过min-idle-time参数过滤)分配给其它存活的consumer去做补救处理。为何XPENDING出来之后要做XCLAIM呢?主要是我们希望做补救处理的consumer只能有一位,XCLAIM命令在多consumer竞争时能保证独占性,从而避免消息的多次处理。XPENDING遍历PEL也是有技巧的,一是要分批获取,二是由于XPENDING的startId和endId是闭区间, 需将用于遍历的cursor Id转成开区间(参考redis的streamId说明,具体做法是将streamId的sequence部分加1)。参考实现如下:

public List<Record> fetchNotAck(String groupName, String consumerName, long ackTimeout) {
    PendingResult pr = redisMsgQueue.getPendingInfo(groupName);
    if (pr.getTotal() == 0) {
        return Collections.emptyList();	
    }
    StreamMessageId curId = pr.getLowestId();
    StreamMessageId maxId = pr.getHighestId();
    List<Record> totalRecs = new ArrayList<>();
    while (true) {
        List<PendingEntry> entries = redisMsgQueue.listPending(groupName, curId, maxId, 100);
        if (entries.isEmpty()) {
            break;
        }
        curId = makeExclusive(entries.get(entries.size() - 1).getId());
        Map<StreamMessageId, Map<String, T> > rawRes = redisMsgQueue.claim(groupName, consumerName, ackTimeout, TimeUnit.SECONDS, entries.stream.map(PendingEntry::getId).toArray(StreamMessageId[]::new));
        totalRecs.addAll(toConsumerRecords(rawRes));
    }
    return totalRecs;
}

private StreamMessageId makeExclusive(StreamMessageId id) {
    return new StreamMessageId(id.getId0(), id.getId1() + 1);
}

最后,stream的定时清理动作,解决内存占用过大的问题,使用lua脚本包装XTRIM命令,根据上一次清理时的stream size,计算本次要保留的元素个数。这里还有一个特殊点,如果一条未ACK消息被XTRIM或XDEL从stream里删除,其并不会从PEL里同步删除,用XCLAIM也获取不了它,这就成了一条PEL垃圾数据,据说redis7.0会解决该问题。stream清理算法如下:

private static final String SHRINK_SCRIPT = "local nowSz = redis.call('XLEN', KEYS[1]);redis.call('XTRIM', KEYS[1], 'MAXLEN', nowSz - ARGV[1]); local leftSz = redis.call('XLEN', KEYS[1]); redis.call('HSET', KEYS[2], KEYS[1], leftSz);return leftSz;";

public long shrinkByLastSize(long lastSize, String queueSizeMapName) {
    RScript  rScript = getRedissonClient().getScript(LongCodec.INSTANCE);
    return rScript.eval(RScript.Mode.READ_WRITE, SHRINK_SCRIPT, RScript.ReturnType.INTEGER, 
        Arrays.asList(this.streamName, queueSizeMapName), lastSize);
}

其中,queueSizeMap是一个redis hash,用于存储每个stream上次清理后的size。size的设置和stream的XTRIM操作必须是原子操作,所以放一个lua脚本里。另外,stream 和queueSizeMap有可能不在一个slot里(redis cluster下),要考虑在名字里加{}确保这俩在一起,否则lua脚本会报错。文章来源地址https://www.toymoban.com/news/detail-525840.html

到了这里,关于基于redis stream实现一个可靠的消息队列的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • 基于Redis实现消息队列的实践

    基于Redis实现消息队列的实践

    消息队列是一种典型的发布/订阅模式,是专门为异步化应用和分布式系统设计的,具有高性能、稳定性及可伸缩性的特点,是开发分布式系统和应用系统必备的技术之一。目前,针对不同的业务场景,比较成熟可靠的消息中间件产品有RocketMQ、Kafka、RabbitMq等,基于Redis再去实

    2024年02月07日
    浏览(9)
  • redis实现消息队列

    redis实现消息队列

    消息队列(Message Queue)是一种常见的软件架构模式,用于在分布式系统中传递和处理异步消息。它解耦了发送消息的应用程序和接收消息的应用程序之间的直接依赖关系,使得消息的发送者和接收者可以独立地演化和扩展。 消息队列的基本原理是发送者将消息发送到一个中

    2024年02月09日
    浏览(9)
  • Redis如何实现消息队列

    Redis可以通过List数据结构实现简单的消息队列。在Redis中,我们可以使用 LPUSH 命令将消息推送到列表的左侧,使用 RPOP 命令从列表的右侧获取消息。这样,就可以实现一个先进先出(FIFO)的消息队列。 下面是一个使用Redis实现消息队列的简单示例: 首先,确保你已经安装了

    2024年02月14日
    浏览(7)
  • Java使用Redis实现消息队列

    Java使用Redis实现消息队列

    近期刷Java面试题刷到了“如何使用Redis实现消息队列”,解答如下: 一般使用 list 结构作为队列, rpush 生产消息, lpop 消费消息。当 lpop 没有消息的时候,要适当sleep 一会再重试。若不使用sleep,则可以用指令blpop(该指令在没有消息的时候,它会阻塞住直到消息到来) rp

    2024年02月21日
    浏览(9)
  • 使用SpringBoot利用Redis实现消息队列

    随着互联网的发展,消息队列的应用越来越广泛。消息队列可以解决系统之间的异步通信问题,提高系统的可靠性和可扩展性。在Java开发中,Redis作为一种高性能的缓存和消息队列系统,被广泛应用。本文将介绍如何使用SpringBoot中利用Redis实现消息队列。 在Redis中,List是一种

    2024年02月14日
    浏览(7)
  • PHP使用Redis实战实录5:Redis实现消息队列

    PHP使用Redis实战实录系列 PHP使用Redis实战实录1:宝塔环境搭建、6379端口配置、Redis服务启动失败解决方案 PHP使用Redis实战实录2:Redis扩展方法和PHP连接Redis的多种方案 PHP使用Redis实战实录3:数据类型比较、大小限制和性能扩展 PHP使用Redis实战实录4:单例模式和面向过程操作

    2024年02月11日
    浏览(9)
  • Spring Boot 整合Redis实现消息队列

      本篇文章主要来讲Spring Boot 整合Redis实现消息队列,实现redis用作消息队列有多种方式,比如: 基于 List 的 rpush+lpop 或 lpush+rpop 基于 List 的 rpush+blpop 或 lpush+brpop (阻塞式获取消息) 基于 Sorted Set 的优先级队列 Redis Stream (Redis5.0版本开始) Pub/Sub 机制   不过这里讲的是

    2024年02月13日
    浏览(14)
  • Redis系列14:使用List实现消息队列

    Redis系列14:使用List实现消息队列

    Redis系列1:深刻理解高性能Redis的本质 Redis系列2:数据持久化提高可用性 Redis系列3:高可用之主从架构 Redis系列4:高可用之Sentinel(哨兵模式) Redis系列5:深入分析Cluster 集群模式 追求性能极致:Redis6.0的多线程模型 追求性能极致:客户端缓存带来的革命 Redis系列8:Bitmap实现

    2024年02月07日
    浏览(8)
  • 基于springboot+Redis的前后端分离项目之消息队列(六)-【黑马点评】

    基于springboot+Redis的前后端分离项目之消息队列(六)-【黑马点评】

    🎁🎁资源文件分享 链接:https://pan.baidu.com/s/1189u6u4icQYHg_9_7ovWmA?pwd=eh11 提取码:eh11 我们来回顾一下下单流程 当用户发起请求,此时会请求nginx,nginx会访问到tomcat,而tomcat中的程序,会进行串行操作,分成如下几个步骤 1、查询优惠卷 2、判断秒杀库存是否足够 3、查询订单

    2024年02月12日
    浏览(13)
  • 5. Redis优化秒杀、Redis消息队列实现异步秒杀

    5. Redis优化秒杀、Redis消息队列实现异步秒杀

    承接Redis - 优惠券秒杀、库存超卖、分布式锁、Redisson文章 代码中有大量数据库的操作,整个业务性能并不是很好 平均耗时达到了497毫秒 首先回顾一下之前秒杀业务的流程 前端发起请求到达我们的Nginx,然后Nginx会把我们的请求负载均衡到我们的tomcat 而在tomcat中执行各种逻辑

    2024年02月13日
    浏览(8)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包