并发编程之可重入锁ReentrantLock

这篇具有很好参考价值的文章主要介绍了并发编程之可重入锁ReentrantLock。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

大家都知道在并发编程中一般会用到多线程技术,多线程技术可以大大增加系统QPS/TPS。但是在一些特殊的业务场景下我们需要限制线程的并发数目,比如秒杀系统、多种商品金额叠加运算等等都是需要限制线程数量。特别是在分布式微服务架构,多线程同步问题尤为明显。一般在实际的开发中我们会使用同步和加锁的方式对线程进行处理,比如synchronized修饰代码块和方法达到同步的目的。但是synchronized默认非公平锁且没有中断和超时功能,所以在实际开发中我们推荐使用可重入锁ReentrantLock。

ReentrantLock原理

可重入锁ReentrantLock内部继承AQS抽象同步队列,并由AQS中的CLH双向FIFO阻塞队列和STATE状态保证锁的同步。ReentrantLock在初始化可以显示指定公平与非公平锁,默认是非公平锁。是否公平锁体现在公平锁在STATE == 0时唤醒CLH阻塞队列头部线程,非公平锁则是CLH队列中的线程与其他线程进行争夺资源。另外ReentrantLock必须显示使用Lock()、unLock()获取与释放锁,并提供中断和超时获取锁等方法,功能和灵活度远远高于synchronized。
并发编程之可重入锁ReentrantLock

ReentrantLock VS Synchronized

1、用法不同,synchronized修饰方法、静态方法和代码块,ReentrantLock只用用在代码块中;
2、锁性质不同,Synchronized是非公平锁,ReentrantLock默认非公平锁,但可以显示指定公平与非公平锁;
3、获取锁方式不同,Synchronized是底层自动加解锁,ReentrantLock必须用代码加解锁;
4、中断方式不同,Synchronized获取锁不能中断,ReentrantLock可以中断解决死锁问题;
5、实现方式不同,Synchronized 通过JVM监视器Monitor实现,通过增加monitorenter 和 monitorexit指令对代码块同步;ReentrantLock则是通过AQS抽象同步队列的API进行同步。

源码解析

ReentrantLock同步机制

进入package java.util.concurrent.locks 下查看ReentrantLock源码:

//reentrantlock 内部类继承aqs,并显示覆写一些方法
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;

    /**
     * Performs {@link Lock#lock}. The main reason for subclassing
     * is to allow fast path for nonfair version.
     */
    abstract void lock();

    /**
     * 非公平尝试获取锁
     */
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }


    //释放锁
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
}

如上源码所示,ReentrantLock 内部类 Sync 继承AQS抽象同步队列,并覆写了尝试获取锁、获取锁、释放锁的方法。从这里可以看出ReentrantLock是通过调用AQS的API实现同步功能。

继续查看源码:

/**
 * 非公平锁同步对象
 */
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

/**
 * 公平锁同步对象
 */
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

如上源码所示,ReentrantLock有公平锁与非公平锁之分。非公平锁与公平锁都是覆写了tryAcquire()尝试获取锁的方法,并都是调用AQS获取锁acquire()方法。唯一不同的是公平锁在尝试获取锁的时候会调用hasQueuedPredecessors()方法判断线程是否在CLH队列中,非公平则没有这个逻辑直接是 CAS设置 STATE值、标识独占线程和重入验证。

当然ReentrantLock默认是非公平锁,也是可以通过构造方法指定锁类型:

/**
 * Creates an instance of {@code ReentrantLock}.
 * This is equivalent to using {@code ReentrantLock(false)}.
 */
public ReentrantLock() {
    sync = new NonfairSync();
}

/**
 * Creates an instance of {@code ReentrantLock} with the
 * given fairness policy.
 *
 * @param fair {@code true} if this lock should use a fair ordering policy
 */
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

ReentrantLock可重入机制

可重入机制就是同一个线程可以多次进入受限同步资源。ReentrantLock当然是可重入的,我们进入源码:

//非公平锁对象尝试获取锁
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

//公平锁对象尝试获取锁
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

如上源码所示无论是用的公平锁还是非公平锁,在STETE == 0时都会用 CAS compareAndSetState(0, acquires)方法改变STATE值,然后调用setExclusiveOwnerThread(current)方法设置独占线程;如果STATE != 0说明资源已经被占用,此时会在getExclusiveOwnerThread()方法会验证是否是独占线程,如果current == getExclusiveOwnerThread() 当前线程就是独占资源的线程,那么该线程直接获取到锁进入资源。

ReentrantLock可中断机制

可中断机制就是线程在阻塞队列中等待过程中,我们可以调用lockInterruptibly() 中断方法打断线程的等待来防止死锁的发生。
我们继续查看中断机制的源码:

//中断方法
public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}
//aqs中断方法
public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //尝试获取锁
    if (!tryAcquire(arg))
        //获取不到表示在阻塞队列,则进行中断逻辑
        doAcquireInterruptibly(arg);
}

如上源码所示,当程序主动调用lockInterruptibly()方法是调用的AQS获取中断机制。首先会先尝试获取锁,如果能够获取到锁则不进入中断逻辑,如果没有获取到锁则进入中断方法。

