Programmazione asincrona in Azure SDK per Java
Questo articolo descrive il modello di programmazione asincrona in Azure SDK per Java.
Azure SDK contiene inizialmente solo API asincrone non bloccanti per l'interazione con i servizi di Azure. Queste API consentono di usare Azure SDK per creare applicazioni scalabili che usano le risorse di sistema in modo efficiente. Tuttavia, Azure SDK per Java contiene anche client sincroni per soddisfare un pubblico più ampio e rendere anche le librerie client raggiungibili per gli utenti che non hanno familiarità con la programmazione asincrona. (Vedere È possibile adottare le linee guida per la progettazione di Azure SDK. Di conseguenza, tutte le librerie client Java in Azure SDK per Java offrono client asincroni e sincroni. È tuttavia consigliabile usare i client asincroni per i sistemi di produzione per ottimizzare l'uso delle risorse di sistema.
Flussi reattivi
Se si esamina la sezione Client del servizio asincrona nelle linee guida per la progettazione di Java Azure SDK, si noterà che, anziché usare CompletableFuture
java 8, le API asincrone usano tipi reattivi. Perché abbiamo scelto tipi reattivi rispetto ai tipi disponibili in modo nativo in JDK?
Java 8 ha introdotto funzionalità come Flussi, lambda e CompletableFuture. Queste funzionalità offrono molte funzionalità, ma presentano alcune limitazioni.
CompletableFuture
fornisce funzionalità basate su callback, non bloccante e l'interfaccia CompletionStage
consentita per una facile composizione di una serie di operazioni asincrone. Le espressioni lambda rendono queste API basate su push più leggibili. Flussi fornire operazioni di tipo funzionale per gestire una raccolta di elementi dati. Tuttavia, i flussi sono sincroni e non possono essere riutilizzati. CompletableFuture
consente di effettuare una singola richiesta, di fornire supporto per un callback e di prevedere una singola risposta. Tuttavia, molti servizi cloud richiedono la possibilità di trasmettere i dati, ad esempio Hub eventi.
I flussi reattivi possono aiutare a superare queste limitazioni trasmettendo elementi da un'origine a un sottoscrittore. Quando un sottoscrittore richiede dati da un'origine, l'origine invia un numero qualsiasi di risultati. Non è necessario inviarli tutti contemporaneamente. Il trasferimento avviene in un periodo di tempo, ogni volta che l'origine contiene dati da inviare.
In questo modello il sottoscrittore registra i gestori eventi per elaborare i dati all'arrivo. Queste interazioni basate su push notificano al sottoscrittore tramite segnali distinti:
- Una
onSubscribe()
chiamata indica che il trasferimento dei dati sta per iniziare. - Una
onError()
chiamata indica che si è verificato un errore, che contrassegna anche la fine del trasferimento dei dati. - Una
onComplete()
chiamata indica il completamento corretto del trasferimento dei dati.
A differenza di Java Flussi, i flussi reattivi considerano gli errori come eventi di prima classe. I flussi reattivi hanno un canale dedicato per l'origine per comunicare eventuali errori al sottoscrittore. Inoltre, i flussi reattivi consentono al sottoscrittore di negoziare la velocità di trasferimento dei dati per trasformare questi flussi in un modello push-pull.
La specifica di Flussi reattiva fornisce uno standard per la modalità di trasferimento dei dati. A livello generale, la specifica definisce le quattro interfacce seguenti e specifica le regole su come implementare queste interfacce.
- Publisher è l'origine di un flusso di dati.
- Il Sottoscrittore è il consumer di un flusso di dati.
- La sottoscrizione gestisce lo stato del trasferimento dei dati tra un server di pubblicazione e un sottoscrittore.
- Il processore è sia un server di pubblicazione che un sottoscrittore.
Esistono alcune librerie Java note che forniscono implementazioni di questa specifica, ad esempio RxJava, Akka Flussi, Vert.x e Project Reactor.
Azure SDK per Java ha adottato Project Reactor per offrire le API asincrone. Il fattore principale che guida questa decisione è stato quello di fornire un'integrazione senza problemi con Spring Webflux, che utilizza anche Project Reactor. Un altro fattore che contribuisce a scegliere Project Reactor su RxJava era che Project Reactor usa Java 8 ma RxJava, al momento, era ancora in Java 7. Project Reactor offre anche un set completo di operatori componibili e che consentono di scrivere codice dichiarativo per la compilazione di pipeline di elaborazione dati. Un altro aspetto interessante di Project Reactor è che dispone di adattatori per la conversione dei tipi Project Reactor in altri tipi di implementazione comuni.
Confronto di API di operazioni sincrone e asincrone
Sono stati illustrati i client e le opzioni sincroni per i client asincroni. La tabella seguente riepiloga l'aspetto delle API progettate usando queste opzioni:
Tipo di API | Nessun valore | Valore singolo | Più valori |
---|---|---|---|
Java standard - API sincrone | void |
T |
Iterable<T> |
Java standard - API asincrone | CompletableFuture<Void> |
CompletableFuture<T> |
CompletableFuture<List<T>> |
Interfacce Flussi reattive | Publisher<Void> |
Publisher<T> |
Publisher<T> |
Implementazione del reattore di progetto di Flussi reattiva | Mono<Void> |
Mono<T> |
Flux<T> |
Per motivi di completezza, vale la pena ricordare che Java 9 ha introdotto la classe Flow che include le quattro interfacce di flussi reattivi. Tuttavia, questa classe non include alcuna implementazione.
Usare le API asincrone in Azure SDK per Java
La specifica dei flussi reattivi non distingue tra i tipi di editori. Nella specifica dei flussi reattivi, gli editori producono semplicemente zero o più elementi di dati. In molti casi, esiste una distinzione utile tra un editore che produce al massimo un elemento dati rispetto a uno che produce zero o più. Nelle API basate sul cloud questa distinzione indica se una richiesta restituisce una risposta a valore singolo o una raccolta. Project Reactor fornisce due tipi per fare questa distinzione: Mono e Flux. Un'API che restituisce un Mono
oggetto conterrà una risposta con al massimo un valore e un'API che restituisce un Flux
conterrà una risposta con zero o più valori.
Si supponga, ad esempio, di usare configurationAsyncClient per recuperare una configurazione archiviata usando il servizio di configurazione app Azure. Per altre informazioni, vedere Che cos'è app Azure Configuration?).
Se si crea un oggetto ConfigurationAsyncClient
e si chiama getConfigurationSetting()
sul client, viene restituito un Mono
oggetto , che indica che la risposta contiene un singolo valore. Tuttavia, la chiamata a questo metodo da sola non esegue alcuna operazione. Il client non ha ancora effettuato una richiesta al servizio di configurazione app Azure. In questa fase, l'oggetto Mono<ConfigurationSetting>
restituito da questa API è semplicemente un "assembly" della pipeline di elaborazione dati. Ciò significa che la configurazione necessaria per l'utilizzo dei dati è completa. Per attivare effettivamente il trasferimento dei dati, ovvero per effettuare la richiesta al servizio e ottenere la risposta, è necessario sottoscrivere l'oggetto restituito Mono
. Quindi, quando si gestiscono questi flussi reattivi, è necessario ricordare di chiamare subscribe()
perché nulla accade fino a quando non si esegue questa operazione.
Nell'esempio seguente viene illustrato come sottoscrivere Mono
e stampare il valore di configurazione nella console.
ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
.connectionString("<your connection string>")
.buildAsyncClient();
asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
config -> System.out.println("Config value: " + config.getValue()),
ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
() -> System.out.println("Successfully retrieved configuration setting"));
System.out.println("Done");
Si noti che dopo aver chiamato getConfigurationSetting()
sul client, il codice di esempio sottoscrive il risultato e fornisce tre espressioni lambda separate. La prima espressione lambda utilizza i dati ricevuti dal servizio, che viene attivato in caso di esito positivo della risposta. La seconda espressione lambda viene attivata se si verifica un errore durante il recupero della configurazione. La terza espressione lambda viene richiamata al termine del flusso di dati, ovvero non sono previsti altri elementi dati da questo flusso.
Nota
Come per tutta la programmazione asincrona, dopo la creazione della sottoscrizione, l'esecuzione procede come di consueto. Se non c'è nulla da mantenere attivo e in esecuzione il programma, può terminare prima del completamento dell'operazione asincrona. Il thread principale chiamato subscribe()
non attenderà finché non si effettua la chiamata di rete a app Azure Configurazione e si riceve una risposta. Nei sistemi di produzione, è possibile continuare a elaborare qualcos'altro, ma in questo esempio è possibile aggiungere un piccolo ritardo chiamando Thread.sleep()
o usando un CountDownLatch
per consentire il completamento dell'operazione asincrona.
Come illustrato nell'esempio seguente, le API che restituiscono un Flux
oggetto seguono anche un modello simile. La differenza è che il primo callback fornito al subscribe()
metodo viene chiamato più volte per ogni elemento dati nella risposta. L'errore o i callback di completamento vengono chiamati esattamente una volta e vengono considerati come segnali del terminale. Nessun altro callback viene richiamato se uno di questi segnali viene ricevuto dal server di pubblicazione.
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
asyncClient.receive().subscribe(
event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
ex -> System.out.println("Error receiving events: " + ex.getMessage()),
() -> System.out.println("Successfully completed receiving all events"));
Contropressione
Cosa accade quando l'origine produce i dati a una velocità superiore a quella che il sottoscrittore può gestire? Il sottoscrittore può essere sovraccaricato con i dati, che possono causare errori di memoria insufficiente. Il sottoscrittore deve essere in grado di comunicare di nuovo con il server di pubblicazione per rallentare quando non può rimanere aggiornato. Per impostazione predefinita, quando si chiama subscribe()
su un oggetto Flux
come illustrato nell'esempio precedente, il sottoscrittore richiede un flusso non associato di dati, che indica al server di pubblicazione di inviare i dati il più rapidamente possibile. Questo comportamento non è sempre auspicabile e il sottoscrittore potrebbe dover controllare la frequenza di pubblicazione tramite "backpressure". La backpressura consente al sottoscrittore di assumere il controllo del flusso degli elementi di dati. Un sottoscrittore richiederà un numero limitato di elementi dati che possono gestire. Dopo che il sottoscrittore ha completato l'elaborazione di questi elementi, il sottoscrittore può richiedere altro. Usando la backpressure, è possibile trasformare un modello push per il trasferimento dei dati in un modello push-pull.
L'esempio seguente illustra come controllare la frequenza con cui gli eventi vengono ricevuti dal consumer di Hub eventi:
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1); // request 1 data element to begin with
}
@Override
public void onNext(PartitionEvent partitionEvent) {
System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
this.subscription.request(1); // request another event when the subscriber is ready
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error receiving events: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Successfully completed receiving all events")
}
});
Quando il sottoscrittore si connette per la prima volta al server di pubblicazione, il server di pubblicazione consegna un'istanza del sottoscrittore Subscription
, che gestisce lo stato del trasferimento dei dati. Si tratta Subscription
del supporto attraverso il quale il sottoscrittore può applicare la pressione rovesciata chiamando request()
per specificare il numero di elementi dati che può gestire.
Se il sottoscrittore richiede più di un elemento dati ogni volta che chiama onNext()
, request(10)
ad esempio, il server di pubblicazione invierà immediatamente i 10 elementi successivi se sono disponibili o quando diventano disponibili. Questi elementi si accumulano in un buffer alla fine del sottoscrittore e, poiché ogni onNext()
chiamata richiederà più 10, il backlog continua a crescere fino a quando il server di pubblicazione non ha più elementi di dati da inviare o l'overflow del buffer del sottoscrittore, causando errori di memoria insufficiente.
Annullare una sottoscrizione
Una sottoscrizione gestisce lo stato del trasferimento dei dati tra un server di pubblicazione e un sottoscrittore. La sottoscrizione è attiva fino a quando il server di pubblicazione non ha completato il trasferimento di tutti i dati al sottoscrittore o il sottoscrittore non è più interessato a ricevere dati. Esistono due modi per annullare una sottoscrizione, come illustrato di seguito.
Nell'esempio seguente viene annullata la sottoscrizione eliminando il sottoscrittore:
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
Disposable disposable = asyncClient.receive().subscribe(
partitionEvent -> {
Long num = partitionEvent.getData().getSequenceNumber()
System.out.println("Sequence number of received event: " + num);
},
ex -> System.out.println("Error receiving events: " + ex.getMessage()),
() -> System.out.println("Successfully completed receiving all events"));
// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();
Nell'esempio seguente viene annullata la sottoscrizione chiamando il cancel()
metodo in Subscription
:
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1); // request 1 data element to begin with
}
@Override
public void onNext(PartitionEvent partitionEvent) {
System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
this.subscription.cancel(); // Cancels the subscription. No further event is received.
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error receiving events: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Successfully completed receiving all events")
}
});
Conclusione
I thread sono risorse costose che non è consigliabile sprecare in attesa di risposte dalle chiamate al servizio remoto. Man mano che l'adozione di architetture di microservizi aumenta, la necessità di ridimensionare e usare le risorse diventa essenziale in modo efficiente. Le API asincrone sono favorevoli quando sono presenti operazioni associate alla rete. Azure SDK per Java offre un set completo di API per le operazioni asincrone per ottimizzare le risorse di sistema. È consigliabile provare i clienti asincroni.
Per altre informazioni sugli operatori più adatti alle proprie attività specifiche, vedere Quale operatore è necessario? nella Guida di riferimento di Reactor 3.
Passaggi successivi
Dopo aver compreso meglio i vari concetti di programmazione asincrona, è importante imparare a scorrere i risultati. Per altre informazioni sulle strategie di iterazione migliori e sui dettagli sul funzionamento della paginazione, vedere Paginazione e iterazione in Azure SDK per Java.