自己做頭像的網站非流光全網推廣平臺
一、問題提出
1.1 需求描述
? ? ? ? 有如下的需求,需要保證?account.withdraw()?取款方法的線程安全,代碼如下:
interface Account {// 獲取余額Integer getBalance();// 取款void withdraw(Integer amount);/*** 方法內會啟動 1000 個線程,每個線程做 -10 元 的操作* 如果初始余額為 10000 那么正確的結果應當是 0*/static void demo(Account account) {List<Thread> ts = new ArrayList<>();long start = System.nanoTime();for (int i = 0; i < 1000; i++) {ts.add(new Thread(() -> {account.withdraw(10);}));}ts.forEach(Thread::start);ts.forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});long end = System.nanoTime();System.out.println(account.getBalance()+ " cost: " + (end - start) / 1000_000 + " ms");}
}
class AccountUnsafe implements Account {private Integer balance;public AccountUnsafe(Integer balance) {this.balance = balance;}@Overridepublic Integer getBalance() {return balance;}@Overridepublic void withdraw(Integer amount) {balance -= amount;}public static void main(String[] args) {Account.demo(new AccountUnsafe(10000));}
}
????????原有的實現(xiàn)并不是線程安全的,執(zhí)行結果如下所示:?
1.2 問題分析
????????為什么會出現(xiàn)線程安全問題?是因為在多線程的環(huán)境下取款的 withdraw() 方法里面是臨界區(qū),存在指令交錯的行為。
1.3 加鎖解決
????????首先想到的解決方式就是給 Account 對象加鎖,如下代碼:
class AccountUnsafe implements Account {private Integer balance;public AccountUnsafe(Integer balance) {this.balance = balance;}@Overridepublic Integer getBalance() {synchronized (this){return balance;}}@Overridepublic void withdraw(Integer amount) {synchronized (this){balance -= amount;}}public static void main(String[] args) {Account.demo(new AccountUnsafe(10000));}
}
?????????運行結果如下,沒有任何問題。
1.4 無鎖解決
? ? ? ? 也可以通過無鎖的方式解決上述的問題,如下代碼:
public class AccountSafe implements Account{private AtomicInteger balance;public AccountSafe(Integer balance) {this.balance = new AtomicInteger(balance);}@Overridepublic Integer getBalance() {return balance.get();}@Overridepublic void withdraw(Integer amount) {while(true){int prev = balance.get();int next = prev - amount;if(balance.compareAndSet(prev,next)){break;}}}public static void main(String[] args) {Account.demo(new AccountSafe(10000));}
}
?????????運行結果如下,沒有任何問題。
二、CAS 與 volatile
2.1 CAS
? ? ? ? 在上一小結看到的使用 AtomicInteger 的解決方法,內部并沒有用鎖來保護共享變量的線程安全。那么它是如何實現(xiàn)的呢?
@Overridepublic void withdraw(Integer amount) {// 需要不斷嘗試,直到成功為止while(true){// 比如拿到了舊值 1000int prev = balance.get();// 在這個基礎上 1000-10 = 990int next = prev - amount;/** compareAndSet 會做一個檢查,在 set 值之前先比較 prev 和當前值* 若 prev 值和當前值一致,則用 next 設置為新值,并返回 true 表示成功。* 若 prev 值和當前值不一致,則 next 作廢,返回 false 表示失敗,進入 while 下次循環(huán)重試* */if(balance.compareAndSet(prev,next)){break;}}}
? ? ? ? 這里面最關鍵的就是 compareAndSet()?方法,它的簡稱就是 CAS(也有 compare and swap 的說法),此方法是一個原子操作。
? ? ? ? 其實 CAS 的底層是 lock cmpxchg 指令,在單核 CPU 和多核 CPU 下都能夠保證原子性。
2.2 volatile
????????獲取共享變量時,為了保證該變量的可見性,需要使用 volatile 修飾。
????????它可以用來修飾成員變量和靜態(tài)成員變量,他可以避免線程從自己的工作緩存中查找變量的值,必須到主存中獲取它的值,線程操作 volatile 變量都是直接操作主存。即一個線程對 volatile 變量的修改,對另一個線程可見。
????????volatile 僅僅保證了共享變量的可見性,讓其它線程能夠看到最新值,但不能解決指令交錯問題(不能保證原子性)
????????CAS 必須借助 volatile 才能讀取到共享變量的最新值來實現(xiàn)【比較并交換】的效果。
2.3 為什么無鎖效率高
????????無鎖情況下,即使重試失敗,線程始終在高速運行,沒有停歇,而 synchronized 會讓線程在沒有獲得鎖的時候,發(fā)生上下文切換,進入阻塞。
????????但無鎖情況下,因為線程要保持運行,需要額外 CPU 的支持,CPU 在這里就好比高速跑道,沒有額外的跑道,線程想高速運行也無從談起,雖然不會進入阻塞,但由于沒有分到時間片,仍然會進入可運行狀態(tài),還是會導致上下文切換。
2.4 CAS 的特點
? ? ? ? 將?CAS 和 volatile 結合使用可以實現(xiàn)無鎖并發(fā),適用于線程數(shù)少、多核 CPU 的場景下。
????????CAS 是基于樂觀鎖的思想:最樂觀的估計,不怕別的線程來修改共享變量,就算改了也沒關系,我吃虧點再重試。
????????synchronized 是基于悲觀鎖的思想:最悲觀的估計,得防著其它線程來修改共享變量,我上了鎖你們都別想改,我改完了解開鎖,你們才有機會?
????????CAS 體現(xiàn)的是無鎖并發(fā)、無阻塞并發(fā)。因為沒有使用 synchronized,所以線程不會陷入阻塞,這是效率提升的因素之一;但如果競爭激烈,可以想到重試必然頻繁發(fā)生,反而效率會受影響 。
三、原子整數(shù)
? ? ? ? 在 JUC 并發(fā)包下提供了一些原子整數(shù)的工具類,如:AtomicBoolean、AtomicInteger 和 AtomicLong,這幾個類的用法類似,下面就以 AtomicInteger 為例,介紹下常用的方法。
public class AtomicTest {public static void main(String[] args) {// 無參構造的初始值為 0,有參構造的初始值需要自己指定AtomicInteger i = new AtomicInteger(0);// 獲取并自增,返回 0,類似于 i++System.out.println(i.getAndIncrement());// 自增并獲取(i = 1, 結果 i = 2, 返回 2),類似于 ++iSystem.out.println(i.incrementAndGet());// 自減并獲取(i = 2, 結果 i = 1, 返回 1),類似于 --iSystem.out.println(i.decrementAndGet());// 獲取并自減(i = 1, 結果 i = 0, 返回 1),類似于 i--System.out.println(i.getAndDecrement());// 獲取并加值(i = 0, 結果 i = 5, 返回 0)System.out.println(i.getAndAdd(5));// 加值并獲取(i = 5, 結果 i = 0, 返回 0)System.out.println(i.addAndGet(-5));// 獲取并更新(i = 0, p 為 i 的當前值, 結果 i = -2, 返回 0)// 其中函數(shù)中的操作能保證原子,但函數(shù)需要無副作用System.out.println(i.getAndUpdate(p -> p - 2));// 更新并獲取(i = -2, p 為 i 的當前值, 結果 i = 0, 返回 0)// 其中函數(shù)中的操作能保證原子,但函數(shù)需要無副作用System.out.println(i.updateAndGet(p -> p + 2));// 獲取并計算(i = 0, p 為 i 的當前值, x 為參數(shù)1, 結果 i = 10, 返回 0)// 其中函數(shù)中的操作能保證原子,但函數(shù)需要無副作用// getAndUpdate 如果在 lambda 中引用了外部的局部變量,要保證該局部變量是 final 的// getAndAccumulate 可以通過 參數(shù)1 來引用外部的局部變量,但因為其不在 lambda 中因此不必是 finalSystem.out.println(i.getAndAccumulate(10, (p, x) -> p + x));// 計算并獲取(i = 10, p 為 i 的當前值, x 為參數(shù)1, 結果 i = 0, 返回 0)// 其中函數(shù)中的操作能保證原子,但函數(shù)需要無副作用System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));}
}
四、原子引用
? ? ? ? 為什么需要原子引用,因為共享的數(shù)據(jù)并不一定都是基本數(shù)據(jù)類型的,還有可能是小數(shù)類型,那么我們就可以使用原子引用來保證其中的共享變量操作時的線程安全。
? ? ? ? 原子引用分為如下幾種:AtomicReference、AtomicMarkableReference、AtomicStampedReference
4.1?AtomicReference
? ? ? ? 以最開始的例子為例,假設此時的賬戶余額是 BigDecimal 類型,我們就需要使用?AtomicReferenceAtomicReference,如下代碼:
interface Account {// 獲取余額BigDecimal getBalance();// 取款void withdraw(BigDecimal amount);/*** 方法內會啟動 1000 個線程,每個線程做 -10 元 的操作* 如果初始余額為 10000 那么正確的結果應當是 0*/static void demo(Account account) {List<Thread> ts = new ArrayList<>();long start = System.nanoTime();for (int i = 0; i < 1000; i++) {ts.add(new Thread(() -> {account.withdraw(BigDecimal.TEN);}));}ts.forEach(Thread::start);ts.forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});long end = System.nanoTime();System.out.println(account.getBalance()+ " cost: " + (end - start) / 1000_000 + " ms");}
}
public class AccountSafe implements Account{private AtomicReference<BigDecimal> balance;public AccountSafe(BigDecimal balance) {this.balance = new AtomicReference(balance);}@Overridepublic BigDecimal getBalance() {return balance.get();}@Overridepublic void withdraw(BigDecimal amount) {// 需要不斷嘗試,直到成功為止while(true){BigDecimal prev = balance.get();// 調用 subtract 相當于減的操作BigDecimal next = prev.subtract(amount);if(balance.compareAndSet(prev,next)){break;}}}public static void main(String[] args) {Account.demo(new AccountSafe(new BigDecimal("10000")));}
}
? ? ? ? 測試結果如下,沒有任何問題。
4.2 ABA 問題
? ? ? ? 主線程僅能判斷出共享變量的值與最初值 A 是否相同,不能感知到這種從 A 改為 B 又 改回 A 的情況,這就是我們所說的 ABA 問題,如下代碼:
@Slf4j(topic = "c.test")
public class Main8 {static AtomicReference<String> ref = new AtomicReference<>("A");public static void main(String[] args) throws InterruptedException {log.debug("main start...");// 獲取值 A// 這個共享變量被它線程修改過?String prev = ref.get();other();Thread.sleep(1000);// 嘗試改為 Clog.debug("change A->C {}", ref.compareAndSet(prev, "C"));}private static void other() throws InterruptedException {new Thread(() -> {log.debug("change A->B {}", ref.compareAndSet(ref.get(), "B"));}, "t1").start();Thread.sleep(500);new Thread(() -> {log.debug("change B->A {}", ref.compareAndSet(ref.get(), "A"));}, "t2").start();}
}
? ? ? ? 輸出結果如下:
4.3?AtomicStampedReference
????????如果主線程希望只要有其它線程動過了共享變量,那么自己的 cas 就算失敗,這時,僅比較值是不夠的,需要再加一個版本號 AtomicStampedReference。
????????AtomicStampedReference 可以給原子引用加上版本號,追蹤原子引用整個的變化過程,如: A -> B -> A ->C ,通過 AtomicStampedReference,我們可以知道,引用變量中途被更改了幾次。如下代碼:
@Slf4j(topic = "c.test")
public class Main9 {// 不光給變量一個初始值,還給一個初始的版本號static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);public static void main(String[] args) throws InterruptedException {log.debug("main start...");// 獲取值 AString prev = ref.getReference();// 獲取版本號int stamp = ref.getStamp();log.debug("版本 {}", stamp);// 如果中間有其它線程干擾,發(fā)生了 ABA 現(xiàn)象other();Thread.sleep(1000);// 嘗試改為 C,此時 compareAndSet 方法需要四個參數(shù),當前值,期望值,當前版本號,期望版本號log.debug("change A->C {}", ref.compareAndSet(prev, "C", stamp, stamp + 1));}private static void other() throws InterruptedException {new Thread(() -> {log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B",ref.getStamp(), ref.getStamp() + 1));log.debug("更新版本為 {}", ref.getStamp());}, "t1").start();Thread.sleep(500);new Thread(() -> {log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A",ref.getStamp(), ref.getStamp() + 1));log.debug("更新版本為 {}", ref.getStamp());}, "t2").start();}
}
? ? ? ? 可以看到,解決了 ABA 問題,更新并沒有成功。?
4.4?AtomicMarkableReference
????????但是有時候,并不關心引用變量更改了幾次,只是單純的關心是否更改過,用一個布爾值就可以搞定,所以就有了 AtomicMarkableReference。
? ? ? ? 如下案例,主人要檢查垃圾袋滿沒滿,是否需要倒垃圾,如果滿了則更換新的垃圾袋;如果還空著呢,就用原有的垃圾袋。此時還有另外一個線程保潔阿姨,她負責倒空垃圾袋里面的垃圾,但是她還是用原來的垃圾袋,如果此時主人檢查垃圾袋是空的就不用再去更換垃圾袋了。
? ? ? ? 代碼如下所示:
@Slf4j(topic = "c.test")
public class Main9 {public static void main(String[] args) throws InterruptedException {GarbageBag bag = new GarbageBag("裝滿了垃圾");// 參數(shù)2 mark 可以看作一個標記,表示垃圾袋滿了AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true);log.debug("主線程 start...");GarbageBag prev = ref.getReference();log.debug(prev.toString());new Thread(() -> {log.debug("打掃衛(wèi)生的線程 start...");bag.setDesc("空垃圾袋");while (!ref.compareAndSet(bag, bag, true, false)) {}log.debug(bag.toString());}).start();Thread.sleep(1000);log.debug("主線程想換一只新垃圾袋?");boolean success = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"), true, false);log.debug("換了么?" + success);log.debug(ref.getReference().toString());}
}class GarbageBag {String desc;public GarbageBag(String desc) {this.desc = desc;}public void setDesc(String desc) {this.desc = desc;}@Overridepublic String toString() {return super.toString() + " " + desc;}
}
? ? ? ? ?輸出結果如下所示:
五、原子數(shù)組
? ? ? ? 原子數(shù)據(jù)保護的是數(shù)組里面的元素,常用的原子數(shù)組類是 AtomicIntegerArray、AtomicLongArray 和 AtomicReferenceArray,測試類的代碼如下所示:
@Slf4j(topic = "c.test")
public class Main9 {public static void main(String[] args) throws InterruptedException {// 不安全的數(shù)組demo(() -> new int[10],(array) -> array.length,(array, index) -> array[index]++,array -> System.out.println(Arrays.toString(array)));// 安全的數(shù)組demo(() -> new AtomicIntegerArray(10),(array) -> array.length(),(array, index) -> array.getAndIncrement(index),array -> System.out.println(array));}/*** 參數(shù)1,提供數(shù)組、可以是線程不安全數(shù)組或線程安全數(shù)組* 參數(shù)2,獲取數(shù)組長度的方法* 參數(shù)3,自增方法,回傳 array, index* 參數(shù)4,打印數(shù)組的方法*/// supplier 提供者 無中生有 ()->結果// function 函數(shù) 一個參數(shù)一個結果 (參數(shù))->結果 , BiFunction (參數(shù)1,參數(shù)2)->結果// consumer 消費者 一個參數(shù)沒結果 (參數(shù))->void, BiConsumer (參數(shù)1,參數(shù)2)->private static <T> void demo(Supplier<T> arraySupplier,Function<T, Integer> lengthFun,BiConsumer<T, Integer> putConsumer,Consumer<T> printConsumer) {List<Thread> ts = new ArrayList<>();T array = arraySupplier.get();int length = lengthFun.apply(array);for (int i = 0; i < length; i++) {// 每個線程對數(shù)組作 10000 次操作ts.add(new Thread(() -> {for (int j = 0; j < 10000; j++) {putConsumer.accept(array, j % length);}}));}ts.forEach(t -> t.start()); // 啟動所有線程ts.forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}}); // 等所有線程結束printConsumer.accept(array);}
}
? ? ? ? 輸出結果如下所示:
六、字段更新器
? ? ? ? 字段更新器保護的是對象里面的某個屬性,即對這個屬性進行原子操作,但是需要配合 volatile 修飾的字段使用,否則會出現(xiàn)異常,如下代碼:
public class Test5 {private volatile int field;public static void main(String[] args) {AtomicIntegerFieldUpdater fieldUpdater =AtomicIntegerFieldUpdater.newUpdater(Test5.class, "field");Test5 test5 = new Test5();fieldUpdater.compareAndSet(test5, 0, 10);// 修改成功 field = 10System.out.println(test5.field);// 修改成功 field = 20fieldUpdater.compareAndSet(test5, 10, 20);System.out.println(test5.field);// 修改失敗 field = 20fieldUpdater.compareAndSet(test5, 10, 30);System.out.println(test5.field);}
}
? ? ? ? ?輸出結果如下:
七、原子累加器
? ? ? ? 累加器顧名思義,就是對一個整數(shù)進行累加的操作,在 jdk8 以后,新增了幾個專門用于累加操作的工具類,比如:LongAdder、LongAccumulator 等,他們的性能要比我們的 AtomicLong 性能要高很多。
????????性能提升的原因很簡單,就是在有競爭時,設置多個累加單元,Therad-0 累加 Cell[0],而 Thread-1 累加 Cell[1]... 最后將結果匯總。這樣它們在累加時操作的不同的 Cell 變量,因此減少了 CAS 重試失敗,從而提高性能。如下測試代碼:
public class Test5 {public static void main(String[] args) {for (int i = 0; i < 5; i++) {demo(() -> new LongAdder(), adder -> adder.increment());}for (int i = 0; i < 5; i++) {demo(() -> new AtomicLong(), adder -> adder.getAndIncrement());}}private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {T adder = adderSupplier.get();long start = System.nanoTime();List<Thread> ts = new ArrayList<>();// 4 個線程,每人累加 50 萬for (int i = 0; i < 40; i++) {ts.add(new Thread(() -> {for (int j = 0; j < 500000; j++) {action.accept(adder);}}));}ts.forEach(t -> t.start());ts.forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});long end = System.nanoTime();System.out.println(adder + " cost:" + (end - start) / 1000_000);}
}
? ? ? ? 輸出結果如下所示:可以看到,相差的時間還是蠻大的: