ConcurrentHashMap浅析 - 969251639/study GitHub Wiki
ConcurrentHashMap是jdk提供的线程安全的哈希映射容器,它提供了非常高效率来安全完成的哈希表的扩容操作,线程冲突时最多只锁定一个桶的位置,里面的实现非常的棒,是一个必须掌握的知识点。
首先来看下它的大概结构,其实和HashMap是一样的
同样适用红黑树和链表解决哈希冲突,只不过它是线程安全的,且要保证效率,所以会比HashMap复杂很多很多
下面开始着重分析ConcurrentHashMap的实现
//哈希表最大容量,2 ^ 30
private static final int MAXIMUM_CAPACITY = 1 << 30;
//默认容量
private static final int DEFAULT_CAPACITY = 16;
//最大数组大小
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//默认并发级别,扩容时一个线程处理16个桶
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//负载因子
private static final float LOAD_FACTOR = 0.75f;
//链表转红黑树阈值
static final int TREEIFY_THRESHOLD = 8;
//红黑树转链表阈值
static final int UNTREEIFY_THRESHOLD = 6;
//如果一个链表的数量达到了转红黑树的阈值,不一定会转红黑树,而是会根据下面的MIN_TREEIFY_CAPACITY来判断是扩容还是转红黑树
//比如如果哈希表的数组长度小于64,那么达到转红黑树的阈值后会去做扩容,而不是去转红黑树,反之则会去按照上面的规则转红黑树机构存储
static final int MIN_TREEIFY_CAPACITY = 64;
//该桶的位置已经迁移了
static final int MOVED = -1; // hash for forwarding nodes
//红黑树结构的桶
static final int TREEBIN = -2; // hash for roots of trees
//计算数组下标用
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
CPU核数
static final int NCPU = Runtime.getRuntime().availableProcessors();
//扩容时线程迁移原数组的桶至少是16个,即一个线程至少处理16个桶的迁移
private static final int MIN_TRANSFER_STRIDE = 16;
//用来计算扩容时的二进制数
private static int RESIZE_STAMP_BITS = 16;
//最大参与扩容的线程数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
//用来空sizeCtl变量的计算位数
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
//存放元素的数组,即哈希表
transient volatile Node<K,V>[] table;
//扩容时的新数组
private transient volatile Node<K,V>[] nextTable;
//存储哈希表所有元素的总和
private transient volatile long baseCount;
//该属性有多重含义,根据注释的解释如下:
//0: 默认值
//>0: 哈希表扩容的阈值
//-1: 哈希表初始化
//<-1: 代表-(sizeCtl)个线程在扩容
private transient volatile int sizeCtl;
//扩容时用来保存扩容前的还剩至少多少个下标需要迁移的下标索引
private transient volatile int transferIndex;
//高并发计算哈希表总数时用
private transient volatile int cellsBusy;
private transient volatile CounterCell[] counterCells;
// views
private transient KeySetView<K,V> keySet;//存储哈希表键的视图容器
private transient ValuesView<K,V> values;//存储哈希表值的视图容器
private transient EntrySetView<K,V> entrySet;//存储哈希表键值的视图容器
private static final sun.misc.Unsafe U;
private static final long SIZECTL;//cas操作sizeCtl变量
private static final long TRANSFERINDEX;//cas操作transferIndex变量
private static final long BASECOUNT;//cas操作baseCount变量
private static final long CELLSBUSY;//cas操作cellsBusy变量
private static final long CELLVALUE;//cas操作CounterCell对象的value变量
private static final long ABASE;
private static final int ASHIFT;
static {
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = ConcurrentHashMap.class;
SIZECTL = U.objectFieldOffset
(k.getDeclaredField("sizeCtl"));
TRANSFERINDEX = U.objectFieldOffset
(k.getDeclaredField("transferIndex"));
BASECOUNT = U.objectFieldOffset
(k.getDeclaredField("baseCount"));
CELLSBUSY = U.objectFieldOffset
(k.getDeclaredField("cellsBusy"));
Class<?> ck = CounterCell.class;
CELLVALUE = U.objectFieldOffset
(ck.getDeclaredField("value"));
Class<?> ak = Node[].class;
ABASE = U.arrayBaseOffset(ak);
int scale = U.arrayIndexScale(ak);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
}
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;//哈希值
final K key;/键
volatile V val;/值,volatile修饰
volatile Node<K,V> next;//链表的下一个节点,volatile修饰
Node(int hash, K key, V val, Node<K,V> next) {/哈希,键值对,下一个节点构造
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;//如果next为null,表示链表只有一个节点
}
public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
public final V setValue(V value) {//不支持直接set操作
throw new UnsupportedOperationException();
}
public final boolean equals(Object o) {//键值对相等且是Map.Entry类
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}
/**
* Virtualized support for map.get(); overridden in subclasses.
*/
Node<K,V> find(int h, Object k) {//支持节点查找
Node<K,V> e = this;
if (k != null) {
do {
K ek;
//哈希值相等且key相等
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);//链表中的节点挨个找
}
return null;
}
}
节点和HashMap的差不多,都是用链表,但有一个比较特殊的节点,用来在扩容时占位,也就是当查找元素或者添加元素的时候,发现该桶的头节点是占位节点那么如果是查找操作,就需要find操作来进行特殊查找,因为读写并发,是需要特殊处理,如果是添加操作就帮助扩容
static final class ForwardingNode<K,V> extends Node<K,V> {//占位节点
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {//初始化时只是标记该节点的哈希值为MOVED(-1)
super(MOVED, null, null, null);
this.nextTable = tab;//扩容时如果完成搬迁,那么回到新的数组中去,这里的tab就是新数组,那么查找时到新数组中去查找
}
Node<K,V> find(int h, Object k) {
// loop to avoid arbitrarily deep recursion on forwarding nodes
outer: for (Node<K,V>[] tab = nextTable;;) {//死循环
Node<K,V> e; int n;
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)//键为空或数组为空或桶的头结点为空,返回空
return null;
for (;;) {//自旋
int eh; K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))//命中返回该节点
return e;
if (eh < 0) {//头结点的哈希值小于0,表示是占位节点
if (e instanceof ForwardingNode) {//占位节点,将持有占位元素的数组赋给tab,跳回到outer,继续查找
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else//调用Node的find方法,挨个查找
return e.find(h, k);
}
if ((e = e.next) == null)//遍历完链表,下一个元素为空,返回null
return null;
}
}
}
}
//通过哈希码计算数组的下标位置
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
//返回大于输入参数且最近的2的整数次幂的数。比如10,则返回16。
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
//获取数组下标为i的元素
@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
//cas设置数组下标为i的值
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
//设置数组下标为i的值
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
public ConcurrentHashMap() {//默认16大小的哈希表
}
public ConcurrentHashMap(int initialCapacity) {//初始化指定容量的哈希表
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));//tableSizeFor方法确保容量都是2的n次方
this.sizeCtl = cap;
}
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);//tableSizeFor方法确保容量都是2的n次方
this.sizeCtl = cap;
}
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
逐行分析
- put方法会调用putVal方法进行添加,putVal方法的第三个参数用来控制添加时是否能覆盖旧元素,false表示可以覆盖
- ConcurrentHashMap不支持key或value为空,这点和HashMap不一样
//ConcurrentHashmap和Hashmap不一样,ConcurrentHashmap的key和value都不能为null
if (key == null || value == null) throw new NullPointerException();
- 计算哈希值
int hash = spread(key.hashCode());
- 声明一个变量,用来保存是否是因为哈希冲突而添加或修改
int binCount = 0;
- 开始死循环,将当前的哈希数组赋给tab变量
for (Node<K,V>[] tab = table;;) {//死循环哈希表
...
}
- 声明几个变量如下
//f:桶的第一个节点
//n:当前的哈希数组长度
//i:计算出来的哈希数组位置下标
//fh:桶的第一个节点的哈希值
Node<K,V> f; int n, i, fh;
- 判断以下四种情况
7.1 是否需要初始化
if (tab == null || (n = tab.length) == 0)//如果哈希数组为空,则进行初始化
tab = initTable();
哈希表初始化,如果多个线程进入初始化方法,则保证第一个线程去做初始化,其余的让出CPU调度权限,等待第一个线程初始化完后继续第5步的循环
private final Node<K,V>[] initTable() {//初始化哈希表
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {//如果哈希表为空
if ((sc = sizeCtl) < 0)//如果有其他线程在初始化,让出该线程的CPU资源,继续循环判断是否初始化完成,完成后退出
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//cas设置sizeCtl为-1,表示初始化状态
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;如果设置过了初始容量大小,则用设置的大小,否则用默认的大小16(sizeCtl在构造方法可以被设置成需要创建初始容量大小)
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];//创建存储哈希表的数组
table = tab = nt;//将创建哈希数组赋给成员变量table
sc = n - (n >>> 2);//用减法和位预算代替乘法预算,相当于n = n + n * loadFactor
}
} finally {
sizeCtl = sc;//初始化成功后sizeCtl 设置成扩容阈值
}
break;
}
}
return tab;
}
7.2 如果哈希数组不为空,且插入到的哈希表的桶位置为空,cas设置该位置的头结点(cas保证只有一个线程能操作成功,其余的线程继续跳回第5步循环)
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break;// no lock when adding to empty bin
}
}
7.3 如果恰巧碰到该桶的位置的头结点的哈希值是占位节点(只有占位节点的哈希值是MOVED,具体看上面的ForwardingNode的构造方法),则表示哈希表在扩容,那么该线程去帮助扩容 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f);//扩容时分析
7.4 如果7.1,7.2,7.3都不满足,那么肯定是到哈希冲突这里,下面处理哈希冲突的情况
else {
V oldVal = null;
synchronized (f) {//防止并发,锁住头结点,即锁住该桶
//双重判断,如果头结点没有变则进入下面的操作,否则重新循环,这里主要考虑到并发的情况下,头结点被改了,那么第二线程就不应该进入到下面的逻辑,而是要重新循环上面的判断,主要还是扩容的判断,有可能第一个线程去做扩容,而导致头结点被改掉(主要是被改成了ForwardingNode类型的节点,被修改ForwardingNode后需要重新循环后会去帮其扩容,返回一个新的哈希数组,然后再次循环,将值添加到新哈希数组中),防止数据不一致问题
if (tabAt(tab, i) == f) {
if (fh >= 0) {//头结点的元素的哈希值大于等于0表示这是一个链表结构(红黑树的哈希值为-2)
binCount = 1;//链表深度为1
for (Node<K,V> e = f;; ++binCount) {//循环链表
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {//命中
oldVal = e.val;;//保存修改前原有值
if (!onlyIfAbsent)//是否覆盖
e.val = value;//覆盖原有的值
break;//跳出
}
Node<K,V> pred = e;//用来保存链表的前一个节点
if ((e = e.next) == null) {//是否是最后一个节点
pred.next = new Node<K,V>(hash, key, value, null);//新增到链表末尾
break;//跳出
}
}
}
else if (f instanceof TreeBin) {//红黑树
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
//binCount不等于0表示是新增,若果不是覆盖,那么肯定是遍历到了最后节点后新增,这时oldVal==null,binCount=链表的个数
if (binCount != 0) {//如果是新增成功,判断是否需要转红黑树
if (binCount >= TREEIFY_THRESHOLD)//如果链表个数大于等于TREEIFY_THRESHOLD(8),那么尝试转红黑树(不一定转,有可能去扩容)
treeifyBin(tab, i);
if (oldVal != null)//如果原有的值不为空,那么表示这是一个覆盖操作,而不是新增操作,那么直接返回原有值,否则break出去后会进入下面的addCount方法去做元素个数统计
return oldVal;
break;//跳出
}
}
之前说过,当链表的个数大于转红黑树的阈值,是不一定把链表转成红黑树,需要哈希数组的长达大于MIN_TREEIFY_CAPACITY(64)
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)//如果哈希数组的长度小于64,尝试去扩容
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
...
}
}
}
- 在第7步的分析中,其实发现只有覆盖操作会直接返回,其余的都算是新增操作,所以除了覆盖,都需要进入下面的统计哈希表的元素数量
addCount(1L, binCount);
return null;
addCount方法主要有两个功能,一个是新增元素后重新统计哈希元素的数量,另一个是新增元素后是否需要扩容
//addCount有两个参数,x表示新增元素的数量,check表示是否需要去做扩容标志位
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {//计算元素数量
...
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
下面重点分析下判断扩容的逻辑
8.1 判断是否需要扩容
//大于等于0表示判断是否需要扩容,一般调用新增元素(putVal方法)后该值会大于等于0,其他的像清空哈希表(clear方法, check=-1),则该值小于0,表示不需要做扩容的判断
if (check >= 0) {
...
}
8.2 声明几个变量如下
//tab:当前哈希数组
//nt:扩容后的新哈希数组
//n:当前哈希数组的长度
//sc:sizeCtl的值
Node<K,V>[] tab, nt; int n, sc;
8.3 判断当前哈希表是否需要扩容
//s >= (long)(sc = sizeCtl) 当前的哈希表容量大小大于扩容的阈值(这时的sizeCtl表示扩容的阈值)
// (tab = table) != null 原有的数组不为空
//(n = tab.length) < MAXIMUM_CAPACITY 小于最大容量
//以上三个条件都满足则进入扩容
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
...
}
8.4 进入满足扩容的while循环内,有两个分支如下
8.4.1 获取校验值
int rs = resizeStamp(n);
private static int RESIZE_STAMP_BITS = 16;
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
前半部用Integer.numberOfLeadingZeros(n)函数返回当前数组长度n的二进制中,前面的非0个数, Integer.numberOfLeadingZeros(n)方法中,如果n=0返回32(java的int由32组成,也就是32个0),n=1返回31(二进制中的最后一位为非0,前面32位都是0),如果n是负数,则返回0(二进制的第一位是符号位,负数是1,非负数是0)
public static void main(String[] args) throws FileNotFoundException {
System.out.println(Integer.numberOfLeadingZeros(-1));
System.out.println(Integer.numberOfLeadingZeros(0));
System.out.println(Integer.numberOfLeadingZeros(1));
}
输出0,32,31
默认情况下,数组长度为16(DEFAULT_CAPACITY),那么Integer.numberOfLeadingZeros(16)返回27,后半部用1 << (RESIZE_STAMP_BITS - 1),即1左移15位得到int的高16位二进制数,两者异或,得到一个第16位数为1的整数校验值
8.4.2 如果sizeCtl小于0,表示哈希表已经处于扩容状态
if (sc < 0) {//sc小于0,说明已经在扩容中
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);//开始扩容,注意这里的第二个参数用来控制是不是第一个扩容线程,null表示是第一个扩容线程
}
上面的if分支代码也有两个分支:
分支1:判断扩容操作是否已完成
有四个判断来判断是否已扩容完成
判断1:(sc >>> RESIZE_STAMP_SHIFT) != rs
校验值是否相等,即sizeCtl是否发生了变化
判断2:sc == rs + 1 这里应该是表示扩容完成
这里的判断有点奇怪,按理说应该是sc == (rs << RESIZE_STAMP_SHIFT) + 1,因为这时就只要最后一个线程在处理最后一个桶后会使 ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)返回true,而第一个扩容线程会将sizeCtl 的值设置为(rs << RESIZE_STAMP_SHIFT) + 2,所以这里应该是 (rs << RESIZE_STAMP_SHIFT) + 1,之所以是+1而不是+2是因为+2表示只剩最后一个线程处理最后一部分迁移内容,而这最后一个线程处理完后会将nextTable设置成null,那么下面的那个if就为true
扩容完成的代码:
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
判断3:sc == rs + MAX_RESIZERS
达到最大扩容线程数量
判断4:transferIndex <= 0
已经没有任何需要搬迁的桶的下标,也就是说搬迁时需要从原数组从后往前搬,加入原数组长度是16,那么搬到最后一个桶的下标是0,而这个transferIndex正式存储了这个最后一次被搬运的下标值,如果小于等于0就是表示已经没有任何桶需要再继续搬迁了
满足以上其中一个条件表示,跳出循环,即不需要做扩容操作
分支2:如果扩容操作未完成,cas的将sizeCtl的值+1,表示有一个新的线程参与扩容,该线程参与扩容完成后也会将sizeCtl的值-1,表示已有一个扩线程退出扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))//将sizeCtl+1,表示参与扩容的线程数+1,最后一个扩容的线程结束后会回到(rs << RESIZE_STAMP_SHIFT) + 2)
transfer(tab, nt);//开始扩容,注意这里的第二个参数用来控制是不是第一个扩容线程,null表示是第一个扩容线程
8.4.3 如果sizeCtl大于等于0,表示哈希表还未处于扩容状态(sizeCtl刚开始为库容的阈值)
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))//如果没有开始扩容,cas设置sizeCtl 的值为(rs << RESIZE_STAMP_SHIFT) + 2
transfer(tab, null);//开始扩容,注意这里的第二个参数用来控制是不是第一个扩容线程,null表示是第一个扩容线程
s = sumCount();//重新计算哈希表容量(有可能扩容后又并发的新增元素后有达到扩容阈值,继续上面的扩容判断循环)
刚开始扩容时,sizeCtl>0,cas修改sizeCtl为rs << RESIZE_STAMP_SHIFT) + 2
U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)
rs的高16未末因为异或为1,那么这里再左移16位,也就是到32位,即首位操作符肯定是1,所以rs << RESIZE_STAMP_SHIFT肯定是个很大的负数,举个例子
默认数组大小为16,resizeStamp(16)返回32795,32795转成二进制,然后左移16位+2 = -2145714174(System.out.println((32795 << 16) + 2);)
在上面的分析中可以看到,当新增元素且不是覆盖的那种则都会调用addCount方法进行计算,再看addCount方法的计算前需要先看哈希表的下面几个成员变量的含义
//存储哈希表所有元素的总和
private transient volatile long baseCount;
//高并发计算哈希表总数时用
private transient volatile int cellsBusy;
private transient volatile CounterCell[] counterCells;
其中CounterCell是一个计数器,只有在高并发的场景下才会使用它来做计数,一般场景下用baseCount就可以了
/**
* A padded cell for distributing counts. Adapted from LongAdder
* and Striped64. See their internal docs for explanation.
*/
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
CounterCell只有一个成员变量和一个构造方法,它的实现思路是和LongAdder一样,使用分治法降低cas操作失败,也有可能分治合并结果时出现结果不准
接下来看ConcurrentHashMap的两个获取哈希表元素数量的方法
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
public long mappingCount() {
long n = sumCount();
return (n < 0L) ? 0L : n; // ignore transient negative values
}
这两者的区别size只能支持到Int的大小数量,而mappingCount则是支持long级别的数量大小,两者都是需要依赖sumCount方法来进行统计
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {//如果counterCells数组不为空,循环数组,和baseCount累加后返回
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
sumCount方法很简单,如果counterCells数组不为空,则将它的value逐个跟baseCount累加后返回,那么,counterCells什么情况下不为null呢?答案就在addCount中的统计部分
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||//counterCells数组不为空
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {//cas设置baseCount=baseCount+x失败
CounterCell a; long v; int m;
boolean uncontended = true;
//如果counterCells为空或者随机一个counterCells数组上的value为空或者cas设置cellValue失败
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);//这里会对counterCells进行初始化后分段统计
return;
}
if (check <= 1)//如果是修改的链表深度不超过1,则认为哈希是均匀的,这时候不需要考虑扩容,统计后返回
return;
s = sumCount();//重新计算哈希表元素总数,这里会包含counterCells的统计
}
...
}
从上面可以看到addCount的统计部分的第一个判断就是已经存在counterCells或者并发激烈导致cas设置失败的时候会进入counterCells的初始化(刚开始counterCells数组肯定为空),而最终的统计算法就在fullAddCount方法中实现,fullAddCount方法中的实现思路和LongAdder一样,这也解释了为什么这里的统计不用cas直接去计算baseCount的值,因为高并发下的cas失败率会很高,cpu负载加重,而且大部分的cas都是无用的重试,为了优化这个问题,作者也就跟着LongAdder的思路去优化这个cas问题,其目的就是减少cas的次数来提高效率
扩容在ConcurrentHashMap中主要由以下几个点触发
9.1 当添加一个元素后,如果链表中的长度大于等于8,则不一定会去转红黑树,而是有可能先去做扩容,减少链表的长度
final V putVal(K key, V value, boolean onlyIfAbsent) {//putVal上面已经分析过,看主要部分
...
if (binCount >= TREEIFY_THRESHOLD)//TREEIFY_THRESHOLD默认是8,代表链表转红黑树的阈值
treeifyBin(tab, i);
...
}
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)//这里如果当前哈希数组的长度小于MIN_TREEIFY_CAPACITY(64)的话其实是不会去将链表转红黑树存储,而是调用tryPresize去扩容
tryPresize(n << 1);//参数为n左移1位也就是n*2
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
...
}
}
}
接下来看tryPresize方法做了什么:
private final void tryPresize(int size) {
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);//确保扩容的数组长度是2的n次方
int sc;
while ((sc = sizeCtl) >= 0) {//sizeCtl大于等于0表示要么扩容完成后sizeCtl保存了新阈值要么还没有扩容
Node<K,V>[] tab = table; int n;
if (tab == null || (n = tab.length) == 0) {//还行数组长度为空则先进行初始化,和initTable方法一样,这部分主要是在putAll中使用,上面的方法不会进入到分支中
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//将sizeCtl设置为-1,表示初始化
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];//初始化数组
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;//设置扩容阈值
}
}
}
else if (c <= sc || n >= MAXIMUM_CAPACITY)//如果达到哈希容量的最大值,跳出
break;
else if (tab == table) {//下面的思路和addCount方法中的判断扩容部分一样
int rs = resizeStamp(n);
if (sc < 0) {//sizeCtl小于0,说明哈希表已经在扩容中
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)//上面的这5个判断都是用来判断扩容是否完成或者达到上限,其中一个满足则跳出,不需要扩容
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))//将sizeCtl的值加1,表示已有一个线程参与扩容,如果cas成功的话
transfer(tab, nt);//扩容
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))//将sizeCtl的值设置为 (rs << RESIZE_STAMP_SHIFT) + 2),如果cas成功
transfer(tab, null);//扩容
}
}
}
9.2 当新增完元素后也会判断是否需要扩容
final V putVal(K key, V value, boolean onlyIfAbsent) {//putVal上面已经分析过,看主要部分
...
addCount(1L, binCount);
...
}
private final void addCount(long x, int check) {
...
}
上面的那两个方法在第7部分已经分析过了,这里就不详说,这里只要知道新增完元素后也会判断是否需要扩容
9.3 在新增元素的时候,发现计算出来的哈希值对应的桶的头结点类型是ForwardingNode占位节点,这时已经表明哈希表在扩容,帮助其扩容
final V putVal(K key, V value, boolean onlyIfAbsent) {//putVal上面已经分析过,看主要部分
...
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
...
}
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
//ForwardingNode是占位节点,扩容时有两种情况会被标记为ForwardingNode节点
//1. 迁移时该桶的位置上没有任何元素,直接标记为ForwardingNode节点
//2. 当一个线程搬迁玩该桶上的所有元素后也会标记为ForwardingNode节点
//也就是说ForwardingNode节点就是代表了迁移完成了的节点,所以他的属性nextTable指向新的哈希数组
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {//确保是ForwardingNode节点
int rs = resizeStamp(tab.length);//计算校验值
//这个while有三个判断,分析如下
//nextTab == nextTable:当扩容完成后,成员变量nextTable会被设置为null,所以这个判断应该是扩容完后返回false
//table == tab:当扩容完成后,成员变量table=新哈希数组,所以这个判断也和上面一样是扩容完成后返回false
//sizeCtl < 0:sizeCtl含义上面说过了,当sizeCtl小于0的时候代表哈希表还正在扩容(-1表示初始化,但这个只有1次)
//综上判断这个while条件用来判断是否扩容完
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
//下面的处理也是和addCount方法一样
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)//判断是否库容完
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {//将sizeCtl的值加1,表示已有一个线程参与扩容,如果cas成功的话
transfer(tab, nextTab);//扩容
break;
}
}
return nextTab;//返回新哈希数组
}
return table;
}
上面的三个扩容点已经全部分析完,接下来分析扩容最终的执行方法transfer
9.4 扩容
//两个参数,tab:原哈希数组,nextTab:新哈希数组(如果为null,那么表示第一个线程扩容,需要去初始化它,外围控制它是否是null还是非null)
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
//声明两个变量,n:原哈希数组长度,stride:一个线程一次搬运多少个桶
int n = tab.length, stride;
//根据cpu核数计算出一个线程一次至少搬运多少个桶,如果小于MIN_TRANSFER_STRIDE(16),那么设置成MIN_TRANSFER_STRIDE,表示至少一次搬运16个桶
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {//红黑树结构
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
上面的方法比较长,分几部分来讲解 9.4.1 新哈希数组初始化
//如果新哈希数组为空,则表示需要先初始化它,这个null只有在扩容时cas成功后的第一个线程会是null,其余都是非null,可以在之前的addCount等方法中找到
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//创建一个原有哈希数组长度两倍的新哈希数组
nextTab = nt;//保存到参数变量nextTab中
} catch (Throwable ex) { // 如果因为OOM导致上面的分配数组内存不够,将sizeCtl设置为最大容量数,不继续扩容
sizeCtl = Integer.MAX_VALUE;//设置数组上限,防止继续扩容
return;//返回
}
nextTable = nextTab;//成功则将新数组出给成员变量nextTable
transferIndex = n;//transferIndex初始化时记录原有数组的长度,也是个成员变量,它表示可以被搬迁的数组的最大下标
}
9.4.2 搬运前控制变量的声明和准备
int nextn = nextTab.length;//新数组的长度
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);//创建一个占位的节点,当其他线程发现是占位节点,则会跳过,继续处理下一个节点,被占位后的节点表示该桶要么为空,要么搬迁完了
boolean advance = true;//用来表示是否能继续领取处理下一个桶的标志位
boolean finishing = false; // to ensure sweep before committing nextTab //标记是否扩容完成
9.4.3 分段领取搬迁任务
//死循环,开始扩容,这里有两个初始化参数,i表示数组的下标,即桶在数组中索引值,bound是用来记录当前线程可以处理的桶的下标
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {//advance为true,开始领取需要处理的桶区间(一般一次领16个桶)
int nextIndex, nextBound;
//i-1小于等于0或者扩容完成,不需要继续领取(i-1大于bound会在下面的第二个elseif分支处理,初始情况下该条件用于为false)
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {//小于等于0表示没有其他区间可以领取,可以退出
i = -1;//跳出这个while后会用到来判断当前线程是否需要继续参与扩容
advance = false;//不需要再领取
//cas的领取自己的区间,默认从0开始,否则从nextIndex - stride开始,
//这里假设默认原有数组是16,那么第一个线程进来后cas成功,则nextBound=transferIndex=0,i=15,bound=0,
//即该线程处理数组下标0到15的元素,第二个线程则在第二次循环时进入到上面的第一个elseif分支,不会继续领取任务,而是去做退出,
//如果原有数组是64,那么第一个线程cas成功,则transferIndex=nextBound=64-16=48,i=63,bound=48,
//即该线程处理数组下标48到63的元素,第二个线程下次循环后也会cas,设置transferIndex=nextBound=48-16=32,i=47,bound=32,
//即该线程处理数组下标32到47的元素
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;//负责搬迁的起始下标
i = nextIndex - 1;//负责搬迁的结束下标
advance = false;//领取到指定的搬运区间后,跳出while循环去搬迁
}
}
}
...
}
上面的代码主要通过i去控制数组的下标,从后往前搬,而bound控制搬运的起始下标,这样通过i和bound就可以判断是否完成了自己的任务,完成任务后继续判断还有没有其他数组区间可以领取,也就是transferIndex控制的部分,它表示可以被搬迁的数组的最大下标,如果小于等于0,表示最头部也已经被领走了,就不需要再继续领任务跳出循环,如果可以继续领,则cas的去领取一段数组区间
9.4.3 判断自己的搬迁任务是否已经执行完或者扩容是否已完成
//i<0 表示该线程没有领到迁移区间
//i>=n 没找到满足它的条件??
//i+n>=nextn 没找到满足它的条件??
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {//扩容成功
nextTable = null;//设置nextTable为null,表示扩容完成,addCount方法时会根据判断
table = nextTab;//将新数组覆盖原数组
sizeCtl = (n << 1) - (n >>> 1);//设置新的扩容阈值
return;
}
//将sizeCtl-1表示已有一个线程已处理玩扩容工作
//当最后减到(sc - 2) = resizeStamp(n) << RESIZE_STAMP_SHIFT时与刚开始扩容时设置的sc的值相对应,表示扩容完成
//U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;//结束当前线程的扩容操作
finishing = advance = true;//设置扩容完成标记
i = n; //重新从原有的数组从后到前挨个遍历,确保所有桶都已经迁移完成(即都被标记为ForwardingNode),最后当i减到小于0时,进入上面的那个if退出
}
}
这部分刚开始有个判断,有三个条件,后面的两个条件有点奇怪,首席第一个判断到好理解,前面分析过i是领取的任务的最大区间下标,它是从数组的从后往前挪,也就是每次循环后都要--i,所以不管多少个线程,i到最后总能减到0以下,而减到0以下无非就是没任务领了,或者扩容完成了,所以当i小于0就是进入该线程退出扩容的标记,而后面的两个没看懂什么情况下满足加入原数组长度是48,那么i=47,而i又是递减的,肯定不满足i>=n;而nextn表示新数组长度,也就是48*2=96,那么i+n=47+48=95<96,也是不满足i + n >= nextn的条件
接下来根据扩容完成标记变量finishing判断是否库容完成,如果是扩容完成,那么需要完成一些扫尾操作(只有一个线程在搬迁玩后做扫尾工作,其余的线程都会提前退出)
最后也是最重要的一部分,判断是不是最后一个线程扩容完成,只有最后一个线程完成后才会去做扫尾工作 前面的可以进入扩容的方法,比如addCount,可以知道sizeCtl在扩容时会被设置成一个很小的负数,然后除了第一个线程,往后的所有参与扩容的线程都会讲sizeCtl+1,表示有一个线程参与进来了,而这里每个线程都会讲sizeCtl-1,回到最原始的扩容时的sizeCtl的值,而第一个进程在搬迁时会将sizeCtl设置成(rs << RESIZE_STAMP_SHIFT) + 2),所以这个值就是判断是不是最后一个线程搬迁完的标记,相等则是最后一个线程,否则还有其他线程还在搬迁,则先退出返回
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;//结束当前线程的扩容操作
最后一个线程除了做扫尾的工作外还要再次检查一遍真个哈希表是否已经全部被搬迁完了
i = n; // recheck before commit
这时i会回到原数组的大小,然后回到循环,从后往前走(这时finishing已经为true,不会在while中逗留,while循环只是将i-1),逐步判断所有的桶的头结点是不是ForwardingNode(表示处理完成的桶)
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
一直循环,直至最后i减到小于0,进入到上面的9.4.3的if分支中,做扫尾工作
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
这样整个扩容就算完成了
9.4.4 如果要搬迁的桶是空,那么直接将其标记为ForwardingNode占位节点,没有元素要搬迁,也就是相当于搬迁完成
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);//如果cas成功,advance=true,继续处理下一个桶(回到while后i会先减1),如果cas失败,返回false,重新循环后会回到这个else if,发现(f = tabAt(tab, i)) == null(这时tabAt(tab, i)=fwd),又继续循环,跳到下面9.4.5的一个判断后继续处理下一个桶
9.4.5 如果发现的桶的头结点是ForwardingNode占位节点,跳过这个桶,继续下一个桶的迁移
else if ((fh = f.hash) == MOVED)
advance = true; //advance设置为true,回到while循环继续处理下一个桶,也就是tabAt(tab, i-1)
9.4.6 搬迁桶下面的所有元素
进到这一步说明这个桶的头结点没有被其他线程处理过且有元素在上面,第一个要做的就是给这个头结点加锁,防止并发修改
synchronized (f) {//锁住头节点,防止put进新元素
if (tabAt(tab, i) == f) {//保证锁住的头结点和当前位置上头结点相等
Node<K,V> ln, hn;//两个变量,ln(low node)表示低位节点,hn(hign node)表示高位节点
if (fh >= 0) {//头结点的哈希值大于0,表示是链表结构(红黑树数为-2)
//&操作符使runBit的值要么0,那么1,因为数组的长度总是为2的整数倍,转成二进制只有一个位为1,其余为0,假如,当前节点位置为5,原数组长度为16,那么这里的runBit要么为5,要么为16+5=21
int runBit = fh & n;
Node<K,V> lastRun = f;//记录最后一个属于runBit的最后一个节点
for (Node<K,V> p = f.next; p != null; p = p.next) {// 循环遍历链表
int b = p.hash & n;
if (b != runBit) {//如果不等,重新设置runBit和lastRun,这样能保障lastRun之后的节点的runBit都是一样
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {//如果runBit为0,则放低水位
ln = lastRun;
hn = null;
}
else {//如果runBit不等于0,则放高水位
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {//循环到lastRun节点,因为lastRun节点后的(ph & n)都一样
int ph = p.hash; K pk = p.key; V pv = p.val;
Node的构造方法中的最后一个参数表示下一个节点
if ((ph & n) == 0)//构造低位节点的链表
ln = new Node<K,V>(ph, pk, pv, ln);
else//构造高位节点的链表
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);//设置低位的链表到新数组的i位置
setTabAt(nextTab, i + n, hn);//设置高位的链表到新数组中的i+n位置
setTabAt(tab, i, fwd);//将该节点标记为ForwardingNode节点,表示该位置已经迁移完
advance = true;//继续下一位置的搬迁
}
}
else if (f instanceof TreeBin) {//红黑树结构
...
}
}
}
总结一下,上面的库容步骤主要就三步
- 领取区间任务
- 处理迁移任务
- 扩容完的扫尾工作
get方法相对上面来说就简单多了
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());//计算哈希值
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {//如果命中的桶的头结点不为空
if ((eh = e.hash) == h) {//要找的元素是否就是在头结点
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//eh = -1:ForwardingNode
//eh = -2:TreeNode
else if (eh < 0)//如果不在头结点需要深入查找,头结点的哈希值如果小于0,则说明是红黑树结构,调用红黑树节点TreeNode的查找方法,或者是扩容时的ForwardingNode节点,这时也会去调用ForwardingNode的find方法(TreeNode和ForwardingNode都重写了find方法),而ForwardingNode中保存了新的哈希数组nextTable的引用,这样就能指引到去新哈希数组上查找了
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {//如果是链表结构,挨个遍历链表上的节点
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;//找不到返回null
}
public void clear() {
long delta = 0L; // negative number of deletions 记录被清空的元素数量,负数
int i = 0;//数组下标
Node<K,V>[] tab = table;
while (tab != null && i < tab.length) {//遍历哈希表
int fh;
Node<K,V> f = tabAt(tab, i);
if (f == null)//如果该桶为空
++i;//去取下一个位置的桶
else if ((fh = f.hash) == MOVED) {//如果是处于扩容的占位节点
tab = helpTransfer(tab, f);//帮助其扩容后返回新的哈希表
i = 0; // restart 重新开始循环新的哈希表
}
else {
synchronized (f) {//锁住头结点
if (tabAt(tab, i) == f) {
Node<K,V> p = (fh >= 0 ? f :
(f instanceof TreeBin) ?
((TreeBin<K,V>)f).first : null);
while (p != null) {//遍历链表或红黑树
--delta;
p = p.next;
}
setTabAt(tab, i++, null);//设置值为null
}
}
}
}
if (delta != 0L)//不等于0说明有元素被设置为空
addCount(delta, -1);//重新计算,第一个参数表示要减掉的数量,所以delta是个负数,第二个参数为-1表示不需要做扩容判断
}