继续查看中断源码:

//aqs中断方法
private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            //尝试获取锁
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            //获取失败抛出中断异常
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

如源码所示,中断机制会自旋获取锁,如果获取到锁则不再执行中断机制;如果没有获取到锁则抛出 InterruptedException() 异常,程序最终如果没有获取到锁会调用 cancelAcquire(node)取消获取锁,从而避免无限等待使程序死锁的发生。

ReentrantLock超时机制

超时机制就是获取锁的方法中传入超时时间,如果在这个时间后还没有获取到锁则取消获取锁并返回false标识。

老规矩查看源码:

//tryLock 带有超时时间的尝试获取锁
public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

//底层调用aqs获取锁方法
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

如源码所示,tryLock(long timeout, TimeUnit unit)底层是调用AQS tryAcquireNanos(int arg, long nanosTimeout)方法。在调用时会先尝试获取锁,没有获取到则进入超时获取锁逻辑。

继续查看超时获取锁源码:

//AQS超时获取锁源码
private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            //获取当前节点的前节点
            final Node p = node.predecessor();
            //前节点为头结点且获取锁成功
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                //超时返回为未获取到锁标识
                return false;
            //未获取到锁且线程应该阻塞且超时时间大于1000ns会睡眠nanosTimeout
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            //取消获取锁
            cancelAcquire(node);
    }
}

如源码所示 如果 p == head && tryAcquire(arg) 当前线程节点的前驱节点是头部节点且当前线程尝试获取锁成功,会将当前节点设置为头部节点且直接返回获取到锁标识。如果不是头部节点或没有获取到锁则会判断是否获取时间超时,如果超时返回false结束流程,否则shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold 节点需要阻塞且超时时间大于1000纳秒则 LockSupport.parkNanos(this, nanosTimeout) 将当前线程睡眠nanosTimeout 纳秒,否则当前线程自旋获取锁。

cancelAcquire(node) 如果没有获取到锁或者超时最终都会直接取消获取锁机制 。

条件变量Condition

条件变量就是线程在执行的过程中会可以调用await()阻塞线程、signal()唤醒线程。就相当于休息室,调用await就是让当前线程等待,signal就是让线程继续执行。值得注意的是在await()阻塞线程时候会释放锁,signal()则是唤醒最先调用await()的线程,并且我们可以定义多个condition条件。

查看源码:

//ReentrantLock 中新建一个条件变量
public Condition newCondition() {
    return sync.newCondition();
}

//调用aqs ConditionObject实例化一个条件对象
final ConditionObject newCondition() {
    return new ConditionObject();
}

如源码所示ReentrantLock condition还是用的AQS的ConditionObject。

我们继续分析ConditionObject 中的核心方法await()、signal()方法:

//aqs ConditionObject  await方法
public final void await() throws InterruptedException {
    //线程如果中断则抛出中断异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //将当前节点加入等待队列    
    Node node = addConditionWaiter();
    //释放同步队列节点
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //判断是否在同步队列,如果不在同步队列则阻塞线程
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

如源码所示,我们在线程执行过程中调用 Condition await()方法会先检查线程是否中断,没有中断会将当前线程加入等待队列,释放同步队列并保存STATE状态。然后会判断是节点是否在同步队列,不在同步队列会执行LockSupport.park(this)方法将线程阻塞。

当然在线程阻塞过程中会自旋验证 (interruptMode = checkInterruptWhileWaiting(node)) != 0 线程是否中断,是否被Condition signal()方法唤醒。

我们继续分析signal()源码:

// condition 唤醒方法
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        //唤醒等待队列第一个线程
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

如源码所示,signal()唤醒方法默认唤醒条件等待队列中第一个线程。transferForSignal(first)则是将节点从条件队列移动到同步队列。

继续查看移动节点源码transferForSignal:

final boolean transferForSignal(Node node) {
    /*
     * cas 修改 Node.CONDITION == 0,如果修改失败标识节点已经取消
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
    将node 移动到同步队列并返回前驱节点
    */
    Node p = enq(node);
    int ws = p.waitStatus;
    //如果前驱节点 waitStatus > 0 或者 cas修改p节点为唤醒状态失败
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        //强制唤醒线程
        LockSupport.unpark(node.thread);
    return true;
}
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

如上源码所示,enq(node)方法会将等待队列的第一个节点移动到同步队列,并会返回节点的前驱节点。然后 ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL) 表示 节点的前驱节点取消或者cas设置前驱节点waitStatus == SIGNAL失败 则 执行LockSupport.unpark(node.thread)方法强制唤醒节点。

这里有个很重要的点就是同步队列节点的 waitStatus等待状态,如果是SIGNAL,则表示后续节点是一个阻塞节点需要唤醒的节点,当当前节点执行完成后会主动唤醒后续节点。这里也表明了conditon signal()方法唤醒线程并不会立即执行,而是要当前线程执行完成才会执行唤醒的线程。

写在最后

