【Redis进阶】一文搞懂Redisson的看门狗机制底层实现

这篇具有很好参考价值的文章主要介绍了【Redis进阶】一文搞懂Redisson的看门狗机制底层实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1. 看门狗机制概述

看门狗机制是Redission提供的一种自动延期机制,这个机制使得Redission提供的分布式锁是可以自动续期的

private long lockWatchdogTimeout = 30 * 1000;

看门狗机制提供的默认超时时间是30*1000毫秒,也就是30秒

如果一个线程获取锁后,运行程序到释放锁所花费的时间大于锁自动释放时间(也就是看门狗机制提供的超时时间30s),那么Redission会自动给redis中的目标锁延长超时时间。

在Redission中想要启动看门狗机制,那么我们就不用获取锁的时候自己定义leaseTime(锁自动释放时间)

如果自己定义了锁自动释放时间的话,无论是通过lock还是tryLock方法,都无法启用看门狗机制。

但是,如果传入的leaseTime为-1,也是会开启看门狗机制的。

分布式锁是不能设置永不过期的,这是为了避免在分布式的情况下,一个节点获取锁之后宕机从而出现死锁的情况,所以需要个分布式锁设置一个过期时间。但是这样会导致一个线程拿到锁后,在锁的过期时间到达的时候程序还没运行完,导致锁超时释放了,那么其他线程就能获取锁进来,从而出现问题。

所以,看门狗机制的自动续期,就很好地解决了这一个问题。


2. 源码解读

