SychronousQueue同步队列

这篇具有很好参考价值的文章主要介绍了SychronousQueue同步队列。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

SynchronousQueue 是一个比较特别的队列,由于在线程池方面有所应用,为了更好的理解线程池的实现原理,

此队列源码中充斥着大量的CAS语句,理解起来是有些难度的,为了方便日后回顾,本篇文章会以简洁的图形化方式展示该队列底层的实现原理。 

SychronousQueue简单 

经典的生产者-消费者模式,操作流程是这样的: 
有多个生产者,可以并发生产产品,把产品置入队列中,如果队列满了,生产者就会阻塞; 有多个消费者,并发从队列中获取产品,如果队列空了,消费者就会阻塞;

SynchronousQueue 也是一个队列来的,的特别之,一个生产线程,当它生产产品(即put的时候),如果当前没有人想要消费产品(即当前没有线程执行take),此生产线程必须阻塞,等待一个消费线程调用take操作,take操作将会唤醒该生产线程,同时消费线程会获取生产线程的产品(即数据传递),这样的一个过程称为一次配对过程(当然也可以先take后put,原理是一样的)。 

我们用一个简单的代码来验证一下,如下所示: 

package com.concurrent;

import java.util.concurrent.SynchronousQueue; 

public class SynchronousQueueDemo { 

 public static void main(String[] args) throws InterruptedException { 

 final SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>(); 

 Thread putThread = new Thread(new Runnable() { 

 @Override 

 public void run() { 

 System.out.println("put thread start"); 

 try { 

 queue.put(1); 

 } catch (InterruptedException e) {

 } 

 System.out.println("put thread end"); 

 } 

 }); 

 Thread takeThread = new Thread(new Runnable() { 

 @Override 

 public void run() { 

 System.out.println("take thread start"); 

 try { 
 System.out.println("take from putThread: " + queue.take());  } catch (InterruptedException e) { 

 } 

 System.out.println("take thread end"); 

 } 

 }); 

 putThread.start();

 Thread.sleep(1000);

 takeThread.start();

 } 

}

一种输出结果如下: 

put thread start

take thread start

take from putThread: 1

put thread end

take thread end

从结果可以看出,put线程执行queue.put(1) 后就被阻塞了,只有take线程进行了消费,put线程才可以返回。

可以认为这是一种线程与线程间一对一传递消息的模型。 

SychronousQueue实现 

不 像 ArrayBlockingQueue 、 LinkedBlockingDeque 之类 的 阻 塞队列依 赖 AQS 实 现 并发 操作 ,SynchronousQueue直接使用CAS实现线程的安全访问。 

队列的实现策略通常分为公平模式和非公平模式,接下来将分别进行说明。 

公平 
公平模式下,底层实现使用的是TransferQueue这个内部队列,它有一个head和tail指针,用于指向当前正在等待匹配的线程节点。 

初始化时,TransferQueue的状态如下: 

接着我们进行一些操作: 
1、线程put1执行  put(1)操作,由于当前没有配对的消费线程,所以put1线程入队列,自旋一小会后睡眠等待,这时队列状态如下: 

2、接着,线程put2执行了put(2)操作,跟前面一样,put2线程入队列,自旋一小会后睡眠等待,这时队列

状态如下: 

3、这时候,来了一个线程take1,执行了  take操作,由于tail指向put2线程,put2线程跟take1线程配对了(一put一take),这时take1线程不需要入队,但是请注意了,这时候,要唤醒的线程并不是put2,而是put1。 

为何? 大家应该知道我们现在讲的是公平策略,所谓公平就是谁先入队了,谁就优先被唤醒,我们的例子明显是put1 应该优先被唤醒。至于读者可能会有一个疑问,明明是 take1 线程跟 put2 线程匹配上了,结果是 put1 线程被唤醒消费,怎么确保take1线程一定可以和次首节点(head.next)也是匹配的呢?其实大家可以拿个纸画一画,就会发现真的就是这样的。 

公平队尾队。 

执行后put1线程被唤醒,take1线程的  take()方法返回了1(put1线程的数据),这样就实现了线程间的一对一通信,这时候内部状态如下: 

4、最后,再来一个线程take2,执行take操作,这时候只有put2线程在等候,而且两个线程匹配上了,线程put2被唤醒, take2线程take操作返回了2(线程put2的数据),这时候队列又回到了起点,如下所示: 

以上便是公平模式下,SynchronousQueue的实现模型。总结下来就是:队尾匹配队头出队,先进先出,体现公平原则。 

非公 
我们还是使用跟公平模式下一样的操作流程,对比两种策略下有何不同。非公平模式底层的实现使用的是TransferStack,一个栈,实现中用head指针指向栈顶,接着我们看看它的实现模型: 
1、线程put1执行  put(1)操作,由于当前没有配对的消费线程,所以put1线程入栈,自旋一小会后睡眠等 待,这时栈状态如下: 

head put1线程

2、接着,线程put2再次执行了put(2)操作,跟前面一样,put2线程入栈,自旋一小会后睡眠等待,这时栈状态如下: 

head

put2线程

put1线程

3、这时候,来了一个线程take1,执行了take操作,这时候发现栈顶为put2线程,匹配成功,但是实现会先把 take1 线程入栈,然后 take1 线程循环执行匹配 put2 线程逻辑,一旦发现没有并发冲突,就会把栈顶指针直接指向  put1线程 

步骤一: 

head

Take1线程

循环中匹配put2线程

put2线程

put1线程

步骤二: 

head put1线程

4、最后,再来一个线程take2,执行take操作,这跟步骤3的逻辑基本是一致的,take2线程入栈, 然后在循环中匹配put1线程,最终全部匹配完毕,栈变为空,恢复初始状态,如下图所示: 步骤一: 

