Redisson分布式锁

这篇具有很好参考价值的文章主要介绍了Redisson分布式锁。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、Redisson简单介绍

Redisson是一个在Redis的基础上实现的Java驻内存数据网格,可参考Redisson官方文档使用,它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。

二、Redisson简单使用

1. maven引用

<dependency>
			<groupId>org.redisson</groupId>
			<artifactId>redisson</artifactId>
			<version>3.19.1</version>
		</dependency>

2. RedisConfig配置

@Configuration
public class RedissonConfig {
    @Value("${spring.redis.host}")
    private String redisAddress;

    @Bean
    public RedissonClient redissonClient(){
        // 默认连接地址 127.0.0.1:6379
        RedissonClient redisson = Redisson.create();

//        Config config = new Config();
//        config.useSingleServer().setAddress("redis://" + redisAddress); //redis服务器地址
//                .setDatabase(0) //制定redis数据库编号
//                .setUsername("").setPassword() // redis用户名密码
//                .setConnectionMinimumIdleSize(10) //连接池最小空闲连接数
//                .setConnectionPoolSize(50) //连接池最大线程数
//                .setIdleConnectionTimeout(60000) //线程的超时时间
//                .setConnectTimeout(6000) //客户端程序获取redis链接的超时时间
//                .setTimeout(60000) //响应超时时间
//        RedissonClient redisson = Redisson.create(config);
        return redisson;
    }
}

3. StockRedissonService

@Service
public class StockRedissonService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private RedissonClient redissonClient;

    public void deduct() {

        RLock rlock = redissonClient.getLock("lock");
        rlock.lock();

        try{
            // 1。 查询库存
            String stockStr = redisTemplate.opsForValue().get("stock");
            if (!StringUtil.isNullOrEmpty(stockStr)) {
                int stock = Integer.parseInt(stockStr);
                // 2。 判断条件是否满足
                if (stock > 0) {
                    // 3 更新redis
                    redisTemplate.opsForValue().set("stock", String.valueOf(stock - 1));
                }
            }
        }finally {
            rlock.unlock();
        }
    }

}

简单来说,就是直接

RLock rlock = redissonClient.getLock(“lock”);

获取到锁,然后lock()和unlock()即可。

4. 测试

redisson分布式锁,分布式锁,分布式,redis,java,缓存
并发达到了660,比之前自己写的redis LUA脚本还要高。

三、Redisson源码

1. 加锁

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "if ((redis.call('exists', KEYS[1]) == 0) " +
                            "or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                    "end; " +
                    "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }

这就是加锁的核心代码,其实本质上也就是LUA脚本+hash数据类型

// 如果这个key不存在,或者这个key是我自己的,那么state+1,并且重置失效时间
"if ((redis.call('exists', KEYS[1]) == 0) " +
"or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 如果不满足,则以毫秒为单位,返回key的过期时间
"return redis.call('pttl', KEYS[1]);",

pttl: 以毫秒为单位返回 key 的剩余过期时间。
当 key 不存在时,返回 -2 。
当 key 存在但没有设置剩余生存时间时,返回 -1 。
否则,以毫秒为单位,返回 key 的剩余生存时间。

这其实和我们之前手写的lua脚本并没有什么太大差别,只是它这里return nil表示的是加锁成功

2. 解锁

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
              "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                    "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                    "end; " +
                    "return nil;",
                Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }

解锁的核心代码,和加锁一样,本质上也是LUA脚本+hash数据类型

// 如果key不是我的,直接返回nil,解锁失败
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// 如果key是我的,那么state-1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
// state-1后的值,如果>0,说明重入过了,那更新下失效时间
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
// state-1后的值,如果=0,说明全部解锁成功了,删除锁,并且publish
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",

3. 自动续期

private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                CompletionStage<Boolean> future = renewExpirationAsync(threadId);
                future.whenComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock {} expiration", getRawName(), e);
                        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                        return;
                    }
                    
                    if (res) {
                        // reschedule itself
                        renewExpiration();
                    } else {
                        cancelExpirationRenewal(null);
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    }

它也是使用的new TimerTask() ,同样也是一次性续期,若res=1,再次续期。

官方文档:如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。

不同的是,即使redisson节点宕机,它还是可以自动续期,保证你的业务逻辑正常结束。

4. 总结

