Redis分布式可重入锁实现方案

这篇具有很好参考价值的文章主要介绍了Redis分布式可重入锁实现方案。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

在单进程环境下,要保证一个代码块的同步执行,直接用synchronized 关键字或ReetrantLock 即可。在分布式环境下,要保证多个节点的线程对代码块的同步访问,就必须要用到分布式锁方案。
分布式锁实现方案有很多,有基于关系型数据库行锁实现的;有基于ZooKeeper临时顺序节点实现的;还有基于 Redis setnx 命令实现的。本文介绍一下基于 Redis 实现的分布式锁方案。

理解分布式锁

实现分布式锁有几个要求

  • 互斥性:任意时刻,最多只会有一个客户端线程可以获得锁
  • 可重入:同一客户端的同一线程,获得锁后能够再次获得锁
  • 避免死锁:客户端获得锁后即使宕机,后续客户端也可以获得锁
  • 避免误解锁:客户端A加的锁只能由A自己释放
  • 释放锁通知:持有锁的客户端释放锁后,最好可以通知其它客户端继续抢锁
  • 高性能和高可用

Redis 服务端命令是单线程串行执行的,天生就是原子的,并且支持执行自定义的 lua 脚本,功能上更加强大。
关于互斥性,我们可以用 setnx 命令实现,Redis 可以保证只会有一个客户端 set 成功。但是由于我们要实现的是一个分布式的可重入锁,数据结构得用 hash,用客户端ID+线程ID作为 field,value 记作锁的重入次数即可。
关于死锁,代码里建议把锁的释放写在 finally 里面确保一定执行,针对客户端抢到锁后宕机的场景,可以给 redis key 设置一个超时时间来解决。
关于误解锁,客户端在释放锁时,必须判断 field 是否当前客户端ID以及线程ID一致,不一致就不执行删除,这里需要用到 lua 脚本判断。
关于释放锁通知,可以利用 Redis 发布订阅模式,给每个锁创建一个频道,释放锁的客户端负责往频道里发送消息通知等待抢锁的客户端。
最后关于高性能和高可用,因为 Redis 是基于内存的,天生就是高性能的。但是 Redis 服务本身一旦出现问题,分布式锁也就不可用了,此时可以多部署几台独立的示例,使用 RedLock 算法来解决高可用的问题。

设计实现

首先我们定义一个 RedisLock 锁对象的抽象接口,只有尝试加锁和释放锁方法

public interface RedisLock {

    boolean tryLock();

    boolean tryLock(long waitTime, long leaseTime, TimeUnit unit);

    void unlock();
}

然后提供一个默认实现 DefaultRedisLock

public class DefaultRedisLock implements RedisLock {

    // 客户端ID UUID
    private final String clientId;
    private final StringRedisTemplate redisTemplate;
    // 锁频道订阅器 接收释放锁通知
    private final LockSubscriber lockSubscriber;
    // 加锁的key
    private final String lockKey;
}

关于tryLock() ,首先执行lua脚本尝试获取锁,如果加锁失败则返回其它客户端持有锁的过期时间,客户端订阅锁对应的频道,然后sleep,直到收到锁释放的通知再继续抢锁。最终不管有没有抢到锁,都会在 finally 取消频道订阅。

