Redisson实现分布式锁原理
自己实现锁续命
- 在 controller 里开一个 线程 (可以为 守护线程)
- 每10秒,判断一个 这个 UUID是否存在,如果 存在,重置为 30秒。
- 如果不存在,守护线程 也结束。
基本的key value
atguiguLock 锁的名字,是redis的 key
hasHkey为:
82a218d0-27c8-4028-a8ca-dfa514da61c7:71 #UUID:线程ID
value为:1。如果再次调用lock,会变成2
基本的使用
- setIfAbsent存在不设置
RLock lock = redisson.getLock(REDIS_LOCK);
lock.lock(30, TimeUnit.SECONDS);
//如果是被锁定的
if (lock.isLocked()) {
//如果是 当前线程持有的话
if (lock.isHeldByCurrentThread()) {
//才进行解锁
lock.unlock();
}
}
//1 声明一个uuid ,将做为一个value 放入我们的key所对应的值中
String uuid = UUID.randomUUID().toString();
//2 定义一个锁:lua 脚本可以使用同一把锁,来实现删除!
// 访问skuId 为25号的商品 100008348542
String locKey = "lock:" + "25"; // 锁住的是每个商品的数据
// 3 获取锁
Boolean lock = redisTemplate.opsForValue()
.setIfAbsent(locKey, uuid, 3, TimeUnit.SECONDS);
//锁的key,锁的值uuid,锁3秒
//如果这个锁存在,就不设置
16384
Redis 集群没有使用一致性hash, 而是引入了哈希槽的概念。
Redis Cluster 采用虚拟哈希槽分区,所有的键根据哈希函数映射到 0 ~ 16383 整数槽内,每个key通过CRC16校验后对16384取模来决定放置哪个槽(Slot),每一个节点负责维护一部分槽以及槽所映射的键值数据。
计算公式:slot = CRC16(key) & 16383。
这种结构很容易添加或者删除节点,并且无论是添加删除或者修改某一个节点,都不会造成集群不可用的状态。使用哈希槽的好处就在于可以方便的添加或移除节点。
- 当需要增加节点时,只需要把其他节点的某些哈希槽挪到新节点就可以了;
- 当需要移除节点时,只需要把移除节点上的哈希槽挪到其他节点就行了。
大神的回答
1. 定位 Master 的节点:16384 取模
- 这里应该是 通过 myLock 决定 存到 哪个地方
- 客户端线程,在 底层是 如何实现 加锁的
第一步,先定位 Master 的节点,
通过key,就是 redisson.getLock(“myLock”)的字符串参数,
- myLock 计算出 循环冗余校验码的值,
- 再用 该 循环冗余 校验码对 16384 取模,得到 hash slot
- 通过这个 hash solt,定位redis-cluster的集群当中的master
的节点
2. 加锁:UUID:ThreadID设置为1
第二步:加锁
- 加锁底层逻辑是通过Lua脚本来实现的,
- 如果客户端线程第一次去加锁的话,会在key对应的hash数据结构当中,添加线程标识,UUID:ThreadID 1
- 指定该线程当前对这个key加锁一次了,并设置锁的过期时间为30秒
客户段线程是如何维持加锁的
- 当加锁成功以后,此时会对加锁的结果设置一个监听器
- 如果监听到加锁成功了,也就是返回的结果为null
- 就会在后台通过watchdog开门狗机制,启动一个后台定时任务
- 每隔10秒执行一次检查,如果当前key依然存在,就重置key的存活时间为30秒
- 维持加锁,底层就是通过后台这样一个,线程定时刷新存活时间维持的
如何实现可重入锁:UUID:ThreadID 的值 +1
相同的客户端线程是如何实现可重入加锁的
- 第一次加锁时,会往key对应的hash数据结构当中,设置UUID:ThreadID 1,表示当前线程对key加锁一次
- 如果相同的线程来再次对这个key加锁,只需要将UUID:ThreadlD持有锁的次数加1即可,就为UUID:ThreadID 2了
- redisson底层就是通过这样的数据结构,来表示重入加锁的语义的
其他线程加锁失败时:自旋锁
- 底层是如何实现阻塞的,通过key对应的hash结构当中的UUID:threadid判断是否为当前线程ID
- 如果不是,则线程加锁失败,如果没有设置锁取所超时时间,
- 此时就会进入一个while的死循环中,一直尝试加锁直到加锁成功才会返回
宕机锁是如何释放的:没看门狗了
- 相应的watchdog后台定时任务,当然已经没了
- 此时就无法对key进行定时续期,那么当指定存活时间过后,
- key就会自动失效,锁当然也就自动释放了
如何主动释放持有的锁:lua脚本,扣重入次数
- 底层同样也是通过执行Lua脚本的方式,
- 如果判断当前释放的key存在,并且在key的hash结构当中
- 存在当前线程的加锁信息,那么此时就会减扣当前线程对这个key的重入锁次数
- 减扣线程的重入锁次数之后,如果当前线程在这个key的重入次数为0,此时就会直接释放锁,
- 如果当前线程,在这个key中重入锁次数依然大于0,此时就直接重置一下,Key的续期时间为30秒
- 然后 value - 1 (减去 1)
锁超时的机制:不指定超时 一直获取
7:客户端尝试获取锁超时时间的机制,底层是如何实现的
- 如果在加锁时就指定呢,尝试获取锁超时时间,
- 如果获取所失败,此时就不会永无止境,的在while循环里面一直等待
- 而是根据你指定的锁超时时间,在这段时间范围内获取不到锁,那么就会标记为获取所失败
- 直接返回 false
锁超时自动释放:指定超时的没看门狗
8客户端锁超时自动释放机制在底层又是如何实现的文章来源:https://www.toymoban.com/news/detail-406105.html
- 如果在加锁时指定的锁超时时间,那么就算你获取所成功了
- 也不会开启watch dog 他定时任务了,
- 此时直接 就将当前持有的这把锁的过期时间,设置为你指定的超时时间
- 那么当你指定的时间到了之后,Key失效被删除了,key对应的锁相应的也就自动释放了
slot
英
/slɒt/
n.
(可投入东西的)狭长孔,狭槽;
v.
把……投入窄孔中,把……放到指定位置;为……安排时间,安置;
* 我的整理
加锁核心方法
lock 和 tryAcquireAsync
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
}
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1L) {
return this.tryLockInnerAsync(waitTime, leaseTime, unit,
threadId, RedisCommands.EVAL_LONG);
} else {
}
}
acquire
英
/əˈkwaɪə(r)/
v.
获得,得到;学到,习得;患上(疾病);逐渐具有,开始学会
最终调用tryLockInnerAsync
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
//过期时间为 30000毫秒
this.internalLockLeaseTime = unit.toMillis(leaseTime);
return this.evalWriteAsync(this.getName(),
LongCodec.INSTANCE, command,
"if下面的 lua脚本",
Collections.singletonList(this.getName()),
this.internalLockLeaseTime, //30000毫秒
this.getLockName(threadId)
);
}
//锁名字:就是 ID:线程名。id默认是uuid
protected String getLockName(long threadId) {
return this.id + ":" + threadId;
}
看门狗的超时
//初始化 时间为30秒。3万毫秒
public Config() {
this.transportMode = TransportMode.NIO;
this.lockWatchdogTimeout = 30000L;
}
加锁的 lua 脚本
基本命令学习
PEXPIRE key milliseconds 设置key的有效时间以毫秒为单位
#pexpire
exists num11 //不存在,返回为0
(integer) 0
PTTL key 获取key的有效毫秒数
#pttl
基本结构
key为:atguiguLock
hashKey为:a4fafbec-ad68-4ee1-9420-ed22e547083f:71
hash值为:1文章来源地址https://www.toymoban.com/news/detail-406105.html
核心脚本 hincrby 加锁
// 如果 keys[1](lockKey) == 0,说明不存在,执行这些逻辑。
// keys[1] 不存在,使用hincrby命令,设置redis的key为:lockKey。
// 设置 hash的key为:UUID:线程名ID,hash的值为:1
//早起的版本为 hset设置。新版为:hincrby
if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1);
//调用 过期时间为 参数1。30万毫秒,30秒
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
//如果 这个 hash 存在(即:hexists 返回1)
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
//使用 hincrby,增加1
redis.call('hincrby', KEYS[1], ARGV[2], 1);
//重置过期时间为 30秒
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
return redis.call('pttl', KEYS[1]);
续命的方法
onComplete 和 scheduleExpirationRenewal
- RFuture执行完毕后 回调
- 嵌套调用 TimerTask 是 netty包下的类,
- 续命完成后调用 onComplete,方法里 又调用了自己。
RFuture<Long> ttlRemainingFuture =
this.tryLockInnerAsync(
waitTime,
this.commandExecutor.getConnectionManager().getCfg()
.getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS,
threadId,
RedisCommands.EVAL_LONG
);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining == null) {
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
private void scheduleExpirationRenewal(long threadId) {
RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
RedissonLock.ExpirationEntry oldEntry =
(RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP
.putIfAbsent(this.getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
//续命逻辑在这里
this.renewExpiration();
}
}
续命外层方法renewExpiration
private void renewExpiration() {
RedissonLock.ExpirationEntry ee =
(RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP
.get(this.getEntryName());
if (ee != null) {
Timeout task = this.commandExecutor.getConnectionManager()
.newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
RedissonLock.ExpirationEntry ent =
(RedissonLock.ExpirationEntry)
RedissonLock.EXPIRATION_RENEWAL_MAP
.get(RedissonLock.this.getEntryName());
if (ent != null) {
Long threadId = ent.getFirstThreadId();
if (threadId != null) {
RFuture<Boolean> future =
RedissonLock.this.renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
RedissonLock.log.error("Can't update lock "
+ RedissonLock.this.getName()
+ " expiration", e);
} else {
if (res) {
//锁续命完毕,执行 onComplete 方法,又调用 他自己。
RedissonLock.this.renewExpiration();
}
}
});
}
}
}
//延迟执行的参数,为 超时时间 / 3L
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
}
最底层 调用lua脚本续命
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN,
"if 续命的脚本",
Collections.singletonList(this.getName()),
this.internalLockLeaseTime,
this.getLockName(threadId));
}
续命的脚本 pexpire
- 如果这把锁还在,重新设置 pexpire
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
then redis.call('pexpire', KEYS[1], ARGV[1]);
return 1; end;
return 0;
解锁的方法unlockInnerAsync
核心代码
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN, "if 下面的lua脚本",
Arrays.asList(this.getName(),
this.getChannelName()),
LockPubSub.UNLOCK_MESSAGE,
this.internalLockLeaseTime,
this.getLockName(threadId));
}
解锁的 lua脚本
- 会发布一个解锁的消息
- 发布的通道为:redisson_lock__channel
- 注意 第二个是 __双下划线
// 如果 这个 hashKey 不存在,直接返回0
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end;
//否则,这个 key存在,就 减去1,接收返回值,
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
//如果返回值 大于0,就续期30秒。
if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0;
//如果这个 结果 不大于0,就删除,并发发布一个解锁的消息
else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]);
return 1; end;
return nil;
PUBLISH channel message
发布一条消息到频道
this.getChannelName() 发布到:
LockPubSub.UNLOCK_MESSAGE,
public static final Long UNLOCK_MESSAGE = 0L;
//最终返回:redisson_lock__channel:{atguiguLock}
String getChannelName() {
return prefixName("redisson_lock__channel", this.getName()); //atguiguLock
}
总结 敲黑板
背会 RFuture onComplete BiConsumer
exists hincrby pexpire
Redisson 实现原理
- lock 加锁 使用lua脚本
- 加锁成功,放入的是一个 redis key为: 你设置的锁名字,value为 hash
-
hash的 key为:uuid:线程id,第一次 过来这个线程,值为1
- 这个值为1,就是做 可重入锁,如果当前线程 再次加锁,值变为2
- 默认设置超时时间为 30秒
- 启动看门狗,看门狗每 10秒 执行一次(超时时间/3),如果当前key还存在 就续期 成30秒。
-
hash的 key为:uuid:线程id,第一次 过来这个线程,值为1
- 加锁失败,进入自旋 抢锁逻辑。
- 加锁成功,放入的是一个 redis key为: 你设置的锁名字,value为 hash
- unlock解锁 依然使用lua脚本
- lock 加锁,如果设置了 超时间的 ,就没有看门够机制,到了超时时间,会自动释放锁。
具体代码怎么做的
- 使用 RFuture 类,
- 是 Redisson 自己写的一个接口,核心 有个回调方法 onComplete,
- 使用了 BiConsumer (函数式消费者 接口,2个参数的)
- 所以 在执行完毕后 会执行 onComplete 回调方法,方法里执行看门狗逻辑,
- 看门狗里的代码 最终依然是 RFuture 类,每次正常续期后,睡10秒后,在 onComplete 再次调用自己,决定是否再次续期。
RFuture<V> extends Future<V>, CompletionStage{
void onComplete(BiConsumer<? super V, ? super Throwable> var1);
}
具体lua脚本如何做的
加锁
- 加锁用 exists 命令判断 为0是不存在,如果 不存在 才加锁。加锁的命令为 hincrby,早起的版本3.6左右用 hset,3.13 用 hincrby
- 使用命令 pexpire 设置过期的毫秒,默认为 3万,即:30秒。
- 使用 hexists 判断 当前 redis 锁的hash key否存在(redis key是锁名字,里面hash 的 key 为 UUID:线程ID ),为1是存在。
- 如果 存在,就把这个 hash的 value +1,依然使用 hincrby
- 最后使用 pexpire ,把当前值,重新设置为:30秒。
- 脚本的返回为 pttl 当前 这个redis 剩余的毫秒数
//加锁的逻辑
//早起的版本为 hset设置。新版为:hincrby
if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1);
//调用 过期时间为 参数1。30万毫秒,30秒
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
//如果 这个 hash 存在(即:hexists 返回1)
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
//使用 hincrby,增加1
redis.call('hincrby', KEYS[1], ARGV[2], 1);
//重置过期时间为 30秒
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
return redis.call('pttl', KEYS[1]);
续命
- 续命:lua脚本使用的 hexists ,如果 redis 里这个 hash存在,pexpire 续命为 3万毫秒。
//续命的逻辑
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
then redis.call('pexpire', KEYS[1], ARGV[1]);
return 1; end;
return 0;
解锁
- 解锁:依然使用 hexists,如果 这个redis hash的key 存在,在使用 hincrby 增加 -1,就是 减去1
- 判断这个值 减去1后 是否 >0,如果 > 0,就 重新设置为 3万毫秒
- 否则 就删除这条 redis,并发送一个 解锁的消息。发布内容为 0,通道为:redisson_lock__channel:{你的锁key}
//释放锁的逻辑
// 如果 这个 hashKey 不存在,直接返回0
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end;
//否则,这个 key存在,就 减去1,接收返回值,
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
//如果返回值 大于0,就续期30秒。
if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0;
//如果这个 结果 不大于0,就删除,并发发布一个解锁的消息
else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]);
return 1; end;
return nil;
PUBLISH channel message
发布一条消息到频道
this.getChannelName() 发布到:
LockPubSub.UNLOCK_MESSAGE, public static final Long UNLOCK_MESSAGE = 0L;
//最终返回:redisson_lock__channel:{atguiguLock}
String getChannelName() {
return prefixName("redisson_lock__channel", this.getName()); //atguiguLock
}
到了这里,关于【面试 分布式锁详细解析】续命 自旋锁 看门狗 重入锁,加锁 续命 解锁 核心源码,lua脚本解析,具体代码和lua脚本如何实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!