Partilhar via


Consultando sequências observáveis usando operadores LINQ

Em Ponte com eventos .NET existentes, convertemos eventos .NET existentes em sequências observáveis para assiná-los. Neste tópico, examinaremos a natureza de primeira classe de sequências observáveis como objetos T> IObservable<, nos quais operadores LINQ genéricos são fornecidos pelos assemblies Rx para manipular esses objetos. A maioria dos operadores pega uma sequência observável e executa alguma lógica nela e gera outra sequência observável. Além disso, como você pode ver em nossos exemplos de código, você pode até mesmo encadear vários operadores em uma sequência de origem para ajustar a sequência resultante para seu requisito exato.

Usando operadores diferentes

Já usamos os operadores Criar e Gerar em tópicos anteriores para criar e retornar sequências simples. Também usamos o operador FromEventPattern para converter eventos .NET existentes em sequências observáveis. Neste tópico, usaremos outros operadores LINQ estáticos do tipo Observável para que você possa filtrar, agrupar e transformar dados. Esses operadores tomam sequências observáveis como entrada e produzem sequências observáveis como saída.

Combinando sequências diferentes

Nesta seção, examinaremos alguns dos operadores que combinam várias sequências observáveis em uma única sequência observável. Observe que os dados não são transformados quando combinamos sequências.

No exemplo a seguir, usamos o operador Concat para combinar duas sequências em uma única sequência e assinar. Para fins de ilustração, usaremos o operador Range(x, y) muito simples para criar uma sequência de inteiros que começa com x e produz y números sequenciais posteriormente.

var source1 = Observable.Range(1, 3);
var source2 = Observable.Range(1, 3);
source1.Concat(source2)
       .Subscribe(Console.WriteLine);
Console.ReadLine();

Observe que a sequência resultante é 1,2,3,1,2,3. Isso ocorre porque, quando você usa o operador Concat, a 2ª sequência (source2) não estará ativa até que a 1ª sequência (source1) termine de enviar por push todos os seus valores. Ele é somente depois source1 de concluído e, em seguida source2 , começará a enviar valores por push para a sequência resultante. Em seguida, o assinante obterá todos os valores da sequência resultante.

Compare isso com o operador Merge. Se você executar o código de exemplo a seguir, obterá 1,1,2,2,3,3. Isso ocorre porque as duas sequências estão ativas ao mesmo tempo e os valores são enviados por push conforme ocorrem nas fontes. A sequência resultante só é concluída quando a última sequência de origem tiver terminado de enviar valores por push.

Observe que, para que a Mesclagem funcione, todas as sequências observáveis de origem precisam ser do mesmo tipo de T> IObservable<. A sequência resultante será do tipo IObservable<T>. Se source1 produzir um OnError no meio da sequência, a sequência resultante será concluída imediatamente.

var source1 = Observable.Range(1, 3);
var source2 = Observable.Range(1, 3);
source1.Merge(source2)
       .Subscribe(Console.WriteLine);
Console.ReadLine();

Outra comparação pode ser feita com o operador Catch. Nesse caso, se source1 for concluído sem nenhum erro, source2 não será iniciado. Portanto, se você executar o código de exemplo a seguir, obterá 1,2,3 somente uma vez source2 que (o que produz 4,5,6) será ignorado.

var source1 = Observable.Range(1, 3);
var source2 = Observable.Range(4, 3);
source1.Catch(source2)
       .Subscribe(Console.WriteLine);
Console.ReadLine();

Por fim, vamos examinar OnErrorResumeNext. Esse operador passará para source2 mesmo que source1 não possa ser concluído devido a um erro. No exemplo a seguir, embora source1 represente uma sequência que termina com uma exceção (usando o operador Throw), o assinante receberá valores (1,2,3) publicados pelo source2. Portanto, se você espera que qualquer sequência de origem produza qualquer erro, é uma aposta mais segura usar OnErrorResumeNext para garantir que o assinante ainda receberá alguns valores.

var source1 = Observable.Throw<int>(new Exception("An error has occurred."));
var source2 = Observable.Range(4, 3);
source1.OnErrorResumeNext(source2)
       .Subscribe(Console.WriteLine);
Console.ReadLine();

Observe que, para que todos esses operadores de combinação funcionem, todas as sequências observáveis precisam ser do mesmo tipo de T.

Projeção

O operador Select pode converter cada elemento de uma sequência observável em outro formulário.

No exemplo a seguir, projetamos uma sequência de inteiros em cadeias de caracteres de comprimento n, respectivamente.

var seqNum = Observable.Range(1, 5);
var seqString = from n in seqNum
                select new string('*', (int)n);
seqString.Subscribe(str => { Console.WriteLine(str); });
Console.ReadKey();

No exemplo a seguir, que é uma extensão do exemplo de conversão de eventos do .NET que vimos no tópico Ponte com Eventos .NET Existentes, usamos o operador Select para projetar o tipo de dados MouseEventArgs IEventArgs<> em um tipo point. Dessa forma, estamos transformando uma sequência de eventos de movimentação de mouse em um tipo de dados que pode ser analisado e manipulado ainda mais, como pode ser visto na próxima seção "Filtragem".

var frm = new Form();
IObservable<EventPattern<MouseEventArgs>> move = Observable.FromEventPattern<MouseEventArgs>(frm, "MouseMove");
IObservable<System.Drawing.Point> points = from evt in move
                                          select evt.EventArgs.Location;
points.Subscribe(pos => Console.WriteLine("mouse at " + pos));
Application.Run(frm);

