基于Redisson的联锁(MultiLock)

这篇具有很好参考价值的文章主要介绍了基于Redisson的联锁(MultiLock)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

基于Redis的分布式MultiLock对象允许对Lock对象进行分组并将它们作为单个锁进行处理。每个RLock对象可能属于不同的Redisson实例。

如果获取的Redisson实例MultiLock崩溃,那么它可能永远挂在获取状态。为了避免这种情况,Redisson维护了一个锁看门狗,它会在持有者Redisson实例处于活动状态时延长锁过期时间。默认情况下,锁定看门狗超时为30s,可以通过Config.lockWatchdogTimeout设置进行更改。作者的另外一篇文章有对看门狗机制有解析:基于Redisson的可重入分布式锁

leaseTime:在指定的时间间隔后锁将自动释放

MultiLock对象的行为符合java锁规范。这意味着只有锁的拥有者线程才能解锁它,否则会抛出IllegalMonitorStateException异常。否则考虑使用RSemaphore对象。

使用示例

普通使用示例:

RLock lock1 = redisson1.getLock("lock1");
RLock lock2 = redisson2.getLock("lock2");
RLock lock3 = redisson3.getLock("lock3");

RLock multiLock = anyRedisson.getMultiLock(lock1, lock2, lock3);

// traditional lock method
multiLock.lock();

// or acquire lock and automatically unlock it after 10 seconds
multiLock.lock(10, TimeUnit.SECONDS);

// or wait for lock aquisition up to 100 seconds 
// and automatically unlock it after 10 seconds
boolean res = multiLock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
multiLock.unlock();
}
}

Async接口使用的代码示例:

RLock lock1 = redisson1.getLock("lock1");
RLock lock2 = redisson2.getLock("lock2");
RLock lock3 = redisson3.getLock("lock3");

RLock multiLock = anyRedisson.getMultiLock(lock1, lock2, lock3);

RFuture<Void> lockFuture = multiLock.lockAsync();

// or acquire lock and automatically unlock it after 10 seconds
RFuture<Void> lockFuture = multiLock.lockAsync(10, TimeUnit.SECONDS);

// or wait for lock aquisition up to 100 seconds 
// and automatically unlock it after 10 seconds
RFuture<Boolean> lockFuture = multiLock.tryLockAsync(100, 10, TimeUnit.SECONDS);

lockFuture.whenComplete((res, exception) -> {
    // ...
    multiLock.unlockAsync();
});

Reactive接口使用的代码示例:

RedissonReactiveClient anyRedisson = redissonClient.reactive();

RLockReactive lock1 = redisson1.getLock("lock1");
RLockReactive lock2 = redisson2.getLock("lock2");
RLockReactive lock3 = redisson3.getLock("lock3");

RLockReactive multiLock = anyRedisson.getMultiLock(lock1, lock2, lock3);

Mono<Void> lockMono = multiLock.lock();

// or acquire lock and automatically unlock it after 10 seconds
Mono<Void> lockMono = multiLock.lock(10, TimeUnit.SECONDS);

// or wait for lock aquisition up to 100 seconds 
// and automatically unlock it after 10 seconds
Mono<Boolean> lockMono = multiLock.tryLock(100, 10, TimeUnit.SECONDS);

lockMono.doOnNext(res -> {
   // ...
})
.doFinally(multiLock.unlock())
.subscribe();

RxJava3接口使用的代码示例:

RedissonRxClient anyRedisson = redissonClient.rxJava();

RLockRx lock1 = redisson1.getLock("lock1");
RLockRx lock2 = redisson2.getLock("lock2");
RLockRx lock3 = redisson3.getLock("lock3");

RLockRx multiLock = anyRedisson.getMultiLock(lock1, lock2, lock3);

Completable lockRes = multiLock.lock();

// or acquire lock and automatically unlock it after 10 seconds
Completable lockRes = multiLock.lock(10, TimeUnit.SECONDS);

// or wait for lock aquisition up to 100 seconds 
// and automatically unlock it after 10 seconds
Single<Boolean> lockRes = multiLock.tryLock(100, 10, TimeUnit.SECONDS);

