共用方式為


建立和訂閱簡單的可觀察序列

您不需要手動實作 IObservable < T > 介面,即可建立可觀察的序列。 同樣地,您不需要實作 IObserver < T > 來訂閱序列。 藉由安裝回應式延伸模組元件,您可以利用 Observable 類型來提供許多靜態 LINQ 運算子,讓您建立具有零個或多個元素的簡單序列。 此外,Rx 提供訂閱擴充方法,其採用各種 OnNext、OnError 和 OnCompleted 處理常式的委派組合。

建立和訂閱簡單的序列

下列範例會使用 Observable 類型的 Range 運算子來建立簡單的可觀察數位集合。 觀察者會使用 Observable 類別的 Subscribe 方法訂閱此集合,並提供處理 OnNext、OnError 和 OnCompleted 之委派的動作。

Range 運算子有數個多載。 在我們的範例中,它會建立以 x 開頭的整數序列,並在之後產生 y 序號。 

一旦訂用帳戶發生,值就會傳送給觀察者。 OnNext 委派接著會列印出值。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace SimpleSequence
{
    class Program
    {
        static void Main(string[] args)
        {
            IObservable<int> source = Observable.Range(1, 10);
            IDisposable subscription = source.Subscribe(
                x => Console.WriteLine("OnNext: {0}", x),
                ex => Console.WriteLine("OnError: {0}", ex.Message),
                () => Console.WriteLine("OnCompleted"));
            Console.WriteLine("Press ENTER to unsubscribe...");
            Console.ReadLine();
            subscription.Dispose();
        }
    }
}

當觀察者訂閱可觀察的序列時,呼叫 Subscribe 方法的執行緒可能會與循序執行到完成的執行緒不同。 因此,訂閱呼叫是非同步,在序列的觀察完成之前,呼叫端不會遭到封鎖。 如需詳細資訊,請參閱 使用排程器 主題。

請注意,Subscribe 方法會傳回 IDisposable,讓您可以取消訂閱序列並輕鬆地處置它。 當您在可觀察序列上叫用 Dispose 方法時,觀察者會停止接聽可觀察的資料。  通常,除非您需要提早取消訂閱,或來源可觀察序列的生命週期比觀察者長時,否則您不需要明確呼叫 Dispose。 Rx 中的訂用帳戶是針對不使用完成項的引發和忘記案例所設計。 當垃圾收集行程收集 IDisposable 實例時,Rx 不會自動處置訂用帳戶。 不過,請注意,Observable 運算子的預設行為是儘快處置訂用帳戶, (亦即在發行 OnCompleted 或 OnError 訊息時) 。 例如,程式碼 var x = Observable.Zip(a,b).Subscribe(); 會訂閱 x 序列 a 和 b。 如果 擲回錯誤,則會立即取消訂閱 b 的 x。

您也可以調整程式碼範例,以使用 Observable 類型的 Create 運算子,它會從指定的 OnNext、OnError 和 OnCompleted 動作委派建立和傳回觀察者。 然後,您可以將此觀察者傳遞至 Observable 類型的 Subscribe 方法。 下列範例示範如何執行這項操作。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace SimpleSequence
{
    class Program
    {
        static void Main(string[] args)
        {
            IObservable<int> source = Observable.Range(1, 10);
            IObserver<int> obsvr = Observer.Create<int>(
                x => Console.WriteLine("OnNext: {0}", x),
                ex => Console.WriteLine("OnError: {0}", ex.Message),
                () => Console.WriteLine("OnCompleted"));
            IDisposable subscription = source.Subscribe(obsvr);
            Console.WriteLine("Press ENTER to unsubscribe...");
            Console.ReadLine();
            subscription.Dispose();
       }
    }
}

除了從頭開始建立可觀察序列之外,您還可以將現有的列舉值、.NET 事件和非同步模式轉換成可觀察的序列。 本節中的其他主題將示範如何執行這項操作。

請注意,本主題只會顯示一些可從頭開始建立可觀察序列的運算子。 若要深入瞭解其他 LINQ 運算子,請參閱 使用 LINQ 運算子查詢可觀察序列

使用計時器

下列範例會使用 Timer 運算子來建立序列。 序列會在超過 5 秒之後推送第一個值,然後每隔 1 秒推送一次後續的值。 基於圖例目的,我們會將 Timestamp 運算子鏈結至查詢,以便在發行時附加每個推送的值。 如此一來,當我們訂閱此來源序列時,我們可以同時接收其值和時間戳記。

Console.WriteLine(“Current Time: “ + DateTime.Now);

var source = Observable.Timer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(1))
                       .Timestamp();
