并发计数类LongAdder怎么用

40次阅读
没有评论

共计 6365 个字符,预计需要花费 16 分钟才能阅读完成。

本篇内容介绍了“并发计数类 LongAdder 怎么用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让丸趣 TV 小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

AtomicLong 是通过 CAS(即 Compare And Swap) 原理来完成原子递增递减操作,在并发情况下不会出现线程不安全结果。AtomicLong 中的 value 是使用 volatile 修饰,并发下各个线程对 value 最新值均可见。我们以 incrementAndGet() 方法来深入。

 
public final long incrementAndGet() {

        return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;

    }

这里是调用了 unsafe 的方法

    public final long getAndAddLong(Object var1, long var2, long var4) {

        long var6;

        do {

            var6 = this.getLongVolatile(var1, var2);

        } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));

        return var6;

    }

方法中 this.compareAndSwapLong() 有 4 个参数,var1 是需要修改的类对象,var2 是需要修改的字段的内存地址,var6 是修改前字段的值,var6+var4 是修改后字段的值。compareAndSwapLong 只有该字段实际值和 var6 值相当的时候,才可以成功设置其为 var6+var4。

再继续往深一层去看

public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

这里 Unsafe.compareAndSwapLong 是 native 方法,底层通过 JNI(Java Native Interface) 来完成调用,实际就是借助 C 来调用 CPU 指令来完成。

实现中使用了 do-while 循环,如果 CAS 失败,则会继续重试,直到成功为止。并发特别高的时候,虽然这里可能会有很多次循环,但是还是可以保证线程安全的。不过如果自旋 CAS 操作长时间不成功,竞争较大,会带 CPU 带来极大的开销,占用更多的执行资源,可能会影响其他主业务的计算等。

LongAdder 怎么优化 AtomicLong

Doug Lea 在 jdk1.5 的时候就针对 HashMap 进行了线程安全和并发性能的优化,推出了分段锁实现的 ConcurrentHashMap。一般 Java 面试,基本上离不开 ConcurrentHashMap 这个网红问题。另外在 ForkJoinPool 中,Doug Lea 在其工作窃取算法上对 WorkQueue 使用了细粒度锁来较少并发的竞争, 更多细节可参考我的原创文章 ForkJoin 使用和原理剖析。如果已经对 ConcurrentHashMap 有了较为深刻的理解,那么现在来看 LongAdder 的实现就会相对简单了。

来看 LongAdder 的 increase() 方法实现,

    public void add(long x) {

        Cell[] as; long b, v; int m; Cell a;

        // 第一个 if 进行了两个判断,(1) 如果 cells 不为空,则直接进入第二个 if 语句中。(2) 同样会先使用 cas 指令来尝试 add,如果成功则直接返回。如果失败则说明存在竞争,需要重新 add

        if ((as = cells) != null || !casBase(b = base, b + x)) {

            boolean uncontended = true;

            if (as == null || (m = as.length – 1) 0 ||

                (a = as[getProbe() m]) == null ||

                !(uncontended = a.cas(v = a.value, v + x)))

                longAccumulate(x, null, uncontended);

        }

    }

这里用到了 Cell 类对象,Cell 对象是 LongAdder 高并发实现的关键。在 casBase 冲突严重的时候,就会去创建 Cell 对象并添加到 cells 中,下面会详细分析。

    @sun.misc.Contended static final class Cell {

        volatile long value;

        Cell(long x) {value = x;}

        // 提供 CAS 方法修改当前 Cell 对象上的 value

        final boolean cas(long cmp, long val) {

            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);

        }

        // Unsafe mechanics

        private static final sun.misc.Unsafe UNSAFE;

        private static final long valueOffset;

        static {

            try {

                UNSAFE = sun.misc.Unsafe.getUnsafe();

                Class ? ak = Cell.class;

                valueOffset = UNSAFE.objectFieldOffset

                    (ak.getDeclaredField( value));

            } catch (Exception e) {

                throw new Error(e);

            }

        }

    }

