老熟女激烈的高潮_日韩一级黄色录像_亚洲1区2区3区视频_精品少妇一区二区三区在线播放_国产欧美日产久久_午夜福利精品导航凹凸

重慶分公司,新征程啟航

為企業提供網站建設、域名注冊、服務器等服務

Java并發結合源碼分析AQS原理

前言:

創新互聯建站自2013年創立以來,是專業互聯網技術服務公司,擁有項目做網站、網站制作網站策劃,項目實施與項目整合能力。我們以讓每一個夢想脫穎而出為使命,1280元蔡甸做網站,已為上家服務,為蔡甸各地企業和個人服務,聯系電話:028-86922220

如果說J.U.C包下的核心是什么?那我想答案只有一個就是AQS。那么AQS是什么呢?接下來讓我們一起揭開AQS的神秘面紗

AQS是什么?

AQS是AbstractQueuedSynchronizer的簡稱。為什么說它是核心呢?是因為它提供了一個基于FIFO的隊列和state變量來構建鎖和其他同步裝置的基礎框架。下面是其底層的數據結構。

AQS的特點

1、其內使用Node實現FIFO(FirstInFirstOut)隊列。可用于構建鎖或者其他同步裝置的基礎框架

2、且利用了一個int類表示狀態。在AQS中維護了一個volatile int state,通常表示有線程訪問資源的狀態,當state>1的時候表示線程重入的數量,主要有三個方法控制:getState(),setState(),CompareAndSetState()。后面的源碼分析多用到這幾個方法

3、使用方法是繼承,子類通過繼承并通過實現它的方法管理其狀態(acquire和release)的方法操縱狀態。

4、同時實現排它鎖和共享鎖模式。實際上AQS功能主要分為兩類:獨占(只有一個線程能執行)和共享(多個線程同時執行),它的子類要么使用獨占功能要么使用共享功能,而ReentrantLock是通過兩個內部類來實現獨占和共享

CountDownLatch如何借助AQS實現計數功能?

先來說一下CountDownLatch,CountDownLatch是一個同步輔助類,通過它可以來完成類似阻塞當前線程的功能,即一個或多個線程一起等待,直到其他線程執行的操作完成。要實現上面的功能,CountDownLatch是通過一個給定的原子操作的計數器來實現。調用該類的await()方法的線程會一直處于阻塞狀態,直到其他線程調用countDown()方法使得計數器的值變為0之后線程才會執行,這個計數器是不能被重置的。通常這個類會用在程序執行需要等待某個條件完成的場景,比如說并行計算,可將一個數據量很大的計算拆分成一個個子任務,當子任務完成之后,再將最終的結果匯總。每次訪問CountDownLatch只能有一個線程,但是這個線程在使用完countDown()方法之后能多個線程能繼續運行,而調用await()方法的線程就一定要計數器為0才會運行

下面來分析CountDownLatch的源碼以及如何使用AQS框架

public class CountDownLatch {
  /**
   * CountDownLatch 實現同步控制
   * 底層是使用AQS的state來代表count
   */
  private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;
    //初始化內部類實際上是設置AQS的state
    Sync(int count) {
      setState(count);
    }

    int getCount() {
      return getState();
    }
    //嘗試獲取共享是看當前的state是否為0
    protected int tryAcquireShared(int acquires) {
      return (getState() == 0) ? 1 : -1;
    }
    /*嘗試釋放共享鎖則是遞減計數直到state==0就返回false代表資源已經釋放完全否則就會使用CAS來讓state減一*/
    protected boolean tryReleaseShared(int releases) {
      
      for (;;) {
        int c = getState();
        if (c == 0)
          return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
          return nextc == 0;
      }
    }
  }

  private final Sync sync;

  /**
   * 初始化CountDownLatch,實際上是初始化內部類,實際上是設置AQS的state,count不能小于0
   */
  public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
  }

  /**
   * 這里實際上是調用了AQS里的acquireSharedInterruptibly方法,完成的功能就是先去查看線程是否被中斷,中斷則拋出異常,沒有被中斷就會嘗試獲取共享資源。   * 注意在syn內部類中重寫了tryAcquireShared,也就是當state為0就返回1,這時候就會將當前線程放入AQS的隊列中去,也就是這時候線程可以不再阻塞而是嘗試去獲取鎖
   */
  public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
  }

  /**
   * 原理同上面方法,但是加了一個時間參數來設置等待的時間
   */
  public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
  }

  /**
   * 這里傳入參數為1,同樣上面內部類一樣重寫了AQS的tryReleaseShared方法,使用這個重寫的方法來讓計數器原子操作的減一
   */
  public void countDown() {
    sync.releaseShared(1);
  }

  /**
   * 就是獲取AQS的state
   */
  public long getCount() {
    return sync.getCount();
  }

  /**
   * 轉換成字符串的方法
   */
  public String toString() {
    return super.toString() + "[Count = " + sync.getCount() + "]";
  }
}