using (source.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
      {
           Console.WriteLine("Press any key to unsubscribe");
           Console.ReadKey();
      }
Console.WriteLine("Press any key to exit");
Console.ReadKey();

輸出類似如下:

Current Time: 5/31/2011 5:35:08 PM

Press any key to unsubscribe

0: 5/31/2011 5:35:13 PM -07:00

1: 5/31/2011 5:35:14 PM -07:00

2: 5/31/2011 5:35:15 PM -07:00

藉由使用 Timestamp 運算子,我們已確認第一個專案確實在序列啟動後 5 秒外推,且每個專案稍後都會發佈 1 秒。

將可列舉集合轉換成可觀察序列

使用 ToObservable 運算子,您可以將泛型可列舉集合轉換成可觀察的序列,並訂閱它。

IEnumerable<int> e = new List<int> { 1, 2, 3, 4, 5 };

IObservable<int> source = e.ToObservable();
IDisposable subscription = source.Subscribe(
                            x => Console.WriteLine("OnNext: {0}", x),
                            ex => Console.WriteLine("OnError: {0}", ex.Message),
                            () => Console.WriteLine("OnCompleted"));
Console.ReadKey();

冷與經常性觀察

冷觀察項會在訂閱時開始執行,亦即可觀察序列只會在呼叫 Subscribe 時開始將值推送至觀察者。 這些值也不會在訂閱者之間共用。 這與作用中的可觀察物件不同,例如滑鼠移動事件或庫存刻度器,這些事件即使在訂用帳戶作用中之前仍會產生值。 當觀察者訂閱熱觀察序列時,它會取得資料流程中的目前值。 熱觀察序列會在所有訂閱者之間共用,而且每個訂閱者都會推送序列中的下一個值。 例如,即使沒有任何人訂閱特定的股票刻度,刻度器仍會根據市場移動繼續更新其值。 當訂閱者在此刻度器中註冊興趣時,它會自動取得最新的刻度。

下列範例示範冷觀察序列。 在此範例中,我們使用 Interval 運算子來建立以特定間隔擷取的簡單可觀察數位序列,在此案例中為每 1 秒。

兩位觀察者接著訂閱此序列,並列印出其值。 您會注意到每個訂閱者的順序都會重設,第二個訂閱會從第一個值重新開機序列。

IObservable<int> source = Observable.Interval(TimeSpan.FromSeconds(1));   

IDisposable subscription1 = source.Subscribe(
                x => Console.WriteLine("Observer 1: OnNext: {0}", x),
                ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
                () => Console.WriteLine("Observer 1: OnCompleted"));

IDisposable subscription2 = source.Subscribe(
                x => Console.WriteLine("Observer 2: OnNext: {0}", x),
                ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
                () => Console.WriteLine("Observer 2: OnCompleted"));

Console.WriteLine("Press any key to unsubscribe");
Console.ReadLine();
subscription1.Dispose();
subscription2.Dispose();

在下列範例中,我們會使用 Publish 運算子,將先前的冷可觀察序列轉換成經常性存取序列 source ,這會傳回我們命名 hot 的 IConnectableObservable 實例。 發佈操作員會透過將單一訂閱廣播給多個訂閱者,來提供共用訂閱的機制。 hot 做為 Proxy 並訂閱 source ,然後當它接收來自 source 的值時,將它們推送至自己的訂閱者。 若要建立支援 source 和開始接收值的訂用帳戶,我們使用 IConnectableObservable.Connect () 方法。 由於 IConnectableObservable 繼承 IObservable,因此即使開始執行之前,我們仍可使用訂閱訂閱此熱序列。 請注意,在範例中,訂閱時 subscription1 尚未啟動經常性序列。 因此,不會將任何值推送至訂閱者。 呼叫 Connect 之後,值就會推送至 subscription1 。 延遲 3 秒之後, subscription2 請訂閱 hot 並開始立即從目前位置接收值,在此案例中 (3,) 到結束為止。 輸出如下所示:

Current Time: 6/1/2011 3:38:49 PM

Current Time after 1st subscription: 6/1/2011 3:38:49 PM

Current Time after Connect: 6/1/2011 3:38:52 PM

Observer 1: OnNext: 0

Observer 1: OnNext: 1

Current Time just before 2nd subscription: 6/1/2011 3:38:55 PM 

Observer 1: OnNext: 2

Observer 1: OnNext: 3

Observer 2: OnNext: 3

Observer 1: OnNext: 4

Observer 2: OnNext: 4
       
Console.WriteLine("Current Time: " + DateTime.Now);
var source = Observable.Interval(TimeSpan.FromSeconds(1));            //creates a sequence

IConnectableObservable<long> hot = Observable.Publish<long>(source);  // convert the sequence into a hot sequence

IDisposable subscription1 = hot.Subscribe(                        // no value is pushed to 1st subscription at this point
                            x => Console.WriteLine("Observer 1: OnNext: {0}", x),
                            ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
                            () => Console.WriteLine("Observer 1: OnCompleted"));
Console.WriteLine("Current Time after 1st subscription: " + DateTime.Now);
Thread.Sleep(3000);  //idle for 3 seconds
hot.Connect();       // hot is connected to source and starts pushing value to subscribers 
Console.WriteLine("Current Time after Connect: " + DateTime.Now);
Thread.Sleep(3000);  //idle for 3 seconds
Console.WriteLine("Current Time just before 2nd subscription: " + DateTime.Now);

IDisposable subscription2 = hot.Subscribe(     // value will immediately be pushed to 2nd subscription
                            x => Console.WriteLine("Observer 2: OnNext: {0}", x),
                            ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
                            () => Console.WriteLine("Observer 2: OnCompleted"));
Console.ReadKey();