1 2 3 4 5 6 7 8
| public class LongAdder extends Striped64 implements Serializable { } abstract class Striped64 extends Number { transient volatile Cell[] cells; transient volatile long base; }
|
主要思路就先cas那个base变量,如果race condition
不严重的话,那么就成功返回。
但是如果race condition
比较严重,那么就新建很多的cell
,在cell
里增加value
,这样就降低了很多的并发阻塞。
最后汇总的时候,除了base
,还需要把每个cell
中的value
也加上去。
1 2 3 4 5 6 7 8 9 10
| public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null, uncontended); } }
|
第一个if,表示如果cells还是为空,并且cas增加到base中失败,那么就进行第二个if中的操作。
第二个if中,如果as为空,或者as未完全初始化
getProbe() & m相当于一次hash找到对应的cell,如果cell为空,
或者cell不为空,但是cell自身的cas加值还是失败,那么最终会调用longAccumulate方法。
longAccumulate方法在Striped64类中,其中我们由add中传入的参数是
下面的方法就是对引入的cell就行操作,包括增加,扩容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
| transient volatile int cellsBusy; final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h; if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); h = getProbe(); wasUncontended = true; } boolean collide = false; for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { if ((a = as[(n - 1) & h]) == null) { if (cellsBusy == 0) { Cell r = new Cell(x); if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; } } collide = false; } else if (!wasUncontended) wasUncontended = true; else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; else if (n >= NCPU || cells != as) collide = false; else if (!collide) collide = true; else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; } collide = false; continue; } h = advanceProbe(h); } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; } }
|
这一段代码的for循环中可以分为三部分。
第一个部分,如果cells已经完成初始化。
- 如果cell还是空,那么就自旋,把新的cell插到cells中。
- 如果已经发生了一次cas失败,那么就标为true,以便下一次cas
- 如果cell不为空,那么调用cell的cas
- 如果各种方法都试了,那么没办法了,尝试扩充cells
第二个部分,因为cell还未完成初始化,那么就拿到自旋锁进行初始化。
第三个部分,如果已经被别的线程在进行初始化,那么就再次尝试调用加在base上。
如果失败,那么就再来一次for循环。
可以看到每次尝试失败,那么就调用代价更高的方法再进行尝试。
所有方法用尽了,那没办法,再重新试一次吧。
可以看到LongAdder和AtomicLong比起来,都是cas的运用。
AtomicLong如果cas失败,那么就进行while循环进行cas。
但是LongAdder把cas的压力分散到很多个cell中,最后加的时候把每个cell中的value都加上去。
高并发效率上,肯定是LongAdder更高一点,但是LongAdder相应的也更费空间。
另外,在ConcurrentHashMap中就是运用了LongAdder的思想。
参考:
https://coolshell.cn/articles/11454.html