lockRes.doOnSuccess(res -> {
   // ...
})
.doFinally(multiLock.unlock())
.subscribe();
源码解析(RedissonMultiLock)
  • Redisson获取联锁
// 这里相对简单,就是创建了一个RLock集合,为了后续分别去获取锁
final List<RLock> locks = new ArrayList<>();
@Override
public RLock getMultiLock(RLock... locks) {
    return new RedissonMultiLock(locks);
}
public RedissonMultiLock(RLock... locks) {
    if (locks.length == 0) {
        throw new IllegalArgumentException("Lock objects are not defined");
    }
    this.locks.addAll(Arrays.asList(locks));
}
  • 加锁

leaseTime:指定加锁的时间。超过这个时间后锁便自动解开了。

为了方便我们的源码分析,假设我们的locks的size为6。leaseTime为2s

@Override
public void lock(long leaseTime, TimeUnit unit) {
    try {
        lockInterruptibly(leaseTime, unit);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
    
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    // 基础等待时间设置为连锁数量*1500,单位是毫秒 
    // 6*1500=9000ms 也就是9s
    long baseWaitTime = locks.size() * 1500;
    // 设置等待时间为-1
    long waitTime = -1;
    // 如果锁释放的时间为-1,就让等待时间等于基础等待时间9s
    // lock的无参方法默认leaseTime=-1
    if (leaseTime == -1) {
        waitTime = baseWaitTime;
    } else {
        // 如果锁的释放时间不为-1,把leaseTime转为毫秒
        leaseTime = unit.toMillis(leaseTime);
        // 把锁的释放时间传给等待时间,如果leaseTime=2s那么waitTime也等于2s
        waitTime = leaseTime;
        if (waitTime <= 2000) {
            // 也就是说leaseTime即使小于2s,waitTime也会被重置为2s
            waitTime = 2000;
        } else if (waitTime <= baseWaitTime) {
            // 如果leaseTime大于2s,并且小于9s,将重新设置等待时间,我们暂且还不知道这个等待时间做什么用。
            // 如果leaseTime等于6,那么waitTime=6,此时waitTime小于9s,重新设置waitTime
            // 将waitTime设置为大于等于3小于6的整数。(此处不明白看下面的解释)
            waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);
        } else {
            // 如果leaseTime大于2s而且大于9s(baseWaitTime),同样重新设置waitTime的值
            // 如果传入的leaseTime=10s,那么waitTime一开始也是10s,并且大于baseWaitTime的9s
            // 将waitTime设置为大于等于9s,小于10s的整数。
            waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);
        }
    }

    while (true) {
        // 传入waitTime开始尝试获取锁了
        if (tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) {
            return;
        }
    }
}

ThreadLocalRandom.current().nextLong(origin, bound)是用于生成一个指定范围内的随机长整数。

具体解释如下:

ThreadLocalRandom.current() 返回当前线程的 ThreadLocalRandom 实例,用于生成随机数。

nextLong(origin, bound) 生成一个介于 origin(包含)和 bound(不包含)之间的随机长整型数。这意味着生成的随机数大于等于 origin,并且小于 bound。

此处为什么需要去修改waitTime的值,为什么还得整个随机数,使用baseWaitTime调整waitTime的作用是什么?

  • 尝试获取锁

waitTime:表示尝试获取锁的等待时间。它指定了在尝试获取锁时最长的等待时间。

leaseTime: 指定加锁的时间。超过这个时间后锁便自动解开了。

TimeUnit:时间单位文章来源地址https://www.toymoban.com/news/detail-677662.html

