前言
大家都知道在并发编程中一般会用到多线程技术,多线程技术可以大大增加系统QPS/TPS。但是在一些特殊的业务场景下我们需要限制线程的并发数目,比如秒杀系统、多种商品金额叠加运算等等都是需要限制线程数量。特别是在分布式微服务架构,多线程同步问题尤为明显。一般在实际的开发中我们会使用同步和加锁的方式对线程进行处理,比如synchronized修饰代码块和方法达到同步的目的。但是synchronized默认非公平锁且没有中断和超时功能,所以在实际开发中我们推荐使用可重入锁ReentrantLock。
ReentrantLock原理
可重入锁ReentrantLock内部继承AQS抽象同步队列,并由AQS中的CLH双向FIFO阻塞队列和STATE状态保证锁的同步。ReentrantLock在初始化可以显示指定公平与非公平锁,默认是非公平锁。是否公平锁体现在公平锁在STATE == 0时唤醒CLH阻塞队列头部线程,非公平锁则是CLH队列中的线程与其他线程进行争夺资源。另外ReentrantLock必须显示使用Lock()、unLock()获取与释放锁,并提供中断和超时获取锁等方法,功能和灵活度远远高于synchronized。
ReentrantLock VS Synchronized
1、用法不同,synchronized修饰方法、静态方法和代码块,ReentrantLock只用用在代码块中;
2、锁性质不同,Synchronized是非公平锁,ReentrantLock默认非公平锁,但可以显示指定公平与非公平锁;
3、获取锁方式不同,Synchronized是底层自动加解锁,ReentrantLock必须用代码加解锁;
4、中断方式不同,Synchronized获取锁不能中断,ReentrantLock可以中断解决死锁问题;
5、实现方式不同,Synchronized 通过JVM监视器Monitor实现,通过增加monitorenter 和 monitorexit指令对代码块同步;ReentrantLock则是通过AQS抽象同步队列的API进行同步。
源码解析
ReentrantLock同步机制
进入package java.util.concurrent.locks 下查看ReentrantLock源码:
//reentrantlock 内部类继承aqs,并显示覆写一些方法
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
/**
* 非公平尝试获取锁
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//释放锁
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
如上源码所示,ReentrantLock 内部类 Sync 继承AQS抽象同步队列,并覆写了尝试获取锁、获取锁、释放锁的方法。从这里可以看出ReentrantLock是通过调用AQS的API实现同步功能。
继续查看源码:
/**
* 非公平锁同步对象
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
/**
* 公平锁同步对象
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
如上源码所示,ReentrantLock有公平锁与非公平锁之分。非公平锁与公平锁都是覆写了tryAcquire()尝试获取锁的方法,并都是调用AQS获取锁acquire()方法。唯一不同的是公平锁在尝试获取锁的时候会调用hasQueuedPredecessors()方法判断线程是否在CLH队列中,非公平则没有这个逻辑直接是 CAS设置 STATE值、标识独占线程和重入验证。
当然ReentrantLock默认是非公平锁,也是可以通过构造方法指定锁类型:
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
ReentrantLock可重入机制
可重入机制就是同一个线程可以多次进入受限同步资源。ReentrantLock当然是可重入的,我们进入源码:
//非公平锁对象尝试获取锁
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//公平锁对象尝试获取锁
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
如上源码所示无论是用的公平锁还是非公平锁,在STETE == 0时都会用 CAS compareAndSetState(0, acquires)方法改变STATE值,然后调用setExclusiveOwnerThread(current)方法设置独占线程;如果STATE != 0说明资源已经被占用,此时会在getExclusiveOwnerThread()方法会验证是否是独占线程,如果current == getExclusiveOwnerThread() 当前线程就是独占资源的线程,那么该线程直接获取到锁进入资源。
ReentrantLock可中断机制
可中断机制就是线程在阻塞队列中等待过程中,我们可以调用lockInterruptibly() 中断方法打断线程的等待来防止死锁的发生。
我们继续查看中断机制的源码:
//中断方法
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
//aqs中断方法
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//尝试获取锁
if (!tryAcquire(arg))
//获取不到表示在阻塞队列,则进行中断逻辑
doAcquireInterruptibly(arg);
}
如上源码所示,当程序主动调用lockInterruptibly()方法是调用的AQS获取中断机制。首先会先尝试获取锁,如果能够获取到锁则不进入中断逻辑,如果没有获取到锁则进入中断方法。
继续查看中断源码:
//aqs中断方法
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
//尝试获取锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
//获取失败抛出中断异常
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
如源码所示,中断机制会自旋获取锁,如果获取到锁则不再执行中断机制;如果没有获取到锁则抛出 InterruptedException() 异常,程序最终如果没有获取到锁会调用 cancelAcquire(node)取消获取锁,从而避免无限等待使程序死锁的发生。
ReentrantLock超时机制
超时机制就是获取锁的方法中传入超时时间,如果在这个时间后还没有获取到锁则取消获取锁并返回false标识。
老规矩查看源码:
//tryLock 带有超时时间的尝试获取锁
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
//底层调用aqs获取锁方法
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
如源码所示,tryLock(long timeout, TimeUnit unit)底层是调用AQS tryAcquireNanos(int arg, long nanosTimeout)方法。在调用时会先尝试获取锁,没有获取到则进入超时获取锁逻辑。
继续查看超时获取锁源码:
//AQS超时获取锁源码
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
//获取当前节点的前节点
final Node p = node.predecessor();
//前节点为头结点且获取锁成功
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
//超时返回为未获取到锁标识
return false;
//未获取到锁且线程应该阻塞且超时时间大于1000ns会睡眠nanosTimeout
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
//取消获取锁
cancelAcquire(node);
}
}
如源码所示 如果 p == head && tryAcquire(arg) 当前线程节点的前驱节点是头部节点且当前线程尝试获取锁成功,会将当前节点设置为头部节点且直接返回获取到锁标识。如果不是头部节点或没有获取到锁则会判断是否获取时间超时,如果超时返回false结束流程,否则shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold 节点需要阻塞且超时时间大于1000纳秒则 LockSupport.parkNanos(this, nanosTimeout) 将当前线程睡眠nanosTimeout 纳秒,否则当前线程自旋获取锁。
cancelAcquire(node) 如果没有获取到锁或者超时最终都会直接取消获取锁机制 。
条件变量Condition
条件变量就是线程在执行的过程中会可以调用await()阻塞线程、signal()唤醒线程。就相当于休息室,调用await就是让当前线程等待,signal就是让线程继续执行。值得注意的是在await()阻塞线程时候会释放锁,signal()则是唤醒最先调用await()的线程,并且我们可以定义多个condition条件。
查看源码:
//ReentrantLock 中新建一个条件变量
public Condition newCondition() {
return sync.newCondition();
}
//调用aqs ConditionObject实例化一个条件对象
final ConditionObject newCondition() {
return new ConditionObject();
}
如源码所示ReentrantLock condition还是用的AQS的ConditionObject。
我们继续分析ConditionObject 中的核心方法await()、signal()方法:
//aqs ConditionObject await方法
public final void await() throws InterruptedException {
//线程如果中断则抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();
//将当前节点加入等待队列
Node node = addConditionWaiter();
//释放同步队列节点
int savedState = fullyRelease(node);
int interruptMode = 0;
//判断是否在同步队列,如果不在同步队列则阻塞线程
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
如源码所示,我们在线程执行过程中调用 Condition await()方法会先检查线程是否中断,没有中断会将当前线程加入等待队列,释放同步队列并保存STATE状态。然后会判断是节点是否在同步队列,不在同步队列会执行LockSupport.park(this)方法将线程阻塞。
当然在线程阻塞过程中会自旋验证 (interruptMode = checkInterruptWhileWaiting(node)) != 0 线程是否中断,是否被Condition signal()方法唤醒。
我们继续分析signal()源码:
// condition 唤醒方法
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
//唤醒等待队列第一个线程
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
如源码所示,signal()唤醒方法默认唤醒条件等待队列中第一个线程。transferForSignal(first)则是将节点从条件队列移动到同步队列。
继续查看移动节点源码transferForSignal:
final boolean transferForSignal(Node node) {
/*
* cas 修改 Node.CONDITION == 0,如果修改失败标识节点已经取消
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
将node 移动到同步队列并返回前驱节点
*/
Node p = enq(node);
int ws = p.waitStatus;
//如果前驱节点 waitStatus > 0 或者 cas修改p节点为唤醒状态失败
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
//强制唤醒线程
LockSupport.unpark(node.thread);
return true;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
如上源码所示,enq(node)方法会将等待队列的第一个节点移动到同步队列,并会返回节点的前驱节点。然后 ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL) 表示 节点的前驱节点取消或者cas设置前驱节点waitStatus == SIGNAL失败 则 执行LockSupport.unpark(node.thread)方法强制唤醒节点。
这里有个很重要的点就是同步队列节点的 waitStatus等待状态,如果是SIGNAL,则表示后续节点是一个阻塞节点需要唤醒的节点,当当前节点执行完成后会主动唤醒后续节点。这里也表明了conditon signal()方法唤醒线程并不会立即执行,而是要当前线程执行完成才会执行唤醒的线程。文章来源:https://www.toymoban.com/news/detail-425168.html
写在最后
今天的博文我们讲述了ReentrantLock与Synchronized的区别联系,也解析了ReentrantLock可重入、可中断、超时机制以及conditon变量的源码。通过源码我们可以知道ReentrantLock 是通过AQS保证同步,其他的中断机制、超时机制、以及condition变量也都是基于AQS中的相应底层代码。在实际的开发过程中,我们可以根据自身业务场景选择应用ReentrantLock方法,从而增加系统的健壮性和可维护性。文章来源地址https://www.toymoban.com/news/detail-425168.html
到了这里,关于并发编程之可重入锁ReentrantLock的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!