由上面代碼可看見CountDownLatch實現了AQS的共享鎖,原理是操作state來實現計數,并且重寫了tryAcquireShared(),tryReleaseShared()等方法

Semaphore是如何借助AQS實現控制并發訪問線程個數?

Semaphore的功能類似于操作系統的信號量,可以很方便的控制某個資源同時被幾個線程訪問,即做并發訪問控制,與CountDownLatch類似,同樣是實現獲取和釋放兩個方法。Semaphore的使用場景:常用于僅能提供訪問的資源,比如數據庫的連接數最大只有30,而應用程序的并發數可能遠遠大于30,這時候就可以使用Semaphore來控制同時訪問的線程數。當Semaphore控制線程數到1的時候就和我們單線程一樣了。同樣Semaphore說是信號量的意思,我們這里就可以把它理解為十字路口的紅綠燈,可以控制車流量(這里是控制線程數)

下面來分析Semaphore的源碼以及如何使用AQS框

public class Semaphore implements java.io.Serializable {  private static final long serialVersionUID = -3222578661600680210L;  /** 所有機制都通過AbstractQueuedSynchronizer子類實現 */
  private final Sync sync;

  /**
   * 同樣是通過內部類來實現AQS主要功能,使用state來表示許可證數量
   */
  abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;

    Sync(int permits) {
      setState(permits);
    }

    final int getPermits() {
      return getState();
    }
    /*
    * 不公平的獲取方式,會有一個搶占鎖的情況,即線程執行順序會亂
    */
    final int nonfairTryAcquireShared(int acquires) {
      for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
          compareAndSetState(available, remaining))
          return remaining;
      }
    }
    /*
    * 釋放資源
    */  
    protected final boolean tryReleaseShared(int releases) {
      for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
          throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
          return true;
      }
    }

    final void reducePermits(int reductions) {
      for (;;) {
        int current = getState();
        int next = current - reductions;
        if (next > current) // underflow
          throw new Error("Permit count underflow");
        if (compareAndSetState(current, next))
          return;
      }
    }

    final int drainPermits() {
      for (;;) {
        int current = getState();
        if (current == 0 || compareAndSetState(current, 0))
          return current;
      }
    }
  }

  /**
   * 不公平的sync版本,使用的就是sync定義的不公平鎖
   */
  static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
      super(permits);
    }

    protected int tryAcquireShared(int acquires) {
      return nonfairTryAcquireShared(acquires);
    }
  }

  /**
   * 公平版本,獲取鎖的線程順序就是線程啟動的順序。具體是使用hasQueuedPredecessors()方法判斷“當前線程”是不是CLH隊列中的第一個線程。   * 若不是的話,則返回-1,是就設置獲取許可證,并檢查許可證數量是否足夠
   */
  static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) {
      super(permits);
    }

    protected int tryAcquireShared(int acquires) {
      for (;;) {
        if (hasQueuedPredecessors())
          return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
          compareAndSetState(available, remaining))
          return remaining;
      }
    }
  }

  /**
   * 默認使用不公平的版本,如果需要公平的,則需要兩個參數
   */
  public Semaphore(int permits) {
    sync = new NonfairSync(permits);
  }

  
  public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
  }

  /**
   * 分析同CountDownLatch中的類似方法,具體的實現都是內部類中的獲取方法,這里是獲取一個許可
   */
  public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
  }

  /**
   *功能同上,但是這里不會檢測線程是否被中斷
   */
  public void acquireUninterruptibly() {
    sync.acquireShared(1);
  }

  /**
   * 嘗試獲取
   */
  public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
  }

  /**
   * 在一段時間內一直嘗試獲取許可
   */
  public boolean tryAcquire(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
  }

  /**
   * 當前線程釋放一個許可證
   */
  public void release() {
    sync.releaseShared(1);
  }

  /**
   * 可以規定一個線程獲得許可證的數量
   */
  public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
  }

  public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
  }

  public boolean tryAcquire(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.nonfairTryAcquireShared(permits) >= 0;
  }

  public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
  }

  /**
   * 同樣可以規定一個線程釋放許可證的數量
   */
  public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
  }

  /**
   * 當前的許可還剩幾個
   */
  public int availablePermits() {
    return sync.getPermits();
  }

  /**
   * 銷毀所有許可
   */
  public int drainPermits() {
    return sync.drainPermits();
  }

  protected void reducePermits(int reduction) {
    if (reduction < 0) throw new IllegalArgumentException();
    sync.reducePermits(reduction);
  }

  public final boolean hasQueuedThreads() {
    return sync.hasQueuedThreads();
  }

  public final int getQueueLength() {
    return sync.getQueueLength();
  }

  protected Collection getQueuedThreads() {
    return sync.getQueuedThreads();
  }
  public String toString() {
    return super.toString() + "[Permits = " + sync.getPermits() + "]";
  }}

