Création et abonnement à des séquences observables simples
Vous n’avez pas besoin d’implémenter manuellement l’interface T> IObservable<pour créer des séquences observables. De même, vous n’avez pas besoin d’implémenter IObserver<T> pour vous abonner à une séquence. En installant les assemblys d’extension réactive, vous pouvez tirer parti du type Observable qui fournit de nombreux opérateurs LINQ statiques pour créer une séquence simple avec zéro, un ou plusieurs éléments. En outre, Rx fournit des méthodes d’extension Subscribe qui acceptent différentes combinaisons de gestionnaires OnNext, OnError et OnCompleted en termes de délégués.
Création et abonnement à une séquence simple
L’exemple suivant utilise l’opérateur Range du type Observable pour créer une collection observable simple de nombres. L’observateur s’abonne à cette collection à l’aide de la méthode Subscribe de la classe Observable et fournit des actions qui sont des délégués qui gèrent OnNext, OnError et OnCompleted.
L’opérateur Range a plusieurs surcharges. Dans notre exemple, il crée une séquence d’entiers qui commence par x et produit ensuite des nombres séquentiels y.
Dès que l’abonnement se produit, les valeurs sont envoyées à l’observateur. Le délégué OnNext imprime ensuite les valeurs.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
namespace SimpleSequence
{
class Program
{
static void Main(string[] args)
{
IObservable<int> source = Observable.Range(1, 10);
IDisposable subscription = source.Subscribe(
x => Console.WriteLine("OnNext: {0}", x),
ex => Console.WriteLine("OnError: {0}", ex.Message),
() => Console.WriteLine("OnCompleted"));
Console.WriteLine("Press ENTER to unsubscribe...");
Console.ReadLine();
subscription.Dispose();
}
}
}
Lorsqu’un observateur s’abonne à une séquence observable, le thread appelant la méthode Subscribe peut être différent du thread dans lequel la séquence s’exécute jusqu’à la fin. Par conséquent, l’appel Subscribe est asynchrone en ce que l’appelant n’est pas bloqué tant que l’observation de la séquence n’est pas terminée. Cela sera abordé plus en détail dans la rubrique Utilisation de planificateurs .
Notez que la méthode Subscribe renvoie un IDisposable, afin que vous puissiez vous désabonner d’une séquence et la supprimer facilement. Lorsque vous appelez la méthode Dispose sur la séquence observable, l’observateur cesse d’écouter l’observable pour les données. Normalement, vous n’avez pas besoin d’appeler explicitement Dispose, sauf si vous devez vous désabonner tôt ou lorsque la séquence observable source a une durée de vie plus longue que celle de l’observateur. Les abonnements dans Rx sont conçus pour les scénarios fire-and-forget sans l’utilisation d’un finaliseur. Lorsque le instance IDisposable est collecté par le récupérateur de mémoire, Rx ne se débarrasse pas automatiquement de l’abonnement. Toutefois, notez que le comportement par défaut des opérateurs Observable consiste à supprimer l’abonnement dès que possible (par exemple, lorsqu’un message OnCompleted ou OnError est publié). Par exemple, le code var x = Observable.Zip(a,b).Subscribe();
s’abonnera à x aux séquences a et b. Si a lève une erreur, x est immédiatement désinscrit de b.
Vous pouvez également ajuster l’exemple de code pour utiliser l’opérateur Create du type Observable , qui crée et retourne un observateur à partir des délégués d’action OnNext, OnError et OnCompleted spécifiés. Vous pouvez ensuite passer cet observateur à la méthode Subscribe du type Observable . L’exemple suivant montre comment procéder.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
namespace SimpleSequence
{
class Program
{
static void Main(string[] args)
{
IObservable<int> source = Observable.Range(1, 10);
IObserver<int> obsvr = Observer.Create<int>(
x => Console.WriteLine("OnNext: {0}", x),
ex => Console.WriteLine("OnError: {0}", ex.Message),
() => Console.WriteLine("OnCompleted"));
IDisposable subscription = source.Subscribe(obsvr);
Console.WriteLine("Press ENTER to unsubscribe...");
Console.ReadLine();
subscription.Dispose();
}
}
}
En plus de créer une séquence observable à partir de zéro, vous pouvez convertir des énumérateurs, des événements .NET et des modèles asynchrones existants en séquences observables. Les autres rubriques de cette section vous montrent comment procéder.
Notez que cette rubrique vous montre uniquement quelques opérateurs qui peuvent créer une séquence observable à partir de zéro. Pour en savoir plus sur les autres opérateurs LINQ, consultez Interrogation de séquences observables à l’aide d’opérateurs LINQ.
Utilisation d’un minuteur
L’exemple suivant utilise l’opérateur Timer pour créer une séquence. La séquence envoie (push) la première valeur après 5 secondes, puis les valeurs suivantes sont envoyées toutes les 1 secondes. À des fins d’illustration, nous chaînons l’opérateur Timestamp à la requête afin que chaque valeur envoyée soit ajoutée au moment de sa publication. Ce faisant, lorsque nous nous abonneons à cette séquence source, nous pouvons recevoir sa valeur et son horodatage.
Console.WriteLine(“Current Time: “ + DateTime.Now);
var source = Observable.Timer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(1))
.Timestamp();
using (source.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
La sortie doit ressembler à ceci :
Current Time: 5/31/2011 5:35:08 PM
Press any key to unsubscribe
0: 5/31/2011 5:35:13 PM -07:00
1: 5/31/2011 5:35:14 PM -07:00
2: 5/31/2011 5:35:15 PM -07:00
À l’aide de l’opérateur Timestamp, nous avons vérifié que le premier élément est effectivement sorti 5 secondes après le démarrage de la séquence, et que chaque élément est publié 1 seconde plus tard.
Conversion d’une collection énumérable en séquence observable
À l’aide de l’opérateur ToObservable, vous pouvez convertir une collection énumérable générique en séquence observable et vous y abonner.
IEnumerable<int> e = new List<int> { 1, 2, 3, 4, 5 };
IObservable<int> source = e.ToObservable();
IDisposable subscription = source.Subscribe(
x => Console.WriteLine("OnNext: {0}", x),
ex => Console.WriteLine("OnError: {0}", ex.Message),
() => Console.WriteLine("OnCompleted"));
Console.ReadKey();
Observables froids et chauds
Les observables à froid commencent à s’exécuter lors de l’abonnement, c’est-à-dire que la séquence observable ne commence à envoyer des valeurs aux observateurs que lorsque Subscribe est appelé. Les valeurs ne sont pas non plus partagées entre les abonnés. Cela est différent des observables à chaud comme les événements de déplacement de souris ou les tickers boursiers qui produisent déjà des valeurs avant même qu’un abonnement soit actif. Lorsqu’un observateur s’abonne à une séquence observable à chaud, il obtient la valeur actuelle dans le flux. La séquence observable à chaud est partagée entre tous les abonnés, et chaque abonné reçoit la valeur suivante de la séquence. Par exemple, même si personne n’a souscrit à un ticker boursier particulier, le ticker continuera à mettre à jour sa valeur en fonction de l’évolution du marché. Lorsqu’un abonné s’intéresse à ce ticker, il obtient automatiquement la dernière graduation.
L’exemple suivant illustre une séquence observable à froid. Dans cet exemple, nous utilisons l’opérateur Interval pour créer une séquence observable simple de nombres pompés à des intervalles spécifiques, dans ce cas, toutes les 1 secondes.
Deux observateurs s’abonnent ensuite à cette séquence et impriment ses valeurs. Vous remarquerez que la séquence est réinitialisée pour chaque abonné, dans lequel le deuxième abonnement redémarre la séquence à partir de la première valeur.
IObservable<int> source = Observable.Interval(TimeSpan.FromSeconds(1));
IDisposable subscription1 = source.Subscribe(
x => Console.WriteLine("Observer 1: OnNext: {0}", x),
ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
() => Console.WriteLine("Observer 1: OnCompleted"));
IDisposable subscription2 = source.Subscribe(
x => Console.WriteLine("Observer 2: OnNext: {0}", x),
ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
() => Console.WriteLine("Observer 2: OnCompleted"));
Console.WriteLine("Press any key to unsubscribe");
Console.ReadLine();
subscription1.Dispose();
subscription2.Dispose();
Dans l’exemple suivant, nous convertissons la séquence source
observable à froid précédente en séquence à chaud à l’aide de l’opérateur Publish, qui retourne une instance que nous nomlons hot
. L’opérateur Publish fournit un mécanisme pour partager des abonnements en diffusant un seul abonnement à plusieurs abonnés.
hot
agit comme un proxy et s’abonne à source
, puis, à mesure qu’il reçoit des valeurs de source
, les envoie à ses propres abonnés. Pour établir un abonnement au stockage source
et commencer à recevoir des valeurs, nous utilisons la méthode IConnectableObservable.Connect(). Étant donné que IConnectableObservable hérite d’IObservable, nous pouvons utiliser S’abonner pour vous abonner à cette séquence à chaud avant même qu’elle ne commence à s’exécuter. Notez que dans l’exemple, la séquence à chaud n’a pas été démarrée lors subscription1
de l’abonnement. Par conséquent, aucune valeur n’est envoyée à l’abonné. Après avoir appelé Connect, les valeurs sont ensuite envoyées à subscription1
. Après un délai de 3 secondes, subscription2
s’abonne à et commence à hot
recevoir les valeurs immédiatement à partir de la position actuelle (3 dans ce cas) jusqu’à la fin. Une sortie classique ressemble à ceci :
Current Time: 6/1/2011 3:38:49 PM
Current Time after 1st subscription: 6/1/2011 3:38:49 PM
Current Time after Connect: 6/1/2011 3:38:52 PM
Observer 1: OnNext: 0
Observer 1: OnNext: 1
Current Time just before 2nd subscription: 6/1/2011 3:38:55 PM
Observer 1: OnNext: 2
Observer 1: OnNext: 3
Observer 2: OnNext: 3
Observer 1: OnNext: 4
Observer 2: OnNext: 4
Console.WriteLine("Current Time: " + DateTime.Now);
var source = Observable.Interval(TimeSpan.FromSeconds(1)); //creates a sequence
IConnectableObservable<long> hot = Observable.Publish<long>(source); // convert the sequence into a hot sequence
IDisposable subscription1 = hot.Subscribe( // no value is pushed to 1st subscription at this point
x => Console.WriteLine("Observer 1: OnNext: {0}", x),
ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
() => Console.WriteLine("Observer 1: OnCompleted"));
Console.WriteLine("Current Time after 1st subscription: " + DateTime.Now);
Thread.Sleep(3000); //idle for 3 seconds
hot.Connect(); // hot is connected to source and starts pushing value to subscribers
Console.WriteLine("Current Time after Connect: " + DateTime.Now);
Thread.Sleep(3000); //idle for 3 seconds
Console.WriteLine("Current Time just before 2nd subscription: " + DateTime.Now);
IDisposable subscription2 = hot.Subscribe( // value will immediately be pushed to 2nd subscription
x => Console.WriteLine("Observer 2: OnNext: {0}", x),
ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
() => Console.WriteLine("Observer 2: OnCompleted"));
Console.ReadKey();