Por fim, vamos examinar o operador SelectMany. O operador SelectMany tem muitas sobrecargas, uma das quais usa um argumento de função seletor. Essa função seletora é invocada em cada valor enviado pela origem observável. Para cada um desses valores, o seletor projeta-o em uma mini sequência observável. No final, o operador SelectMany mescla todas essas mini sequências em uma única sequência resultante, que é então enviada por push para o assinante.

O observável retornado de SelectMany publica OnCompleted após a sequência de origem e todas as mini sequências observáveis produzidas pelo seletor foram concluídas. Ele aciona OnError quando ocorreu um erro no fluxo de origem, quando uma exceção foi gerada pela função seletora ou quando ocorreu um erro em qualquer uma das mini sequências observáveis.

No exemplo a seguir, primeiro criamos uma sequência de origem que produz um inteiro a cada 5 segundos e decidimos usar apenas os dois primeiros valores produzidos (usando o operador Take). Em seguida, usamos SelectMany para projetar cada um desses inteiros usando outra sequência de {100, 101, 102}. Ao fazer isso, duas mini sequências observáveis são produzidas e {100, 101, 102}{100, 101, 102}. Estes são finalmente achatados em um único fluxo de inteiros de {100, 101, 102, 100, 101, 102} e enviados para o observador.

var source1 = Observable.Interval(TimeSpan.FromSeconds(5)).Take(2);
var proj = Observable.Range(100, 3);
var resultSeq = source1.SelectMany(proj);

var sub = resultSeq.Subscribe(x => Console.WriteLine("OnNext : {0}", x.ToString()),
                              ex => Console.WriteLine("Error : {0}", ex.ToString()),
                              () => Console.WriteLine("Completed"));
Console.ReadKey();

Filtragem

No exemplo a seguir, usamos o operador Generate para criar uma sequência observável simples de números. O operador Generate tem várias sobrecargas. Em nosso exemplo, é preciso um estado inicial (0 em nosso exemplo), uma função condicional para terminar (menos de 10 vezes), um iterador (+1), um seletor de resultados (uma função quadrada do valor atual). e imprimem apenas aqueles menores que 15 usando os operadores Where e Select.

  
IObservable<int> seq = Observable.Generate(0, i => i < 10, i => i + 1, i => i * i);
IObservable<int> source = from n in seq
                          where n < 5
                          select n;
source.Subscribe(x => {Console.WriteLine(x);});   // output is 0, 1, 4, 9
Console.ReadKey();

O exemplo a seguir é uma extensão do exemplo de projeção que você viu anteriormente neste tópico. Nesse exemplo, usamos o operador Select para projetar o tipo de dados IEventPattern<MouseEventArgs> em um tipo point . No exemplo a seguir, usamos o operador Where e Select para escolher apenas o movimento do mouse que estamos interessados. Nesse caso, filtramos os movimentos do mouse para aqueles no primeiro bisetor (em que as coordenadas x e y são iguais).

var frm = new Form(); 
IObservable<EventPattern<MouseEventArgs>> move = Observable.FromEventPattern<MouseEventArgs>(frm, "MouseMove");
IObservable<System.Drawing.Point> points = from evt in move
                                          select evt.EventArgs.Location;
var overfirstbisector = from pos in points
                        where pos.X == pos.Y 
                        select pos;
var movesub = overfirstbisector.Subscribe(pos => Console.WriteLine("mouse at " + pos));
Application.Run(frm);

Operação baseada em tempo

Você pode usar os operadores buffer para executar operações baseadas em tempo.

Armazenar em buffer uma sequência observável significa que os valores de uma sequência observável são colocados em um buffer com base em um período de tempo especificado ou por um limite de contagem. Isso é especialmente útil em situações em que você espera que uma quantidade tremenda de dados seja enviada pela sequência e o assinante não tenha o recurso para processar esses valores. Ao armazenar em buffer os resultados com base no tempo ou na contagem e retornar apenas uma sequência de valores quando os critérios forem excedidos (ou quando a sequência de origem for concluída), o assinante poderá processar chamadas OnNext em seu próprio ritmo. 

No exemplo a seguir, primeiro criamos uma sequência simples de inteiros para cada segundo. Em seguida, usamos o operador Buffer e especificamos que cada buffer conterá 5 itens da sequência. OnNext é chamado quando o buffer está cheio. Em seguida, contabilizamos a soma do buffer usando o operador Sum. O buffer é liberado automaticamente e outro ciclo começa. A impressão será 10, 35, 60… na qual 10=0+1+2+3+4, 35=5+6+7+8+9 e assim por diante.

var seq = Observable.Interval(TimeSpan.FromSeconds(1));
var bufSeq = seq.Buffer(5);
bufSeq.Subscribe(values => Console.WriteLine(values.Sum()));
Console.ReadKey();

Também podemos criar um buffer com um período de tempo especificado. No exemplo a seguir, o buffer conterá itens acumulados por 3 segundos. A impressão será 3, 12, 21... em que 3=0+1+2, 12=3+4+5 e assim por diante.

var seq = Observable.Interval(TimeSpan.FromSeconds(1));
var bufSeq = seq.Buffer(TimeSpan.FromSeconds(3));
bufSeq.Subscribe(value => Console.WriteLine(value.Sum()));  
Console.ReadKey();

Observe que, se você estiver usando Buffer ou Janela, precisará verificar se a sequência não está vazia antes de filtrar.

Operadores LINQ por categorias

O tópico Operadores LINQ por Categorias lista todos os principais operadores LINQ implementados pelo tipo Observável por suas categorias; especificamente: criação, conversão, combinação, funcional, matemática, tempo, exceções, diversos, seleção e primitivos.

Consulte Também

Referência

Observable

Conceitos

Operadores LINQ por categorias