java并发编程:LinkedBlockingQueue详解

这篇具有很好参考价值的文章主要介绍了java并发编程:LinkedBlockingQueue详解。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


简介

在集合框架里,想必大家都用过ArrayList和LinkedList,也经常在面试中问到他们之间的区别。ArrayList和ArrayBlockingQueue一样,内部基于数组来存放元素,而LinkedBlockingQueue则和LinkedList一样,内部基于链表来存放元素。

LinkedBlockingQueue实现了BlockingQueue接口,这里放一张类的继承关系图:
java并发编程:LinkedBlockingQueue详解
LinkedBlockingQueue不同于ArrayBlockingQueue,它如果不指定容量,默认为Integer.MAX_VALUE,也就是无界队列。所以为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在使用的时候建议手动传一个队列的大小。

源码分析

属性

/**
 * 节点类,用于存储数据
 */
static class Node<E> {
    E item;
    Node<E> next;

    Node(E x) { item = x; }
}

/** 阻塞队列的大小,默认为Integer.MAX_VALUE */
private final int capacity;

/** 当前阻塞队列中的元素个数 */
private final AtomicInteger count = new AtomicInteger();

/**
 * 阻塞队列的头结点
 */
transient Node<E> head;

/**
 * 阻塞队列的尾节点
 */
private transient Node<E> last;

/** 获取并移除元素时使用的锁,如take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** notEmpty条件对象,当队列没有数据时用于挂起执行删除的线程 */
private final Condition notEmpty = takeLock.newCondition();

/** 添加元素时使用的锁如 put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** notFull条件对象,当队列数据已满时用于挂起执行添加的线程 */
private final Condition notFull = putLock.newCondition();

从上面的属性我们知道,每个添加到LinkedBlockingQueue队列中的数据都将被封装成Node节点,添加的链表队列中,其中head和last分别指向队列的头结点和尾结点。与ArrayBlockingQueue不同的是,LinkedBlockingQueue内部分别使用了takeLock 和 putLock 对并发进行控制,也就是说,添加和删除操作并不是互斥操作,可以同时进行,这样也就可以大大提高吞吐量。

这里如果不指定队列的容量大小,也就是使用默认的Integer.MAX_VALUE,如果存在添加速度大于删除速度时候,有可能会内存溢出,这点在使用前希望慎重考虑。

另外,LinkedBlockingQueue对每一个lock锁都提供了一个Condition用来挂起和唤醒其他线程。

构造函数

public LinkedBlockingQueue() {
    // 默认大小为Integer.MAX_VALUE
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        int n = 0;
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e));
            ++n;
        }
        count.set(n);
    } finally {
        putLock.unlock();
    }
}

默认的构造函数和最后一个构造函数创建的队列大小都为Integer.MAX_VALUE,只有第二个构造函数用户可以指定队列的大小。第二个构造函数最后初始化了last和head节点,让它们都指向了一个元素为null的节点。
java并发编程:LinkedBlockingQueue详解
最后一个构造函数使用了putLock来进行加锁,但是这里并不是为了多线程的竞争而加锁,只是为了放入的元素能立即对其他线程可见。

同样,LinkedBlockingQueue也有着和ArrayBlockingQueue一样的方法,我们先来看看入队列的方法。

入队方法

LinkedBlockingQueue提供了多种入队操作的实现来满足不同情况下的需求,入队操作有如下几种:

  • void put(E e);
  • boolean offer(E e);
  • boolean offer(E e, long timeout, TimeUnit unit)。

put(E e)

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 获取锁中断
    putLock.lockInterruptibly();
    try {
        //判断队列是否已满,如果已满阻塞等待
        while (count.get() == capacity) {
            notFull.await();
        }
        // 把node放入队列中
        enqueue(node);
        c = count.getAndIncrement();
        // 再次判断队列是否有可用空间,如果有唤醒下一个线程进行添加操作
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    // 如果队列中有一条数据,唤醒消费线程进行消费
    if (c == 0)
        signalNotEmpty();
}

