Démarrage rapide du streaming dans Orleans
Ce guide vous montre un moyen rapide de configurer et d’utiliser des flux Orleans. Pour en apprendre plus sur les détails des fonctionnalités de streaming, lisez les autres parties de cette documentation.
Configurations requises
Dans ce guide, vous allez utiliser un flux basé sur la mémoire qui utilise la messagerie de grain pour envoyer des données de flux aux abonnés. Vous allez utiliser le fournisseur de stockage en mémoire pour stocker des listes d’abonnements. L’utilisation de mécanismes basés sur la mémoire pour la diffusion en continu et le stockage est destinée uniquement au développement et aux tests locaux et n’est pas destinée aux environnements de production.
Sur le silo, où silo
est un ISiloBuilder, appelez AddMemoryStreams :
silo.AddMemoryStreams("StreamProvider")
.AddMemoryGrainStorage("PubSubStore");
Sur le client du cluster, où client
est un IClientBuilder, appelez AddMemoryStreams.
client.AddMemoryStreams("StreamProvider");
Dans ce guide, nous allons utiliser un flux simple basé sur les messages qui utilise la messagerie de grain pour envoyer des données de flux aux abonnés. Nous allons utiliser le fournisseur de stockage en mémoire pour stocker les listes d’abonnements et il n’est donc pas judicieux de l’utiliser pour les véritables applications de production.
Sur le silo, où hostBuilder
est un ISiloHostBuilder
, appelez AddSimpleMessageStreamProvider :
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
.AddMemoryGrainStorage("PubSubStore");
Sur le client du cluster, où clientBuilder
est un IClientBuilder
, appelez AddSimpleMessageStreamProvider.
clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");
Notes
Par défaut, les messages transmis via le flux de messages simple sont considérés comme immuables et peuvent être transmis par référence à d’autres grains. Pour désactiver ce comportement, vous devez configurer le fournisseur SMS pour désactiver SimpleMessageStreamProviderOptions.OptimizeForImmutableData
siloBuilder
.AddSimpleMessageStreamProvider(
"SMSProvider",
options => options.OptimizeForImmutableData = false);
Vous pouvez créer des flux, envoyer des données en les utilisant comme producteurs et recevoir des données en tant qu’abonnés.
Produire des événements
Il est relativement facile de produire des événements pour les flux. Vous devez d’abord accéder au fournisseur de flux que vous avez défini dans la configuration précédemment ("StreamProvider"
), puis choisir un flux et lui envoyer (push) les données.
// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);
Il est relativement facile de produire des événements pour les flux. Vous devez d’abord accéder au fournisseur de flux que vous avez défini dans la configuration précédemment ("SMSProvider"
), puis choisir un flux et lui envoyer (push) les données.
// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");
Comme vous pouvez le voir, notre flux a un GUID et un espace de noms. Cela facilite l’identification des flux uniques. Par exemple, l’espace de noms d’une salle de conversation peut être « Salles » et le GUID peut être le GUID du grain RoomGrain propriétaire.
Ici, nous utilisons le GUID d’une salle de conversation connue. À l’aide de la méthode OnNextAsync
du flux, nous pouvons lui envoyer (push) des données. Faisons-le à l’intérieur d’un minuteur, à l’aide de nombres aléatoires. Vous pouvez également utiliser n’importe quel autre type de données pour le flux.
RegisterTimer(_ =>
{
return stream.OnNextAsync(Random.Shared.Next());
},
null,
TimeSpan.FromMilliseconds(1_000),
TimeSpan.FromMilliseconds(1_000));
S’abonner à des données de diffusion en continu et les recevoir
Pour recevoir des données, vous pouvez utiliser des abonnements implicites et explicites, qui sont décrits plus en détail dans Abonnements explicites et implicites. Cet exemple utilise des abonnements implicites, qui sont plus faciles. Lorsqu’un type de grain souhaite s’abonner implicitement à un flux, il utilise l’attribut [ImplicitStreamSubscription(espace de noms)].
Pour votre cas, définissez un ReceiverGrain
comme suit :
[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver
Chaque fois que des données sont envoyées (push) aux flux de l’espace de noms RANDOMDATA
, tels qu’ils figurent dans le minuteur, un grain de type ReceiverGrain
avec le même Guid
que le flux reçoit le message. Même si aucune activation du grain n’existe actuellement, le runtime en crée automatiquement une nouvelle et lui envoie le message.
Pour que cela fonction, nous devons effectuer le processus d’abonnement en définissant notre méthode OnNextAsync
pour la réception des données. Pour ce faire, notre ReceiverGrain
doit appeler ce qui suit dans son OnActivateAsync
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();
// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);
// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
async (data, token) =>
{
Console.WriteLine(data);
await Task.CompletedTask;
});
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();
// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");
// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
async (data, token) =>
{
Console.WriteLine(data);
await Task.CompletedTask;
});
Vous êtes prêt à commencer ! Maintenant, il faut simplement que quelque chose déclenche la création du grain de producteur, puis le minuteur sera inscrit et commencera à envoyer des ints aléatoires à toutes les parties intéressées.
Encore une fois, ce guide ignore beaucoup de détails et n’est valable que pour montrer une vue d’ensemble. Lisez les autres parties de ce manuel et d’autres ressources sur RX pour bien comprendre ce qui est disponible et de quelle manière.
La programmation réactive peut être une approche très puissante pour résoudre de nombreux problèmes. Vous pouvez par exemple utiliser LINQ dans l’abonné pour filtrer les numéros et réaliser de nombreuses opérations intéressantes.