多线程与高并发——并发编程(4)

这篇具有很好参考价值的文章主要介绍了多线程与高并发——并发编程(4)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

四、阻塞队列

1 基础概念

1.1 生产者消费者概念

生产者-消费者是设计模式的一种,让生产者和消费者基于一个容器来解决强耦合的问题。生产者与消费者彼此之间不会直接通讯,而是通过一个容器(队列)进行通讯。

  • 生产者生产完数据后扔到容器中,不用等消费者来处理;
  • 消费者也不需要去找生产者要数据,直接从容器中获取即可;
  • 而这种容器最常用的结构就是队列。

1.2 JUC阻塞队列的存取方法

常用的存取方法都来自 JUC 包下的 BlockingQueue

  • 生产者存储方法:
    • add(E):添加数据到队列,若队列满了,抛出异常;
    • offer(E):添加数据到队列,若队列满了,返回 false;
    • offer(E,timeout,unit):添加数据到队列,若队列满了,阻塞 timeout 时间,超时后返回 false;
    • put(E):添加数据到队列,若队列满了,挂起线程,等到队列中有位置,再扔数据进去,死等。
  • 消费者取数据方法:
    • remove():从队列中移除数据,若队列为空,抛出异常;
    • poll():从队列中移除数据,若队列为空,返回 false;
    • poll(timeout,unit):从队列中移除数据,若队列为空,阻塞 timeout 时间,等生产者仍数据再获取数据,超时后返回 false;
    • take():从队列中移除数据,若队列为空,挂起线程,一直等生产者仍数据再获取。

2 ArrayBlockingQueue

2.1 ArrayBlockingQueue的基本使用

  • ArrayBlockingQueue 在初始化时,必须指定当前队列的长度,因为 ArrayBlockingQueue 是基于数组实现的队列结构,数组长度不可变,必须提前设置数据长度信息。
public static void main(String[] args) throws InterruptedException {
   
    // 必须设置队列长度
    ArrayBlockingQueue queue = new ArrayBlockingQueue(4);
    // 生产者生产数据
    queue.add("1");
    queue.offer("2");
    queue.offer("3", 2, TimeUnit.SECONDS);
    queue.put("4");
    // 消费者消费数据
    System.out.println(queue.remove());
    System.out.println(queue.poll());
    System.out.println(queue.poll(2, TimeUnit.SECONDS));
    System.out.println(queue.take());
}

2.2 生产者方法实现原理

  • 生产者添加数据到队列的方法比较多,需要一个一个看
2.2.1 ArrayBlockingQueue的常见属性

ArrayBlockingQueue中的成员变量

final Object[] items; 				// 就是数组本身
int takeIndex;						// 取数据的下标
int putIndex;						// 存数据的下标
int count;							// 当前数组中元素的个数
final ReentrantLock lock;			// 就是一个 ReentrantLock 锁
private final Condition notEmpty;	// 消费者挂起线程和唤醒线程用到的Condition(可看作是synchronized的wait和notify)
private final Condition notFull;	// 生产者挂起线程和唤醒线程用到的Condition(可看作是synchronized的wait和notify)
2.2.2 add方法
  • add方法本身就是调用了offer方法,如果offer方法返回false,直接抛出异常
public boolean add(E e) {
   
    if (offer(e))
        return true;
    else 	// 抛出的异常
        throw new IllegalStateException("Queue full");
}
2.2.3 offer方法
public boolean offer(E e) {
   
    checkNotNull(e);	// 要求存储的数据不允许为null,否则抛出空指针异常
	// 拿到当前阻塞队列的lock锁
    final ReentrantLock lock = this.lock;
    lock.lock();	// 为保证线程安全,加锁
    try {
   
		// 判断队列中元素是否满了,若满了,则返回false
        if (count == items.length)
            return false;
        else {
   
			// 队列没满,执行 enqueue 将元素添加到队列中,并返回true
            enqueue(e);
            return true;
        }
    } finally {
   
        lock.unlock();		// 操作完释放锁
    }
}
// ================
private void enqueue(E x) {
   
    // 拿到数组的引用,将元素放到指定的位置
    final Object[] items = this.items;
    items[putIndex] = x;
	// 对putIndex进行++操作,并判断是否等于数组长度,需要归为
    if (++putIndex == items.length)
        putIndex = 0;	// 归位:将索引值设置为0
    count++;	// 添加成功,数据++
    notEmpty.signal();	// 将一个Condition中阻塞的线程唤醒
}
2.2.4 offer(time,unit)方法

生产者在添加数据时,如果队列已经满,阻塞一会:

  • 阻塞到消费者消费了消息,然后唤醒当前阻塞线程;
  • 阻塞到了 timeout 时间,再次判断是否可以添加,若不能直接告辞。
