PollerFlux<T,U> Class
- java.lang.Object
  - reactor.core.publisher.Flux
    - com.azure.core.util.polling.PollerFlux<T,U>
Type Parameters
- T
The type of poll response value.
- U
The type of the final result of the long-running operation.
public final class PollerFlux<T,U>
extends Flux<AsyncPollResponse<T,U>>
A Flux that simplifies the task of executing long-running operations against an Azure service. A subscription to PollerFlux<T,U> initiates a long-running operation and polls the status until it completes.
Code samples
Instantiating and subscribing to PollerFlux
LocalDateTime timeToReturnFinalResponse = LocalDateTime.now().plus(Duration.ofMillis(800));
// Create poller instance
PollerFlux<String, String> poller = new PollerFlux<>(Duration.ofMillis(100),
(context) -> Mono.empty(),
// Define your custom poll operation
(context) -> {
if (LocalDateTime.now().isBefore(timeToReturnFinalResponse)) {
System.out.println("Returning intermediate response.");
return Mono.just(new PollResponse<>(LongRunningOperationStatus.IN_PROGRESS,
"Operation in progress."));
} else {
System.out.println("Returning final response.");
return Mono.just(new PollResponse<>(LongRunningOperationStatus.SUCCESSFULLY_COMPLETED,
"Operation completed."));
}
},
(activationResponse, context) -> Mono.error(new RuntimeException("Cancellation is not supported")),
(context) -> Mono.just("Final Output"));
// Listen to poll responses
poller.subscribe(response -> {
// Process poll response
System.out.printf("Got response. Status: %s, Value: %s%n", response.getStatus(), response.getValue());
});
// Do something else
Asynchronously wait for polling to complete and then retrieve the final result
LocalDateTime timeToReturnFinalResponse = LocalDateTime.now().plus(Duration.ofMinutes(5));
// Create poller instance
PollerFlux<String, String> poller = new PollerFlux<>(Duration.ofMillis(100),
(context) -> Mono.empty(),
(context) -> {
if (LocalDateTime.now().isBefore(timeToReturnFinalResponse)) {
System.out.println("Returning intermediate response.");
return Mono.just(new PollResponse<>(LongRunningOperationStatus.IN_PROGRESS,
"Operation in progress."));
} else {
System.out.println("Returning final response.");
return Mono.just(new PollResponse<>(LongRunningOperationStatus.SUCCESSFULLY_COMPLETED,
"Operation completed."));
}
},
(activationResponse, context) -> Mono.just("FromServer:OperationIsCancelled"),
(context) -> Mono.just("FromServer:FinalOutput"));
poller.take(Duration.ofMinutes(30))
.last()
.flatMap(asyncPollResponse -> {
if (asyncPollResponse.getStatus() == LongRunningOperationStatus.SUCCESSFULLY_COMPLETED) {
// operation completed successfully, retrieving final result.
return asyncPollResponse
.getFinalResult();
} else {
return Mono.error(new RuntimeException("polling completed unsuccessfully with status: "
+ asyncPollResponse.getStatus()));
}
}).block();
Block for polling to complete and then retrieve the final result
// pollerFlux is a PollerFlux<String, String> created as in the samples above
AsyncPollResponse<String, String> terminalResponse = pollerFlux.blockLast();
System.out.printf("Polling complete. Final Status: %s%n", terminalResponse.getStatus());
if (terminalResponse.getStatus() == LongRunningOperationStatus.SUCCESSFULLY_COMPLETED) {
// the operation succeeded, so block to fetch the final result
String finalResult = terminalResponse.getFinalResult().block();
System.out.printf("Polling complete. Final Result: %s%n", finalResult);
}
Asynchronously poll until poller receives matching status
final Predicate<AsyncPollResponse<String, String>> isComplete = response -> {
return response.getStatus() != LongRunningOperationStatus.IN_PROGRESS
&& response.getStatus() != LongRunningOperationStatus.NOT_STARTED;
};
pollerFlux
.takeUntil(isComplete)
.subscribe(completed -> {
System.out.println("Completed poll response, status: " + completed.getStatus());
});
Asynchronously cancel the long running operation
LocalDateTime timeToReturnFinalResponse = LocalDateTime.now().plus(Duration.ofMinutes(5));
// Create poller instance
PollerFlux<String, String> poller = new PollerFlux<>(Duration.ofMillis(100),
(context) -> Mono.empty(),
(context) -> {
if (LocalDateTime.now().isBefore(timeToReturnFinalResponse)) {
System.out.println("Returning intermediate response.");
return Mono.just(new PollResponse<>(LongRunningOperationStatus.IN_PROGRESS,
"Operation in progress."));
} else {
System.out.println("Returning final response.");
return Mono.just(new PollResponse<>(LongRunningOperationStatus.SUCCESSFULLY_COMPLETED,
"Operation completed."));
}
},
(activationResponse, context) -> Mono.just("FromServer:OperationIsCancelled"),
(context) -> Mono.just("FromServer:FinalOutput"));
// Asynchronously wait up to 30 minutes for polling to complete; if it has not
// completed within that time, cancel the server operation.
poller.take(Duration.ofMinutes(30))
.last()
.flatMap(asyncPollResponse -> {
if (!asyncPollResponse.getStatus().isComplete()) {
return asyncPollResponse
.cancelOperation()
.then(Mono.error(new RuntimeException("Operation is cancelled!")));
} else {
return Mono.just(asyncPollResponse);
}
}).block();
Instantiating and subscribing to PollerFlux from a known polling strategy
// Create poller instance
PollerFlux<BinaryData, String> poller = PollerFlux.create(
Duration.ofMillis(100),
// pass in your custom activation operation
() -> Mono.just(new SimpleResponse<Void>(new HttpRequest(
HttpMethod.POST,
"http://httpbin.org"),
202,
new HttpHeaders().set("Operation-Location", "http://httpbin.org"),
null)),
new OperationResourcePollingStrategy<>(new HttpPipelineBuilder().build()),
TypeReference.createInstance(BinaryData.class),
TypeReference.createInstance(String.class));
// Listen to poll responses
poller.subscribe(response -> {
// Process poll response
System.out.printf("Got response. Status: %s, Value: %s%n", response.getStatus(), response.getValue());
});
// Do something else
Instantiating and subscribing to PollerFlux from a custom polling strategy
// Create custom polling strategy based on OperationResourcePollingStrategy
PollingStrategy<BinaryData, String> strategy = new OperationResourcePollingStrategy<BinaryData, String>(
new HttpPipelineBuilder().build()) {
// override any interface method to customize the polling behavior
@Override
public Mono<PollResponse<BinaryData>> poll(PollingContext<BinaryData> context,
TypeReference<BinaryData> pollResponseType) {
return Mono.just(new PollResponse<>(
LongRunningOperationStatus.SUCCESSFULLY_COMPLETED,
BinaryData.fromString("")));
}
};
// Create poller instance
PollerFlux<BinaryData, String> poller = PollerFlux.create(
Duration.ofMillis(100),
// pass in your custom activation operation
() -> Mono.just(new SimpleResponse<Void>(new HttpRequest(
HttpMethod.POST,
"http://httpbin.org"),
202,
new HttpHeaders().set("Operation-Location", "http://httpbin.org"),
null)),
strategy,
TypeReference.createInstance(BinaryData.class),
TypeReference.createInstance(String.class));
// Listen to poll responses
poller.subscribe(response -> {
// Process poll response
System.out.printf("Got response. Status: %s, Value: %s%n", response.getStatus(), response.getValue());
});
// Do something else
Constructor Details
PollerFlux
public PollerFlux(Duration pollInterval, Function
Creates PollerFlux.
Parameters:
Method Details
<T,U>create
public static PollerFlux
Creates PollerFlux.
This method differs from the PollerFlux constructor in the type returned by the activationOperation. The constructor takes an activationOperation that returns a Mono emitting the result directly, whereas the create method takes an activationOperation that returns a Mono emitting a PollResponse<T>, which holds the result. If the PollResponse<T> from the activationOperation indicates that the long-running operation is already complete, the pollOperation is not called.
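The short-circuit described above can be illustrated with a minimal, stdlib-only sketch. It is not the azure-core implementation: the `Status` enum, the `ActivationShortCircuitSketch` class, and the `runOperation` method are hypothetical names, and a plain blocking loop stands in for the reactive pipeline.

```java
import java.util.function.Supplier;

/**
 * Hypothetical, stdlib-only model of the activation short-circuit.
 * None of these names are azure-core types.
 */
final class ActivationShortCircuitSketch {
    /** Simplified stand-in for LongRunningOperationStatus. */
    enum Status { IN_PROGRESS, SUCCESSFULLY_COMPLETED }

    /**
     * Runs the activation once, then polls until the status is complete.
     * Returns the number of times the poll operation was invoked.
     */
    static int runOperation(Supplier<Status> activation, Supplier<Status> poll) {
        int pollCalls = 0;
        // The activation response already carries a status.
        Status status = activation.get();
        // If activation reported completion, this loop body never runs.
        while (status != Status.SUCCESSFULLY_COMPLETED) {
            status = poll.get();
            pollCalls++;
        }
        return pollCalls;
    }

    public static void main(String[] args) {
        int completedAtActivation = runOperation(
            () -> Status.SUCCESSFULLY_COMPLETED,
            () -> Status.SUCCESSFULLY_COMPLETED);
        int completedAfterOnePoll = runOperation(
            () -> Status.IN_PROGRESS,
            () -> Status.SUCCESSFULLY_COMPLETED);
        System.out.println("Polls when activation is complete: " + completedAtActivation);
        System.out.println("Polls when activation is in progress: " + completedAfterOnePoll);
    }
}
```

In this model, `runOperation` returns 0 when the activation already reports a completed status, which mirrors the documented behavior that the pollOperation is skipped in that case.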
Parameters:
Returns:
<T,U>create
public static PollerFlux
Creates PollerFlux.
This method uses a PollingStrategy<T,U> to poll the status of a long-running operation after the activation operation is invoked. See PollingStrategy<T,U> for more details of known polling strategies and how to create a custom strategy.
Parameters:
Returns:
<T,U>error
public static PollerFlux
Creates a PollerFlux instance that returns an error on subscription.
Parameters:
Returns:
getPollInterval
public Duration getPollInterval()
Returns the current polling duration for this PollerFlux<T,U> instance.
Returns:
getSyncPoller
public SyncPoller
Gets a synchronous blocking poller.
Returns:
setPollInterval
public PollerFlux
Sets the poll interval for this poller. The new interval will be used for all subsequent polling operations including the subscriptions that are already in progress.
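One way to picture why a changed interval reaches subscriptions that are already in progress is a polling loop that re-reads a shared interval before every poll. The sketch below is stdlib-only and makes no claim about how azure-core implements this: `SharedIntervalSketch` and `simulate` are hypothetical names, and the loop records the delay it would wait instead of actually sleeping.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical, stdlib-only model of a poll loop that shares one mutable interval.
 * None of these names are azure-core types.
 */
final class SharedIntervalSketch {
    /**
     * Simulates {@code polls} polling rounds. The interval is changed from
     * {@code initial} to {@code updated} right after round {@code changeAfterPoll}.
     * Returns the delay that was read before each round.
     */
    static List<Duration> simulate(int polls, int changeAfterPoll,
                                   Duration initial, Duration updated) {
        // Shared holder: the setter and the running loop see the same value.
        AtomicReference<Duration> interval = new AtomicReference<>(initial);
        List<Duration> delaysUsed = new ArrayList<>();
        for (int round = 1; round <= polls; round++) {
            // Re-read the interval before every poll rather than caching it.
            delaysUsed.add(interval.get());
            if (round == changeAfterPoll) {
                // Stands in for a caller invoking setPollInterval mid-operation.
                interval.set(updated);
            }
        }
        return delaysUsed;
    }

    public static void main(String[] args) {
        List<Duration> delays =
            simulate(4, 2, Duration.ofMillis(100), Duration.ofMillis(500));
        System.out.println(delays);
    }
}
```

In this model, the rounds after the change pick up the new interval without restarting the loop, which is the behavior the description attributes to in-progress subscriptions.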
Parameters:
Returns:
subscribe
public void subscribe(CoreSubscriber<? super AsyncPollResponse<T,U>> actual)
Overrides:
PollerFlux<T,U>.subscribe(CoreSubscriber<? super AsyncPollResponse<T,U>> actual)
Parameters: