重慶分公司,新征程啟航
為企業提供網站建設、域名注冊、服務器等服務
為企業提供網站建設、域名注冊、服務器等服務
前言:
創新互聯建站自2013年創立以來,是專業互聯網技術服務公司,擁有項目做網站、網站制作網站策劃,項目實施與項目整合能力。我們以讓每一個夢想脫穎而出為使命,1280元蔡甸做網站,已為上家服務,為蔡甸各地企業和個人服務,聯系電話:028-86922220
如果說J.U.C包下的核心是什么?那我想答案只有一個就是AQS。那么AQS是什么呢?接下來讓我們一起揭開AQS的神秘面紗
AQS是什么?
AQS是AbstractQueuedSynchronizer的簡稱。為什么說它是核心呢?是因為它提供了一個基于FIFO的隊列和state變量來構建鎖和其他同步裝置的基礎框架。下面是其底層的數據結構。
AQS的特點
1、其內使用Node實現FIFO(FirstInFirstOut)隊列。可用于構建鎖或者其他同步裝置的基礎框架
2、且利用了一個int類表示狀態。在AQS中維護了一個volatile int state,通常表示有線程訪問資源的狀態,當state>1的時候表示線程重入的數量,主要有三個方法控制:getState(),setState(),CompareAndSetState()。后面的源碼分析多用到這幾個方法
3、使用方法是繼承,子類通過繼承并通過實現它的方法管理其狀態(acquire和release)的方法操縱狀態。
4、同時實現排它鎖和共享鎖模式。實際上AQS功能主要分為兩類:獨占(只有一個線程能執行)和共享(多個線程同時執行),它的子類要么使用獨占功能要么使用共享功能,而ReentrantLock是通過兩個內部類來實現獨占和共享
CountDownLatch如何借助AQS實現計數功能?
先來說一下CountDownLatch,CountDownLatch是一個同步輔助類,通過它可以來完成類似阻塞當前線程的功能,即一個或多個線程一起等待,直到其他線程執行的操作完成。要實現上面的功能,CountDownLatch是通過一個給定的原子操作的計數器來實現。調用該類的await()方法的線程會一直處于阻塞狀態,直到其他線程調用countDown()方法使得計數器的值變為0之后線程才會執行,這個計數器是不能被重置的。通常這個類會用在程序執行需要等待某個條件完成的場景,比如說并行計算,可將一個數據量很大的計算拆分成一個個子任務,當子任務完成之后,再將最終的結果匯總。每次訪問CountDownLatch只能有一個線程,但是這個線程在使用完countDown()方法之后能多個線程能繼續運行,而調用await()方法的線程就一定要計數器為0才會運行
下面來分析CountDownLatch的源碼以及如何使用AQS框架
public class CountDownLatch { /** * CountDownLatch 實現同步控制 * 底層是使用AQS的state來代表count */ private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; //初始化內部類實際上是設置AQS的state Sync(int count) { setState(count); } int getCount() { return getState(); } //嘗試獲取共享是看當前的state是否為0 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } /*嘗試釋放共享鎖則是遞減計數直到state==0就返回false代表資源已經釋放完全否則就會使用CAS來讓state減一*/ protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } private final Sync sync; /** * 初始化CountDownLatch,實際上是初始化內部類,實際上是設置AQS的state,count不能小于0 */ public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } /** * 這里實際上是調用了AQS里的acquireSharedInterruptibly方法,完成的功能就是先去查看線程是否被中斷,中斷則拋出異常,沒有被中斷就會嘗試獲取共享資源。 * 注意在syn內部類中重寫了tryAcquireShared,也就是當state為0就返回1,這時候就會將當前線程放入AQS的隊列中去,也就是這時候線程可以不再阻塞而是嘗試去獲取鎖 */ public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } /** * 原理同上面方法,但是加了一個時間參數來設置等待的時間 */ public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } /** * 這里傳入參數為1,同樣上面內部類一樣重寫了AQS的tryReleaseShared方法,使用這個重寫的方法來讓計數器原子操作的減一 */ public void countDown() { sync.releaseShared(1); } /** * 就是獲取AQS的state */ public long getCount() { return sync.getCount(); } /** * 轉換成字符串的方法 */ public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]"; } }
由上面代碼可看見CountDownLatch實現了AQS的共享鎖,原理是操作state來實現計數,并且重寫了tryAcquireShared(),tryReleaseShared()等方法
Semaphore是如何借助AQS實現控制并發訪問線程個數?
Semaphore的功能類似于操作系統的信號量,可以很方便的控制某個資源同時被幾個線程訪問,即做并發訪問控制,與CountDownLatch類似,同樣是實現獲取和釋放兩個方法。Semaphore的使用場景:常用于僅能提供訪問的資源,比如數據庫的連接數最大只有30,而應用程序的并發數可能遠遠大于30,這時候就可以使用Semaphore來控制同時訪問的線程數。當Semaphore控制線程數到1的時候就和我們單線程一樣了。同樣Semaphore說是信號量的意思,我們這里就可以把它理解為十字路口的紅綠燈,可以控制車流量(這里是控制線程數)
下面來分析Semaphore的源碼以及如何使用AQS框
public class Semaphore implements java.io.Serializable { private static final long serialVersionUID = -3222578661600680210L; /** 所有機制都通過AbstractQueuedSynchronizer子類實現 */ private final Sync sync; /** * 同樣是通過內部類來實現AQS主要功能,使用state來表示許可證數量 */ abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; Sync(int permits) { setState(permits); } final int getPermits() { return getState(); } /* * 不公平的獲取方式,會有一個搶占鎖的情況,即線程執行順序會亂 */ final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } /* * 釋放資源 */ protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } } final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } } } /** * 不公平的sync版本,使用的就是sync定義的不公平鎖 */ static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } } /** * 公平版本,獲取鎖的線程順序就是線程啟動的順序。具體是使用hasQueuedPredecessors()方法判斷“當前線程”是不是CLH隊列中的第一個線程。 * 若不是的話,則返回-1,是就設置獲取許可證,并檢查許可證數量是否足夠 */ static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } } /** * 默認使用不公平的版本,如果需要公平的,則需要兩個參數 */ public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } /** * 分析同CountDownLatch中的類似方法,具體的實現都是內部類中的獲取方法,這里是獲取一個許可 */ public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } /** *功能同上,但是這里不會檢測線程是否被中斷 */ public void acquireUninterruptibly() { sync.acquireShared(1); } /** * 嘗試獲取 */ public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0; } /** * 在一段時間內一直嘗試獲取許可 */ public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } /** * 當前線程釋放一個許可證 */ public void release() { sync.releaseShared(1); } /** * 可以規定一個線程獲得許可證的數量 */ public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } public void acquireUninterruptibly(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.acquireShared(permits); } public boolean tryAcquire(int permits) { if (permits < 0) throw new IllegalArgumentException(); return sync.nonfairTryAcquireShared(permits) >= 0; } public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); } /** * 同樣可以規定一個線程釋放許可證的數量 */ public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); } /** * 當前的許可還剩幾個 */ public int availablePermits() { return sync.getPermits(); } /** * 銷毀所有許可 */ public int drainPermits() { return sync.drainPermits(); } protected void reducePermits(int reduction) { if (reduction < 0) throw new IllegalArgumentException(); sync.reducePermits(reduction); } public final boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } public final int getQueueLength() { return sync.getQueueLength(); } protected CollectiongetQueuedThreads() { return sync.getQueuedThreads(); } public String toString() { return super.toString() + "[Permits = " + sync.getPermits() + "]"; }}
上面對于Semaphore的一些重要內部類和常用方法進行了解釋,與CountDownLatch很類似,實現的都是共享的功能,即Semaphore允許得到許可證的線程同時執行,而CountDownLatch允許調用countDown()方法的線程同時執行。并且都是通過內部類實現的。相信看到這里,你能越來越看見AQS為什么被稱作JUC包的核心。下面就來介紹一下ReentrantLock
ReentrantLock是如何借助AQS實現鎖機制
ReentrantLock是可重入鎖,前面博客中寫到synchronized實現的鎖也是可重入的。不過synchronized是基于JVM指令實現,而ReentrantLock是使用Java代碼實現的。ReentrantLock重點就是需要我們手動聲明加鎖和釋放鎖,如果手工忘記釋放鎖,很有可能就會導致死鎖,即資源永遠都被鎖住,其他線程無法得到,當前線程也釋放不出去。ReentrantLock實現的是自旋鎖,通過循環調用CAS操作實現加鎖,避免了線程進入內核態的阻塞狀態,所以性能較好。ReentrantLock內部同樣實現了公平鎖和非公平鎖。事實上Synchronized能做的ReentrantLock都能做,但是反過來就不一樣了、
經過前面的源碼分析我們發現核心的都在當前類的內部類里,而當前類的一些方法不過是使用的內部類以及AQS的方法罷了,所以下面我們就來分析ReentrantLock中的三個內部類。
public class ReentrantLock implements Lock, java.io.Serializable { private static final long serialVersionUID = 7373984872572414699L; /** 同步的機制都是通過內部類來實現的 */ private final Sync sync; /** * 在ReentrantLock中state表示的是線程重入鎖的次數,當state為0時才能釋放鎖 */ abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * 這個抽象方法提供給公平鎖和不公平鎖來單獨實現,父類不實現 */ abstract void lock(); /** * 首先得到當前線程,而后獲取state,如果state為0,也就是沒有線程獲得當前鎖,那么就設置當前線程擁有當前鎖的獨占訪問權,并且返回true。 * 如果state不為0,那么就看當前線程是否是已經獲得過鎖的線程,如果是就讓state+=acquire,acquire一般是1,即表示線程重入并且返回true。 * 上面兩個條件都不滿足就代表是鎖被其他線程獲取了,當前線程獲取不到,所以返回false */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } /** * 先判斷當前線程等不等于擁有鎖的線程,不等于就會拋異常,也就是釋放不了。 * 等于之后就看state-releases是否為0,當為0的時候就代表釋放完全。 * 可以設置鎖的狀態為沒有線程擁有,從而讓鎖能被其他線程競爭,否則就設置state,代表線程重入該鎖,并且線程還沒釋放完全。 */ protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } /* *該方法檢驗當前線程是否是鎖的獨占者 */ protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } /* *該方法是創建一個條件鎖,本文不做具體分析 */ final ConditionObject newCondition() { return new ConditionObject(); } // Methods relayed from outer class final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } final boolean isLocked() { return getState() != 0; } /** * 使得該類從流中能重構實例,并且會重置為解鎖狀態 */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); } } /** * Sync 的不公平版本 */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * 將state從0更新到1成功的話就讓當前線程獲取鎖,否則就會嘗試獲得鎖和獲取當前節點的前一節點,并判斷這一個節點是否為頭節點,即當前線程是不是頭節點的直接后繼。 * 如果兩個中有一個失敗則線程中斷,進入阻塞狀態 */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } /** * Sync 的公平版本 */ static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; /** * 嘗試獲得鎖和獲取當前節點的前一節點,并判斷這一個節點是否為頭節點,即當前線程是不是頭節點的直接后繼,如果兩個中有一個失敗則線程中斷,進入阻塞狀態。 * 也就是一定按照隊列中線程的順序來實現 */ final void lock() { acquire(1); } /** * 跟不公平的版本相比其實是在state為0的時候檢查當前線程是不是在隊列的頭部節點的直接后繼,來達到公平的概念 */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } }
ReentrantLock和上面兩個類最不同的莫過于ReentrantLock使用的是獨占功能,即每次只能有一個線程來獲取ReentrantLock類。ReentrantLock類下還有很多方法,這里就不一一介紹,但是本質都是內部類中的實現以及AQS的一些調用
總結:
AQS只是一個基礎的框架,里面最核心的就是維護了state變量和CHL隊列,而其他的類全部都是通過繼承的方法進行擴展,雖然沒有直接說源碼,但是通過上面三個主要類的源碼分析再去看AQS已經不是難事。繼承主要改變的就是獲取和釋放的方法,通過這兩個方法來對state和隊列進行操作達到我們能夠進行的并發控制的功能,事實上J.U.C包下的類和能夠實現的功能遠不止這三個,后面會選擇重點的來介紹。
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持創新互聯。