首頁技術文章正文

Java培訓:java高性能并發(fā)計數(shù)器之巔峰對決

更新時間:2022-08-25 來源:黑馬程序員 瀏覽量:

  并發(fā)計數(shù)器各個方案介紹

  方案概述

  1. jdk5提供的原子更新長整型類 AtomicLong

  2. synchronized

  3. jdk8提供的 LongAdder 【單機推薦】

  4. Redisson分布式累加器【分布式推薦】

  方案介紹

  jdk5提供的原子更新長整型類 AtomicLong

  在JDK1.5開始就新增了并發(fā)的Integer/Long的操作工具類AtomicInteger和AtomicLong。

  AtomicLong 利用底層操作系統(tǒng)的CAS來保證原子性,在一個死循環(huán)內不斷執(zhí)行CAS操作,直到操作成功。不過,CAS操作的一個問題是在并發(fā)量比較大的時候,可能很多次的執(zhí)行CAS操作都不成功,這樣性能就受到較大影響。

  示例代碼

AtomicLong value = new AtomicLong(0);  //定義
incrementAndGet(); //遞增1
```

  synchronized

  synchronized是一個重量級鎖,主要是因為線程競爭鎖會引起操作系統(tǒng)用戶態(tài)和內核態(tài)切換,浪費資源效率不高,在jdk1.5之前,synchronized沒有做任何優(yōu)化,但在jdk1.6做了性能優(yōu)化,它會經歷偏向鎖,輕量級鎖,最后才到重量級鎖這個過程,在性能方面有了很大的提升,在jdk1.7的ConcurrentHashMap是基于ReentrantLock的實現(xiàn)了鎖,但在jdk1.8之后又替換成了synchronized,就從這一點可以看出JVM團隊對synchronized的性能還是挺有信心的。下面我們分別來介紹下無鎖,偏向鎖,輕量級鎖,重量級鎖。

  jdk8提供的 LongAdder 【單機推薦】

  在JDK8中又新增了LongAdder,這是一個針對Long類型的數(shù)據(jù)的操作工具類。

  那我們知道,在ConcurrentHashMap中,對Map分割成多個segment,這樣多個Segment的操作就可以并行執(zhí)行,從而可以提高性能。在JDK8中,LongAdder與ConcurrentHashMap類似,將內部操作數(shù)據(jù)value分離成一個Cell數(shù)組,每個線程訪問時,通過Hash等算法映射到其中一個Cell上。

  計算最終的數(shù)據(jù)結果,則是各個Cell數(shù)組的累計求和。

  LongAddr常用api方法

add():  //增加指定的數(shù)值;
increament(): //增加1;
decrement(): //減少1;
intValue();  //intValue();/floatValue()/doubleValue():得到最終計數(shù)后的結果
sum()://求和,得到最終計數(shù)結果
sumThenReset()://求和得到最終計數(shù)結果,并重置value。
```

  Redisson分布式累加器【分布式推薦】

  基于Redis的Redisson分布式整長型累加器(LongAdder)采用了與java.util.concurrent.atomic.LongAdder類似的接口。通過利用客戶端內置的LongAdder對象,為分布式環(huán)境下遞增和遞減操作提供了很高得性能。據(jù)統(tǒng)計其性能最高比分布式AtomicLong對象快 10000 倍以上。

RLongAddr itheimaLongAddr = redission.getLongAddr("itheimaLongAddr");
itheimaLongAddr.add(100);  //添加指定數(shù)量
itheimaLongAddr.increment(); //遞增1
itheimaLongAddr.increment(); //遞減1
itheimaLongAddr.sum(); //聚合求和
```

  基于Redis的Redisson分布式雙精度浮點累加器(DoubleAdder)采用了與java.util.concurrent.atomic.DoubleAdder類似的接口。通過利用客戶端內置的DoubleAdder對象,為分布式環(huán)境下遞增和遞減操作提供了很高得性能。據(jù)統(tǒng)計其性能最高比分布式AtomicDouble對象快 12000 倍。

  示例代碼

RLongDouble itheimaDouble = redission.getLongDouble("itheimaLongDouble");
itheimaDouble.add(100);  //添加指定數(shù)量
itheimaDouble.increment(); //遞增1
itheimaDouble.increment(); //遞減1
itheimaDouble.sum(); //聚合求和
```

  以上【整長型累加器】和【雙精度浮點累加器】完美適用于分布式統(tǒng)計計量場景。

  各個方案性能測試

  測試代碼

```
package com.itheima._01性能比較;

import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

/**
 * @author 黑馬程序員
 */
public class CountTest {

    private int count = 0;

    @Test
    public void startCompare() {
        compareDetail(1, 100 * 10000);
        compareDetail(20, 100 * 10000);
        compareDetail(30, 100 * 10000);
        compareDetail(40, 100 * 10000);
        compareDetail(100, 100 * 10000);
    }

    /**
     * @param threadCount 線程數(shù)
     * @param times 每個線程增加的次數(shù)
     */
    public void compareDetail(int threadCount, int times) {
        try {
            System.out.println(String.format("threadCount: %s, times: %s", threadCount, times));
            long start = System.currentTimeMillis();
            testSynchronized(threadCount, times);
            System.out.println("testSynchronized cost: " + (System.currentTimeMillis() - start));

            start = System.currentTimeMillis();
            testAtomicLong(threadCount, times);
            System.out.println("testAtomicLong cost: " + (System.currentTimeMillis() - start));

            start = System.currentTimeMillis();
            testLongAdder(threadCount, times);
            System.out.println("testLongAdder cost: " + (System.currentTimeMillis() - start));
            System.out.println();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void testSynchronized(int threadCount, int times) throws InterruptedException {
        List<Thread> threadList = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            threadList.add(new Thread(()-> {
                for (int j = 0; j < times; j++) {
                    add();
                }
            }));
        }
        for (Thread thread : threadList) {
            thread.start();
        }
        for (Thread thread : threadList) {
            thread.join();
        }
    }

    public synchronized void add() {
        count++;
    }

    public void testAtomicLong(int threadCount, int times) throws InterruptedException {
        AtomicLong count = new AtomicLong();
        List<Thread> threadList = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            threadList.add(new Thread(()-> {
                for (int j = 0; j < times; j++) {
                    count.incrementAndGet();
                }
            }));
        }
        for (Thread thread : threadList) {
            thread.start();
        }
        for (Thread thread : threadList) {
            thread.join();
        }
    }

    public void testLongAdder(int threadCount, int times) throws InterruptedException {
        LongAdder count = new LongAdder();
        List<Thread> threadList = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            threadList.add(new Thread(()-> {
                for (int j = 0; j < times; j++) {
                    count.increment();
                }
            }));
        }
        for (Thread thread : threadList) {
            thread.start();
        }
        for (Thread thread : threadList) {
            thread.join();
        }
    }
}
```

  運行結果

threadCount: 1, times: 1000000
testSynchronized cost: 69
testAtomicLong cost: 16
testLongAdder cost: 15

threadCount: 20, times: 1000000
testSynchronized cost: 639
testAtomicLong cost: 457
testLongAdder cost: 59

threadCount: 30, times: 1000000
testSynchronized cost: 273
testAtomicLong cost: 538
testLongAdder cost: 70

threadCount: 40, times: 1000000
testSynchronized cost: 312
testAtomicLong cost: 717
testLongAdder cost: 81

threadCount: 100, times: 1000000
testSynchronized cost: 719
testAtomicLong cost: 2098
testLongAdder cost: 225
```

  結論

  

1661393077095_1.png

  并發(fā)量比較低的時候AtomicLong優(yōu)勢比較明顯,因為AtomicLong底層是一個樂觀鎖,不用阻塞線程,不斷cas即可。但是在并發(fā)比較高的時候用synchronized比較有優(yōu)勢,因為大量線程不斷cas,會導致cpu持續(xù)飆高,反而會降低效率

  LongAdder無論并發(fā)量高低,優(yōu)勢都比較明顯。且并發(fā)量越高,優(yōu)勢越明顯

  原理分析

  AtomicLong 實現(xiàn)原子操作原理

  非原子操作示例代碼

package com.itheima._02Unsafe測試;

import java.util.ArrayList;
import java.util.List;

/**
 * @author 黑馬程序員
 */
public class Test1 {

    private int value = 0;

    public static void main(String[] args) throws InterruptedException {
        Test1 test1 = new Test1();
        test1.increment();
        System.out.println("期待值:" + 100 * 100 + ",最終結果值:" + test1.value);
        //結果,期待值:10000,最終結果值:xxxx
    }

    private void increment() throws InterruptedException {
        List<Thread> list = new ArrayList<>();
        //啟動100個線程,每個線程對value進行累加100次
        for (int i = 0; i < 100; i++) {
            Thread t = new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    value++;
                }
            });
            list.add(t);
            t.start();
        }

        //保證所有線程運行完成
        for (Thread thread : list) {
            thread.join();
        }
    }

}
```

  運行效果

