前言
1.为什么要用分布式锁
如果是单机情况下(单JVM),线程之间共享内存,只要使用线程锁就可以解决并发问题。但如果是分布式情况下(多JVM),线程A和线程B很可能不是在同一JVM中,这样线程锁就无法起到作用了,这时候就要用到分布式锁来解决。分布式锁其实就是,控制分布式系统不同进程共同访问共享资源的一种锁的实现。如果不同的系统或同一个系统的不同主机之间共享了某个临界资源,往往需要互斥来防止彼此干扰,以保证一致性。
2.分布式锁有哪些实现方式
-
基于缓存(Redis等)实现分布式锁;
-
基于数据库实现分布式锁; 例如唯一性约束排它锁,version 乐观锁
-
基于Zookeeper实现分布式锁;
从性能角度(从高到低)
缓存 > Zookeeper >= 数据库
3.使用redis作为分布锁的好处
-
Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对Redis的连接并不存在竞争关系
-
纯内存操作,单线程操作,避免了频繁的上下文切换 非堵塞 I/O 多路复用 等特点使得 速度快
4.我们先来看下,一把靠谱的分布式锁应该有哪些特征
-
「互斥性」: 任意时刻,只有一个客户端能持有锁。
-
「锁超时释放」:持有锁超时,可以释放,防止不必要的资源浪费,也可以防止死锁。
-
「可重入性」:一个线程如果获取了锁之后,可以再次对其请求加锁,防止死锁。
-
「高性能和高可用」:加锁和解锁需要开销尽可能低,同时也要保证高可用,避免分布式锁失效。
-
「安全性」:锁只能被持有的客户端删除,不能被其他客户端删除
Redis分布式锁方案
Redis分布式锁方案一:SETNX + EXPIRE
提到Redis的分布式锁,很多小伙伴马上就会想到setnx
+ expire
命令。即先用setnx
来抢锁,如果抢到之后,再用expire
给锁设置一个过期时间,防止锁忘记了释放。
SETNX 是SET IF NOT EXISTS的简写.日常命令格式是SETNX key value,如果 key不存在,则SETNX成功返回1,如果这个key已经存在了,则返回0。
假设某电商网站的某商品做秒杀活动,key可以设置为key_resource_id,value设置任意值,伪代码如下:
if(jedis.setnx(key_resource_id,lock_value)==1) { //加锁 expire(key_resource_id,100); //设置过期时间 try { do something //业务请求 } catch () { } finally { jedis.del(key_resource_id); //释放锁 } }
但是这个方案中,setnx
和expire
两个命令分开了,「不是原子操作」。如果执行完setnx
加锁,正要执行expire
设置过期时间时,进程crash或者要重启维护了,那么这个锁就“长生不老”了,「别的线程永远获取不到锁啦」。
Redis分布式锁方案二:SETNX + value值是(系统时间+过期时间)
为了解决方案一,「发生异常锁得不到释放的场景」,有小伙伴认为,可以把过期时间放到setnx
的value值里面。如果加锁失败,再拿出value值校验一下即可。加锁代码如下:
long expires = System.currentTimeMillis() + expireTime; //系统时间+设置的过期时间 String expiresStr = String.valueOf(expires); // 如果当前锁不存在,返回加锁成功 if (jedis.setnx(key_resource_id, expiresStr) == 1) { return true; } // 如果锁已经存在,获取锁的过期时间 String currentValueStr = jedis.get(key_resource_id); // 如果获取到的过期时间,小于系统当前时间,表示已经过期 if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) { // 锁已过期,获取上一个锁的过期时间,并设置现在锁的过期时间 String oldValueStr = jedis.getSet(key_resource_id, expiresStr); if (oldValueStr != null && oldValueStr.equals(currentValueStr)) { // 考虑多线程并发的情况,只有一个线程的设置值和当前值相同,它才可以加锁 return true; } } //其他情况,均返回加锁失败 return false; }
这个方案的优点是,巧妙移除expire
单独设置过期时间的操作,把「过期时间放到setnx的value值」里面来。解决了方案一发生异常,锁得不到释放的问题。但是这个方案还有别的缺点:
过期时间是客户端自己生成的(System.currentTimeMillis()是当前系统的时间),必须要求分布式环境下,每个客户端的时间必须同步。
如果锁过期的时候,并发多个客户端同时请求过来,都执行jedis.getSet(),最终只能有一个客户端加锁成功,但是该客户端锁的过期时间,可能被别的客户端覆盖
该锁没有保存持有者的唯一标识,可能被别的客户端释放/解锁。
Redis分布式锁方案三:使用Lua脚本(包含SETNX + EXPIRE两条指令)
实际上,我们还可以使用Lua脚本来保证原子性(包含setnx和expire两条指令),lua脚本如下:
if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then redis.call('expire',KEYS[1],ARGV[2]) else return 0 end;
加锁代码如下:
String lua_scripts = "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then" + " redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end"; Object result = jedis.eval(lua_scripts, Collections.singletonList(key_resource_id), Collections.singletonList(values)); //判断是否成功 return result.equals(1L);
Redis支持LUA脚本的主要优势
高效性:减少网络开销及时延,多次redis服务器网络请求的操作,使用LUA脚本可以用一个请求完成
数据可靠性:Redis会将整个脚本作为一个整体执行,中间不会被其他命令插入。
原子操作:将脚本作为一个整体执行,中间不会插入其他命令,无需使用事务
可嵌入性:可嵌入JAVA,C#等多种编程语言,支持不同操作系统跨平台交互
复用:客户端发送的脚本永久存在redis中,其他客户端可以复用脚本
Redis分布式锁方案方案四:SET的扩展命令(SET EX PX NX)
除了使用,使用Lua脚本,保证SETNX + EXPIRE
两条指令的原子性,我们还可以巧用Redis的SET指令扩展参数!(SET key value[EX seconds][PX milliseconds][NX|XX]
),它也是原子性的!
SET key valueEX seconds[NX|XX]
NX :表示key不存在的时候,才能set成功,也即保证只有第一个客户端请求才能获得锁,而其他客户端请求只能等其释放锁,才能获取。
EX seconds :设定key的过期时间,时间单位是秒。
PX milliseconds: 设定key的过期时间,单位为毫秒
XX: 仅当key存在时设置值
伪代码demo如下:
但是呢,
if(jedis.set(key_resource_id, lock_value, "NX", "EX", 100s) == 1){ //加锁 try { do something //业务处理 }catch(){ } finally { jedis.del(key_resource_id); //释放锁 } }
这个方案还是可能存在问题:
-
问题一:「锁过期释放了,业务还没执行完」。假设线程a获取锁成功,一直在执行临界区的代码。但是100s过去后,它还没执行完。但是,这时候锁已经过期了,此时线程b又请求过来。显然线程b就可以获得锁成功,也开始执行临界区的代码。那么问题就来了,临界区的业务代码都不是严格串行执行的啦。
-
问题二:「锁被别的线程误删」。假设线程a执行完后,去释放锁。但是它不知道当前的锁可能是线程b持有的(线程a去释放锁时,有可能过期时间已经到了,此时线程b进来占有了锁)。那线程a就把线程b的锁释放掉了,但是线程b临界区业务代码可能都还没执行完呢。
方案五:SET EX PX NX + 校验唯一随机值,再删除
既然锁可能被别的线程误删,那我们给value值设置一个标记当前线程唯一的随机数,在删除的时候,校验一下,不就OK了嘛。伪代码如下:
uni_request_id = uuid; if(jedis.set(key_resource_id, uni_request_id, "NX", "EX", 100s) == 1){ //加锁 try { do something //业务处理 }catch(){ } finally { //判断是不是当前线程加的锁,是才释放 if (uni_request_id.equals(jedis.get(key_resource_id))) { jedis.del(lockKey); //释放锁 } } }
在这里,「判断是不是当前线程加的锁」和「释放锁」不是一个原子操作。如果调用jedis.del()释放锁的时候,可能这把锁已经不属于当前客户端,会解除他人加的锁。
为了更严谨,一般也是用lua脚本代替。lua脚本如下:
if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end;
方案六:解决可重入性问题可采用计数器的方式解决。
解决可重入性的问题
可重入问题举例
看下面的demo
public class Demo1 {
public synchronized void functionA(){
System.out.println("iAmFunctionA");
functionB();
}
public synchronized void functionB(){
System.out.println("iAmFunctionB");
}
}
代码解释
functionA()和functionB()都是同步方法,当线程进入funcitonA()会获得该类的对象锁,这个锁"new Demo1()",在functionA()对方法functionB()做了调用,但是functionB()也是同步的,因此该线程需要再次获得该对象锁(new Demo1())。其他线程是无法获该对象锁的。
这就是可重入锁。
可重入锁的作用就是为了避免死锁,java中synchronized和ReentrantLock都是可重入锁
@Service public class RedisLockImpl implements RedisLock{ public static ThreadLocal<CountDTO> threadLocal = new ThreadLocal<>(); /** *上锁 */ @Override public boolean tryLock(String key, String value, Long time) { CountDTO countDTO = threadLocal.get(); Boolean lock if (countDTO == null){ String clientId = UUID.randomUUID().toString(); countDTO.setClientId(clientId); countDTO.setCount(new AtomicInteger(1)); lock = (jedis.set(key, countDTO.getClientId(), "NX", "EX", time) == 1); } else { // ++i countDTO.setCount(new AtomicInteger(countDTO.getCount().incrementAndGet())); return true; } return lock; } } /** *释放锁 */ @Override public void releaseLock(String key) { try { if (threadLocal.get() != null && threadLocal.get().getClientId().equals(jedis.get(key))){ if (threadLocal.get().getCount().get() == 0){ jedis.evalsha(unlockLua, Arrays.asList(key), Arrays.asList(threadLocal.get().getClientId())); } else { CountDTO countDTO = threadLocal.get(); countDTO.setCount(new AtomicInteger(countDTO.getCount().decrementAndGet())); threadLocal.set(countDTO); } } } finally { if (threadLocal.get() != null){ if (threadLocal.get().getCount().get() == 0){ threadLocal.remove(); } } } } private final String unlockLua = jedis.scriptLoad("local lock_key=KEYS[1]\n" + "local lock_value=ARGV[1]\n" + "\n" + "local current_value=redis.call('get',lock_key)\n" + "local result=0\n" + "if lock_value==current_value then\n" + " redis.call('del',lock_key)\n" + " result=1\n" + "end\n" + " return result"); }
方案七:增加自旋,变成自旋锁
减少未获取到锁的失败次数
@Service public class RedisLockImpl implements RedisLock{ public static ThreadLocal<CountDTO> threadLocal = new ThreadLocal<>(); /** *上锁 * timeout 等待最大超时时间 */ @Override public boolean tryLock(String key, String value, Long time, Long timeout) { CountDTO countDTO = threadLocal.get(); if (countDTO == null){ String clientId = UUID.randomUUID().toString(); countDTO.setClientId(clientId); countDTO.setCount(new AtomicInteger(1)); long tryTime = System.currentTimeMillis() + timeout * 1000L; } else { countDTO.setCount(new AtomicInteger(countDTO.getCount().incrementAndGet())); return true; } while (System.currentTimeMillis() < tryTime) { if (jedis.set(key, countDTO.getClientId(), "NX", "EX", time) == 1) { logger.info("get lock success ,key=" + key + ", expire seconds=" + time); return true; } try { TimeUnit.MILLISECONDS.sleep(50); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } @Override public void releaseLock(String key) { try { if (threadLocal.get() != null && threadLocal.get().getClientId().equals(jedis.get(key))){ if (threadLocal.get().getCount().get() == 0){ jedis.evalsha(unlockLua, Arrays.asList(key), Arrays.asList(threadLocal.get().getClientId())); } else { CountDTO countDTO = threadLocal.get(); countDTO.setCount(new AtomicInteger(countDTO.getCount().decrementAndGet())); threadLocal.set(countDTO); } } } finally { if (threadLocal.get() != null){ if (threadLocal.get().getCount().get() == 0){ threadLocal.remove(); } } } } private final String unlockLua = jedis.scriptLoad("local lock_key=KEYS[1]\n" + "local lock_value=ARGV[1]\n" + "\n" + "local current_value=redis.call('get',lock_key)\n" + "local result=0\n" + "if lock_value==current_value then\n" + " redis.call('del',lock_key)\n" + " result=1\n" + "end\n" + " return result"); }
方案八:增加看门狗
如果key超时了也会使得锁在执行过程中失效,那么怎么解决呢,watchDog看门狗机制其实就是应对这种情况,实现锁的一个续命处理
@Service public class RedisLockImpl implements RedisLock { public static ThreadLocal<CountDTO> threadLocal = new ThreadLocal<>(); //定期执行任务 private ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1); @Override public boolean tryLock(String key, String value, Long time, Long timeout) { CountDTO countDTO = threadLocal.get(); if (countDTO == null) { String clientId = UUID.randomUUID().toString(); countDTO.setClientId(clientId); countDTO.setCount(new AtomicInteger(1)); } else { countDTO.setCount(new AtomicInteger(countDTO.getCount().incrementAndGet())); return true; } long tryTime = System.currentTimeMillis() + timeout * 1000L; while (System.currentTimeMillis() < tryTime) { if (jedis.set(key, countDTO.getClientId(), "NX", "EX", time) == 1) { logger.info("get lock success ,key=" + key + ", expire seconds=" + time); this.watchDog(key, countDTO.getClientId(), time); return true; } try { TimeUnit.MILLISECONDS.sleep(50); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } @Override public void releaseLock(String key) { try { if (threadLocal.get() != null && threadLocal.get().getClientId().equals(jedis.get(key))) { if (threadLocal.get().getCount().get() == 0) { //shutdown方法:平滑的关闭ExecutorService,当此方法被调用时,ExecutorService停止接收新的任务并且等待已经提交的任务(包含提交正在执行和提交未执行)执行完成。当所有提交任务执行完毕,线程池即被关闭。 scheduledExecutorService.shutdown(); jedis.evalsha(unlockLua, Arrays.asList(key), Arrays.asList(threadLocal.get().getClientId())); } else { CountDTO countDTO = threadLocal.get(); countDTO.setCount(new AtomicInteger(countDTO.getCount().decrementAndGet())); threadLocal.set(countDTO); } } } finally { if (threadLocal.get() != null) { if (threadLocal.get().getCount().get() == 0) { threadLocal.remove(); } } } } /** * 看门狗执行逻辑 */ private void watchDog(String key, String value, long ttl) { //获取续命速率 long rate = getRate(ttl); if (scheduledExecutorService.isShutdown()) { scheduledExecutorService = new ScheduledThreadPoolExecutor(1); } //周期性执行,根据rate进行执行 scheduledExecutorService.scheduleAtFixedRate(new watchDogThread(scheduledExecutorService, Arrays.asList(key), Arrays.asList(value, String.valueOf(ttl))), 1, rate, TimeUnit.SECONDS); } private long getRate(long ttl) { if (ttl - 5 > 0) { return ttl - 5; } else if (ttl - 1 > 0) { return ttl - 1; } throw new RuntimeException("ttl 不允许小于1"); } class watchDogThread implements Runnable { private ScheduledThreadPoolExecutor poolExecutor; private List<String> keys; private List<String> args; public watchDogThread(ScheduledThreadPoolExecutor poolExecutor, List<String> keys, List<String> args) { this.poolExecutor = poolExecutor; this.keys = keys; this.args = args; } @Override public void run() { logger.info("进行续期"); if ((long) jedis.evalsha(watchLua, keys, args) == 0) { //续期失败 可能是业务系统发生异常并且没有进行异常捕捉,没有进行释放锁操作 poolExecutor.shutdown(); } } } private final String watchLua = jedis.scriptLoad("local lock_key=KEYS[1]\n" + "local lock_value=ARGV[1]\n" + "local lock_ttl=ARGV[2]\n" + "local current_value=redis.call('get',lock_key)\n" + "local result=0;\n" + "if lock_value==current_value then\n" + " result=1;\n" + " redis.call('expire',lock_key,lock_ttl)\n" + "end\n" + "return result"); private final String unlockLua = jedis.scriptLoad("local lock_key=KEYS[1]\n" + "local lock_value=ARGV[1]\n" + "\n" + "local current_value=redis.call('get',lock_key)\n" + "local result=0\n" + "if lock_value==current_value then\n" + " redis.call('del',lock_key)\n" + " result=1\n" + "end\n" + " return result"); }
方案九:Redisson框架
Redisson是架设在Redis基础上的一个Java驻内存数据网格(In-Memory Data Grid)。 Redisson在基于NIO的Netty框架上,生产环境使用分布式锁。
public String discount() throws IOException{ String key = "lock001"; //加锁 DistributedRedisLock.acquire(key); //执行具体业务逻辑 dosoming //释放锁 DistributedRedisLock.release(key); //返回结果 return soming; }
执行步骤
-
根据负载均衡策略选择一个redis主节点。
-
执行lua脚本加锁,加锁成功返回null,否则返回当前锁的剩余过期时间。
-
watchlog机制,定时线程给当前锁每隔10s执行一次续命。文章来源:https://www.toymoban.com/news/detail-607404.html
-
获取锁成功 则return,失败则自旋获取锁。文章来源地址https://www.toymoban.com/news/detail-607404.html
到了这里,关于redis分布式锁的9种实现方式的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!