安全无忧:Java并发集合容器的应用与实践

这篇具有很好参考价值的文章主要介绍了安全无忧:Java并发集合容器的应用与实践。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Java 常见并发容器

JDK 提供的这些容器大部分在 java.util.concurrent 包中:

  • ConcurrentHashMap : 线程安全的 HashMap
  • CopyOnWriteArrayList : 线程安全的 List,在读多写少的场合性能非常好,远远好于 Vector
  • ConcurrentLinkedQueue : 高效的并发队列,使用链表实现。可以看做一个线程安全的 LinkedList,这是一个非阻塞队列
  • BlockingQueue : 这是一个接口,JDK 内部通过链表、数组等方式实现了这个接口。表示阻塞队列,非常适合用于作为数据共享的通道,解决生产者消费者问题
  • ConcurrentSkipListMap : 跳表的实现。这是一个 Map,使用跳表的数据结构进行快速查找

ConcurrentHashMap

我们知道 HashMap 不是线程安全的,在并发场景下如果要保证一种可行的方式是使用 Collections.synchronizedMap() 方法来包装 HashMap。但这是通过使用一个全局的锁来同步不同线程间的并发访问,因此会带来不可忽视的性能问题。

所以就有了 HashMap 的线程安全版本 ConcurrentHashMap。

实现原理

在 ConcurrentHashMap 中,无论是读操作还是写操作都能保证很高的性能:在进行读操作时(几乎)不需要加锁,而在写操作时通过锁分段技术只对所操作的段加锁而不影响客户端对其它段的访问。

JDK1.7

底层数据结构:Segments 数组 + HashEntry 数组 + 链表,采用分段锁保证安全性。

容器中有多把锁,每一把锁锁一段数据,这样在多线程访问时不同段的数据时,就不会存在锁竞争了,这样便可以有效地提高并发效率。

JDK1.8

底层数据结构:Synchronized + CAS + Node + 红黑树,Node 的 val 和 next 都用 volatile 保证,保证可见性。查找、替换、赋值操作都使用 CAS。

CAS 是乐观锁,在一些场景中(并发不激烈的情况下)它比 Synchronized 和 ReentrentLock 的效率要高,当 CAS 保障不了线程安全的情况下(扩容、hash 冲突)转成Synchronized 来保证线程安全,大大提高了低并发下的性能。

读取操作的实现
public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

get 操作全程无锁。get 操作可以无锁是由于 Node 元素的 val 和指针 next 是用 volatile 修饰的,在多线程环境下线程 A 修改节点的 val 或者新增节点的时候是对线程 B 可见的。

写入操作的实现
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    // 根据key的进行hash操作,找到Node数组中的位置
    int hash = spread(key.hashCode());
    int binCount = 0;
    // 这边加了一个循环,就是不断的尝试,因为在table的初始化和casTabAt用到了compareAndSwapInt、compareAndSwapObject
    // 因为如果其他线程正在修改tab,那么尝试就会失败,所以这边要加一个for循环,不断的尝试
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            // 先判断Node数组有没有初始化,如果没有初始化先初始化initTable();
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 不存在hash冲突,即该位置是null,直接用CAS插入
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)
            // 如果tab[i]不为空并且hash值为MOVED(-1),说明该链表正在进行transfer操作,返回扩容完成后的table。
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            // 存在hash冲突,对链表的头节点或者红黑树的头节点加synchronized锁,进一步减少线程冲突
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        // 如果是链表,就遍历链表
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                // 如果key相同就执行覆盖操作
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                // 如果key不同就将元素插入到链表的尾部
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        // 如果是红黑树,就按照红黑树的结构进行插入。
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                              value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                // 链表长度大于8, Node数组的长度超过64时,会将链表的转化为红黑树
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // 计数增加1,有可能触发transfer操作(扩容)。
    addCount(1L, binCount);
    return null;
}
  1. 根据 key 的进行 hash 操作,找到 Node 数组中的位置
  2. 然后加了一个死循环,意思是不断的尝试,因为在 table 的初始化和 casTabAt 用到了 compareAndSwapInt、compareAndSwapObject,因为如果其他线程正在修改table,那么尝试就会失败,所以要加一个 fo 循环,不断的尝试
  3. 之后判断 Node 数组有没有初始化,如果没有初始化先初始化 initTable
  4. 如果不存在 hash 冲突,即该位置是 null,直接用 CAS 插入,之后跳出循环
  5. 如果存在了冲突并且 hash 值为 MOVED(-1),说明该链表正在进行 transfer 操作,获取到扩容完成后的 table,进入下一次的循环
  6. 存在产生 hash 冲突,那么对链表的头节点或者红黑树的头节点加 synchronized 锁,进一步减少线程冲突。
    1. 如果是链表,就遍历链表,如果 key 相同就执行覆盖操作,key 不同就追加,之后跳出循环
    2. 如果是红黑树,就按照红黑树的结构进行插入,之后跳出循环
  7. 循环结束后判断链表长度是否大于8,当同时满足 Node 数组的长度超过 64 时,会将链表的转化为红黑树
  8. 最后计数增加 1,同时有可能触发 transfer 操作(扩容)

CopyOnWriteArrayList

在很多应用场景中,读操作可能会远远大于写操作。由于读操作根本不会修改原有的数据,因此对于每次读取都进行加锁其实是一种资源浪费。我们应该允许多个线程同时访问 List 的内部数据,毕竟读取操作是安全的。

这和我们之前在多线程章节讲过 ReentrantReadWriteLock 读写锁的思想非常类似,也就是读读共享、写写互斥、读写互斥、写读互斥。JDK 中提供了 CopyOnWriteArrayList 类比相比于在读写锁的思想又更进一步。为了将读取的性能发挥到极致,CopyOnWriteArrayList 读取是完全不用加锁的,并且更厉害的是:写入也不会阻塞读取操作。只有写入和写入之间需要进行同步等待。这样一来,读操作的性能就会大幅度提升。

实现原理

CopyOnWriteArrayList 类的所有可变操作(add,set 等等)都是通过创建底层数组的新副本来实现的。当 List 需要被修改的时候,我并不修改原有内容,而是对原有数据进行一次复制,将修改的内容写入副本。写完之后,再将修改完的副本替换原来的数据,这样就可以保证写操作不会影响读操作了。

从 CopyOnWriteArrayList 的名字就能看出 CopyOnWriteArrayList 是满足 CopyOnWrite 的。所谓 CopyOnWrite 也就是说:在计算机,如果你想要对一块内存进行修改时,我们不在原有内存块中进行写操作,而是将内存拷贝一份,在新的内存中进行写操作,写完之后呢,就将指向原来内存指针指向新的内存,原来的内存就可以被回收掉了。

读取操作的实现

private transient volatile Object[] array;

public E get(int index) {
    return get(getArray(), index);
}

@SuppressWarnings("unchecked")
private E get(Object[] a, int index) {
    return (E) a[index];
}

final Object[] getArray() {
    return array;
}

读取操作没有任何同步控制和锁操作,理由就是内部数组 array 不会发生修改,只会被另外一个 array 替换,因此可以保证数据安全。

写入操作的实现

public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    // 加锁
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        // 拷贝新数组
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        // 释放锁
        lock.unlock();
    }
}

CopyOnWriteArrayList 写入操作在添加元素的时候加了锁,保证了同步,避免了多线程写的时候会 copy 出多个副本出来。

ConcurrentLinkedQueue

Java 提供的线程安全的 Queue 可以分为阻塞队列非阻塞队列,其中阻塞队列的典型例子是 BlockingQueue,非阻塞队列的典型例子是 ConcurrentLinkedQueue,在实际应用中要根据实际需要选用阻塞队列或者非阻塞队列。 阻塞队列可以通过加锁来实现,非阻塞队列可以通过 CAS 操作实现。

从名字可以看出,ConcurrentLinkedQueue 这个队列使用链表作为其数据结构,它在高并发环境中性能表现得非常好。它之所有能有很好的性能,是因为其内部复杂的实现。

ConcurrentLinkedQueue 底层是通过 CAS 非阻塞算法来实现线程的。

阻塞队列

阻塞队列(BlockingQueue)被广泛使用在“生产者-消费者”问题中,其原因是 BlockingQueue 提供了可阻塞的插入和移除的方法。当试图向队列添加元素而队列已满,或是想从队列移出元素而队列为空的时候,阻塞队列导致线程阻塞。

BlockingQueue 是一个接口,继承自 Queue,所以其实现类也可以作为 Queue 的实现来使用,而 Queue 又继承自 Collection 接口。

阻塞队列的相关方法如下:

