0%

LongAdder解析

1
2
3
4
5
6
7
8
public class LongAdder extends Striped64 implements Serializable {

}

abstract class Striped64 extends Number {
transient volatile Cell[] cells;
transient volatile long base;
}

主要思路就先cas那个base变量,如果race condition不严重的话,那么就成功返回。

但是如果race condition比较严重,那么就新建很多的cell,在cell里增加value,这样就降低了很多的并发阻塞。

最后汇总的时候,除了base,还需要把每个cell中的value也加上去。

1
2
3
4
5
6
7
8
9
10
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}

第一个if,表示如果cells还是为空,并且cas增加到base中失败,那么就进行第二个if中的操作。
第二个if中,如果as为空,或者as未完全初始化

getProbe() & m相当于一次hash找到对应的cell,如果cell为空,
或者cell不为空,但是cell自身的cas加值还是失败,那么最终会调用longAccumulate方法。

longAccumulate方法在Striped64类中,其中我们由add中传入的参数是

  • 要增加的x
  • null
  • false

下面的方法就是对引入的cell就行操作,包括增加,扩容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
transient volatile int cellsBusy;    
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
//得到与线程相关的hashCode。
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
//如果cell已经不为null,完成初始化。
if ((as = cells) != null && (n = as.length) > 0) {
//找到与当前线程有关的cell,和HashMap有点像
//如果当前cell为空,那么就进行初始化
if ((a = as[(n - 1) & h]) == null) {
//cellsBusy相当于一个锁,用volatile修饰,== 0时表示可以对cells进行操作。
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
//检查是否发生了一次cas失败,如果是那就标为true,进行下一次循环
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//调用cell本身的cas
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
//对cells进行扩充
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
//如果cell还是为空,那么就进行自旋,如果得到锁,那么就进行初始化
//初始化为2个的cell,finally释放自旋锁
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
//最后还是尝试加在base上,如果失败,那就再来一次for循环。
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}

这一段代码的for循环中可以分为三部分。

第一个部分,如果cells已经完成初始化。

  • 如果cell还是空,那么就自旋,把新的cell插到cells中。
  • 如果已经发生了一次cas失败,那么就标为true,以便下一次cas
  • 如果cell不为空,那么调用cell的cas
  • 如果各种方法都试了,那么没办法了,尝试扩充cells

第二个部分,因为cell还未完成初始化,那么就拿到自旋锁进行初始化。

第三个部分,如果已经被别的线程在进行初始化,那么就再次尝试调用加在base上。
如果失败,那么就再来一次for循环。

可以看到每次尝试失败,那么就调用代价更高的方法再进行尝试。
所有方法用尽了,那没办法,再重新试一次吧。

可以看到LongAdder和AtomicLong比起来,都是cas的运用。
AtomicLong如果cas失败,那么就进行while循环进行cas。
但是LongAdder把cas的压力分散到很多个cell中,最后加的时候把每个cell中的value都加上去。
高并发效率上,肯定是LongAdder更高一点,但是LongAdder相应的也更费空间。

另外,在ConcurrentHashMap中就是运用了LongAdder的思想。

参考:
https://coolshell.cn/articles/11454.html