10分钟从源码级别搞懂AQS(AbstractQueuedSynchronizer)

这篇具有很好参考价值的文章主要介绍了10分钟从源码级别搞懂AQS(AbstractQueuedSynchronizer)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

10分钟从源码级别搞懂AQS(AbstractQueuedSynchronizer)

前言

上篇文章15000字、6个代码案例、5个原理图让你彻底搞懂Synchronized有说到synchronized由object monitor实现的

object monitor中由cxq栈和entry list来实现阻塞队列,wait set实现等待队列,从而实现synchronized的等待/通知模式

而JDK中的JUC并发包也通过类似的阻塞队列和等待队列实现等待/通知模式

这篇文章就来讲讲JUC的基石AQS(AbstractQueuedSynchronizer)

需要了解的前置知识:CAS、volatile

如果不了解CAS可以看上篇讲述synchronized的文章(链接在上面)

如果不了解volatile可以看这篇文章 5个案例和流程图让你从0到1搞懂volatile关键字

本篇文章以AQS为中心,深入浅出描述AQS中的数据结构、设计以及获取、释放同步状态的源码流程、Condition等

观看本文大约需要10分钟,可以带着几个问题去观看

  1. 什么是AQS,它是干啥用的?
  2. AQS是使用什么数据结构实现的?
  3. AQS获取/释放同步状态是如何实现的?
  4. AQS除了具有synchronized的功能还拥有什么其他特性?
  5. AQS如何去实现非公平锁、公平锁?
  6. 什么是Condition?它跟AQS是什么关系?

AQS数据结构

什么是AQS呢?

AQS是一个同步队列(阻塞队列),是并发包中的基础,很多并发包中的同步组件底层都使用AQS来实现,比如:ReentrantLock、读写锁、信号量等等...

AQS有三个重要的字段,分别是: head 头节点、tail 尾节点、state 同步状态

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    //头节点
    private transient volatile Node head;
    //尾节点
    private transient volatile Node tail;
    //同步状态
    private volatile int state;   
}    

头尾节点很好理解,因为AQS本身就是个双向链表,那么state同步状态是什么?

AQS中使用同步状态表示资源,然后使用CAS来获取/释放资源,比如设置资源为1,一个线程来尝试获取资源,由于同步状态目前为1,于是该线程CAS替换同步状态为0,成功后表示获取到资源,之后其他线程再来获取资源就无法获取了(状态为0),直到获取资源的线程来释放资源

上述获取/释放资源也可以理解成获取/释放锁

同时三个字段都被volatile修饰,用volatile来保证内存可见性,防止其他线程修改这些数据时当前线程无法感知

通过上面的描述,我们可以知道AQS大概长这样

10分钟从源码级别搞懂AQS(AbstractQueuedSynchronizer)

当某个线程获取资源失败时,会被构建成节点加入AQS中

节点Node是AQS中的内部类,Node中有些重要的字段一起来看看

static final class Node {
        //节点状态
        volatile int waitStatus;
    
        //前驱节点
        volatile Node prev;
​
        //后继节点
        volatile Node next;
        
        //当前节点所代表的线程
        volatile Thread thread;
​
        //等待队列使用时的后继节点指针
        Node nextWaiter;
}

prev、next、thread应该都好理解

AQS同步队列和等待队列都使用这种节点,当等待队列节点被唤醒出队时,方便加入同步队列

nextWaiter就是用于节点在等待队列中指向下一个节点

waitStatus表示节点的状态

状态 说明
INITIAL 0 初始状态
CANCELLED 1 该节点对应的线程取消调度
SIGNAL -1 该节点对应的线程阻塞,等待唤醒竞争资源
CONDITION -2 该节点在等待(条件)队列中,等待唤醒后从等待队列出队进入同步队列竞争
PROPAGATE -3 共享情况下,会唤醒后续所有共享节点

不太理解状态不要紧,我们后文遇到再说

