ConcurrentHashMap线程安全,底层数组+链表+红黑树。
思想,分而治之。
JDK7有Segment对象,分段锁
JDK8没有了这个对象
总结,
首先计算hash,
如果集合没有元素,开始initTable方法初始化,这里扩容讲,这里只是初始化扩容,不是重点,重点是真正多线程扩容那里。
否则找到数组下标元素进行判断,这里找到这个位置上的数组用到了unsafe类中方法保证可见性,如果这个位置为null,新建Node,这里新建用cas去修改,保证原子性。这样当有其他线程进入后,发现cas操作失败,就会跳出,再次循环判断
这样比方第一个线程先将值放入,第二个就不会走到这里,
如果有元素,判断是链表还是树
如果是链表,比较key是否相等,相等,赋值,不相等,new一个Node节点.这里有个binCount记录链表大小,然后判断binCount判断是否需要转红黑树
如果是TreeBin对象,对象里有TreeNode,感觉套了个壳子,包括整颗红黑树,这样加锁加在了TreeBin上,这里和hashmap不一样,hashmap里面就是TreeNode,hashmap不用加锁,线程不安全,所以不用再有个TreeBin对象。然后将这个元素放到红黑树中
初始化扩容
发现是空,开始初始化,tab = initTable();只有一个线程可以进行初始化
sizeCtl有多个情况,首先默认是0,cas操作先将其改成-1,-1表示有线程正在扩容,然后扩容sc=12,sizeCtl也变成域值12。这里如果另一个线程发现sizectl=-1,然后暂时放弃cpu资源,再去和其他线程去竞争cpu资源Thread.yield(); 如果一个线程初始化完成,其他线程发现已经初始化了,就退出了
这个方法是初始化扩容,只有一个线程可以,第一次扩容初始化sizeCtl变成12
private transient volatile int sizeCtl;
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
//如果不是null,说明其他线程已经初始化好了,直接退出方法
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
//另一个线程从主内存拿到的sizeCtl=-1,走到这里,暂时放弃cpu资源,再去和其他线程去竞争cpu资源
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
//第一次集合空,初始化扩容走这里,对sizeCtl进行cas做-1操作
try {
if ((tab = table) == null || tab.length == 0) {
//这里第一次sc=0,n=16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//这里n-3/4n=1/4 n,就是0.75n=12
sc = n - (n >>> 2);
}
} finally {
//第一次扩容初始化sizeCtl变成12
sizeCtl = sc;
}
break;
}
}
return tab;
}
put方法源码
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
//初始化数组方法,这里只有一个线程可以初始化
tab = initTable();
//这里用到了Unsafe类中方法
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//这里用了cas去修改
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
//其他线程如果失败,重新开始for循环
break; // no lock when adding to empty bin
}
//判断是否是-1,-1表示集合在进行扩容,两个线程同时进行扩容,
//这里这个线程也进行扩容,这样扩容更快
//这个方法帮助扩容
else if ((fh = f.hash) == MOVED)
//这里扩容时候会有一个ForwordingNode对象,这个对象hash就是MOVED=-1
//put时候发现是forwordingNode对象,说明这个对象已经被转移到新数组上去了
//调用help方法帮助转移元素
tab = helpTransfer(tab, f);
else {
//这里说明该位置有元素,这里要判断到底是放到哪里,是红黑树还是链表
V oldVal = null;
//这里又涉及到并发问题,这里1.7用的分段锁,1.8用了synchronized
//对链表里面第一个节点f进行加锁
synchronized (f) {
//判断是否==f,加锁过程可能有其他线程操作修改了,如果修改了,就不走这里了
if (tabAt(tab, i) == f) {
//fh >= 0,表示f是链表上面的节点
if (fh >= 0) {
//binCount=1
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//否则是树里面的一个节点
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//binCount大于8,改成红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//真正启动扩容,首先要统计size
addCount(1L, binCount);
return null;
}
这里转红黑树
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
//这里改成双向链表
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
这里计数
addCount方法,cas并发高效率也不行,所以用另外一个数组CounterCell[],分成了CountCell,对它进行+1操作。如果一个线程用不了cell,还是对baseCount操作
jdk1.8 版本中,对 ConcurrentHashMap 做了优化,取消了分段锁的设计,取而代之的是通过 cas 操作和 synchronized 关键字来实现优化,而扩容的时候也利用了一种分而治之的思想来提升扩容效率。
在 HashMap 中,调用 put 方法之后会通过 ++size 的方式来存储当前集合中元素的个数,但是在并发模式下,这种操作是不安全的,所以不能通过这种方式,那么是否可以通过 CAS 操作来修改 size 呢?
直接通过 CAS 操作来修改 size 是可行的,但是假如同时有非常多的线程要修改 size 操作,那么只会有一个线程能够替换成功,其他线程只能不断的尝试 CAS,这会影响到 ConcurrentHashMap 集合的性能,所以作者就想到了一个分而治之的思想来完成计数。
作者定义了一个数组来计数,而且这个用来计数的数组也能扩容,每次线程需要计数的时候,都通过随机的方式获取一个数组下标的位置进行操作,这样就可以尽可能的降低了锁的粒度,最后获取 size 时,则通过遍历数组来实现计数:
假如 CounterCell 为空且 CAS 失败,那么就会通过调用 fullAddCount 方法来对 CounterCell 数组进行初始化。
多个线程同时put,有个size属性,我们要通过多个线程给size这个属性+1.
这里出现并发,我们怎么控制安全,可以用cas控制,但是很多线程的话,cas效率不高。所以这里用到了一个重要思想,分而治之。
源码里又定义了一个CounterCell数组。如果cas操作baseCount成功,+1.但是还有很多线程是失败的,这些线程每个线程会生成一个随机数ThreadLocalRandom.getProbe()
然后计算ThreadLocalRandom.getProbe() & m,得到一个下标值,就像上面那张图,cell数组长度是4,然后每个线程通过计算得到下标后,还是利用cas,对CounterCell这个中的value进行+1操作,然后计数结束开始统计,遍历数组CounterCell个数加上baseCount个数就是最终的总个数。这样效率会快,这就是分而治之思想。
如果cell是null,初始化,如果另一个线程初始化失败,就会去cas操作baseCount,不成功就从一开始继续循环。
如果cell不是null,扩容
这里面有一个比较重要的变量 cellsBusy,默认是 0,表示当前没有线程在初始化或者扩容,所以这里判断如果 cellsBusy==0,而 as 其实在前面就是把全局变量 CounterCell 数组的赋值,这里之所以再判断一次就是再确认有没有其他线程修改过全局数组 CounterCell,所以条件满足的话就会通过 CAS 操作修改 cellsBusy 为 1,表示当前自己在初始化了,其他线程就不能同时进来初始化操作了。
最后可以看到,默认是一个长度为 2 的数组,也就是采用了 2 个数组位置进行存储当前 ConcurrentHashMap 的元素数量。
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
x=1
private final void addCount(long x, int check) {
//默认CounterCell数组是null
CounterCell[] as; long b, s;
//第一次进来countcell是null,cas修改baseCount=0+1=1,修改成功,不会走这里,走下面方法,和CounterCell数组没关系了,如果修改失败,false,取反是true。走进来了
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
//
CounterCell a; long v; int m;
boolean uncontended = true;
//ThreadLocalRandom.getProbe() & m算出数组下标,
//如果as是空的或者计算出的下标值是null或者cas操作失败
//cas是对CounterCell中的value属性进行+1操作,如果没有成功也是调用fullAddCount
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
//假如 CounterCell 为空且 CAS 失败,那么就会通过调用 fullAddCount 方法来对 CounterCell 数组进行初始化及扩容。
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();//计算当前集合个数有多少个
}
//扩容
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
//判断s集合总个数当前域值sizeCtl比较,while循环扩容
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
//算出一个负数
int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT;
if (sc < 0) {
if (sc == rs + MAX_RESIZERS || sc == rs + 1 ||
(nt = nextTable) == null || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//sc=12时候走到这里,cas操作,算出的rs,只有一个线程可以将sc改成一个负数,另一个线程就会走到上面
else if (U.compareAndSwapInt(this, SIZECTL, sc, rs + 2))
//转移
transfer(tab, null);
s = sumCount();
}
}
}
//这个方法到底做什么的,对cell数组初始化数组长度2
//我们可以认为参数是 1,false
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
//对线程生成一个随机数
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
//循环
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
//不是null
if ((as = counterCells) != null && (n = as.length) > 0) {
//如果当前线程对应的下标值为null
if ((a = as[(n - 1) & h]) == null) {
//cellsBusy == 0表示没有线程使用
if (cellsBusy == 0) {
//new一个CounterCell
CounterCell r = new CounterCell(x); // Optimistic create
//cas改成0到1,占用。说明当前线程要用这个数组了,将new的对象放到这个数组中
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
//这里判断是否有其他线程对其修改,若没有将new的cell放到数组中
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
//最后将标记归零
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
//再去生成一个新的hash,再次循环,再次生成新的下标
wasUncontended = true; // Continue after rehash
//cas对CELLVALUE进行+1
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {// Expand table unless stale
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = ThreadLocalRandom.advanceProbe(h);
}
//counterCells是null的情况,初始化,
//cellsBusy == 0 表示没有线程使用,cas操作0改成1表示有人用这个数组
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
if (counterCells == as) {
//new一个大小为2的数组
CounterCell[] rs = new CounterCell[2];
//x=1,属性赋值,value为1,这种场景初始化成功,把x也加上了
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
//如果两个线程去初始化cell数组,然后一个线程失败了,走到这里,还是去cas改变baseCount的值
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}
fwd 节点: 这个代表的是占位节点,最关键的就是这个节点的 hash 值为 -1,所以一旦发现某一个节点中的 hash 值为 -1 就可以知道当前节点已经被迁移了。
advance: 代表是否可以继续推进下一个槽位,只有当前槽位数据被迁移完成之后才可以设置为 true
finishing: 是否已经完成数据迁移。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
//这里ForwordingNode对象,hash=-1
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
//一个线程转移有没有必要继续往前走去转移其他位置,
boolean advance = true;
//当前线程扩容工作是否完成,如果是true,代表当前线程做完了工作
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
//第一次默认advance=true
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
//第一次通过下面cas操作确认边界,通过下面算出当前线程要控制的区域。TRANSFERINDEX控制从哪个位置开始去计算(根据其去计算),其他线程cas失败后继续循环,从哪个位置开始计算,得到该线程的区域
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
//全部转移完成后,finishing修改为true,这里全部转移完成才会属性赋值table,转移结束。
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//判断线程数,看整个集合是否全部完成,如果不等于,说明还有线程没有完成,这里判断sc是否是初始值,如果等于初始值,说明所有线程都结束了扩容,将advance和finish改成true。
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) {
//将元素转移到新的上去,
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
//这里和hashmap扩容转移一样的
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
ConcurrentHashMap 的扩容
接下来我们回到 addCount 方法,这个方法在添加元素数量的同时,也会判断当前 ConcurrentHashMap 的大小是否达到了扩容的阈值,如果达到,需要扩容。
在多并发下如何实现扩容才不会冲突呢?可能大家都想到了采用分而治之的思想,在 ConcurrentHashMap 中采用的是分段扩容法,即每个线程负责一段,默认最小是 16,也就是说如果 ConcurrentHashMap 中只有 16 个槽位,那么就只会有一个线程参与扩容。如果大于 16 则根据当前 CPU 数来进行分配,最大参与扩容线程数不会超过 CPU 数。
初始化好了新的数组,接下来就是要准备确认边界。也就是要确认当前线程负责的槽位,确认好之后会从大到小开始往前推进,比如线程一负责 1-16,那么对应的数组边界就是 0-15,然后会从最后一位 15 开始迁移数据,每个线程操作自己的区域。
迁移完成后,会进行判断线程是否都执行完成,然后才进行tab属性赋值操作。
while 循环彻底结束之后,会进入到下面这个 if 判断,红框中就是当前线程自己完成了迁移之后,会将扩容线程数进行递减,递减之后会再次通过一个条件判断,这个条件其实就是前面进入扩容前条件的反推,如果成立说明扩容已经完成,扩容完成之后会将 nextTable 设置为 null,所以上面不满足扩容的第 4 个条件就是在这里设置的。文章来源:https://www.toymoban.com/news/detail-437233.html
重点:ConcurrentHashMap的计数和扩容
这里都是多线程,计数是通过一个cell数组去实现,扩容是每个线程都有自己的区域,然后进行迁移,将数据放到新数组上面去。都是分而治之的思想。文章来源地址https://www.toymoban.com/news/detail-437233.html
到了这里,关于ConcurrentHashMap底层源码解析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!