而这一句 a = as[getProbe() m] 其实就是通过 getProbe() 拿到当前 Thread 的 threadLocalRandomProbe 的 probe Hash 值。这个值其实是一个随机值,这个随机值由当前线程 ThreadLocalRandom.current() 产生。不用 Rondom 的原因是因为这里已经是高并发了,多线程情况下 Rondom 会极大可能得到同一个随机值。因此这里使用 threadLocalRandomProbe 在高并发时会更加随机,减少冲突。更多 ThreadLocalRandom 信息想要深入了解可关注这篇文章并发包中 ThreadLocalRandom 类原理浅尝。拿到 as 数组中当前线程的 Cell 对象,然后再进行 CAS 的更新操作, 我们在源码上进行分析。longAccumulate() 是在父类 Striped64.java 中。

    final void longAccumulate(long x, LongBinaryOperator fn,

                              boolean wasUncontended) {

        int h;

        if ((h = getProbe()) == 0) {

       
  // 如果当前线程的随机数为 0,则初始化随机数

            ThreadLocalRandom.current(); // force initialization

            h = getProbe();

            wasUncontended = true;

        }

        boolean collide = false;                // True if last slot nonempty

        for (;;) {

            Cell[] as; Cell a; int n; long v;

            // 如果当前 cells 数组不为空

            if ((as = cells) != null (n = as.length) 0) {

           
// 如果线程随机数对应的 cells 对应数组下标的 Cell 元素不为空,

                if ((a = as[(n – 1) h]) == null) {

               
// 当使用到 LongAdder 的 Cell 数组相关的操作时,需要先获取全局的 cellsBusy 的锁,才可以进行相关操作。如果当前有其他线程的使用,则放弃这一步,继续 for 循环重试。

                    if (cellsBusy == 0) {       // Try to attach new Cell

                   
//Cell 的初始值是 x,创建完毕则说明已经加上

                        Cell r = new Cell(x);   // Optimistically create

                        //casCellsBusy 获取锁,cellsBusy 通过 CAS 方式获取锁,当成功设置 cellsBusy 为 1 时,则获取到锁。

                        if (cellsBusy == 0 casCellsBusy()) {

                            boolean created = false;

                            try {               // Recheck under lock

                                Cell[] rs; int m, j;

                                if ((rs = cells) != null

                                    (m = rs.length) 0

                                    rs[j = (m – 1) h] == null) {

                                    rs[j] = r;

                                    created = true;

                                }

                            } finally {

                           
  //finally 里面释放锁

                                cellsBusy = 0;

                            }

                            if (created)

                                break;

                            continue;           // Slot is now non-empty

                        }

                    }

                    collide = false;

                }

                else if (!wasUncontended)       // CAS already known to fail

                    wasUncontended = true;      // Continue after rehash

                // 如果 a 不为空,则对 a 进行 cas 增 x 操作,成功则返回    

                else if (a.cas(v = a.value, ((fn == null) ? v + x :

                                             fn.applyAsLong(v, x))))

                    break;

                //cells 的长度 n 已经大于 CPU 数量,则继续扩容没有意义,因此直接标记为不冲突

                else if (n = NCPU || cells != as)

                    collide = false;            // At max size or stale

                else if (!collide)

                    collide = true;

                // 到这一步则说明 a 不为空但是 a 上进行 CAS 操作也有多个线程在竞争,因此需要扩容 cells 数组,其长度为原长度的 2 倍

                else if (cellsBusy == 0 casCellsBusy()) {

                    try {

                        if (cells == as) {      // Expand table unless stale

                            Cell[] rs = new Cell[n 1];

                            for (int i = 0; i ++i)

                                rs[i] = as[i];

                            cells = rs;

                        }

                    } finally {

                        cellsBusy = 0;

                    }

                    collide = false;

                    continue;                   // Retry with expanded table

                }

                // 继续使用新的随机数,避免在同一个 Cell 上竞争

                h = advanceProbe(h);

            }

            // 如果 cells 为空,则需要先创建 Cell 数组。初始长度为 2.(个人理解这个 if 放在前面会比较好一点,哈哈)

            else if (cellsBusy == 0 cells == as casCellsBusy()) {

                boolean init = false;

                try {                           // Initialize table

                    if (cells == as) {

                        Cell[] rs = new Cell[2];

                        rs[h 1] = new Cell(x);

                        cells = rs;

                        init = true;

                    }

                } finally {

                    cellsBusy = 0;

                }

                if (init)

                    break;

            }

            // 如果在 a 上竞争失败,且扩容竞争也失败了,则在 casBase 上尝试增加数量

            else if (casBase(v = base, ((fn == null) ? v + x :

                                        fn.applyAsLong(v, x))))

                break;                          // Fall back on using base

        }

    }

最后是求 LongAdder 的总数,这一步就非常简单了,把 base 的值和所有 cells 上的 value 值加起来就是总数了。

    public long sum() {

        Cell[] as = cells; Cell a;

        long sum = base;

        if (as != null) {

            for (int i = 0; i as.length; ++i) {

                if ((a = as[i]) != null)

                    sum += a.value;

            }

        }

        return sum;

    }

思考

源码中 Cell 数组的会控制在不超过 NCPU 的两倍,原因是 LongAdder 其实在底层是依赖 CPU 的 CAS 指令来操作,如果多出太多,即使在代码层面没有竞争,在底层 CPU 的竞争会更多,所以这里会有一个数量的限制。所以在 LongAdder 的设计中,由于使用到 CAS 指令操作,瓶颈在于 CPU 上。

YY 一下,那么有没有方式可以突破这个瓶颈呢?我个人觉得是有的,但是有前提条件,应用场景极其有限。基于 ThreadLocal 的设计,假设统计只在一个固定的线程池中进行,假设线程池中的线程不会销毁 (异常补充线程的就暂时不管了),则可以认为线程数量是固定且不变的,那么统计则可以依赖于只在当前线程中进行,那么即使是高并发,就转化为 ThreadLocal 这种单线程操作了,完全可以摆脱 CAS 的 CPU 指令操作的限制,那么性能将极大提升。

“并发计数类 LongAdder 怎么用”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注丸趣 TV 网站,丸趣 TV 小编将为大家输出更多高质量的实用文章!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-04发表,共计6365字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)