使用現有的非同步來源橋接
除了 .NET 事件之外,其他非同步資料來源也存在於.NET Framework中。 其中一個是非同步方法模式。 在此設計模式中,會提供兩種方法。 其中一個方法 (通常名為 BeginX) ,用來啟動計算,並傳回傳遞至第二個方法的 IAsyncResult 控制碼, (通常名為 EndX) ,然後擷取計算的結果。 完成通常是透過實作 AsyncCallback 委派或輪詢 IAsyncResult.IsCompleted 來發出訊號。 程式碼遵守此模式通常難以閱讀和維護。 在本主題中,我們將示範如何使用 Rx Factory 方法,將這類非同步資料來源轉換成可觀察的序列。
將非同步模式轉換成可觀察的序列
.NET 中的許多非同步方法都會以 BeginX 和 EndX 等簽章撰寫,其中 X 是非同步執行的方法名稱。 BeginX 會採用引數來執行 方法、AsyncCallback,這是採取 IAsyncResult 並傳回任何專案,最後傳回物件狀態的動作。 EndX 會採用從 AsyncCallback 傳入的 IAsyncResult,以擷取非同步呼叫的值。
Observable型別的 FromAsyncPattern 運算子會包裝 Begin 和 End 方法 (,這些方法會當做參數傳遞至運算子) ,並傳回函式,其採用與 Begin 相同的參數並傳回可觀察的。 這個可觀察值代表發佈單一值的序列,這是您剛才指定之呼叫的非同步結果。
在下列範例中,我們將針對使用 IAsyncResult 模式的 Stream 物件,將 BeginRead 和 EndRead 轉換為傳回可觀察序列的函式。 針對 FromAsyncPattern 運算子的泛型參數,我們會指定 BeginRead up 到回呼的引數類型。 由於 EndRead 方法會傳回值,因此我們會將此類型附加為 FromAsyncPattern 的最終泛型參數。 如果您將滑鼠停留在 var
上 read
,您會注意到 FromAsyncPattern 的傳回值是具有下列簽章的函式委派: Func<byte[], int32,int32, IObservable<int32>>
,這表示此函式會採用 3 個參數, (BeginRead) 相同的參數,並傳回 IObservable < Int32 > 。 這個 IObservable 包含一個值,這是 EndRead 傳回的整數,並包含從資料流程讀取的位元組數,介於零 (0) 和您要求的位元組數目之間。 由於我們現在會取得 IObservable,而不是 IAsyncResult,因此可以使用可觀察和訂閱、剖析或撰寫它的所有 LINQ 運算子。
Stream inputStream = Console.OpenStandardInput();
var read = Observable.FromAsyncPattern<byte[], int, int, int>(inputStream.BeginRead, inputStream.EndRead);
byte[] someBytes = new byte[10];
IObservable<int> source = read(someBytes, 0, 10);
IDisposable subscription = source.Subscribe(
x => Console.WriteLine("OnNext: {0}", x),
ex => Console.WriteLine("OnError: {0}", ex.Message),
() => Console.WriteLine("OnCompleted"));
Console.ReadKey();