前言
本节内容使用zookeeper实现分布式锁,完成并发访问“超卖”问题的解决。相对于redis分布式锁,zookeeper能够保证足够的安全性。关于zookeeper的安装内容这里不做介绍,开始本节内容之前先自行安装好zookeeper中间键服务。这里我们利用创建zookeeper路径节点的唯一性实现分布式锁。并同时演示如何使用Curator工具包,完成分布式锁。
正文
- 在项目中添加zookeeper的pom依赖
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.7.2</version>
</dependency>
- 创建zookeeper客户端工具,实现加锁和解锁方法
- 创建ZookeeperClient客户端工具
package com.ht.atp.plat.util; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @Component public class ZookeeperClient { /** * zookeeper连接地址 */ private static final String connectString = "192.168.110.88:2181"; /** * 分布式锁根路径 */ private static final String ROOT_PATH = "/distributed"; /** * zookeeper客户端 */ private ZooKeeper zooKeeper; /** * 初始化zookeeper客户端 */ @PostConstruct public void init() { try { // 连接zookeeper服务器 this.zooKeeper = new ZooKeeper(connectString, 30000, event -> System.out.println("获取链接成功!!")); // 创建分布式锁根节点 if (this.zooKeeper.exists(ROOT_PATH, false) == null) { this.zooKeeper.create(ROOT_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (Exception e) { System.out.println("获取链接失败!"); e.printStackTrace(); } } /** * 销毁zookeeper客户端 */ @PreDestroy public void destroy() { try { if (zooKeeper != null) { zooKeeper.close(); } } catch (InterruptedException e) { e.printStackTrace(); } } /** * 加锁 * * @param lockName */ public void lock(String lockName) { try { zooKeeper.create(ROOT_PATH + "/" + lockName, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } catch (Exception e) { // 重试 try { Thread.sleep(200); lock(lockName); } catch (InterruptedException ex) { ex.printStackTrace(); } } System.out.println("----------加锁成功------------"); } /** * 解锁 * * @param lockName */ public void unlock(String lockName) { try { this.zooKeeper.delete(ROOT_PATH + "/" + lockName, 0); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } System.out.println("----------解锁成功------------"); } }
- 使用临时节点EPHEMERAL加锁,这里使用临时节点是方便锁的自动释放,避免发生死锁问题
- 业务执行完成,删除临时节点,解锁
- 实现“超卖”的业务方法,使用自定义的zookeeper工具类加锁
@Autowired
ZookeeperClient zookeeperClient;
@Override
public void checkAndReduceStock() {
zookeeperClient.lock("lock");
// 查询库存
WmsStock wmsStock = baseMapper.selectById(1L);
// 验证库存大于0再扣减库存
if (wmsStock != null && wmsStock.getStockQuantity() > 0) {
wmsStock.setStockQuantity(wmsStock.getStockQuantity() - 1);
baseMapper.updateById(wmsStock);
}
// 释放锁
zookeeperClient.unlock("lock");
}
- 将数据库库存表的库存恢复为10000,分别启动7000,7001,7002服务
- 启动jmeter压测工具,压测库存扣减接口,查看结果
- 库存扣减为0
- jmeter压测结果,平均访问时间502ms,请求吞吐量为每秒161
- 不存在并发“超卖问题”,但是接口访问吞吐量较低。由于在加锁过程中,获取不到锁,会无限自旋去获取锁,导致性能下降。
- 优化分布式锁,使用zk的临时序列化节点实现分布式锁,避免锁的自旋操作
- 优化代码
package com.ht.atp.plat.util; import org.apache.commons.lang3.StringUtils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.List; @Component public class ZookeeperClientNoBlock { /** * zookeeper连接地址 */ private static final String connectString = "192.168.110.88:2181"; /** * 分布式锁根路径 */ private static final String ROOT_PATH = "/distributed"; /** * zookeeper客户端 */ private ZooKeeper zooKeeper; /** * 初始化zookeeper客户端 */ @PostConstruct public void init() { try { // 连接zookeeper服务器 this.zooKeeper = new ZooKeeper(connectString, 30000, event -> System.out.println("获取链接成功!!")); // 创建分布式锁根节点 if (this.zooKeeper.exists(ROOT_PATH, false) == null) { this.zooKeeper.create(ROOT_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (Exception e) { System.out.println("获取链接失败!"); e.printStackTrace(); } } /** * 销毁zookeeper客户端 */ @PreDestroy public void destroy() { try { if (zooKeeper != null) { zooKeeper.close(); } } catch (InterruptedException e) { e.printStackTrace(); } } /** * 创建锁 * * @param lockName */ public String lock(String lockName) { try { String realLock = zooKeeper.create(ROOT_PATH + "/" + lockName + "-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); return realLock; } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("----------加锁成功------------"); return null; } /** * 检查锁 * * @param lockName */ public void checkLock(String lockName) { String preNode = getPreNode(lockName); // 如果该节点没有前一个节点,说明该节点时最小节点,放行执行业务逻辑 if (StringUtils.isEmpty(preNode)) { return; } // 重新检查。是否获取到锁 try { Thread.sleep(20); } catch (InterruptedException ex) { ex.printStackTrace(); } checkLock(lockName); } /** * 获取指定节点的前节点 * * @param path * @return */ private String getPreNode(String path) { try { // 获取当前节点的序列化号 Long curSerial = Long.valueOf(StringUtils.substringAfterLast(path, "-")); // 获取根路径下的所有序列化子节点 List<String> nodes = this.zooKeeper.getChildren(ROOT_PATH, false); // 判空 if (CollectionUtils.isEmpty(nodes)) { return null; } // 获取前一个节点 Long flag = 0L; String preNode = null; for (String node : nodes) { // 获取每个节点的序列化号 Long serial = Long.valueOf(StringUtils.substringAfterLast(node, "-")); if (serial < curSerial && serial > flag) { flag = serial; preNode = node; } } return preNode; } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return null; } /** * 解锁 * * @param lockName */ public void unlock(String lockName) { try { this.zooKeeper.delete(lockName, 0); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } System.out.println("----------解锁成功------------"); } }
- 使用临时序列化节点EPHEMERAL_SEQUENTIAL加锁,保证每个节点都可以加锁成功,避免自旋操作
- 通过判断当前节点是否是第一个节点,如果是第一个节点,才可执行后续的业务
- 解锁操作
- 扣减库存业务实现方法
public void checkAndReduceStock() {
//加锁
String lock = zookeeperClientNoBlock.lock("lock");
//检查锁
zookeeperClientNoBlock.checkLock(lock);
// 查询库存
WmsStock wmsStock = baseMapper.selectById(1L);
// 验证库存大于0再扣减库存
if (wmsStock != null && wmsStock.getStockQuantity() > 0) {
wmsStock.setStockQuantity(wmsStock.getStockQuantity() - 1);
baseMapper.updateById(wmsStock);
}
// 释放锁
zookeeperClientNoBlock.unlock(lock);
}
- 将扣减库存恢复为10000,重新启动7000,7001,7002服务,使用jmeter压测工具再次压测
- 库存扣减为0
- 压测结果:平均访问时间1597ms,请求访问吞吐量为每秒62
- 从优化结果来看,此种方式更加耗时,性能更差。将加锁操作改为非自旋操作,虽然加锁不在耗时,但是会自旋判断自己是否是最小的节点,依然存在耗时操作,且逻辑更为复杂。
- 优化分布式锁,通过使用zookeeper的Watcher监听来实现阻塞锁
- 实现代码
package com.ht.atp.plat.util; import org.apache.commons.lang3.StringUtils; import org.apache.zookeeper.*; import org.apache.zookeeper.proto.WatcherEvent; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.List; import java.util.concurrent.CountDownLatch; @Component public class ZookeeperClientBlockWatch { /** * zookeeper连接地址 */ private static final String connectString = "192.168.110.88:2181"; /** * 分布式锁根路径 */ private static final String ROOT_PATH = "/distributed"; /** * zookeeper客户端 */ private ZooKeeper zooKeeper; /** * 初始化zookeeper客户端 */ @PostConstruct public void init() { try { // 连接zookeeper服务器 this.zooKeeper = new ZooKeeper(connectString, 30000, event -> System.out.println("获取链接成功!!")); // 创建分布式锁根节点 if (this.zooKeeper.exists(ROOT_PATH, false) == null) { this.zooKeeper.create(ROOT_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (Exception e) { System.out.println("获取链接失败!"); e.printStackTrace(); } } /** * 销毁zookeeper客户端 */ @PreDestroy public void destroy() { try { if (zooKeeper != null) { zooKeeper.close(); } } catch (InterruptedException e) { e.printStackTrace(); } } /** * 创建锁 * * @param lockName */ public String lock(String lockName) { try { String realLock = zooKeeper.create(ROOT_PATH + "/" + lockName + "-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); return realLock; } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("----------加锁成功------------"); return null; } /** * 检查锁 * * @param lockName */ public void checkLock(String lockName) { try { String preNode = getPreNode(lockName); // 如果该节点没有前一个节点,说明该节点时最小节点,放行执行业务逻辑 if (!StringUtils.isEmpty(preNode)) { CountDownLatch countDownLatch = new CountDownLatch(1); if (this.zooKeeper.exists(ROOT_PATH + "/" + preNode, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("监控:"+ watchedEvent.getPath()); countDownLatch.countDown(); } }) == null) { return; } // 阻塞,减少自旋检查 countDownLatch.await(); } return; } catch (Exception e) { e.printStackTrace(); // 重新检查。是否获取到锁 try { Thread.sleep(20); } catch (InterruptedException ex) { ex.printStackTrace(); } checkLock(lockName); } } /** * 获取指定节点的前节点 * * @param path * @return */ private String getPreNode(String path) { try { // 获取当前节点的序列化号 Long curSerial = Long.valueOf(StringUtils.substringAfterLast(path, "-")); // 获取根路径下的所有序列化子节点 List<String> nodes = this.zooKeeper.getChildren(ROOT_PATH, false); // 判空 if (CollectionUtils.isEmpty(nodes)) { return null; } // 获取前一个节点 Long flag = 0L; String preNode = null; for (String node : nodes) { // 获取每个节点的序列化号 Long serial = Long.valueOf(StringUtils.substringAfterLast(node, "-")); if (serial < curSerial && serial > flag) { flag = serial; preNode = node; } } return preNode; } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return null; } /** * 解锁 * * @param lockName */ public void unlock(String lockName) { try { this.zooKeeper.delete(lockName, 0); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } System.out.println("----------解锁成功------------"); } }
- 检查加锁,加入前一个节点的监控判断,如果前一个节点的锁还没有释放,就使用CountDownLatch计数器阻塞程序,减少自旋检查,当监控到前一个节点的锁释放,则当前节点获取到锁,开始执行业务逻辑
- 修改扣减库存业务实现方法为监控阻塞的方式
@Autowired
ZookeeperClientBlockWatch zookeeperClientBlockWatch;
public void checkAndReduceStock() {
//加锁
String lock = zookeeperClientBlockWatch.lock("lock");
//检查锁
zookeeperClientBlockWatch.checkLock(lock);
// 查询库存
WmsStock wmsStock = baseMapper.selectById(1L);
// 验证库存大于0再扣减库存
if (wmsStock != null && wmsStock.getStockQuantity() > 0) {
wmsStock.setStockQuantity(wmsStock.getStockQuantity() - 1);
baseMapper.updateById(wmsStock);
}
// 释放锁
zookeeperClientBlockWatch.unlock(lock);
}
- 将扣减库存恢复为10000,重新启动7000,7001,7002服务,使用jmeter压测工具再次压测
- 库存扣减为了0
- jmeter压测结果:平均访问时间441ms,请求访问吞吐量为每秒223
- 从优化结果来看,使用带监控的阻塞锁吞吐量更高,平均访问时间更小,性能比自旋加锁的方式性能更优。
- 使用ThreadLocal优化分布式锁,将zookeeper的Watcher监听来实现阻塞锁优化为可重入分布式锁
- 优化代码
package com.ht.atp.plat.util; import org.apache.commons.lang3.StringUtils; import org.apache.zookeeper.*; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.List; import java.util.concurrent.CountDownLatch; @Component public class ZookeeperClientBlockWatchReentrant { /** * zookeeper连接地址 */ private static final String connectString = "192.168.110.88:2181"; /** * 分布式锁根路径 */ private static final String ROOT_PATH = "/distributed"; /** * zookeeper客户端 */ private ZooKeeper zooKeeper; /** * ThreadLocal本地线程 */ private static final ThreadLocal<Integer> THREAD_LOCAL = new ThreadLocal<>(); /** * 初始化zookeeper客户端 */ @PostConstruct public void init() { try { // 连接zookeeper服务器 this.zooKeeper = new ZooKeeper(connectString, 30000, event -> System.out.println("获取链接成功!!")); // 创建分布式锁根节点 if (this.zooKeeper.exists(ROOT_PATH, false) == null) { this.zooKeeper.create(ROOT_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (Exception e) { System.out.println("获取链接失败!"); e.printStackTrace(); } } /** * 销毁zookeeper客户端 */ @PreDestroy public void destroy() { try { if (zooKeeper != null) { zooKeeper.close(); } } catch (InterruptedException e) { e.printStackTrace(); } } /** * 创建锁 * * @param lockName */ public String lock(String lockName) { try { if (THREAD_LOCAL.get() == null || THREAD_LOCAL.get() == 0){ String realLock = zooKeeper.create(ROOT_PATH + "/" + lockName + "-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); return realLock; } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("----------加锁成功------------"); return null; } /** * 检查锁 * * @param lockName */ public void checkLock(String lockName) { Integer flag = THREAD_LOCAL.get(); if (flag != null && flag > 0) { THREAD_LOCAL.set(flag + 1); return; } try { String preNode = getPreNode(lockName); // 如果该节点没有前一个节点,说明该节点时最小节点,放行执行业务逻辑 if (!StringUtils.isEmpty(preNode)) { CountDownLatch countDownLatch = new CountDownLatch(1); if (this.zooKeeper.exists(ROOT_PATH + "/" + preNode, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("监控:"+ watchedEvent.getPath()); countDownLatch.countDown(); } }) == null) { THREAD_LOCAL.set(1); return; } // 阻塞,减少自旋检查 countDownLatch.await(); } THREAD_LOCAL.set(1); return; } catch (Exception e) { e.printStackTrace(); // 重新检查。是否获取到锁 try { Thread.sleep(20); } catch (InterruptedException ex) { ex.printStackTrace(); } checkLock(lockName); } } /** * 获取指定节点的前节点 * * @param path * @return */ private String getPreNode(String path) { try { // 获取当前节点的序列化号 Long curSerial = Long.valueOf(StringUtils.substringAfterLast(path, "-")); // 获取根路径下的所有序列化子节点 List<String> nodes = this.zooKeeper.getChildren(ROOT_PATH, false); // 判空 if (CollectionUtils.isEmpty(nodes)) { return null; } // 获取前一个节点 Long flag = 0L; String preNode = null; for (String node : nodes) { // 获取每个节点的序列化号 Long serial = Long.valueOf(StringUtils.substringAfterLast(node, "-")); if (serial < curSerial && serial > flag) { flag = serial; preNode = node; } } return preNode; } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return null; } /** * 解锁 * * @param lockName */ public void unlock(String lockName) { try { THREAD_LOCAL.set(THREAD_LOCAL.get() - 1); if (THREAD_LOCAL.get() == 0) { this.zooKeeper.delete(lockName, 0); THREAD_LOCAL.remove(); } } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } System.out.println("----------解锁成功------------"); } }
- 使用ThreadLocal存储当前线程的操作
- 加锁:先检测本地线程是否存在或者值是否为0,如果不存在或者为0,则创建锁,避免重复创建
- 检查加锁:如果值大于0,代表可重入,值加1;如果不存在或者等于0,将值设置为1,代表第一次获取到锁
- 解锁:获取线程的值,并减少1,如果减少后的值变为0,代表锁已经不在占用,此时才释放锁,并且将当前线程的值存储移除;如果减少的值还是大于0,代表此时锁还在占用
- 扣减库存业务代码
@Autowired
ZookeeperClientBlockWatchReentrant zookeeperClientBlockWatchReentrant;
public void checkAndReduceStock() {
//加锁
String lock = zookeeperClientBlockWatchReentrant.lock("lock");
//检查锁
zookeeperClientBlockWatchReentrant.checkLock(lock);
// 查询库存
WmsStock wmsStock = baseMapper.selectById(1L);
// 验证库存大于0再扣减库存
if (wmsStock != null && wmsStock.getStockQuantity() > 0) {
wmsStock.setStockQuantity(wmsStock.getStockQuantity() - 1);
baseMapper.updateById(wmsStock);
}
// 释放锁
zookeeperClientBlockWatchReentrant.unlock(lock);
}
- 将扣减库存恢复为10000,重新启动7000,7001,7002服务,使用jmeter压测工具再次压测
- 库存扣减为0
- jmeter压测结果:平均访问时间426ms,吞吐量每秒231
- 使用zookeeper的Curator工具包实现分布式锁
- 引入pom依赖
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes --> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>5.1.0</version> </dependency>
- 创建CuratorFramework的bean工具类
package com.ht.atp.plat.config; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.CuratorEventType; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.WatchedEvent; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * CuratorFramework工具类配置 */ @Slf4j @Configuration public class ZookeeperConfig { @Bean public CuratorFramework curatorFramework() { // ExponentialBackoffRetry是种重连策略,每次重连的间隔会越来越长,1000毫秒是初始化的间隔时间,3代表尝试重连次数。 ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3); // 创建客户端 CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("192.168.110.88:2181", retryPolicy); // 添加watched 监听器 curatorFramework.getCuratorListenable().addListener((CuratorFramework client, CuratorEvent event) -> { CuratorEventType type = event.getType(); if (type == CuratorEventType.WATCHED) { WatchedEvent watchedEvent = event.getWatchedEvent(); String path = watchedEvent.getPath(); log.info(watchedEvent.getType() + " ----------------------------> " + path); // 重新设置该节点监听 if (null != path) { client.checkExists().watched().forPath(path); } } }); // 启动客户端 curatorFramework.start(); return curatorFramework; } }
- 使用可重入锁,扣减库存业务方法
@Autowired private CuratorFramework curatorFramework; public void checkAndReduceStock() throws Exception { //加锁 InterProcessMutex mutex = new InterProcessMutex(curatorFramework, "/curator/lock"); try { // 加锁 mutex.acquire(); // 查询库存 WmsStock wmsStock = baseMapper.selectById(1L); // 验证库存大于0再扣减库存 if (wmsStock != null && wmsStock.getStockQuantity() > 0) { wmsStock.setStockQuantity(wmsStock.getStockQuantity() - 1); baseMapper.updateById(wmsStock); } this.testSub(mutex); } catch (Exception e) { e.printStackTrace(); } finally { // 释放锁 mutex.release(); } } public void testSub(InterProcessMutex mutex) throws Exception { try { mutex.acquire(); System.out.println("测试可重入锁。。。。"); } catch (Exception e) { e.printStackTrace(); }finally { // 释放锁 mutex.release(); } }
- 将扣减库存恢复为10000,重新启动7000,7001,7002服务,使用jmeter压测工具再次压测
- 库存扣减为0
- jmeter压测结果:平均访问时间497,吞吐量每秒198
- 能够解决并发访问“超卖问题”文章来源:https://www.toymoban.com/news/detail-736483.html
结语
关于使用zookeeper分布式锁解决“超卖”问题的内容到这里就结束了,我们下期见。。。。。。文章来源地址https://www.toymoban.com/news/detail-736483.html
到了这里,关于(五)库存超卖案例实战——使用zookeeper分布式锁解决“超卖”问题的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!