Java并发工具CountDownLatch的使用和原理

这篇具有很好参考价值的文章主要介绍了Java并发工具CountDownLatch的使用和原理。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.等待多线程完成的 CountDownLatch

  CountDownLatch 允许一个或多个线程等待其他线程完成操作。

  假如有这样一个需求:我们需要解析一个 Excel 里多个 sheet 的数据,此时可以考虑使用多线程,每个线程解析一个 sheet 里的数据,等到所有的 sheet 都解析完之后,程序需要提示解析完成。在这个需求中,要实现主线程等待所有线程完成 sheet 的解析操作,最简单的做法是使用 join()方法,如代码清单 1-1 所示。

代码清单1-1 CountDownLatchUseCase.java
public class CountDownLatchUseCase {
    public static void main(String[] args) throws InterruptedException {
        Thread threadA = new Thread(()->{
            System.out.println("parser1 finish");
        },"threadA");

        Thread threadB = new Thread(()->{
            System.out.println("parser2 finish");
        },"threadB");

        // 任务A B 开始执行
        threadA.start();
        threadB.start();

        // 等待AB 完成任务
        threadA.join();
        threadB.join();
    }
}

  join 用于让当前执行线程等待 join 线程执行结束。其实现原理是不停检查 join 线程是否存活,如果 join 线程存活则让当前线程永远等待。其中,wait(0)表示永远等待下去,代码片段如下。

java.lang.Thread#join(long)
// 同步方法
public final synchronized void join(long millis)
throws InterruptedException {
	long base = System.currentTimeMillis();
	long now = 0;

	if (millis < 0) {
		throw new IllegalArgumentException("timeout value is negative");
	}
	if (millis == 0) {
        // isAlive()方法返回一个boolean值,如果线程已经启动且尚未终止,则返回true;否则,返回false。此次的while循环为了防止被阻塞的线程被意外唤醒,走出这段循环代表线程任务已经执行完毕,线程已经终止。
		while (isAlive()) {
            // 线程进入等待状态
			wait(0);
		}
	} else {
        // 使用join(long millis)参数不为0走这部分逻辑,如果线程没有即时被唤醒,超时时间过后自己进入RUNNABLE状态。
		while (isAlive()) {
			long delay = millis - now;
			if (delay <= 0) {
				break;
			}
			wait(delay);
			now = System.currentTimeMillis() - base;
		}
	}
}

  直到 join 线程中止后,线程的 this.notifyAll()方法会被调用,调用 notifyAll()方法是在 JVM 里实现的,所以在 JDK 里看不到。
  在 JDK 1.5 之后的并发包中提供的 CountDownLatch 也可以实现 join 的功能,并且比join 的功能更多,将代码1-1使用CountDownLatch改下下,如代码清单 1-2 所示。

代码清单1-2 CountDownLatchUseCase.java
public class CountDownLatchUseCase {
    private static CountDownLatch countDownLatch = new CountDownLatch(2);
    public static void main(String[] args) throws InterruptedException {
        Thread threadA = new Thread(()->{
            System.out.println("parser1 finish");
            countDownLatch.countDown();
        },"threadA");

        Thread threadB = new Thread(()->{
            System.out.println("parser2 finish");
            countDownLatch.countDown();
        },"threadB");
        threadA.start();
        threadB.start();
        // 等待任务完成
        countDownLatch.await();
    }
}

  CountDownLatch 的构造函数接收一个 int 类型的参数作为计数器,如果你想等待 N个点完成,这里就传入 N。当我们调用 CountDownLatch 的 countDown 方法时,N 就会减 1,CountDownLatch 的 await 方法会阻塞当前线程,直到 N 变成零。由于 countDown方法可以用在任何地方,所以这里说的 N 个点,可以是 N 个线程,也可以是 1 个线程里的 N 个执行步骤。用在多个线程时,只需要把这个 CountDownLatch 的引用传递到线程里即可。

  如果有某个解析 sheet 的线程处理得比较慢,我们不可能让主线程一直等待,所以可以使用另外一个带指定时间的 await 方法——await(long time,TimeUnit unit),这个方法等待特定时间后,就会不再阻塞当前线程。join 也有类似的方法。

