分布式锁之redis实现

这篇具有很好参考价值的文章主要介绍了分布式锁之redis实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

docker安装redis

拉取镜像

docker pull redis:6.2.6

查看镜像

分布式锁之redis实现,springCloud,分布式,redis,数据库

启动容器并挂载目录

需要挂在的data和redis.conf自行创建即可

docker run --restart always -d -v /usr/local/docker/redis/redis.conf:/usr/local/etc/redis/redis.conf -v /usr/local/docker/redis/data:/data --name redis -p 6379:6379 redis:6.2.6 redis-server /usr/local/etc/redis/redis.conf

查看运行状态 

不要忘记开放端口6379

分布式锁之redis实现,springCloud,分布式,redis,数据库

进入容器内部使用redis-cli

docker exec -it 13829d3f335a /bin/bash

redis-cli

[可选]用密码登录 

修改redis.conf配置文件,设置 requirepass xxxxx

spring boot 集成redis

添加依赖

      <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
      </dependency>

添加redis配置 


server.port= 10010

spring.datasource.driver-class-name= com.mysql.cj.jdbc.Driver
spring.datasource.url= jdbc:mysql://39.106.53.30:3306/lock_db?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=root123456

spring.redis.host=39.106.53.30
spring.redis.port=6379

使用StringRedisTemplate

如果直接使用RedisTemplate使用的序列化器是jdk的,存的是二进制,使用StringRedisTemplate默认初始化序列化器就是String类型

    public StringRedisTemplate() {
        this.setKeySerializer(RedisSerializer.string());
        this.setValueSerializer(RedisSerializer.string());
        this.setHashKeySerializer(RedisSerializer.string());
        this.setHashValueSerializer(RedisSerializer.string());
    }

redis演示超卖问题

执行票数存入redis指令

set ticket 5000

 编写代码演示超卖问题

/**
 * @Author sl
 */
@Service

public class TicketServiceImpl implements TicketService {


    @Autowired
    private StringRedisTemplate redisTemplate;

    @Override
    public  void sellTicket(){

        //获取redis中的票数
        String ticket = redisTemplate.opsForValue().get("ticket");

        if(ticket!= null && ticket.length() != 0){
            // 扣减票数

            Integer integer = Integer.valueOf(ticket);

            if(integer >0){
                redisTemplate.opsForValue().set("ticket",String.valueOf(--integer));
            }

        }

    }
}

 5000请求压测,结果为4895,发生了超卖问题

redis解决超卖问题 

解决方案

解决方案

  •         本地jvm锁(这种情况仅限单机,不做介绍)
  •         redis乐观锁 watch  multi exec(性能低)
  •         分布式锁(redis+lua手动实现或者通过redission实现)

redis乐观锁实现 

watch: 监控一个或者多个key,如果这些key在提交事务(exec)之前被其他用户修改过,那么事务将执行失败,需要重新获取最新数据重头操作

multi: 开启事务,使用该命令,标记一个事务块的开始,redis会将这些操作放入队列中

exec: 执行事务

分布式锁之redis实现,springCloud,分布式,redis,数据库

 乐观锁的代码需要包在SessionCallback中实现

package com.test.lockservice.service.impl;

import com.test.lockservice.service.TicketService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.util.List;

/**
 * @Author sl
 */
@Service

public class TicketServiceImpl implements TicketService {


    @Autowired
    private StringRedisTemplate redisTemplate;