经过上面的描述,节点大概是长成这样的

10分钟从源码级别搞懂AQS(AbstractQueuedSynchronizer)

AQS中还有另外一个内部类ConditionObject用于实现等待队列/条件队列,我们后文再来说说

AQS中可以分为独占、共享模式,其中这两种模式下还可以支持响应中断、纳秒级别超时

独占模式可以理解为同一时间只有一个线程能够获取同步状态

共享模式可以理解为可以有多个线程能够获取同步状态,方法中常用shared标识

方法中常用acquire标识获取同步状态,release标识释放同步状态

10分钟从源码级别搞懂AQS(AbstractQueuedSynchronizer)

这些方法都是模板方法,规定流程,将具体的实现留给实现类去做(比如获取同步状态,该如何获取交给实现类去实现)

独占式

独占式实际就是时刻上只允许一个线程独占该资源,多线程竞争情况下也只能有一个线程获取同步状态成功

获取同步状态

不响应中断的独占获取和响应中断、超时的类似,我们以acquire为例查看源码

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

tryAcquire 方法用于尝试获取同步状态,参数arg表示获取多少同步状态,获取成功返回true 则会退出方法,留给实现类去实现

addWaiter

addWaiter(Node.EXCLUSIVE) 构建独占式节点,并用CAS+失败重试的方式加入AQS的末尾

    private Node addWaiter(Node mode) {
        //构建节点
        Node node = new Node(Thread.currentThread(), mode);
        //尾节点不为空则CAS替换尾节点
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //尾节点为空或则CAS失败执行enq
        enq(node);
        return node;
    }
    private Node enq(final Node node) {
        //失败重试
        for (;;) {
            Node t = tail;
            //没有尾节点 则CAS设置头节点(头尾节点为一个节点),否则CAS设置尾节点
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

enq方法主要以自旋(中途不会进入等待模式)去CAS设置尾节点,如果AQS中没有节点则头尾节点为同一节点

由于添加到尾节点存在竞争,因此需要用CAS去替换尾节点

10分钟从源码级别搞懂AQS(AbstractQueuedSynchronizer)

acquireQueued

acquireQueued方法主要用于AQS队列中的节点来自旋获取同步状态,在这个自旋中并不是一直执行的,而是会被park进入等待

final boolean acquireQueued(final Node node, int arg) {
    //记录是否失败
    boolean failed = true;
    try {
        //记录是否中断过
        boolean interrupted = false;
        //失败重试 
        for (;;) {
            //p 前驱节点
            final Node p = node.predecessor();
            //如果前驱节点为头节点,并尝试获取同步状态成功则返回
            if (p == head && tryAcquire(arg)) {
                //设置头节点
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //失败则设置下标记然后进入等待检查中断
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        //如果失败则取消获取
        if (failed)
            cancelAcquire(node);
    }
}

在尝试获取同步状态前有个条件p == head && tryAcquire(arg):前驱节点是头节点

因此AQS中的节点获取状态是FIFO的

但即使满足前驱节点是头节点,并不一定就能获取同步状态成功,因为还未加入AQS的线程也可能尝试获取同步状态,以此来实现非公平锁

那如何实现公平锁呢?

在尝试获取同步状态前都加上这个条件就行了呗!

再来看看shouldParkAfterFailedAcquire 获取同步状态失败后应该停放

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //前驱节点状态
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        //前驱节点状态是SIGNAL 说明前驱释放同步状态回来唤醒 直接返回
        return true;
    if (ws > 0) {
        //如果前驱状态大于0 说明被取消了,就一直往前找,找到没被取消的节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        //排在没被取消的节点后面
        pred.next = node;
    } else {
        //前驱没被取消,而且状态不是SIGNAL CAS将状态更新为SIGNAL,释放同步状态要来唤醒
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

实际上是park前的一些准备

再来看看 parkAndCheckInterrupt ,用工具类进入等待状态,被唤醒后检查是否中断

private final boolean parkAndCheckInterrupt() {
        //线程进入等待状态... 
        LockSupport.park(this);
         //检查是否中断 (会清除中断标记位)
        return Thread.interrupted();
}

acquireQueued的中如果未获取同步状态并且抛出异常,最终会执行cancelAcquire取消

当感知到中断时返回true回去,来到第一层acquire方法执行selfInterrupt方法,自己中断线程

acquire流程图:

  1. 先尝试获取同步状态失败则CAS+失败重试添加到AQS末尾
  1. 前驱节点为头节点且获取同步状态成功则返回,否则进入等待状态等待唤醒,唤醒后重试
  1. 在2期间发生异常取消当前节点

10分钟从源码级别搞懂AQS(AbstractQueuedSynchronizer)

释放同步状态

先进行释放同步状态,成功后头节点状态不为0 唤醒下一个状态不是被取消的节点

public final boolean release(int arg) {
    //释放同步状态
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            //唤醒下一个状态不大于0(大于0就是取消)的节点
            unparkSuccessor(h);
        return true;
    }
    return false;
}

响应中断

acquireInterruptibly用于响应中断的获取同步状态

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    //查看是否被中断,中断抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

doAcquireInterruptibly 与原过程类似,就是在被唤醒后检查到被中断时抛出中断异常

    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //被唤醒后检查到被中断时抛出中断异常
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

响应中断的获取同步状态被中断时会直接抛出中断异常,而不响应的是自己中断

响应超时

响应超时的获取同步状态使用tryAcquireNanos 超时时间为纳秒级别

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

可以看出响应超时同时也会响应中断

doAcquireNanos也与原过程类似

    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                //还有多久超时
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    //已超时
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    //大于1ms
                    nanosTimeout > spinForTimeoutThreshold)
                    //超时等待
                    LockSupport.parkNanos(this, nanosTimeout);
                //响应中断
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

响应超时在自旋期间会计算还有多久超时,如果大于1ms就等待对应的时间,否则就继续自旋,同时响应中断

共享

共享式就是允许多个线程同时获取一定的资源,比如信号量、读锁就是用共享式实现的

其实共享式与独占式流程类似,只是尝试获取同步状态的实现不同

我们用个获取同步状态的方法来说明

共享式获取同步状态使用acquireShared

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

tryAcquireShared 尝试获取同步状态,参数arg表示获取多少同步状态,返回剩余可获取同步状态的数量

如果剩余可获取同步状态数量小于0 说明 未获取成功进入doAcquireShared

    private void doAcquireShared(int arg) {
        //添加共享式节点
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //获取前驱节点
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        //如果前驱节点为头节点 并且 获取同步状态成功 设置头节点
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                //获取失败进入会等待的自旋
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

响应中断、超时等方法也与独占式类似,只是有些设置细节不同

Condition

上文曾说过AQS充当阻塞(同步)队列,Condition来充当等待队列

AQS的内部类ConditionObject就是Condition的实现,它充当等待队列,用字段记录头尾节点

public class ConditionObject implements Condition{
        //头节点
        private transient Node firstWaiter;
        //尾节点
        private transient Node lastWaiter;  
}

节点之间使用nextWait指向下一个节点,形成单向链表

10分钟从源码级别搞懂AQS(AbstractQueuedSynchronizer)

同时提供await系列方法来让当前线程进入等待,signal系列方法来唤醒

        public final void await() throws InterruptedException {
            //响应中断
            if (Thread.interrupted())
                throw new InterruptedException();
            //添加到末尾 不需要保证原子性,因为能指向await一定是获取到同步资源的
            Node node = addConditionWaiter();
            //释放获取的同步状态
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            //不在同步队列就park进入等待
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            //被唤醒后自旋获取同步状态
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            //取消后清理
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

await主要将节点添加到condition object末尾,释放获取的同步状态,进入等待,唤醒后自旋获取同步状态

signal的主要逻辑在transferForSignal中

    final boolean transferForSignal(Node node) {
        //CAS修改节点状态 失败返回 变成取消
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        //加入AQS末尾
        Node p = enq(node);
        int ws = p.waitStatus;
        //CAS将节点状态修改为SIGNAL 成功则唤醒节点
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

signal 主要把状态从-2condition 修改为 0(失败则取消节点), 然后加入AQS的末尾,最后再将状态该为-1 signal,成功则唤醒节点

为什么加入AQS末尾还是使用enq去CAS+失败重试操作保证原子性呢?

因为ConditionObject允许有多个,也就一个AQS同步队列可能对应多个Condition等待(条件)队列

10分钟从源码级别搞懂AQS(AbstractQueuedSynchronizer)

总结

本篇文章以AQS为核心,深入浅出的描述AQS实现的数据结构、设计思想、获取/释放同步资源的源码级流程、Condition等

AQS使用头尾节点来实现双向队列,提供同步状态和获取/释放同步状态的模板方法来实现阻塞(同步)队列,并且这些字段使用volatile修饰,保证可见性与读取的场景配合,不需要保证原子性,在写的场景下常用CAS保证原子性

AQS与Condition使用相同类型的节点,在AQS中节点维护成双向链表,在Condition中节点维护成单向链表,节点除了维护指向关系,还需要记录对应线程和节点状态

AQS分为独占式和共享式,使用独占式时只允许一个线程获取同步状态,使用共享式时则允许多个线程获取同步状态;其中还提供响应中断、等待超时的类似方法

获取同步状态:先尝试获取同步状态,如果失败则CAS+失败重试的方式将节点添加到AQS末尾,等待被前驱节点唤醒;只有当前驱节点为头节点并且获取同步状态成功才返回,否则进入等待,被唤醒后继续尝试(自旋);在此期间如果发生异常,在抛出异常前会取消该节点

释放同步状态:尝试释放同步状态,成功后唤醒后继未被取消的节点

在获取同步状态时,被唤醒后会检查中断标识,如果是响应中断的则会直接抛出中断异常,不响应的则是在最外层自己中断

响应超时时,在自旋获取同步状态期间会计时,如果距离超时小于1ms就不进入等待的自旋,大于则再等待对应时间

AQS充当阻塞队列,Condition充当它的等待队列来实现等待/通知模式,AQS的内部类ConditionObject在await时会加入Condition末尾并释放同步状态进入等待队列,在被唤醒后自旋(失败会进入等待)获取同步状态;在single时会CAS的将condition头节点并加入AQS尾部再去唤醒(因为一个AQS可能对应多个Condition因此要CAS保证原子性)

最后(不要白嫖,一键三连求求拉~)

本篇文章被收入专栏 由点到线,由线到面,深入浅出构建Java并发编程知识体系,感兴趣的同学可以持续关注喔

本篇文章笔记以及案例被收入 gitee-StudyJava、 github-StudyJava 感兴趣的同学可以stat下持续关注喔~

有什么问题可以在评论区交流,如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~

关注菜菜,分享更多干货,公众号:菜菜的后端私房菜

本文由博客一文多发平台 OpenWrite 发布!文章来源地址https://www.toymoban.com/news/detail-692476.html

到了这里,关于10分钟从源码级别搞懂AQS(AbstractQueuedSynchronizer)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 10分钟的时间,带你彻底搞懂JavaScript数据类型转换

    前言  📫 大家好,我是南木元元,热衷分享有趣实用的文章,希望大家多多支持,一起进步!  🍅  个人主页: 南木元元 目录 JS数据类型 3种转换类型 ToBoolean ToString ToNumber 对象转原始类型 隐式类型转换 结语 JS数据类型 首先我们需要知道,js中数据类型分为两大类: 基本

    2024年02月05日
    浏览(50)
  • 10分钟搞懂商业模式画布:9张分析图、6张模板

    新入职、新行业,新人如何快速搞懂它的业务?新领域、新业务,投资人如何快速搞明白一个公司?新商机、新模式,创业者如何快速一个业务的商业前景? 推荐大家使用商业模式画布,它可以让你轻松看透商业模式。对新入职员工、想快速了解一个领域的投资者、了解一个

    2024年02月04日
    浏览(46)
  • 一文让你彻底搞懂AQS(通俗易懂的AQS)

    AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的ReentrantLock,Semaphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。当然,我们自己也能利用AQS非常轻松容易地构造出符合我们自己需求的

    2024年02月17日
    浏览(38)
  • 基于AbstractQueuedSynchronizer之Condition源码分析

    java.util.concurrent.locks.Condition 是一个将 Object 的监视器方法(wait、notify 和 notifyAll)分离到不同的对象中的类,以实现每个对象有多个等待集合的效果,这些对象与任意 Lock 实现结合使用。当一个 Lock 替换了synchronized方法和语句的使用时,Condition 替换了 Object 监视器方法的使用

    2024年02月09日
    浏览(33)
  • 万字长文解析AQS抽象同步器核心原理(深入阅读AQS源码)

    在争用激烈的场景下使用基于CAS自旋实现的轻量级锁有两个大的问题: CAS恶性空自旋会浪费大量的CPU资源。 在SMP架构的CPU上会导致“总线风暴”。 解决CAS恶性空自旋的有效方式之一是以空间换时间,较为常见的方案有两种:分散操作热点、使用队列削峰。 JUC并发包使用的是

    2024年02月11日
    浏览(40)
  • 并发编程 - AQS 源码

    1. AQS 源码 2. AQS 框架具体实现 - 独占锁实现 ReentrantLock 源码 实现 ReentrantLock 的三大核心原理: LocksSuport :加锁解锁 自旋 :如果没有加锁成功就一直自旋 CAS :保证只能有一个线程可以加锁成功 queue 队列:用容器保存上面未加锁成功的阻塞线程,要解锁的时候从容器中拿出

    2023年04月21日
    浏览(32)
  • 美团面试官:可重复读隔离级别实现原理是什么?(一文搞懂MVCC机制)

    本文首发于公众号【看点代码再上班】,欢迎围观,第一时间获取最新文章。 原文:美团面试官:可重复读隔离级别实现原理是什么?(一文搞懂MVCC机制) “全文共计4270字,预计阅读时间6分钟 大家好,我是 tin ,这是我的第26篇原创文章 还记得MySQL数据库事务都有哪些隔离

    2024年02月13日
    浏览(35)
  • Java多线程/AQS源码

    持锁模式-资源占用模式:  你是想要群体面试,还是单独面试啊  独占和共享 获锁方式:  是不是要把手机关机啊,赶不赶时间是不是必须要在某个时间内面试完,是否公平啊  不响应线程中断;响应线程中断;定时获取锁;公平获取;非公平获取 同步状态-锁状态;

    2024年02月15日
    浏览(30)
  • 一分钟搞懂ResNet

    ResNet的输入和输出通常都是图像或者图像特征,具体输入和输出的尺寸和通道数取决于具体的网络结构和任务。在ResNet中,输入图像首先经过一个卷积层和池化层,然后通过多个残差模块,最后通过全局平均池化和全连接层输出最终的分类结果。 ResNet在图像分类、目标检测、

    2024年02月05日
    浏览(44)
  • AQS源码分析——以ReentrantLock为例

    AQS自身属性: Node属性: 1. 以ReentrantLock为例,其他类举一反三,方法lock() 2. Lock接口实现类,基本都是通过【聚合】了一个 【队列同步器】的子类完成线程访问控制的Sync实现lock;Sync继承AQS;Sync又有两个实现类。  公平锁与非公平锁的实现: 接下来以非公平锁为突破口:

    2024年02月10日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包