0%

ConcurrentHashMap源码分析

前言

ConcurrentHashMap的源码长的让人惊叹。
我在idea打开的1.8版本达到了6000多行。

ConcurrentHashMap是多线程安全的HashMap,但是和HashTable的简单的加上同步的方法不同,他运用了很多高级复杂的方法来进行同步,很多都是避免的昂贵的锁操作。

底层其实有很多和HashMap相同的地方,比如处理Hash冲突都是链表和红黑树。
树化和链表化的阈值都是一样的。

简单的说有下面几个重点:

  • Unsafe的运用,就是CAS操作
  • 每个bin为一把锁
  • 红黑树的读写分离
1
2
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>  
implements ConcurrentMap<K,V>, Serializable

ConcurrentMap

ConcureentHashMap实现了ConcurrentMap接口,这个接口的方法全部来自Map
看起来只是一个标记作用,但是其实不是。
比如在Map

1
2
3
4
5
6
7
default V putIfAbsent(K key, V value) {
V v = get(key);
if (v == null) {
v = put(key, value);
}
return v;
}

putIfAbsent方法被设为default,只要implements了这个接口就自动获得这个方法。
但是在ConcurrentMap中,

1
V putIfAbsent(K key, V value);

去掉了default,这样在ConcurrentHashMap就必须自己实现了。
ConcurrentHashMap的实现都是原子的。

阈值参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//数组的最大长度,就是桶的数量,下面简称bin了
private static final int MAXIMUM_CAPACITY = 1 << 30;

//如果我们没有指定初始的bin的大小,默认是16,2的4次方
private static final int DEFAULT_CAPACITY = 16;

//和toArray及相关的方法有关,最大的Array大小
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

//和之前的版本的分段锁有关,这个我们最后再谈,这个保留这个只是为了保持兼容性
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

//默认的负载因子,当桶被填了超过百分之75时,会resize,增加桶的数量
//换句话说,当空的bin少于百分之25时,会增加bin的数量
private static final float LOAD_FACTOR = 0.75f;

//当bin中链表的数量超过8时,会变成红黑树
static final int TREEIFY_THRESHOLD = 8;

//当树中节点少于6时,会变成链表。
static final int UNTREEIFY_THRESHOLD = 6;

//进行树化的另一个条件,就是需要bin的数量达到64
static final int MIN_TREEIFY_CAPACITY = 64;

private static final int MIN_TRANSFER_STRIDE = 16;
private static int RESIZE_STAMP_BITS = 16;

//help resize的最大线程数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

//在扩容时的状态
static final int MOVED = -1;
static final int TREEBIN = -2;
static final int RESERVED = -3;

Node类

链表Node

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//为了简化删了一些,重点是这个类的setValue方法,是不支持的。
//这个是链表节点的类,而树节点的类在下面。
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;

Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
public final V setValue(V value) {
throw new UnsupportedOperationException();
}

Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}

红黑树节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//这个是红黑树的节点类,继承了Node类。
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;

TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}

Node<K,V> find(int h, Object k) {
return findTreeNode(h, k, null);
}

/**
* Returns the TreeNode (or null if not found) for the given key
* starting at given root.
*/
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
//...
}
}

红黑树Bin

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//这个是红黑树的一个抽象。也是继承了Node类,
//所以在进行一些操作的时候需要用instanceof进行一些判断。
//我们一开始说ConcurrentHashMap和之前的分段锁不同,
//他每个bin都是一把锁,所以在TreeBin中有一些lock函数
//这个类还自带读写锁属性,因为ConcurrentHashMap的读是没有给头Node进行同步
//而是直接调用的红黑树的查找方法
//如果是链表的话,不加锁,但是如果是红黑树的话,会加上读锁
//线程得到写锁的时候必须先等读锁释放。
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
// values for lockState
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock
//TreeBin的put,get,remove基本操作。就是红黑树的操作。
}

ForwardingNode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//这是一个特殊的Node,当table正在transfer的时候,会被切到原table的bin的头结点去
//用于连接原table和nextTable
//它包含了nextTable,hash值为-1,其他的都是null。
//在并发transfer的时候,如果发现这个节点是ForwardingNode,那就说明这个节点的转移已经完成了。
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}