    @Override
    public  void sellTicket(){

        redisTemplate.execute(new SessionCallback<Object>() {
            @Override
            public  Object execute(RedisOperations redisOperations) throws DataAccessException {

                // 开启监听
                redisOperations.watch("ticket");
                //获取redis中的票数
                String ticket = redisTemplate.opsForValue().get("ticket");

                if(ticket!= null && ticket.length() != 0){
                    // 开启事务
                    redisOperations.multi();
                    Integer integer = Integer.valueOf(ticket);
                    // 扣减票数
                    redisOperations.opsForValue().set("ticket",String.valueOf(--integer));

                    // 提交事务
                    List exec = redisOperations.exec();

                    // 如果获取锁失败 ,重试
                    if(exec == null || exec.size() == 0){
                        try {
                            // 减少锁争抢,避免栈内存溢出
                            Thread.sleep(40);
                            sellTicket();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                }
                return null;
            }
        });


    }
}

 1000请求压测,结果为4000,没有发生超卖,但性能极低

redis实现分布式锁

分布式锁的实现方案中redis的实现主要思想就是独占排他使用,在redis中可以使用setnx命令进行独占排他使用

  • 加锁 setnx 
  • 解锁 del
  • 重试:递归(容易造成栈内存溢出),这里使用循环

 文章来源地址https://www.toymoban.com/news/detail-695142.html

package com.test.lockservice.service.impl;

import com.test.lockservice.service.TicketService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

/**
 * @Author sl
 */
@Service

public class TicketServiceImpl implements TicketService {


    @Autowired
    private StringRedisTemplate redisTemplate;

    @Override
    public  void sellTicket(){

        // setnx 排他使用,如果获取锁不成功,则重试
        while(!redisTemplate.opsForValue().setIfAbsent("lock", "111")){
            try {
                Thread.sleep(40);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        try {
            //获取redis中的票数
            String ticket = redisTemplate.opsForValue().get("ticket");

            if(ticket!= null && ticket.length() != 0){
                // 扣减票数

                Integer integer = Integer.valueOf(ticket);

                if(integer >0){
                    redisTemplate.opsForValue().set("ticket",String.valueOf(--integer));
                }
            }
        } finally {
            // 解锁操作
            redisTemplate.delete("lock");
        }

    }
}

压测1000,显示无超卖现象 

分布式锁之redis实现,springCloud,分布式,redis,数据库分布式锁之redis实现,springCloud,分布式,redis,数据库

添加过期时间防止死锁问题

当前代码存在问题,假如现在有4台服务器争抢锁,编号为1的服务器抢到了锁,但是没来得及释放锁,就宕机啦,其他2,3,4服务器就永远拿不到锁,这就是产生的死锁问题,解决方案是给锁添加过期时间来解决

分布式锁之redis实现,springCloud,分布式,redis,数据库

要保证枷锁和设置过期时间具有原子性,否则加了锁,没来得及给过期时间就宕机啦,又会产生死锁问题

expire key 20指令和枷锁指令是两条指令不具有原子性,在这里使用 set key ex 20 nx命令设置过期时间来保证原子性

分布式锁之redis实现,springCloud,分布式,redis,数据库

添加过期时间和获取锁的原子性

redisTemplate.opsForValue().setIfAbsent("lock", "111",3, TimeUnit.SECONDS)

 // setnx 排他使用,如果获取锁不成功,则重试
        while(!redisTemplate.opsForValue().setIfAbsent("lock", "111",3, TimeUnit.SECONDS)){
            try {
                Thread.sleep(40);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
 }

通过UUID防止误删

因为已经加了过期时间,如果加了3秒过期时间,第一个请求到了第3秒还没执行完毕,锁就失效了,这时第二个请求获取锁,执行1s的时候,第一个请求执行到del指令,就把第二个锁删除掉啦(误删)

解决方案:通过uuid标识是自己的锁,通过判断是自己的锁,在删除

分布式锁之redis实现,springCloud,分布式,redis,数据库

添加uuid防止误删

    public  void sellTicket(){

        String uuid = UUID.randomUUID().toString();
        // setnx 排他使用,如果获取锁不成功,则重试
        while(!redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.SECONDS)){
            try {
                Thread.sleep(40);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        try {
            //获取redis中的票数
            String ticket = redisTemplate.opsForValue().get("ticket");

            if(ticket!= null && ticket.length() != 0){
                // 扣减票数

                Integer integer = Integer.valueOf(ticket);

                if(integer >0){
                    redisTemplate.opsForValue().set("ticket",String.valueOf(--integer));
                }
            }
        } finally {
            // 判断是自己的锁在删除
            if(uuid.equals(redisTemplate.opsForValue().get("lock"))){
                redisTemplate.delete("lock");
            }
            
        }

    }

使用Lua脚本解决防误删的原子性问题

判断和删除锁之间需要保证原子性第一个请求因为如果判断的时候,发现是自己的锁,然后此时锁超过了过期时间,此时,第二个请求获取到锁,第一个请求执行del指令,删除的是第二个请求的锁,所以需要在判断和删除锁之间保持原子性

解决方案:使用Lua脚本保证原子性,Lua脚本将多条命令一次性发给redis,redis单线程的特性可以保证原子性操作

Lua脚本介绍和redis执行Lua脚本 

Lua 是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能,编译后仅仅一百余K,可以很方便的嵌入别的程序里

菜鸟地址:https://www.runoob.com/lua/lua-variables.html

Lua脚本流程控制和变量定义

--[ 定义全局变量a 局部变量用local a --]
a = 100;
--[ 检查条件 --]
if( a < 20 )
then
   --[ if 条件为 true 时执行该语句块 --]
   print("a 小于 20" )
else
   --[ if 条件为 false 时执行该语句块 --]
   print("a 大于 20" )
end
print("a 的值为 :", a)

在redis中执行Lua脚本

redis中继承了Lua脚本,lua-time-limit参数现在脚本最长运行时间,默认是5秒,执行指令为:

eval script numkeys key [key ...] arg [arg ...]

numkeys:标识key的数量 不能省略

hello word

eval "return 'hello world'" 0

分支语句KEYS和ARGV必须大写

eval "if KEYS[1]==1 then return KEYS[1] else return  ARGV[1] end" 1 0 3 

分布式锁之redis实现,springCloud,分布式,redis,数据库解决判断和删除之间的原子性问题

// 如果是自己的锁,则删除,否则返回0为false
if redis.call('get',KEYS[1]) == ARGV[1]
then
    return redis.call('del',KEYS[1])
else
    return 0
end

keys:lock

argv: uuid
 public  void sellTicket(){

        String uuid = UUID.randomUUID().toString();
        // setnx 排他使用,如果获取锁不成功,则重试
        while(!redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.SECONDS)){
            try {
                Thread.sleep(40);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        try {
            //获取redis中的票数
            String ticket = redisTemplate.opsForValue().get("ticket");

            if(ticket!= null && ticket.length() != 0){
                // 扣减票数

                Integer integer = Integer.valueOf(ticket);

                if(integer >0){
                    redisTemplate.opsForValue().set("ticket",String.valueOf(--integer));
                }
            }
        } finally {
            String script="if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";

            this.redisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class), Collections.singletonList("lock"),uuid);

        }

    }

 压测1000 显示无超卖现象

分布式锁之redis实现,springCloud,分布式,redis,数据库

hash+Lua解决锁的可重复入问题

由于上述加锁命令使用了 SETNX ,一旦键存在就无法再设置成功,这就导致后续同一线程内继续加 锁,将会加锁失败。当一个线程执行一段代码成功获取锁之后,继续执行时,又遇到加锁的子任务代码,可重入性就保证线程能继续执行,而不可重入就是需要等待锁释放之后,再次获取锁成功,才能继续往下执行

第一个就是锁的重入问题

当前方法a获取锁,在方法之中调用b方法,b方法也需要获取锁,这个时候造成了死锁问题,采用hash+Lua脚本解决

第二个就是锁的自动续期问题:后续会解决续期问题

探讨ReentrantLock的可重入原理

ReentrantLock继承了aqs,aqs是锁的基石

可重入锁加锁流程

  • CAS获取锁,如果没有线程占用锁(state==0),加锁成功并记录当前线程是有锁线程
  • 如果state的值不为0,说明锁已经被占用。则判断当前线程是否是有锁线程,如果是则重入 (state + 1)
  • 否则加锁失败,入队等待

可重入锁解锁流程

  • 判断当前线程是否是有锁线程,不是则抛出异常
  • 对state的值减1之后,判断state的值是否为0,为0则解锁成功,返回true
  • 如果减1后的值不为0,则返回false

hash+Lua实现可重复入锁

参照ReentrantLock中的非公平可重入锁实现分布式可重入锁: hash + lua脚本
加锁

  •     判断锁是否存在 (exists),则直接获取锁 hset key field value
  •     如果锁存在则判断是否自己的锁 (hexists),如果是自己的锁则重入: hincrby key field increment
  •     否则重试:递归 循环
加锁
如果锁不存在或者这是自己的锁,就通过hincrby(不存在就新增并加1,存在就加1)获取锁或者锁次
数加1

if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1], ARGV[1]) == 1 
then 
	redis.call('hincrby',KEYS[1], ARGV[1], 1)
	redis.call('expire',KEYS[1],ARGV[2])
	return 1
else 
	return 0
end

keys lock
argv uuid 30


解锁
判断 hash set 可重入 key 的值是否等于 0
如果为 nil 代表 自己的锁已不存在,在尝试解其他线程的锁,解锁失败
如果为 0 代表 可重入次数被减 1
如果为 1 代表 该可重入 key 解锁成功
1 代表解锁成功,锁被释放
0 代表可重入次数被减 1
null 代表其他线程尝试解锁,解锁失败
if redis.call('hexists',KEYS[1],ARGV[1])==0 
then 
	return nil 
elseif redis.call('hincrby',KEYS[1],ARGV[1],-1)>0 
then 
	return 0 
else 
	redis.call('del',KEYS[1]) 
	return 1 
end

keys lock
argv uuid

exists判断lock是否存在,hexists lock uuid 判断filed是否存在

通过hincrby(不存在就新增并加1,存在就加1)获取锁或者锁次数加1,hincrby命令,如果增加的key filed 不存在则新增并加1

分布式锁之redis实现,springCloud,分布式,redis,数据库

加锁工具类

package com.test.lockservice.utils;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;

import java.util.Collections;
import java.util.UUID;

public class RedisLock {

    private StringRedisTemplate redisTemplate;

    private String lockName;

    private String uuid;

    private Integer expire = 30;
    private static final ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>();

    public RedisLock(StringRedisTemplate redisTemplate, String lockName) {
        this.redisTemplate = redisTemplate;
        this.lockName = lockName;
        this.uuid = THREAD_LOCAL.get();
        if (uuid == null) {
            this.uuid = UUID.randomUUID().toString();
            THREAD_LOCAL.set(uuid);
        }
        this.expire = expire;
    }

    public void lock(){
        this.lock(expire);
    }

    public void lock(Integer expire){
        this.expire = expire;
        String script = "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1], ARGV[1]) == 1 then redis.call('hincrby',KEYS[1], ARGV[1], 1) redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";
        System.out.println(script);
        if (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Collections.singletonList(lockName), uuid, expire.toString())){
            try {
                // 没有获取到锁,重试
                Thread.sleep(60);
                lock(expire);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

    public void unlock(){
        String script = "if redis.call('hexists',KEYS[1],ARGV[1])==0 then return nil elseif redis.call('hincrby',KEYS[1],ARGV[1],-1)>0 then return 0 else redis.call('del',KEYS[1]) return 1 end";
        /**
         * 如果返回值没有使用Boolean,spring-data-redis 进行类型转换时将会把 null 转为 false,这就会影响我们逻辑判断
         * 所以返回类型只好使用 Long:null-解锁失败;0-重入次数减1;1-解锁成功
         */
        Long result = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class),  Collections.singletonList(lockName), uuid);

        // 如果未返回值,代表尝试解其他线程的锁
        if (result == null) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by lockName: " + lockName + " with request: " + uuid);
        } else if (result == 1) {
            THREAD_LOCAL.remove();
        }
    }
}

测试可重入 

@Override
    public  void checkAndLock(){
        RedisLock lock = new RedisLock(redisTemplate, "lock");
        lock.lock();
        // 查询票数
        Ticket ticket = ticketMapper.selectOne(new QueryWrapper<Ticket>().eq("sell_company", "12306"));
        // 判断不为空和票数大于0
        if(ticket!=null&& ticket.getCount() > 0){
            ticket.setCount(ticket.getCount()-1);
            ticketMapper.updateById(ticket);
        }
        // 测试可重入
        testRepeatEntry();
        lock.unlock();
    }

    public void testRepeatEntry(){
        RedisLock lock = new RedisLock(redisTemplate, "lock");
        lock.lock();
        System.out.println("redis分布式锁测试可重入");
        lock.unlock();
    }

 压测1000,未发现超卖问题,并解决可重入的问题

分布式锁之redis实现,springCloud,分布式,redis,数据库

锁的自动续期

如果在锁还在使用过程中,锁还未使用完,就失效了,也就产生了锁如何自动添加过期时间的问题 

实现方案: 定时器 + Lua脚本定时续期


自动续期

if redis.call('hexists',KEYS[1],ARGV[1])==1
then
	redis.call('expire',KEYS[1],ARGV[2]) 
	return 1 
else 
	return 0 
end

这里没有选用线程池的原因在于释放锁之后没有取消定时任务的方法,所以选用jdk自带的

Timer作为定时任务 

package com.test.lockservice.utils;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;

import java.util.*;

public class RedisLock {

    private StringRedisTemplate redisTemplate;

    private String lockName;

    private String uuid;

    private Integer expire = 30;

    @SuppressWarnings("all")
    private static final Timer timer = new Timer();

    private static final ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>();

    public RedisLock(StringRedisTemplate redisTemplate, String lockName) {
        this.redisTemplate = redisTemplate;
        this.lockName = lockName;
        this.uuid = THREAD_LOCAL.get();
        if (uuid == null) {
            this.uuid = UUID.randomUUID().toString();
            THREAD_LOCAL.set(uuid);
        }
        this.expire = expire;
    }

    public void lock(){
        this.lock(expire);
    }

    public void lock(Integer expire){
        this.expire = expire;
        String script = "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1], ARGV[1]) == 1 then redis.call('hincrby',KEYS[1], ARGV[1], 1) redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";
        if (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Collections.singletonList(lockName), uuid, expire.toString())){
            try {
                // 没有获取到锁,重试
                Thread.sleep(60);
                lock(expire);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 自动续期
        renewExpire();

    }

    public void unlock(){
        String script = "if redis.call('hexists',KEYS[1],ARGV[1])==0 then return nil elseif redis.call('hincrby',KEYS[1],ARGV[1],-1)>0 then return 0 else redis.call('del',KEYS[1]) return 1 end";
        /**
         * 如果返回值没有使用Boolean,spring-data-redis 进行类型转换时将会把 null 转为 false,这就会影响我们逻辑判断
         * 所以返回类型只好使用 Long:null-解锁失败;0-重入次数减1;1-解锁成功
         */
        Long result = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class),  Collections.singletonList(lockName), uuid);

        // 如果未返回值,代表尝试解其他线程的锁
        if (result == null) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by lockName: " + lockName + " with request: " + uuid);
        } else if (result == 1) {
            THREAD_LOCAL.remove();
        }
        // 释放锁成功
        this.uuid = null;
    }


