API de streaming Orleans
Les applications interagissent avec les flux via des API très similaires aux extensions réactives (Rx) dans .NET bien connues. La principale différence est que les extensions de flux Orleans sont asynchrones pour rendre le traitement plus efficace dans la structure de calcul distribuée et évolutive d’Orleans.
Flux asynchrone
Une application commence par utiliser un fournisseur de flux pour obtenir un handle vers un flux. Vous pouvez en apprendre plus sur les fournisseurs de flux ici, mais pour l’instant, vous pouvez les considérer comme une fabrique de flux permettant aux implémenteurs de personnaliser le comportement et la sémantique des flux :
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId = StreamId.Create("MyStreamNamespace", Guid);
IAsyncStream<T> stream = streamProvider.GetStream<T>(streamId);
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream = streamProvider.GetStream<T>(Guid, "MyStreamNamespace");
Une application peut obtenir une référence au fournisseur de flux en appelant la méthode Grain.GetStreamProvider à l’intérieur d’un grain ou en appelant la méthode GrainClient.GetStreamProvider sur le client.
Orleans.Streams.IAsyncStream<T> est un handle logique et fortement typé dans un flux virtuel. Cela ressemble, dans l’esprit, à la référence des grains Orleans. Les appels à GetStreamProvider
et GetStream
sont purement locaux. Les arguments pour GetStream
sont un GUID et une chaîne supplémentaire que nous appelons un espace de noms de flux (qui peut être null). Ensemble, le GUID et la chaîne d’espace de noms composent l’identité de flux (similaire, dans l’esprit, aux arguments pour IGrainFactory.GetGrain). La combinaison du GUID et de la chaîne d’espace de noms offre une flexibilité supplémentaire pour déterminer les identités de flux. Tout comme le grain 7 peut exister dans le type de grain PlayerGrain
et un autre grain 7 peut exister dans le type de grain ChatRoomGrain
, le flux 123 peut exister avec l’espace de noms de flux PlayerEventsStream
et un autre flux 123 peut exister dans l’espace de noms de flux ChatRoomMessagesStream
.
Production et consommation
IAsyncStream<T> implémente les interfaces IAsyncObserver<T> et IAsyncObservable<T>. De cette façon, une application peut utiliser le flux pour produire de nouveaux événements dans le flux en utilisant Orleans.Streams.IAsyncObserver<T>
ou pour s’abonner à des événements et les consommer à partir d’un flux en utilisant Orleans.Streams.IAsyncObservable<T>
.
public interface IAsyncObserver<in T>
{
Task OnNextAsync(T item, StreamSequenceToken token = null);
Task OnCompletedAsync();
Task OnErrorAsync(Exception ex);
}
public interface IAsyncObservable<T>
{
Task<StreamSubscriptionHandle<T>> SubscribeAsync(IAsyncObserver<T> observer);
}
Pour produire des événements dans le flux, une application appelle simplement
await stream.OnNextAsync<T>(event)
Pour s’abonner à un flux, une application appelle
StreamSubscriptionHandle<T> subscriptionHandle = await stream.SubscribeAsync(IAsyncObserver)
L’argument pour SubscribeAsync peut être un objet qui implémente l’interface IAsyncObserver<T> ou une combinaison de fonctions lambda pour traiter les événements entrants. D’autres options pour SubscribeAsync
sont disponibles via la classe AsyncObservableExtensions. SubscribeAsync
retourne StreamSubscriptionHandle<T>, qui est un handle opaque qui peut être utilisé pour se désabonner du flux (similaire, dans l’esprit, à une version asynchrone de IDisposable).
await subscriptionHandle.UnsubscribeAsync()
Il est important de noter que l’abonnement est destiné à un grain, et non pas à l’activation. Une fois que le code de grain est abonné au flux, cet abonnement dépasse la durée de vie de l’activation et reste valide indéfiniment, jusqu’à ce que le code de grain (éventuellement dans le cadre d’une autre activation) se désabonne explicitement. Il s’agit du cœur de l’abstraction d’un flux virtuel : non seulement tous les flux existent toujours, logiquement, mais un abonnement de flux est également durable et vit au-delà de l’activation physique particulière qui a créé l’abonnement.
Multiplicité
Un flux Orleans peut avoir plusieurs producteurs et plusieurs consommateurs. Un message publié par un producteur sera remis à tous les consommateurs qui étaient abonnés au flux avant la publication du message.
En outre, le consommateur peut s’abonner au même flux plusieurs fois. Chaque fois qu’il s’abonne, il récupère un StreamSubscriptionHandle<T> unique. Si un grain (ou un client) est abonné X fois au même flux, il reçoit le même événement X fois, une fois pour chaque abonnement. Le consommateur peut également se désabonner d’un abonnement individuel. Il peut rechercher tous ses abonnements actuels en appelant :
IList<StreamSubscriptionHandle<T>> allMyHandles =
await IAsyncStream<T>.GetAllSubscriptionHandles();
Récupération après échec
Si le producteur d’un flux meurt (ou si son grain est désactivé), il n’a rien à faire. La prochaine fois que ce grain voudra produire d’autres événements, il pourra obtenir à nouveau le handle de flux et produire de nouveaux événements de la même façon.
La logique des consommateurs est un peu plus impliquée. Comme nous l’avons dit précédemment, une fois qu’un grain de consommateur est abonné à un flux, cet abonnement est valide jusqu’à ce que le grain se désabonne explicitement. Si le consommateur du flux meurt (ou que son grain est désactivé) et qu’un nouvel événement est généré sur le flux, le grain de consommateur est automatiquement réactivé (tout comme un grain Orleans standard est automatiquement activé quand un message lui est envoyé). La seule chose que le code de grain doit faire à présent est de fournir un IAsyncObserver<T> pour traiter les données. Le consommateur doit rattacher la logique de traitement dans le cadre de la méthode OnActivateAsync(). Pour ce faire, il peut appeler :
StreamSubscriptionHandle<int> newHandle =
await subscriptionHandle.ResumeAsync(IAsyncObserver);
Le consommateur utilise le handle précédent obtenu lorsqu’il s’est abonné pour la première fois à « reprendre le traitement ». Notez que ResumeAsync met simplement à jour un abonnement existant avec la nouvelle instance de la logique IAsyncObserver
et ne change pas le fait que ce consommateur est déjà abonné à ce flux.
Comment le consommateur obtient-il un ancien subscriptionHandle
? Il existe 2 options. Le consommateur a peut-être rendu persistant le handle qui lui a été donné à partir de l’opération SubscribeAsync
d’origine et peut l’utiliser maintenant. Sinon, si le consommateur n’a pas le handle, il peut demander à IAsyncStream<T>
tous ses handles d’abonnement actifs, en appelant :
IList<StreamSubscriptionHandle<T>> allMyHandles =
await IAsyncStream<T>.GetAllSubscriptionHandles();
Le consommateur peut désormais tous les reprendre ou se désabonner de certains s’il le souhaite.
Conseil
Si le grain de consommateur implémente directement l’interface IAsyncObserver<T> (public class MyGrain<T> : Grain, IAsyncObserver<T>
), en théorie, il ne doit pas être tenu de rattacher IAsyncObserver
et n’a donc pas besoin d’appeler ResumeAsync
. Le runtime de streaming doit être en mesure de déterminer automatiquement que le grain implémente déjà IAsyncObserver
et appellera simplement ces méthodes IAsyncObserver
. Toutefois, le runtime de streaming ne prend actuellement pas cela en charge et le code de grain doit toujours appeler explicitement ResumeAsync
, même si le grain implémente directement IAsyncObserver
.
Abonnements explicites et implicites
Par défaut, un consommateur de flux doit s’abonner explicitement au flux. Cet abonnement est généralement déclenché par un message externe que le grain (ou le client) reçoit qui lui demande de s’abonner. Par exemple, dans un service de conversation, quand un utilisateur rejoint une salle de conversation, son grain reçoit un message JoinChatGroup
avec le nom de la conversation, ce qui entraîne l’abonnement du grain d’utilisateur à ce flux de conversation.
En outre, les flux Orleans prennent également en charge les abonnements implicites. Dans ce modèle, le grain ne s’abonne pas explicitement au flux. Ce grain est abonné automatiquement, implicitement, simplement en fonction de son identité de grain et d’un ImplicitStreamSubscriptionAttribute. La valeur principale des abonnements implicites permet à l’activité de flux de déclencher automatiquement l’activation du grain (ce qui déclenche l’abonnement). Par exemple, en utilisant des flux SMS, si un grain souhaite produire un flux et un autre grain traiter ce flux, le producteur doit connaître l’identité du grain de consommateur et effectuer un appel de grain pour lui demander de s’abonner au flux. Après cela seulement, il pourra commencer à envoyer des événements. Au lieu de cela, en utilisant les abonnements implicites, le producteur peut simplement commencer à produire des événements dans un flux pour que le grain de consommateur soit automatiquement activé et s’abonne au flux. Dans ce cas, le producteur ne se soucie pas du tout de celui qui lit les événements.
L’implémentation de grain MyGrainType
peut déclarer un attribut [ImplicitStreamSubscription("MyStreamNamespace")]
. Cela indique au runtime de streaming que quand un événement est généré sur un flux dont l’identité correspond au GUID XXX et à l’espace de noms "MyStreamNamespace"
, il doit être remis au grain dont l’identité est XXX de type MyGrainType
. Autrement dit, le runtime mappe le flux <XXX, MyStreamNamespace>
au grain de consommateur <XXX, MyGrainType>
.
En raison de la présence de ImplicitStreamSubscription
, le runtime de streaming abonne automatiquement ce grain à un flux et remet à ce dernier les événements de flux. Toutefois, le code de grain doit toujours indiquer au runtime comment les événements doivent être traités. Essentiellement, il doit attacher IAsyncObserver
. Par conséquent, quand le grain est activé, le code de grain figurant dans OnActivateAsync
doit appeler :
IStreamProvider streamProvider =
base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId =
StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
IAsyncStream<T> stream =
streamProvider.GetStream<T>(streamId);
StreamSubscriptionHandle<T> subscription =
await stream.SubscribeAsync(IAsyncObserver<T>);
IStreamProvider streamProvider =
base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream =
streamProvider.GetStream<T>(this.GetPrimaryKey(), "MyStreamNamespace");
StreamSubscriptionHandle<T> subscription =
await stream.SubscribeAsync(IAsyncObserver<T>);
Écriture de la logique d’abonnement
Voici les instructions sur la façon d’écrire la logique d’abonnement pour différents cas : abonnements explicites ou implicites, flux rembobinables ou non rembobinables. La principale différence entre les abonnements explicites et implicites est que pour ces derniers, le grain a toujours exactement un abonnement implicite pour chaque espace de noms de flux. Il est impossible de créer plusieurs abonnements (aucune multiplicité d’abonnement), il est impossible de se désabonner et la logique de grain doit toujours uniquement attacher la logique de traitement. Cela signifie également que, pour les abonnements implicites, il n’est jamais nécessaire de reprendre un abonnement. En revanche, pour les abonnements explicites, il est nécessaire de reprendre l’abonnement. Dans le cas contraire, si le grain s’abonne à nouveau, le grain sera abonné plusieurs fois.
Abonnements implicites :
Pour les abonnements implicites, le grain doit toujours s’abonner pour attacher la logique de traitement. Cela peut être fait dans le grain consommateur en implémentant les interfaces IStreamSubscriptionObserver
et IAsyncObserver<T>
, permettant ainsi au grain de s’activer indépendamment de l’abonnement. Pour s’abonner au flux, le grain crée un handle et appelle await handle.ResumeAsync(this)
dans sa méthode OnSubscribed(...)
.
Pour traiter les messages, la méthode IAsyncObserver<T>.OnNextAsync(...)
est implémentée pour recevoir les données du flux et un jeton de séquence. Alternativement, la méthode ResumeAsync
peut prendre un ensemble de délégués représentant les méthodes de l’interface IAsyncObserver<T>
, onNextAsync
, onErrorAsync
, et onCompletedAsync
.
public Task OnNextAsync(string item, StreamSequenceToken? token = null)
{
_logger.LogInformation($"Received an item from the stream: {item}");
}
public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
{
var handle = handleFactory.Create<string>();
await handle.ResumeAsync(this);
}
public override async Task OnActivateAsync()
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var stream =
streamProvider.GetStream<string>(
this.GetPrimaryKey(), "MyStreamNamespace");
await stream.SubscribeAsync(OnNextAsync);
}
Abonnements explicites :
Pour les abonnements explicites, un grain doit appeler SubscribeAsync
pour s’abonner au flux. Cela crée un abonnement et attache la logique de traitement. L’abonnement explicite existe jusqu’à ce que le grain se désabonne. Par conséquent, si un grain est désactivé et réactivé, le grain est toujours abonné explicitement, mais aucune logique de traitement n’est attachée. Dans ce cas, le grain doit rattacher la logique de traitement. Pour ce faire, dans son OnActivateAsync
, le grain doit d’abord déterminer quels abonnements il possède, en appelant IAsyncStream<T>.GetAllSubscriptionHandles(). Le grain doit exécuter ResumeAsync
sur chaque handle qui doivent poursuivre le traitement, ou UnsubscribeAsync sur tous les handles dont il n’a plus besoin. Le grain peut aussi éventuellement spécifier StreamSequenceToken
en tant qu’argument pour les appels ResumeAsync
, ce qui amènera cet abonnement explicite à commencer à consommer à partir de ce jeton.
public async override Task OnActivateAsync(CancellationToken cancellationToken)
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var streamId = StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
var stream = streamProvider.GetStream<string>(streamId);
var subscriptionHandles = await stream.GetAllSubscriptionHandles();
foreach (var handle in subscriptionHandles)
{
await handle.ResumeAsync(this);
}
}
public async override Task OnActivateAsync()
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var stream =
streamProvider.GetStream<string>(this.GetPrimaryKey(), "MyStreamNamespace");
var subscriptionHandles = await stream.GetAllSubscriptionHandles();
if (!subscriptionHandles.IsNullOrEmpty())
{
subscriptionHandles.ForEach(
async x => await x.ResumeAsync(OnNextAsync));
}
}
Ordre des flux et jetons de séquence
L’ordre de livraison des événements entre un producteur individuel et un consommateur individuel dépend du fournisseur de flux.
Avec SMS, le producteur contrôle explicitement l’ordre des événements vus par le consommateur en contrôlant la façon de les publier. Par défaut (si l’option SimpleMessageStreamProviderOptions.FireAndForgetDelivery du fournisseur SMS est définie sur false), si le producteur attend chaque appel OnNextAsync
, les événements arrivent dans l’ordre FIFO. Dans SMS, il incombe au producteur de décider comment gérer les échecs de remise qui seront indiqués par un objet Task
endommagé, retourné par l’appel OnNextAsync
.
Les flux de file d’attente Azure ne garantissent pas l’ordre FIFO, car les files d’attente Azure sous-jacentes ne garantissent pas l’ordre en cas d’échec. (Ils garantissent l’ordre FIFO dans les exécutions sans échec.) Quand un producteur produit l’événement dans la file d’attente Azure, si l’opération de file d’attente échoue, il incombe au producteur de tenter d’utiliser une autre file d’attente et de traiter ultérieurement les éventuels doublons de messages. Côté remise, le runtime de streaming Orleans retire l’événement de la file d’attente et tente de le remettre aux consommateurs pour traitement. Le runtime de streaming Orleans supprime l’événement de la file d’attente uniquement quand le traitement a réussi. Si la remise ou le traitement échoue, l’événement n’est pas supprimé de la file d’attente et il réapparaît automatiquement dans la file d’attente ultérieurement. Le runtime de streaming tente de le remettre à nouveau, ce qui peut potentiellement rompre l’ordre FIFO. Le comportement ci-dessus correspond à la sémantique normale des files d’attente Azure.
Ordre défini par l’application : Pour résoudre les problèmes d’ordre ci-dessus, une application peut éventuellement spécifier son ordre. Cela est possible via un objet StreamSequenceToken, qui est un objet IComparable opaque, utilisable pour ordonner les événements. Un producteur peut transmettre un StreamSequenceToken
facultatif à l’appel OnNext
. Cet objet StreamSequenceToken
sera transmis au consommateur et sera remis avec l’événement. De cette façon, une application peut raisonner et reconstruire son ordre indépendamment du runtime de streaming.
Flux rembobinables
Certains flux autorisent uniquement une application à s’abonner à eux à partir du dernier point dans le temps, tandis que d’autres flux autorisent les « retours dans le temps ». Cette dernière fonctionnalité dépend de la technologie de mise en file d’attente sous-jacente et du fournisseur de flux concerné. Par exemple, les files d’attente Azure autorisent uniquement la consommation des événements les plus récents en file d’attente, tandis qu’EventHub autorise la relecture d’événements à partir d’un point dans le temps arbitraire (jusqu’à un certain délai d’expiration). Les flux qui prennent en charge les retours dans le temps sont appelés flux rembobinables.
Le consommateur d’un flux rembobinable peut transmettre un StreamSequenceToken
à l’appel SubscribeAsync
. Le runtime remettra les événements à celui-ci à partir de ce StreamSequenceToken
. Un jeton null signifie que le consommateur souhaite recevoir les événements à partir du plus récent.
La possibilité de rembobiner un flux est très utile dans les scénarios de récupération. Par exemple, considérez un grain qui s’abonne à un flux et vérifie régulièrement son état avec le jeton de séquence le plus récent. Lors de la récupération à partir d’un échec, le grain peut se réabonner au même flux à partir du jeton de séquence vérifié le plus récent, récupérant ainsi sans perdre aucun des événements générés depuis le dernier point de contrôle.
Le fournisseur Event Hubs est rembobinable. Vous trouverez son code sur GitHub : Orleans/Azure/Orleans.Streaming.EventHubs. Les fournisseurs SMS et File d’attente Azurene sont pas rembobinables.
Traitement sans état avec scale-out automatique
Par défaut, le streaming Orleans est destiné à prendre en charge un grand nombre de flux relativement petits, tous traités individuellement par un ou plusieurs grains avec état. Collectivement, le traitement de tous ces flux est partitionné entre un grand nombre de grains normaux (avec état). Le code d’application contrôle ce partitionnement en affectant des ID de flux et des ID de grain et en effectuant des abonnements explicites. L’objectif est un traitement avec état partitionné.
Toutefois, il existe également un scénario intéressant de traitement sans état avec scale-out automatique. Dans ce scénario, une application a un petit nombre de flux (voire même un seul grand flux) et l’objectif est un traitement sans état. Un flux global d’événements en est un exemple, où le traitement implique le décodage de chaque événement et son transfert potentiel vers d’autres flux en vue de son traitement avec état. Le traitement de flux sans état avec scale-out peut être pris en charge dans Orleans via les grains StatelessWorkerAttribute.
État actuel du traitement sans état avec scale-out automatique : Ce processus n’est pas encore implémenté. Une tentative d’abonnement à un flux à partir d’un grain StatelessWorker
entraîne un comportement non défini. Nous envisageons de prendre en charge cette option.
Grains et clients Orleans
Orleans envoie du travail en streaming uniformément entre les grains et les clients Orleans. Autrement dit, les mêmes API peuvent être utilisées à l’intérieur d’un grain et dans un client Orleans pour produire et consommer des événements. Cela simplifie considérablement la logique d’application, ce qui rend redondantes les API spéciales côté client, telles que les observateurs de grain.
Streaming Pub-Sub entièrement managé et fiable
Pour suivre des abonnements de flux, Orleans utilise un composant d’exécution appelé Streaming Pub-Sub qui sert de point de rendez-vous pour les consommateurs de flux et les producteurs de flux. Pub-Sub effectue le suivi de tous les abonnements de flux, les rend persistants et met en correspondance les consommateurs de flux et les producteurs de flux.
Les applications peuvent choisir où et comment stocker les données Pub-Sub. Le composant Pub-Sub lui-même est implémenté en tant que grains (appelés PubSubRendezvousGrain
), qui utilisent la persistance déclarative d’Orleans. PubSubRendezvousGrain
utilise le fournisseur de stockage nommé PubSubStore
. Comme avec n’importe quel grain, vous pouvez désigner une implémentation pour un fournisseur de stockage. Pour le streaming Pub-Sub, vous pouvez modifier l’implémentation de PubSubStore
au moment de la construction du silo à l’aide du générateur d’hôtes de silo :
L’exemple suivant configure Pub-Sub pour stocker son état dans des tables Azure.
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
options => options.ConnectionString = "<Secret>");
De cette façon, les données Pub-Sub sont stockées durablement dans une table Azure. Pour le développement initial, vous pouvez également utiliser un stockage en mémoire. Outre Pub-Sub, le runtime de streaming Orleans remet les événements des producteurs aux consommateurs, gère toutes les ressources d’exécution allouées aux flux activement utilisés, et effectue de manière transparente le garbage collection des ressources d’exécution à partir des flux inutilisés.
Configuration
Pour utiliser les flux, vous devez activer des fournisseurs de flux via les générateurs de clients de cluster ou d’hôtes de silo. Vous trouverez ici un complément d’information sur les fournisseurs de flux. Exemple de configuration d’un fournisseur de flux :
hostBuilder.AddMemoryStreams("StreamProvider")
.AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
optionsBuilder => optionsBuilder.Configure(
options => options.ConfigureTableServiceClient("<Secret>")))
.AddAzureTableGrainStorage("PubSubStore",
options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
.AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
optionsBuilder => optionsBuilder.Configure(
options => options.ConnectionString = "<Secret>"))
.AddAzureTableGrainStorage("PubSubStore",
options => options.ConnectionString = "<Secret>");