1661393298046_2.jpg

  結論

  > 可以發(fā)現(xiàn)輸出的結果值錯誤,這是因為 `value++` 不是一個原子操作,它將 `value++` 拆分成了 3 個步驟 `load、add、store`,多線程并發(fā)有可能上一個線程 add 過后還沒有 store 下一個線程又執(zhí)行了 load 了這種重復造成得到的結果可能比最終值要小。

  AtomicLong是JDK1.5提供的原子操作示例代碼

package com.itheima._03AtomicLong的CAS原子操作示例;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @author 黑馬程序員
 */
public class Test2 {

    private AtomicLong value = new AtomicLong(0);

    public static void main(String[] args) throws InterruptedException {
        Test2 test1 = new Test2();
        test1.increment();
        System.out.println("期待值:" + 100 * 100 + ",最終結果值:" + test1.value);
        //結果,期待值:10000,最終結果值:10000
    }

    private void increment() throws InterruptedException {
        List<Thread> list = new ArrayList<>();
        //啟動100個線程,每個線程對value進行累加100次
        for (int i = 0; i < 100; i++) {
            Thread t = new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    value.incrementAndGet();
                }
            });
            list.add(t);
            t.start();
        }

        //保證所有線程運行完成
        for (Thread thread : list) {
            thread.join();
        }
    }

}
```

  運行效果

1661393347277_3.jpg

  AtomicLong CAS原理介紹

  1.使用volatile保證內存可見性,獲取主存中最新的操作數(shù)據(jù)

  2.使用CAS(Compare-And-Swap)操作保證數(shù)據(jù)原子性

  CAS算法是jdk對并發(fā)操作共享數(shù)據(jù)的支持,包含了3個操作數(shù)

  第一個操作數(shù):內存值value(V)

  第二個操作數(shù):預估值expect(O)

  第三個操作數(shù):更新值new(N)

  含義:CAS比較交換的過程可以通俗的理解為CAS(V,O,N),包含三個值分別為:V 內存地址(主存)存放的實際值;O 預期的值(舊值);N 更新的新值。當V和O相同時,也就是說舊值和內存中實際的值相同表明該值沒有被其他線程更改過,即該舊值O就是目前來說最新的值了,自然而然可以將新值N賦值給V;當V和O不相同時,會一致循環(huán)下去直至修改成功。

  AtomicLong底層CAS實現(xiàn)原子操作原理

  查看incrementAndGet()方法源碼

public final long incrementAndGet() {
    return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
}
```

getAndAddLong方法源碼

```java
public final long getAndAddLong(Object var1, long var2, long var4) {
    long var6;
    do {
        var6 = this.getLongVolatile(var1, var2);
    } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));

    return var6;
}
```

  > 這里是一個循環(huán)CAS操作

  compareAndSwapLong方法源碼

public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
```

  我們發(fā)現(xiàn)調用的是 native 的 `unsafe.compareAndSwapLong(Object obj, long valueOffset, Long expect, Long update)`,我們翻看 Hotspot 源碼發(fā)現(xiàn)在 unsafe.cpp 中定義了這樣一段代碼

  > Unsafe中基本都是調用native方法,那么就需要去JVM里面找對應的實現(xiàn)。

  >

  > 到`http://hg.openjdk.java.net/` 進行一步步選擇下載對應的hotspot版本,我這里下載的是`http://hg.openjdk.java.net/jdk8u/jdk8u60/hotspot/archive/tip.tar.gz`,

  >

  > 然后解hotspot目錄,發(fā)現(xiàn) `\src\share\vm\prims\unsafe.cpp`,這個就是對應jvm相關的c++實現(xiàn)類了。

  >

  > 比如我們對CAS部分的實現(xiàn)很感興趣,就可以在該文件中搜索compareAndSwapInt,此時可以看到對應的JNI方法為`Unsafe_CompareAndSwapInt`

UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapLong(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jlong e, jlong x))
  UnsafeWrapper("Unsafe_CompareAndSwapLong");
  Handle p (THREAD, JNIHandles::resolve(obj));
  jlong* addr = (jlong*)(index_oop_from_field_offset_long(p(), offset));
#ifdef SUPPORTS_NATIVE_CX8
  return (jlong)(Atomic::cmpxchg(x, addr, e)) == e;
#else
  if (VM_Version::supports_cx8())
    return (jlong)(Atomic::cmpxchg(x, addr, e)) == e;
  else {
    jboolean success = false;
    MutexLockerEx mu(UnsafeJlong_lock, Mutex::_no_safepoint_check_flag);
    jlong val = Atomic::load(addr);
    if (val == e) { Atomic::store(x, addr); success = true; }
    return success;
  }
#endif
UNSAFE_END
```

  Atomic::cmpxchg c++源碼

  可以看到調用了“Atomic::cmpxchg”方法,“Atomic::cmpxchg”方法在linux_x86和windows_x86的實現(xiàn)如下。

  linux_x86的實現(xiàn):

1661393503544_4.jpg

inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {
  int mp = os::is_MP();
  __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
                    : "=a" (exchange_value)
                    : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
                    : "cc", "memory");
  return exchange_value;
}
```

  windows_x86的實現(xiàn)(c++源文件):

1661393567504_5.jpg

inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {
  // alternative for InterlockedCompareExchange
  int mp = os::is_MP();
  __asm {
    mov edx, dest
    mov ecx, exchange_value
    mov eax, compare_value
    LOCK_IF_MP(mp)
    cmpxchg dword ptr [edx], ecx
  }
}
```

  Atomic::cmpxchg方法解析:

  mp是“os::is_MP()”的返回結果,“os::is_MP()”是一個內聯(lián)函數(shù),用來判斷當前系統(tǒng)是否為多處理器。

  如果當前系統(tǒng)是多處理器,該函數(shù)返回1。

  否則,返回0。

  LOCK_IF_MP(mp)會根據(jù)mp的值來決定是否為cmpxchg指令添加lock前綴。

  如果通過mp判斷當前系統(tǒng)是多處理器(即mp值為1),則為cmpxchg指令添加lock前綴。

  否則,不加lock前綴。

  這是一種優(yōu)化手段,認為單處理器的環(huán)境沒有必要添加lock前綴,只有在多核情況下才會添加lock前綴,因為lock會導致性能下降。cmpxchg是匯編指令,作用是比較并交換操作數(shù)。

  > 底層會調用cmpxchg匯編指令,如果是多核處理器會加鎖實現(xiàn)原子操作

  反匯編指令查詢

  查看java程序運行的匯編指令資料

1661393650563_6.jpg

  將上圖2個文件拷貝到jre\bin目錄下,如下圖

1661393663582_7.jpg

  配置運行參數(shù)

  ```

  -server -Xcomp -XX:+UnlockDiagnosticVMOptions -XX:+PrintAssembly -XX:CompileCommand=compileonly,*

  ```

1661393691738_8.jpg

1661393707679_9.jpg

1661393719174_10.jpg

  運行Test2效果

1661393737130_11.jpg

  synchronized 實現(xiàn)同步操作原理

  鎖對象

  java中任何一個對象都可以稱為鎖對象,原因在于java對象在內存中存儲結構,如下圖所示:

1661393750947_12.jpg

  在對象頭中主要存儲的主要是一些運行時的數(shù)據(jù),如下所示:

1661393763314_13.jpg

  其中 在Mark Work中存儲著該對象作為鎖時的一些信息,如下所示是Mark Work中在64位系統(tǒng)中詳細信息:

1661393777046_14.jpg

  偏向鎖

  在無競爭環(huán)境中(沒有并發(fā))使用一種鎖

  > 偏向鎖的作用是當有線程訪問同步代碼或方法時,線程只需要判斷對象頭的Mark Word中判斷一下是否有偏向鎖指向線程ID.

  >

  > 偏向鎖記錄過程

  >

  > - 線程搶到了對象的同步鎖(鎖標志為01參考上圖即無其他線程占用)

  > - 對象Mark World 將是否偏向標志位設置為1

  > - 記錄搶到鎖的線程ID

  > - 進入偏向狀態(tài)

  輕量級鎖

  當有另外一個線程競爭獲取這個鎖時,由于該鎖已經是偏向鎖,當發(fā)現(xiàn)對象頭 Mark Word 中的線程 ID 不是自己的線程 ID,就會進行 CAS 操作獲取鎖,**如果獲取成功**,直接替換 Mark Word 中的線程 ID 為自己的 ID,該鎖會保持偏向鎖狀態(tài);**如果獲取鎖失敗**,代表當前鎖有一定的競爭,偏向鎖將升級為輕量級鎖。

  - 舉個例子來說明一下什么時候需要升級偏向鎖

  假設A線程 持有鎖 X(此時X是偏向鎖) 這是有個B線程也同樣用到了鎖X,而B線程在檢查鎖對象的Mark World時發(fā)現(xiàn)偏向鎖的線程ID已經指向了線程A。這時候就需要升級鎖X為輕量級鎖。輕量級鎖意味著標示該資源現(xiàn)在處于競爭狀態(tài)。

  當有其他線程想訪問加了輕量級鎖的資源時,會使用自旋鎖優(yōu)化,來進行資源訪問。

  > 自旋策略

  >

  > JVM 提供了一種自旋鎖,可以通過自旋方式不斷嘗試獲取鎖,從而避免線程被掛起阻塞。這是基于大多數(shù)情況下,線程持有鎖的時間都不會太長,畢竟線程被掛起阻塞可能會得不償失。

  >

  > 從 JDK1.7 開始,自旋鎖默認啟用,自旋次數(shù)由 JVM 設置決定,這里我不建議設置的重試次數(shù)過多,因為 CAS 重試操作意味著長時間地占用 CPU。自旋鎖重試之后如果搶鎖依然失敗,同步鎖就會升級至重量級鎖,鎖標志位改為 10。在這個狀態(tài)下,未搶到鎖的線程都會進入 Monitor,之后會被阻塞在 _WaitSet 隊列中。

  重量級鎖

  自旋失敗,很大概率 再一次自選也是失敗,因此直接升級成重量級鎖,進行線程阻塞,減少cpu消耗。

  當鎖升級為重量級鎖后,未搶到鎖的線程都會被阻塞,進入阻塞隊列。

  重量級鎖在高并發(fā)下性能就會變慢,因為所有沒有獲取鎖的線程會進行阻塞等待,到獲取鎖的時候被喚醒,這些操作都是消耗很多資源。

  輕量級鎖膨脹流程圖

1661393802532_15.jpg

  LongAdder 實現(xiàn)原子操作原理

  LongAdder實現(xiàn)高并發(fā)計數(shù)實現(xiàn)思路

  LongAdder實現(xiàn)高并發(fā)的秘密就是用空間換時間,對一個值的cas操作,變成對多個值的cas操作,當獲取數(shù)量的時候,對這多個值加和即可。

1661393816329_16.jpg

  測試代碼

```
package com.itheima._04LongAddr使用測試;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.LongBinaryOperator;

/**
 * @author 黑馬程序員
 */
public class Test3 {

    private LongAdder value = new LongAdder(); //默認初始值0

    public static void main(String[] args) throws InterruptedException {
        Test3 test1 = new Test3();
        test1.increment();
        System.out.println("期待值:" + 100 * 100 + ",最終結果值:" + test1.value.sum());
        //結果,期待值:10000,最終結果值:10000
    }

    private void increment() throws InterruptedException {
        List<Thread> list = new ArrayList<>();
        //啟動100個線程,每個線程對value進行累加100次
        for (int i = 0; i < 100; i++) {
            Thread t = new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    value.increment();
                }
            });
            list.add(t);
            t.start();
        }

        //保證所有線程運行完成
        for (Thread thread : list) {
            thread.join();
        }

    }

}
```

  源碼分析

  1. 先對base變量進行cas操作,cas成功后返回

  2. 對線程獲取一個hash值(調用getProbe),hash值對數(shù)組長度取模,定位到cell數(shù)組中的元素,對數(shù)組中的元素進行cas

  增加數(shù)量源碼

public void increment() {
    add(1L);
}
```

```java
public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}
```

  當數(shù)組不為空,并且根據(jù)線程hash值定位到數(shù)組某個下標中的元素不為空,對這個元素cas成功則直接返回,否則進入longAccumulate方法