    @SuppressWarnings("all")
    private void renewExpire() {
        String script = "if redis.call('hexists',KEYS[1],ARGV[1])==1 then redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                if (uuid != null) {
                    redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), RedisLock.this.uuid, expire.toString());
                    renewExpire();
                }
            }
        },expire * 1000 / 3);
    }
}

红锁算法 

利用红锁算法解决集群下锁的问题:

  • 1、应用程序获取当前系统时间
  • 2、应用程序以相同的kv值依次从多个redis实例中获取锁,如果某一个节点超过了一定时间(小于过期时间)没有获取到锁,则放弃,尽快从其他节点获取锁,避免一个节点宕机阻塞
  • 3、计算锁的消耗时间= 客户端当前时间-step1中的事件,获取锁的时间小于总的锁定时间,并且半数以上节点获取锁成功,认为获取锁成功
  • 4、如果获取锁失败,对所有节点释放锁

redis分布式锁小结

redis分布式锁最开始采用setnex+Lua脚本的方式,我们发现存在不可重入的问题,于是使用hash+Lua脚本解决可重入问题,并解决了自动续期问题,但是还存在一个重要问题,就是redis集群部署所带来的并发问题,所以使用Redission作为最终的分布式锁解决方案

redis集群状态下的问题:
  • 客户端A从master获取到锁
  • 在master将锁同步到slave之前,master宕掉了
  • slave节点被晋级为master节点
  • 客户端B取得了同一个资源被客户端A已经获取到的另外一个锁

