结合ReentrantLock来看AQS的原理

这篇具有很好参考价值的文章主要介绍了结合ReentrantLock来看AQS的原理。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

AQS的定义

​ 队列同步器 AbstractQueuedSynchronizer(以下简称同步器),是用来构建锁或者其他同步组件的基础框架,它使用了一个 int 成员变量表示同步状态,通过内置的 FIFO 队列来完成资源获取线程的排队工作,并发包的作者(Doug Lea)期望它能够成为实现大部分同步需求的基础。

队列同步器的接口与示例

同步器的设计是基于模板方法模式的,也就是说,使用者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。重写同步器指定的方法时,需要使用同步器提供的如下 3 个方法来访问或修改同步状态。

  • getState():获取当前同步状态。

  • setState(int newState):设置当前同步状态。

  • compareAndSetState(int expect,int update):使用 CAS 设置当前状态,该方法能够保证状态设置的原子性。

同步器可重写的方法与描述如下表所示。

方法名称 描述信息
protected boolean tryAcquire(int arg) 独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态
protected boolean tryAcquire(int arg) 独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态。
protected int tryAcquireShared(int arg) 共享式获取同步状态,返回大于等于0的值,表示获取成功,反之,失败。
protected boolean tryReleaseShared(int arg) 共享式释放锁。
protected boolean isHeldExclusively() 判断当前线程是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占。

实现自定义同步组件的同时,将会调用同步器提供的模板方法,这些(部分)模板方法与描述下表所示。

方法名称 描述
public final void acquire(int arg) 独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用重写的tryAcquire(int arg)方法。
public final void acquireInterruptibly(int arg) 与acquire(int arg) 相同,但是该方法响应中断,当前线程未获取到同步状态而进入同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException 异常。
public final boolean tryAcquireNanos(int arg, long nanosTimeout) 在acquireInterruptibly(int arg) 的基础上增加了超时限制,如果当前线程在超时时间内没有获取同步状态,那么会返回false,如果获取到了返回true。
public final void acquireShared(int arg) 共享式的获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式获取锁的主要区别是在同一时刻可以有多个线程获取到同步状态。
public final void acquireSharedInterruptibly(int arg) 与acquireInterruptibly(int arg) 方法相同,该方法响应中断。
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) 与acquireShared(int arg) 相同,增加了超时限制。
public final boolean release(int arg) 独占式的释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒。
public final boolean releaseShared(int arg) 共享式释放同步状态。
public final Collection getQueuedThreads() 获得在同步队列上的线程集合。

​ 同步器提供的模板方法基本上分为 3 类:独占式获取与释放同步状态、共享式获取与释放同步状态和查询同步队列中的等待线程情况。自定义同步组件将使用同步器提供的模板方法来实现自己的同步语义。

自定义独占锁代码示例

通过重写模板模式的钩子方法实现自定义独占锁。

class Mutex implements Lock {
    // 内部静态类自定义同步器
    private static class Sync extends AbstractQueuedSynchronizer {
        // 是否处于占用状态
        @Override
        protected boolean isHeldExclusively() {
            return this.getState() == 1;
        }
        // 当状态为0时获取到锁
        @Override
        protected boolean tryAcquire(int arg) {
            if (this.compareAndSetState(0,1)){
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        //释放锁,将锁状态设置为0
        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0){
                throw new IllegalMonitorStateException();
            }
            this.setExclusiveOwnerThread(null);
            this.setState(0);
            return true;
        }
        //返回一个Condition,每个condition都包含了一个condition队列
        Condition newCondition(){
            return new ConditionObject();
        }
    }
    // 仅需将操作代理到Sync上面。
    private Sync sync = new Sync();
    
    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

​ 上述示例中,独占锁 Mutex 是一个自定义同步组件,它在同一时刻只允许一个线程占有锁。Mutex 中定义了一个静态内部类,该内部类继承了同步器并实现了独占式获取和释放同步状态。在 tryAcquire(int acquires)方法中,如果经过 CAS 设置成功(同步状态设置为 1),则代表获取了同步状态,而在 tryRelease(int releases)方法中只是将同步状态重置为 0。用户使用 Mutex 时并不会直接和内部同步器的实现打交道,而是调用 Mutex提供的方法,在 Mutex 的实现中,以获取锁的 lock()方法为例,只需要在方法实现中调用同步器的模板方法 acquire(int args)即可,当前线程调用该方法获取同步状态失败后会被加入到同步队列中等待,这样就大大降低了实现一个可靠自定义同步组件的门槛。

队列同步器的实现分析

接下来将从实现角度分析同步器是如何完成线程同步的,主要包括:

