Partager via


Interrogation de séquences observables à l’aide d’opérateurs LINQ

Dans Pontage avec des événements .NET existants, nous avons converti les événements .NET existants en séquences observables pour s’y abonner. Dans cette rubrique, nous allons examiner la nature de première classe des séquences observables en tant qu’objets T> IObservable<, dans lesquels les opérateurs LINQ génériques sont fournis par les assemblys Rx pour manipuler ces objets. La plupart des opérateurs prennent une séquence observable et exécutent une logique sur celle-ci et produisent une autre séquence observable. En outre, comme vous pouvez le voir dans nos exemples de code, vous pouvez même chaîner plusieurs opérateurs sur une séquence source pour ajuster la séquence résultante à vos besoins exacts.

Utilisation de différents opérateurs

Nous avons déjà utilisé les opérateurs Créer et générer dans les rubriques précédentes pour créer et retourner des séquences simples. Nous avons également utilisé l’opérateur FromEventPattern pour convertir des événements .NET existants en séquences observables. Dans cette rubrique, nous allons utiliser d’autres opérateurs LINQ statiques du type Observable afin que vous puissiez filtrer, regrouper et transformer des données. De tels opérateurs prennent la ou les séquences observables comme entrée et produisent une ou plusieurs séquences observables en tant que sortie.

Combinaison de différentes séquences

Dans cette section, nous allons examiner certains des opérateurs qui combinent différentes séquences observables en une seule séquence observable. Notez que les données ne sont pas transformées lorsque nous combinons des séquences.

Dans l’exemple suivant, nous utilisons l’opérateur Concat pour combiner deux séquences en une seule séquence et s’y abonner. À des fins d’illustration, nous allons utiliser l’opérateur Range(x, y) très simple pour créer une séquence d’entiers qui commence par x et produit ensuite des nombres séquentiels y.

var source1 = Observable.Range(1, 3);
var source2 = Observable.Range(1, 3);
source1.Concat(source2)
       .Subscribe(Console.WriteLine);
Console.ReadLine();

Notez que la séquence résultante est 1,2,3,1,2,3. En effet, lorsque vous utilisez l’opérateur Concat, la 2e séquence (source2) ne sera pas active tant que la première séquence (source1) n’aura pas terminé d’envoyer toutes ses valeurs. Ce n’est qu’une fois source1 l’opération terminée, puis source2 commence à envoyer (push) les valeurs à la séquence résultante. L’abonné obtient ensuite toutes les valeurs de la séquence résultante.

Comparez cela avec l’opérateur Merge. Si vous exécutez l’exemple de code suivant, vous obtenez 1,1,2,2,3,3. Cela est dû au fait que les deux séquences sont actives en même temps et que les valeurs sont poussées à mesure qu’elles se produisent dans les sources. La séquence résultante se termine uniquement lorsque la dernière séquence source a terminé d’envoyer des valeurs.

Notez que pour que la fusion fonctionne, toutes les séquences observables sources doivent être du même type IObservable<T>. La séquence résultante sera de type IObservable<T>. Si source1 génère un OnError au milieu de la séquence, la séquence résultante se termine immédiatement.

var source1 = Observable.Range(1, 3);
var source2 = Observable.Range(1, 3);
source1.Merge(source2)
       .Subscribe(Console.WriteLine);
Console.ReadLine();

Une autre comparaison peut être effectuée avec l’opérateur Catch. Dans ce cas, si source1 se termine sans erreur, ne source2 démarre pas. Par conséquent, si vous exécutez l’exemple de code suivant, vous obtenez 1,2,3 uniquement car source2 (qui produit 4,5,6) est ignoré.

var source1 = Observable.Range(1, 3);
var source2 = Observable.Range(4, 3);
source1.Catch(source2)
       .Subscribe(Console.WriteLine);
Console.ReadLine();

Enfin, examinons OnErrorResumeNext. Cet opérateur passe à source2 même si source1 ne peut pas être terminé en raison d’une erreur. Dans l’exemple suivant, même si source1 représente une séquence qui se termine par une exception (à l’aide de l’opérateur Throw), l’abonné reçoit les valeurs (1,2,3) publiées par source2. Par conséquent, si vous vous attendez à ce que l’une des séquences sources génère une erreur, il est plus sûr d’utiliser OnErrorResumeNext pour garantir que l’abonné recevra toujours certaines valeurs.