redisson中的分布式锁  

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅 提供了一系列的分布式的Java常用对象,还提供了许多分布式服务
Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上

Redisson引入依赖

 <dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.11.2</version>
</dependency>

Redission配置 

package com.test.lockservice.config;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author sl
 */
@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient(){
        Config config = new Config();
//        config.useClusterServers()
        config.useSingleServer().setAddress("redis://39.106.53.30:6379").setPassword("12345");
        return Redisson.create(config);
    }
}

Redission使用

@Autowired
    private RedissonClient redissonClient;


    public void userRedisson(){
        // 获取锁
        RLock lock = redissonClient.getLock("lock");
        try {
            // 加锁
            lock.lock();
            //获取redis中的票数
            String ticket = redisTemplate.opsForValue().get("ticket");

            if(ticket!= null && ticket.length() != 0){
                // 扣减票数
                Integer integer = Integer.valueOf(ticket);

                if(integer >0){
                    redisTemplate.opsForValue().set("ticket",String.valueOf(--integer));
                }
            }
        } finally {
            // 解锁
            lock.unlock();
        }
    }

1000并发压测,发现并无超卖问题 

分布式锁之redis实现,springCloud,分布式,redis,数据库

分布式锁之redis实现,springCloud,分布式,redis,数据库

RLock原理

 RLock对象实现了 java.util.concurrent.locks.Lock 接口,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗检查锁的超时时间 是30秒钟,也可以通过修改 Config.lockWatchdogTimeout 来另行指定

  • RLock 对象完全符合Java的Lock规范。也就是说只有拥有锁的进程才能解锁,其他进程解锁则会抛出 IllegalMonitorStateException 错误
  • 另外Redisson还通过加锁的方法提供了 leaseTime 的参数来指定加锁的时间。超过这个时间后锁便自动解开了

