ReentrantLock原理--非公平锁、可重入、可打断性

这篇具有很好参考价值的文章主要介绍了ReentrantLock原理--非公平锁、可重入、可打断性。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

ReentrantLock原理--非公平锁、可重入、可打断性

1.非公平锁实现原理

加锁解锁流程

先从构造器开始看,默认为非公平锁实现

public ReentrantLock() {
 	sync = new NonfairSync();
}

NonfairSync 继承自 AQS

没有竞争时

ReentrantLock原理--非公平锁、可重入、可打断性
第一个竞争出现时
ReentrantLock原理--非公平锁、可重入、可打断性

Thread-1 执行了

  1. CAS 尝试将 state 由 0 改为 1,结果失败

  2. 进入 tryAcquire 逻辑,这时 state 已经是1,结果仍然失败

  3. 接下来进入 addWaiter 逻辑,构造 Node 队列

    • 图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态

    • Node 的创建是懒惰的

    • 其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程

ReentrantLock原理--非公平锁、可重入、可打断性

当前线程进入 acquireQueued 逻辑

  1. acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞

  2. 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败

  3. 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false

    ReentrantLock原理--非公平锁、可重入、可打断性

  4. shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时state 仍为 1,失败

  5. 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回true

  6. 进入 parkAndCheckInterrupt, Thread-1 park(灰色表示)

ReentrantLock原理--非公平锁、可重入、可打断性

再次有多个线程经历上述过程竞争失败,变成这个样子

ReentrantLock原理--非公平锁、可重入、可打断性

Thread-0 释放锁,进入 tryRelease 流程,如果成功

  • 设置 exclusiveOwnerThread 为 null
  • state = 0

ReentrantLock原理--非公平锁、可重入、可打断性

当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程

找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1

回到 Thread-1 的 acquireQueued 流程

ReentrantLock原理--非公平锁、可重入、可打断性

如果加锁成功(没有竞争),会设置

  • exclusiveOwnerThread 为 Thread-1,state = 1
  • head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread
  • 原本的 head 因为从链表断开,而可被垃圾回收

如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了

ReentrantLock原理--非公平锁、可重入、可打断性

如果不巧又被 Thread-4 占了先

  • Thread-4 被设置为 exclusiveOwnerThread,state = 1
  • Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞

加锁源码

// Sync 继承自 AQS
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    // 加锁实现
    final void lock() {
        // 首先用 cas 尝试(仅尝试一次)将 state 从 0 改为 1, 如果成功表示获得了独占锁
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            // 如果尝试失败,进入 ㈠
            acquire(1);
    }

    // ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
    public final void acquire(int arg) {
        // ㈡ tryAcquire 
        if (
                !tryAcquire(arg) &&
                        // 当 tryAcquire 返回为 false 时, 先调用 addWaiter ㈣, 接着 acquireQueued ㈤
                        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
        ) {
            selfInterrupt();
        }
    }

    // ㈡ 进入 ㈢
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }

    // ㈢ Sync 继承过来的方法, 方便阅读, 放在此处
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // 如果还没有获得锁
        if (c == 0) {
            // 尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
        else if (current == getExclusiveOwnerThread()) {
            // state++
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        // 获取失败, 回到调用处
        return false;
    }

    // ㈣ AQS 继承过来的方法, 方便阅读, 放在此处
    private Node addWaiter(Node mode) {
        // 将当前线程关联到一个 Node 对象上, 模式为独占模式
        Node node = new Node(Thread.currentThread(), mode);
        // 如果 tail 不为 null, cas 尝试将 Node 对象加入 AQS 队列尾部
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                // 双向链表
                pred.next = node;
                return node;
            }
        }
        // 尝试将 Node 加入 AQS, 进入 ㈥
        enq(node);
        return node;
    }

    // ㈥ AQS 继承过来的方法, 方便阅读, 放在此处
    private Node enq(final Node node) {
        for (; ; ) {
            Node t = tail;
            if (t == null) {
                // 还没有, 设置 head 为哨兵节点(不对应线程,状态为 0)
                if (compareAndSetHead(new Node())) {
                    tail = head;
                }
            } else {
                // cas 尝试将 Node 对象加入 AQS 队列尾部
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    // ㈤ AQS 继承过来的方法, 方便阅读, 放在此处
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (; ; ) {
                final Node p = node.predecessor();
                // 上一个节点是 head, 表示轮到自己(当前线程对应的 node)了, 尝试获取
                if (p == head && tryAcquire(arg)) {
                    // 获取成功, 设置自己(当前线程对应的 node)为 head
                    setHead(node);
                    // 上一个节点 help GC
                    p.next = null;
                    failed = false;
                    // 返回中断标记 false
                    return interrupted;
                }
                if (
                    // 判断是否应当 park, 进入 ㈦
                        shouldParkAfterFailedAcquire(p, node) &&
                                // park 等待, 此时 Node 的状态被置为 Node.SIGNAL ㈧
                                parkAndCheckInterrupt()
                ) {
                    interrupted = true;
                }
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    // ㈦ AQS 继承过来的方法, 方便阅读, 放在此处
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 获取上一个节点的状态
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL) {
            // 上一个节点都在阻塞, 那么自己也阻塞好了
            return true;
        }
        // > 0 表示取消状态
        if (ws > 0) {
            // 上一个节点取消, 那么重构删除前面所有取消的节点, 返回到外层循环重试
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 这次还没有阻塞
            // 但下次如果重试不成功, 则需要阻塞,这时需要设置上一个节点状态为 Node.SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    // ㈧ 阻塞当前线程
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
}
/*
注意:
	是否需要 unpark 是由当前节点的前驱节点的 waitStatus == Node.SIGNAL 来决定,而不是本节点的
waitStatus 决定
*/

解锁源码

// Sync 继承自 AQS
static final class NonfairSync extends Sync {
    // 解锁实现
    public void unlock() {
        sync.release(1);
    }

    // AQS 继承过来的方法, 方便阅读, 放在此处
    public final boolean release(int arg) {
        // 尝试释放锁, 进入 ㈠
        if (tryRelease(arg)) {
            // 队列头节点 unpark
            Node h = head;
            if (
                // 队列不为 null
                    h != null &&
                            // waitStatus == Node.SIGNAL 才需要 unpark
                            h.waitStatus != 0
            ) {
                // unpark AQS 中等待的线程, 进入 ㈡
                unparkSuccessor(h);
            }
            return true;
        }
        return false;
    }

    // ㈠ Sync 继承过来的方法, 方便阅读, 放在此处
    protected final boolean tryRelease(int releases) {
        // state--
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        // 支持锁重入, 只有 state 减为 0, 才释放成功
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

    // ㈡ AQS 继承过来的方法, 方便阅读, 放在此处
    private void unparkSuccessor(Node node) {
        // 如果状态为 Node.SIGNAL 尝试重置状态为 0
        // 不成功也可以
        int ws = node.waitStatus;
        if (ws < 0) {
            compareAndSetWaitStatus(node, ws, 0);
        }
        // 找到需要 unpark 的节点, 但本节点从 AQS 队列中脱离, 是由唤醒节点完成的
        Node s = node.next;
        // 不考虑已取消的节点, 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
}

2.可重入原理

static final class NonfairSync extends Sync {
    // ...

    // Sync 继承过来的方法, 方便阅读, 放在此处
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
        else if (current == getExclusiveOwnerThread()) {
            // state++
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    // Sync 继承过来的方法, 方便阅读, 放在此处
    protected final boolean tryRelease(int releases) {
        // state--
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        // 支持锁重入, 只有 state 减为 0, 才释放成功
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
}

3.可打断原理

不可打断模式

在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了文章来源地址https://www.toymoban.com/news/detail-402371.html

// Sync 继承自 AQS
static final class NonfairSync extends Sync {
    // ...

    private final boolean parkAndCheckInterrupt() {
        // 如果打断标记已经是 true, 则 park 会失效
        LockSupport.park(this);
        // interrupted 会清除打断标记
        return Thread.interrupted();
    }

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (; ; ) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null;
                    failed = false;
                    // 还是需要获得锁后, 才能返回打断状态
                    return interrupted;
                }
                if (
                        shouldParkAfterFailedAcquire(p, node) &&
                                parkAndCheckInterrupt()
                ) {
                    // 如果是因为 interrupt 被唤醒, 返回打断状态为 true
                    interrupted = true;
                }
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    public final void acquire(int arg) {
        if (
                !tryAcquire(arg) &&
                        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
        ) {
            // 如果打断状态为 true
            selfInterrupt();
        }
    }

    static void selfInterrupt() {
        // 重新产生一次中断
        Thread.currentThread().interrupt();
    }
}

可打断模式

static final class NonfairSync extends Sync {
    public final void acquireInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 如果没有获得到锁, 进入 ㈠
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

    // ㈠ 可打断的获取锁流程
    private void doAcquireInterruptibly(int arg) throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt()) {
                    // 在 park 过程中如果被 interrupt 会进入此
                    // 这时候抛出异常, 而不会再次进入 for (;;)
                    throw new InterruptedException();
                }
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
}

到了这里,关于ReentrantLock原理--非公平锁、可重入、可打断性的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Redis分布式可重入锁实现方案

    在单进程环境下,要保证一个代码块的同步执行,直接用 synchronized 或 ReetrantLock 即可。在分布式环境下,要保证多个节点的线程对代码块的同步访问,就必须要用到分布式锁方案。 分布式锁实现方案有很多,有基于关系型数据库行锁实现的;有基于ZooKeeper临时顺序节

    2024年02月19日
    浏览(43)
  • redis — redis cluster集群模式下如何实现批量可重入锁?

    一、redis cluster 集群版 在Redis 3.0版本以后,Redis发布了Redis Cluster。该集群主要支持搞并发和海量数据处理等优势,当 Redis 在集群模式下运行时,它处理数据存储的方式与作为单个实例运行时不同。这是因为它应该准备好跨多个节点分发数据,从而实现水平可扩展性。具体能力表

    2024年01月21日
    浏览(46)
  • 【分布式锁】06-Zookeeper实现分布式锁:可重入锁源码分析

    前言 前面已经讲解了Redis的客户端Redission是怎么实现分布式锁的,大多都深入到源码级别。 在分布式系统中,常见的分布式锁实现方案还有Zookeeper,接下来会深入研究Zookeeper是如何来实现分布式锁的。 Zookeeper初识 文件系统 Zookeeper维护一个类似文件系统的数据结构 image.png 每

    2024年02月22日
    浏览(47)
  • 分布式锁设计选型 不可重入锁建议使用ZooKeeper来实现 可重入锁建议使用Redis来实现 分布式锁:ZooKeeper不可重入锁 Java优化建议

    在设计分布式锁时,需要考虑业务场景和业务需求,以保证锁的正确性和可用性。 例如,在一个电商系统中,每个商品都有一个库存量。为了避免多个用户同时购买同一件商品导致库存出现不一致的情况,可以为每个商品设置一个分布式锁,确保同一时间只能有一个用户购买

    2024年02月08日
    浏览(49)
  • Java入门-可重入锁

    什么是可重入锁? 当线程获取某个锁后,还可以继续获取它,可以递归调用,而不会发生死锁; 可重入锁案例 程序可重入加锁 A.class,没有发生死锁。 sychronized锁 运行结果 ReentrantLock 运行结果 如何保证可重入 当一个线程访问同步块并获取锁时,会在对象头和栈帧中的锁记录里

    2024年02月22日
    浏览(41)
  • java~理解可重入锁

    在Java中,可重入锁(Reentrant Lock)是一种同步机制,允许线程在持有锁的情况下再次获取该锁,而不会被自己所持有的锁所阻塞。也就是说,一个线程可以多次获得同一个锁,而不会出现死锁的情况。 可重入锁在多线程编程中非常有用,它允许线程在访问共享资源时多次获取

    2024年02月09日
    浏览(49)
  • Linux 可重入、异步信号安全和线程安全

    当一个被捕获的信号被一个进程处理时,进程执行的普通的指令序列会被一个信号处理器暂时地中断。它首先执行该信号处理程序中的指令。如果从信号处理程序返回(例如没有调用exit或longjmp),则继续执行在捕获到信号时进程正在执行的正常指令序列(这和当一个硬件中

    2024年02月11日
    浏览(35)
  • 【Java | 多线程】可重入锁的概念以及示例

    可重入锁(又名递归锁)是一种特殊类型的锁,它允许 同一个线程在获取锁后再次进入该锁保护的代码块或方法,而不需要重新获取锁 。 说白了,可重入锁的特点就是同一个线程可以多次获取同一个锁,而不会因为之前已经获取过锁而阻塞。 可重入锁的一个优点是可以一定

    2024年04月24日
    浏览(39)
  • 以太坊硬分叉后的可重入漏洞攻击

    以太坊君士坦丁堡升级将降低部分 SSTORE 指令的 gas 费用。然而,这次升级也有一个副作用,在 Solidity 语言编写的智能合约中调用 address.transfer()函数或 address.send()函数时存在可重入漏洞。在目前版本的以太坊网络中,这些函数被认为是可重入安全的,但分叉后它们不再是了。

    2024年02月11日
    浏览(36)
  • 【redis】redis分布式锁(二)可重入锁+设计模式

    上一篇链接: 【redis】redis分布式锁(一)手写分布式锁1.0~6.0 隐式锁(即synchronized使用的锁)默认是可重入锁 synchronized的重入实现机理 显式锁也有ReentrantLock这样的可重入锁 结论: 下边将使用lua脚本的方式,把可重入锁的案例语句原子化 V1.0版本 蓝色部分是重复的,可

    2024年02月03日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包