重慶分公司,新征程啟航
為企業(yè)提供網(wǎng)站建設(shè)、域名注冊、服務(wù)器等服務(wù)
為企業(yè)提供網(wǎng)站建設(shè)、域名注冊、服務(wù)器等服務(wù)
(1)CyclicBarrier是什么?
(2)CyclicBarrier具有什么特性?
(3)CyclicBarrier與CountDownLatch的對比?
CyclicBarrier,回環(huán)柵欄,它會阻塞一組線程直到這些線程同時達到某個條件才繼續(xù)執(zhí)行。它與CountDownLatch很類似,但又不同,CountDownLatch需要調(diào)用countDown()方法觸發(fā)事件,而CyclicBarrier不需要,它就像一個柵欄一樣,當一組線程都到達了柵欄處才繼續(xù)往下走。
public class CyclicBarrierTest {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
for (int i = 0; i < 3; i++) {
new Thread(()->{
System.out.println("before");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("after");
}).start();
}
}
}
這段方法很簡單,使用一個CyclicBarrier使得三個線程保持同步,當三個線程同時到達cyclicBarrier.await();
處大家再一起往下運行。
private static class Generation {
boolean broken = false;
}
Generation,中文翻譯為代,一代人的代,用于控制CyclicBarrier的循環(huán)使用。
比如,上面示例中的三個線程完成后進入下一代,繼續(xù)等待三個線程達到柵欄處再一起執(zhí)行,而CountDownLatch則做不到這一點,CountDownLatch是一次性的,無法重置其次數(shù)。
// 重入鎖
private final ReentrantLock lock = new ReentrantLock();
// 條件鎖,名稱為trip,絆倒的意思,可能是指線程來了先絆倒,等達到一定數(shù)量了再喚醒
private final Condition trip = lock.newCondition();
// 需要等待的線程數(shù)量
private final int parties;
// 當喚醒的時候執(zhí)行的命令
private final Runnable barrierCommand;
// 代
private Generation generation = new Generation();
// 當前這一代還需要等待的線程數(shù)
private int count;
通過屬性可以看到,CyclicBarrier內(nèi)部是通過重入鎖的條件鎖來實現(xiàn)的,那么你可以腦補一下這個場景嗎?
彤哥來腦補一下:假如初始時count = parties = 3
,當?shù)谝粋€線程到達柵欄處,count減1,然后把它加入到Condition的隊列中,第二個線程到達柵欄處也是如此,第三個線程到達柵欄處,count減為0,調(diào)用Condition的signalAll()通知另外兩個線程,然后把它們加入到AQS的隊列中,等待當前線程運行完畢,調(diào)用lock.unlock()的時候依次從AQS的隊列中喚醒一個線程繼續(xù)運行,也就是說實際上三個線程先依次(排隊)到達柵欄處,再依次往下運行。
以上純屬彤哥腦補的內(nèi)容,真實情況是不是如此呢,且往后看。
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
// 初始化parties
this.parties = parties;
// 初始化count等于parties
this.count = parties;
// 初始化都到達柵欄處執(zhí)行的命令
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
構(gòu)造方法需要傳入一個parties變量,也就是需要等待的線程數(shù)。
每個需要在柵欄處等待的線程都需要顯式地調(diào)用await()方法等待其它線程的到來。
public int await() throws InterruptedException, BrokenBarrierException {
try {
// 調(diào)用dowait方法,不需要超時
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 加鎖
lock.lock();
try {
// 當前代
final Generation g = generation;
// 檢查
if (g.broken)
throw new BrokenBarrierException();
// 中斷檢查
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// count的值減1
int index = --count;
// 如果數(shù)量減到0了,走這段邏輯(最后一個線程走這里)
if (index == 0) { // tripped
boolean ranAction = false;
try {
// 如果初始化的時候傳了命令,這里執(zhí)行
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 調(diào)用下一代方法
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// 這個循環(huán)只有非最后一個線程可以走到
for (;;) {
try {
if (!timed)
// 調(diào)用condition的await()方法
trip.await();
else if (nanos > 0L)
// 超時等待方法
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
// 檢查
if (g.broken)
throw new BrokenBarrierException();
// 正常來說這里肯定不相等
// 因為上面打破柵欄的時候調(diào)用nextGeneration()方法時generation的引用已經(jīng)變化了
if (g != generation)
return index;
// 超時檢查
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
private void nextGeneration() {
// 調(diào)用condition的signalAll()將其隊列中的等待者全部轉(zhuǎn)移到AQS的隊列中
trip.signalAll();
// 重置count
count = parties;
// 進入下一代
generation = new Generation();
}
dowait()方法里的整個邏輯分成兩部分:
(1)最后一個線程走上面的邏輯,當count減為0的時候,打破柵欄,它調(diào)用nextGeneration()方法通知條件隊列中的等待線程轉(zhuǎn)移到AQS的隊列中等待被喚醒,并進入下一代。
(2)非最后一個線程走下面的for循環(huán)邏輯,這些線程會阻塞在condition的await()方法處,它們會加入到條件隊列中,等待被通知,當它們喚醒的時候已經(jīng)更新?lián)Q“代”了,這時候返回。
學(xué)習(xí)過前面的章節(jié),看這個圖很簡單了,看不懂的同學(xué)還需要把推薦的內(nèi)容好好看看哦^^
(1)CyclicBarrier會使一組線程阻塞在await()處,當最后一個線程到達時喚醒(只是從條件隊列轉(zhuǎn)移到AQS隊列中)前面的線程大家再繼續(xù)往下走;
(2)CyclicBarrier不是直接使用AQS實現(xiàn)的一個同步器;
(3)CyclicBarrier基于ReentrantLock及其Condition實現(xiàn)整個同步邏輯;
CyclicBarrier與CountDownLatch的異同?
(1)兩者都能實現(xiàn)阻塞一組線程等待被喚醒;
(2)前者是最后一個線程到達時自動喚醒;
(3)后者是通過顯式地調(diào)用countDown()實現(xiàn)的;
(4)前者是通過重入鎖及其條件鎖實現(xiàn)的,后者是直接基于AQS實現(xiàn)的;
(5)前者具有“代”的概念,可以重復(fù)使用,后者只能使用一次;
(6)前者只能實現(xiàn)多個線程到達柵欄處一起運行;
(7)后者不僅可以實現(xiàn)多個線程等待一個線程條件成立,還能實現(xiàn)一個線程等待多個線程條件成立(詳見CountDownLatch那章使用案例);
1、死磕 java同步系列之開篇
2、死磕 java魔法類之Unsafe解析
3、死磕 java同步系列之JMM(Java Memory Model)
4、死磕 java同步系列之volatile解析
5、死磕 java同步系列之synchronized解析
6、死磕 java同步系列之自己動手寫一個鎖Lock
7、死磕 java同步系列之AQS起篇
8、死磕 java同步系列之ReentrantLock源碼解析(一)——公平鎖、非公平鎖
9、死磕 java同步系列之ReentrantLock源碼解析(二)——條件鎖
10、死磕 java同步系列之ReentrantLock VS synchronized
11、死磕 java同步系列之ReentrantReadWriteLock源碼解析
12、死磕 java同步系列之Semaphore源碼解析
13、死磕 java同步系列之CountDownLatch源碼解析
14、死磕 java同步系列之AQS終篇
15、死磕 java同步系列之StampedLock源碼解析
歡迎關(guān)注我的公眾號“彤哥讀源碼”,查看更多源碼系列文章, 與彤哥一起暢游源碼的海洋。
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機、免備案服務(wù)器”等云主機租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。