共用方式為


使用 LINQ 運算子查詢可觀察序列

[橋接現有的 .NET 事件] 中,我們已將現有的 .NET 事件轉換成可觀察的序列,以訂閱它們。 在本主題中,我們將探討可觀察序列的第一級本質,做為 IObservable < T > 物件,其中 Rx 元件會提供泛型 LINQ 運算子來管理這些物件。 大部分運算子都會採用可觀察的序列,並在其上執行一些邏輯,並輸出另一個可觀察序列。 此外,如我們的程式碼範例所示,您甚至可以鏈結來源序列上的多個運算子,以根據您的確切需求來調整結果序列。

使用不同的運算子

我們已使用先前主題中的 Create 和 Generate 運算子來建立和傳回簡單的序列。 我們也使用 FromEventPattern 運算子,將現有的 .NET 事件轉換成可觀察的序列。 在本主題中,我們將使用 Observable 類型的其他靜態 LINQ 運算子,以便篩選、分組和轉換資料。 這類運算子會將可觀察序列 () 作為輸入,並產生可觀察序列 () 作為輸出。

結合不同的序列

在本節中,我們將檢查將各種可觀察序列結合成單一可觀察序列的一些運算子。 請注意,當我們結合序列時,不會轉換資料。

在下列範例中,我們使用 Concat 運算子將兩個序列結合成單一序列,並訂閱它。 為了方便說明,我們將使用非常簡單的 Range (x、 y) 運算子來建立以 x 開頭的整數序列,並在之後產生 y 序號。

var source1 = Observable.Range(1, 3);
var source2 = Observable.Range(1, 3);
source1.Concat(source2)
       .Subscribe(Console.WriteLine);
Console.ReadLine();

請注意,結果序列為 1,2,3,1,2,3 。 這是因為當您使用 Concat 運算子時,第 2 個序列 (source2) 直到第 1 個序列 () source1 完成推送其所有值之後才會作用中。 只有在完成之後 source1 ,才會 source2 開始將值推送至結果序列。 訂閱者接著會從結果序列取得所有值。

將此與 Merge 運算子進行比較。 如果您執行下列範例程式碼,將會取得 1,1,2,2,3,3 。 這是因為兩個序列同時作用中,而且值會在來源中發生時推送出來。 結果序列只會在最後一個來源序列完成推送值時完成。

請注意,若要讓 Merge 能夠運作,所有來源可觀察序列都必須是相同類型的 IObservable < T > 。 結果序列的類型為 IObservable < T > 。 如果在 source1 序列中間產生 OnError,則結果序列會立即完成。

var source1 = Observable.Range(1, 3);
var source2 = Observable.Range(1, 3);
source1.Merge(source2)
       .Subscribe(Console.WriteLine);
Console.ReadLine();