今天的博文我们讲述了ReentrantLock与Synchronized的区别联系,也解析了ReentrantLock可重入、可中断、超时机制以及conditon变量的源码。通过源码我们可以知道ReentrantLock 是通过AQS保证同步,其他的中断机制、超时机制、以及condition变量也都是基于AQS中的相应底层代码。在实际的开发过程中,我们可以根据自身业务场景选择应用ReentrantLock方法,从而增加系统的健壮性和可维护性。文章来源地址https://www.toymoban.com/news/detail-425168.html

到了这里,关于并发编程之可重入锁ReentrantLock的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Redis分布式可重入锁实现方案

    在单进程环境下,要保证一个代码块的同步执行,直接用 synchronized 或 ReetrantLock 即可。在分布式环境下,要保证多个节点的线程对代码块的同步访问,就必须要用到分布式锁方案。 分布式锁实现方案有很多,有基于关系型数据库行锁实现的;有基于ZooKeeper临时顺序节

    2024年02月19日
    浏览(43)
  • 【Java | 多线程】可重入锁的概念以及示例

    可重入锁(又名递归锁)是一种特殊类型的锁,它允许 同一个线程在获取锁后再次进入该锁保护的代码块或方法,而不需要重新获取锁 。 说白了,可重入锁的特点就是同一个线程可以多次获取同一个锁,而不会因为之前已经获取过锁而阻塞。 可重入锁的一个优点是可以一定

    2024年04月24日
    浏览(38)
  • ReentrantLock原理--非公平锁、可重入、可打断性

    先从构造器开始看,默认为非公平锁实现 NonfairSync 继承自 AQS 没有竞争时 第一个竞争出现时 Thread-1 执行了 CAS 尝试将 state 由 0 改为 1,结果失败 进入 tryAcquire 逻辑,这时 state 已经是1,结果仍然失败 接下来进入 addWaiter 逻辑,构造 Node 队列 图中黄色三角表示该 Node 的 waitSt

    2023年04月08日
    浏览(31)
  • 【redis】redis分布式锁(二)可重入锁+设计模式

    上一篇链接: 【redis】redis分布式锁(一)手写分布式锁1.0~6.0 隐式锁(即synchronized使用的锁)默认是可重入锁 synchronized的重入实现机理 显式锁也有ReentrantLock这样的可重入锁 结论: 下边将使用lua脚本的方式,把可重入锁的案例语句原子化 V1.0版本 蓝色部分是重复的,可

    2024年02月03日
    浏览(42)
  • 分布式锁设计选型 不可重入锁建议使用ZooKeeper来实现 可重入锁建议使用Redis来实现 分布式锁:ZooKeeper不可重入锁 Java优化建议

    在设计分布式锁时,需要考虑业务场景和业务需求,以保证锁的正确性和可用性。 例如,在一个电商系统中,每个商品都有一个库存量。为了避免多个用户同时购买同一件商品导致库存出现不一致的情况,可以为每个商品设置一个分布式锁,确保同一时间只能有一个用户购买

    2024年02月08日
    浏览(49)
  • redis — redis cluster集群模式下如何实现批量可重入锁?

    一、redis cluster 集群版 在Redis 3.0版本以后,Redis发布了Redis Cluster。该集群主要支持搞并发和海量数据处理等优势,当 Redis 在集群模式下运行时,它处理数据存储的方式与作为单个实例运行时不同。这是因为它应该准备好跨多个节点分发数据,从而实现水平可扩展性。具体能力表

    2024年01月21日
    浏览(46)
  • 【分布式锁】06-Zookeeper实现分布式锁:可重入锁源码分析

    前言 前面已经讲解了Redis的客户端Redission是怎么实现分布式锁的,大多都深入到源码级别。 在分布式系统中,常见的分布式锁实现方案还有Zookeeper,接下来会深入研究Zookeeper是如何来实现分布式锁的。 Zookeeper初识 文件系统 Zookeeper维护一个类似文件系统的数据结构 image.png 每

    2024年02月22日
    浏览(47)
  • JavaEE 初阶篇-深入了解 CAS 机制与12种锁的特征(如乐观锁和悲观锁、轻量级锁与重量级锁、自旋锁与挂起等待锁、可重入锁与不可重入锁等等)

    🔥博客主页: 【 小扳_-CSDN博客】 ❤感谢大家点赞👍收藏⭐评论✍ 文章目录         1.0 乐观锁与悲观锁概述         1.1 悲观锁(Pessimistic Locking)         1.2 乐观锁(Optimistic Locking)         1.3 区别与适用场景         2.0 轻量级锁与重量级锁概述         2.1 真正加

    2024年04月16日
    浏览(35)
  • 互斥场景重入锁处理方案

    处理方案一:map+超时重入锁数据结构( 注:该方案并发时可以获取到锁进行操作 ) 核心逻辑模拟--加超时时间 并发问题结果数据 问题 问题:锁对象创建并发问题 处理方案二:map+不超时重入锁数据结构( 注:该方案并发时可以获取到锁进行操作 ) 核心逻辑模拟--不加超时时间 并

    2024年02月10日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包