Azure SDK for Java 中的異步程序設計
本文說明 Azure SDK for Java 中的異步程序設計模型。
Azure SDK 一開始只包含與 Azure 服務互動的非封鎖、異步 API。 這些 API 可讓您使用 Azure SDK 來建置可調整的應用程式,以有效率地使用系統資源。 不過,適用於 Java 的 Azure SDK 也包含同步用戶端,以迎合更廣泛的對象,同時也可讓不熟悉異步程式設計的使用者使用我們的用戶端連結庫。 (請參閱 在 Azure SDK 設計指導方針中可 接近。因此,Azure SDK for Java 中的所有 Java 用戶端連結庫都提供異步和同步用戶端。 不過,我們建議針對生產系統使用異步用戶端,以最大化系統資源的使用。
回應式數據流
如果您查看 Java Azure SDK 設計指導方針中的 Async 服務用戶端一節,您會發現,我們的異步 API 會使用回應式類型,而不是使用 CompletableFuture
Java 8 所提供的 。 為什麼我們會選擇回應式類型,而不是 JDK 中原生可用的類型?
Java 8 引進了 Streams、Lambda 和 CompletableFuture 等功能。 這些功能提供許多功能,但有一些限制。
CompletableFuture
提供以回呼為基礎的非封鎖功能,以及 CompletionStage
允許輕鬆組合一系列異步操作的介面。 Lambda 可讓這些推送式 API 更容易閱讀。 數據流提供功能樣式的作業來處理數據元素的集合。 不過,數據流是同步的,無法重複使用。 CompletableFuture
可讓您提出單一要求、提供回呼的支援,以及預期單一回應。 不過,許多雲端服務都需要能夠串流數據 - 例如事件中樞。
回應式串流可藉由將來源的專案串流至訂閱者,來協助克服這些限制。 當訂閱者向來源要求數據時,來源會傳回任意數目的結果。 它不需要一次傳送全部。 每當來源有數據要傳送時,傳輸就會在一段時間內發生。
在此模型中,訂閱者會註冊事件處理程式,以在數據送達時處理數據。 這些推送式互動會透過不同的訊號通知訂閱者:
- 呼叫
onSubscribe()
表示數據傳輸即將開始。 - 呼叫
onError()
表示發生錯誤,也會標示數據傳輸的結尾。 - 呼叫
onComplete()
表示數據傳輸成功完成。
不同於 Java 資料流,回應式數據流會將錯誤視為一流的事件。 回應式數據流具有專用通道,可供來源將任何錯誤傳達給訂閱者。 此外,回應數據流可讓訂閱者交涉數據傳輸速率,將這些數據流轉換成推送提取模型。
回應 式數據流 規格提供數據傳輸方式的標準。 概括而言,規格會定義下列四個介面,並指定如何實作這些介面的規則。
- Publisher 是數據流的來源。
- 訂閱者是數據流的取用者 。
- 訂閱 會管理發行者與訂閱者之間的數據傳輸狀態。
- 處理器 同時是發行者和訂閱者。
有一些已知的 Java 連結庫提供此規格的實作,例如 RxJava、Akka Streams、Vert.x 和 Project Reactor。
適用於 Java 的 Azure SDK 採用 Project Reactor 來提供其異步 API。 推動這一決定的主要因素是提供與 Spring Webflux 的順暢整合,後者也使用 Project Reactor。 選擇 Project Reactor 而非 RxJava 的另一個貢獻因素是 Project Reactor 使用 Java 8,但當時的 RxJava 仍在 Java 7。 Project Reactor 也提供一組豐富的運算符,這些運算符是可撰寫的,可讓您撰寫宣告式程式代碼來建置數據處理管線。 Project Reactor 的另一件好事是,它具有將 Project Reactor 類型轉換為其他熱門實作類型的配接器。
比較同步和異步操作的 API
我們已討論異步用戶端的同步客戶端和選項。 下表摘要說明使用這些選項所設計 API 的外觀:
API 類型 | 沒有任何值 | 單一值 | 多個值 |
---|---|---|---|
標準 Java - 同步 API | void |
T |
Iterable<T> |
標準 Java - 異步 API | CompletableFuture<Void> |
CompletableFuture<T> |
CompletableFuture<List<T>> |
回應式串流介面 | Publisher<Void> |
Publisher<T> |
Publisher<T> |
反應式數據流的項目反應堆實作 | Mono<Void> |
Mono<T> |
Flux<T> |
為了完整性,值得一提的是,Java 9 引進 了包含四個響應式數據流介面的 Flow 類別。 不過,這個類別不包含任何實作。
在適用於 Java 的 Azure SDK 中使用異步 API
回應式數據流規格不會區分發行者類型。 在響應數據流規格中,發行者只會產生零個或多個數據元素。 在許多情況下,發行者在產生最多一個數據元素與產生零或多個數據元素之間有一個有用的區別。 在雲端式 API 中,此區別會指出要求是否傳回單一值回應或集合。 Project Reactor 提供兩種類型來區分 - Mono 和 Flux。 傳回 Mono
的 API 將包含最多一個值的回應,而傳回 Flux
的 API 將包含具有零或多個值的回應。
例如,假設您使用 ConfigurationAsyncClient 來擷取使用 Azure 應用程式組態 服務所儲存的組態。 (如需詳細資訊,請參閱什麼是 Azure 應用程式組態?)
如果您在用戶端上建立 ConfigurationAsyncClient
並呼叫 getConfigurationSetting()
,它會傳回 Mono
,表示回應包含單一值。 不過,單獨呼叫此方法並不會執行任何動作。 用戶端尚未向 Azure 應用程式組態 服務提出要求。 在這個階段, Mono<ConfigurationSetting>
此 API 所傳回的 只是數據處理管線的「元件」。 這表示取用數據所需的設定已完成。 若要實際觸發資料傳輸(也就是向服務提出要求並取得回應),您必須訂閱傳 Mono
回的 。 因此,處理這些反應式數據流時,您必須記住呼叫 subscribe()
,因為直到您這樣做為止,都不會發生任何動作。
下列範例示範如何訂閱 , Mono
並將組態值列印至控制台。
ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
.connectionString("<your connection string>")
.buildAsyncClient();
asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
config -> System.out.println("Config value: " + config.getValue()),
ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
() -> System.out.println("Successfully retrieved configuration setting"));
System.out.println("Done");
請注意,在用戶端上呼叫 getConfigurationSetting()
之後,範例程式代碼會訂閱結果,並提供三個不同的 Lambda。 第一個 Lambda 會取用從服務接收的數據,此數據會在成功回應時觸發。 如果在擷取組態時發生錯誤,就會觸發第二個 Lambda。 當數據流完成時,會叫用第三個 Lambda,這表示此數據流不會再使用任何數據元素。
注意
如同所有異步程序設計,建立訂用帳戶之後,執行會如往常一樣繼續進行。 如果沒有任何專案可讓程式保持作用中並執行,它可能會在異步作業完成之前終止。 呼叫的主要線程subscribe()
不會等到您進行網路呼叫 Azure 應用程式組態 並接收響應為止。 在生產系統中,您可以繼續處理其他專案,但在此範例中,您可以呼叫 Thread.sleep()
或使用 CountDownLatch
來讓異步操作有機會完成,以新增一個小延遲。
如下列範例所示,傳回 Flux
的 API 也會遵循類似的模式。 差異在於,針對回應中的每個數據元素,會呼叫提供給 subscribe()
方法的第一個回呼多次。 錯誤或完成回呼只呼叫一次,並視為終端機訊號。 如果從發行者收到其中一個訊號,則不會叫用任何其他回呼。
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
asyncClient.receive().subscribe(
event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
ex -> System.out.println("Error receiving events: " + ex.getMessage()),
() -> System.out.println("Successfully completed receiving all events"));
背壓
當來源以比訂閱者可處理更快的速率產生數據時,會發生什麼事? 訂閱者可能會因為數據而不知所措,這可能會導致記憶體不足的錯誤。 訂閱者需要一種方式來與發行者通訊,以在無法跟上時變慢速度。 根據預設,當您呼叫 subscribe()
如上述範例所示的 時 Flux
,訂閱者會要求未系結的數據流,指出發行者要儘快傳送數據。 此行為不一定是理想的行為,訂閱者可能必須透過「反壓」來控制發佈速率。 Backpressure 可讓訂閱者控制數據流元素。 訂閱者會要求可處理的數據元素數目有限。 訂閱者完成處理這些項目之後,訂閱者可以要求更多專案。 藉由使用backpressure,您可以將推送模型轉換成推送提取模型。
下列範例示範如何控制事件中樞取用者接收事件的速率:
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1); // request 1 data element to begin with
}
@Override
public void onNext(PartitionEvent partitionEvent) {
System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
this.subscription.request(1); // request another event when the subscriber is ready
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error receiving events: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Successfully completed receiving all events")
}
});
當訂閱者第一次「連接到」發行者時,發行者會交 Subscription
出實例,以管理數據傳輸的狀態。 這是 Subscription
訂閱者可以透過 request()
呼叫 來指定可以處理多少數據元素來套用backpressure的媒體。
例如,如果訂閱者每次呼叫 onNext()
時要求多個數據元素, request(10)
則發行者會在有可用的專案或可用時立即傳送接下來的10個專案。 這些元素會累積在訂閱者端的緩衝區中,而且由於每個 onNext()
呼叫會要求 10 個以上,待處理專案會持續成長,直到發行者沒有要傳送的數據元素,或訂閱者的緩衝區溢位,而導致記憶體不足的錯誤。
取消訂用帳戶
訂閱會管理發行者與訂閱者之間的數據傳輸狀態。 訂閱在發行者完成傳送所有數據給訂閱者或訂閱者不再有興趣接收數據之前,才會使用訂閱。 有幾種方式可以取消訂用帳戶,如下所示。
下列範例會藉由處置訂閱者來取消訂閱:
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
Disposable disposable = asyncClient.receive().subscribe(
partitionEvent -> {
Long num = partitionEvent.getData().getSequenceNumber()
System.out.println("Sequence number of received event: " + num);
},
ex -> System.out.println("Error receiving events: " + ex.getMessage()),
() -> System.out.println("Successfully completed receiving all events"));
// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();
下列範例會在 上Subscription
呼叫 cancel()
方法,以取消訂用帳戶:
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1); // request 1 data element to begin with
}
@Override
public void onNext(PartitionEvent partitionEvent) {
System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
this.subscription.cancel(); // Cancels the subscription. No further event is received.
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error receiving events: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Successfully completed receiving all events")
}
});
推論
線程是昂貴的資源,您不應該浪費在等候遠端服務呼叫的回應時浪費。 隨著微服務架構的採用增加,有效率地調整和使用資源的需求變得至關重要。 當有網路系結作業時,會優先使用異步 API。 適用於 Java 的 Azure SDK 提供一組豐富的異步作業 API,以協助將系統資源最大化。 我們強烈建議您試用我們的異步用戶端。
如需最適合您特定工作的運算符詳細資訊,請參閱 Reactor 3 參考指南中的我需要哪一個運算符?
下一步
既然您已進一步瞭解各種異步程序設計概念,請務必瞭解如何逐一查看結果。 如需最佳反覆專案策略的詳細資訊,以及分頁運作方式的詳細數據,請參閱 適用於 Java 的 Azure SDK 中的分頁和反覆專案。