Tworzenie i subskrybowanie prostych sekwencji obserwowanych
Nie trzeba ręcznie implementować interfejsu IObservable<T> w celu utworzenia widocznych sekwencji. Podobnie nie trzeba implementować serwera IObserver<T> , aby zasubskrybować sekwencję. Po zainstalowaniu zestawów reaktywnego rozszerzenia można skorzystać z typu obserwowanego , który zapewnia wiele statycznych operatorów LINQ, aby utworzyć prostą sekwencję z zerem, co najmniej jednym elementem. Ponadto język Rx udostępnia metody rozszerzenia Subskrybuj, które przyjmują różne kombinacje programów obsługi OnNext, OnError i OnCompleted pod względem delegatów.
Tworzenie i subskrybowanie prostej sekwencji
W poniższym przykładzie użyto operatora Zakres typu Obserwowalnego , aby utworzyć prostą możliwą do obserwowania kolekcję liczb. Obserwator subskrybuje tę kolekcję przy użyciu metody Subskrybuj klasy Obserwowalnej i udostępnia akcje, które obsługują polecenia OnNext, OnError i OnCompleted.
Operator Range ma kilka przeciążeń. W naszym przykładzie tworzy sekwencję liczb całkowitych, która zaczyna się od x i tworzy liczby sekwencyjne y później.
Gdy tylko nastąpi subskrypcja, wartości są wysyłane do obserwatora. Delegat OnNext wyświetla następnie wartości.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
namespace SimpleSequence
{
class Program
{
static void Main(string[] args)
{
IObservable<int> source = Observable.Range(1, 10);
IDisposable subscription = source.Subscribe(
x => Console.WriteLine("OnNext: {0}", x),
ex => Console.WriteLine("OnError: {0}", ex.Message),
() => Console.WriteLine("OnCompleted"));
Console.WriteLine("Press ENTER to unsubscribe...");
Console.ReadLine();
subscription.Dispose();
}
}
}
Gdy obserwator subskrybuje obserwowaną sekwencję, wątek wywołujący metodę Subskrybuj może różnić się od wątku, w którym sekwencja jest uruchamiana do ukończenia. W związku z tym wywołanie Subskrybuj jest asynchroniczne w tym, że obiekt wywołujący nie zostanie zablokowany, dopóki nie zostanie ukończona obserwacja sekwencji. Zostanie to omówione bardziej szczegółowo w temacie Using Schedulers (Korzystanie z harmonogramów ).
Zwróć uwagę, że metoda Subskrybuj zwraca wartość IDisposable, aby można było łatwo anulować subskrypcję sekwencji i usunąć ją łatwo. Po wywołaniu metody Dispose w sekwencji możliwej do obserwacji obserwator przestanie nasłuchiwać danych, które można obserwować. Zwykle nie trzeba jawnie wywoływać metody Dispose, chyba że trzeba anulować subskrypcję wcześniej lub gdy sekwencja z możliwością obserwacji źródła ma dłuższy okres życia niż obserwator. Subskrypcje w języku Rx są przeznaczone do scenariuszy fire-and-forget bez użycia finalizatora. Gdy wystąpienie IDisposable jest zbierane przez moduł odśmiecający pamięci, usługa Rx nie usuwa automatycznie subskrypcji. Należy jednak pamiętać, że domyślne zachowanie operatorów obserwowanych polega na jak najszybszym usunięciu subskrypcji (tj. po opublikowaniu komunikatów OnCompleted lub OnError). Na przykład kod var x = Observable.Zip(a,b).Subscribe();
zasubskrybuje x do obu sekwencji a i b. W przypadku wystąpienia błędu x zostanie natychmiast anulowana subskrypcja b.
Możesz również dostosować przykład kodu, aby użyć operatora Create typu Obserwowalnego , który tworzy i zwraca obserwatora z określonych delegatów akcji OnNext, OnError i OnCompleted. Następnie można przekazać tego obserwatora do metody Subskrybuj typ obserwowany . Poniższy przykład pokazuje, jak to zrobić.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
namespace SimpleSequence
{
class Program
{
static void Main(string[] args)
{
IObservable<int> source = Observable.Range(1, 10);
IObserver<int> obsvr = Observer.Create<int>(
x => Console.WriteLine("OnNext: {0}", x),
ex => Console.WriteLine("OnError: {0}", ex.Message),
() => Console.WriteLine("OnCompleted"));
IDisposable subscription = source.Subscribe(obsvr);
Console.WriteLine("Press ENTER to unsubscribe...");
Console.ReadLine();
subscription.Dispose();
}
}
}
Oprócz tworzenia widocznej sekwencji od podstaw można konwertować istniejące moduły wyliczania, zdarzenia platformy .NET i wzorce asynchroniczne na widoczne sekwencje. W innych tematach w tej sekcji pokazano, jak to zrobić.
Zwróć uwagę, że w tym temacie przedstawiono tylko kilka operatorów, które mogą od podstaw utworzyć możliwą do obserwacji sekwencję. Aby dowiedzieć się więcej na temat innych operatorów LINQ, zobacz Wykonywanie zapytań względem sekwencji obserwowanych przy użyciu operatorów LINQ.
Używanie czasomierza
Poniższy przykład używa operatora czasomierza do utworzenia sekwencji. Sekwencja wypchnie pierwszą wartość po upływie 5 sekund, a następnie wypchnie kolejne wartości co 1 sekundę. Na potrzeby ilustracji należy połączyć operator znacznika czasu z zapytaniem, aby każda wartość wypchnięta została dołączona przez czas jego opublikowania. Dzięki temu, gdy subskrybujemy tę sekwencję źródłową, możemy otrzymać zarówno jej wartość, jak i znacznik czasu.
Console.WriteLine(“Current Time: “ + DateTime.Now);
var source = Observable.Timer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(1))
.Timestamp();
using (source.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
Dane wyjściowe będą podobne do następujących:
Current Time: 5/31/2011 5:35:08 PM
Press any key to unsubscribe
0: 5/31/2011 5:35:13 PM -07:00
1: 5/31/2011 5:35:14 PM -07:00
2: 5/31/2011 5:35:15 PM -07:00
Korzystając z operatora Znacznik czasu, sprawdziliśmy, że pierwszy element jest rzeczywiście wypychany 5 sekund po rozpoczęciu sekwencji, a każdy element zostanie opublikowany 1 sekundę później.
Konwertowanie kolekcji wyliczalnej na sekwencję obserwowaną
Za pomocą operatora ToObservable można przekonwertować ogólną kolekcję wyliczalną na sekwencji obserwowanej i subskrybować ją.
IEnumerable<int> e = new List<int> { 1, 2, 3, 4, 5 };
IObservable<int> source = e.ToObservable();
IDisposable subscription = source.Subscribe(
x => Console.WriteLine("OnNext: {0}", x),
ex => Console.WriteLine("OnError: {0}", ex.Message),
() => Console.WriteLine("OnCompleted"));
Console.ReadKey();
Zimne a gorące obserwowalne
Zimne obserwowalne elementy zaczynają działać po subskrypcji, tj. obserwowana sekwencja rozpoczyna wypychanie wartości do obserwatorów po wywołaniu subskrypcji. Wartości nie są również udostępniane wśród subskrybentów. Różni się to od gorących obserwacji, takich jak zdarzenia przenoszenia myszy lub znaczniki zapasów, które już generują wartości, nawet zanim subskrypcja jest aktywna. Gdy obserwator subskrybuje gorącą sekwencję obserwowaną, otrzyma bieżącą wartość w strumieniu. Gorąca sekwencja obserwowana jest współużytkowana wśród wszystkich subskrybentów, a każdy subskrybent jest wypychany do następnej wartości w sekwencji. Na przykład, nawet jeśli nikt nie zasubskrybował określonego znacznika akcji, znacznik będzie nadal aktualizować jego wartość na podstawie ruchu rynkowego. Gdy subskrybent zarejestruje zainteresowanie tym znacznikiem, automatycznie otrzyma najnowszy znacznik.
W poniższym przykładzie pokazano zimną, zauważalną sekwencję. W tym przykładzie użyjemy operatora Interwał, aby utworzyć prostą możliwą do obserwowania sekwencję liczb pompowanych w określonych odstępach czasu, w tym przypadku co 1 sekundę.
Dwóch obserwatorów następnie subskrybuje tę sekwencję i wyświetla jej wartości. Zauważysz, że sekwencja jest resetowany dla każdego subskrybenta, w którym druga subskrypcja ponownie uruchomi sekwencję z pierwszej wartości.
IObservable<int> source = Observable.Interval(TimeSpan.FromSeconds(1));
IDisposable subscription1 = source.Subscribe(
x => Console.WriteLine("Observer 1: OnNext: {0}", x),
ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
() => Console.WriteLine("Observer 1: OnCompleted"));
IDisposable subscription2 = source.Subscribe(
x => Console.WriteLine("Observer 2: OnNext: {0}", x),
ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
() => Console.WriteLine("Observer 2: OnCompleted"));
Console.WriteLine("Press any key to unsubscribe");
Console.ReadLine();
subscription1.Dispose();
subscription2.Dispose();
W poniższym przykładzie przekonwertujemy poprzednią sekwencję source
z możliwością obserwacji zimnej na gorącą przy użyciu operatora Publikuj, który zwraca wystąpienie IConnectableObservable o nazwie hot
. Operator Publikowania udostępnia mechanizm udostępniania subskrypcji przez emisję pojedynczej subskrypcji do wielu subskrybentów.
hot
działa jako serwer proxy i subskrybuje element source
, a następnie odbiera wartości z source
, wypycha je do własnych subskrybentów. Aby ustanowić subskrypcję do kopii source
zapasowej i rozpocząć odbieranie wartości, użyjemy metody IConnectableObservable.Connect(). Ponieważ funkcja IConnectableObservable dziedziczy wartość IObservable, możemy użyć opcji Subskrybuj, aby zasubskrybować tę gorącą sekwencję jeszcze przed rozpoczęciem działania. Zwróć uwagę, że w przykładzie sekwencja gorąca nie została uruchomiona po subscription1
zasubskrybowaniu. W związku z tym żadna wartość nie jest wypychana do subskrybenta. Po wywołaniu połączenia wartości są następnie wypychane do subscription1
elementu . Po opóźnieniu 3 sekundy subscription2
subskrybuje hot
i rozpoczyna odbieranie wartości natychmiast z bieżącej pozycji (3 w tym przypadku) do końca. Dane wyjściowe wyglądają następująco:
Current Time: 6/1/2011 3:38:49 PM
Current Time after 1st subscription: 6/1/2011 3:38:49 PM
Current Time after Connect: 6/1/2011 3:38:52 PM
Observer 1: OnNext: 0
Observer 1: OnNext: 1
Current Time just before 2nd subscription: 6/1/2011 3:38:55 PM
Observer 1: OnNext: 2
Observer 1: OnNext: 3
Observer 2: OnNext: 3
Observer 1: OnNext: 4
Observer 2: OnNext: 4
Console.WriteLine("Current Time: " + DateTime.Now);
var source = Observable.Interval(TimeSpan.FromSeconds(1)); //creates a sequence
IConnectableObservable<long> hot = Observable.Publish<long>(source); // convert the sequence into a hot sequence
IDisposable subscription1 = hot.Subscribe( // no value is pushed to 1st subscription at this point
x => Console.WriteLine("Observer 1: OnNext: {0}", x),
ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
() => Console.WriteLine("Observer 1: OnCompleted"));
Console.WriteLine("Current Time after 1st subscription: " + DateTime.Now);
Thread.Sleep(3000); //idle for 3 seconds
hot.Connect(); // hot is connected to source and starts pushing value to subscribers
Console.WriteLine("Current Time after Connect: " + DateTime.Now);
Thread.Sleep(3000); //idle for 3 seconds
Console.WriteLine("Current Time just before 2nd subscription: " + DateTime.Now);
IDisposable subscription2 = hot.Subscribe( // value will immediately be pushed to 2nd subscription
x => Console.WriteLine("Observer 2: OnNext: {0}", x),
ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
() => Console.WriteLine("Observer 2: OnCompleted"));
Console.ReadKey();