一文彻底弄懂ConcurrentHashMap

这篇具有很好参考价值的文章主要介绍了一文彻底弄懂ConcurrentHashMap。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

导读

一文彻底弄懂ConcurrentHashMap

前言

前面分析 HashMap 的文章,提到过 HashMap 是线程不安全的,其主要原因还是在链表扩容。

JDK1.7 的 HashMap 的扩容操作用到两个方法:resize()transfer(),主要是重新定位每个桶的下标,并采用头插法将元素迁移到新的数组中。假设有多个线程都对 HashMap 进行扩容,有可能扩容后的散列表中链表成环,如果这时候执行 get() 方法查询,就会导致死循环。并且 HashMap 在并发执行 put() 操作时扩容,可能会导致结点丢失,会导致数据不准的情况。

JDK1.8 中已经很好地解决了JDK1.7 中的问题了,如果看过 JDK1.8 的源码,你就会发现 JDK1.8 中摒弃了 transfer 函数的,是直接在 resize 函数中完成数据的迁移,并且在 JDK1.8 中插入元素时是使用的尾插法。但是,JDK1.8 会带来数据覆盖的线程不安全。

我们知道了 HashMap 是非线程安全的,如果想在多线程下安全的操作 map,有哪些解决方法呢?

  • Hashtable:它是线程安全的,它在所有涉及到多线程操作的都加上了 synchronized 关键字来锁住整个 table,这就意味着所有的线程都在竞争一把锁,在多线程的环境下,它是安全的,但是无疑是效率低下的。
  • Collections.synchronizedMap:里面使用对象锁来保证多线程场景下,操作安全,本质也是对 HashMap 进行全表锁!在竞争激烈的多线程环境下性能依然也非常差,所以也不推荐使用!

ConcurrentHashMap 是 HashMap 的线程安全版本,内部也是使用数组 + 链表 + 红黑树的结构来存储元素。相比于同样线程安全的 HashtableCollections.synchronizedMap 来说,效率等各方面都有极大地提高。本文我们一起学习一下 ConcurrentHashMap

先简单介绍一下各种锁,以便下文讲到相关概念时能有个印象。

synchronized

Java 中的关键字,内部实现为监视器锁,主要是通过对象监视器在对象头中的字段来表明的。

synchronized 从旧版本到现在已经做了很多优化了,在运行时会有三种存在方式:偏向锁,轻量级锁,重量级锁

  • 偏向锁:是指一段同步代码一直被一个线程访问,那么这个线程会自动获取锁,降低获取锁的代价。
  • 轻量级锁:是指当锁是偏向锁的时候,被另外的线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,从而提高性能。
  • 重量级锁:若当前只有一个等待线程,则该线程通过自旋进行等待。但是当自旋超过一定的次数,或者一个线程在持有锁,一个在自旋,又有第三个来访时,轻量级锁升级为重量级锁。重量级锁会使其他线程阻塞,性能降低。

volatile(非锁)

Java 中的关键字,当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改的值。

volatilesynchronized 最主要的区别是 volatile 仅能实现变量的修改可见性,不能保证原子性;而 synchronized 则可以保证变量的修改可见性和原子性。

自旋锁

自旋锁,是指尝试获取锁的线程不会阻塞,而是循环的方式不断尝试,这样的好处是减少线程的上下文切换带来的开锁,提高性能,缺点是循环会消耗 CPU。

分段锁

分段锁,是一种锁的设计思路,它细化了锁的粒度,主要运用在 ConcurrentHashMap 中,实现高效的并发操作,当操作不需要更新整个数组时,就只锁数组中的一项就可以了。

ReentrantLock

可重入锁,是指一个线程获取锁之后再尝试获取锁时会自动获取锁,可重入锁的优点是避免死锁。其实,synchronized 也是可重入锁。

CAS

CAS 全称 Compare And Swap(比较与交换),是一种无锁算法。其作用是让 CPU 比较内存中某个值是否和预期的值相同,如果相同则将这个值更新为新值,不相同则不做更新。

CAS 算法涉及到三个操作数:

  • 需要读写的内存值 V
  • 进行比较的值 A
  • 要写入的新值 B

当且仅当 V 的值等于 A 时,CAS 通过原子方式用新值 B 来更新 V 的值(「比较+更新」整体是一个原子操作),否则不会执行任何操作。

java.util.concurrent 包中的原子类就是通过 CAS 来实现了乐观锁。

ConcurrentHashMap 实现原理