小结put方法来看,它总共做了以下情况的考虑:

  • 队列已满,阻塞等待。
  • 队列未满,创建一个node节点放入队列中,如果放完以后队列还有剩余空间,继续唤醒下一个添加线程进行添加。如果放之前队列中没有元素,放完以后要唤醒消费线程进行消费。

很清晰明了是不是?

我们来看看该方法中用到的几个其他方法,先来看看enqueue(Node node)方法:

private void enqueue(Node<E> node) {
    last = last.next = node;
}

该方法可能有些同学看不太懂,我们用一张图来看看往队列里依次放入元素A和元素B,毕竟无图无真相:
java并发编程:LinkedBlockingQueue详解
接下来我们看看signalNotEmpty,顺带着看signalNotFull方法。

private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

为什么要这么写?因为signal的时候要获取到该signal对应的Condition对象的锁才行。

offer(E e)

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        // 队列有可用空间,放入node节点,判断放入元素后是否还有可用空间,
        // 如果有,唤醒下一个添加线程进行添加操作。
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

可以看到offer仅仅对put方法改动了一点点,当队列没有可用元素的时候,不同于put方法的阻塞等待,offer方法直接方法false。

offer(E e, long timeout, TimeUnit unit)

public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

    if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        // 等待超时时间nanos,超时时间到了返回false
        while (count.get() == capacity) {
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}

该方法只是对offer方法进行了阻塞超时处理,使用了Condition的awaitNanos来进行超时等待,这里为什么要用while循环?因为awaitNanos方法是可中断的,为了防止在等待过程中线程被中断,这里使用while循环进行等待过程中中断的处理,继续等待剩下需等待的时间。

出队方法

入队列的方法说完后,我们来说说出队列的方法。LinkedBlockingQueue提供了多种出队操作的实现来满足不同情况下的需求,如下:

  • E take();
  • E poll();
  • E poll(long timeout, TimeUnit unit);

take()

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // 队列为空,阻塞等待
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        // 队列中还有元素,唤醒下一个消费线程进行消费
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // 移除元素之前队列是满的,唤醒生产线程进行添加元素
    if (c == capacity)
        signalNotFull();
    return x;
}

take方法看起来就是put方法的逆向操作,它总共做了以下情况的考虑:

  • 队列为空,阻塞等待。
  • 队列不为空,从队首获取并移除一个元素,如果消费后还有元素在队列中,继续唤醒下一个消费线程进行元素移除。如果放之前队列是满元素的情况,移除完后要唤醒生产线程进行添加元素。

我们来看看dequeue方法

private E dequeue() {
    // 获取到head节点
    Node<E> h = head;
    // 获取到head节点指向的下一个节点
    Node<E> first = h.next;
    // head节点原来指向的节点的next指向自己,等待下次gc回收
    h.next = h; // help GC
    // head节点指向新的节点
    head = first;
    // 获取到新的head节点的item值
    E x = first.item;
    // 新head节点的item值设置为null
    first.item = null;
    return x;
}

可能有些童鞋链表算法不是很熟悉,我们可以结合注释和图来看就清晰很多了。
java并发编程:LinkedBlockingQueue详解
其实这个写法看起来很绕,我们其实也可以这么写:

private E dequeue() {
    // 获取到head节点
    Node<E> h = head;
    // 获取到head节点指向的下一个节点,也就是节点A
    Node<E> first = h.next;
    // 获取到下下个节点,也就是节点B
    Node<E> next = first.next;
    // head的next指向下下个节点,也就是图中的B节点
    h.next = next;
    // 得到节点A的值
    E x = first.item;
    first.item = null; // help GC
    first.next = first; // help GC
    return x;
}

poll()

public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

poll方法去除了take方法中元素为空后阻塞等待这一步骤,这里也就不详细说了。同理,poll(long timeout, TimeUnit unit)也和offer(E e, long timeout, TimeUnit unit)一样,利用了Condition的awaitNanos方法来进行阻塞等待直至超时。这里就不列出来说了。

获取元素方法

public E peek() {
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        Node<E> first = head.next;
        if (first == null)
            return null;
        else
            return first.item;
    } finally {
        takeLock.unlock();
    }
}