上面對于Semaphore的一些重要內部類和常用方法進行了解釋,與CountDownLatch很類似,實現的都是共享的功能,即Semaphore允許得到許可證的線程同時執行,而CountDownLatch允許調用countDown()方法的線程同時執行。并且都是通過內部類實現的。相信看到這里,你能越來越看見AQS為什么被稱作JUC包的核心。下面就來介紹一下ReentrantLock

ReentrantLock是如何借助AQS實現鎖機制

ReentrantLock是可重入鎖,前面博客中寫到synchronized實現的鎖也是可重入的。不過synchronized是基于JVM指令實現,而ReentrantLock是使用Java代碼實現的。ReentrantLock重點就是需要我們手動聲明加鎖和釋放鎖,如果手工忘記釋放鎖,很有可能就會導致死鎖,即資源永遠都被鎖住,其他線程無法得到,當前線程也釋放不出去。ReentrantLock實現的是自旋鎖,通過循環調用CAS操作實現加鎖,避免了線程進入內核態的阻塞狀態,所以性能較好。ReentrantLock內部同樣實現了公平鎖和非公平鎖。事實上Synchronized能做的ReentrantLock都能做,但是反過來就不一樣了、

經過前面的源碼分析我們發現核心的都在當前類的內部類里,而當前類的一些方法不過是使用的內部類以及AQS的方法罷了,所以下面我們就來分析ReentrantLock中的三個內部類。

public class ReentrantLock implements Lock, java.io.Serializable {
  private static final long serialVersionUID = 7373984872572414699L;
  /** 同步的機制都是通過內部類來實現的 */
  private final Sync sync;

  /**
   * 在ReentrantLock中state表示的是線程重入鎖的次數,當state為0時才能釋放鎖
   */
  abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;

    /**
     * 這個抽象方法提供給公平鎖和不公平鎖來單獨實現,父類不實現
     */
    abstract void lock();