// 线程在挂起时,如果对当前阻塞线程的终端标记位进行设置,会抛出异常直接结束
public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
   
	// 非空校验
    checkNotNull(e);
    long nanos = unit.toNanos(timeout);		// 将时间单位转为纳秒
    final ReentrantLock lock = this.lock;	// 加锁
    lock.lockInterruptibly();	// 允许线程中断排除异常的加锁方法
    try {
   
        // 为什么是while(虚假唤醒)
        while (count == items.length) {
   	// 如果元素个数和数组长度一致,说明队列满了
            if (nanos <= 0)	// 判断等待时间是否充裕
                return false;	// 不充裕,直接添加失败,返回false
            // 挂起等待,会同时释放锁资源(对标 synchronized 的wait方法)
            // awaitNanos会挂起线程,并且返回剩余的阻塞时间,恢复执行时,需要重新获取锁资源
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(e); // 这里锁门队列有空间了,enqueue将数据添加到阻塞队列中,并返回true
        return true;
    } finally {
   
        lock.unlock();	// 是否锁资源
    }
}
2.2.5 put方法
  • 如果队列是满的,就一直挂起,直到被唤醒,或者被中断
public void put(E e) throws InterruptedException {
   
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
   
        while (count == items.length)
            // await方法会一直阻塞,直到被唤醒或者被中断
            notFull.await();
        enqueue(e);
    } finally {
   
        lock.unlock();
    }
}

2.3 消费者方法实现原理

2.3.1 remove方法
  • remove方法本身就是调用了poll方法,如果poll方法返回null,直接抛出异常
public E remove() {
   
    E x = poll();
    if (x != null)
        return x;
    else	// 没数据抛出异常
        throw new NoSuchElementException();
}
2.3.2 poll方法
// 拉取数据
public E poll() {
   
    final ReentrantLock lock = this.lock;
    lock.lock();	// 加锁
    try {
   
        // 若没有数据,直接返回null;否则执行dequeue,取出数据并返回
        return (count == 0) ? null : dequeue();
    } finally {
   
        lock.unlock();
    }
}
// 取出数据
private E dequeue() {
   
    // 将成员变量引用到局部变量
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];		// 直接获取指定索引位置的数据
    items[takeIndex] = null;		// 取出数据后,清空该索引位置
    if (++takeIndex == items.length)	// 设置下次取数据的索引位置
        takeIndex = 0;
    count--;	// 数组中元素个数减一
    if (itrs != null)	// 迭代器内容先跳过
        itrs.elementDequeued();
    // signal方法,会唤醒当前Condition中排队的一个Node
    // signalAll方法,会将Condition中所有的Node,全都唤醒
    notFull.signal();
    return x;	// 返回数据
}
2.3.3 poll(timeout,unit)方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
   
    long nanos = unit.toNanos(timeout);		// 转换时间单位
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();				// 加锁,可中断唤醒
    try {
   
        while (count == 0) {
   	// 如果没数据
            if (nanos <= 0)		// 也没时间了,就不阻塞,返回null
                return null;
            // 有时间,就挂起消费者线程一段时间
            nanos = notEmpty.awaitNanos(nanos);
        }
        return dequeue();	// 取数据
    } finally {
   
        lock.unlock();
    }
}
2.3.4 take方法
public E take() throws InterruptedException {
   
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
   
        while (count == 0)	// 使用while,防止虚假唤醒
            notEmpty.await();
        return dequeue();
    } finally {
   
        lock.unlock();
    }
}
2.3.5 虚假唤醒

阻塞队列中,如果需要线程挂起操作,判断有无数据的位置采用的是while循环,为什么不使用if?文章来源地址https://www.toymoban.com/news/detail-694621.html

  • 首先肯定不能换成 if 逻辑判断,比如:有线程 A、B、E、C,其中 ABE 是生产者,C是消费者。假如线程的队列是满的,AB挂起
// E,拿到锁资源,还没有走while判断
while (count == items.length)
    // A醒了
    // B挂起
    notFull.await();
enqueue(e)
  • C 此时消费一条数据,执行 notFull.signal() 唤醒一个线程,A线程被唤醒;E走判断发现有空余位置,可以添加数据到队列,则E添加数据,走enqueue。
  • 如果判断是 if,A 在E释放锁资源后,拿到锁资源,直接走 enqueue 方法,此时 A线程就是在 putIndex 的位置,覆盖掉之前的数据,会造成数据安全问题。

3 LinkedBlockingQueue

3.1 LinkedBlockingQueue的底层实现

  • 查看 LinkedBlockingQueue 是如何存储数据,以及如何实现链表结构的。
// Node对象就是存储数据的单位
static class Node<E> {
   
    // 存储的数据
    E item;
	// 指向下一个数据的指针
    Node<E> next;
	// 有参构造
    Node(E x) {
    item = x; }
}
  • 查看LinkedBlockingQueue的有参构造
// 可以手动指定LinkedBlockingQueue的长度,如果没有指定,默认为Integer.MAX_VALUE
public LinkedBlockingQueue(int capacity) {
   
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    // 在初始化时,构建一个item为null的节点,作为head和last,这种node可以成为哨兵Node,
    // 如果没有哨兵节点,那么在获取数据时,需要判断head是否为null,才能找next
    // 如果没有哨兵节点,那么在添加数据时,需要判断last是否为null,才能找next
    last = head = new Node<E>(null);
}
  • 查看LinkedBlockingQueue的其他属性
