(五)库存超卖案例实战——使用zookeeper分布式锁解决“超卖”问题

这篇具有很好参考价值的文章主要介绍了(五)库存超卖案例实战——使用zookeeper分布式锁解决“超卖”问题。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

本节内容使用zookeeper实现分布式锁,完成并发访问“超卖”问题的解决。相对于redis分布式锁,zookeeper能够保证足够的安全性。关于zookeeper的安装内容这里不做介绍,开始本节内容之前先自行安装好zookeeper中间键服务。这里我们利用创建zookeeper路径节点的唯一性实现分布式锁。并同时演示如何使用Curator工具包,完成分布式锁。

正文

  • 在项目中添加zookeeper的pom依赖
<dependency>
	<groupId>org.apache.zookeeper</groupId>
	<artifactId>zookeeper</artifactId>
	<version>3.7.2</version>
</dependency>
  • 创建zookeeper客户端工具,实现加锁和解锁方法 

- 创建ZookeeperClient客户端工具

package com.ht.atp.plat.util;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;


@Component
public class ZookeeperClient {
    /**
     * zookeeper连接地址
     */
    private static final String connectString = "192.168.110.88:2181";

    /**
     * 分布式锁根路径
     */
    private static final String ROOT_PATH = "/distributed";

    /**
     * zookeeper客户端
     */
    private ZooKeeper zooKeeper;