  • 同步队列

  • 独占式同步状态获取与释放、

  • 共享式同步状态获取与释放

  • 超时获取同步状态等同步器的核心数据结构与模板方法。

1 同步队列

​ 同步器依赖内部的同步队列(一个 FIFO 双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。同步队列中的节点(Node)用来保存获取同步状态失败的线程引用、等待状态以及前驱和后继节点,节点的属性类型与名称以及描述如下表所示。

属性类型和名称 描述
int waitStatus 用来表示当前节点在队列中的状态,包含以下状态:
0 当一个Node被初始化的时候的默认值
CANCELLED 为1,表示线程获取锁的请求已经取消了
CONDITION 为-2,表示节点在等待队列中,节点线程等待唤醒
PROPAGATE 为-3,当前线程处在SHARED情况下,该字段才会使用
SIGNAL 为-1,表示线程已经准备好了,就等资源释放了
Node prev 前驱节点,当节点加入同步队列时设置
Node next 后续节点
Thread thread 获取同步状态的线程
Node nextWaiter 等待队列中的后继节点。如果当前节点是共享的,那么这个字段将是一个 SHARED常量,也就是说节点类型(独占和共享)和等待队列中的后继节点共用同一个字段

​ 节点是构成同步队列的基础,同步器拥有首节点(head)和尾节点(tail),没有成功获取同步状态的线程将会成为节点加入该队列的尾部,同步队列的基本结构如下图所示。

结合ReentrantLock来看AQS的原理

​ 在上图中,同步器包含了两个节点类型的引用,一个指向头节点,而另一个指向尾节点。试想一下,当一个线程成功地获取了同步状态(或者锁),其他线程将无法获取到同步状态,转而被构造成为节点并加入到同步队列中,而这个加入队列的过程必须要保证线程安全,因此同步器提供了一个基于 CAS 的设置尾节点的方法:compareAndSetTail(Node expect,Node update),它需要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。

​ 同步器将节点加入到同步队列的过程如下图所示。

结合ReentrantLock来看AQS的原理

​ 同步队列遵循 FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点,该过程如下图所示。

结合ReentrantLock来看AQS的原理

​ 在上图 中,设置首节点是通过获取同步状态成功的线程来完成的,由于只有一个线程能够成功获取到同步状态,因此设置头节点的方法并不需要使用 CAS 来保证,它只需要将首节点设置成为原首节点的后继节点并断开原首节点的 next 引用即可。

2.通过ReentrantLock理解AQS

ReentrantLock中公平锁和非公平锁在底层是相同的,这里以非公平锁为例进行分析。

在非公平锁中,有一段这样的代码:

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

         /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
    final void lock() {
        // 首先尝试获取同步状态
        if (compareAndSetState(0, 1))
            // 获取成功,将当前线程设置为锁独占线程
            setExclusiveOwnerThread(Thread.currentThread());
        else
            // 获取失败,调用AQS模板方法acquire(int arg)
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
2.1 acquire(int arg)方法

​ 接下来看看acquire的源码,acquire方法在上面介绍了,他的功能是独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用重写的tryAcquire(int arg)方法。这段代码主要完成了同步状态获取、节点构造、加入同步队列以及在同步队列中自旋等待的相关工作,其主要逻辑是:首先调用自定义同步器实现的 tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态,如果同步状态获取失败,则构造同步节点(独占式 Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态)并通过addWaiter(Node node) 方法将该节点加入到同步队列的尾部,最后调用acquireQueued(Node node,int arg)方法,使得该节点以“死循环”的方式获取同步状态。如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
2.1.1 tryAcquire(arg)方法

先来看看ReentrantLock在非公平锁中重写的 tryAcquire(arg)方法

protected final boolean tryAcquire(int acquires) {
	return nonfairTryAcquire(acquires);
}


final boolean nonfairTryAcquire(int acquires) {
	// 获取当前线程
	final Thread current = Thread.currentThread();
	// 获取当前锁的同步状态
	int c = getState();
	if (c == 0) {
		// 如果同步状态为0,尝试获取锁同步状态
		if (compareAndSetState(0, acquires)) {
			// 获取锁同步状态成功,设置当前锁占用线程为当前线程
			setExclusiveOwnerThread(current);
			// 返回获取锁成功
			return true;
		}
	}
	// 判断当前锁占用线程是否是当前线程,用来实现可重入锁逻辑
	else if (current == getExclusiveOwnerThread()) {
		int nextc = c + acquires;
		if (nextc < 0) // overflow
			throw new Error("Maximum lock count exceeded");
		setState(nextc);
		return true;
	}
	// 获取锁失败,调用 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法。
	return false;
}

上面这段代码中可以得知如果当前线程调用tryAcquire(arg) 方法失败后会继续调用acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法。

2.1.2acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

下面我们先看addWaiter(Node.EXCLUSIVE) 源码。

private Node addWaiter(Node mode) {
    // 通过当前线程和锁模式(这里是Node.EXCLUSIVE 独占模式)封装Node节点
	Node node = new Node(Thread.currentThread(), mode);
	// Try the fast path of enq; backup to full enq on failure
	// pred 指针指向尾节点
	Node pred = tail;
	// 判断指针指向的tail节点是否为null
	if (pred != null) {
	    // 如果tail节点不为null,尝试将node放入到同步队列尾部。
		node.prev = pred;
		if (compareAndSetTail(pred, node)) {
			pred.next = node;
			// 放入成功,放回node节点
			return node;
		}
	}
	// 尾节点设置失败或者说tail为null,调用enq方法
	enq(node);
	return node;
}

/**
 * 同步器通过“死循环”来保证节点的正确添加,在“死循环”中只有通过 CAS 将节点设置成为尾节点之后,当前线程才能从该方法返回
 * 否则,当前线程不断地尝试设置。
 */
private Node enq(final Node node) {
	for (;;) {
	    // t 指向尾节点tail
		Node t = tail;
		// 判断 tail是否为null
		if (t == null) { // Must initialize
		    // 如果为null尝试创建同步队列第一个节点(虚拟节点)
			if (compareAndSetHead(new Node()))
			    // 创建虚拟节点成功,将AQS的尾节点指针也指向这个虚拟节点
				tail = head;
		} else {
            // 将node节点加入到队列尾部
			node.prev = t;
			if (compareAndSetTail(t, node)) {
				t.next = node;
				return t;
			}
		}
	}
}

​ 上述代码通过使用 compareAndSetTail(Node expect,Node update)方法来确保节点能够被线程安全添加。试想一下:如果使用一个普通的 LinkedList 来维护节点之间的关系,那么当一个线程获取了同步状态,而其他多个线程由于调用 tryAcquire(int arg)方法获取同步状态失败而并发地被添加到 LinkedList 时,LinkedList 将难以保证 Node 的正确添加,最终的结果可能是节点的数量有偏差,而且顺序也是混乱的。

​ 在 enq(final Node node)方法中,同步器通过“死循环”来保证节点的正确添加,在“死循环”中只有通过 CAS 将节点设置成为尾节点之后,当前线程才能从该方法返回,否则,当前线程不断地尝试设置。可以看出,enq(final Node node)方法将并发添加节点的请求通过 CAS 变得“串行化”了。

双向链表中,第一个节点为虚节点,其实并不存储任何信息,只是占位。真正的第一个有数据的节点,是在第二个节点开始的。


下面继续查看方法acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

​ 节点进入同步队列之后,就进入了一个自旋的过程,每个节点(或者说每个线程)都在自省地观察,当条件满足,获取到了同步状态,就可以从这个自旋过程中退出,否则依旧留在这个自旋过程中(并会阻塞节点的线程),代码如下

final boolean acquireQueued(final Node node, int arg) {
    // 标记是否成功拿到资源
	boolean failed = true;
	try {
	    // 标记等待过程中是否中断过
		boolean interrupted = false;
		// 开始自旋,要么获取锁,要么中断
		for (;;) {
		    // 获取当前节点的前驱节点
			final Node p = node.predecessor();
			// 如果p是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(别忘了头结点是虚节点)
			if (p == head && tryAcquire(arg)) {
			    // 获取锁成功,头指针移动到当前node
				setHead(node);
				//断开了p节点与后继节点之间的引用关系以便在适当的时候回收内存。
				p.next = null; // help GC
				failed = false;
				return interrupted;
			}
			// 说明p为头节点且当前没有获取到锁(可能是非公平锁被抢占了)或者是p不为头结点,这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。具体两个方法下面细细分析
			if (shouldParkAfterFailedAcquire(p, node) &&
				parkAndCheckInterrupt())
				interrupted = true;
		}
	} finally {
		if (failed)
			cancelAcquire(node);
	}
}

注:setHead方法是把当前节点置为虚节点,但并没有修改waitStatus,因为它是一直需要用的数据。

// java.util.concurrent.locks.AbstractQueuedSynchronizer
// 将当前节点设置为虚节点
private void setHead(Node node) {
	head = node;
	node.thread = null;
	node.prev = null;
}

// java.util.concurrent.locks.AbstractQueuedSynchronizer

// 靠前驱节点判断当前线程是否应该被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
	// 获取头结点的节点状态
	int ws = pred.waitStatus;
	// 说明头结点处于唤醒状态
	if (ws == Node.SIGNAL)
		return true; 
	// 通过枚举值我们知道waitStatus>0是取消状态
	if (ws > 0) {
		do {
			// 循环向前查找取消节点,把取消节点从队列中剔除
			node.prev = pred = pred.prev;
		} while (pred.waitStatus > 0);
		pred.next = node;
	} else {
		// 设置前任节点等待状态为SIGNAL
		compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
	}
	return false;
}

// parkAndCheckInterrupt主要用于挂起当前线程,阻塞调用栈,返回当前线程的中断状态。
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
2.2.3 总结:ReentrantLock非公平锁的lock()的流程如下:

结合ReentrantLock来看AQS的原理

3.如何解锁

public void unlock() {
	sync.release(1);
}

调用AQS的release(int arg)方法

public final boolean release(int arg) {
    // 调用ReentrantLock 重写的tryRelease(arg)方法
	if (tryRelease(arg)) {
	    // 释放锁成功,获取头结点
		Node h = head;
		if (h != null && h.waitStatus != 0)
		    //头结点不为空并且头结点的waitStatus不是初始化节点情况,解除线程挂起状态
			unparkSuccessor(h);
		return true;
	}
	return false;
}

// java.util.concurrent.locks.ReentrantLock.Sync#tryRelease
protected final boolean tryRelease(int releases) {
    // 减去可重入次数
	int c = getState() - releases;
	// 调用该方法线程不去当前获取锁线程抛异常
	if (Thread.currentThread() != getExclusiveOwnerThread())
		throw new IllegalMonitorStateException();
	boolean free = false;
	// 判断c是否为0
	if (c == 0) {
		free = true;
		//将独占线程设置为null
		setExclusiveOwnerThread(null);
	}
	setState(c);
	return free;
}

参考:
书籍 《Java并发编程的艺术》
https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html文章来源地址https://www.toymoban.com/news/detail-568453.html

到了这里,关于结合ReentrantLock来看AQS的原理的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 万字长文解析AQS抽象同步器核心原理(深入阅读AQS源码)

    在争用激烈的场景下使用基于CAS自旋实现的轻量级锁有两个大的问题: CAS恶性空自旋会浪费大量的CPU资源。 在SMP架构的CPU上会导致“总线风暴”。 解决CAS恶性空自旋的有效方式之一是以空间换时间,较为常见的方案有两种:分散操作热点、使用队列削峰。 JUC并发包使用的是

    2024年02月11日
    浏览(40)
  • 从ReentrantLock角度解析AQS

    是它,是它,就是它,并发包的基石; 闲来不卷,随便聊一点。 一般情况下,大家系统中至少也是JDK8了,那想必对于JDK5加入的一系列功能并不陌生吧。那时候重点加入了 java.util.concurrent 并发包,我们简称为JUC。JUC下提供了很多并发编程实用的工具类,比如并发锁lock、原子

    2023年04月14日
    浏览(35)
  • AQS源码分析——以ReentrantLock为例

    AQS自身属性: Node属性: 1. 以ReentrantLock为例,其他类举一反三,方法lock() 2. Lock接口实现类,基本都是通过【聚合】了一个 【队列同步器】的子类完成线程访问控制的Sync实现lock;Sync继承AQS;Sync又有两个实现类。  公平锁与非公平锁的实现: 接下来以非公平锁为突破口:

    2024年02月10日
    浏览(38)
  • 深入源码解析 ReentrantLock、AQS:掌握 Java 并发编程关键技术

    🔭 嗨,您好 👋 我是 vnjohn,在互联网企业担任 Java 开发,CSDN 优质创作者 📖 推荐专栏:Spring、MySQL、Nacos、Java,后续其他专栏会持续优化更新迭代 🌲文章所在专栏:JUC 🤔 我当前正在学习微服务领域、云原生领域、消息中间件等架构、原理知识 💬 向我询问任何您想要的

    2024年02月11日
    浏览(49)
  • AQS构建锁和同步器的框架

    AQS全称AbstractQueuedSynchronizer,此类在 java.util.concurrent.locks包下面 ,是一个 构建锁和同步器的框架 ,比如 ReentrantLock就是基于AQS来实现的 。 AQS内部有一个由 volatile修饰(保证其可见性) 的变量 state ,用此来表示锁是否被使用,初始化为0,当线程获取到锁时,state加1,此时当其

    2024年02月14日
    浏览(33)
  • Java中的ReentrantLock实现原理

    在并发编程中,线程安全问题一直是非常重要的问题。Java中提供了多种解决线程安全问题的机制,其中一个比较常用的就是ReentrantLock。本文将介绍ReentrantLock的实现原理,从原子性、可见性等方面解释并结合源码分析,以便更好地理解在多线程环境下实现线程安全的过程。

    2024年02月01日
    浏览(41)
  • 队列-来看Java骚操作

    元素只能从队尾插入,从队头删除 队列中的元素按照插入的顺序依次排列 只能访问队头和队尾元素,无法访问中间的元素 入队(enqueue):将元素插入到队尾。 出队(dequeue):删除队头元素,并返回被删除的元素。 获取队头元素(front):返回队头元素,但不删除。 获取队

    2024年02月16日
    浏览(30)
  • 学Java线程,你不知道什么是AQS?一文带你了解Java多线程同步的灵魂

    关于作者:CSDN内容合伙人、技术专家, 从零开始做日活千万级APP。 专注于分享各领域原创系列文章 ,擅长java后端、移动开发、人工智能等,希望大家多多支持。 我们继续总结学习 Java基础知识 ,温故知新。 CLH(Craig, Landin, and Hagersten locks)是一种自旋锁,能确保无饥饿性,提

    2024年02月16日
    浏览(43)
  • JUC并发编程之AQS原理

    全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架 特点: 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个生态,控制如何获取锁和释放锁 getState - 获取 state 状态 setState - 设置 state 状态 compareAndSetState - cas 机制设置 s

    2023年04月18日
    浏览(84)
  • 多线程系列(十八) -AQS原理浅析

    在之前的文章中,我们介绍了 ReentrantLock、ReadWriteLock、CountDownLatch、CyclicBarrier、Semaphore、ThreadPoolExecutor 等并发工具类的使用方式,它们在请求共享资源的时候,都能实现线程同步的效果。 在使用方式上稍有不同,有的是独占式,多个线程竞争时只有一个线程能执行方法,比

    2024年03月13日
    浏览(50)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包