加锁后,获取到head节点的next节点,如果为空返回null,如果不为空,返回next节点的item值。

删除元素方法

public boolean remove(Object o) {
    if (o == null) return false;
    // 两个lock全部上锁
    fullyLock();
    try {
        // 从head开始遍历元素,直到最后一个元素
        for (Node<E> trail = head, p = trail.next;
             p != null;
             trail = p, p = p.next) {
            // 如果找到相等的元素,调用unlink方法删除元素
            if (o.equals(p.item)) {
                unlink(p, trail);
                return true;
            }
        }
        return false;
    } finally {
        // 两个lock全部解锁
        fullyUnlock();
    }
}

void fullyLock() {
    putLock.lock();
    takeLock.lock();
}

void fullyUnlock() {
    takeLock.unlock();
    putLock.unlock();
}

因为remove方法使用两个锁全部上锁,所以其他操作都需要等待它完成,而该方法需要从head节点遍历到尾节点,所以时间复杂度为O(n)。我们来看看unlink方法。

void unlink(Node<E> p, Node<E> trail) {
    // p的元素置为null
    p.item = null;
    // p的前一个节点的next指向p的next,也就是把p从链表中去除了
    trail.next = p.next;
    // 如果last指向p,删除p后让last指向trail
    if (last == p)
        last = trail;
    // 如果删除之前元素是满的,删除之后就有空间了,唤醒生产线程放入元素
    if (count.getAndDecrement() == capacity)
        notFull.signal();
}

问题

看源码的时候,我给自己抛出了一个问题。

  • 为什么dequeue里的h.next不指向null,而指向h?
  • 为什么unlink里没有p.next = null或者p.next = p这样的操作?
    这个疑问一直困扰着我,直到我看了迭代器的部分源码后才豁然开朗,下面放出部分迭代器的源码:
private Node<E> current;
private Node<E> lastRet;
private E currentElement;

Itr() {
    fullyLock();
    try {
        current = head.next;
        if (current != null)
            currentElement = current.item;
    } finally {
        fullyUnlock();
    }
}

private Node<E> nextNode(Node<E> p) {
    for (;;) {
        // 解决了问题1
        Node<E> s = p.next;
        if (s == p)
            return head.next;
        if (s == null || s.item != null)
            return s;
        p = s;
    }
}

迭代器的遍历分为两步,第一步加双锁把元素放入临时变量中,第二部遍历临时变量的元素。也就是说remove可能和迭代元素同时进行,很有可能remove的时候,有线程在进行迭代操作,而如果unlink中改变了p的next,很有可能在迭代的时候会造成错误,造成不一致问题。这个解决了问题2。

而问题1其实在nextNode方法中也能找到,为了正确遍历,nextNode使用了 s == p的判断,当下一个元素是自己本身时,返回head的下一个节点。

总结

