From a45a284002780b48ae81f016fa662c32b4598295 Mon Sep 17 00:00:00 2001 From: xuke <792228584@qq.com> Date: Wed, 8 Jul 2020 11:29:55 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E6=B5=8B=E8=AF=95hashMap?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...\347\240\201\345\210\206\346\236\220.adoc" | 77 +++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100644 "docs/Hashmap\346\272\220\347\240\201\345\210\206\346\236\220.adoc" diff --git "a/docs/Hashmap\346\272\220\347\240\201\345\210\206\346\236\220.adoc" "b/docs/Hashmap\346\272\220\347\240\201\345\210\206\346\236\220.adoc" new file mode 100644 index 0000000..b5e27c5 --- /dev/null +++ "b/docs/Hashmap\346\272\220\347\240\201\345\210\206\346\236\220.adoc" @@ -0,0 +1,77 @@ += HashMap + +== HashMap 问题集 + +public class HashMap extends AbstractMap + implements Map, Cloneable, Serializable + +HashMap的数据结构是链表散列,也就是数组+链表key位置,允许null +原因:避免hash冲突。 +hash冲突:两个不同的元素用过哈希函数得出的实际存储地址相同。 +哈希冲突的解决方案有多种:开放定址法(发生冲突,继续寻找下一块未被占用的存储地址),再散列函数法,链地址法 +HashMap采用链地址法:数组+链表 + +entry:有next节点,构成链表: +static class Entry implements Map.Entry { + final K key; + V value; + Entry next; + final int hash; + …… +} + +创建一个hashMap的时候,table = new Entry[capacity]; 创建一个Entry数组。 + +put的时候,根据key的hashCode重新计算hash值,根据hash值得到这个元素在数组中的位置 也就是下标,如果数组在该位置上已有其他元素,那么在这个位置上的元素将以链表的形式存放,新加入的在立案表头,先加入的在尾部 + +public V put(K key, V value) { + //其允许存放null的key和null的value,当其key为null时,调用putForNullKey方法,放入到table[0]的这个位置 + if (key == null) + return putForNullKey(value); + //通过调用hash方法对key进行哈希,得到哈希之后的数值。该方法实现可以通过看源码,其目的是为了尽可能的让键值对可以分不到不同的桶中 + int hash = hash(key); + //根据上一步骤中求出的hash得到在数组中是索引i + int i = indexFor(hash, table.length); + //如果i处的Entry不为null,则通过其next指针不断遍历e元素的下一个元素。 + for (Entry e = table[i]; e != null; e = e.next) { + Object k; + if (e.hash == hash && ((k = e.key) == key || key.equals(k))) { + V oldValue = e.value; + e.value = value; + e.recordAccess(this); + return oldValue; + } + } + + modCount++; + addEntry(hash, key, value, i); + return null; +} + +get方法: +get()方法也会是首先计算key的 hashCode 找到数组中对应位置的某一元素,通过key的equals方法在对应位置的链表中找到要的元素, +key(hashcode)-->hash-->indexFor-->最终索引位置,找到对应位置table[i],再查看是否有链表,遍历链表,通过key的equals方法比对查找对应的记录。 + +public V get(Object key) { + if (key == null) + return getForNullKey(); + Entry entry = getEntry(key); + + return null == entry ? null : entry.getValue(); +} + +final Entry getEntry(Object key) { + int hash = (key == null) ? 0 : hash(key); + for (Entry e = table[indexFor(hash, table.length)]; + e != null; + e = e.next) { + Object k; + if (e.hash == hash && + ((k = e.key) == key || (key != null && key.equals(k)))) + return e; + } + return null; +} + +hashmap为什么不是线程安全的 +HashMap在put的时候,插入的元素超过了容量(由负载因子决定)的范围就会触发扩容操作,就是rehash,这个会重新将原数组的内容重新hash到新的扩容数组中,在多线程的环境下,存在同时其他的元素也在进行put操作,如果hash值相同,可能出现同时在同一数组下用链表表示,造成闭环,导致在get时会出现死循环,所以HashMap是线程不安全的。 \ No newline at end of file From 901e154a9c450dd9401003118080d7d5bc5c359b Mon Sep 17 00:00:00 2001 From: xuke <792228584@qq.com> Date: Thu, 9 Jul 2020 00:19:10 +0800 Subject: [PATCH 2/3] =?UTF-8?q?ConcurrentHashMap=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/ConcurrentHashMap1.7.adoc | 450 +++++++++++++++++++++++++++++++++ 1 file changed, 450 insertions(+) create mode 100644 docs/ConcurrentHashMap1.7.adoc diff --git a/docs/ConcurrentHashMap1.7.adoc b/docs/ConcurrentHashMap1.7.adoc new file mode 100644 index 0000000..2f86d4f --- /dev/null +++ b/docs/ConcurrentHashMap1.7.adoc @@ -0,0 +1,450 @@ += ConcurrentHashMap + +hashMap是不支持并发的,所以有了ConcurrentHashMap +类介绍: +public class ConcurrentHashMap extends AbstractMap + implements ConcurrentMap, Serializable +ConcurrentHashMap在初始化时会要求初始化concurrencyLevel作为segment数组长度,即并发度,代表最多有多少个线程可以同时操作ConcurrentHashMap,默认是16,每个segment片段里面含有键值对HashEntry数组,是真正存放键值对的地方,这就是ConcurrentHashMap的数据结构。 + +既然ConcurrentHashMap的并发基础是segment,那我们就先来看一下这个类 +static final class Segment extends ReentrantLock implements Serializable { + + //尝试获取锁的最多尝试次数,即自旋次数 + static final int MAX_SCAN_RETRIES = + Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1; + + //HashEntry数组,也就是键值对数组 + transient volatile HashEntry[] table; + //元素的个数 + transient int count; + //segment中发生改变元素的操作的次数,如put/remove + transient int modCount; + //当table大小超过阈值时,对table进行扩容,值为capacity *loadFactor + transient int threshold; + //加载因子 + final float loadFactor; + + Segment(float lf, int threshold, HashEntry[] tab) { + this.loadFactor = lf; + this.threshold = threshold; + this.table = tab; + } +} + +可以看到,segment的并发基础是重入锁。想要访问Segment片段,必须获得此Segment的重入锁 +来看构造函数,有三个参数,加载因子,阈值,HashEntry数组。其中,前两个参数用于扩容,而HashEntry是Segment的基本数据结构。多个HashEntry可以形成链表,用于解决hash冲突,跟HashMap的数组链表结构相似。 + +static final class HashEntry { + //hash值 + final int hash; + //键 + final K key; + //值 + volatile V value; + //下一个键值对 + volatile HashEntry next; + + HashEntry(int hash, K key, V value, HashEntry next) { + this.hash = hash; + this.key = key; + this.value = value; + this.next = next; + } +} +可以看到,它是有next指针的 + +而ConcurrentHashMap的构造方法如下: +public class ConcurrentHashMap extends AbstractMap + implements ConcurrentMap, Serializable { + + private static final long serialVersionUID = 7249069246763182397L; + + //默认的初始容量 + static final int DEFAULT_INITIAL_CAPACITY = 16; + + //默认加载因子 + static final float DEFAULT_LOAD_FACTOR = 0.75f; + + //默认的并发度,也就是默认的Segment数组长度 + static final int DEFAULT_CONCURRENCY_LEVEL = 16; + + //最大容量,ConcurrentMap最大容量 + static final int MAXIMUM_CAPACITY = 1 << 30; + + //每个segment中table数组的长度,必须是2^n,最小为2 + static final int MIN_SEGMENT_TABLE_CAPACITY = 2; + + //允许最大segment数量,用于限定concurrencyLevel的边界,必须是2^n + static final int MAX_SEGMENTS = 1 << 16; // slightly conservative + + //非锁定情况下调用size和contains方法的重试次数,避免由于table连续被修改导致无限重试 + static final int RETRIES_BEFORE_LOCK = 2; + + //计算segment位置的掩码值 + final int segmentMask; + + //用于计算算segment位置时,hash参与运算的位数 + final int segmentShift; + + //Segment数组 + final Segment[] segments; + + + public ConcurrentHashMap(int initialCapacity, + float loadFactor, int concurrencyLevel) { + //参数校验 + if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) + throw new IllegalArgumentException(); + if (concurrencyLevel > MAX_SEGMENTS) + concurrencyLevel = MAX_SEGMENTS; + // Find power-of-two sizes best matching arguments + //找到一个大于等于传入的concurrencyLevel的2^n数,且与concurrencyLevel最接近 + //ssize作为Segment数组 + int sshift = 0; + int ssize = 1; + while (ssize < concurrencyLevel) { + ++sshift; + ssize <<= 1; + } + this.segmentShift = 32 - sshift; + this.segmentMask = ssize - 1; + if (initialCapacity > MAXIMUM_CAPACITY) + initialCapacity = MAXIMUM_CAPACITY; + // 计算每个segment中table的容量 + int c = initialCapacity / ssize; + if (c * ssize < initialCapacity) + ++c; + int cap = MIN_SEGMENT_TABLE_CAPACITY; + // 确保cap是2^n + while (cap < c) + cap <<= 1; + // create segments and segments[0] + // 创建segments并初始化第一个segment数组,其余的segment延迟初始化 + Segment s0 = + new Segment(loadFactor, (int)(cap * loadFactor), + (HashEntry[])new HashEntry[cap]); + Segment[] ss = (Segment[])new Segment[ssize]; + UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] + this.segments = ss; + } + +} + +concurrencyLevel 参数表示期望并发的修改 ConcurrentHashMap 的线程数量,用于决定 Segment 的数量,通过算法可以知道就是找到最接近传入的concurrencyLevel的2的幂次方。而segmentMask 和 segmentShift看上去有点难以理解,作用主要是根据key的hash值做计算定位在哪个Segment片段 + +从ConcurrentHashMap 的数据结构我们不难才想到,put方法一定是分两步走的 +1.根据键的值定位键值对在那个segment片段 2.调用Segment的put方法 + +public V put(K key, V value) { + Segment s; + if (value == null) + throw new NullPointerException(); + //计算键的hash值 + int hash = hash(key); + //通过hash值运算把键值对定位到segment[j]片段上 + int j = (hash >>> segmentShift) & segmentMask; + //检查segment[j]是否已经初始化了,没有的话调用ensureSegment初始化segment[j] + if ((s = (Segment)UNSAFE.getObject // nonvolatile; recheck + (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment + s = ensureSegment(j); + //向片段中插入键值对 + return s.put(key, hash, value, false); + } + +我们从ConcurrentHashMap的构造函数可以发现Segment数组只初始化了Segment[0],其余的Segment是用到了在初始化,用了延迟加载的策略,而延迟加载调用的就是ensureSegment方法 + +private Segment ensureSegment(int k) { + final Segment[] ss = this.segments; + long u = (k << SSHIFT) + SBASE; // raw offset + Segment seg; + //按照segment[0]的HashEntry数组长度和加载因子初始化Segment[k] + if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) == null) { + Segment proto = ss[0]; // use segment 0 as prototype + int cap = proto.table.length; + float lf = proto.loadFactor; + int threshold = (int)(cap * lf); + HashEntry[] tab = (HashEntry[])new HashEntry[cap]; + if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) + == null) { // recheck + Segment s = new Segment(lf, threshold, tab); + while ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) + == null) { + if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) + break; + } + } + } + return seg; + } + +调用Segment的put方法插入键值对到Segment的HashEntry数组 + +final V put(K key, int hash, V value, boolean onlyIfAbsent) { + //Segment继承ReentrantLock,尝试获取独占锁 + HashEntry node = tryLock() ? null : + scanAndLockForPut(key, hash, value); + V oldValue; + try { + HashEntry[] tab = table; + //定位键值对在HashEntry数组上的位置 + int index = (tab.length - 1) & hash; + //获取这个位置的第一个键值对 + HashEntry first = entryAt(tab, index); + for (HashEntry e = first;;) { + if (e != null) {//此处有链表结构,一直循环到e==null + K k; + //存在与待插入键值对相同的键,则替换value + if ((k = e.key) == key || + (e.hash == hash && key.equals(k))) { + oldValue = e.value; + if (!onlyIfAbsent) {//onlyIfAbsent默认为false + e.value = value; + ++modCount; + } + break; + } + e = e.next; + } + else { + //node不为null,设置node的next为first,node为当前链表的头节点 + if (node != null) + node.setNext(first); + //node为null,创建头节点,指定next为first,node为当前链表的头节点 + else + node = new HashEntry(hash, key, value, first); + int c = count + 1; + //扩容条件 (1)entry数量大于阈值 (2) 当前数组tab长度小于最大容量。满足以上条件就扩容 + if (c > threshold && tab.length < MAXIMUM_CAPACITY) + //扩容 + rehash(node); + else + //tab的index位置设置为node, + setEntryAt(tab, index, node); + ++modCount; + count = c; + oldValue = null; + break; + } + } + } finally { + unlock(); + } + return oldValue; + } +在不超过最大重试次数MAX_SCAN_RETRIES通过CAS尝试获取锁 +private HashEntry scanAndLockForPut(K key, int hash, V value) { + //first,e:键值对的hash值定位到数组tab的第一个键值对 + HashEntry first = entryForHash(this, hash); + HashEntry e = first; + HashEntry node = null; + int retries = -1; // negative while locating node + //线程尝试通过CAS获取锁 + while (!tryLock()) { + HashEntry f; // to recheck first below + if (retries < 0) { + //当e==null或key.equals(e.key)时retry=0,走出这个分支 + if (e == null) { + if (node == null) // speculatively create node + //初始化键值对,next指向null + node = new HashEntry(hash, key, value, null); + retries = 0; + } + else if (key.equals(e.key)) + retries = 0; + else + e = e.next; + } + //超过最大自旋次数,阻塞 + else if (++retries > MAX_SCAN_RETRIES) { + lock(); + break; + } + //头节点发生变化,重新遍历 + else if ((retries & 1) == 0 && + (f = entryForHash(this, hash)) != first) { + e = first = f; // re-traverse if entry changed + retries = -1; + } + } + return node; + } + +rehash(HashEntry node) + +用于对Segment的table数组进行扩容,扩容后的数组长度是原数组的两倍。 + +private void rehash(HashEntry node) { + //扩容前的旧tab数组 + HashEntry[] oldTable = table; + //扩容前数组长度 + int oldCapacity = oldTable.length; + //扩容后数组长度(扩容前两倍) + int newCapacity = oldCapacity << 1; + //计算新的阈值 + threshold = (int)(newCapacity * loadFactor); + //新的tab数组 + HashEntry[] newTable = + (HashEntry[]) new HashEntry[newCapacity]; + //新的掩码 + int sizeMask = newCapacity - 1; + //遍历旧的数组 + for (int i = 0; i < oldCapacity ; i++) { + //遍历数组的每一个元素 + HashEntry e = oldTable[i]; + if (e != null) { + //元素e指向的下一个节点,如果存在hash冲突那么e不为空 + HashEntry next = e.next; + //计算元素在新数组的索引 + int idx = e.hash & sizeMask; + // 桶中只有一个元素,把当前的e设置给新的table + if (next == null) // Single node on list + newTable[idx] = e; + //桶中有布置一个元素的链表 + else { // Reuse consecutive sequence at same slot + HashEntry lastRun = e; + // idx 是当前链表的头结点 e 的新位置 + int lastIdx = idx; + for (HashEntry last = next; + last != null; + last = last.next) { + //k是单链表元素在新数组的位置 + int k = last.hash & sizeMask; + //lastRun是最后一个扩容后不在原桶处的Entry + if (k != lastIdx) { + lastIdx = k; + lastRun = last; + } + } + //lastRun以及它后面的元素都在一个桶中 + newTable[lastIdx] = lastRun; + // Clone remaining nodes + //遍历到lastRun即可 + for (HashEntry p = e; p != lastRun; p = p.next) { + V v = p.value; + int h = p.hash; + int k = h & sizeMask; + HashEntry n = newTable[k]; + newTable[k] = new HashEntry(h, p.key, v, n); + } + } + } + } + //处理引起扩容的那个待添加的节点 + int nodeIndex = node.hash & sizeMask; // add the new node + node.setNext(newTable[nodeIndex]); + newTable[nodeIndex] = node; + //把Segment的table指向扩容后的table + table = newTable; + } + +get方法: +get获取元素不需要加锁,效率高,获取key定位到的segment片段还是遍历table数组的HashEntry元素时使用了UNSAFE.getObjectVolatile保证了能够无锁且获取到最新的volatile变量的值 +public V get(Object key) { + Segment s; // manually integrate access methods to reduce overhead + HashEntry[] tab; + //计算key的hash值 + int h = hash(key); + //根据hash值计算key在哪个segment片段 + long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; + //获取segments[u]的table数组 + if ((s = (Segment)UNSAFE.getObjectVolatile(segments, u)) != null && + (tab = s.table) != null) { + //遍历table中的HashEntry元素 + for (HashEntry e = (HashEntry) UNSAFE.getObjectVolatile + (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); + e != null; e = e.next) { + K k; + //找到相同的key,返回value + if ((k = e.key) == key || (e.hash == h && key.equals(k))) + return e.value; + } + } + return null; + } + +size方法: +size方法用来计算ConcurrentHashMap中储存元素的个数。那么在统计所有的segment元素的个数是否都需要上锁呢?如果不上锁在统计的过程中可能存在其他线程并发存储/删除元素,而如果上锁又会降低读写效率。ConcurrentHashMap在实现时使用了折中的方法,它会无锁遍历三次把所有的segment的modCount加到sum里面,如果与前一次遍历结果相比sum没有改变那么说明这两次遍历没有其他线程修改ConcurrentHashMap,返回segment的count的和;如果每次遍历与上一次相比都不一样那就上锁进行同步。 + +public int size() { + // Try a few times to get accurate count. On failure due to + // continuous async changes in table, resort to locking. + final Segment[] segments = this.segments; + int size; + boolean overflow; // true if size overflows 32 bits + long sum; // sum of modCounts + long last = 0L; // previous sum + int retries = -1; // first iteration isn't retry + try { + for (;;) { + //达到RETRIES_BEFORE_LOCK,也就是三次 + if (retries++ == RETRIES_BEFORE_LOCK) { + for (int j = 0; j < segments.length; ++j) + ensureSegment(j).lock(); // force creation + } + sum = 0L; + size = 0; + overflow = false; + for (int j = 0; j < segments.length; ++j) { + Segment seg = segmentAt(segments, j); + //遍历计算segment的modCount和count的和 + if (seg != null) { + sum += seg.modCount; + int c = seg.count; + //是否溢出int范围 + if (c < 0 || (size += c) < 0) + overflow = true; + } + } + //last是上一次的sum值,相等跳出循环 + if (sum == last) + break; + last = sum; + } + } finally { + //解锁 + if (retries > RETRIES_BEFORE_LOCK) { + for (int j = 0; j < segments.length; ++j) + segmentAt(segments, j).unlock(); + } + } + return overflow ? Integer.MAX_VALUE : size; + } + +remove就不说了 + +isEmpty +检查ConcurrentHashMap是否为空。同样没有使用同步锁,通过两次遍历:1.确定每个segment是否为0,其中任何一个segment的count不为0,就返回,都为0,就累加modCount为sum.2.第一个循环执行完还没有推出,map可能为空,再做一次遍历,如果在这个过程中任何一个segment的count不为0返回false,同时sum减去每个segment的modCount,若循环执行完程序还没有退出,比较sum是否为0,为0表示两次检查没有元素插入,map确实为空,否则map不为空。 + +public boolean isEmpty() { + //累计segment的modCount值 + long sum = 0L; + final Segment[] segments = this.segments; + for (int j = 0; j < segments.length; ++j) { + Segment seg = segmentAt(segments, j); + if (seg != null) { + if (seg.count != 0) + return false; + sum += seg.modCount; + } + } + //再次检查 + if (sum != 0L) { // recheck unless no modifications + for (int j = 0; j < segments.length; ++j) { + Segment seg = segmentAt(segments, j); + if (seg != null) { + if (seg.count != 0) + return false; + sum -= seg.modCount; + } + } + if (sum != 0L) + return false; + } + return true; + } + +总之: +ConcurrentHashMap引入分段锁的概念提高了并发量,每当线程要修改哈希表时并不是锁住整个表,而是去操作某一个segment片段,只对segment做同步,通过细化锁的粒度提高了效率,相对与HashTable对整个哈希表做同步处理更实用与多线程环境。 +参考链接: +https://www.cnblogs.com/rain4j/p/10972090.html + From 6d46f397e9e229318c72de2e3515d07211d21362 Mon Sep 17 00:00:00 2001 From: xuke <792228584@qq.com> Date: Fri, 10 Jul 2020 16:44:33 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E7=A9=B6=E6=9E=81COncurrentHashMap?= =?UTF-8?q?=E5=88=86=E6=9E=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...ava.util.concurrent.ConcurrentHashMap.adoc | 1119 ++++++++++++++++- 1 file changed, 1110 insertions(+), 9 deletions(-) diff --git a/docs/java.util.concurrent.ConcurrentHashMap.adoc b/docs/java.util.concurrent.ConcurrentHashMap.adoc index 83f578b..594bd11 100644 --- a/docs/java.util.concurrent.ConcurrentHashMap.adoc +++ b/docs/java.util.concurrent.ConcurrentHashMap.adoc @@ -1,16 +1,1117 @@ -= `ConcurrentHashMap` += ConcurrentHashMap +hashMap是不支持并发的,所以有了ConcurrentHashMap +类介绍: +public class ConcurrentHashMap extends AbstractMap + implements ConcurrentMap, Serializable +ConcurrentHashMap在初始化时会要求初始化concurrencyLevel作为segment数组长度,即并发度,代表最多有多少个线程可以同时操作ConcurrentHashMap,默认是16,每个segment片段里面含有键值对HashEntry数组,是真正存放键值对的地方,这就是ConcurrentHashMap的数据结构。 -image::images/ConcurrentHashMap-segment-lock.png[] +既然ConcurrentHashMap的并发基础是segment,那我们就先来看一下这个类 +static final class Segment extends ReentrantLock implements Serializable { + + //尝试获取锁的最多尝试次数,即自旋次数 + static final int MAX_SCAN_RETRIES = + Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1; + + //HashEntry数组,也就是键值对数组 + transient volatile HashEntry[] table; + //元素的个数 + transient int count; + //segment中发生改变元素的操作的次数,如put/remove + transient int modCount; + //当table大小超过阈值时,对table进行扩容,值为capacity *loadFactor + transient int threshold; + //加载因子 + final float loadFactor; + + Segment(float lf, int threshold, HashEntry[] tab) { + this.loadFactor = lf; + this.threshold = threshold; + this.table = tab; + } +} + +可以看到,segment的并发基础是重入锁。想要访问Segment片段,必须获得此Segment的重入锁 +来看构造函数,有三个参数,加载因子,阈值,HashEntry数组。其中,前两个参数用于扩容,而HashEntry是Segment的基本数据结构。多个HashEntry可以形成链表,用于解决hash冲突,跟HashMap的数组链表结构相似。 + +static final class HashEntry { + //hash值 + final int hash; + //键 + final K key; + //值 + volatile V value; + //下一个键值对 + volatile HashEntry next; + + HashEntry(int hash, K key, V value, HashEntry next) { + this.hash = hash; + this.key = key; + this.value = value; + this.next = next; + } +} +可以看到,它是有next指针的 + +而ConcurrentHashMap的构造方法如下: +public class ConcurrentHashMap extends AbstractMap + implements ConcurrentMap, Serializable { + + private static final long serialVersionUID = 7249069246763182397L; + + //默认的初始容量 + static final int DEFAULT_INITIAL_CAPACITY = 16; + + //默认加载因子 + static final float DEFAULT_LOAD_FACTOR = 0.75f; + + //默认的并发度,也就是默认的Segment数组长度 + static final int DEFAULT_CONCURRENCY_LEVEL = 16; + + //最大容量,ConcurrentMap最大容量 + static final int MAXIMUM_CAPACITY = 1 << 30; + + //每个segment中table数组的长度,必须是2^n,最小为2 + static final int MIN_SEGMENT_TABLE_CAPACITY = 2; + + //允许最大segment数量,用于限定concurrencyLevel的边界,必须是2^n + static final int MAX_SEGMENTS = 1 << 16; // slightly conservative + + //非锁定情况下调用size和contains方法的重试次数,避免由于table连续被修改导致无限重试 + static final int RETRIES_BEFORE_LOCK = 2; + + //计算segment位置的掩码值 + final int segmentMask; + + //用于计算算segment位置时,hash参与运算的位数 + final int segmentShift; + + //Segment数组 + final Segment[] segments; + + + public ConcurrentHashMap(int initialCapacity, + float loadFactor, int concurrencyLevel) { + //参数校验 + if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) + throw new IllegalArgumentException(); + if (concurrencyLevel > MAX_SEGMENTS) + concurrencyLevel = MAX_SEGMENTS; + // Find power-of-two sizes best matching arguments + //找到一个大于等于传入的concurrencyLevel的2^n数,且与concurrencyLevel最接近 + //ssize作为Segment数组 + int sshift = 0; + int ssize = 1; + while (ssize < concurrencyLevel) { + ++sshift; + ssize <<= 1; + } + this.segmentShift = 32 - sshift; + this.segmentMask = ssize - 1; + if (initialCapacity > MAXIMUM_CAPACITY) + initialCapacity = MAXIMUM_CAPACITY; + // 计算每个segment中table的容量 + int c = initialCapacity / ssize; + if (c * ssize < initialCapacity) + ++c; + int cap = MIN_SEGMENT_TABLE_CAPACITY; + // 确保cap是2^n + while (cap < c) + cap <<= 1; + // create segments and segments[0] + // 创建segments并初始化第一个segment数组,其余的segment延迟初始化 + Segment s0 = + new Segment(loadFactor, (int)(cap * loadFactor), + (HashEntry[])new HashEntry[cap]); + Segment[] ss = (Segment[])new Segment[ssize]; + UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] + this.segments = ss; + } + +} + +concurrencyLevel 参数表示期望并发的修改 ConcurrentHashMap 的线程数量,用于决定 Segment 的数量,通过算法可以知道就是找到最接近传入的concurrencyLevel的2的幂次方。而segmentMask 和 segmentShift看上去有点难以理解,作用主要是根据key的hash值做计算定位在哪个Segment片段 + +从ConcurrentHashMap 的数据结构我们不难才想到,put方法一定是分两步走的 +1.根据键的值定位键值对在那个segment片段 2.调用Segment的put方法 + +public V put(K key, V value) { + Segment s; + if (value == null) + throw new NullPointerException(); + //计算键的hash值 + int hash = hash(key); + //通过hash值运算把键值对定位到segment[j]片段上 + int j = (hash >>> segmentShift) & segmentMask; + //检查segment[j]是否已经初始化了,没有的话调用ensureSegment初始化segment[j] + if ((s = (Segment)UNSAFE.getObject // nonvolatile; recheck + (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment + s = ensureSegment(j); + //向片段中插入键值对 + return s.put(key, hash, value, false); + } + +我们从ConcurrentHashMap的构造函数可以发现Segment数组只初始化了Segment[0],其余的Segment是用到了在初始化,用了延迟加载的策略,而延迟加载调用的就是ensureSegment方法 + +private Segment ensureSegment(int k) { + final Segment[] ss = this.segments; + long u = (k << SSHIFT) + SBASE; // raw offset + Segment seg; + //按照segment[0]的HashEntry数组长度和加载因子初始化Segment[k] + if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) == null) { + Segment proto = ss[0]; // use segment 0 as prototype + int cap = proto.table.length; + float lf = proto.loadFactor; + int threshold = (int)(cap * lf); + HashEntry[] tab = (HashEntry[])new HashEntry[cap]; + if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) + == null) { // recheck + Segment s = new Segment(lf, threshold, tab); + while ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) + == null) { + if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) + break; + } + } + } + return seg; + } + +调用Segment的put方法插入键值对到Segment的HashEntry数组 + +final V put(K key, int hash, V value, boolean onlyIfAbsent) { + //Segment继承ReentrantLock,尝试获取独占锁 + HashEntry node = tryLock() ? null : + scanAndLockForPut(key, hash, value); + V oldValue; + try { + HashEntry[] tab = table; + //定位键值对在HashEntry数组上的位置 + int index = (tab.length - 1) & hash; + //获取这个位置的第一个键值对 + HashEntry first = entryAt(tab, index); + for (HashEntry e = first;;) { + if (e != null) {//此处有链表结构,一直循环到e==null + K k; + //存在与待插入键值对相同的键,则替换value + if ((k = e.key) == key || + (e.hash == hash && key.equals(k))) { + oldValue = e.value; + if (!onlyIfAbsent) {//onlyIfAbsent默认为false + e.value = value; + ++modCount; + } + break; + } + e = e.next; + } + else { + //node不为null,设置node的next为first,node为当前链表的头节点 + if (node != null) + node.setNext(first); + //node为null,创建头节点,指定next为first,node为当前链表的头节点 + else + node = new HashEntry(hash, key, value, first); + int c = count + 1; + //扩容条件 (1)entry数量大于阈值 (2) 当前数组tab长度小于最大容量。满足以上条件就扩容 + if (c > threshold && tab.length < MAXIMUM_CAPACITY) + //扩容 + rehash(node); + else + //tab的index位置设置为node, + setEntryAt(tab, index, node); + ++modCount; + count = c; + oldValue = null; + break; + } + } + } finally { + unlock(); + } + return oldValue; + } +在不超过最大重试次数MAX_SCAN_RETRIES通过CAS尝试获取锁 +private HashEntry scanAndLockForPut(K key, int hash, V value) { + //first,e:键值对的hash值定位到数组tab的第一个键值对 + HashEntry first = entryForHash(this, hash); + HashEntry e = first; + HashEntry node = null; + int retries = -1; // negative while locating node + //线程尝试通过CAS获取锁 + while (!tryLock()) { + HashEntry f; // to recheck first below + if (retries < 0) { + //当e==null或key.equals(e.key)时retry=0,走出这个分支 + if (e == null) { + if (node == null) // speculatively create node + //初始化键值对,next指向null + node = new HashEntry(hash, key, value, null); + retries = 0; + } + else if (key.equals(e.key)) + retries = 0; + else + e = e.next; + } + //超过最大自旋次数,阻塞 + else if (++retries > MAX_SCAN_RETRIES) { + lock(); + break; + } + //头节点发生变化,重新遍历 + else if ((retries & 1) == 0 && + (f = entryForHash(this, hash)) != first) { + e = first = f; // re-traverse if entry changed + retries = -1; + } + } + return node; + } + +rehash(HashEntry node) + +用于对Segment的table数组进行扩容,扩容后的数组长度是原数组的两倍。 + +private void rehash(HashEntry node) { + //扩容前的旧tab数组 + HashEntry[] oldTable = table; + //扩容前数组长度 + int oldCapacity = oldTable.length; + //扩容后数组长度(扩容前两倍) + int newCapacity = oldCapacity << 1; + //计算新的阈值 + threshold = (int)(newCapacity * loadFactor); + //新的tab数组 + HashEntry[] newTable = + (HashEntry[]) new HashEntry[newCapacity]; + //新的掩码 + int sizeMask = newCapacity - 1; + //遍历旧的数组 + for (int i = 0; i < oldCapacity ; i++) { + //遍历数组的每一个元素 + HashEntry e = oldTable[i]; + if (e != null) { + //元素e指向的下一个节点,如果存在hash冲突那么e不为空 + HashEntry next = e.next; + //计算元素在新数组的索引 + int idx = e.hash & sizeMask; + // 桶中只有一个元素,把当前的e设置给新的table + if (next == null) // Single node on list + newTable[idx] = e; + //桶中有布置一个元素的链表 + else { // Reuse consecutive sequence at same slot + HashEntry lastRun = e; + // idx 是当前链表的头结点 e 的新位置 + int lastIdx = idx; + for (HashEntry last = next; + last != null; + last = last.next) { + //k是单链表元素在新数组的位置 + int k = last.hash & sizeMask; + //lastRun是最后一个扩容后不在原桶处的Entry + if (k != lastIdx) { + lastIdx = k; + lastRun = last; + } + } + //lastRun以及它后面的元素都在一个桶中 + newTable[lastIdx] = lastRun; + // Clone remaining nodes + //遍历到lastRun即可 + for (HashEntry p = e; p != lastRun; p = p.next) { + V v = p.value; + int h = p.hash; + int k = h & sizeMask; + HashEntry n = newTable[k]; + newTable[k] = new HashEntry(h, p.key, v, n); + } + } + } + } + //处理引起扩容的那个待添加的节点 + int nodeIndex = node.hash & sizeMask; // add the new node + node.setNext(newTable[nodeIndex]); + newTable[nodeIndex] = node; + //把Segment的table指向扩容后的table + table = newTable; + } + +get方法: +get获取元素不需要加锁,效率高,获取key定位到的segment片段还是遍历table数组的HashEntry元素时使用了UNSAFE.getObjectVolatile保证了能够无锁且获取到最新的volatile变量的值 +public V get(Object key) { + Segment s; // manually integrate access methods to reduce overhead + HashEntry[] tab; + //计算key的hash值 + int h = hash(key); + //根据hash值计算key在哪个segment片段 + long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; + //获取segments[u]的table数组 + if ((s = (Segment)UNSAFE.getObjectVolatile(segments, u)) != null && + (tab = s.table) != null) { + //遍历table中的HashEntry元素 + for (HashEntry e = (HashEntry) UNSAFE.getObjectVolatile + (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); + e != null; e = e.next) { + K k; + //找到相同的key,返回value + if ((k = e.key) == key || (e.hash == h && key.equals(k))) + return e.value; + } + } + return null; + } + +size方法: +size方法用来计算ConcurrentHashMap中储存元素的个数。那么在统计所有的segment元素的个数是否都需要上锁呢?如果不上锁在统计的过程中可能存在其他线程并发存储/删除元素,而如果上锁又会降低读写效率。ConcurrentHashMap在实现时使用了折中的方法,它会无锁遍历三次把所有的segment的modCount加到sum里面,如果与前一次遍历结果相比sum没有改变那么说明这两次遍历没有其他线程修改ConcurrentHashMap,返回segment的count的和;如果每次遍历与上一次相比都不一样那就上锁进行同步。 + +public int size() { + // Try a few times to get accurate count. On failure due to + // continuous async changes in table, resort to locking. + final Segment[] segments = this.segments; + int size; + boolean overflow; // true if size overflows 32 bits + long sum; // sum of modCounts + long last = 0L; // previous sum + int retries = -1; // first iteration isn't retry + try { + for (;;) { + //达到RETRIES_BEFORE_LOCK,也就是三次 + if (retries++ == RETRIES_BEFORE_LOCK) { + for (int j = 0; j < segments.length; ++j) + ensureSegment(j).lock(); // force creation + } + sum = 0L; + size = 0; + overflow = false; + for (int j = 0; j < segments.length; ++j) { + Segment seg = segmentAt(segments, j); + //遍历计算segment的modCount和count的和 + if (seg != null) { + sum += seg.modCount; + int c = seg.count; + //是否溢出int范围 + if (c < 0 || (size += c) < 0) + overflow = true; + } + } + //last是上一次的sum值,相等跳出循环 + if (sum == last) + break; + last = sum; + } + } finally { + //解锁 + if (retries > RETRIES_BEFORE_LOCK) { + for (int j = 0; j < segments.length; ++j) + segmentAt(segments, j).unlock(); + } + } + return overflow ? Integer.MAX_VALUE : size; + } + +remove就不说了 + +isEmpty +检查ConcurrentHashMap是否为空。同样没有使用同步锁,通过两次遍历:1.确定每个segment是否为0,其中任何一个segment的count不为0,就返回,都为0,就累加modCount为sum.2.第一个循环执行完还没有推出,map可能为空,再做一次遍历,如果在这个过程中任何一个segment的count不为0返回false,同时sum减去每个segment的modCount,若循环执行完程序还没有退出,比较sum是否为0,为0表示两次检查没有元素插入,map确实为空,否则map不为空。 + +public boolean isEmpty() { + //累计segment的modCount值 + long sum = 0L; + final Segment[] segments = this.segments; + for (int j = 0; j < segments.length; ++j) { + Segment seg = segmentAt(segments, j); + if (seg != null) { + if (seg.count != 0) + return false; + sum += seg.modCount; + } + } + //再次检查 + if (sum != 0L) { // recheck unless no modifications + for (int j = 0; j < segments.length; ++j) { + Segment seg = segmentAt(segments, j); + if (seg != null) { + if (seg.count != 0) + return false; + sum -= seg.modCount; + } + } + if (sum != 0L) + return false; + } + return true; + } + +总之: +ConcurrentHashMap引入分段锁的概念提高了并发量,每当线程要修改哈希表时并不是锁住整个表,而是去操作某一个segment片段,只对segment做同步,通过细化锁的粒度提高了效率,相对与HashTable对整个哈希表做同步处理更实用与多线程环境。 +参考链接: +https://www.cnblogs.com/rain4j/p/10972090.html + + + +='COncurrentHashMap' + +1.8抛弃了Segment的概念 +在ConcurrentHashMap中通过一个Node[]数组来保存添加到map中的键值对,而在同一个数组位置是通过链表和红黑树的形式来保存的。但是这个数组只有在第一次添加元素的时候才会初始化,否则只是初始化一个ConcurrentHashMap对象的话,只是设定了一个sizeCtl变量,这个变量用来判断对象的一些状态和是否需要扩容,后面会详细解释。 +  第一次添加元素的时候,默认初期长度为16,当往map中继续添加元素的时候,通过hash值跟数组长度取与来决定放在数组的哪个位置,如果出现放在同一个位置的时候,优先以链表的形式存放,在同一个位置的个数又达到了8个以上,如果数组的长度还小于64的时候,则会扩容数组。如果数组的长度大于等于64了的话,在会将该节点的链表转换成树。 +  通过扩容数组的方式来把这些节点给分散开。然后将这些元素复制到扩容后的新的数组中,同一个链表中的元素通过hash值的数组长度位来区分,是还是放在原来的位置还是放到扩容的长度的相同位置去 。在扩容完成之后,如果某个节点的是树,同时现在该节点的个数又小于等于6个了,则会将该树转为链表。 +  取元素的时候,相对来说比较简单,通过计算hash来确定该元素在数组的哪个位置,然后在通过遍历链表或树来判断key和key的hash,取出value值。 + +private static final int MAXIMUM_CAPACITY = 1 << 30; +private static final int DEFAULT_CAPACITY = 16; +static final int TREEIFY_THRESHOLD = 8; +static final int UNTREEIFY_THRESHOLD = 6; +static final int MIN_TREEIFY_CAPACITY = 64; +static final int MOVED = -1; // 表示正在转移 +static final int TREEBIN = -2; // 表示已经转换成树 +static final int RESERVED = -3; // hash for transient reservations +static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash +transient volatile Node[] table;//默认没初始化的数组,用来保存元素 +private transient volatile Node[] nextTable;//转移的时候用的数组 +/** + * 用来控制表初始化和扩容的,默认值为0,当在初始化的时候指定了大小,这会将这个大小保存在sizeCtl中,大小为数组的0.75 + * 当为负的时候,说明表正在初始化或扩张, + * -1表示初始化 + * -(1+n) n:表示活动的扩张线程 + */ + private transient volatile int sizeCtl; + +几个重要的类: +Node,这是构成每个元素的基本类。 +static class Node implements Map.Entry { + final int hash; //key的hash值 + final K key; //key + volatile V val; //value + volatile Node next; //表示链表中的下一个节点 + + Node(int hash, K key, V val, Node next) { + this.hash = hash; + this.key = key; + this.val = val; + this.next = next; + } + public final K getKey() { return key; } + public final V getValue() { return val; } + public final int hashCode() { return key.hashCode() ^ val.hashCode(); } + } + +TreeNode,构造树的节点 + +static final class TreeNode extends Node { + TreeNode parent; // red-black tree links + TreeNode left; + TreeNode right; + TreeNode prev; // needed to unlink next upon deletion + boolean red; + + TreeNode(int hash, K key, V val, Node next, + TreeNode parent) { + super(hash, key, val, next); + this.parent = parent; + } +} +TreeBin 用作树的头结点,只存储root和first节点,不存储节点的key、value值。 + +static final class TreeBin extends Node { + TreeNode root; + volatile TreeNode first; + volatile Thread waiter; + volatile int lockState; + // values for lockState + static final int WRITER = 1; // set while holding write lock + static final int WAITER = 2; // set when waiting for write lock + static final int READER = 4; // increment value for setting read lock +} + +ForwardingNode在转移的时候放在头部的节点,是一个空节点 + +static final class ForwardingNode extends Node { + final Node[] nextTable; + ForwardingNode(Node[] tab) { + super(MOVED, null, null, null); + this.nextTable = tab; + }} + +ConcurrentHashMap的初始化 + +//空的构造 +public ConcurrentHashMapDebug() { + } +//如果在实例化对象的时候指定了容量,则初始化sizeCtl +public ConcurrentHashMapDebug(int initialCapacity) { + if (initialCapacity < 0) + throw new IllegalArgumentException(); + int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? + MAXIMUM_CAPACITY : + tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); + this.sizeCtl = cap; + } +//当出入一个Map的时候,先设定sizeCtl为默认容量,在添加元素 +public ConcurrentHashMapDebug(Map m) { + this.sizeCtl = DEFAULT_CAPACITY; + putAll(m); + } + +可以看到,在任何一个构造方法中,都没有对存储Map元素Node的table变量进行初始化。而是在第一次put操作的时候在进行初始化。 +  下面来看看数组的初始化方法initTable + +/** + * 初始化数组table, + * 如果sizeCtl小于0,说明别的数组正在进行初始化,则让出执行权 + * 如果sizeCtl大于0的话,则初始化一个大小为sizeCtl的数组 + * 否则的话初始化一个默认大小(16)的数组 + * 然后设置sizeCtl的值为数组长度的3/4 + */ + private final Node[] initTable() { + Node[] tab; int sc; + while ((tab = table) == null || tab.length == 0) { //第一次put的时候,table还没被初始化,进入while + if ((sc = sizeCtl) < 0) //sizeCtl初始值为0,当小于0的时候表示在别的线程在初始化表或扩展表 + Thread.yield(); // lost initialization race; just spin + else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //SIZECTL:表示当前对象的内存偏移量,sc表示期望值,-1表示要替换的值,设定为-1表示要初始化表了 + try { + if ((tab = table) == null || tab.length == 0) { + int n = (sc > 0) ? sc : DEFAULT_CAPACITY; //指定了大小的时候就创建指定大小的Node数组,否则创建指定大小(16)的Node数组 + @SuppressWarnings("unchecked") + Node[] nt = (Node[])new Node[n]; + table = tab = nt; + sc = n - (n >>> 2); + } + } finally { + sizeCtl = sc; //初始化后,sizeCtl长度为数组长度的3/4 + } + break; + } + } + return tab; + } + +ConcurrentHashMap的put操作详解 +/* + * 单纯的额调用putVal方法,并且putVal的第三个参数设置为false + * 当设置为false的时候表示这个value一定会设置 + * true的时候,只有当这个key的value为空的时候才会设置 + */ + public V put(K key, V value) { + return putVal(key, value, false); + } + + +/* + * 当添加一对键值对的时候,首先会去判断保存这些键值对的数组是不是初始化了, + * 如果没有的话就初始化数组 + * 然后通过计算hash值来确定放在数组的哪个位置 + * 如果这个位置为空则直接添加,如果不为空的话,则取出这个节点来 + * 如果取出来的节点的hash值是MOVED(-1)的话,则表示当前正在对这个数组进行扩容,复制到新的数组,则当前线程也去帮助复制 + * 最后一种情况就是,如果这个节点,不为空,也不在扩容,则通过synchronized来加锁,进行添加操作 + * 然后判断当前取出的节点位置存放的是链表还是树 + * 如果是链表的话,则遍历整个链表,直到取出来的节点的key来个要放的key进行比较,如果key相等,并且key的hash值也相等的话, + * 则说明是同一个key,则覆盖掉value,否则的话则添加到链表的末尾 + * 如果是树的话,则调用putTreeVal方法把这个元素添加到树中去 + * 最后在添加完成之后,会判断在该节点处共有多少个节点(注意是添加前的个数),如果达到8个以上了的话, + * 则调用treeifyBin方法来尝试将处的链表转为树,或者扩容数组 + */ + final V putVal(K key, V value, boolean onlyIfAbsent) { + if (key == null || value == null) throw new NullPointerException();//K,V都不能为空,否则的话跑出异常 + int hash = spread(key.hashCode()); //取得key的hash值 + int binCount = 0; //用来计算在这个节点总共有多少个元素,用来控制扩容或者转移为树 + for (Node[] tab = table;;) { // + Node f; int n, i, fh; + if (tab == null || (n = tab.length) == 0) + tab = initTable(); //第一次put的时候table没有初始化,则初始化table + else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //通过哈希计算出一个表中的位置因为n是数组的长度,所以(n-1)&hash肯定不会出现数组越界 + if (casTabAt(tab, i, null, //如果这个位置没有元素的话,则通过cas的方式尝试添加,注意这个时候是没有加锁的 + new Node(hash, key, value, null))) //创建一个Node添加到数组中区,null表示的是下一个节点为空 + break; // no lock when adding to empty bin + } + /* + * 如果检测到某个节点的hash值是MOVED,则表示正在进行数组扩张的数据复制阶段, + * 则当前线程也会参与去复制,通过允许多线程复制的功能,一次来减少数组的复制所带来的性能损失 + */ + else if ((fh = f.hash) == MOVED) + tab = helpTransfer(tab, f); + else { + /* + * 如果在这个位置有元素的话,就采用synchronized的方式加锁, + * 如果是链表的话(hash大于0),就对这个链表的所有元素进行遍历, + * 如果找到了key和key的hash值都一样的节点,则把它的值替换到 + * 如果没找到的话,则添加在链表的最后面 + * 否则,是树的话,则调用putTreeVal方法添加到树中去 + * + * 在添加完之后,会对该节点上关联的的数目进行判断, + * 如果在8个以上的话,则会调用treeifyBin方法,来尝试转化为树,或者是扩容 + */ + V oldVal = null; + synchronized (f) { + if (tabAt(tab, i) == f) { //再次取出要存储的位置的元素,跟前面取出来的比较 + if (fh >= 0) { //取出来的元素的hash值大于0,当转换为树之后,hash值为-2 + binCount = 1; + for (Node e = f;; ++binCount) { //遍历这个链表 + K ek; + if (e.hash == hash && //要存的元素的hash,key跟要存储的位置的节点的相同的时候,替换掉该节点的value即可 + ((ek = e.key) == key || + (ek != null && key.equals(ek)))) { + oldVal = e.val; + if (!onlyIfAbsent) //当使用putIfAbsent的时候,只有在这个key没有设置值得时候才设置 + e.val = value; + break; + } + Node pred = e; + if ((e = e.next) == null) { //如果不是同样的hash,同样的key的时候,则判断该节点的下一个节点是否为空, + pred.next = new Node(hash, key, //为空的话把这个要加入的节点设置为当前节点的下一个节点 + value, null); + break; + } + } + } + else if (f instanceof TreeBin) { //表示已经转化成红黑树类型了 + Node p; + binCount = 2; + if ((p = ((TreeBin)f).putTreeVal(hash, key, //调用putTreeVal方法,将该元素添加到树中去 + value)) != null) { + oldVal = p.val; + if (!onlyIfAbsent) + p.val = value; + } + } + } + } + if (binCount != 0) { + if (binCount >= TREEIFY_THRESHOLD) //当在同一个节点的数目达到8个的时候,则扩张数组或将给节点的数据转为tree + treeifyBin(tab, i); + if (oldVal != null) + return oldVal; + break; + } + } + } + addCount(1L, binCount); //计数 + return null; + } + +ConcurrentHashMap的扩容详解 + +在put方法的详解中,我们可以看到,在同一个节点的个数超过8个的时候,会调用treeifyBin方法来看看是扩容还是转化为一棵树 +  同时在每次添加完元素的addCount方法中,也会判断当前数组中的元素是否达到了sizeCtl的量,如果达到了的话,则会进入transfer方法去扩容 + +/** + * Replaces all linked nodes in bin at given index unless table is + * too small, in which case resizes instead. + * 当数组长度小于64的时候,扩张数组长度一倍,否则的话把链表转为树 + */ + private final void treeifyBin(Node[] tab, int index) { + Node b; int n, sc; + if (tab != null) { + System.out.println("treeifyBin方\t==>数组长:"+tab.length); + if ((n = tab.length) < MIN_TREEIFY_CAPACITY) //MIN_TREEIFY_CAPACITY 64 + tryPresize(n << 1); // 数组扩容 + else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { + synchronized (b) { //使用synchronized同步器,将该节点出的链表转为树 + if (tabAt(tab, index) == b) { + TreeNode hd = null, tl = null; //hd:树的头(head) + for (Node e = b; e != null; e = e.next) { + TreeNode p = + new TreeNode(e.hash, e.key, e.val, + null, null); + if ((p.prev = tl) == null) //把Node组成的链表,转化为TreeNode的链表,头结点任然放在相同的位置 + hd = p; //设置head + else + tl.next = p; + tl = p; + } + setTabAt(tab, index, new TreeBin(hd));//把TreeNode的链表放入容器TreeBin中 + } + } + } + } + } + +可以看到当需要扩容的时候,调用的时候tryPresize方法,看看trePresize的源码 + +/** + * 扩容表为指可以容纳指定个数的大小(总是2的N次方) + * 假设原来的数组长度为16,则在调用tryPresize的时候,size参数的值为16<<1(32),此时sizeCtl的值为12 + * 计算出来c的值为64,则要扩容到sizeCtl≥为止 + * 第一次扩容之后 数组长:32 sizeCtl:24 + * 第二次扩容之后 数组长:64 sizeCtl:48 + * 第二次扩容之后 数组长:128 sizeCtl:94 --> 这个时候才会退出扩容 + */ + private final void tryPresize(int size) { + /* + * MAXIMUM_CAPACITY = 1 << 30 + * 如果给定的大小大于等于数组容量的一半,则直接使用最大容量, + * 否则使用tableSizeFor算出来 + * 后面table一直要扩容到这个值小于等于sizeCtrl(数组长度的3/4)才退出扩容 + */ + int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : + tableSizeFor(size + (size >>> 1) + 1); + int sc; + while ((sc = sizeCtl) >= 0) { + Node[] tab = table; int n; +// printTable(tab); 调试用的 + /* + * 如果数组table还没有被初始化,则初始化一个大小为sizeCtrl和刚刚算出来的c中较大的一个大小的数组 + * 初始化的时候,设置sizeCtrl为-1,初始化完成之后把sizeCtrl设置为数组长度的3/4 + * 为什么要在扩张的地方来初始化数组呢?这是因为如果第一次put的时候不是put单个元素, + * 而是调用putAll方法直接put一个map的话,在putALl方法中没有调用initTable方法去初始化table, + * 而是直接调用了tryPresize方法,所以这里需要做一个是不是需要初始化table的判断 + */ + if (tab == null || (n = tab.length) == 0) { + n = (sc > c) ? sc : c; + if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //初始化tab的时候,把sizeCtl设为-1 + try { + if (table == tab) { + @SuppressWarnings("unchecked") + Node[] nt = (Node[])new Node[n]; + table = nt; + sc = n - (n >>> 2); + } + } finally { + sizeCtl = sc; + } + } + } + /* + * 一直扩容到的c小于等于sizeCtl或者数组长度大于最大长度的时候,则退出 + * 所以在一次扩容之后,不是原来长度的两倍,而是2的n次方倍 + */ + else if (c <= sc || n >= MAXIMUM_CAPACITY) { + break; //退出扩张 + } + else if (tab == table) { + int rs = resizeStamp(n); + /* + * 如果正在扩容Table的话,则帮助扩容 + * 否则的话,开始新的扩容 + * 在transfer操作,将第一个参数的table中的元素,移动到第二个元素的table中去, + * 虽然此时第二个参数设置的是null,但是,在transfer方法中,当第二个参数为null的时候, + * 会创建一个两倍大小的table + */ + if (sc < 0) { + Node[] nt; + if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || + sc == rs + MAX_RESIZERS || (nt = nextTable) == null || + transferIndex <= 0) + break; + /* + * transfer的线程数加一,该线程将进行transfer的帮忙 + * 在transfer的时候,sc表示在transfer工作的线程数 + */ + if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) + transfer(tab, nt); + } + /* + * 没有在初始化或扩容,则开始扩容 + */ + else if (U.compareAndSwapInt(this, SIZECTL, sc, + (rs << RESIZE_STAMP_SHIFT) + 2)) { + transfer(tab, null); + } + } + } + } + +在tryPresize方法中,并没有加锁,允许多个线程进入,如果数组正在扩张,则当前线程也去帮助扩容。 +数组扩容的主要方法就是transfer方法 + +/** + * Moves and/or copies the nodes in each bin to new table. See + * above for explanation. + * 把数组中的节点复制到新的数组的相同位置,或者移动到扩张部分的相同位置 + * 在这里首先会计算一个步长,表示一个线程处理的数组长度,用来控制对CPU的使用, + * 每个CPU最少处理16个长度的数组元素,也就是说,如果一个数组的长度只有16,那只有一个线程会对其进行扩容的复制移动操作 + * 扩容的时候会一直遍历,知道复制完所有节点,没处理一个节点的时候会在链表的头部设置一个fwd节点,这样其他线程就会跳过他, + * 复制后在新数组中的链表不是绝对的反序的 + */ + private final void transfer(Node[] tab, Node[] nextTab) { + int n = tab.length, stride; + if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) //MIN_TRANSFER_STRIDE 用来控制不要占用太多CPU + stride = MIN_TRANSFER_STRIDE; // subdivide range //MIN_TRANSFER_STRIDE=16 + /* + * 如果复制的目标nextTab为null的话,则初始化一个table两倍长的nextTab + * 此时nextTable被设置值了(在初始情况下是为null的) + * 因为如果有一个线程开始了表的扩张的时候,其他线程也会进来帮忙扩张, + * 而只是第一个开始扩张的线程需要初始化下目标数组 + */ + if (nextTab == null) { // initiating + try { + @SuppressWarnings("unchecked") + Node[] nt = (Node[])new Node[n << 1]; + nextTab = nt; + } catch (Throwable ex) { // try to cope with OOME + sizeCtl = Integer.MAX_VALUE; + return; + } + nextTable = nextTab; + transferIndex = n; + } + int nextn = nextTab.length; + /* + * 创建一个fwd节点,这个是用来控制并发的,当一个节点为空或已经被转移之后,就设置为fwd节点 + * 这是一个空的标志节点 + */ + ForwardingNode fwd = new ForwardingNode(nextTab); + boolean advance = true; //是否继续向前查找的标志位 + boolean finishing = false; // to ensure sweep(清扫) before committing nextTab,在完成之前重新在扫描一遍数组,看看有没完成的没 + for (int i = 0, bound = 0;;) { + Node f; int fh; + while (advance) { + int nextIndex, nextBound; + if (--i >= bound || finishing) { + advance = false; + } + else if ((nextIndex = transferIndex) <= 0) { + i = -1; + advance = false; + } + else if (U.compareAndSwapInt + (this, TRANSFERINDEX, nextIndex, + nextBound = (nextIndex > stride ? + nextIndex - stride : 0))) { + bound = nextBound; + i = nextIndex - 1; + advance = false; + } + } + if (i < 0 || i >= n || i + n >= nextn) { + int sc; + if (finishing) { //已经完成转移 + nextTable = null; + table = nextTab; + sizeCtl = (n << 1) - (n >>> 1); //设置sizeCtl为扩容后的0.75 + return; + } + if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { + if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) { + return; + } + finishing = advance = true; + i = n; // recheck before commit + } + } + else if ((f = tabAt(tab, i)) == null) //数组中把null的元素设置为ForwardingNode节点(hash值为MOVED[-1]) + advance = casTabAt(tab, i, null, fwd); + else if ((fh = f.hash) == MOVED) + advance = true; // already processed + else { + synchronized (f) { //加锁操作 + if (tabAt(tab, i) == f) { + Node ln, hn; + if (fh >= 0) { //该节点的hash值大于等于0,说明是一个Node节点 + /* + * 因为n的值为数组的长度,且是power(2,x)的,所以,在&操作的结果只可能是0或者n + * 根据这个规则 + * 0--> 放在新表的相同位置 + * n--> 放在新表的(n+原来位置) + */ + int runBit = fh & n; + Node lastRun = f; + /* + * lastRun 表示的是需要复制的最后一个节点 + * 每当新节点的hash&n -> b 发生变化的时候,就把runBit设置为这个结果b + * 这样for循环之后,runBit的值就是最后不变的hash&n的值 + * 而lastRun的值就是最后一次导致hash&n 发生变化的节点(假设为p节点) + * 为什么要这么做呢?因为p节点后面的节点的hash&n 值跟p节点是一样的, + * 所以在复制到新的table的时候,它肯定还是跟p节点在同一个位置 + * 在复制完p节点之后,p节点的next节点还是指向它原来的节点,就不需要进行复制了,自己就被带过去了 + * 这也就导致了一个问题就是复制后的链表的顺序并不一定是原来的倒序 + */ + for (Node p = f.next; p != null; p = p.next) { + int b = p.hash & n; //n的值为扩张前的数组的长度 + if (b != runBit) { + runBit = b; + lastRun = p; + } + } + if (runBit == 0) { + ln = lastRun; + hn = null; + } + else { + hn = lastRun; + ln = null; + } + /* + * 构造两个链表,顺序大部分和原来是反的 + * 分别放到原来的位置和新增加的长度的相同位置(i/n+i) + */ + for (Node p = f; p != lastRun; p = p.next) { + int ph = p.hash; K pk = p.key; V pv = p.val; + if ((ph & n) == 0) + /* + * 假设runBit的值为0, + * 则第一次进入这个设置的时候相当于把旧的序列的最后一次发生hash变化的节点(该节点后面可能还有hash计算后同为0的节点)设置到旧的table的第一个hash计算后为0的节点下一个节点 + * 并且把自己返回,然后在下次进来的时候把它自己设置为后面节点的下一个节点 + */ + ln = new Node(ph, pk, pv, ln); + else + /* + * 假设runBit的值不为0, + * 则第一次进入这个设置的时候相当于把旧的序列的最后一次发生hash变化的节点(该节点后面可能还有hash计算后同不为0的节点)设置到旧的table的第一个hash计算后不为0的节点下一个节点 + * 并且把自己返回,然后在下次进来的时候把它自己设置为后面节点的下一个节点 + */ + hn = new Node(ph, pk, pv, hn); + } + setTabAt(nextTab, i, ln); + setTabAt(nextTab, i + n, hn); + setTabAt(tab, i, fwd); + advance = true; + } + else if (f instanceof TreeBin) { //否则的话是一个树节点 + TreeBin t = (TreeBin)f; + TreeNode lo = null, loTail = null; + TreeNode hi = null, hiTail = null; + int lc = 0, hc = 0; + for (Node e = t.first; e != null; e = e.next) { + int h = e.hash; + TreeNode p = new TreeNode + (h, e.key, e.val, null, null); + if ((h & n) == 0) { + if ((p.prev = loTail) == null) + lo = p; + else + loTail.next = p; + loTail = p; + ++lc; + } + else { + if ((p.prev = hiTail) == null) + hi = p; + else + hiTail.next = p; + hiTail = p; + ++hc; + } + } + /* + * 在复制完树节点之后,判断该节点处构成的树还有几个节点, + * 如果≤6个的话,就转回为一个链表 + */ + ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : + (hc != 0) ? new TreeBin(lo) : t; + hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : + (lc != 0) ? new TreeBin(hi) : t; + setTabAt(nextTab, i, ln); + setTabAt(nextTab, i + n, hn); + setTabAt(tab, i, fwd); + advance = true; + } + } + } + } + } + } + +到这里,ConcurrentHashMap的put操作和扩容都介绍的差不多了, +  下面的两点一定要注意: +    ·复制之后的新链表不是旧链表的绝对倒序。 +    ·在扩容的时候每个线程都有处理的步长,最少为16,在这个步长范围内的数组节点只有自己一个线程来处理 + +ConcurrentHashMap的get操作详解 + +相比put操作,get操作就显得很简单了。废话少说,直接上源码分析。 +/* + * 相比put方法,get就很单纯了,支持并发操作, + * 当key为null的时候回抛出NullPointerException的异常 + * get操作通过首先计算key的hash值来确定该元素放在数组的哪个位置 + * 然后遍历该位置的所有节点 + * 如果不存在的话返回null + */ + public V get(Object key) { + Node[] tab; Node e, p; int n, eh; K ek; + int h = spread(key.hashCode()); + if ((tab = table) != null && (n = tab.length) > 0 && + (e = tabAt(tab, (n - 1) & h)) != null) { + if ((eh = e.hash) == h) { + if ((ek = e.key) == key || (ek != null && key.equals(ek))) + return e.val; + } + else if (eh < 0) + return (p = e.find(h, key)) != null ? p.val : null; + while ((e = e.next) != null) { + if (e.hash == h && + ((ek = e.key) == key || (ek != null && key.equals(ek)))) + return e.val; + } + } + return null; + } +ConcurrentHashMap的同步机制 +  前面分析了下ConcurrentHashMap的源码,那么,对于一个映射集合来说,ConcurrentHashMap是如果来做到并发安全,又是如何做到高效的并发的呢? +  首先是读操作,从源码中可以看出来,在get操作中,根本没有使用同步机制,也没有使用unsafe方法,所以读操作是支持并发操作的。 +  那么写操作呢? +  分析这个之前,先看看什么情况下会引起数组的扩容,扩容是通过transfer方法来进行的。而调用transfer方法的只有trePresize、helpTransfer和addCount三个方法。 +  这三个方法又是分别在什么情况下进行调用的呢? +  ·tryPresize是在treeIfybin和putAll方法中调用,treeIfybin主要是在put添加元素完之后,判断该数组节点相关元素是不是已经超过8个的时候,如果超过则会调用这个方法来扩容数组或者把链表转为树。 +  ·helpTransfer是在当一个线程要对table中元素进行操作的时候,如果检测到节点的HASH值为MOVED的时候,就会调用helpTransfer方法,在helpTransfer中再调用transfer方法来帮助完成数组的扩容 +  ·addCount是在当对数组进行操作,使得数组中存储的元素个数发生了变化的时候会调用的方法。 +   +  所以引起数组扩容的情况如下: +  ·只有在往map中添加元素的时候,在某一个节点的数目已经超过了8个,同时数组的长度又小于64的时候,才会触发数组的扩容。 +  ·当数组中元素达到了sizeCtl的数量的时候,则会调用transfer方法来进行扩容 +   +  那么在扩容的时候,可以不可以对数组进行读写操作呢? +  事实上是可以的。当在进行数组扩容的时候,如果当前节点还没有被处理(也就是说还没有设置为fwd节点),那就可以进行设置操作。 +  如果该节点已经被处理了,则当前线程也会加入到扩容的操作中去。 +   +  那么,多个线程又是如何同步处理的呢? +  在ConcurrentHashMap中,同步处理主要是通过Synchronized和unsafe两种方式来完成的。 +  ·在取得sizeCtl、某个位置的Node的时候,使用的都是unsafe的方法,来达到并发安全的目的 +  ·当需要在某个位置设置节点的时候,则会通过Synchronized的同步机制来锁定该位置的节点。 +  ·在数组扩容的时候,则通过处理的步长和fwd节点来达到并发安全的目的,通过设置hash值为MOVED +  ·当把某个位置的节点复制到扩张后的table的时候,也通过Synchronized的同步机制来保证现程安全 + + + +链表转为红黑树的过程 +   前面在讲解tryifyBin的源码的时候讲到过,如果在当个bin上的元素超过了8个的时候,就会尝试去扩容数组或者是将链表转为红黑树。 +源码: + +TreeBin(TreeNode b) { + super(TREEBIN, null, null, null); //创建的TreeBin是一个空节点,hash值为TREEBIN(-2) + this.first = b; + TreeNode r = null; + for (TreeNode x = b, next; x != null; x = next) { + next = (TreeNode)x.next; + x.left = x.right = null; + if (r == null) { + x.parent = null; + x.red = false; + r = x; + }// + else { + K k = x.key; + int h = x.hash; + Class kc = null; + for (TreeNode p = r;;) {//x代表的是转换为树之前的顺序遍历到链表的位置的节点,r代表的是根节点 + int dir, ph; + K pk = p.key; + if ((ph = p.hash) > h) // + dir = -1; + else if (ph < h) + dir = 1; + else if ((kc == null && + (kc = comparableClassFor(k)) == null) || + (dir = compareComparables(kc, k, pk)) == 0) + dir = tieBreakOrder(k, pk); //当key不可以比较,或者相等的时候采取的一种排序措施 + TreeNode xp = p; + if ((p = (dir <= 0) ? p.left : p.right) == null) {//在这里判断要放的left/right是否为空,不为空继续用left/right节点来判断 + x.parent = xp; + if (dir <= 0) + xp.left = x; + else + xp.right = x; + r = balanceInsertion(r, x); //每次插入一个元素的时候都调用balanceInsertion来保持红黑树的平衡 + break; + } + } + } + } + this.root = r; + assert checkInvariants(root); + } -image::images/JDK1.8-ConcurrentHashMap-Structure.jpg[] -== 参考资料 -* http://note.youdao.com/share/?spm=5176.100239.blogcont36781.3.nHffVb&id=dde7a10b98aee57676408bc475ab0680&type=note#/[ConcurrentHashMap源码分析--Java8] -* http://www.cnblogs.com/huaizuo/p/5413069.html[探索jdk8之ConcurrentHashMap 的实现机制 - 淮左 - 博客园] -- 参考资料非常棒,建议都看看! -* http://blog.csdn.net/u010723709/article/details/48007881[ConcurrentHashMap源码分析(JDK8版本) - 惟愿无事 - 博客频道 - CSDN.NET] -* https://www.cnblogs.com/chengxiao/p/6842045.html[ConcurrentHashMap实现原理及源码分析 - dreamcatcher-cx - 博客园] -* https://www.jianshu.com/p/d10256f0ebea[ConcurrentHashMap 原理解析(JDK1.8) - 简书]