ConcurrentHashMap 在 JDK1.7 和 JDK1.8 的实现方式是不同的。

先来看下 JDK1.7。

JDK1.7 中的 ConcurrentHashMap

JDK1.7 中的 ConcurrentHashMap 是由 Segment 数组结构HashEntry 数组结构组成,即 ConcurrentHashMap 把哈希桶数组切分成小数组(Segment ),每个小数组有 n 个 HashEntry 组成。

如下图所示,首先将数据分为一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一段数据时,其他段的数据也能被其他线程访问,实现了真正的并发访问。

一文彻底弄懂ConcurrentHashMap

整个 ConcurrentHashMap 由一个个 Segment 组成,Segment 代表”部分“或”一段“的意思,所以很多地方都会将其描述为分段锁

简单来说就是,ConcurrentHashMap 是一个 Segment数组Segment 通过继承 ReentrantLock 来进行加锁,所以每次需要加锁的操作锁住的是一个 Segment,这样只要保证每个 Segment 是线程安全的,也就实现了全局的线程安全。

Segment

Segment 本身就相当于一个 HashMap 对象。

HashMap 一样,Segment 包含一个 HashEntry 数组,数组中的每一个 HashEntry 既是一个键值对,也是一个链表的头节点。

单一的 Segment 结构如下:

一文彻底弄懂ConcurrentHashMap

像这样的 Segment 对象,在 ConcurrentHashMap 集合中有多少个呢?

concurrencyLevel:并行级别、并发数、Segment 数,怎么翻译不重要,理解它。默认是 16,也就是说默认情况下 ConcurrentHashMap 有 16 个 Segments,所以理论上,这个时候,最多可以同时支持 16 个线程并发写,只要它们的操作分别分布在不同的 Segment 上。这个值可以在初始化的时候设置为其他值,但是一旦初始化以后,它是不可以扩容的。最大不超过 MAX_SEGMENTS 也就是2^16

默认是 16 个,共同保存在一个名为 segments 的数组当中。 因此整个ConcurrentHashMap 的结构如下:

一文彻底弄懂ConcurrentHashMap

可以说,ConcurrentHashMap 是一个二级哈希表。在一个总的哈希表下面,有若干个子哈希表。

这样的二级结构,和数据库的水平拆分有些相似。

ConcurrentHashMap 并发读写的几种情况

Case1:不同 Segment 的并发写入「可以并发执行」

一文彻底弄懂ConcurrentHashMap

Case2:同一 Segment 的一写一读「可以并发执行」

一文彻底弄懂ConcurrentHashMap

Case3:同一 Segment 的并发写入

一文彻底弄懂ConcurrentHashMap

Segment 的写入是需要上锁的,因此对同一 Segment 的并发写入会被阻塞。

由此可见,ConcurrentHashMap 中每个 Segment各自持有一把锁。在保证线程安全的同时降低了锁的粒度,让并发操作效率更高。

接下来,我们就来看下 ConcurrentHashMap 读写的过程。

get 方法

  1. 计算 key 的 hash 值
  2. 通过 hash 值,定位到对应的 Segment 对象
  3. 再次通过 hash 值,定位到 Segment 当中数组的具体位置。

put 方法

  1. 计算 key 的 hash 值
  2. 通过 hash 值,定位到对应的 Segment 对象 s
  3. 插入新值到槽 s 中
    • 获取可重入锁
    • 再次通过 hash 值,定位到 Segment 当中数组的具体位置
    • 插入或覆盖 HashEntry 对象
    • 释放锁

JDK1.8 中的 ConcurrentHashMap

虽然 JDK1.7 中的 ConcurrentHashMap 解决了 HashMap 并发的安全性,但是当冲突的链表过长时,在查询遍历的时候依然很慢!

在 JDK1.8 中,HashMap 引入了红黑二叉树设计,当冲突的链表长度大于 8时,会将链表转化成红黑二叉树结构,红黑二叉树又被称为平衡二叉树,在查询效率方面,又大大的提高了不少。

在数据结构上, JDK1.8 中的 ConcurrentHashMap 选择了与 HashMap 相同的数组+链表+红黑树结构;在锁的实现上,抛弃了原有的 Segment 分段锁,采用 CAS + synchronized 实现更加细粒度的锁。

将锁的级别控制在了更细粒度的哈希桶数组元素级别,也就是说只需要锁住这个链表头节点或者红黑树的根节点,就不会影响其他的哈希桶数组元素的读写,大大提高了并发度。