1661393945975_17.jpg

  1. cell數(shù)組已經初始化完成,主要是在cell數(shù)組中放元素,對cell數(shù)組進行擴容等操作

  2. cell數(shù)組沒有初始化,則對數(shù)組進行初始化

  3. cell數(shù)組正在初始化,這時其他線程利用cas對baseCount進行累加操作

  完整代碼

final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) {
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {       // Try to attach new Cell
                    Cell r = new Cell(x);   // Optimistically create
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {      // Expand table unless stale
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = advanceProbe(h);
        }
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                    Cell[] rs = new Cell[2];
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}
```

  獲取計算數(shù)量源碼

public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}
```

  需要注意的是,調用sum()返回的數(shù)量有可能并不是當前的數(shù)量,因為在調用sum()方法的過程中,可能有其他數(shù)組對base變量或者cell數(shù)組進行了改動,所以需要確保所有線程運行完再獲取就是準確值

  LongAdder 的前世今生

  其實在 Jdk1.7 時代,LongAdder 還未誕生時,就有一些人想著自己去實現(xiàn)一個高性能的計數(shù)器了,比如一款 Java 性能監(jiān)控框架 dropwizard/metrics 就做了這樣事,在早期版本中,其優(yōu)化手段并沒有 Jdk1.8 的 LongAdder 豐富,而在 metrics 的最新版本中,其已經使用 Jdk1.8 的 LongAdder 替換掉了自己的輪子。在最后的測評中,我們將 metrics 版本的 LongAdder 也作為一個參考對象。

  應用場景

  AtomicLong等原子類的使用

  并發(fā)少競爭少(讀多寫少)的計數(shù)原子操作

  LongAdder 的使用

  高性能計數(shù)器的首選方案, 單體項目建議使用LongAddr,分布式環(huán)境建議使用Redisson分布式累加器

  應用場景功能:獲取全局自增id值

  Synchronized與Lock的使用比較

  Synchronized 適合少量的同步并發(fā)競爭

  Lock 適合大量的同步并發(fā)競爭

1661394039753_18.jpg

  總結

  并發(fā)情況優(yōu)化鎖思路:

  互斥鎖 -> 樂觀鎖 -> 鎖的粒度控制

  在Java中對應的實現(xiàn)方式:

  ReentrantLock或者Syschronized -> CAS + Volatile -> 拆分競爭點(longAddr,分布式累加器,ConcurrentHashMap等)

  ReentrantLock或者Syschronized 在高并發(fā)時都存在獲取鎖等待、阻塞、喚醒等操作,所以在使用的使用注意拆分競爭點。

  AtomicLong

  1. 并發(fā)量非常高,可能導致都在不停的爭搶該值,可能導致很多線程一致處于循環(huán)狀態(tài)而無法更新數(shù)據(jù),從而導致 CPU 資源的消耗過高。解決這個問題需要使用LongAdder

  2. ABA 問題,比如說上一個線程增加了某個值,又改變了某個值,然后后面的線程以為數(shù)據(jù)沒有發(fā)生過變化,其實已經被改動了。解決這個問題請參考《擴展:原子更新字段類-ABA問題解決》

  synchronized

  synchronized鎖升級實際上是把本來的悲觀鎖變成了 在一定條件下 使用無所(同樣線程獲取相同資源的偏向鎖),以及使用樂觀(自旋鎖 cas)和一定條件下悲觀(重量級鎖)的形式。

  偏向鎖:適用于單線程適用鎖的情況

  輕量級鎖:適用于競爭較不激烈的情況(這和樂觀鎖的使用范圍類似)

  重量級鎖:適用于競爭激烈的情況

  LongAdder

  - AtomicLong :并發(fā)場景下讀性能優(yōu)秀,寫性能急劇下降,不適合作為高性能的計數(shù)器方案。內需求量少。

  - LongAdder :并發(fā)場景下寫性能優(yōu)秀,讀性能由于組合求值的原因,不如直接讀值的方案,但由于計數(shù)器場景寫多讀少的緣故,整體性能在幾個方案中最優(yōu),是高性能計數(shù)器的首選方案。由于 Cells 數(shù)組以及緩存行填充的緣故,占用內存較大。

  最佳方案

  高性能計數(shù)器的首選方案, 單體項目建議使用LongAddr,分布式環(huán)境建議使用Redisson分布式累加器

  應用場景功能:獲取全局自增id值

分享到:
在線咨詢 我要報名
和我們在線交談!