@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) {
    final long timeout = System.currentTimeMillis() + unit.toMillis(waitTime);
    final long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    if (ttl == null) {
        return true;
    }
    if (System.currentTimeMillis() >= timeout) {
        return false;
    }
    final Semaphore semaphore = lockSubscriber.subscribe(getChannel(lockKey), threadId);
    try {
        while (true) {
            if (System.currentTimeMillis() >= timeout) {
                return false;
            }
            ttl = tryAcquire(leaseTime, unit, threadId);
            if (ttl == null) {
                return true;
            }
            if (System.currentTimeMillis() >= timeout) {
                return false;
            }
            semaphore.tryAcquire(timeout - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        lockSubscriber.unsubscribe(getChannel(lockKey), threadId);
    }
    return false;
}

tryAcquire() 就是执行lua脚本来加锁,解释一下这段脚本的逻辑:首先判断 lockKey 是否存在,不存在则直接设置 lockKey并且设置过期时间,返回空,表示加锁成功。存在则判断 field 是否和当前客户端ID+线程ID一致,一致则代表锁重入,递增一下value即可,不一致代表加锁失败,返回锁的过期时间

private Long tryAcquire(long leaseTime, TimeUnit timeUnit, long threadId) {
    return redisTemplate.execute(RedisScript.of(
                    "if (redis.call('exists', KEYS[1]) == 0) then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; end;" +
                            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; end;" +
                            "return redis.call('pttl', KEYS[1]);", Long.class), Collections.singletonList(lockKey),
            String.valueOf(timeUnit.toMillis(leaseTime)), getLockName(threadId));
}

lockName是由客户端ID和线程ID组成的:

private String getLockName(long threadId) {
    return clientId + ":" + threadId;
}

如果加锁失败,客户端会尝试订阅对应的频道,名称规则是:

private String getChannel(String lockKey) {
    return "__lock_channel__:" + lockKey;
}

订阅方法是LockSubscriber#subscribe ,同一个频道无需订阅多个监听器,所以用一个 Map 记录。订阅成功以后,会返回当前线程对应的一个 Semaphore 对象,默认许可数是0,当前线程会调用Semaphore#tryAcquire 等待许可数,监听器在收到锁释放消息后会给 Semaphore 对象增加许可数,唤醒线程继续抢锁。

@Component
public class LockSubscriber {

    @Autowired
    private RedisMessageListenerContainer messageListenerContainer;
    private final Map<String, Map<Long, Semaphore>> channelSemaphores = new HashMap<>();
    private final Map<String, MessageListener> listeners = new HashMap<>();
    private final StringRedisSerializer serializer = new StringRedisSerializer();

    public synchronized Semaphore subscribe(String channelName, long threadId) {
        MessageListener old = listeners.put(channelName, new MessageListener() {
            @Override
            public void onMessage(Message message, byte[] pattern) {
                String channel = serializer.deserialize(message.getChannel());
                String ignore = serializer.deserialize(message.getBody());
                Map<Long, Semaphore> semaphoreMap = channelSemaphores.get(channel);
                if (semaphoreMap != null && !semaphoreMap.isEmpty()) {
                    semaphoreMap.values().stream().findFirst().ifPresent(Semaphore::release);
                }
            }
        });
        if (old == null) {
            messageListenerContainer.addMessageListener(listeners.get(channelName), new ChannelTopic(channelName));
        }
        Semaphore semaphore = new Semaphore(0);
        Map<Long, Semaphore> semaphoreMap = channelSemaphores.getOrDefault(channelName, new HashMap<>());
        semaphoreMap.put(threadId, semaphore);
        channelSemaphores.put(channelName, semaphoreMap);
        return semaphore;
    }

    public synchronized void unsubscribe(String channelName, long threadId) {
        Map<Long, Semaphore> semaphoreMap = channelSemaphores.get(channelName);
        if (semaphoreMap != null) {
            semaphoreMap.remove(threadId);
            if (semaphoreMap.isEmpty()) {
                MessageListener listener = listeners.remove(channelName);
                if (listener != null) {
                    messageListenerContainer.removeMessageListener(listener);
                }
            }
        }
    }
}

对于 unlock,就只是一段 lua 脚本,这里解释一下:判断当前客户端ID+线程ID 这个 field 是否存在,存在说明是自己加的锁,可以释放。不存在说明不是自己加的锁,无需做任何处理。因为是可重入锁,每次 unlock 都只是递减一下 value,只有当 value 等于0时才是真正的释放锁。释放锁的时候会 del lockKey,再 publish 发送锁释放通知,让其他客户端可以继续抢锁。

@Override
public void unlock() {
    long threadId = Thread.currentThread().getId();
    redisTemplate.execute(RedisScript.of(
                    "if (redis.call('hexists', KEYS[1], ARGV[1]) == 0) then " +
                            "return nil;end;" +
                            "local counter = redis.call('hincrby', KEYS[1], ARGV[1], -1); " +
                            "if (counter > 0) then " +
                            "return 0; " +
                            "else " +
                            "redis.call('del', KEYS[1]); " +
                            "redis.call('publish', KEYS[2], 1); " +
                            "return 1; " +
                            "end; " +
                            "return nil;", Long.class), Arrays.asList(lockKey, getChannel(lockKey)),
            getLockName(threadId));
}

最后,我们需要一个 RedisLockFactory 来创建锁对象,它同时会生成客户端ID

@Component
public class RedisLockFactory {

    private static final String CLIENT_ID = UUID.randomUUID().toString();
    @Autowired
    private StringRedisTemplate redisTemplate;
    @Autowired
    private LockSubscriber lockSubscriber;

    public RedisLock getLock(String lockKey) {
        return new DefaultRedisLock(CLIENT_ID, redisTemplate, lockSubscriber, lockKey);
    }
}

至此,一个基于 Redis 实现的分布式可重入锁就完成了。

尾巴

目前这个版本的分布式锁,保证了互斥性、可重入、避免死锁和误解锁、实现了释放锁通知,但是并没有高可用的保证。如果 Redis 是单实例部署,就会存在单点问题,Redis 一旦故障,整个分布式锁将不可用。如果 Redis 是主从集群模式部署,虽然有主从自动切换,但是 Master 和 Slave 之间的数据同步是存在延迟的,分布式锁可能会出现问题。比如:客户端A加锁成功,lockKey 写入了 Master,此时 Master 宕机,其它 Slave 升级成了 Master,但是还没有同步到 lockKey,客户端B来加锁也会成功,这就没有保证互斥性。针对这个问题,可以参考 RedLock 算法,部署多个单独的 Redis 示例,只要一半以上的Redis节点加锁成功就算成功,来尽可能的保证服务高可用。文章来源地址https://www.toymoban.com/news/detail-826911.html

到了这里,关于Redis分布式可重入锁实现方案的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Redisson 分布式锁可重入的原理

    目录 1. 使用 Redis 实现分布式锁存在的问题 2. Redisson 的分布式锁解决不可重入问题的原理 不可重入:同一个线程无法两次 / 多次获取锁 举例 method1 执行需要获取锁 method2 执行也需要(同一把)锁 如果 method1 中调用了 method2,就会出现死锁的情况 method1 执行的过程是同一个线

    2024年01月25日
    浏览(31)
  • 【面试 分布式锁详细解析】续命 自旋锁 看门狗 重入锁,加锁 续命 解锁 核心源码,lua脚本解析,具体代码和lua脚本如何实现

    自己实现锁续命 在 controller 里开一个 线程 (可以为 守护线程) 每10秒,判断一个 这个 UUID是否存在,如果 存在,重置为 30秒。 如果不存在,守护线程 也结束。 基本的key value 基本的使用 setIfAbsent存在不设置 16384 Redis 集群没有使用一致性hash, 而是引入了哈希槽的概念。 R

    2023年04月09日
    浏览(30)
  • curator实现的zookeeper可重入锁

    Curator是一个Apache开源的ZooKeeper客户端库,它提供了许多高级特性和工具类,用于简化在分布式环境中使用ZooKeeper的开发。其中之一就是可重入锁。 Curator提供了 InterProcessMutex 类来实现可重入锁。以下是使用Curator实现ZooKeeper可重入锁的示例: import org.apache.curator.framework.Curato

    2024年02月15日
    浏览(31)
  • Java入门-可重入锁

    什么是可重入锁? 当线程获取某个锁后,还可以继续获取它,可以递归调用,而不会发生死锁; 可重入锁案例 程序可重入加锁 A.class,没有发生死锁。 sychronized锁 运行结果 ReentrantLock 运行结果 如何保证可重入 当一个线程访问同步块并获取锁时,会在对象头和栈帧中的锁记录里

    2024年02月22日
    浏览(31)
  • java~理解可重入锁

    在Java中,可重入锁(Reentrant Lock)是一种同步机制,允许线程在持有锁的情况下再次获取该锁,而不会被自己所持有的锁所阻塞。也就是说,一个线程可以多次获得同一个锁,而不会出现死锁的情况。 可重入锁在多线程编程中非常有用,它允许线程在访问共享资源时多次获取

    2024年02月09日
    浏览(33)
  • 并发编程之可重入锁ReentrantLock

    大家都知道在并发编程中一般会用到多线程技术,多线程技术可以大大增加系统QPS/TPS。但是在一些特殊的业务场景下我们需要限制线程的并发数目,比如秒杀系统、多种商品金额叠加运算等等都是需要限制线程数量。特别是在分布式微服务架构,多线程同步问题尤为明显。一

    2023年04月25日
    浏览(27)
  • 【Java | 多线程】可重入锁的概念以及示例

    可重入锁(又名递归锁)是一种特殊类型的锁,它允许 同一个线程在获取锁后再次进入该锁保护的代码块或方法,而不需要重新获取锁 。 说白了,可重入锁的特点就是同一个线程可以多次获取同一个锁,而不会因为之前已经获取过锁而阻塞。 可重入锁的一个优点是可以一定

    2024年04月24日
    浏览(26)
  • 解读分布式锁(redis实现方案)

    分布式锁是一种用于分布式系统中的并发控制机制,它用于确保在多个节点或多个进程之间的并发操作中,某些关键资源或代码块只能被一个节点或进程同时访问。分布式锁的目的是避免多个节点同时修改共享资源而导致的数据不一致或冲突的问题。通俗的来说,分布式锁的

    2024年02月15日
    浏览(31)
  • 【Java】三种方案实现 Redis 分布式锁

    setnx、Redisson、RedLock 都可以实现分布式锁,从易到难得排序为:setnx Redisson RedLock。一般情况下,直接使用 Redisson 就可以啦,有很多逻辑框架的作者都已经考虑到了。 1.1、简单实现 下面的锁实现可以用在测试或者简单场景,但是它存在以下问题,使其不适合用在正式环境。

    2024年02月05日
    浏览(35)
  • Zookeeper 和 Redis 哪种更好? 为什么使用分布式锁? 1. 利用 Redis 提供的 第二种,基于 ZK 实现分布式锁的落地方案 对于 redis 的分布式锁而言,它有以下缺点:

    关于这个问题,我们 可以从 3 个方面来说: 为什么使用分布式锁? 使用分布式锁的目的,是为了保证同一时间只有一个 JVM 进程可以对共享资源进行操作。 根据锁的用途可以细分为以下两类: 允许多个客户端操作共享资源,我们称为共享锁 这种锁的一般是对共享资源具有

    2024年01月16日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包