Node<K,V> find(int h, Object k) {
// loop to avoid arbitrarily deep recursion on forwarding nodes
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) {
int eh; K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
return e.find(h, k);
}
if ((e = e.next) == null)
return null;
}
}
}
}

重要的成员变量


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//cpu的个数
static final int NCPU = Runtime.getRuntime().availableProcessors();

//bin数组,在第一次插入操作进行初始化
transient volatile Node<K,V>[] table;

//扩容时的数组
private transient volatile Node<K,V>[] nextTable;

//当前键值对总个数
private transient volatile long baseCount;

//这个比较重要,是控制标识符,源码中叫Table initialization and resizing control
//就是进行初始化和扩容的控制。
//当是负数时,table正在初始化或者扩容
//-1代表正在初始化 ,-N 表示有N-1个线程正在进行扩容操作
//当table是null时,如果这个值大于0,那就是tablede初始化大小,如果等于0,那就使用默认大小
//当table不是null时,等于下一次resize时node需要达到的数量,就是容量乘以负载因子。
private transient volatile int sizeCtl;
private transient volatile int transferIndex;

//自旋锁
private transient volatile int cellsBusy;

//这个与size有关
private transient volatile CounterCell[] counterCells;


//下面这三个和迭代有关
private transient KeySetView<K,V> keySet;
private transient ValuesView<K,V> values;
private transient EntrySetView<K,V> entrySet;

unsafe

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// Unsafe mechanics
private static final sun.misc.Unsafe U;
private static final long SIZECTL;
private static final long TRANSFERINDEX;
private static final long BASECOUNT;
private static final long CELLSBUSY;
private static final long CELLVALUE;
private static final long ABASE;
private static final int ASHIFT;

static {
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = ConcurrentHashMap.class;
SIZECTL = U.objectFieldOffset
(k.getDeclaredField("sizeCtl"));
TRANSFERINDEX = U.objectFieldOffset
(k.getDeclaredField("transferIndex"));
BASECOUNT = U.objectFieldOffset
(k.getDeclaredField("baseCount"));
CELLSBUSY = U.objectFieldOffset
(k.getDeclaredField("cellsBusy"));
Class<?> ck = CounterCell.class;
CELLVALUE = U.objectFieldOffset
(ck.getDeclaredField("value"));
Class<?> ak = Node[].class;
//可以获取数组第一个元素的偏移地址
ABASE = U.arrayBaseOffset(ak);
//arrayIndexScale可以获取数组的转换因子,也就是数组中元素的增量地址
//将arrayBaseOffset与arrayIndexScale配合使用,可以定位数组中每个元素在内存中的位置。
int scale = U.arrayIndexScale(ak);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
}

辅助方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//让hash值分布的更均匀,让原本的hash值前16位和后16位取异或在和HASH_BITS取与。
static final int HASH_BITS = 0x7fffffff;
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}

//找到与c最接近的2的整数次方的那个数,用的方法很巧妙。
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

//得到tab第i个位置的Node,用了unsafe,很玄幻的操作。
//为什么不直接用tab[i]返回呢
//因为在java内存模型中,我们已经知道每个线程都有一个工作内存,里面存储着table的副本
//虽然table是volatile修饰的,但不能保证线程每次都拿到table中的最新元素
//Unsafe.getObjectVolatile可以直接获取指定内存的数据,保证了每次拿到数据都是最新的。
//来自参考的第一篇文章。
//这里的ABASE指的是一个元素的长度
//i << ASHIFT计算的是第i个元素和第一个的偏移

Class<?> ak = Node[].class;
ABASE = U.arrayBaseOffset(ak);
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

//原理同上。
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

树化操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//将index位置的tab树化
//但是如果table小于64,而直接扩容
//运用了同步,所以在多线程中是安全的。
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
//如果table的长度小于64,则不转树,而进行扩容到两倍
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
//不然就进行转树操作。
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}

树退化为链表


这个操作只有在transfer时,发现红黑树节点少于6,才会进行。

