重慶分公司,新征程啟航
為企業提供網站建設、域名注冊、服務器等服務
為企業提供網站建設、域名注冊、服務器等服務
Reactive 直接翻譯的意思式反應式,反應性。咋一看,似乎不太好懂。
舉個例子:在 Excel 里,C 單元格上設置函數 Sum(A+B),當你改變單元格 A 或者單元格 B 的數值時,單元格 C 的值同時也會發生變化。這種行為就是 Reactive。
在計算機編程領域,Reactive 一般指的是 Reactive programming。指的是一種面向數據流并傳播事件的異步編程范式(asynchronous programming paradigm)。
先舉個例子大家感受一下:
public static void main(String[] args) { FluxProcessorpublisher = UnicastProcessor.create(); publisher.doOnNext(event -> System.out.println("receive event: " + event)).subscribe(); publisher.onNext(1); // print 'receive event: 1' publisher.onNext(2); // print 'receive event: 2' }
代碼 1
以上例代碼(使用 Reactor 類庫)為例,publisher 產生了數據流 (1,2),并且傳播給了 OnNext 事件, 上例中 lambda 響應了該事件,輸出了相應的信息。上例代碼中生成數據流和注冊/執行 lambda 是在同一線程中,但也可以在不同線程中。
注:如果上述代碼執行邏輯有些疑惑,可以暫時將 lambda 理解成 callback 就可以了。
對于 Reactive 現在你應該大致有一點感覺了,但是 Reactive 有什么價值,有哪些設計原則,估計你還是有些模糊。這就是 Reactive Manifesto 要解決的疑問了。
使用 Reactive 方式構建的系統具有以下特征:
即時響應性 (Responsive)
只要有可能, 系統就會及時地做出響應。即時響應是可用性和實用性的基石, 而更加重要的是,即時響應意味著可以快速地檢測到問題并且有效地對其進行處理。即時響應的系統專注于提供快速而一致的響應時間, 確立可靠的反饋上限, 以提供一致的服務質量。這種一致的行為轉而將簡化錯誤處理、 建立最終用戶的信任并促使用戶與系統作進一步的互動。
回彈性 (Resilient)
系統在出現失敗時依然保持即時響應性。這不僅適用于高可用的、 任務關鍵型系統——任何不具備回彈性的系統都將會在發生失敗之后丟失即時響應性。回彈性是通過復制、 遏制、 隔離以及委托來實現的。失敗的擴散被遏制在了每個組件內部, 與其他組件相互隔離, 從而確保系統某部分的失敗不會危及整個系統,并能獨立恢復。每個組件的恢復都被委托給了另一個(外部的)組件, 此外,在必要時可以通過復制來保證高可用性。(因此)組件的客戶端不再承擔組件失敗的處理。
彈性 (Elastic)
系統在不斷變化的工作負載之下依然保持即時響應性。反應式系統可以對輸入(負載)的速率變化做出反應,比如通過增加或者減少被分配用于服務這些輸入(負載)的資源。這意味著設計上并沒有爭用點和中央瓶頸, 得以進行組件的分片或者復制, 并在它們之間分布輸入(負載)。通過提供相關的實時性能指標, 反應式系統能支持預測式以及反應式的伸縮算法。這些系統可以在常規的硬件以及軟件平臺上實現成本高效的彈性。
消息驅動 (Message Driven)
反應式系統依賴異步的消息傳遞,從而確保了松耦合、隔離、位置透明的組件之間有著明確邊界。這一邊界還提供了將失敗作為消息委托出去的手段。使用顯式的消息傳遞,可以通過在系統中塑造并監視消息流隊列, 并在必要時應用回壓, 從而實現負載管理、 彈性以及流量控制。使用位置透明的消息傳遞作為通信的手段, 使得跨集群或者在單個主機中使用相同的結構成分和語義來管理失敗成為了可能。非阻塞的通信使得接收者可以只在活動時才消耗資源, 從而減少系統開銷。
注:
知道了 Reactive 的概念,特征和價值后,是否有相關的產品或者框架來幫助我們構建 Reactive 式系統呢?在早些時候有一些類庫 (Rxjava 1.x, Rx.Net) 可以使用,但是規范并不統一,所以后來 Netfilx, Pivotal 等公司就制定了一套規范指導大家便于實現它(該規范也是受到早期產品的啟發),這就是 Reactive Stream 的作用。
Reactive Stream 是一個使用非阻塞 back pressure(回壓)實現異步流式數據處理的標準。目前已經在 JVM 和 JavaScript 語言中實現同一套語意的規范;以及嘗試在各種涉及到序列化和反序列化的傳輸協議(TCP, UDP, HTTP and WebSockets)基礎上,定義傳輸 reactive 數據流的網絡協議。
The purpose of Reactive Streams is to provide a standard for asynchronous stream processing with non-blocking backpressure.
當遇到未預料數據流時,依然可以在可控資源消耗下保持系統的可用性。
控制在一個異步邊界的流式數據交換。例如傳遞一個數據到另外一個線程或者線程池,確保接收方沒有 buffer(緩存)任意數量的數據。而 back pressure(回壓)是解決這種場景的不可或缺的特性。
此標準只描述通過回壓來實現異步流式數據交換的必要的行為和實體,最小接口,例如下方的 Publisher, Subscriber。Reactive Streams 只關注在這些組件之間的流式數據中轉,并不關注流式數據本身的組裝,分割,轉換等行為, 例如 map, zip 等 operator。Reactive Streams 規范包括:
Publisher
產生一個數據流(可能包含無限數據), Subscriber 們可以根據它們的需要消費這些數據。
public interface Publisher{ public void subscribe(Subscriber super T> s);}
Subscriber
Publisher 創建的元素的接收者。監聽指定的事件,例如 OnNext, OnComplete, OnError 等。
publicinterface Subscriber{ public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete();}
Subscription
是 Publisher 和 Subscriber 一對一的協調對象。Subscriber 可以通過它來向 Publisher 取消數據發送或者 request 更多數據。
public interface Subscription { public void request(long n); public void cancel();}
Processor
同時具備 Publisher 和 Subscriber 特征。代碼1中 FluxProcessor 既可以發送數據(OnNext),也可以接收數據 (doOnNext)。
public interface Processorextends Subscriber , Publisher {}
Thread
Callback
Future
Reactive Extensions (Rx)
Coroutines
Reactive 的實現原理個人認為還是回調,kotlin 協程實現原理同樣也是回調。但實現回掉的方式不一樣。一個是通過事件傳播, 一個是通過狀態機。但 cooutine 編程的易用性明顯強于 Rx,后面有空我會專門寫篇文章介紹 kotlin coroutine 的實現原理。
有了 Reactive Stream 這個規范,就會有相應實現該規范的類庫。Reactor 就是其中之一。
Reactor 是遵守 Reactive Stream 規范構建非阻塞應用的 Java 語言 Reactive 類庫,已經在 spring 5 中集成,與他相似的類庫有 RxJava2, RxJs, JDK9 Flow 等。
阿里內部的 Faas 系統目前使用 Reactor 來構建整個系統,包括函數應用和各種核心應用(邏輯架構)。根據我們壓測結果顯示,使用 Reactive 方式構建的系統確實會有這些特點:
另外從原理上,我認為資源利用率和吞吐量也會高于非反應式的應用。
阿里內部的 Faas 系統主要做了兩件事情:
涉及到 IO 的地方幾乎全異步化。例如中間件(HSF, MetaQ 等提供異步 API)調用。
IO 線程模型變化。使用較少(一般 CPU 核數)線程處理所有的請求。
傳統 Java 應用 IO 線程模型
參考 Netty 中 Reactor IO (worker thread pool) 模型,下方偽代碼(kotlin)進行了簡化。
// 非阻塞讀取客戶端請求數據(in), 讀取成功后執行lambda.inChannel.read(in) { workerThreadPool.execute{ // 阻塞處理業務邏輯(process), 業務邏輯在worker線程池中執行,同步執行完后,再向客戶端返回輸出(out) val out = process(in) outChannel.write(out) } }
Reactive 應用 IO 線程模型
IO 線程也可以執行業務邏輯 (process),可以不需要 worker 線程池。
// 非阻塞讀取客戶端請求數據(in), 讀取成功后執行lambda inChannel.read(in) { // IO線程執行業務邏輯(process), 然后向客戶端返回輸出(out). 這要求業務處理流程必須是非阻塞的. process(in){ out-> outChannel.write(out) { // this lambda is executed when the writing completes ... } }}
以 Reactive 方式構建的系統有很多值得學習和發揮價值的地方,但坦白講 Reactive programing 方式目前接受程度并不高。特別是使用 Java 語言開發同學,我個人也感同身受,因為這和 Java 面向命令控制流程的編程思維方式有較大差異。所以這里以 Reactor (Java) 學習為例:
反應式的系統有很多優點,但是完整構建反應式的系統卻并不容易。不僅僅是語言上的差異,還有一些組件就不支持非阻塞式的調用方式,例如:JDBC。但是有一些開源組織正在推動這些技術進行革新,例如:R2DBC。另外,為了方便構建反應式系統,一些組織/個人適配了一些主流技術組件 reactor-core, reactor-netty, reactor-rabbimq, reactor-kafka 等,來方便完整構建反應式系統。
當你的系統從底層到上層,從系統內部到依賴外部都變成了反應式,這就形成了 Reactive 架構。
這種架構價值有多大?未來可期。
創新互聯面向全國提供域名注冊、虛擬主機、云服務器、服務器托管與租用,如需了解,請聯系QQ:171356849微信:zh18159893430 咨詢,謝謝!