其实Redisson底层的实现思路同样是hash+Lua脚本的实现方式,在源码中可以看到,下面列举一下加锁的源码

    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }
    

 公平锁 

基于Redis的Redisson分布式可重入公平锁也是实现了 java.util.concurrent.locks.Lock 接口的一 种 RLock 对象。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。它保证了 当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队 列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个 线程都处于等待状态,那么后面的线程会等待至少25秒
public void useFairLock() {
        RLock fairLock = redissonClient.getFairLock("fairLock");
//        fairLock.lock();

        // 10秒钟以后自动解锁
        // 无需调用unlock方法手动解锁
        fairLock.lock(10, TimeUnit.SECONDS);
        System.out.println("加锁成功"+Thread.currentThread().getName());

        // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
//        boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
//        fairLock.unlock();
    }


加锁成功http-nio-10010-exec-5
加锁成功http-nio-10010-exec-10

可以看到,公平锁会维护一个队列,按发送顺序依次加锁

分布式锁之redis实现,springCloud,分布式,redis,数据库 

联锁

   在多个redis实例上获取锁,联锁所有的锁都上锁成功才算成功

  @Override
    public void useMutiLock() {

        RLock lock1 = redissonClient.getLock("lock1");
//        RLock lock2 = redissonClient.getLock("lock2");
        //联锁所有的锁都上锁成功才算成功
        RedissonMultiLock redissonMultiLock = new RedissonMultiLock(lock1);
        redissonMultiLock.lock();
        System.out.println("业务内容");
        redissonMultiLock.unlock();
    }

