在分布式环境中,当需要控制对某一资源的不同进程并发访问时就需要使用分布式锁;可以使用 ZooKeeper + Curator 来实现分布式锁,本文主要介绍 Curator 中分布式锁的使用,文中所使用到的软件版本:Java 1.8.0_341、Zookeeper 3.7.1、curator 5.4.0。
1、引入依赖
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>5.4.0</version> </dependency>
2、使用样例
2.1、可重入锁
@Test public void interProcessMutex() throws InterruptedException { CuratorFramework cf = getCuratorFramework(); cf.start(); InterProcessLock lock = new InterProcessMutex(cf, "/test/lock"); int size = 5; CountDownLatch countDownLatch = new CountDownLatch(size); for (int i = 0; i < size; i++) { new Thread(() -> { try { lock.acquire(); //同一线程中可重复获取 lock.acquire(); logger.info(Thread.currentThread().getName() + "获得了锁"); Thread.sleep(1000 * 3); } catch (Exception e) { e.printStackTrace(); } finally { //获取了几次就要释放几次 release(lock); release(lock); logger.info(Thread.currentThread().getName() + "释放了锁"); } countDownLatch.countDown(); }).start(); } countDownLatch.await(); cf.close(); }
2.2、不可重入锁
@Test public void interProcessSemaphoreMutex() throws InterruptedException { CuratorFramework cf = getCuratorFramework(); cf.start(); InterProcessLock lock = new InterProcessSemaphoreMutex(cf, "/test/lock2"); int size = 5; CountDownLatch countDownLatch = new CountDownLatch(size); for (int i = 0; i < size; i++) { new Thread(() -> { try { lock.acquire(); //同一线程中不可重复获取 //lock.acquire(); logger.info(Thread.currentThread().getName() + "获得了锁"); Thread.sleep(1000 * 3); } catch (Exception e) { e.printStackTrace(); } finally { release(lock); logger.info(Thread.currentThread().getName() + "释放了锁"); } countDownLatch.countDown(); }).start(); } countDownLatch.await(); cf.close(); }
2.3、读写锁(可重入)
@Test public void interProcessReadWriteLock() throws InterruptedException { CuratorFramework cf = getCuratorFramework(); InterProcessReadWriteLock lock = new InterProcessReadWriteLock(cf, "/test/lock3"); InterProcessReadWriteLock.ReadLock readLock = lock.readLock(); InterProcessReadWriteLock.WriteLock writeLock = lock.writeLock(); cf.start(); int size = 5; CountDownLatch countDownLatch = new CountDownLatch(size); for (int i = 0; i < size; i++) { new Thread(() -> { try { readLock.acquire(); readLock.acquire(); logger.info(Thread.currentThread().getName() + "获得了读锁"); Thread.sleep(1000 * 2); } catch (Exception e) { e.printStackTrace(); } finally { //获取了几次就要释放几次 release(readLock); release(readLock); logger.info(Thread.currentThread().getName() + "释放了读锁"); } try { writeLock.acquire(); logger.info(Thread.currentThread().getName() + "获得了写锁"); Thread.sleep(1000 * 2); } catch (Exception e) { e.printStackTrace(); } finally { release(writeLock); logger.info(Thread.currentThread().getName() + "释放了写锁"); } countDownLatch.countDown(); }).start(); } countDownLatch.await(); cf.close(); }
2.4、信号量
信号量用于控制对资源同时访问的进程或线程数。
@Test public void interProcessSemaphoreV2() throws InterruptedException { CuratorFramework cf = getCuratorFramework(); InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(cf, "/test/lock4", 3); cf.start(); int size = 5; CountDownLatch countDownLatch = new CountDownLatch(size); for (int i = 0; i < size; i++) { new Thread(() -> { Lease lease = null; try { //获取一个许可 lease = semaphore.acquire(); logger.info(Thread.currentThread().getName() + "获得了许可"); Thread.sleep(1000 * 3); } catch (Exception e) { e.printStackTrace(); } finally { //释放一个许可 semaphore.returnLease(lease); logger.info(Thread.currentThread().getName() + "释放了许可"); } countDownLatch.countDown(); }).start(); } countDownLatch.await(); cf.close(); }
2.5、多个锁作为单个实体管理
InterProcessMultiLock 主要功能是将多个锁合并为一个对象来操作,简化了代码量。
@Test public void InterProcessMultiLock() throws InterruptedException { CuratorFramework cf = getCuratorFramework(); InterProcessLock lock = new InterProcessMutex(cf, "/test/lock"); InterProcessLock lock2 = new InterProcessSemaphoreMutex(cf, "/test/lock2"); InterProcessMultiLock multiLock = new InterProcessMultiLock(Arrays.asList(lock, lock2)); cf.start(); int size = 5; CountDownLatch countDownLatch = new CountDownLatch(size); for (int i = 0; i < size; i++) { new Thread(() -> { try { //相当于 lock.acquire() 和 lock2.acquire() multiLock.acquire(); logger.info(Thread.currentThread().getName() + "获得了锁"); Thread.sleep(1000 * 3); } catch (Exception e) { e.printStackTrace(); } finally { release(multiLock); logger.info(Thread.currentThread().getName() + "释放了锁"); } countDownLatch.countDown(); }).start(); } countDownLatch.await(); cf.close(); }
分布式锁使用样例的完整代码如下:文章来源:https://www.toymoban.com/news/detail-478774.html
package com.abc.demo.general.zookeeper; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.*; import org.apache.curator.retry.ExponentialBackoffRetry; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Arrays; import java.util.concurrent.CountDownLatch; public class CuratorLockCase { private static Logger logger = LoggerFactory.getLogger(CuratorLockCase.class); private static String connectString = "10.49.196.33:2181"; private static int sessionTimeout = 40 * 1000; private static int connectionTimeout = 60 * 1000; /** * 可重入锁 */ @Test public void interProcessMutex() throws InterruptedException { CuratorFramework cf = getCuratorFramework(); cf.start(); InterProcessLock lock = new InterProcessMutex(cf, "/test/lock"); int size = 5; CountDownLatch countDownLatch = new CountDownLatch(size); for (int i = 0; i < size; i++) { new Thread(() -> { try { lock.acquire(); //同一线程中可重复获取 lock.acquire(); logger.info(Thread.currentThread().getName() + "获得了锁"); Thread.sleep(1000 * 3); } catch (Exception e) { e.printStackTrace(); } finally { //获取了几次就要释放几次 release(lock); release(lock); logger.info(Thread.currentThread().getName() + "释放了锁"); } countDownLatch.countDown(); }).start(); } countDownLatch.await(); cf.close(); } /** * 不可重入锁 */ @Test public void interProcessSemaphoreMutex() throws InterruptedException { CuratorFramework cf = getCuratorFramework(); cf.start(); InterProcessLock lock = new InterProcessSemaphoreMutex(cf, "/test/lock2"); int size = 5; CountDownLatch countDownLatch = new CountDownLatch(size); for (int i = 0; i < size; i++) { new Thread(() -> { try { lock.acquire(); //同一线程中不可重复获取 //lock.acquire(); logger.info(Thread.currentThread().getName() + "获得了锁"); Thread.sleep(1000 * 3); } catch (Exception e) { e.printStackTrace(); } finally { release(lock); logger.info(Thread.currentThread().getName() + "释放了锁"); } countDownLatch.countDown(); }).start(); } countDownLatch.await(); cf.close(); } /** * 读写锁(可重入) */ @Test public void interProcessReadWriteLock() throws InterruptedException { CuratorFramework cf = getCuratorFramework(); InterProcessReadWriteLock lock = new InterProcessReadWriteLock(cf, "/test/lock3"); InterProcessReadWriteLock.ReadLock readLock = lock.readLock(); InterProcessReadWriteLock.WriteLock writeLock = lock.writeLock(); cf.start(); int size = 5; CountDownLatch countDownLatch = new CountDownLatch(size); for (int i = 0; i < size; i++) { new Thread(() -> { try { readLock.acquire(); readLock.acquire(); logger.info(Thread.currentThread().getName() + "获得了读锁"); Thread.sleep(1000 * 2); } catch (Exception e) { e.printStackTrace(); } finally { //获取了几次就要释放几次 release(readLock); release(readLock); logger.info(Thread.currentThread().getName() + "释放了读锁"); } try { writeLock.acquire(); logger.info(Thread.currentThread().getName() + "获得了写锁"); Thread.sleep(1000 * 2); } catch (Exception e) { e.printStackTrace(); } finally { release(writeLock); logger.info(Thread.currentThread().getName() + "释放了写锁"); } countDownLatch.countDown(); }).start(); } countDownLatch.await(); cf.close(); } /** * 信号量,用于控制对资源同时访问的进程或线程数 */ @Test public void interProcessSemaphoreV2() throws InterruptedException { CuratorFramework cf = getCuratorFramework(); InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(cf, "/test/lock4", 3); cf.start(); int size = 5; CountDownLatch countDownLatch = new CountDownLatch(size); for (int i = 0; i < size; i++) { new Thread(() -> { Lease lease = null; try { //获取一个许可 lease = semaphore.acquire(); logger.info(Thread.currentThread().getName() + "获得了许可"); Thread.sleep(1000 * 3); } catch (Exception e) { e.printStackTrace(); } finally { //释放一个许可 semaphore.returnLease(lease); logger.info(Thread.currentThread().getName() + "释放了许可"); } countDownLatch.countDown(); }).start(); } countDownLatch.await(); cf.close(); } /** * 多个锁作为单个实体管理 */ @Test public void InterProcessMultiLock() throws InterruptedException { CuratorFramework cf = getCuratorFramework(); InterProcessLock lock = new InterProcessMutex(cf, "/test/lock"); InterProcessLock lock2 = new InterProcessSemaphoreMutex(cf, "/test/lock2"); InterProcessMultiLock multiLock = new InterProcessMultiLock(Arrays.asList(lock, lock2)); cf.start(); int size = 5; CountDownLatch countDownLatch = new CountDownLatch(size); for (int i = 0; i < size; i++) { new Thread(() -> { try { //相当于 lock.acquire() 和 lock2.acquire() multiLock.acquire(); logger.info(Thread.currentThread().getName() + "获得了锁"); Thread.sleep(1000 * 3); } catch (Exception e) { e.printStackTrace(); } finally { release(multiLock); logger.info(Thread.currentThread().getName() + "释放了锁"); } countDownLatch.countDown(); }).start(); } countDownLatch.await(); cf.close(); } private CuratorFramework getCuratorFramework() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(connectString) .sessionTimeoutMs(sessionTimeout) .connectionTimeoutMs(connectionTimeout) .retryPolicy(retryPolicy) .build(); return cf; } private void release(InterProcessLock lock) { if (lock != null) { try { lock.release(); } catch (Exception e) { e.printStackTrace(); } } } }
文章来源地址https://www.toymoban.com/news/detail-478774.html
到了这里,关于Zookeeper入门实战(5)-分布式锁的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!