计数器必须大于等于 0,只是等于 0 时候,计数器就是零,调用 await 方法时不会阻塞当前线程。CountDownLatch 不可能重新初始化或者修改 CountDownLatch对象的内部计数器的值。一个线程调用 countDown 方法 happen-before,另外一个线程调用 await 方法。

2.CountDownLatch原理解析

2.1 构造方法 public CountDownLatch(int count)

初始化好AQS同步状态为state

// 1.方法 CountDownLatch countDownLatch = new CountDownLatch(2);
# java.util.concurrent.CountDownLatch#CountDownLatch
// 使用构造方法将AQS的同步状态位初始为count
public CountDownLatch(int count) {
	if (count < 0) throw new IllegalArgumentException("count < 0");
	this.sync = new Sync(count);
}

Sync(int count) {
	setState(count);
}

2.2 方法public void countDown()

# 片段1 java.util.concurrent.CountDownLatch#countDown
public void countDown() {
    // 将共享状态State的值减一
	sync.releaseShared(1);
}

# 片段 2 java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared
public final boolean releaseShared(int arg) {
    // tryReleaseShared(arg)方法是CountDownLatch 重写的方法,该方法见片段3
	if (tryReleaseShared(arg)) {
        // 当同步状态为0时会执行这个方法
		doReleaseShared();
		return true;
	}
	return false;
}

# 片段 3 java.util.concurrent.CountDownLatch.Sync#tryReleaseShared 模板方法
protected boolean tryReleaseShared(int releases) {
	// Decrement count; signal when transition to zero
    // 自旋+CAS 更新同步状态
	for (;;) {
        // 获得同步状态
		int c = getState();
        
		if (c == 0)
			return false;
		int nextc = c-1;
		if (compareAndSetState(c, nextc))
            // 如果同步更新后的值为0放回true
			return nextc == 0;
	}
}