红锁

在多个节点上加锁,大部分节点获取锁成功就算成功


   public void useRedLock() {
        RLock lock1 = redissonClient.getLock("lock1");
//        RLock lock2 = redissonClient.getLock("lock2");
        RedissonRedLock readLock = new RedissonRedLock(lock1);
        // 红锁在大部分节点上加锁成功就算成功
        readLock.lock();
        System.out.println("业务内容");
        readLock.unlock();
    }

读写锁

对读和写上锁,RReadWriteLock实现了java.util.concurrent.locks.ReadWriteLock接口,读-读不阻塞

 public void useReadWriteLock() {
        /**
         * 读-读 不阻塞 读-写 阻塞 写-写 阻塞
         * RReadWriteLock实现了java.util.concurrent.locks.ReadWriteLock接口
         */
        RReadWriteLock rwlock = redissonClient.getReadWriteLock("readWrite");
        // 最常见的读锁
        rwlock.readLock().lock();
        // 写锁
        rwlock.writeLock().lock();
        // 10秒钟以后自动解锁无需调用unlock方法手动解锁
        rwlock.readLock().lock(10, TimeUnit.SECONDS);

        rwlock.writeLock().lock(10, TimeUnit.SECONDS);
        // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
        // boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);

        rwlock.readLock().unlock();
        rwlock.writeLock().unlock();
    }

 信号量

资源限流并发工具类,java.util.concurrent.semaphore是单机版限流,RSemaphore是分布式限流,下面的Semaphore会始终限流3个资源

单机版 

package com.test.lockservice.service.impl;

import java.util.concurrent.Semaphore;

/**
 * @Author sl
 */