var source1 = Observable.Throw<int>(new Exception("An error has occurred."));
var source2 = Observable.Range(4, 3);
source1.OnErrorResumeNext(source2)
       .Subscribe(Console.WriteLine);
Console.ReadLine();

Notez que pour que tous ces opérateurs de combinaison fonctionnent, toutes les séquences observables doivent être du même type de T.

Projection

L’opérateur Select peut traduire chaque élément d’une séquence observable dans une autre forme.

Dans l’exemple suivant, nous projetons une séquence d’entiers dans des chaînes de longueur n respectivement.

var seqNum = Observable.Range(1, 5);
var seqString = from n in seqNum
                select new string('*', (int)n);
seqString.Subscribe(str => { Console.WriteLine(str); });
Console.ReadKey();

Dans l’exemple suivant, qui est une extension de l’exemple de conversion d’événements .NET que nous avons vu dans la rubrique Pontage avec des événements .NET existants , nous utilisons l’opérateur Select pour projeter le type de données IEventPattern<MouseEventArgs> dans un type Point . De cette façon, nous transformons une séquence d’événements de déplacement de souris en un type de données qui peut être analysé et manipulé plus loin, comme on peut le voir dans la section « Filtrage » suivante.

var frm = new Form();
IObservable<EventPattern<MouseEventArgs>> move = Observable.FromEventPattern<MouseEventArgs>(frm, "MouseMove");
IObservable<System.Drawing.Point> points = from evt in move
                                          select evt.EventArgs.Location;
points.Subscribe(pos => Console.WriteLine("mouse at " + pos));
Application.Run(frm);

Enfin, examinons l’opérateur SelectMany. L’opérateur SelectMany a de nombreuses surcharges, dont l’une prend un argument de fonction de sélecteur. Cette fonction de sélecteur est appelée sur chaque valeur envoyée par la source observable. Pour chacune de ces valeurs, le sélecteur la projette dans une séquence mini observable. À la fin, l’opérateur SelectMany aplatit toutes ces mini-séquences en une seule séquence résultante, qui est ensuite envoyée à l’abonné.

L’observable retournée par SelectMany publie OnCompleted une fois la séquence source et toutes les mini-séquences observables produites par le sélecteur sont terminées. Il déclenche OnError lorsqu’une erreur s’est produite dans le flux source, lorsqu’une exception a été levée par la fonction de sélecteur ou lorsqu’une erreur s’est produite dans l’une des séquences mini observables.

Dans l’exemple suivant, nous créons d’abord une séquence source qui produit un entier toutes les 5 secondes, puis nous décidons de prendre simplement les 2 premières valeurs produites (à l’aide de l’opérateur Take). Nous utilisons SelectMany ensuite pour projeter chacun de ces entiers à l’aide d’une autre séquence de {100, 101, 102}. Ce faisant, deux séquences mini observables sont produites, {100, 101, 102} et {100, 101, 102}. Ceux-ci sont finalement aplatit en un seul flux d’entiers de {100, 101, 102, 100, 101, 102} et poussés vers l’observateur.

var source1 = Observable.Interval(TimeSpan.FromSeconds(5)).Take(2);
var proj = Observable.Range(100, 3);
var resultSeq = source1.SelectMany(proj);

var sub = resultSeq.Subscribe(x => Console.WriteLine("OnNext : {0}", x.ToString()),
                              ex => Console.WriteLine("Error : {0}", ex.ToString()),
                              () => Console.WriteLine("Completed"));
Console.ReadKey();

Filtrage

Dans l’exemple suivant, nous utilisons l’opérateur Generate pour créer une séquence observable simple de nombres. L’opérateur Generate a plusieurs surcharges. Dans notre exemple, il prend un état initial (0 dans notre exemple), une fonction conditionnelle pour se terminer (moins de 10 fois), un itérateur (+1), un sélecteur de résultats (une fonction carrée de la valeur actuelle). , et impriment uniquement les moins de 15 à l’aide des opérateurs Where et Select.

  
IObservable<int> seq = Observable.Generate(0, i => i < 10, i => i + 1, i => i * i);
IObservable<int> source = from n in seq
                          where n < 5
                          select n;