// 假设传入的waitTime=2s leaseTime=2s
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
		// 定义了一个新的释放时间newLeaseTime=-1
        long newLeaseTime = -1;
    	// 如果传入了时间的tryLock,leaseTime就不等于-1,不传默认值为-1
        if (leaseTime != -1) {
            // 将新的锁释放时间设置为waitTime的2倍,单位是毫秒,也就是4000ms
            newLeaseTime = unit.toMillis(waitTime)*2;
        }
        // 获取当前时间(毫秒)
        long time = System.currentTimeMillis();
    	// remain==保持,先翻译为保持时间,定义为-1
        long remainTime = -1;
        if (waitTime != -1) {
            // 保持时间设置为waitTime,2000ms
            remainTime = unit.toMillis(waitTime);
        }
    	// calcLockWaitTime(remainTime);-->return Math.max(remainTime / locks.size(), 1);
    	// 300ms=lockWaitTime
        long lockWaitTime = calcLockWaitTime(remainTime);
        // return 0
        int failedLocksLimit = failedLocksLimit();
        List<RLock> acquiredLocks = new ArrayList<>(locks.size());
    	// 循环获取锁
        for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
            // 获取到的redisson实例生成的锁
            RLock lock = iterator.next();
            // 锁获取标识
            boolean lockAcquired;
            try {
                if (waitTime == -1 && leaseTime == -1) {
                    lockAcquired = lock.tryLock();
                } else {
                    // awaitTime=300ms
                    long awaitTime = Math.min(lockWaitTime, remainTime);
                    // 直接去获取锁,返回true or false
                    lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
                }
            } catch (RedisResponseTimeoutException e) {
                // 如果发生了RedisResponseTimeoutException,会先解锁。因为这个时候不确定是否加锁成功了,所以解锁设置标识为失败。
                unlockInner(Arrays.asList(lock));
                lockAcquired = false;
            } catch (Exception e) {
                // 其他异常设置标识为false
                lockAcquired = false;
            }
            if (lockAcquired) {
                // 如果加锁成功 放入集合中
                acquiredLocks.add(lock);
            } else {
                // 6-当前成功的数量=0,直接退出循环,也就是说超过了最大的失败限制
                // 这里RedissonRedLock有重写,红锁有自己的规则
                if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
                    break;
                }
				// failedLocksLimit==0,那么只要失败就进入这个逻辑
                if (failedLocksLimit == 0) {
                    // 会把获取到锁的一次性解锁
                    unlockInner(acquiredLocks);
                    if (waitTime == -1 && leaseTime == -1) {
                        return false;
                    }
                    // 重置failedLocksLimit=0
                    failedLocksLimit = failedLocksLimit();
                    // 清空获取到锁的集合
                    acquiredLocks.clear();
                    // reset iterator
                    while (iterator.hasPrevious()) {
                        iterator.previous();
                    }
                } else {
                    // RedissonRedLock才会进入这个逻辑
                    failedLocksLimit--;
                }
            }
			// 如果remainTime不为-1
            // remainTime=2000ms
            if (remainTime != -1) {
                // 查看remainTime的剩余时间
                remainTime -= System.currentTimeMillis() - time;
                // 重置time
                time = System.currentTimeMillis();
                // 如果保持时间也就是之前的waitTime小于0,也就是说超过了尝试获取锁时最长的等待时间,释放所有已获得的锁,并返回false,加锁失败
                if (remainTime <= 0) {
                    unlockInner(acquiredLocks);
                    return false;
                }
            }
        }
		// 如果没有超过尝试获取锁时最长等待时间,并且leaseTime不为-1
        if (leaseTime != -1) {
            // 创建了一个RFuture集合
            List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
            for (RLock rLock : acquiredLocks) {
                //为每个锁设置过期时间,是一个异步的操作
                RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
                futures.add(future);
            }
            
            for (RFuture<Boolean> rFuture : futures) {
                // 阻塞当前线程,同步等待每个异步操作的结果
                rFuture.syncUninterruptibly();
            }
        }
        
        return true;
    }