public class SemaphoreTest {
    public static void main(String[] args) {
        // 3个有限资源
        Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < 6; i++) {
            new Thread(()->{
                try{
                    // 获取资源
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + "抢到车位");
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName()  +"离开车位");
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    // 释放资源
                    semaphore.release();
                }

            }).start();
        }

    }
}

Thread-1抢到车位
Thread-0抢到车位
Thread-4抢到车位
Thread-0离开车位
Thread-1离开车位
Thread-4离开车位
Thread-3抢到车位
Thread-5抢到车位
Thread-2抢到车位
Thread-5离开车位
Thread-3离开车位
Thread-2离开车位

分布式版

  public void useSemaphore() {
        /**
         * RSemaphore 采用了与java.util.concurrent.semaphore相似的接口
         * 资源限流信号量, 3个资源 6个线程, semaphore是单机版限流,RSemaphore是分布式限流
         */
        RSemaphore semaphore = redissonClient.getSemaphore("semaphore");
        try{
            semaphore.acquire();
        }catch(Exception e){
            e.printStackTrace();
        }finally {
            semaphore.release();
        }
    }

闭锁(CountDownLatch)

CountDownLatch并发工具类,一个线程等待一组线程结束是一个做减法的倒计时器,RCountDownLatch 采用了与java.util.concurrent.CountDownLatch 相似的接口和用法,

 单机版

package com.test.lockservice.service.impl;

import java.util.concurrent.CountDownLatch;

/**
 * @Author sl
 */
public class CountDownLatchTest {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(6);
        for (int i = 1; i <= 6; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName() + "\t上完自习");
                countDownLatch.countDown();
            },String.valueOf(i)).start();
        }
        // 班长等待所有线程同学走完在锁门
        countDownLatch.await();
        System.out.println(Thread.currentThread().getName() + "\t班长离开,锁门");
    }
}

1	上完自习
3	上完自习
4	上完自习
5	上完自习
2	上完自习
6	上完自习
main	班长离开,锁门

顺道介绍一下CyclicBarrier并发工具类,与CountDownLatch正好相反,它做的是加法

package com.test.lockservice.service.impl;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * @Author sl
 */
public class CyclicBarrierTest {

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
            System.out.println("集齐了卡片,开始召唤神龙");
        });

        for (int i = 0; i < 7; i++) {
            String s = String.valueOf(i);
            new Thread(()->{
                System.out.println(Thread.currentThread().getName() + "\t 收集到第"+s+"卡片");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            },String.valueOf(i)).start();

        }
    }
}

0	 收集到第0卡片
6	 收集到第6卡片
2	 收集到第2卡片
1	 收集到第1卡片
5	 收集到第5卡片
4	 收集到第4卡片
3	 收集到第3卡片
集齐了卡片,开始召唤神龙

分布式版

 public void useCountDownLatch() {
        /**
         * RCountDownLatch 采用了与java.util.concurrent.CountDownLatch 相似的接口和用法
         * 一个线程 等待一组线程完事
         * 班长等待所有同学走出门口在锁门 CountDownLatch是单机版 RCountDownLatch是分布式版
         */
        RCountDownLatch latch = redissonClient.getCountDownLatch("anyCountDownLatch");
        latch.trySetCount(6);
        latch.countDown();
        try{
            latch.await();
        }catch (Exception e){
            e.printStackTrace();
        }
    }

关于zookeeper实现分布式锁,在本专栏zookeeper章节中做了简单介绍,就是创建临时顺序节点,值最小的就是锁。 

 