// 因为是链表,没有想数组的length属性,基于AtomicInteger来记录长度
private final AtomicInteger count = new AtomicInteger();
transient Node<E> head;	// 链表的头

到了这里,关于多线程与高并发——并发编程(4)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Java高可用监控中间件

    Prometheus是一个开源的系统监控和警报工具集,用于收集、存储和查询时间序列数据。 它支持多种数据源,可以监控分布式系统的各种指标,并提供强大的查询语言和灵活的警报规则。 Grafana是一个开源的可视化监控和分析平台,可以与多个数据源集成,包括Prometheus、InfluxD

    2024年01月23日
    浏览(47)
  • 【Java面试丨消息中间件】Kafka

    1. 介绍 使用kafka在消息的收发过程都有可能会出现消息丢失 (1)生产者发送消息到broker丢失 (2)消息在broker中存储丢失 (3)消费者从broker接收消息丢失 2. 生产者发送消息到broker丢失 设置异步发送:同步发送会发生阻塞,一般使用异步发送方式发送消息 消息重试:由于网

    2024年02月11日
    浏览(47)
  • 深入理解Java消息中间件-组件-消息队列

    引言: 消息中间件在现代分布式系统中扮演着至关重要的角色,它解决了系统之间异步通信和解耦的需求。而在消息中间件的架构中,核心组件之一就是消息队列。本文将深入探讨消息队列的架构组件,帮助读者加深对消息中间件的理解和应用。 一、什么是消息队列 消息队列

    2024年04月27日
    浏览(51)
  • java后端技术汇总 + 中间件 + 架构思想

    1. 华为OD机考题 + 答案 2023华为OD统一考试(A+B卷)题库清单-带答案(持续更新) 2023年华为OD真题机考题库大全-带答案(持续更新) 2. 面试题 一手真实java面试题:2023年各大公司java面试真题汇总--持续更新 3. 技术知识 java后端技术汇总 + 中间件 + 架构思想 类型 难度 Spring、

    2024年02月13日
    浏览(76)
  • Java开发框架和中间件面试题(8)

    目录 82.Mybatis一级缓存,二级缓存? 83.Mybatis如何防止SQL注入? 84.mybatis中resultType和resultMap有什么区别? 85.如何在SpringBoot中禁用Actuator断点安全性? 86.什么是SpringBoot?SpringBoot有哪些优点? 87.SpringBoot中的监视器是什么? 88.什么是yaml文件? 89.如何使用SpringBoot实现异常处理?

    2024年02月03日
    浏览(47)
  • 【Java|多线程与高并发】线程安全问题以及synchronized使用实例

    Java多线程环境下,多个线程同时访问共享资源时可能出现的数据竞争和不一致的情况。 线程安全一直都是一个令人头疼的问题.为了解决这个问题,Java为我们提供了很多方式. synchronized、ReentrantLock类等。 使用线程安全的数据结构,例如ConcurrentHashMap、ConcurrentLinkedQueue等

    2024年02月09日
    浏览(47)
  • 【Java|多线程与高并发】线程的中断的两种方法

    线程中断是指在一个线程执行的过程中,强制终止该线程的执行。虽说是中断,但本质上是让run方法快点执行完,而不是run方法执行到一半,强制结束. 本文主要介绍线程中断的两种方法 看下面这段代码: 运行结果: 看下面这张图: 在这段代码中,定义了一个 flag 的标志位,在 线程

    2024年02月08日
    浏览(44)
  • linux傻瓜式安装Java环境及中间件

    1.下载 2.追加 使用vim /etc/profile 编辑profile文件 输入 3.刷新测试 1.docker卸载 2.docker安装 注:-p 将宿主机端口与容器端口映射,冒号左侧是宿主机端口,右侧是容器端 1.傻瓜式安装安装并配置 注:requirepass 123456 redis密码 1.安装 2.启动并配置密码 3.配置web页面插件 1.拉取 2.配置

    2024年02月05日
    浏览(46)
  • 【Java|多线程与高并发】定时器(Timer)详解

    在Java中,定时器 Timer 类是用于执行定时任务的工具类。它允许你安排一个任务在未来的某个时间点执行,或者以固定的时间间隔重复执行。 在服务器开发中,客户端向服务器发送请求,然后等待服务器响应. 但服务器什么时候返回响应,并不确定. 但也不能让客户端一直等下去

    2024年02月07日
    浏览(46)
  • 深入理解Java消息中间件-消息追踪和日志管理

    在分布式系统中,确保系统的稳定性和可靠性是一个极其复杂和挑战性的任务。随着系统的规模增大和组件间交互的复杂性提升,问题定位和故障排除变得越来越困难。在这种背景下,消息追踪和日志管理成为了日常工作中不可或缺的一部分,它们为开发和运维团队提供了宝

    2024年04月28日
    浏览(50)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包