其实和我们之前手写实现的redis分布式锁原理差不多,但是它是配合着CompletableFuture去异步实现,更加高效
本质上还是LUA脚本+hash数据类型

四、Redisson公平锁

  1. 公平锁
    redisson分布式锁,分布式锁,分布式,redis,java,缓存

非公平锁:来个线程就先试试能不能插队,不能插队才去后面排队
公平锁:线程都乖乖去后面排队去,不准插队

  1. 代码

StockController新增一个接口,用于调用公平锁

 @GetMapping("fair/lock/{id}")
    public String fairLock(@PathVariable("id")Long id){
        stockService.fairLock(id);
        return "hello fair lock";
    }

StockRedissonService

public void fairLock(Long id) {

        RLock fairLock = redissonClient.getFairLock("fairLock");
        fairLock.lock();
        try {
            TimeUnit.SECONDS.sleep(3);
            System.out.println("-----测试公平锁-----");
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            fairLock.unlock();
        }
    }
  1. 测试
    访问http://localhost:10010/fair/lock/1 从1一直访问到5

redisson分布式锁,分布式锁,分布式,redis,java,缓存

redis中有个等待队列
redisson分布式锁,分布式锁,分布式,redis,java,缓存
访问是顺序执行。

  1. 那如果把公平锁修改为非公平锁呢
RLock fairLock = redissonClient.getLock("fairLock");
  1. 重新测试
    redisson分布式锁,分布式锁,分布式,redis,java,缓存
    redis中并无等待队列
    redisson分布式锁,分布式锁,分布式,redis,java,缓存
    锁访问顺序也并不是先进先出。

ps:若使用nginx转发会超时重发,修改conf即可
redisson分布式锁,分布式锁,分布式,redis,java,缓存

添加如下:

proxy_connect_timeout 12000;
proxy_send_timeout 12000;
proxy_read_timeout 12000;

五、Redisson读写锁

读写 不能并发 加锁
写写 不能并发 加锁
读读 能并发 不加锁

StockController

 @GetMapping("read/lock")
    public String readLock(){
        stockService.readLock();
        return "read read lock";
    }

    @GetMapping("write/lock")
    public String writeLock(){
        stockService.writeLock();
        return "write write lock";
    }

StockRedissonService

public void readLock() {
        RReadWriteLock lock = redissonClient.getReadWriteLock("rwLock");
        lock.readLock().lock(10, TimeUnit.SECONDS);
        // TODO 疯狂的读
//        lock.readLock().unlock();
    }

    public void writeLock() {
        RReadWriteLock lock = redissonClient.getReadWriteLock("rwLock");
        lock.writeLock().lock(10, TimeUnit.SECONDS);
        // TODO 疯狂的写
//        lock.writeLock().unlock();
    }

简单测试下,两个页面分别访问
http://localhost:10010/write/lock 写操作
http://localhost:10010/read/lock 读操作

写写: 多次写操作明显等待
redisson分布式锁,分布式锁,分布式,redis,java,缓存

写读:明显等待
redisson分布式锁,分布式锁,分布式,redis,java,缓存
读写:明显等待
redisson分布式锁,分布式锁,分布式,redis,java,缓存
读锁有多个,但写锁只有一个

读读:无等待现象

五、Redisson的RSemaphore信号量

1. Semaphore:

  • 可以用来控制同时访问特定资源的线程数量,常用于限流场景。

  • Semaphore接收一个int整型值,表示 许可证数量。

  • 线程通过调用acquire()获取许可证,执行完成之后通过调用release()归还许可证。

  • 只有获取到许可证的线程才能运行,获取不到许可证的线程将会阻塞。

 public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < 6; i++) {
            new Thread(()->{
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + ":抢到了停车位");
                    TimeUnit.SECONDS.sleep(new Random().nextInt(10));
                    System.out.println(Thread.currentThread().getName() + "停了会儿就开走了");
                    semaphore.release();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }, i + "号车").start();
        }
    }

redisson分布式锁,分布式锁,分布式,redis,java,缓存
每次只会停三辆车,走一辆后才可以停下一辆,完美实现限流。

2. RSemaphore:实现分布式限流