到了这里,关于分布式锁之redis实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 微服务学习:SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式

    目录 一、高级篇 二、面试篇 ==============实用篇============== day05-Elasticsearch01 1.初识elasticsearch 1.4.安装es、kibana 1.4.1.部署单点es 1.4.2.部署kibana 1.4.3.安装IK分词器 1.4.4.总结 2.索引库操作 2.1.mapping映射属性 2.2.索引库的CRUD 2.2.1.创建索引库和映射 2.2.2.查询索引库 2.2.3.修改索引库 2.

    2024年02月02日
    浏览(59)
  • 在Spring中,可以使用不同的方式来实现分布式锁,例如基于数据库、Redis、ZooKeeper等

    在Spring中,可以使用不同的方式来实现分布式锁,例如基于数据库、Redis、ZooKeeper等。下面是两种常见的实现方式: 使用Redis实现分布式锁: 使用自定义注解实现本地锁: 以上是两种常见的在Spring中实现分布式锁的方式。第一种方式使用Redis作为分布式锁的存储介质,通过

    2024年03月17日
    浏览(46)
  • SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,系统详解springcloud微服务技术栈

    我们发现在微服务中有一个令人头疼的问题——部署,用Docker去解决这个部署难题 1、项目部署的问题 2、Docker 扔到一台机器上,它们的依赖难道没有干扰吗?不会,docker将打包好的程序放到一个隔离容器去运行,使用沙箱机制,避免互相干扰,之间不可见,这样就解决了混

    2023年04月24日
    浏览(46)
  • (黑马出品_07)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式

    [此文档是在心向阳光的天域的博客加了一些有助于自己的知识体系,也欢迎大家关注这个大佬的博客](https://blog.csdn.net/sinat_38316216/category_12263516.html) [是这个视频](https://www.bilibili.com/video/BV1LQ4y127n4/?p=5spm_id_from=pageDrivervd_source=9beb0a2f0cec6f01c2433a881b54152c) 聚合 可以让我们极其方便

    2024年03月12日
    浏览(56)
  • SpringCloud Alibaba 之 Config配置中心,Redis分布式锁详解

    目录 1.服务配置中心 1.1 服务配置中心介绍  1.2 Nacos Config 实践 1.2.1 Nacos config 入门案例  1.2.2  Nacos 配置动态刷新 1.2.3 配置共享 1.2.4 nacos 几个概念   2.分布式锁 2.1 分布式锁介绍  2.2 Redisson  2.2.1 Redisson 实践  2.2.2 Redisson 原理   首先我们来看一下,微服务架构下关于配置文件

    2024年02月04日
    浏览(43)
  • (黑马出品_高级篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式

    [此文档是在心向阳光的天域的博客加了一些有助于自己的知识体系,也欢迎大家关注这个大佬的博客](https://blog.csdn.net/sinat_38316216/category_12263516.html) [是这个视频](https://www.bilibili.com/video/BV1LQ4y127n4/?p=5spm_id_from=pageDrivervd_source=9beb0a2f0cec6f01c2433a881b54152c) 消息队列在使用过程中,面

    2024年03月19日
    浏览(48)
  • 【业务功能篇87】微服务-springcloud-本地缓存-redis-分布式缓存-缓存穿透-雪崩-击穿

      缓存的作用是减低对数据源的访问频率。从而提高我们系统的性能。 缓存的流程图 2.1 本地缓存   其实就是把缓存数据存储在内存中(Map String,Object ).在单体架构中肯定没有问题。 单体架构下的缓存处理 2.2 分布式缓存   在分布式环境下,我们原来的本地缓存就不是

    2024年02月10日
    浏览(56)
  • 【业务功能100】补充代码【业务功能88】微服务-springcloud-分布式锁-redis-redisson-springcache

    采用redisson做分布式锁,完成数据的查询接口功能getCatelog2JSONRedis 原先从mysql数据库查询的效率较低,现在将部分固定数据展示比如页面的树形栏目信息等,存储到 redis缓存 ,然后基于分布式集群,需要结合本地锁(synchronized )与分布式锁(redissonClient.getLock(“catelog2JSON-lock”

    2024年02月09日
    浏览(46)
  • 分布式天梯图算法在 Redis 图数据库中的应用

    Redis是一个高性能的键值对数据库,支持常用的数据结构和分布式操作,被广泛应用于缓存、消息队列和排行榜等场景。除了基本的数据结构,Redis还支持图数据结构并提供了一些算法支持。 天梯图算法是一种基于贪心的图搜索算法,在寻找最短路径问题中具有很高的效率。

    2024年02月14日
    浏览(35)
  • 分布式锁实现(mysql,以及redis)以及分布式的概念

    我旁边的一位老哥跟我说,你知道分布式是是用来干什么的嘛?一句话给我干懵了,我能隐含知道,大概是用来做分压处理的,并增加系统稳定性的。但是具体如何,我却道不出个1,2,3。现在就将这些做一个详细的总结。至少以后碰到面试官可以说上个123。 那么就正式进入

    2024年01月21日
    浏览(62)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包