另一個比較可以使用 Catch 運算子來完成。 在此情況下,如果 source1 完成但不發生任何錯誤,則 source2 不會啟動。 因此,如果您執行下列範例程式碼,則只會取得 1,2,3 ,因為 source2 會忽略會產生 4,5,6) (。

var source1 = Observable.Range(1, 3);
var source2 = Observable.Range(4, 3);
source1.Catch(source2)
       .Subscribe(Console.WriteLine);
Console.ReadLine();

最後,讓我們看看 OnErrorResumeNext。 即使 source1 因為錯誤而無法完成,此運算子仍會繼續進行 source2 。 在下列範例中,即使 source1 表示使用 Throw 運算子) 以例外狀況 (終止的序列,訂閱者仍會收到 () 發行 source2 的值 1,2,3 。 因此,如果您預期任一來源序列產生任何錯誤,使用 OnErrorResumeNext 保證訂閱者仍然會收到一些值是更安全的選擇。

var source1 = Observable.Throw<int>(new Exception("An error has occurred."));
var source2 = Observable.Range(4, 3);
source1.OnErrorResumeNext(source2)
       .Subscribe(Console.WriteLine);
Console.ReadLine();

請注意,若要讓所有這些組合運算子運作,所有可觀察的序列都必須是相同類型的 T。

投影

Select 運算子可以將可觀察序列的每個元素轉譯成另一種形式。

在下列範例中,我們會將整數序列分別投影為長度 n 的字串。

var seqNum = Observable.Range(1, 5);
var seqString = from n in seqNum
                select new string('*', (int)n);
seqString.Subscribe(str => { Console.WriteLine(str); });
Console.ReadKey();

在下列範例中,這是我們在橋 接現有 .NET 事件 主題中所見 .NET 事件轉換範例的延伸模組,我們使用 Select 運算子將 IEventPattern < MouseEventArgs > 資料類型投影到 Point 類型中。 如此一來,我們會將滑鼠移動事件序列轉換成可進一步剖析和操作的資料類型,如下一個「篩選」一節所示。

var frm = new Form();
IObservable<EventPattern<MouseEventArgs>> move = Observable.FromEventPattern<MouseEventArgs>(frm, "MouseMove");
IObservable<System.Drawing.Point> points = from evt in move
                                          select evt.EventArgs.Location;
points.Subscribe(pos => Console.WriteLine("mouse at " + pos));
Application.Run(frm);

最後,讓我們看看 SelectMany 運算子。 SelectMany 運算子有許多多載,其中一個會採用選取器函式引數。 此選取器函式會在來源可觀察所推送的每個值上叫用。 針對這些值,選取器會將它投影到迷你可觀察序列中。 最後,SelectMany 運算子會將所有這些迷你序列扁平化成單一結果序列,然後推送至訂閱者。

SelectMany 傳回的可觀察專案會在來源序列之後發佈 OnCompleted,以及選取器所產生的所有迷你可觀察序列都已完成。 當來來源資料流發生錯誤、選取器函式擲回例外狀況時,或發生任何迷你可觀察序列中的錯誤時,它會引發 OnError。

在下列範例中,我們會先建立來源序列,每 5 秒產生一個整數,然後決定只使用 Take 運算子) (產生的前 2 個值。 然後,我們會使用 SelectMany 另一個 序列 {100, 101, 102} 來投影每個整數。 如此一來,會產生兩個迷你可觀察序列和 {100, 101, 102}{100, 101, 102} 。 這些最後會壓平合併成 整數的單 {100, 101, 102, 100, 101, 102} 一資料流程,並推送至觀察者。

var source1 = Observable.Interval(TimeSpan.FromSeconds(5)).Take(2);
var proj = Observable.Range(100, 3);
var resultSeq = source1.SelectMany(proj);

var sub = resultSeq.Subscribe(x => Console.WriteLine("OnNext : {0}", x.ToString()),
                              ex => Console.WriteLine("Error : {0}", ex.ToString()),
                              () => Console.WriteLine("Completed"));
Console.ReadKey();

篩選

在下列範例中,我們使用 Generate 運算子來建立簡單的可觀察數位序列。 Generate 運算子有數個多載。 在我們的範例中,它會在範例) (0 的初始狀態、 (少於 10 次) 終止的條件函式、反覆運算器 (+1) 、結果選取器 (目前值的平方函數) 。 和 只會使用 Where 和 Select 運算子列印小於 15 的運算子。

  
IObservable<int> seq = Observable.Generate(0, i => i < 10, i => i + 1, i => i * i);
IObservable<int> source = from n in seq
                          where n < 5
                          select n;
source.Subscribe(x => {Console.WriteLine(x);});   // output is 0, 1, 4, 9
Console.ReadKey();

下列範例是您稍早在本主題中所見投影範例的延伸模組。 在該範例中,我們已使用 Select 運算子,將 IEventPattern < MouseEventArgs > 資料類型投影到 Point 類型中。 在下列範例中,我們使用 Where 和 Select 運算子,只挑選感興趣的滑鼠移動。 在此案例中,我們會篩選滑鼠移至第一個雙函式上方 (,其中 x 和 y 座標相等) 。

var frm = new Form(); 
IObservable<EventPattern<MouseEventArgs>> move = Observable.FromEventPattern<MouseEventArgs>(frm, "MouseMove");
IObservable<System.Drawing.Point> points = from evt in move
                                          select evt.EventArgs.Location;
var overfirstbisector = from pos in points
                        where pos.X == pos.Y 
                        select pos;
var movesub = overfirstbisector.Subscribe(pos => Console.WriteLine("mouse at " + pos));
Application.Run(frm);

以時間為基礎的作業

您可以使用 Buffer 運算子來執行以時間為基礎的作業。

緩衝可觀察序清單示可觀察序列的值會根據指定的時間範圍或計數臨界值放入緩衝區中。 這在預期序列推送出大量資料的情況中特別有用,而且訂閱者沒有處理這些值的資源。 藉由根據時間或計數緩衝結果,而且只有在超過準則 (或來源序列完成) 時,訂閱者才能以自己的步調處理 OnNext 呼叫。 

在下列範例中,我們會先為每秒建立一個簡單的整數序列。 接著,我們會使用 Buffer 運算子,並指定每個緩衝區會保存序列中的 5 個專案。 當緩衝區已滿時,會呼叫 OnNext。 接著,我們會使用 Sum 運算子來計算緩衝區的總和。 緩衝區會自動排清,而另一個週期會開始。 列印輸出將會 10, 35, 60… 是 10=0+1+2+3+4、35=5+6+7+8+9 等等。

var seq = Observable.Interval(TimeSpan.FromSeconds(1));
var bufSeq = seq.Buffer(5);
bufSeq.Subscribe(values => Console.WriteLine(values.Sum()));
Console.ReadKey();

我們也可以使用指定的時間範圍來建立緩衝區。 在下列範例中,緩衝區會保存累積 3 秒的專案。 列印輸出將會是 3、12、21... 其中 3=0+1+2、12=3+4+5 等等。

var seq = Observable.Interval(TimeSpan.FromSeconds(1));
var bufSeq = seq.Buffer(TimeSpan.FromSeconds(3));
bufSeq.Subscribe(value => Console.WriteLine(value.Sum()));  
Console.ReadKey();

請注意,如果您使用 Buffer 或 Window,則必須先確定序列在篩選之前不是空的。

依類別排序的 LINQ 運算子

依類別目錄實作之所有主要 LINQ 運算子的LINQ 運算子主題清單,具體來說:建立、轉換、結合、功能、數學、時間、例外狀況、其他、選取範圍和基本類型。

另請參閱

參考

Observable

概念

依類別排序的 LINQ 運算子