我们再来看看 JDK1.8 中 ConcurrentHashMap 的整体结构,内容如下:

一文彻底弄懂ConcurrentHashMap
下面来具体分析一下 JDK1.8 中的 ConcurrentHashMap 源码。

put 方法

public V put(K key, V value) {
    return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
    // key和value都不能为null
    if (key == null || value == null) throw new NullPointerException();
    // 计算hash值
    int hash = spread(key.hashCode());
    //用于记录相应链表的长度
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 如果tab未初始化或者个数为0,则初始化node数组
        if (tab == null || (n = tab.length) == 0)
            // 初始化数组,后面会详细介绍
            tab = initTable();
        // 找该 hash 值对应的数组下标,得到第一个节点 f
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 如果数组该位置为空,
            // 如果使用CAS插入元素时,发现已经有元素了,则进入下一次循环,重新操作
            // 如果使用CAS插入元素成功,则break跳出循环,流程结束
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        // 如果要插入的元素所在的tab的第一个元素的hash是MOVED,说明其他线程再扩容,则当前线程帮忙一起迁移元素
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
           // 到这里就是说,f 是该位置的头结点,而且不为空,也不在迁移元素也就是存在hash冲突,锁住链表或者红黑树的头结点。
            V oldVal = null;
            // 获取数组该位置的头结点的监视器锁
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    // 头结点的 hash 值大于 0,说明是链表
                    if (fh >= 0) {
                        // 用于累加,记录链表的长度
                        binCount = 1;
                        //遍历链表
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 如果发现了"相等"的 key,判断是否要进行值覆盖,然后也就可以 break 了
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            //插入链表尾部
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    //红黑树
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        // 调用红黑树的插值方法插入新节点
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            // 如果binCount不为0,说明成功插入了元素或者寻找到了元素
            if (binCount != 0) {
               // 判断是否要将链表转换为红黑树,临界值和 HashMap 一样,也是 8
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
     // 成功插入元素,元素个数加1(是否要扩容在这个里面)
    addCount(1L, binCount);
    return null;
}

当进行 put 操作时,流程大概可以分如下几个步骤:

  • 首先会判断 key、value 是否为空,如果为空就抛异常;
  • 接着会判断容器数组是否为空,如果为空就调用 initTable() 初始化数组;
  • 进一步判断,如果没有 hash 冲突就直接 CAS 插入
  • 在接着判断 f.hash == -1 是否成立,如果成立,说明当前 f 是ForwardingNode 节点,表示有其它线程正在扩容,则一起进行扩容操作
  • 如果存在 hash 冲突,就加锁来保证线程安全,这里有两种情况,一种是链表形式就直接遍历到尾端插入,一种是红黑树就按照红黑树结构插入
  • 节点插入完成之后,如果该链表的数量大于阈值 8,就要先转换成黑红树的结构,break 再一次进入循环
  • 最后,插入完成之后调用 addCount() 方法统计 size,并且检查是否需要扩容。

put 的主流程看完了,但是至少留下了几个问题:

  • 初始化
  • 扩容
  • 帮助数据迁移

这些我们都会在下面进行一一介绍。

initTable 初始化数组

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        // 如果sizeCtl<0说明正在初始化或者扩容,让出CPU
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        // CAS 一下,将 sizeCtl 设置为 -1,代表抢到了锁,设置成功则当前线程进入初始化
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            // 如果原子更新失败则说明有其它线程先一步进入初始化了,则进入下一次循环
            // 如果下一次循环时还没初始化完毕,则sizeCtl<0进入上面if的逻辑让出CPU
            // 如果下一次循环更新完毕了,则table.length!=0,退出循环
            try {
                // 再次检查table是否为空,防止ABA问题
                if ((tab = table) == null || tab.length == 0) {
                    // DEFAULT_CAPACITY 默认初始容量是 16
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    // 新建数组
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    // 把tab数组赋值给table 
                    table = tab = nt;
                    // 设置sc为数组长度的0.75倍
                    // n - (n >>> 2) = n - n/4 = 0.75n
                    // 可见这里装载因子和扩容门槛都是写死了的
                    // 这也正是没有threshold和loadFactor属性的原因
                    sc = n - (n >>> 2);
                }
            } finally {
                // 把sc赋值给sizeCtl,这时存储的是扩容门槛
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

初始化流程如下:

  • 使用 CAS 锁控制只有一个线程初始化 tab 数组;
  • sizeCtl 在初始化后存储的是扩容门槛;
  • 扩容门槛写死的是 tab 数组大小的 0.75 倍,tab 数组大小即 map 的容量,也就是最多存储多少个元素。

sizeCtl 这个属性使用的场景很多,不过只要跟着文章的思路来,就不会被它搞晕了。

helpTransfer 协助扩容

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        int rs = resizeStamp(tab.length);
        // sizeCtl<0,说明正在扩容
        while (nextTab == nextTable && table == tab &&
                (sc = sizeCtl) < 0) {
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            // 扩容线程数加1
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                // 当前线程帮忙迁移元素
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}

其实 helpTransfer() 方法的目的就是调用多个工作线程一起帮助进行扩容,这样的效率就会更高,而不是只有检查到要扩容的那个线程进行扩容操作,其他线程就要等待扩容操作完成才能工作。既然这里涉及到扩容的操作,我们也一起来看看扩容方法 transfer()

transfer 扩容

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // stride 在单核下直接等于 n,多核模式下为 (n>>>3)/NCPU,最小值是 16
    // stride 可以理解为”步长“,有 n 个位置是需要进行迁移的,
    // 将这 n 个任务分为多个任务包,每个任务包有 stride 个任务
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            // 容量翻倍
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    int nextn = nextTab.length;
    // ForwardingNode 翻译过来就是正在被迁移的 Node
    // 这个构造方法会生成一个Node,key、value 和 next 都为 null,关键是 hash 为 MOVED
    // 后面我们会看到,原数组中位置 i 处的节点完成迁移工作后,
    // 就会将位置 i 处设置为这个 ForwardingNode,用来告诉其他线程该位置已经处理过了

    // 所以它其实相当于是一个标志。
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    // advance 指的是做完了一个位置的迁移工作,可以准备做下一个位置的了
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    // i 是位置索引,bound 是边界,注意是从后往前
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        // 下面这个 while 真的是不好理解
        // advance 为 true 表示可以进行下一个位置的迁移了
        // 简单理解结局:i 指向了 transferIndex,bound 指向了 transferIndex-stride
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            // 将 transferIndex 值赋给 nextIndex
            // 这里 transferIndex 一旦小于等于 0,说明原数组的所有位置都有相应的线程去处理了
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                // 看括号中的代码,nextBound 是这次迁移任务的边界,注意,是从后往前                   
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {
                // 所有的迁移操作已经完成
                nextTable = null;
                // 将新的 nextTab 赋值给 table 属性,完成迁移
                table = nextTab;
                // 重新计算 sizeCtl:n 是原数组长度,所以 sizeCtl 得出的值将是新数组长度的 0.75 倍
                sizeCtl = (n << 1) - (n >>> 1);
                // 跳出死循环
                return;
            }
            // 这里使用 CAS 操作对 sizeCtl 进行减 1,代表做完了属于自己的任务
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                
                // 到这里,说明 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT,
                // 也就是说,所有的迁移任务都做完了,也就会进入到上面的 if(finishing){} 分支了
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        // 如果位置 i 处是空的,没有任何节点,那么放入刚刚初始化的 ForwardingNode ”空节点“
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        // 该位置处是一个 ForwardingNode,代表该位置已经迁移过了
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            // 对数组该位置处的结点加锁,开始处理数组该位置处的迁移工作
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    // 头结点的 hash 大于 0,说明是链表的 Node 节点
                    if (fh >= 0) {
                        // 下面这一块和 Java7 中的 ConcurrentHashMap 迁移是差不多的,
                        // 需要将链表一分为二,
                        // 找到原链表中的 lastRun,然后 lastRun 及其之后的节点是一起进行迁移的
                        // lastRun 之前的节点需要进行克隆,然后分到两个链表中
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        // 其中的一个链表放在新数组的位置 i
                        setTabAt(nextTab, i, ln);
                        // 另一个链表放在新数组的位置 i+n
                        setTabAt(nextTab, i + n, hn);
                        // 将原数组该位置处设置为 fwd,代表该位置已经处理完毕,
                        // 其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了
                        setTabAt(tab, i, fwd);
                        // advance 设置为 true,代表该位置已经迁移完毕
                        advance = true;
                    }
                    // 如果是TreeBin,则按照红黑树进行处理,处理逻辑与上面一致
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        // 如果一分为二后,节点数少于 6,那么将红黑树转换回链表
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        // 将 ln 放置在新数组的位置 i
                        setTabAt(nextTab, i, ln);
                        // 将 hn 放置在新数组的位置 i+n
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

说到底,transfer 这个方法并没有实现所有的迁移任务,每次调用这个方法只实现了 transferIndex 往前 stride 个位置的迁移工作,其他的需要由外围来控制。

addCount 扩容判断

private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    //更新baseCount,table的数量,counterCells表示元素个数的变化
    if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                        U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            //如果多个线程都在执行,则CAS失败,执行fullAddCount,全部加入count
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        // 计算元素个数
        s = sumCount();
    }
    // check >=0 表示需要进行扩容操作
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        // 如果元素个数达到了扩容门槛,则进行扩容
        // 注意,正常情况下sizeCtl存储的是扩容门槛,即容量的0.75倍
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                (n = tab.length) < MAXIMUM_CAPACITY) {
            // rs是扩容时的一个标识
            int rs = resizeStamp(n);
            if (sc < 0) {
                // sc<0说明正在扩容中
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                    // 扩容已经完成了,退出循环
                    break;
                // 扩容未完成,则当前线程加入迁移元素中
                // 并把扩容线程数加1
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                    (rs << RESIZE_STAMP_SHIFT) + 2))
                // 进入迁移元素
                transfer(tab, null);
            // 重新计算元素个数
            s = sumCount();
        }
    }
}