source.Subscribe(x => {Console.WriteLine(x);});   // output is 0, 1, 4, 9
Console.ReadKey();

L’exemple suivant est une extension de l’exemple de projection que vous avez vu précédemment dans cette rubrique. Dans cet exemple, nous avons utilisé l’opérateur Select pour projeter le type de données IEventPattern<MouseEventArgs> dans un type Point . Dans l’exemple suivant, nous utilisons l’opérateur Where et Select pour sélectionner uniquement les mouvements de souris qui nous intéressent. Dans ce cas, nous filtrons les déplacements de la souris sur ceux du premier bisector (où les coordonnées x et y sont égales).

var frm = new Form(); 
IObservable<EventPattern<MouseEventArgs>> move = Observable.FromEventPattern<MouseEventArgs>(frm, "MouseMove");
IObservable<System.Drawing.Point> points = from evt in move
                                          select evt.EventArgs.Location;
var overfirstbisector = from pos in points
                        where pos.X == pos.Y 
                        select pos;
var movesub = overfirstbisector.Subscribe(pos => Console.WriteLine("mouse at " + pos));
Application.Run(frm);

Opération basée sur le temps

Vous pouvez utiliser les opérateurs de mémoire tampon pour effectuer des opérations basées sur le temps.

La mise en mémoire tampon d’une séquence observable signifie que les valeurs d’une séquence observable sont placées dans une mémoire tampon en fonction d’un intervalle de temps spécifié ou d’un seuil de nombre. Cela est particulièrement utile dans les situations où vous vous attendez à ce qu’une quantité énorme de données soit envoyée hors de la séquence et que l’abonné ne dispose pas de la ressource pour traiter ces valeurs. En mettant en mémoire tampon les résultats en fonction de l’heure ou du nombre, et en retournant une séquence de valeurs uniquement lorsque les critères sont dépassés (ou lorsque la séquence source est terminée), l’abonné peut traiter les appels OnNext à son propre rythme. 

Dans l’exemple suivant, nous créons d’abord une séquence simple d’entiers pour chaque seconde. Nous utilisons ensuite l’opérateur Buffer et spécifions que chaque mémoire tampon contiendra 5 éléments de la séquence. OnNext est appelé lorsque la mémoire tampon est pleine. Nous calculons ensuite la somme de la mémoire tampon à l’aide de l’opérateur Sum. La mémoire tampon est automatiquement vidée et un autre cycle commence. L’impression sera 10, 35, 60… 10=0+1+2+3+4, 35=5+6+7+8+9, et ainsi de suite.

var seq = Observable.Interval(TimeSpan.FromSeconds(1));
var bufSeq = seq.Buffer(5);
bufSeq.Subscribe(values => Console.WriteLine(values.Sum()));
Console.ReadKey();

Nous pouvons également créer une mémoire tampon avec un intervalle de temps spécifié. Dans l’exemple suivant, la mémoire tampon contient les éléments qui se sont accumulés pendant 3 secondes. L’impression sera 3, 12, 21... dans lequel 3=0+1+2, 12=3+4+5, et ainsi de suite.

var seq = Observable.Interval(TimeSpan.FromSeconds(1));
var bufSeq = seq.Buffer(TimeSpan.FromSeconds(3));
bufSeq.Subscribe(value => Console.WriteLine(value.Sum()));  
Console.ReadKey();

Notez que si vous utilisez Buffer ou Window, vous devez vous assurer que la séquence n’est pas vide avant de la filtrer.

Opérateurs LINQ par catégories

La rubrique Opérateurs LINQ by Categories répertorie tous les principaux opérateurs LINQ implémentés par le type Observable par leurs catégories ; plus précisément : création, conversion, combinaison, fonctionnelle, mathématique, temps, exceptions, divers, sélection et primitives.

Voir aussi

Référence

Observable

Concepts

Opérateurs LINQ par catégories