进入tryLock方法,这里的tryLock(waitTime, -1, unit)有三个参数

  1. waitTime:获取锁的最大等待时间(没有传默认为-1)
  2. leaseTime:锁自动释放的时间(没有传的话默认-1)
  3. unit:时间的单位(等待时间和锁自动释放的时间单位)
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
    return tryLock(waitTime, -1, unit);
}
    @Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }
        
        current = System.currentTimeMillis();
        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.onComplete((res, e) -> {
                    if (e == null) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                });
            }
            acquireFailed(waitTime, unit, threadId);
            return false;
        }

        try {
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
        
            while (true) {
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(subscribeFuture, threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }

这上面一坨主要是锁重试的代码,感兴趣可以看【Redis】4.万字文章带你深入Redisson与源码解读(建议收藏)——起名方面没有灵感的博客-CSDN博客

而看门狗机制的相关代码主要在tryAcquire方法上,在这个方法里主要看到方法是tryAcquireAsync(waitTime, leaseTime, unit, threadId)

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}

由于在tryLock方法中没传leaseTime,所以leaseTime为默认值-1

调用tryLockInnerAsync,如果获取锁失败,返回的结果是这个key的剩余有效期,如果获取锁成功,则返回null。

获取锁成功后,如果检测不存在异常并且获取锁成功`(ttlRemaining == null)。

那么则执行this.scheduleExpirationRenewal(threadId);来启动看门狗机制。

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    if (leaseTime != -1L) {
        return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        //如果获取锁失败,返回的结果是这个key的剩余有效期
        RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        //上面获取锁回调成功之后,执行这代码块的内容
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            //不存在异常
            if (e == null) {
                //剩余有效期为null
                if (ttlRemaining == null) {
                    //这个函数是解决最长等待有效期的问题
                    this.scheduleExpirationRenewal(threadId);
                }

            }
        });
        return ttlRemainingFuture;
    }
}
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                          // 锁不存在,则往redis中设置锁信息
                          "if (redis.call('exists', KEYS[1]) == 0) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                          "end; " +
                          // 锁存在
                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                          "end; " +
                          "return redis.call('pttl', KEYS[1]);",
                          Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

一个锁就对应自己的一个ExpirationEntry类,

EXPIRATION_RENEWAL_MAP存放的是所有的所信息。

根据锁的名称从EXPIRATION_RENEWAL_MAP里面获取锁,如果存在这把锁则冲入,如果不存在,则将这个新锁放置进EXPIRATION_RENEWAL_MAP,并且开启看门狗机制。

private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();
private void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    //这里EntryName是指锁的名称
    ExpirationEntry oldEntry = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
    if (oldEntry != null) {
        //重入
        //将线程ID加入
        oldEntry.addThreadId(threadId);
    } else {
        //将线程ID加入
        entry.addThreadId(threadId);
        //续约
        this.renewExpiration();
    }
}

首先,从EXPIRATION_RENEWAL_MAP中获取这个锁,接下来定义一个延迟任务task,这个任务的步骤如下

  1. 新创建了一个子线程去反复调用
  2. EXPIRATION_RENEWAL_MAP中获取这把锁,如果这把锁不存在了,说明被删除了,不在需要续期了。
  3. 从锁中获取获得这把锁的线程IDthreadId
  4. 调用renewExpirationAsync方法刷新最长等待时间
  5. 如果刷新成功,则进来递归调用这个函数renewExpiration()

这个任务task设置为 this.internalLockLeaseTime / 3L,也是锁自动释放时间,因为没传,也就是10s。

也就是说,这个延迟任务延迟十秒执行一次。

最后,为这把锁ee设置延迟任务task即可

private void renewExpiration() {
    //先从map里得到这个ExpirationEntry
    ExpirationEntry ee = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
    if (ee != null) {
        //这个是一个延迟任务
        Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            //延迟任务内容
            public void run(Timeout timeout) throws Exception {
                //拿出ExpirationEntry
                ExpirationEntry ent = (ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
                if (ent != null) {
                    //从ExpirationEntry拿出线程ID
                    Long threadId = ent.getFirstThreadId();
                    if (threadId != null) {
                        //调用renewExpirationAsync方法刷新最长等待时间
                        RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
                        future.onComplete((res, e) -> {
                            if (e != null) {
                                RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
                            } else {
                                if (res) {
                                    //renewExpirationAsync方法执行成功之后,进行递归调用,调用自己本身函数
                                    //那么就可以实现这样的效果
                                    //首先第一次进行这个函数,设置了一个延迟任务,在10s后执行
                                    //10s后,执行延迟任务的内容,刷新有效期成功,那么就会再新建一个延迟任务,刷新最长等待有效期
                                    //这样这个最长等待时间就会一直续费
                                    RedissonLock.this.renewExpiration();
                                }

                            }
                        });
                    }
                }
            }
        }, 
                                                                              //这是锁自动释放时间,因为没传,所以是看门狗时间=30*1000
                                                                              //也就是10s
                                                                              this.internalLockLeaseTime / 3L, 
                                                                              //时间单位
                                                                              TimeUnit.MILLISECONDS);
        //给当前ExpirationEntry设置延迟任务
        ee.setTimeout(task);
    }
}



// 刷新等待时间
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return 1; " +
                          "end; " +
                          "return 0;",
                          Collections.singletonList(getName()),
                          internalLockLeaseTime, getLockName(threadId));
}

最后,在释放锁的时候,就会关闭所有的延迟任务,核心代码如下

public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> result = new RedissonPromise();
    RFuture<Boolean> future = this.unlockInnerAsync(threadId);
    future.onComplete((opStatus, e) -> {
        //取消锁更新任务
        this.cancelExpirationRenewal(threadId);
        if (e != null) {
            result.tryFailure(e);
        } else if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
            result.tryFailure(cause);
        } else {
            result.trySuccess((Object)null);
        }
    });
    return result;
}

void cancelExpirationRenewal(Long threadId) {
    //获得当前这把锁的任务
    ExpirationEntry task = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
    if (task != null) {
        //当前锁的延迟任务不为空,且线程id不为空
        if (threadId != null) {
            //先把线程ID去掉
            task.removeThreadId(threadId);
        }

        if (threadId == null || task.hasNoThreads()) {
            //然后取出延迟任务
            Timeout timeout = task.getTimeout();
            if (timeout != null) {
                //把延迟任务取消掉
                timeout.cancel();
            }
			//再把ExpirationEntry移除出map
            EXPIRATION_RENEWAL_MAP.remove(this.getEntryName());
        }

    }
}

3. 总结

在使用Redis实现分布式锁的时候,会存在很多问题。

比如说业务逻辑处理时间>自己设置的锁自动释放时间的话,Redis就会按超时情况把锁释放掉,而其他线程就会趁虚而入抢夺锁从而出现问题,因此需要有一个续期的操作。

并且,如果释放锁的操作在finally完成,需要判断一下当前锁是否是属于自己的锁,防止释放掉其他线程的锁,这样释放锁的操作就不是原子性了,而这个问题很好解决,使用lua脚本即可。

Redisson的出现,其中的看门狗机制很好解决续期的问题,它的主要步骤如下:

  1. 在获取锁的时候,不能指定leaseTime或者只能将leaseTime设置为-1,这样才能开启看门狗机制。
  2. tryLockInnerAsync方法里尝试获取锁,如果获取锁成功调用scheduleExpirationRenewal执行看门狗机制
  3. scheduleExpirationRenewal中比较重要的方法就是renewExpiration,当线程第一次获取到锁(也就是不是重入的情况),那么就会调用renewExpiration方法开启看门狗机制。
  4. renewExpiration会为当前锁添加一个延迟任务task,这个延迟任务会在10s后执行,执行的任务就是将锁的有效期刷新为30s(这是看门狗机制的默认锁释放时间)
  5. 并且在任务最后还会继续递归调用renewExpiration

也就是总的流程就是,首先获取到锁(这个锁30s后自动释放),然后对锁设置一个延迟任务(10s后执行),延迟任务给锁的释放时间刷新为30s,并且还为锁再设置一个相同的延迟任务(10s后执行),这样就达到了如果一直不释放锁(程序没有执行完)的话,看门狗机制会每10s将锁的自动释放时间刷新为30s。

而当程序出现异常,那么看门狗机制就不会继续递归调用renewExpiration,这样锁会在30s后自动释放。

或者,在程序主动释放锁后,流程如下:文章来源地址https://www.toymoban.com/news/detail-559685.html

  1. 将锁对应的线程ID移除
  2. 接着从锁中获取出延迟任务,将延迟任务取消
  3. 在将这把锁从EXPIRATION_RENEWAL_MAP中移除。

到了这里,关于【Redis进阶】一文搞懂Redisson的看门狗机制底层实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Redis进阶底层原理-Cluster集群底层

    Redis底层原理篇

    2024年02月16日
    浏览(38)
  • 彻底搞懂Redis持久化机制,轻松应对工作面试

    Redis是基于内存存储的数据库,如果遇到服务重启或者崩溃,内存中的数据将会被清空。所以为了确保数据安全性和可靠性,我们需要将内存中的数据持久化到磁盘上。 持久化不仅可以防止由于系统故障、重启或者其他原因导致的数据丢失。还可以用于备份、数据恢复和迁移

    2023年04月20日
    浏览(38)
  • Redis进阶底层原理- 持久化

    Redis-Fork、RDF、RDB底层原理

    2024年02月16日
    浏览(40)
  • Redis进阶底层原理 - 高可用哨兵模式

    Redis底层原理篇

    2024年02月17日
    浏览(34)
  • 【MySQL进阶-08】深入理解innodb存储格式,双写机制,buffer pool底层结构和淘汰策略

    MySql系列整体栏目 内容 链接地址 【一】深入理解mysql索引本质 https://blog.csdn.net/zhenghuishengq/article/details/121027025 【二】深入理解mysql索引优化以及explain https://blog.csdn.net/zhenghuishengq/article/details/124552080 【三】深入理解mysql的索引分类,覆盖索引(失效),回表,MRR https://bl

    2024年02月05日
    浏览(49)
  • redis实战-redis实现分布式锁&redisson快速入门

    前言 集群环境下的并发问题  分布式锁 定义 需要满足的条件 常见的分布式锁 redis实现分布式锁 核心思路 代码实现 误删情况 逻辑说明 解决方案 代码实现 更为极端的误删情况 Lua脚本解决原子性问题 分布式锁-redission redisson的概念 快速入门 总结 在前面我们已经实现了单机

    2024年02月09日
    浏览(52)
  • 【Redis实战】有MQ为啥不用?用Redis作消息队列!?Redis作消息队列使用方法及底层原理高级进阶

     🎉🎉欢迎光临🎉🎉 🏅我是苏泽,一位对技术充满热情的探索者和分享者。🚀🚀 🌟特别推荐给大家我的最新专栏 《Redis实战与进阶》 本专栏纯属为爱发电永久免费!!! 这是苏泽的个人主页可以看到我其他的内容哦👇👇 努力的苏泽 http://suzee.blog.csdn.net/ 我们用的是云

    2024年02月20日
    浏览(44)
  • Redis分布式锁及Redisson的实现原理

    Redis分布式锁 在讨论分布式锁之前我们回顾一下一些单机锁,比如synchronized、Lock 等 锁的基本特性: 1.互斥性:同一时刻只能有一个节点访问共享资源,比如一个代码块,或者同一个订单同一时刻只能有一个线程去支付等。 2.可重入性: 允许一个已经获得锁的线程,在没有释

    2024年02月06日
    浏览(45)
  • 一文让你搞懂javascript如何实现继承

    一、本文想给你聊的东西包含一下几个方面:(仅限于es6之前的语法哈,因为es6里面class这用上了。。) 1.原型是啥?原型链是啥? 2.继承的通用概念。 3.Javascript实现继承的方式有哪些?   二、原型是啥?原型链是啥? 1.原型是函数本身的prototype属性。 首先js和java不

    2024年02月04日
    浏览(56)
  • Redis从入门到精通【进阶篇】之对象机制详解

    Redis 之所以是一款高性能和受大家欢迎的的内存数据库,不仅是它支持多种数据类型,包括字符串、列表、哈希、集合、有序集合等数据结构。而且这些数据类型都是由对象结构(redisObject) 和对应编码的数据结构组合而成。在 Redis 中,对象结构是所有数据类型的底层实现,它

    2024年02月12日
    浏览(36)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包