LinkedBlockingQueue是一个阻塞队列,内部由两个ReentrantLock来实现出入队列的线程安全,由各自的Condition对象的await和signal来实现等待和唤醒功能。它和ArrayBlockingQueue的不同点在于:文章来源地址https://www.toymoban.com/news/detail-479341.html

  • 队列大小有所不同,ArrayBlockingQueue是有界的初始化必须指定大小,而LinkedBlockingQueue可以是有界的也可以是无界的(Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题。
  • 数据存储容器不同,ArrayBlockingQueue采用的是数组作为数据存储容器,而LinkedBlockingQueue采用的则是以Node节点作为连接对象的链表。
  • 由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影响。
  • 两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

到了这里,关于java并发编程:LinkedBlockingQueue详解的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Java 并发编程】一文详解 Java 内置锁 synchronized

    存在共享数据; 多线程共同操作共享数。 synchronized 可以保证在同一时刻,只有一个线程可以执行某个方法或某个代码块,同时 synchronized 可以保证一个线程的变化可见(可见性),即可以代替 volatile。 多线程编程中,有可能会出现多个线程同时访问同一个共享、可变

    2024年02月02日
    浏览(35)
  • 【Java 并发编程】Java 线程本地变量 ThreadLocal 详解

    先一起看一下 ThreadLocal 类的官方解释: 用大白话翻译过来,大体的意思是: ThreadLoal 提供给了 线程局部变量 。同一个 ThreadLocal 所包含的对象,在不同的 Thread 中有不同的副本。这里有几点需要注意: 因为每个 Thread 内有自己的实例副本,且 该副本只能由当前 Thread 使用 。

    2024年02月04日
    浏览(66)
  • java并发编程 AbstractQueuedSynchronizer(AQS)详解一

    AQS在类的注释上说的已经很明白,提供一个框架,用于实现依赖先进先出(FIFO)等待队列的阻塞锁和相关同步器(信号量、事件等)。此类被设计做为大多数类型的同步器的一个有用的基础类,这些同步器依赖于单个原子int值(state字段)来表示状态。 java 并发编程系列文章

    2024年02月10日
    浏览(45)
  • Java并发编程第6讲——线程池(万字详解)

    Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池,本篇文章就详细介绍一下。 定义:线程池是一种用于管理和重用线程的技术(池化技术),它主要用于提高多线程应用程序的性能和效率。 ps:线程池、连接池、内存池

    2024年02月11日
    浏览(39)
  • 【JUC并发编程】集合类安全问题

    代码演示 上面代码会报错并发修改异常“java.util.ConcurrentModificationException” 1. 使用Vector类【不推荐】 Vector类的add方法是同步方法,但是效率很低 代码演示 2. 使用Collections工具类 使用 Collections.synchronizedList()方法 将普通的ArrayList类转换为安全的集合类 代码演示 3. 使用CopyOnW

    2024年02月12日
    浏览(41)
  • 【并发编程】Java的Future机制详解(Future接口和FutureTask类)

    目录 一、彻底理解Java的Future模式 二、为什么出现Future机制 2.1 Future 类有什么用? 三、Future的相关类图 2.1 Future 接口 2.2 FutureTask 类 五、FutureTask源码分析 5.1 state字段 5.2 其他变量 5.3 CAS工具初始化 5.4 构造函数 5.5 jdk1.8和之前版本的区别 六、Callable 和 Future 有什么关系? 七、

    2024年02月03日
    浏览(38)
  • JUC并发编程学习(五)集合类不安全

    List不安全 单线程情况下集合类和很多其他的类都是安全的,因为同一时间只有一个线程在对他们进行修改,但是如果是多线程情况下,那么集合类就不一定是安全的,可能会出现一条线程正在修改的同时另一条线程启动来对这个集合进行修改,这种情况下就会导致发生并发

    2024年02月06日
    浏览(49)
  • ElasticSearch 中的中文分词器以及索引基本操作详解,Java高并发编程详解深入理解pdf

    PUT book/_settings { “number_of_replicas”: 2 } 修改成功后,如下: 更新分片数也是一样。 2.3 修改索引的读写权限 索引创建成功后,可以向索引中写入文档: PUT book/_doc/1 { “title”:“三国演义” } 写入成功后,可以在 head 插件中查看: 默认情况下,索引是具备读写权限的,当然这

    2024年04月09日
    浏览(49)
  • JUC并发编程-集合不安全情况以及Callable线程创建方式

    1)List 不安全 ArrayList 在并发情况下是不安全的 解决方案 : 1.Vector 2.Collections.synchonizedList() 3. CopyOnWriteArrayList 核心思想 是,如果有 多个调用者(Callers)同时要求相同的资源 (如内存或者是磁盘上的数据存储),他们 会共同获取相同的指针指向相同的资源 , 直到某个调用者

    2024年01月23日
    浏览(46)
  • 【多线程及高并发 六】并发集合及线程池详解

    👏作者简介:大家好,我是若明天不见,BAT的Java高级开发工程师,CSDN博客专家,后端领域优质创作者 📕系列专栏:多线程及高并发系列 📕其他专栏:微服务框架系列、MySQL系列、Redis系列、Leetcode算法系列、GraphQL系列 📜如果感觉博主的文章还不错的话,请👍点赞收藏关

    2024年02月01日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包