StockRedissonService

 public void semaphoreLock() {
        RSemaphore semaphore = redissonClient.getSemaphore("semaphore");
        semaphore.trySetPermits(3); //设置资源量,限流的线程数
        try {
            semaphore.acquire(); // 获取资源,获取资源成功的线程可以继续处理业务操作,否则会被阻塞住
            redisTemplate.opsForList().rightPush("log",port + "端口获取了资源,开始处理业务逻辑" + Thread.currentThread().getName());
            TimeUnit.SECONDS.sleep(new Random().nextInt(10));
            redisTemplate.opsForList().rightPush("log",port + "端口处理完,释放了资源"  + Thread.currentThread().getName());
            semaphore.release(); // 手动释放资源,后续请求线程可以获取该资源
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

    }

用redis的list数据类型来记录日志,方便集群部署的日志查看。

  • 注意:
semaphore.trySetPermits(3);//设置资源量,限流的线程数

这会在redis中成semaphore的key,如果要修改资源量,必须手动把redis中该key删除,否则只在代码中修改,重启后无法生效。
redisson分布式锁,分布式锁,分布式,redis,java,缓存

简单测试下,多次访问后
redisson分布式锁,分布式锁,分布式,redis,java,缓存

集群部署下,共用了信号量的限流,两个端口启动,同一时间限流了3个线程。

六、Redisson的RCountDownLatch

1. CountDownLatch:允许一个或者多个线程去等待其他线程完成操作。

  • CountDownLatch接收一个int型参数,表示要等待的工作线程的个数。
  • await(): 使当前线程进入同步队列进行等待,直到latch的值被减到0或者当前线程被中断,当前线程就会被唤醒。
  • countDown(): 使latch的值减1,如果减到了0,则会唤醒所有等待在这个latch上的线程。
public static void main(String[] args) throws InterruptedException { // 班长线程
        CountDownLatch countDownLatch = new CountDownLatch(6);
        for (int i = 0; i < 6; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "准备出门了");
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println(Thread.currentThread().getName() + "睡了一会,终于出门了");
                countDownLatch.countDown();
            }, i + "号同学" ).start();
        }

        countDownLatch.await();
        System.out.println("班长锁门了");
    }

班长等六个同学全部出门后才可以锁门
redisson分布式锁,分布式锁,分布式,redis,java,缓存

2. RCountDownLatch

  • StockController
@GetMapping("test/countDown")
    public String countDown(){
        stockService.countDown();
        return "出来了一位同学";
    }

    @GetMapping("test/latch")
    public String testLatch(){
        stockService.testLatch();
        return "班长锁门了";
    }
  • countDown 同学出门
 public void countDown() {
        RCountDownLatch cdl = redissonClient.getCountDownLatch("cdl");
        // TODO 出门准备完成
        cdl.countDown();
    }
  • latch 等六位同学出门后,班长锁门
 public void testLatch() {
        RCountDownLatch cdl = redissonClient.getCountDownLatch("cdl");
        cdl.trySetCount(6);
        try {
            cdl.await();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        // TODO 准备锁门
    }
 RCountDownLatch cdl = redissonClient.getCountDownLatch("cdl");

其中的“cdl"即是存放在redis的key,两个方法必须命名一致。

分别访问接口
http://localhost/test/countDown 出门
http://localhost/test/latch 班长锁门
只有访问六次出门后,才可以成功锁门
redisson分布式锁,分布式锁,分布式,redis,java,缓存

redis中存放着cdl的key,也就是目前的等待线程数。

七、 总结

redisson: redis的java客户端,分布式锁
玩法:文章来源地址https://www.toymoban.com/news/detail-602748.html

  1. 引入依赖
  2. java配置类:RedissonConfig
  3. 代码使用
  • 可重入锁RLock对象: CompletableFuture + LUA脚本 + hash
RLock rlock = redissonClient.getLock("xxx");
rlock.lock()/unlock()
  • 公平锁:
RLock fairLock = redissonClient.getLock("xxx");
fairLock.lock()/unlock()
  • 联锁 和 红锁
  • 读写锁:
 RReadWriteLock lock = redissonClient.getReadWriteLock("xxx");
lock.readLock().lock()/unlock();
lock.writeLock().lock()/unlock();
  • 信号量:
RSemaphore semaphore = redissonClient.getSemaphore("xxx");
semaphore.trySetPermits(3); //设置资源量,限流的线程数
semaphore.acquire()/release()
  • 闭锁:
RCountDownLatch cdl = redissonClient.getCountDownLatch("xxx");
cdl.trySetCount(6);
cdl.await()/countDown()

到了这里,关于Redisson分布式锁的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Redis分布式锁及Redisson的实现原理

    Redis分布式锁 在讨论分布式锁之前我们回顾一下一些单机锁,比如synchronized、Lock 等 锁的基本特性: 1.互斥性:同一时刻只能有一个节点访问共享资源,比如一个代码块,或者同一个订单同一时刻只能有一个线程去支付等。 2.可重入性: 允许一个已经获得锁的线程,在没有释

    2024年02月06日
    浏览(45)
  • 在Java项目中使用redisson实现分布式锁

    在Java项目中使用Redission自定义注解实现分布式锁: 添加Redission依赖项:在项目的pom.xml中添加Redission依赖项: 创建自定义注解:创建一个自定义注解来标记需要使用分布式锁的方法。例如,创建一个名为 @DistributedLock 的注解: 创建注解切面:创建一个切面类,通过AOP将注解

    2024年02月16日
    浏览(39)
  • 【业务功能100】补充代码【业务功能88】微服务-springcloud-分布式锁-redis-redisson-springcache

    采用redisson做分布式锁,完成数据的查询接口功能getCatelog2JSONRedis 原先从mysql数据库查询的效率较低,现在将部分固定数据展示比如页面的树形栏目信息等,存储到 redis缓存 ,然后基于分布式集群,需要结合本地锁(synchronized )与分布式锁(redissonClient.getLock(“catelog2JSON-lock”

    2024年02月09日
    浏览(45)
  • 【Redisson】Redisson--分布式中几种锁

    Redisson系列文章: 【Redisson】Redisson–基础入门 【Redisson】Redisson–布隆(Bloom Filter)过滤器 【Redisson】Redisson–分布式锁的使用(推荐使用) 【分布式锁】Redisson分布式锁底层原理 【Redisson】Redisson–限流器、 【Redisson】Redisson–分布式远程服务(Remote Service) 【Redisson】Redisson–

    2024年02月13日
    浏览(50)
  • 【Redisson】Redisson--分布式远程服务(Remote Service)

    Redisson系列文章: 【Redisson】Redisson–基础入门 【Redisson】Redisson–布隆(Bloom Filter)过滤器 【Redisson】Redisson–分布式锁的使用(推荐使用) 【分布式锁】Redisson分布式锁底层原理 【Redisson】Redisson–限流器 当前有两台服务器连接的是同一个Redisson中间件,这两台服务器叫它们

    2024年02月13日
    浏览(46)
  • Redisson分布式锁

    Redisson是一个在Redis的基础上实现的Java驻内存数据网格,可参考Redisson官方文档使用,它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。 简单来说,就是直接 RLock rlock = redissonClient.getLock(“lock”); 获取到锁,然后lock()和unlock()即可。 并发达到了660,比之

    2024年02月16日
    浏览(43)
  • Redisson—分布式对象

    每个Redisson对象实例都会有一个与之对应的Redis数据实例,可以通过调用getName方法来取得Redis数据实例的名称(key)。 所有与Redis key相关的操作都归纳在RKeys这个接口里: Redisson的分布式RBucketJava对象是一种通用对象桶可以用来存放任类型的对象。 除了同步接口外,还提供了异

    2024年02月07日
    浏览(47)
  • Redisson 分布式锁

    Redis是基础客户端库,可用于执行基本操作。 Redisson是基于Redis的Java客户端,提供高级功能如分布式锁、分布式集合和分布式对象。 Redisson提供更友好的API,支持异步和响应式编程,提供内置线程安全和失败重试机制。 实现步骤: 通过导入坐标和配置,注入RedissonClient对象之

    2024年02月11日
    浏览(56)
  • 运用分布式锁 redisson

    导入依赖 根据springboot版本不同自行选择版本 dependency groupIdorg.redisson/groupId artifactIdredisson-spring-boot-starter/artifactId version3.15.3/version /dependency 创建客户端

    2024年01月19日
    浏览(46)
  • SpringBoot+Redisson分布式锁

    org.redisson.config.Config类是Redisson框架中用于配置Redisson客户端的类。以下是一些常用的配置项: codec(编码) :默认值是org.redisson.codec.JsonJacksonCodec,用于定义与Redis交互时使用的编解码器。 useSingleServer :设置为true时,将使用单节点模式进行连接。 useMasterSlave :设置为true时,

    2024年01月19日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包