安全无忧:Java并发集合容器的应用与实践,Java:解锁编程的秘密宝藏,《Java核心技术》

  • 如果将队列当作线程管理工具来使用,将要用到 put 和 take 方法。
  • 当试图向满的队列中添加或从空的队列中移出元素时,add、remove 和 element 操作抛出异常。
  • 在一个多线程程序中, 队列会在任何时候空或满,因此,一定要使用 offer、poll 和 peek 方法作为替代。这些方法如果不能完成任务,只是给出一个错误提示而不会抛出异常。poll 和 peek 方法通过返回空来指示失败。因此,向这些队列中插入 null 值是非法的。

java.util.concurrent 包提供了阻塞队列的几个变种。下面主要介绍一下 3 个常见的实现类:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue。

ArrayBlockingQueue

ArrayBlockingQueue 是 BlockingQueue 接口的有界队列实现类,底层采用数组来实现。ArrayBlockingQueue 在构造时需要指定容量,并且有一个可选的参数来指定是否需要公平性。若设置了公平参数,则那么等待了最长时间的线程会优先得到处理。通常,公平性会降低性能,只有在确实非常需要时才使用它。

ArrayBlockingQueue 一旦创建,容量不能改变。其并发控制采用可重入锁 ReentrantLock,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。当队列容量满时,尝试将元素放入队列将导致操作阻塞,尝试从一个空队列中取一个元素也会同样阻塞。

/**
 * 指定容量构造
 */
public ArrayBlockingQueue(int capacity) {
    // 不设置公平参数
    this(capacity, false);
}

/**
 * 指定容量和公平参数的构造,fair为true则表示公平,等待越长越优先
 * 底层通过公平锁实现
 */
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}


/**
 * 添加一个元素,如果队列满,则阻塞
 */
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    // 可重入锁
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            // 如果队列满,则阻塞
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

/**
 * 移除并返回头元素,如果队列空,则阻塞
 */
public E take() throws InterruptedException {
    // 可重入锁
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            // 如果队列空,则阻塞
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

LinkedBlockingQueue

LinkedBlockingQueue 底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用,同样满足先进先出的特性,与 ArrayBlockingQueue 相比起来具有更高的吞吐量,为了防止 LinkedBlockingQueue 容量迅速增,损耗大量内存。通常在创建 LinkedBlockingQueue 对象时,会指定其大小,如果未指定,容量等于 Integer.MAX_VALUE

其并发控制采用可重入锁 ReentrantLock,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。当队列容量满时,尝试将元素放入队列将导致操作阻塞,尝试从一个空队列中取一个元素也会同样阻塞。

LinkedBlockingDeque 相当于是 LinkedBlockingQueue 的变种,底层基于双向链表实现。

/**
 * 某种意义上的无界队列,容量是Integer.MAX_VALUE
 */
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

/**
 * 指定容量构造,有界队列
 */
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

/**
 * 添加一个元素,如果队列满,则阻塞
 */
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}

/**
 * 移除并返回头元素,如果队列空,则阻塞
 */
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

PriorityBlockingQueue

PriorityBlockingQueue 是一个支持优先级的无界阻塞队列,它不是一个先进先出的队列。默认情况下元素采用自然顺序进行排序,也可以通过自定义类实现 compareTo() 方法来指定元素排序规则,或者初始化时通过构造器参数 Comparator 来指定排序规则。

PriorityBlockingQueue 底层基于数组实现,默认容量为 11。扩容机制:每次新增检测当前容量,已满的时候进行扩容,假设当前容量为 n,如果 n < 64,那么新容量是 2n+2,否则是 1.5n。

PriorityBlockingQueue 并发控制采用的是可重入锁 ReentrantLock,队列为无界队列(ArrayBlockingQueue 是有界队列,LinkedBlockingQueue 也可以通过在构造函数中传入 capacity 指定队列最大的容量,但是 PriorityBlockingQueue 只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容)。

简单地说,它就是 PriorityQueue 的线程安全版本。不可以插入 null 值,同时,插入队列的对象必须是可比较大小的(comparable),否则报 ClassCastException 异常。它的插入操作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)。

DelayQueue:基于 PriorityQueue 的一种特殊队列,里面的元素只有在其到期时才能从队列中取出。文章来源地址https://www.toymoban.com/news/detail-803190.html

到了这里,关于安全无忧:Java并发集合容器的应用与实践的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • JUC并发编程学习(五)集合类不安全

    List不安全 单线程情况下集合类和很多其他的类都是安全的,因为同一时间只有一个线程在对他们进行修改,但是如果是多线程情况下,那么集合类就不一定是安全的,可能会出现一条线程正在修改的同时另一条线程启动来对这个集合进行修改,这种情况下就会导致发生并发

    2024年02月06日
    浏览(50)
  • Java应用中文件上传安全性分析与安全实践

    ✨✨谢谢大家捧场,祝屏幕前的小伙伴们每天都有好运相伴左右,一定要天天开心哦!✨✨  🎈🎈作者主页: 喔的嘛呀🎈🎈 目录 引言 一. 文件上传的风险 二. 使用合适的框架和库 1. Spring框架的MultipartFile 2. Apache Commons FileUpload 3. Apache Commons IO 三. 文件上传路径的安全设置

    2024年04月25日
    浏览(49)
  • JUC并发编程-集合不安全情况以及Callable线程创建方式

    1)List 不安全 ArrayList 在并发情况下是不安全的 解决方案 : 1.Vector 2.Collections.synchonizedList() 3. CopyOnWriteArrayList 核心思想 是,如果有 多个调用者(Callers)同时要求相同的资源 (如内存或者是磁盘上的数据存储),他们 会共同获取相同的指针指向相同的资源 , 直到某个调用者

    2024年01月23日
    浏览(49)
  • 开源软件安全与应对策略探讨 - Java 机密计算技术应用实践

    据统计,90% 以上的应用都在使用第三方软件库,这些软件大部分都是开源的。与此同时,有超过一半的全球 500 强公司都在使用存在漏洞的开源软件。这幅漫画生动的描述了一个大型应用软件的组件架构,它可能建立在一个极其脆弱的开源组件基础之上,这个组件可能是二十

    2024年01月22日
    浏览(61)
  • 常用Java代码-Java中的并发集合(ConcurrentHashMap、CopyOnWriteArrayList等)

    在Java中,并发集合是一组为多线程环境设计的集合类,它们提供了线程安全的操作。这些集合类包括 ConcurrentHashMap , CopyOnWriteArrayList 等。以下是对这两个类的一个简单的代码解释。 1.ConcurrentHashMap ConcurrentHashMap 是Java并发包 java.util.concurrent 中的一个类,它提供了线程安全的

    2024年01月21日
    浏览(42)
  • JUC并发编程——集合类不安全及Callable(基于狂神说的学习笔记)

    List不安全 CopyOnWriteArrayList与vector对比,以下来自CSDN智能助手的回答: Java中的CopyOnWriteArrayList和Vector都是线程安全的动态数组,可以在多线程环境下使用。 CopyOnWriteArrayList使用了一种特殊的写时复制机制,它在对数组进行修改时,会创建一个新的副本,而不是直接在原数组上

    2024年02月07日
    浏览(73)
  • 解锁Spring Boot中的设计模式—04.桥接模式:探索【桥接模式】的奥秘与应用实践!

    桥接模式也称为桥梁模式、接口模式或者柄体(Handle and Body)模式,是将 抽象部分 与他的 具体实现部分 分离 ,使它们都可以独立地变化,通过 组合 的方式 建立 两个类之间的 联系 ,而不是继承。 桥接模式是一种结构型设计模式,旨在将抽象部分与实现部分分离,使它们

    2024年02月21日
    浏览(55)
  • 解锁Spring Boot中的设计模式—03.委派模式:探索【委派模式】的奥秘与应用实践!

    委派模式 是一种负责任务的调度和分配模式,类似于代理模式但更注重结果而非过程。它可以被视为一种特殊情况下的静态代理的全权代理,但并不属于GOF 23种设计模式之一,而是归类为行为型模式。 委派模式在Spring框架中广泛应用,其中最常见的例子是 DispatcherServlet ,它

    2024年02月19日
    浏览(56)
  • 关于并发编程与线程安全的思考与实践

    作者:京东健康 张娜 并发编程的意义是充分的利用处理器的每一个核,以达到最高的处理性能,可以让程序运行的更快。而处理器也为了提高计算速率,作出了一系列优化,比如: 1、硬件升级:为平衡CPU 内高速存储器和内存之间数量级的速率差,提升整体性能,引入了多

    2024年02月03日
    浏览(64)
  • 进阶JAVA篇- Collcetions 工具类与集合的并发修改异常问题

    目录         1.0 集合的并发修改问题                 1.1 如何解决集合的并发修改问题         2.0 Collcetions 工具类的说明         我们可以简单的认为,就是使用迭代器遍历集合时,又同时在删除集合中的数据,程序就会出现并发修改异常的错误。 代码如下: 运行

    2024年02月07日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包