到了这里,关于基于Redisson的联锁(MultiLock)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Redis之Redisson原理详解

    Redisson 顾名思义, Redis 的儿子,本质上还是 Redis 加锁,不过是对 Redis 做了很多封装,它不仅提供了一系列的分布式的 Java 常用对象,还提供了许多分布式服务。 Redisson 和 Jedis 、 Lettuce 有什么区别? Redisson 和它俩的区别就像一个用鼠标操作图形化界面,一个用命令行操作文

    2024年02月09日
    浏览(43)
  • Redis 整合中 Redisson 的使用

    大家好 , 我是苏麟 , 今天带来 Redisson 使用 . 官方文档 :  GitHub - redisson/redisson: Redisson - Easy Redis Java client with features of In-Memory Data Grid. Sync/Async/RxJava/Reactive API. Over 50 Redis based Java objects and services: Set, Multimap, SortedSet, Map, List, Queue, Deque, Semaphore, Lock, AtomicLong, Map Reduce, Bloom filter,

    2024年02月11日
    浏览(45)
  • Redis实战——Redisson分布式锁

    目录 1 基于Redis中setnx方法的分布式锁的问题 2 Redisson         2.1 什么是Redisson         2.2 Redisson实现分布式锁快速入门         2.3 Redisson 可重入锁原理                 什么是可重入锁?                 Redisson中又是如何实现的呢?         2

    2024年02月15日
    浏览(49)
  • Redis系列之客户端Redisson

    官方推荐的客户端,支持Redis单实例、Redis哨兵、Redis Cluster、Redis master-slave等各种部署架构。 GitHub, 功能: 分布式锁 使用Redisson提供的分布式锁的一个最常见场景,应用部署为多个节点,然后使用Spring提供的原生@Scheduled任务调度功能;而没有使用xxl-job等轻量级分布式任务调

    2024年02月09日
    浏览(49)
  • Redis客户端Redisson使用示例

    Redisson作为Java连接Redis的客户端,提供了连接、操作Redis的方法,还提供分布式锁、红锁等并发工具。Redisson除了提供同步接口外,还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。Redisson会序列化Java对象然后保存到redis,所以通过redis命令行设置的值,Redisson来获

    2024年02月06日
    浏览(61)
  • redis的分布式事务-redisson

    Redisson分布式锁是一种基于redis实现的分布式锁,它利用redis的setnx命令实现分布式锁的互斥访问。同时还支持锁的自动续期功能,可以避免因为某个进程崩溃或者网络故障导致锁无法释放的情况。 只要线程一加锁成功,就会启动一个watch dog看门狗,它是一个后台线程,会每隔

    2024年02月13日
    浏览(47)
  • 【征服redis5】redis的Redisson客户端

    目录 1 Redisson介绍 2. 与其他Java Redis客户端的比较 3.基本的配置与连接池 3.1 依赖和SDK 3.2 配置内容解析 4 实战案例:优雅的让Hash的某个Field过期 5 Redisson的强大功能 Redisson 最初由 GitHub 用户 “mrniko” 创建,并在 Apache 2.0 许可证下发布。它的目标是提供一组强大的工具和 API,

    2024年01月17日
    浏览(50)
  • Redis实战之Redisson使用技巧详解

    一、摘要 什么是 Redisson ?来自于官网上的描述内容如下! Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格客户端(In-Memory Data Grid)。它不仅提供了一系列的 redis 常用数据结构命令服务,还提供了许多分布式服务,例如分布式锁、分布式对象、分布式集合、分布式

    2023年04月09日
    浏览(33)
  • 【Spring Boot 3】【Redis】集成Redisson

    软件开发是一门实践性科学,对大多数人来说,学习一种新技术不是一开始就去深究其原理,而是先从做出一个可工作的DEMO入手。但在我个人学习和工作经历中,每次学习新技术总是要花费或多或少的时间、检索不止一篇资料才能得出一个可工作的DEMO,这占用了我大量的时

    2024年01月23日
    浏览(50)
  • redis缓存击穿,redisson分布式锁,redis逻辑过期

    什么是缓存击穿: 缓存击穿是指在高并发环境下,某个热点数据的缓存过期,导致大量请求同时访问后端存储系统,引起系统性能下降和后端存储压力过大的现象。 解决方案: 1. redisson分布式锁 本质上是缓存重建的过程中,大量的请求访问到后端的数据库导致数据库压力过

    2024年02月06日
    浏览(53)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包