    /**
     * 初始化zookeeper客户端
     */
    @PostConstruct
    public void init() {
        try {
            // 连接zookeeper服务器
            this.zooKeeper = new ZooKeeper(connectString, 30000, event -> System.out.println("获取链接成功!!"));
            // 创建分布式锁根节点
            if (this.zooKeeper.exists(ROOT_PATH, false) == null) {
                this.zooKeeper.create(ROOT_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            System.out.println("获取链接失败!");
            e.printStackTrace();
        }
    }

    /**
     * 销毁zookeeper客户端
     */
    @PreDestroy
    public void destroy() {
        try {
            if (zooKeeper != null) {
                zooKeeper.close();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 加锁
     *
     * @param lockName
     */
    public void lock(String lockName) {
        try {
            zooKeeper.create(ROOT_PATH + "/" + lockName, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        } catch (Exception e) {
            // 重试
            try {
                Thread.sleep(200);
                lock(lockName);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }
        System.out.println("----------加锁成功------------");
    }

    /**
     * 解锁
     *
     * @param lockName
     */
    public void unlock(String lockName) {
        try {
            this.zooKeeper.delete(ROOT_PATH + "/" + lockName, 0);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
        System.out.println("----------解锁成功------------");
    }
}

- 使用临时节点EPHEMERAL加锁,这里使用临时节点是方便锁的自动释放,避免发生死锁问题

(五)库存超卖案例实战——使用zookeeper分布式锁解决“超卖”问题,ATP应用测试平台,# springboot,# 分布式锁,spring boot

- 业务执行完成,删除临时节点,解锁

(五)库存超卖案例实战——使用zookeeper分布式锁解决“超卖”问题,ATP应用测试平台,# springboot,# 分布式锁,spring boot

  • 实现“超卖”的业务方法,使用自定义的zookeeper工具类加锁
    @Autowired
    ZookeeperClient zookeeperClient;

    @Override
    public void checkAndReduceStock() {
        zookeeperClient.lock("lock");

        // 查询库存
        WmsStock wmsStock = baseMapper.selectById(1L);
        // 验证库存大于0再扣减库存
        if (wmsStock != null && wmsStock.getStockQuantity() > 0) {
            wmsStock.setStockQuantity(wmsStock.getStockQuantity() - 1);
            baseMapper.updateById(wmsStock);
        }

        // 释放锁
        zookeeperClient.unlock("lock");
    }
  • 将数据库库存表的库存恢复为10000,分别启动7000,7001,7002服务

(五)库存超卖案例实战——使用zookeeper分布式锁解决“超卖”问题,ATP应用测试平台,# springboot,# 分布式锁,spring boot

(五)库存超卖案例实战——使用zookeeper分布式锁解决“超卖”问题,ATP应用测试平台,# springboot,# 分布式锁,spring boot

  •  启动jmeter压测工具,压测库存扣减接口,查看结果

- 库存扣减为0

(五)库存超卖案例实战——使用zookeeper分布式锁解决“超卖”问题,ATP应用测试平台,# springboot,# 分布式锁,spring boot

- jmeter压测结果,平均访问时间502ms,请求吞吐量为每秒161

(五)库存超卖案例实战——使用zookeeper分布式锁解决“超卖”问题,ATP应用测试平台,# springboot,# 分布式锁,spring boot

- 不存在并发“超卖问题”,但是接口访问吞吐量较低。由于在加锁过程中,获取不到锁,会无限自旋去获取锁,导致性能下降。

  •  优化分布式锁,使用zk的临时序列化节点实现分布式锁,避免锁的自旋操作

- 优化代码

package com.ht.atp.plat.util;

import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.List;

@Component
public class ZookeeperClientNoBlock {
    /**
     * zookeeper连接地址
     */
    private static final String connectString = "192.168.110.88:2181";

    /**
     * 分布式锁根路径
     */
    private static final String ROOT_PATH = "/distributed";

    /**
     * zookeeper客户端
     */
    private ZooKeeper zooKeeper;

    /**
     * 初始化zookeeper客户端
     */
    @PostConstruct
    public void init() {
        try {
            // 连接zookeeper服务器
            this.zooKeeper = new ZooKeeper(connectString, 30000, event -> System.out.println("获取链接成功!!"));
            // 创建分布式锁根节点
            if (this.zooKeeper.exists(ROOT_PATH, false) == null) {
                this.zooKeeper.create(ROOT_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            System.out.println("获取链接失败!");
            e.printStackTrace();
        }
    }

    /**
     * 销毁zookeeper客户端
     */
    @PreDestroy
    public void destroy() {
        try {
            if (zooKeeper != null) {
                zooKeeper.close();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 创建锁
     *
     * @param lockName
     */
    public String lock(String lockName) {
        try {
            String realLock = zooKeeper.create(ROOT_PATH + "/" + lockName + "-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            return realLock;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("----------加锁成功------------");
        return null;
    }

    /**
     * 检查锁
     *
     * @param lockName
     */
    public void checkLock(String lockName) {
        String preNode = getPreNode(lockName);
        // 如果该节点没有前一个节点,说明该节点时最小节点,放行执行业务逻辑
        if (StringUtils.isEmpty(preNode)) {
            return;
        }
        // 重新检查。是否获取到锁
        try {

            Thread.sleep(20);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
        checkLock(lockName);
    }

    /**
     * 获取指定节点的前节点
     *
     * @param path
     * @return
     */
    private String getPreNode(String path) {
        try {
            // 获取当前节点的序列化号
            Long curSerial = Long.valueOf(StringUtils.substringAfterLast(path, "-"));
            // 获取根路径下的所有序列化子节点
            List<String> nodes = this.zooKeeper.getChildren(ROOT_PATH, false);

            // 判空
            if (CollectionUtils.isEmpty(nodes)) {
                return null;
            }

            // 获取前一个节点
            Long flag = 0L;
            String preNode = null;
            for (String node : nodes) {
                // 获取每个节点的序列化号
                Long serial = Long.valueOf(StringUtils.substringAfterLast(node, "-"));
                if (serial < curSerial && serial > flag) {
                    flag = serial;
                    preNode = node;
                }
            }

            return preNode;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 解锁
     *
     * @param lockName
     */
    public void unlock(String lockName) {
        try {
            this.zooKeeper.delete(lockName, 0);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
        System.out.println("----------解锁成功------------");
    }
}

- 使用临时序列化节点EPHEMERAL_SEQUENTIAL加锁,保证每个节点都可以加锁成功,避免自旋操作

(五)库存超卖案例实战——使用zookeeper分布式锁解决“超卖”问题,ATP应用测试平台,# springboot,# 分布式锁,spring boot

- 通过判断当前节点是否是第一个节点,如果是第一个节点,才可执行后续的业务

(五)库存超卖案例实战——使用zookeeper分布式锁解决“超卖”问题,ATP应用测试平台,# springboot,# 分布式锁,spring boot

- 解锁操作

(五)库存超卖案例实战——使用zookeeper分布式锁解决“超卖”问题,ATP应用测试平台,# springboot,# 分布式锁,spring boot

  • 扣减库存业务实现方法
public void checkAndReduceStock() {
        //加锁
        String lock = zookeeperClientNoBlock.lock("lock");
        //检查锁
        zookeeperClientNoBlock.checkLock(lock);

        // 查询库存
        WmsStock wmsStock = baseMapper.selectById(1L);
        // 验证库存大于0再扣减库存
        if (wmsStock != null && wmsStock.getStockQuantity() > 0) {
            wmsStock.setStockQuantity(wmsStock.getStockQuantity() - 1);
            baseMapper.updateById(wmsStock);
        }

        // 释放锁
        zookeeperClientNoBlock.unlock(lock);
    }
  • 将扣减库存恢复为10000,重新启动7000,7001,7002服务,使用jmeter压测工具再次压测

- 库存扣减为0

(五)库存超卖案例实战——使用zookeeper分布式锁解决“超卖”问题,ATP应用测试平台,# springboot,# 分布式锁,spring boot

- 压测结果:平均访问时间1597ms,请求访问吞吐量为每秒62

(五)库存超卖案例实战——使用zookeeper分布式锁解决“超卖”问题,ATP应用测试平台,# springboot,# 分布式锁,spring boot

从优化结果来看,此种方式更加耗时,性能更差。将加锁操作改为非自旋操作,虽然加锁不在耗时,但是会自旋判断自己是否是最小的节点,依然存在耗时操作,且逻辑更为复杂。

  • 优化分布式锁,通过使用zookeeper的Watcher监听来实现阻塞锁

- 实现代码

package com.ht.atp.plat.util;

import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.*;
import org.apache.zookeeper.proto.WatcherEvent;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.List;
import java.util.concurrent.CountDownLatch;


@Component
public class ZookeeperClientBlockWatch {
    /**
     * zookeeper连接地址
     */
    private static final String connectString = "192.168.110.88:2181";

    /**
     * 分布式锁根路径
     */
    private static final String ROOT_PATH = "/distributed";

    /**
     * zookeeper客户端
     */
    private ZooKeeper zooKeeper;

    /**
     * 初始化zookeeper客户端
     */
    @PostConstruct
    public void init() {
        try {
            // 连接zookeeper服务器
            this.zooKeeper = new ZooKeeper(connectString, 30000, event -> System.out.println("获取链接成功!!"));
            // 创建分布式锁根节点
            if (this.zooKeeper.exists(ROOT_PATH, false) == null) {
                this.zooKeeper.create(ROOT_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            System.out.println("获取链接失败!");
            e.printStackTrace();
        }
    }

    /**
     * 销毁zookeeper客户端
     */
    @PreDestroy
    public void destroy() {
        try {
            if (zooKeeper != null) {
                zooKeeper.close();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 创建锁
     *
     * @param lockName
     */
    public String lock(String lockName) {
        try {
            String realLock = zooKeeper.create(ROOT_PATH + "/" + lockName + "-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            return realLock;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("----------加锁成功------------");
        return null;
    }

    /**
     * 检查锁
     *
     * @param lockName
     */
    public void checkLock(String lockName) {
        try {
            String preNode = getPreNode(lockName);
            // 如果该节点没有前一个节点,说明该节点时最小节点,放行执行业务逻辑
            if (!StringUtils.isEmpty(preNode)) {
                CountDownLatch countDownLatch = new CountDownLatch(1);
                if (this.zooKeeper.exists(ROOT_PATH + "/" + preNode, new Watcher() {
                    @Override
                    public void process(WatchedEvent watchedEvent) {
                        System.out.println("监控:"+ watchedEvent.getPath());
                        countDownLatch.countDown();
                    }
                }) == null) {
                    return;
                }
                // 阻塞,减少自旋检查
                countDownLatch.await();
            }
            return;

        } catch (Exception e) {
            e.printStackTrace();
            // 重新检查。是否获取到锁
            try {
                Thread.sleep(20);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            checkLock(lockName);
        }
    }

    /**
     * 获取指定节点的前节点
     *
     * @param path
     * @return
     */
    private String getPreNode(String path) {
        try {
            // 获取当前节点的序列化号
            Long curSerial = Long.valueOf(StringUtils.substringAfterLast(path, "-"));
            // 获取根路径下的所有序列化子节点
            List<String> nodes = this.zooKeeper.getChildren(ROOT_PATH, false);

            // 判空
            if (CollectionUtils.isEmpty(nodes)) {
                return null;
            }

            // 获取前一个节点
            Long flag = 0L;
            String preNode = null;
            for (String node : nodes) {
                // 获取每个节点的序列化号
                Long serial = Long.valueOf(StringUtils.substringAfterLast(node, "-"));
                if (serial < curSerial && serial > flag) {
                    flag = serial;
                    preNode = node;
                }
            }

            return preNode;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 解锁
     *
     * @param lockName
     */
    public void unlock(String lockName) {
        try {
            this.zooKeeper.delete(lockName, 0);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
        System.out.println("----------解锁成功------------");
    }
}

- 检查加锁,加入前一个节点的监控判断,如果前一个节点的锁还没有释放,就使用CountDownLatch计数器阻塞程序,减少自旋检查,当监控到前一个节点的锁释放,则当前节点获取到锁,开始执行业务逻辑

(五)库存超卖案例实战——使用zookeeper分布式锁解决“超卖”问题,ATP应用测试平台,# springboot,# 分布式锁,spring boot

  •  修改扣减库存业务实现方法为监控阻塞的方式
    @Autowired
    ZookeeperClientBlockWatch zookeeperClientBlockWatch;

    public void checkAndReduceStock() {
        //加锁
        String lock = zookeeperClientBlockWatch.lock("lock");
        //检查锁
        zookeeperClientBlockWatch.checkLock(lock);

        // 查询库存
        WmsStock wmsStock = baseMapper.selectById(1L);
        // 验证库存大于0再扣减库存
        if (wmsStock != null && wmsStock.getStockQuantity() > 0) {
            wmsStock.setStockQuantity(wmsStock.getStockQuantity() - 1);
            baseMapper.updateById(wmsStock);
        }

        // 释放锁
        zookeeperClientBlockWatch.unlock(lock);
    }
  •  将扣减库存恢复为10000,重新启动7000,7001,7002服务,使用jmeter压测工具再次压测

- 库存扣减为了0

(五)库存超卖案例实战——使用zookeeper分布式锁解决“超卖”问题,ATP应用测试平台,# springboot,# 分布式锁,spring boot

- jmeter压测结果:平均访问时间441ms,请求访问吞吐量为每秒223

(五)库存超卖案例实战——使用zookeeper分布式锁解决“超卖”问题,ATP应用测试平台,# springboot,# 分布式锁,spring boot

从优化结果来看,使用带监控的阻塞锁吞吐量更高,平均访问时间更小,性能比自旋加锁的方式性能更优。

  • 使用ThreadLocal优化分布式锁,将zookeeper的Watcher监听来实现阻塞锁优化为可重入分布式锁

- 优化代码

package com.ht.atp.plat.util;

import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.*;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.List;
import java.util.concurrent.CountDownLatch;


@Component
public class ZookeeperClientBlockWatchReentrant {
    /**
     * zookeeper连接地址
     */
    private static final String connectString = "192.168.110.88:2181";

    /**
     * 分布式锁根路径
     */
    private static final String ROOT_PATH = "/distributed";

    /**
     * zookeeper客户端
     */
    private ZooKeeper zooKeeper;

    /**
     * ThreadLocal本地线程
     */
    private static final ThreadLocal<Integer> THREAD_LOCAL = new ThreadLocal<>();

    /**
     * 初始化zookeeper客户端
     */
    @PostConstruct
    public void init() {
        try {
            // 连接zookeeper服务器
            this.zooKeeper = new ZooKeeper(connectString, 30000, event -> System.out.println("获取链接成功!!"));
            // 创建分布式锁根节点
            if (this.zooKeeper.exists(ROOT_PATH, false) == null) {
                this.zooKeeper.create(ROOT_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            System.out.println("获取链接失败!");
            e.printStackTrace();
        }
    }

    /**
     * 销毁zookeeper客户端
     */
    @PreDestroy
    public void destroy() {
        try {
            if (zooKeeper != null) {
                zooKeeper.close();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 创建锁
     *
     * @param lockName
     */
    public String lock(String lockName) {
        try {
            if (THREAD_LOCAL.get() == null || THREAD_LOCAL.get() == 0){
                String realLock = zooKeeper.create(ROOT_PATH + "/" + lockName + "-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                return realLock;
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("----------加锁成功------------");
        return null;
    }

    /**
     * 检查锁
     *
     * @param lockName
     */
    public void checkLock(String lockName) {
        Integer flag = THREAD_LOCAL.get();
        if (flag != null && flag > 0) {
            THREAD_LOCAL.set(flag + 1);
            return;
        }
        try {
            String preNode = getPreNode(lockName);
            // 如果该节点没有前一个节点,说明该节点时最小节点,放行执行业务逻辑
            if (!StringUtils.isEmpty(preNode)) {
                CountDownLatch countDownLatch = new CountDownLatch(1);
                if (this.zooKeeper.exists(ROOT_PATH + "/" + preNode, new Watcher() {
                    @Override
                    public void process(WatchedEvent watchedEvent) {
                        System.out.println("监控:"+ watchedEvent.getPath());
                        countDownLatch.countDown();
                    }
                }) == null) {
                    THREAD_LOCAL.set(1);
                    return;
                }
                // 阻塞,减少自旋检查
                countDownLatch.await();
            }
            THREAD_LOCAL.set(1);
            return;

        } catch (Exception e) {
            e.printStackTrace();
            // 重新检查。是否获取到锁
            try {
                Thread.sleep(20);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            checkLock(lockName);
        }
    }

    /**
     * 获取指定节点的前节点
     *
     * @param path
     * @return
     */
    private String getPreNode(String path) {
        try {
            // 获取当前节点的序列化号
            Long curSerial = Long.valueOf(StringUtils.substringAfterLast(path, "-"));
            // 获取根路径下的所有序列化子节点
            List<String> nodes = this.zooKeeper.getChildren(ROOT_PATH, false);

            // 判空
            if (CollectionUtils.isEmpty(nodes)) {
                return null;
            }

            // 获取前一个节点
            Long flag = 0L;
            String preNode = null;
            for (String node : nodes) {
                // 获取每个节点的序列化号
                Long serial = Long.valueOf(StringUtils.substringAfterLast(node, "-"));
                if (serial < curSerial && serial > flag) {
                    flag = serial;
                    preNode = node;
                }
            }

            return preNode;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 解锁
     *
     * @param lockName
     */
    public void unlock(String lockName) {
        try {
            THREAD_LOCAL.set(THREAD_LOCAL.get() - 1);
            if (THREAD_LOCAL.get() == 0) {
                this.zooKeeper.delete(lockName, 0);
                THREAD_LOCAL.remove();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
        System.out.println("----------解锁成功------------");
    }
}

- 使用ThreadLocal存储当前线程的操作

(五)库存超卖案例实战——使用zookeeper分布式锁解决“超卖”问题,ATP应用测试平台,# springboot,# 分布式锁,spring boot

- 加锁:先检测本地线程是否存在或者值是否为0,如果不存在或者为0,则创建锁,避免重复创建

(五)库存超卖案例实战——使用zookeeper分布式锁解决“超卖”问题,ATP应用测试平台,# springboot,# 分布式锁,spring boot

- 检查加锁:如果值大于0,代表可重入,值加1;如果不存在或者等于0,将值设置为1,代表第一次获取到锁

(五)库存超卖案例实战——使用zookeeper分布式锁解决“超卖”问题,ATP应用测试平台,# springboot,# 分布式锁,spring boot

- 解锁:获取线程的值,并减少1,如果减少后的值变为0,代表锁已经不在占用,此时才释放锁,并且将当前线程的值存储移除;如果减少的值还是大于0,代表此时锁还在占用

(五)库存超卖案例实战——使用zookeeper分布式锁解决“超卖”问题,ATP应用测试平台,# springboot,# 分布式锁,spring boot

  •  扣减库存业务代码
    @Autowired
    ZookeeperClientBlockWatchReentrant zookeeperClientBlockWatchReentrant;

    public void checkAndReduceStock() {
        //加锁
        String lock = zookeeperClientBlockWatchReentrant.lock("lock");
        //检查锁
        zookeeperClientBlockWatchReentrant.checkLock(lock);

        // 查询库存
        WmsStock wmsStock = baseMapper.selectById(1L);
        // 验证库存大于0再扣减库存
        if (wmsStock != null && wmsStock.getStockQuantity() > 0) {
            wmsStock.setStockQuantity(wmsStock.getStockQuantity() - 1);
            baseMapper.updateById(wmsStock);
        }

        // 释放锁
        zookeeperClientBlockWatchReentrant.unlock(lock);
    }
  • 将扣减库存恢复为10000,重新启动7000,7001,7002服务,使用jmeter压测工具再次压测

- 库存扣减为0

(五)库存超卖案例实战——使用zookeeper分布式锁解决“超卖”问题,ATP应用测试平台,# springboot,# 分布式锁,spring boot

- jmeter压测结果:平均访问时间426ms,吞吐量每秒231

(五)库存超卖案例实战——使用zookeeper分布式锁解决“超卖”问题,ATP应用测试平台,# springboot,# 分布式锁,spring boot

  •  使用zookeeper的Curator工具包实现分布式锁

- 引入pom依赖

<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.1.0</version>
</dependency>

- 创建CuratorFramework的bean工具类

package com.ht.atp.plat.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.WatchedEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * CuratorFramework工具类配置
 */
@Slf4j
@Configuration
public class ZookeeperConfig {
 
    @Bean
    public CuratorFramework curatorFramework() {
        // ExponentialBackoffRetry是种重连策略,每次重连的间隔会越来越长,1000毫秒是初始化的间隔时间,3代表尝试重连次数。
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
        // 创建客户端
        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("192.168.110.88:2181", retryPolicy);
        // 添加watched 监听器
        curatorFramework.getCuratorListenable().addListener((CuratorFramework client, CuratorEvent event) -> {
            CuratorEventType type = event.getType();
            if (type == CuratorEventType.WATCHED) {
                WatchedEvent watchedEvent = event.getWatchedEvent();
                String path = watchedEvent.getPath();
                log.info(watchedEvent.getType() + " ----------------------------> " + path);
                // 重新设置该节点监听
                if (null != path) {
                    client.checkExists().watched().forPath(path);
                }
            }
        });
        // 启动客户端
        curatorFramework.start();
        return curatorFramework;
    }
}

- 使用可重入锁,扣减库存业务方法

@Autowired
    private CuratorFramework curatorFramework;

    public void checkAndReduceStock() throws Exception {
        //加锁
        InterProcessMutex mutex = new InterProcessMutex(curatorFramework, "/curator/lock");
        try {
            // 加锁
            mutex.acquire();
            // 查询库存
            WmsStock wmsStock = baseMapper.selectById(1L);
            // 验证库存大于0再扣减库存
            if (wmsStock != null && wmsStock.getStockQuantity() > 0) {
                wmsStock.setStockQuantity(wmsStock.getStockQuantity() - 1);
                baseMapper.updateById(wmsStock);
            }
            this.testSub(mutex);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 释放锁
            mutex.release();
        }
    }

    public void testSub(InterProcessMutex mutex) throws Exception {

        try {
            mutex.acquire();
            System.out.println("测试可重入锁。。。。");
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            // 释放锁
            mutex.release();
        }
    }
  • 将扣减库存恢复为10000,重新启动7000,7001,7002服务,使用jmeter压测工具再次压测 

- 库存扣减为0

(五)库存超卖案例实战——使用zookeeper分布式锁解决“超卖”问题,ATP应用测试平台,# springboot,# 分布式锁,spring boot

- jmeter压测结果:平均访问时间497,吞吐量每秒198

(五)库存超卖案例实战——使用zookeeper分布式锁解决“超卖”问题,ATP应用测试平台,# springboot,# 分布式锁,spring boot

- 能够解决并发访问“超卖问题”

结语

关于使用zookeeper分布式锁解决“超卖”问题的内容到这里就结束了,我们下期见。。。。。。文章来源地址https://www.toymoban.com/news/detail-736483.html

到了这里,关于(五)库存超卖案例实战——使用zookeeper分布式锁解决“超卖”问题的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • zookeeper实现分布式锁-curator,java面试项目经验案例

    //5.如果不是第一个节点,需要监听前一个节点 //用一个临时变量记录当前节点的上一个节点 String previousNode = firstNode; for(String node : children){ if(currentNode.endsWith(node)){ //如果当前节点是node节点 ,那么就监听它的上一个节点 :比如 currentNode 这里是 0003节点 ,那 node就是 0002节点

    2024年04月23日
    浏览(45)
  • Zookeeper实战——分布式锁实现以及原理

    分布式锁是控制分布式系统之间同步访问共享资源的一种方式。分布式锁的实现方式有很多种,比如 Redis 、数据库 、 zookeeper 等。这篇文章主要介绍用 Zookeeper 实现分布式锁。 先说结论: Zookeeper 是基于临时顺序节点以及 Watcher 监听器机制实现分布式锁的 。 (1)ZooKeeper 的每

    2023年04月08日
    浏览(34)
  • 分布式锁原理与实战三:ZooKeeper分布式锁的原理

             目录 ZooKeeper分布式锁的原理 ZooKeeper的每一个节点,都是一个天然的顺序发号器。 ZooKeeper节点的递增有序性,可以确保锁的公平 ZooKeeper的节点监听机制,可以保障占有锁的传递有序而且高效 ZooKeeper的节点监听机制,能避免羊群效应 分布式锁的抢占过程 客户端

    2024年02月08日
    浏览(45)
  • ZooKeeper 实战(五) Curator实现分布式锁

    1.1.分布式锁概念 分布式锁是一种用于实现分布式系统中的同步机制的技术。它允许在多个进程或线程之间实现互斥访问共享资源,以避免并发访问时的数据不一致问题。分布式锁的主要目的是在分布式系统中提供类似于全局锁的效果,以确保在任何时刻只有一个进程或线程

    2024年01月18日
    浏览(39)
  • 【ZooKeeper高手实战】ZAB协议:ZooKeeper分布式一致性的基石

    🌈🌈🌈🌈🌈🌈🌈🌈 欢迎关注公众号(通过文章导读关注:【11来了】),及时收到 AI 前沿项目工具及新技术 的推送 发送 资料 可领取 深入理解 Redis 系列文章结合电商场景讲解 Redis 使用场景 、 中间件系列笔记 和 编程高频电子书 ! 文章导读地址:点击查看文章导读!

    2024年02月03日
    浏览(46)
  • Zookeeper 实战 | Zookeeper 和Spring Cloud相结合解决分布式锁、服务注册与发现、配置管理

    专栏集锦,大佬们可以收藏以备不时之需: Spring Cloud 专栏: Python 专栏: Redis 专栏: TensorFlow 专栏: Logback 专栏: 量子计算: 量子计算 | 解密著名量子算法Shor算法和Grover算法 AI机器学习实战: AI机器学习实战 | 使用 Python 和 scikit-learn 库进行情感分析 AI机器学习 | 基于lib

    2024年02月05日
    浏览(89)
  • 使用ZooKeeper实现分布式锁

    目录 引言 1. ZooKeeper简介 2. 分布式锁实现原理 3. 分布式锁实现步骤 步骤一:创建ZooKeeper客户端 步骤二:创建分布式锁类 步骤三:使用分布式锁 4. 总结 在分布式系统中,实现分布式锁是一项常见的任务,可以用于保证同一时间只有一个客户端可以访问共享资源,从而避免竞

    2024年02月21日
    浏览(49)
  • springboot 使用zookeeper实现分布式队列

    一.添加ZooKeeper依赖:在pom.xml文件中添加ZooKeeper客户端的依赖项。例如,可以使用Apache Curator作为ZooKeeper客户端库: 二.创建ZooKeeper连接:在应用程序的配置文件中,配置ZooKeeper服务器的连接信息。例如,在application.properties文件中添加以下配置:  三.创建分布式队列:使用Z

    2024年02月13日
    浏览(38)
  • springboot 使用zookeeper实现分布式锁

    一.添加ZooKeeper依赖:在pom.xml文件中添加ZooKeeper客户端的依赖项。例如,可以使用Apache Curator作为ZooKeeper客户端库: 二.创建ZooKeeper连接:在应用程序的配置文件中,配置ZooKeeper服务器的连接信息。例如,在application.properties文件中添加以下配置: 三.创建分布式锁:使用ZooKee

    2024年02月13日
    浏览(37)
  • springboot 使用zookeeper实现分布式ID

    添加ZooKeeper依赖:在pom.xml文件中添加ZooKeeper客户端的依赖项。例如,可以使用Apache Curator作为ZooKeeper客户端库: 创建ZooKeeper连接:在应用程序的配置文件中,配置ZooKeeper服务器的连接信息。例如,在application.properties文件中添加以下配置: 创建分布式ID生成器:使用ZooKeeper客

    2024年02月13日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包