head

Take2线程

循环中匹配put1线程

put1线程

步骤二: 

head Null

可以从上面流程看出,虽然put1线程先入栈了,但是却是后匹配,这就是非公平的由来。 

SychronousQueue总结 
SynchronousQueue由于其独有的线程一一配对通信机制,在大部分平常开发中,可能都不太会用到,但线程池技术中会有所使用,由于内部没有使用AQS,而是直接使用CAS,所以代码理解起来会比较困难,但这并不妨碍我们理解底层的实现模型,在理解了模型的基础上,有兴趣的话再查阅源码,就会有方向感,看起来也会比较容易,希望本文有所借鉴意义。 
文章来源地址https://www.toymoban.com/news/detail-470172.html

到了这里,关于SychronousQueue同步队列的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Java基础】AQS (AbstractQueuedSynchronizer) 抽象队列同步器

    关于作者:CSDN内容合伙人、技术专家, 从零开始做日活千万级APP。 专注于分享各领域原创系列文章 ,擅长java后端、移动开发、人工智能等,希望大家多多支持。 我们继续总结学习 Java基础知识 ,温故知新。 CLH(Craig, Landin, and Hagersten locks)是一种自旋锁,能确保无饥饿性,提

    2024年02月13日
    浏览(32)
  • 并发专栏-队列同步器 AQS 以及 Reentrantlock 应用

    Java 中的大部分同步类都是基于AbstractQueuedSynchronizer(简称为AQS)实现的。 ReentrantLock 、 ReentrantReadWriteLock 、 Semaphore(信号量) 、 CountDownLatch 、 公平锁 、 非公平锁 、 ThreadPoolExecutor 都和 AQS 有直接关系,所以了解 AQS 的抽象实现,并在此基础上结合上述各类的实现细节,很快就

    2024年02月07日
    浏览(35)
  • 【Linux学习】多线程——同步 | 条件变量 | 基于阻塞队列的生产者消费者模型

    🐱作者:一只大喵咪1201 🐱专栏:《Linux学习》 🔥格言: 你只管努力,剩下的交给时间! 以生活中消费者生产者为例: 生活中,我们大部分人都扮演着消费者的角色,会经常在超市买东西,比如买方便面,而超市的方便面是由供应商生成的。所以我们就是消费者,供应商

    2024年02月05日
    浏览(37)
  • Debezium vs OGG vs Tapdata:如何实时同步 Oracle 数据到 Kafka 消息队列?

    随着信息时代的蓬勃发展,企业对实时数据处理的需求逐渐成为推动业务创新和发展的重要驱动力。在这个快速变化的环境中,许多企业选择将 Oracle 数据库同步到 Kafka,以满足日益增长的实时数据处理需求。本文将深入探讨这一趋势的背后原因,并通过一个真实的客户案例

    2024年04月10日
    浏览(40)
  • 排好队,一个一个来:宫本武藏教你学队列(附各种队列源码)

    哈喽!欢迎来到黑洞晓威的博客! 上一次我们在这里聊了一下队列,现在,让我们再次翻开这个话题,继续探讨一下这个有趣的数据结构吧! 虽然队列看起来比较普通,但是它在实际应用中却 有着不可替代的作用。所以,无论是计算机系统中的任务调度,还是网络数据包的

    2023年04月09日
    浏览(20)
  • Q是一个队列,S是一个空栈,实现将队列中的所有元素逆置

    题目: / 【问题描述】设Q是一个队列,S是一个空栈,实现将队列中的元素逆置的算法。(假设队列中的元素为字符型) 【输入形式】队列中的元素依次入队 【输出形式】依次输出队列中的元素 【样例输入】abcd 【样例输出】dcba 【样例说明】 【评分标准】 / 写作原因:自己

    2024年02月06日
    浏览(15)
  • Git同步一个仓库代码到另一个仓库

    在当前仓库操作,更新代码库 查看当前仓库origin只有一个,接下来我们要add另一个仓库的origin newOrigin远程仓库名称,可以随便起个方便记忆的,目的是在本地添加一个新的远程连接 newOrigin后面是newOrigin的一个分支,可以指定为master或你要push的目标分支,都可以。执行完命令

    2024年02月10日
    浏览(29)
  • 什么?要求设计一个循环队列?

    🎈个人主页:🎈 :✨✨✨初阶牛✨✨✨ 🐻推荐专栏: 🍔🍟🌯C语言初阶 🍔🍟🌯C语言进阶 🔑个人信条: 🌵知行合一 🍉本篇简介::讲解用c语言实现数据结构的循环队列. 先声明一下: 题目来源:力扣(LeetCode) 题目名称: 设计循环队列 :题目链接 难度: 中等 介绍: 设计你的

    2024年02月07日
    浏览(28)
  • Git同步一个分支的提交到另一个分支

    Git 是一款分布式版本控制系统,它提供了许多强大的功能来管理代码的版本和变更。 cherry-pick 是一个非常常用的 Git 命令,它的功能是将某个分支的某次提交应用到当前分支。这对于将特定的代码改动从一个分支移动到另一个分支非常有用。例如,你可能在开发分支上修复了

    2024年01月20日
    浏览(31)
  • 参考RabbitMQ实现一个消息队列

    消息队列的本质就是阻塞队列,它的最大用途就是用来实现生产者消费者模型,从而实现 解耦合 以及 削峰填谷 。 在分布式系统中不再是单个服务器而是服务器“集群”,如果我们我们直接A服务器给B服务器发送请求,B服务器给A服务器返回响应,这样的话我们AB的耦合较大

    2024年02月14日
    浏览(75)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包