    /**
     * 首先得到當前線程,而后獲取state,如果state為0,也就是沒有線程獲得當前鎖,那么就設置當前線程擁有當前鎖的獨占訪問權,并且返回true。     * 如果state不為0,那么就看當前線程是否是已經獲得過鎖的線程,如果是就讓state+=acquire,acquire一般是1,即表示線程重入并且返回true。     * 上面兩個條件都不滿足就代表是鎖被其他線程獲取了,當前線程獲取不到,所以返回false
     */
    final boolean nonfairTryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState();
      if (c == 0) {
        if (compareAndSetState(0, acquires)) {
          setExclusiveOwnerThread(current);
          return true;
        }
      }
      else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
          throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
      }
      return false;
    }
    /**
     * 先判斷當前線程等不等于擁有鎖的線程,不等于就會拋異常,也就是釋放不了。     * 等于之后就看state-releases是否為0,當為0的時候就代表釋放完全。     * 可以設置鎖的狀態為沒有線程擁有,從而讓鎖能被其他線程競爭,否則就設置state,代表線程重入該鎖,并且線程還沒釋放完全。
     */
    protected final boolean tryRelease(int releases) {
      int c = getState() - releases;
      if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
      boolean free = false;
      if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
      }
      setState(c);
      return free;
    }
    /*
     *該方法檢驗當前線程是否是鎖的獨占者
     */
    protected final boolean isHeldExclusively() {
      return getExclusiveOwnerThread() == Thread.currentThread();
    }
    /*
     *該方法是創建一個條件鎖,本文不做具體分析
     */
    final ConditionObject newCondition() {
      return new ConditionObject();
    }

    // Methods relayed from outer class

    final Thread getOwner() {
      return getState() == 0 ? null : getExclusiveOwnerThread();
    }

    final int getHoldCount() {
      return isHeldExclusively() ? getState() : 0;
    }

    final boolean isLocked() {
      return getState() != 0;
    }

    /**
     * 使得該類從流中能重構實例,并且會重置為解鎖狀態
     */
    private void readObject(java.io.ObjectInputStream s)
      throws java.io.IOException, ClassNotFoundException {
      s.defaultReadObject();
      setState(0); 
    }
  }

  /**
   * Sync 的不公平版本
   */
  static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * 將state從0更新到1成功的話就讓當前線程獲取鎖,否則就會嘗試獲得鎖和獲取當前節點的前一節點,并判斷這一個節點是否為頭節點,即當前線程是不是頭節點的直接后繼。     * 如果兩個中有一個失敗則線程中斷,進入阻塞狀態
     */
    final void lock() {
      if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
      else
        acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
      return nonfairTryAcquire(acquires);
    }
  }

  /**
   * Sync 的公平版本
   */
  static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;
    /**
    * 嘗試獲得鎖和獲取當前節點的前一節點,并判斷這一個節點是否為頭節點,即當前線程是不是頭節點的直接后繼,如果兩個中有一個失敗則線程中斷,進入阻塞狀態。    * 也就是一定按照隊列中線程的順序來實現
    */
    final void lock() {
      acquire(1);
    }

    /**
     * 跟不公平的版本相比其實是在state為0的時候檢查當前線程是不是在隊列的頭部節點的直接后繼,來達到公平的概念
     */
    protected final boolean tryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState();
      if (c == 0) {
        if (!hasQueuedPredecessors() &&
          compareAndSetState(0, acquires)) {
          setExclusiveOwnerThread(current);
          return true;
        }
      }
      else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
          throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
      }
      return false;
    }
  }
}

ReentrantLock和上面兩個類最不同的莫過于ReentrantLock使用的是獨占功能,即每次只能有一個線程來獲取ReentrantLock類。ReentrantLock類下還有很多方法,這里就不一一介紹,但是本質都是內部類中的實現以及AQS的一些調用

總結:

AQS只是一個基礎的框架,里面最核心的就是維護了state變量和CHL隊列,而其他的類全部都是通過繼承的方法進行擴展,雖然沒有直接說源碼,但是通過上面三個主要類的源碼分析再去看AQS已經不是難事。繼承主要改變的就是獲取和釋放的方法,通過這兩個方法來對state和隊列進行操作達到我們能夠進行的并發控制的功能,事實上J.U.C包下的類和能夠實現的功能遠不止這三個,后面會選擇重點的來介紹。

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持創新互聯。


分享文章:Java并發結合源碼分析AQS原理
新聞來源:http://www.xueling.net.cn/article/jgohii.html

其他資訊

在線咨詢
服務熱線
服務熱線:028-86922220
TOP
主站蜘蛛池模板: 男人干女人逼 | 国产人澡人澡澡澡人碰视 | 激情综合色综合啪啪开心 | 国产99久久一区二区三区 | 国产盗摄一区二区三区在线 | 国产一级黄色 | 天天躁日日躁狠狠躁婷婷 | 高潮岳喷我一脸 | 精品人妻系列无码专区久久 | 亚洲第一无码精品一区 | 美女一级| 相部屋在线 | 91艹逼视频| 日本高清VA在线播放 | h视频在线免费观看 | 国产精品99久久久久久www | 国产Chinese男男GAy视频网 | 秋霞麻豆| 色爱无码AV综合区老司机非洲 | 尤物99国产成人精品视频 | 国产又粗又黄又爽又硬的软件 | 国产一区二区三区免费在线观看 | 中文字幕久久精品无码 | 久草在线久草在线2 | 国产成人动漫 | 国产亚洲久一区二区 | 天天射日日操 | 全部免费A片在线观看VR系列 | 日本特黄a级片 | 欧美激情一区二区久久久 | 激情综合网五月激情 | 欧美丝袜一区二区三区 | 麻豆人人妻人人妻人人片AV | 91爱在线?看 | 亚洲一区二区中文字幕在线观看 | 久久精品毛片免费观看 | 91精品久久久久久久久久另类 | 少妇性饥渴姓交HDSEX | 日韩字幕在线观看 | 色综合久久天天综合网 | 美女视频黄频大全免费的 |