1
2
3
4
5
6
7
8
9
10
11
12
static <K,V> Node<K,V> untreeify(Node<K,V> b) {
Node<K,V> hd = null, tl = null;
for (Node<K,V> q = b; q != null; q = q.next) {
Node<K,V> p = new Node<K,V>(q.hash, q.key, q.val, null);
if (tl == null)
hd = p;
else
tl.next = p;
tl = p;
}
return hd;
}

初始化方法


构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//空的,使用默认值
public ConcurrentHashMap() {

}
//自定义初始化的容量大小,就是bin的数量。
//其中的tableSizeFor方法在HashMap中是一样的,为了保证容量是2的整数次方
//会找到最接近initialCapacity的那个2的整数次方数。
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}

//还可以自定义负载因子,但是并不能改变系统的负载因子,仅仅是用在构造函数中。
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
//自定义负载因子,初始容量,并发线程数,concurrencyLevel的是并发更新map的线程数
//就是1.8之前的实现的分段锁的个数,现在已经不用,在这儿仅仅为了保持兼容性。
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}

table初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//table的初始化,这里需要考虑多线程竞争。
private final Node<K,V>[] initTable() {
Node<K,V>[] tab;
int sc;
while ((tab = table) == null || tab.length == 0) {
//如果已经有线程正在进行初始化,那么调用一下yield进入自旋等待初始化结束
//这里的yield其实取决于操作系统,作用是提醒操作系统我这个线程主动让出时间片。
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//cas操作,只有一个线程进入初始化,通过cas设为-1。
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//这个相当于sc = n*0.75,也就是乘上了负载因子0.75
//不过这种写法也真的是服。。。。
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}

基本操作

put方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
public V put(K key, V value) {
return putVal(key, value, false);
}

//第三个参数是是否只有在没有的情况进行put
final V putVal(K key, V value, boolean onlyIfAbsent) {
//key和value都不允许null
if (key == null || value == null) throw new NullPointerException();
//得到key的hash值,spread函数让他分布的更均匀点。
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//如果table还没有进行初始化,进行初始化。
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//如果tab[i]
//的位置还是空的,就进行cas操作set进去
//如果cas成功,就put成功了,就直接退出循环。
//如果失败,那就进行自旋,再次尝试插入。
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break;
}
//如果发现为-1,说明有其他线程正在进行扩容操作,就不自旋了
//一起去参加扩容把,关于扩容参见下面
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
//准备进行同步方法,和分段不同的是
//这里会给tab[i]的元素进行加锁。相当于一个bin一个锁。
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
//如果fh大于0,说明这个bin是链表
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//不然就是红黑树,调用TreeBin的put方法。
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//如果链表长度大于阈值,就进行树化
//但是其实进去treeifyBin就会看到,还需要bin的数量大于64
//不然就是直接扩容
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//如果插入成功,就总数加一,判断要不要进行扩容
addCount(1L, binCount);
return null;
}

get方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//得到hash值
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//说明正在进行扩容操作,node是forwardingNode,调用forwardingNode的find方法。
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

remove

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public V remove(Object key) {
return replaceNode(key, null, null);
}

final V replaceNode(Object key, V value, Object cv) {
int hash = spread(key.hashCode());
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0 ||
(f = tabAt(tab, i = (n - 1) & hash)) == null)
break;
//如果正在进行迁移,那么就加入transfer行列。
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
boolean validated = false;
//否则就对table[i]进行加锁
synchronized (f) {
//...省略了,和get的方法差不多
}
if (validated) {
if (oldVal != null) {
if (value == null)
addCount(-1L, -1);
return oldVal;
}
break;
}
}
}
return null;
}

扩容操作

扩容操作分为两步

  • 初始化nextTable数组,这个由单线程完成
  • 迁移table中的数据到nextTable中,这个可以并发的进行
    主要操作是在transfer中,transfer方法可能被下面三个函数调用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
//扩容到size
private final void tryPresize(int size) {
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
//sizeCtl>=0说明没有正在扩容,或者表还没初始化
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
//如果表还没初始化
if (tab == null || (n = tab.length) == 0) {
//扩容的值取两者的较大的
n = (sc > c) ? sc : c;
//进行cas操作
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
//成功,进行初始化
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
//如果现在的容量已经到达最大值或者扩容值比现在的小,就直接退出
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
//已经进行过初始化
else if (tab == table) {
int rs = resizeStamp(n);
//如果sc小于0,说明正在进行扩容
//不过这个可能小于0么,,我看了看好像不太可能啊。
if (sc < 0) {
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//这里才是真正的进行扩容操作。
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}

static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

//检查并进行扩容操作。
//首先初始化nextTable,大小为原来table大小的两倍
//把当前的size加上x,然后检查是否要进行扩容。
//这里涉及到size的计算,这个我们下面会讲。
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
//如果sc小于0,说明正在进行扩容操作,而加入进去进行扩容。
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//不然就进行扩容,同nextTable中传了一个null进去
//在transfer中会检查,如果是null就会先new一下nextTable。
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}

//因为进行扩容时,需要把原来的元素搬到nextTable中,这个过程可以各个线程协助完成。
//如果在put操作中发现table正在扩容,则当前线程就会加入help转移的队伍中来。
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
//这里把通过cas把sc+1,如果成功,就加入transfer行列。
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}

其中并发迁移操作又分为几步:

  • 首先从table的最后一个节点开始找,如果table[i]是空,则通过cas把一个ForwardingNode放进去,标志已经迁移结束
  • 如果table[i]是forwardingNode,则继续向前找
  • 如果table[i]不为空,那么就对这个点进行加锁,把这个点的节点放到nextTable的i和i+n中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
//这个是扩容的转移函数,也是最重要的部分,上面的函数都对这个进行了调用。

//其中transferIndex标记了在transfer中之前的table长度。
private transient volatile int transferIndex;

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE;

//这里的判断为什么没有加锁呢,在多线程情况下看起来好像会造成错误
//我看了下这个函数的调用,其实就是上面几个,上面几个调用的时候
//nextTab肯定是已经进行了初始化的
//除了addCount中,那个也进行了cas进行保护。
if (nextTab == null) {
try {
@SuppressWarnings("unchecked")
//构造一个nextTable,他的容量是原来的两倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
//构造一个ForwardingNode,用于标记当前的bin已经被转移结束了。
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
//标记是否继续往下个节点找的标志
boolean advance = true;
//表示是否迁移结束的标志。
boolean finishing = false;
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
//找到了一个,把advance置空,准备进行迁移操作。
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
//如果节点为空,那么把forwardingNode放进去,标记为已经迁移结束。
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true;
else {
//把table[i]进行加锁。
//这边还对链表进行遍历,有点对半拆分的感觉
//把链表分表拆分为,hash&n等于0和不等于0的,然后分别放在新表的i和i+n位置
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
//如果是一个红黑树bin
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//这里会判断红黑树的节点是否小于UNTREEIFY_THRESHOLD,也就是6
//如果小于,那么就是退化为链表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

size值的计算

这个是继承自AbstractMap的方法,问题是返回的是int,但是里面可能存在的值可能会大于int

1
2
3
4
5
6
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}

所以在1.8中增加了这个方法,并且从注释上可以看到作者希望我们应该调用这个方法

1
2
3
4
public long mappingCount() {
long n = sumCount();
return (n < 0L) ? 0L : n; // ignore transient negative values
}

他们都调用了sumCount()方法,这个就是把baseCount加上所有的as的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
private transient volatile CounterCell[] counterCells;
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

那为什么要这么做呢,这个回到上面的addCount方法看看,在进行更新的时候,会尝试使用cas更新baseCount
但是如果更新失败,那么就尝试在counterCells上做文章,这里做的几乎和LongAdder一样,就不赘述了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
//...
}

总结

有很多细节其实挺让我震惊的。

  • Node中如果可能是多线程访问的,都加了volatile进行修饰
  • 锲而不舍的追求高效率的位运算,连乘0.75都换成n - n >>> 2来代替

参考

http://blog.csdn.net/lsgqjh/article/details/54867107