# 片段 4 java.util.concurrent.locks.AbstractQueuedSynchronizer#doReleaseShared
private void doReleaseShared() {
/*
 * Ensure that a release propagates, even if there are other
 * in-progress acquires/releases.  This proceeds in the usual
 * way of trying to unparkSuccessor of head if it needs
 * signal. But if it does not, status is set to PROPAGATE to
 * ensure that upon release, propagation continues.
 * Additionally, we must loop in case a new node is added
 * while we are doing this. Also, unlike other uses of
 * unparkSuccessor, we need to know if CAS to reset status
 * fails, if so rechecking.
 */
for (;;) {
    // 获得同步队列头节点
	Node h = head;
    // 如果头节点存在且不等于尾节点(tail),进入if里
	if (h != null && h != tail) {
        // 获得头节点的等待状态
		int ws = h.waitStatus;
        // 判断等待状态是否为Node.SIGNAL,等待状态为SIGNAL表示需要唤醒后继节点
		if (ws == Node.SIGNAL) {
            //使用compareAndSetWaitStatus(h, Node.SIGNAL, 0)方法来尝试将头节点的等待状态从Node.SIGNAL设置为0。如果设置成功,则调用unparkSuccessor(h)方法唤醒后继节点。如果设置失败,则继续循环重新检查情况。
			if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
				continue;            // loop to recheck cases
            // 唤醒后续节点 见代码片段5
			unparkSuccessor(h);
		}
        //如果等待状态为0,并且使用compareAndSetWaitStatus(h, 0, Node.PROPAGATE)方法尝试将头节点的等待状态从0设置为Node.PROPAGATE,则继续循环。
		else if (ws == 0 &&
				 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
			continue;                // loop on failed CAS
	}
    //头节点不变退出循环
	if (h == head)                   // loop if head changed
		break;
}
}

# 片段5 java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor
private void unparkSuccessor(Node node) {
	/*
	 * If status is negative (i.e., possibly needing signal) try
	 * to clear in anticipation of signalling.  It is OK if this
	 * fails or if status is changed by waiting thread.
	 */
	int ws = node.waitStatus;
    // 如果等待状态ws小于0,尝试将等待状态清除为0,忽略修改失败和其他线程可能对等待状态的更改。
	if (ws < 0)
		compareAndSetWaitStatus(node, ws, 0);

	/*
	 * Thread to unpark is held in successor, which is normally
	 * just the next node.  But if cancelled or apparently null,
	 * traverse backwards from tail to find the actual
	 * non-cancelled successor.
	 */
    // 获得头节点的下一个节点
	Node s = node.next;
	if (s == null || s.waitStatus > 0) {
		s = null;
        // 循环遍历从尾节点开始,直到找到非取消状态的节点或者已经遍历到给定节点node为止。
		for (Node t = tail; t != null && t != node; t = t.prev)
			if (t.waitStatus <= 0)
				s = t;
	}
	if (s != null)
        // 对首个等待线程进行唤醒
		LockSupport.unpark(s.thread);
}

2.3 方法countDownLatch.await()

# 片段1 java.util.concurrent.CountDownLatch#await()
public void await() throws InterruptedException {
    // 见代码片段2
	sync.acquireSharedInterruptibly(1);
}
# 片段2 java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)
		throws InterruptedException {
	// 判断线程是否被中断,中断就抛出中断异常	
	if (Thread.interrupted())
		throw new InterruptedException();
	// 	调用tryAcquireShared(arg)获取同步状态信息。见代码片段3
	if (tryAcquireShared(arg) < 0)
	    // 同步状态不为0执行,doAcquireSharedInterruptibly(arg)见代码片段4
		doAcquireSharedInterruptibly(arg);
}

# 片段3 java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly
protected int tryAcquireShared(int acquires) {
    // 同步状态是否为0
	return (getState() == 0) ? 1 : -1;
}

# 片段4 java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg)
	throws InterruptedException {
    // 将一个代表当前线程的等待节点node添加到同步队列中。
	final Node node = addWaiter(Node.SHARED);
    // 设置一个标志变量failed,用于跟踪获取许可的过程是否失败。
	boolean failed = true;
	try {
        // 循环
		for (;;) {
            // 获取当前节点多的前驱节点
			final Node p = node.predecessor();
            // 判断前驱节点是否为头节点
			if (p == head) {
                //见代码片段3。
				int r = tryAcquireShared(arg);
				if (r >= 0) {
                    // 同步状态为0 代表new CountDownLatch(int count) 传入的count已经被countDown()减少完了,代表任务都执行完了。
                    //设置头节点为当前节点node,并触发传播操作,将共享许可传播给后继节点。
					setHeadAndPropagate(node, r);
					p.next = null; // help GC
					failed = false;
					return;
				}
			}
            // shouldParkAfterFailedAcquire(p, node) 方法判断同步状态不为0时是否需要阻塞线程等待,返回true时继续执行parkAndCheckInterrupt()方法阻塞当前调用await()的线程,直到有线程调用countDown()将同步状态state更新为0时,调用 unparkSuccessor(Node node)方法将当前线程唤醒。
			if (shouldParkAfterFailedAcquire(p, node) &&
				parkAndCheckInterrupt())
				throw new InterruptedException();
		}
	} finally {
		if (failed)
			cancelAcquire(node);
	}
}

3.总结

  CountDownLatch使用主要围绕CountDownLatch.countDown()方法和CountDownLatch.await()方法,使用时先通过CountDownLatch的构造方法设置共享同步变量state的大小,然后让其他“工作”线程完成工作任务时调用countDown方法将state-1,让“等待”线程调用await()方法等待“工作”线程完成任务,当某个“工作”线程完成最后一个任务,并且将state更新为0时,这个“工作”线程会去唤醒同步队列中的第一个“等待”线程,首个“等待”线程被唤醒会“通知”其他“等待”线程,然后他们从await()方法方法,继续执行。

参考 《Java并发编程的艺术》文章来源地址https://www.toymoban.com/news/detail-597337.html

到了这里,关于Java并发工具CountDownLatch的使用和原理的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Java基础】线程同步类 CountDownLatch

    ​ 关于作者:CSDN内容合伙人、技术专家, 从零开始做日活千万级APP。 专注于分享各领域原创系列文章 ,擅长java后端、移动开发、人工智能等,希望大家多多支持。 正好今天项目中用到了CountDownLatch,那我们正好总结一下,通过本文你可以学到什么是CountDownLatch及其原理,

    2024年02月12日
    浏览(47)
  • CountDownLatch使用错误+未最终断开连接导致线程池资源耗尽

            我设置了CountDownLatch对线程的协作做出了一些限制,但是我发现运行一段时间以后便发现定时任务不运行了。 具体代码: 报错以后定时任务不运行了   打印线程日志发现定时任务的线程在第86行代码停着不动了。 正常的线程日志应该是这样的。 查看第86行代码,

    2024年04月24日
    浏览(37)
  • Java并发(二)----初次使用多线程并行提高效率

    并行代表充分利用多核 cpu 的优势,提高运行效率。 想象下面的场景,执行 3 个计算,最后将计算结果汇总。 如果是串行执行,那么总共花费的时间是 10 + 11 + 9 + 1 = 31ms 但如果是四核 cpu,各个核心分别使用线程 1 执行计算 1,线程 2 执行计算 2,线程 3 执行计算 3,那么 3 个

    2023年04月13日
    浏览(67)
  • 【Java|多线程与高并发】线程安全问题以及synchronized使用实例

    Java多线程环境下,多个线程同时访问共享资源时可能出现的数据竞争和不一致的情况。 线程安全一直都是一个令人头疼的问题.为了解决这个问题,Java为我们提供了很多方式. synchronized、ReentrantLock类等。 使用线程安全的数据结构,例如ConcurrentHashMap、ConcurrentLinkedQueue等

    2024年02月09日
    浏览(44)
  • Java并发编程学习18-线程池的使用(下)

    上篇介绍了 ThreadPoolExecutor 配置和扩展相关的信息,本篇开始将介绍递归算法的并行化。 还记得我们在《Java并发编程学习11-任务执行演示》中,对页面绘制程序进行一系列改进,这些改进大大地提供了页面绘制的并行性。 我们简单回顾下相关的改进过程: 第一次新增时,页

    2024年02月12日
    浏览(37)
  • Java并发编程学习16-线程池的使用(中)

    上篇分析了在使用任务执行框架时需要注意的各种情况,并简单介绍了如何正确调整线程池大小。 本篇将继续介绍对线程池进行配置与调优的一些方法,详细如下: ThreadPoolExecutor 为 Executors 中的 newCachedThreadPool 、 newFixedThreadPool 和 newScheduledThreadExecutor 等工厂方法返回的 Exe

    2024年02月10日
    浏览(42)
  • 探讨Java多线程调度:如何实现两线程并行,一线程等待?

    亲爱的小伙伴们,大家好!我是小米,很高兴再次和大家分享一些关于Java编程的有趣技巧和知识。今天,我们将探讨一个有趣且常见的面试问题:如何让两个线程同时执行,而第三个线程必须等待前两个线程结束后才能开始执行呢?这是一个非常实用的问题,也是我们在多线

    2024年02月08日
    浏览(40)
  • (线程池)多线程使用场景--es数据批量导入、数据汇总、异步调用;如何控制某个方法允许并发访问线程的数量;对ThreadLocal的理解及实现原理、源码解析、ThreadLocal的内存泄露问题

    CountDownLatch(闭锁/倒计时锁) 用来进行线程同步协作,等待所有线程完成倒计时(一个或者多个线程,等待其他多个线程完成某件事情之后才能执行) 其中构造参数用来初始化等待计数值 await() 用来等待计数归零 countDown() 用来让计数 减一 多线程使用场景一:( es数据批量导

    2024年04月25日
    浏览(65)
  • 巧用CountDownLatch实现多线程并行工作

    【前言】       CountDownLatch 是JDK提供的一个同步工具,它可以让一个或多个线程挂起等待,一直等到其他线程执行完成才会继续执行。常用方法有 countDown 方法和 await 方法, CountDownLatch 在初始化时,需要指定一个整数n作为计数器。当调用 countDown 方法时,计数器会被减1;

    2024年02月13日
    浏览(39)
  • 多线程系列(十五) -常用并发工具类详解

    在前几篇文章中,我们讲到了线程、线程池、BlockingQueue 等核心组件,其实 JDK 给开发者还提供了比 synchronized 更加高级的线程同步组件,比如 CountDownLatch、CyclicBarrier、Semaphore、Exchanger 等并发工具类。 下面我们一起来了解一下这些常用的并发工具类! 2.1、CountDownLatch CountDow

    2024年03月09日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包