大致流程如下:

  • 利用 CAS 方法更新 baseCount 的值
  • 检查是否需要扩容,check >= 0,需要检查;
  • 如果满足扩容条件,判断当前是否正在扩容,如果是正在扩容就一起扩容;
  • 如果不在扩容,将 sizeCtl 更新为负数,并进行扩容处理。

put 的流程现在已经分析完了,你可以从中发现,他在并发处理中使用的是乐观锁,当有冲突的时候才进行并发处理,而且流程步骤很清晰,但是细节设计的很复杂,毕竟多线程的场景也复杂。

get 方法

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    // 计算hash
    int h = spread(key.hashCode());
    // 判断数组是否为空,通过key定位到数组下标是否为空;
    if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
        // 判断头结点是否就是我们需要的节点
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
         // hash小于0,说明是树或者正在扩容
        else if (eh < 0)
            // 使用find寻找元素,find的寻找方式依据Node的不同子类有不同的实现方式
            return (p = e.find(h, key)) != null ? p.val : null;
        // 遍历整个链表寻找元素
        while ((e = e.next) != null) {
            if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

步骤如下:

  1. 计算 hash 值
  2. 根据 hash 值找到数组对应位置: (n - 1) & h
  3. 根据该位置处结点性质进行相应查找
    • 如果该位置为 null,那么直接返回 null 就可以了
    • 如果该位置处的节点刚好就是我们需要的,返回该节点的值即可
    • 如果该位置节点的 hash 值小于 0,说明正在扩容,或者是红黑树,后面我们再介绍 find 方法
    • 如果以上 3 条都不满足,那就是链表,进行遍历比对即可

总结

虽然 HashMap 在多线程环境下操作不安全,但是在 java.util.concurrent 包下,java 为我们提供了 ConcurrentHashMap 类,保证在多线程下 HashMap 操作安全!

在 JDK1.7 中,ConcurrentHashMap 采用了分段锁策略,将一个 HashMap 切割成 Segment 数组,其中 Segment 可以看成一个 HashMap, 不同点是 Segment 继承自 ReentrantLock,在操作的时候给 Segment 赋予了一个对象锁,从而保证多线程环境下并发操作安全。

但是 JDK1.7 中,HashMap 容易因为冲突链表过长,造成查询效率低,所以在 JDK1.8 中,HashMap 引入了红黑树特性,当冲突链表长度大于8时,会将链表转化成红黑二叉树结构。

在 JDK1.8 中,与此对应的 ConcurrentHashMap 也是采用了与 HashMap 类似的存储结构,但是 JDK1.8 中 ConcurrentHashMap 并没有采用分段锁的策略,而是在元素的节点上采用 CAS + synchronized 操作来保证并发的安全性,源码的实现比 JDK1.7 要复杂的多。文章来源地址https://www.toymoban.com/news/detail-454170.html

到了这里,关于一文彻底弄懂ConcurrentHashMap的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 彻底弄懂套接字

             套接字(socket) 是一种通信机制,凭借这种机制,客户/服务器系统的开发工作既可以在本地单机上进行,也可以跨网络进行。Linux所提供的功能(如打印服务、连接数据库和提供Web页面)和网络工具(如用于远程登录的rlogin和用于文件传输的ftp)通常都是通过套

    2024年01月17日
    浏览(30)
  • 彻底弄懂StringBuffer与StringBuilder的区别

    一问道StringBuffer与StringBuilder的区别,张口就来StringBuffer是线程安全的,因为它相关方法都加了synchronized ,StringBuilder线程不安全。没错,确实如此,但是我们查看过源码会发现StringBuffer是从jdk1.0就开始了,StringBuilder是从jdk1.5开始的。于是我就产生这样一个疑问,既然已

    2024年02月10日
    浏览(41)
  • [javascript核心-04]彻底弄懂Promise异步编程

    本文github地址:JavaScript_Interview_Everything 大前端知识体系与面试宝典,从前端到后端,全栈工程师,成为六边形战士 1.1. 快速上手 01快手上手.js 02.若传入的是另一个promise对象,则状态由传入的promise对象决定 03.若传入了一个实现了 then 方法的对象,则执行该then方法且由此方法

    2024年02月08日
    浏览(41)
  • 彻底弄懂Java中的MultipartFile接口和File类

            不管是在项目中还是日常需求,我们总是有操作文件数据的需求,Java中操作文件不可避免就要使用File类,而Spring中为我们提供了一个操作文件的接口,通过该接口我们可以获取用户上传的文件对象并写入文件系统中。 前言 一、File类 二、MultipartFile接口 2.1 源码和

    2024年02月04日
    浏览(46)
  • 一篇文章彻底弄懂Golang私有仓库配置问题

    一般通过 go get 拉取的是公共仓库的代码(如: github.com中的代码),是不需要任务权限就能拉下来。但当我们配置的私有仓库一般都需要用户名密码来登录才能拉取代码,所以私有仓库主要是解决认证问题。 在早期版本的Go中,“go get”用于构建和安装包。现在,“go get”专门用

    2024年01月16日
    浏览(47)
  • 一篇文章带你彻底弄懂Java的==符号

    本篇文章6735字,大概阅读时间20分钟。本文中使用到的JDK版本为1.8.0_301 目录 ==符号的定义 基本类型中==符号的判断 String类型中==符号的判断         在Java中==符号的作用分为两类:         1:==符号在八种基本类型的作用是比较对应基本类型的 数值是否相等         2:

    2024年02月08日
    浏览(54)
  • 彻底弄懂ip掩码中的网络地址、广播地址、主机地址

    IP掩码(或子网掩码)用于确定一个IP地址的网络部分和主机部分。它是一个32位的二进制数字,与IP地址做逻辑与运算,将IP地址划分为网络地址和主机地址两部分。 在理解IP地址段中的网络地址、广播地址和主机地址之前,首先需要了解IP地址的构成。IP地址由网络号和主机

    2024年02月06日
    浏览(44)
  • 【最短路算法】一篇文章彻底弄懂Dijkstra算法|多图解+代码详解

    博主简介: 努力学习的大一在校计算机专业学生,热爱学习和创作。目前在学习和分享:算法、数据结构、Java等相关知识。 博主主页: @是瑶瑶子啦 所属专栏: 算法 ;该专栏专注于蓝桥杯和ACM等算法竞赛🔥 近期目标: 写好专栏的每一篇文章 Dijkstra算法适用于 最短路问题

    2023年04月08日
    浏览(38)
  • socket send函数系列,彻底弄懂socket发送函数,可做参考手册

    目录 1.系统调用流程 2 发送函数系列 2.1 send函数 2.2 sendto函数 2.3 sendmsg函数 2.4 write函数 2.5 writev函数 图 1 2.1 send函数 send函数为套接字发送函数,需套接字进行connect操作才能使用。 2.2 sendto函数 sendto函数为套接字发送函数,不管套接字是否进行connect操作都能使用。 未connect:

    2024年02月07日
    浏览(46)
  • 一文弄懂 Diffusion Model

    Diffusion Model 首先定义了一个前向扩散过程,总共包含 T 个时间步,如下图所示: 最左边的蓝色圆圈  x0   表示真实自然图像,对应下方的狗子图片。 最右边的蓝色圆圈  xT   则表示纯高斯噪声,对应下方的噪声图片。 最中间的蓝色圆圈  xt   则表示加了噪声的  x0   ,对应

    2024年02月13日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包