Criando e assinando sequências observáveis simples
Você não precisa implementar a interface IObservable<T> manualmente para criar sequências observáveis. Da mesma forma, você não precisa implementar o IObserver<T> para assinar uma sequência. Ao instalar os assemblies de Extensão Reativa, você pode aproveitar o tipo Observável , que fornece muitos operadores LINQ estáticos para criar uma sequência simples com zero, um ou mais elementos. Além disso, o Rx fornece métodos de extensão Subscribe que têm várias combinações de manipuladores OnNext, OnError e OnCompleted em termos de delegados.
Criando e assinando uma sequência simples
O exemplo a seguir usa o operador Range do tipo Observável para criar uma coleção simples observável de números. O observador assina esta coleção usando o método Subscribe da classe Observable e fornece ações que são delegados que lidam com OnNext, OnError e OnCompleted.
O operador Range tem várias sobrecargas. Em nosso exemplo, ele cria uma sequência de inteiros que começa com x e produz y números sequenciais posteriormente.
Assim que a assinatura ocorrer, os valores serão enviados ao observador. O delegado OnNext imprime os valores.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
namespace SimpleSequence
{
class Program
{
static void Main(string[] args)
{
IObservable<int> source = Observable.Range(1, 10);
IDisposable subscription = source.Subscribe(
x => Console.WriteLine("OnNext: {0}", x),
ex => Console.WriteLine("OnError: {0}", ex.Message),
() => Console.WriteLine("OnCompleted"));
Console.WriteLine("Press ENTER to unsubscribe...");
Console.ReadLine();
subscription.Dispose();
}
}
}
Quando um observador assina uma sequência observável, o thread que chama o método Subscribe pode ser diferente do thread no qual a sequência é executada até a conclusão. Portanto, a chamada Subscribe é assíncrona, pois o chamador não é bloqueado até que a observação da sequência seja concluída. Isso será abordado em mais detalhes no tópico Usando agendadores .
Observe que o método Subscribe retorna um IDisposable, para que você possa cancelar a assinatura de uma sequência e descartá-la facilmente. Quando você invocar o método Dispose na sequência observável, o observador deixará de ouvir o observável para dados. Normalmente, você não precisa chamar Dispose explicitamente, a menos que precise cancelar a assinatura antecipadamente ou quando a sequência observável de origem tiver um período de vida mais longo do que o observador. As assinaturas no Rx foram projetadas para cenários de fogo e de esquecer sem o uso de um finalizador. Quando a instância IDisposable é coletada pelo coletor de lixo, o Rx não descarta automaticamente a assinatura. No entanto, observe que o comportamento padrão dos operadores Observáveis é descartar a assinatura o mais rápido possível (ou seja, quando uma mensagem OnCompleted ou OnError é publicada). Por exemplo, o código var x = Observable.Zip(a,b).Subscribe();
assinará x para ambas as sequências a e b. Se um gerar um erro, x será imediatamente desinscrito de b.
Você também pode ajustar o exemplo de código para usar o operador Create do tipo Observável , que cria e retorna um observador dos delegados de ação OnNext, OnError e OnCompleted especificados. Em seguida, você pode passar esse observador para o método Subscribe do tipo Observável . O exemplo a seguir mostra como fazer isso.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
namespace SimpleSequence
{
class Program
{
static void Main(string[] args)
{
IObservable<int> source = Observable.Range(1, 10);
IObserver<int> obsvr = Observer.Create<int>(
x => Console.WriteLine("OnNext: {0}", x),
ex => Console.WriteLine("OnError: {0}", ex.Message),
() => Console.WriteLine("OnCompleted"));
IDisposable subscription = source.Subscribe(obsvr);
Console.WriteLine("Press ENTER to unsubscribe...");
Console.ReadLine();
subscription.Dispose();
}
}
}
Além de criar uma sequência observável do zero, você pode converter enumeradores existentes, eventos .NET e padrões assíncronos em sequências observáveis. Os outros tópicos desta seção mostrarão como fazer isso.
Observe que este tópico mostra apenas alguns operadores que podem criar uma sequência observável do zero. Para saber mais sobre outros operadores LINQ, consulte Consultando sequências observáveis usando operadores LINQ.
Usando um temporizador
O exemplo a seguir usa o operador Timer para criar uma sequência. A sequência efetuará push do primeiro valor depois que 5 segundos tiver decorrido e, em seguida, efetuará push dos valores subsequentes a cada 1 segundo. Para fins de ilustração, encadeamos o operador Timestamp à consulta para que cada valor enviado por push seja acrescentado pelo momento em que ele for publicado. Ao fazer isso, quando assinamos essa sequência de origem, podemos receber seu valor e o carimbo de data/hora.
Console.WriteLine(“Current Time: “ + DateTime.Now);
var source = Observable.Timer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(1))
.Timestamp();
using (source.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
A saída será semelhante a:
Current Time: 5/31/2011 5:35:08 PM
Press any key to unsubscribe
0: 5/31/2011 5:35:13 PM -07:00
1: 5/31/2011 5:35:14 PM -07:00
2: 5/31/2011 5:35:15 PM -07:00
Usando o operador Timestamp, verificamos se o primeiro item é realmente enviado por push 5 segundos após o início da sequência e cada item é publicado 1 segundo depois.
Convertendo uma coleção enumerável em uma sequência observável
Usando o operador ToObservable, você pode converter uma coleção enumerável genérica em uma sequência observável e assiná-la.
IEnumerable<int> e = new List<int> { 1, 2, 3, 4, 5 };
IObservable<int> source = e.ToObservable();
IDisposable subscription = source.Subscribe(
x => Console.WriteLine("OnNext: {0}", x),
ex => Console.WriteLine("OnError: {0}", ex.Message),
() => Console.WriteLine("OnCompleted"));
Console.ReadKey();
Observáveis Frios vs. Frequentes
Observáveis frios começam a ser executados na assinatura, ou seja, a sequência observável só começa a enviar valores por push para os observadores quando Subscribe é chamado. Os valores também não são compartilhados entre os assinantes. Isso é diferente dos observáveis frequentes, como eventos de movimentação do mouse ou tickers de ações que já estão produzindo valores antes mesmo de uma assinatura estar ativa. Quando um observador assina uma sequência observável quente, ele obtém o valor atual no fluxo. A sequência observável quente é compartilhada entre todos os assinantes e cada assinante é enviado por push para o próximo valor na sequência. Por exemplo, mesmo que ninguém tenha assinado um determinado ticker de ações, o ticker continuará a atualizar seu valor com base na movimentação do mercado. Quando um assinante registra interesse nesse ticker, ele receberá automaticamente o tique mais recente.
O exemplo a seguir demonstra uma sequência observável a frio. Neste exemplo, usamos o operador Interval para criar uma sequência observável simples de números bombeados em intervalos específicos, nesse caso, a cada 1 segundo.
Em seguida, dois observadores assinam essa sequência e imprimem seus valores. Você observará que a sequência é redefinida para cada assinante, no qual a segunda assinatura reiniciará a sequência do primeiro valor.
IObservable<int> source = Observable.Interval(TimeSpan.FromSeconds(1));
IDisposable subscription1 = source.Subscribe(
x => Console.WriteLine("Observer 1: OnNext: {0}", x),
ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
() => Console.WriteLine("Observer 1: OnCompleted"));
IDisposable subscription2 = source.Subscribe(
x => Console.WriteLine("Observer 2: OnNext: {0}", x),
ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
() => Console.WriteLine("Observer 2: OnCompleted"));
Console.WriteLine("Press any key to unsubscribe");
Console.ReadLine();
subscription1.Dispose();
subscription2.Dispose();
No exemplo a seguir, convertemos a sequência source
observável a frio anterior em uma quente usando o operador Publish, que retorna uma instância IConnectableObservable que nomeamos hot
como . O operador Publish fornece um mecanismo para compartilhar assinaturas transmitindo uma única assinatura para vários assinantes.
hot
atua como um proxy e assina o source
, então, como ele recebe valores de source
, os envia por push para seus próprios assinantes. Para estabelecer uma assinatura para o backup source
e começar a receber valores, usamos o método IConnectableObservable.Connect(). Como IConnectableObservable herda IObservable, podemos usar Subscribe para assinar essa sequência quente antes mesmo de começar a ser executada. Observe que, no exemplo, a sequência quente não foi iniciada quando subscription1
a assina. Portanto, nenhum valor é enviado por push para o assinante. Depois de chamar Conectar, os valores são então enviados por push para subscription1
. Após um atraso de 3 segundoshot
, subscription2
assina e começa a receber os valores imediatamente da posição atual (3 nesse caso) até o final. A saída tem esta aparência:
Current Time: 6/1/2011 3:38:49 PM
Current Time after 1st subscription: 6/1/2011 3:38:49 PM
Current Time after Connect: 6/1/2011 3:38:52 PM
Observer 1: OnNext: 0
Observer 1: OnNext: 1
Current Time just before 2nd subscription: 6/1/2011 3:38:55 PM
Observer 1: OnNext: 2
Observer 1: OnNext: 3
Observer 2: OnNext: 3
Observer 1: OnNext: 4
Observer 2: OnNext: 4
Console.WriteLine("Current Time: " + DateTime.Now);
var source = Observable.Interval(TimeSpan.FromSeconds(1)); //creates a sequence
IConnectableObservable<long> hot = Observable.Publish<long>(source); // convert the sequence into a hot sequence
IDisposable subscription1 = hot.Subscribe( // no value is pushed to 1st subscription at this point
x => Console.WriteLine("Observer 1: OnNext: {0}", x),
ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
() => Console.WriteLine("Observer 1: OnCompleted"));
Console.WriteLine("Current Time after 1st subscription: " + DateTime.Now);
Thread.Sleep(3000); //idle for 3 seconds
hot.Connect(); // hot is connected to source and starts pushing value to subscribers
Console.WriteLine("Current Time after Connect: " + DateTime.Now);
Thread.Sleep(3000); //idle for 3 seconds
Console.WriteLine("Current Time just before 2nd subscription: " + DateTime.Now);
IDisposable subscription2 = hot.Subscribe( // value will immediately be pushed to 2nd subscription
x => Console.WriteLine("Observer 2: OnNext: {0}", x),
ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
() => Console.WriteLine("Observer 2: OnCompleted"));
Console.ReadKey();