如何:從資料流程區塊寫入及讀取訊息
本文說明如何使用工作平行程式庫 (TPL) 資料流程程式庫,在資料流程區塊中寫入及讀取訊息。 TPL 資料流程程式庫提供了在資料流程區塊中寫入和讀取訊息的同步和非同步方法。 本文說明如何使用 System.Threading.Tasks.Dataflow.BufferBlock<T> 類別。 BufferBlock<T> 類別會緩衝訊息,並且同時做為訊息來源和訊息目標。
注意
TPL 資料流程程式庫 (System.Threading.Tasks.Dataflow 命名空間) 並未隨 .NET 散發。 若要在 Visual Studio 中安裝 System.Threading.Tasks.Dataflow 命名空間,請開啟您的專案,從 [專案] 功能表中選擇 [管理 NuGet 套件],並於線上搜尋 System.Threading.Tasks.Dataflow
套件。 除此之外也可使用 .Net Core CLI (執行 dotnet add package System.Threading.Tasks.Dataflow
) 加以安裝。
以同步方式寫入及讀取
下列範例會使用 Post 方法寫入 BufferBlock<T> 資料流程區塊,以及使用 Receive 方法從同一個物件讀取。
var bufferBlock = new BufferBlock<int>();
// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
bufferBlock.Post(i);
}
// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(bufferBlock.Receive());
}
// Output:
// 0
// 1
// 2
Dim bufferBlock = New BufferBlock(Of Integer)()
' Post several messages to the block.
For i As Integer = 0 To 2
bufferBlock.Post(i)
Next i
' Receive the messages back from the block.
For i As Integer = 0 To 2
Console.WriteLine(bufferBlock.Receive())
Next i
' Output:
' 0
' 1
' 2
您也可以使用 TryReceive 方法從資料流程區塊中讀取,如下列範例所示。 TryReceive 方法不會封鎖目前執行緒,當您偶爾輪詢資料時很有用。
// Post more messages to the block.
for (int i = 0; i < 3; i++)
{
bufferBlock.Post(i);
}
// Receive the messages back from the block.
while (bufferBlock.TryReceive(out int value))
{
Console.WriteLine(value);
}
// Output:
// 0
// 1
// 2
' Post more messages to the block.
For i As Integer = 0 To 2
bufferBlock.Post(i)
Next i
' Receive the messages back from the block.
Dim value As Integer
Do While bufferBlock.TryReceive(value)
Console.WriteLine(value)
Loop
' Output:
' 0
' 1
' 2
由於 Post 方法會同步執行,因此先前範例中的 BufferBlock<T> 物件會在第二個迴圈讀取資料前接收所有資料。 下列範例會使用 Task.WhenAll(Task[]) 擴充第一個範例,以並行方式讀取和寫入訊息區塊。 因為 WhenAll 會等候正在同時執行的所有非同步作業,所以不會以任何特定順序將值寫入 BufferBlock<T> 物件。
// Write to and read from the message block concurrently.
var post01 = Task.Run(() =>
{
bufferBlock.Post(0);
bufferBlock.Post(1);
});
var receive = Task.Run(() =>
{
for (int i = 0; i < 3; i++)
{
Console.WriteLine(bufferBlock.Receive());
}
});
var post2 = Task.Run(() =>
{
bufferBlock.Post(2);
});
await Task.WhenAll(post01, receive, post2);
// Output:
// 0
// 1
// 2
' Write to and read from the message block concurrently.
Dim post01 = Task.Run(Sub()
bufferBlock.Post(0)
bufferBlock.Post(1)
End Sub)
Dim receive = Task.Run(Sub()
For i As Integer = 0 To 2
Console.WriteLine(bufferBlock.Receive())
Next i
End Sub)
Dim post2 = Task.Run(Sub() bufferBlock.Post(2))
Task.WaitAll(post01, receive, post2)
' Output:
' 0
' 1
' 2
以非同步方式寫入及讀取
下列範例會使用 SendAsync 方法以非同步方式寫入 BufferBlock<T> 物件,以及使用 ReceiveAsync 方法以非同步方式從同一個物件讀取。 這個範例會使用 async 和 await 運算子 (在 Visual Basic 中為 Async 和 Await) 以非同步方式對目標區塊傳送和讀取資料。 當您必須讓資料流程區塊延後訊息時,SendAsync 方法會很有用。 當您想要在有資料可用時處理資料時,ReceiveAsync 方法會很有用。 如需訊息如何在訊息區塊之間傳播的詳細資訊,請參閱資料流程中的<訊息傳遞>一節。
// Post more messages to the block asynchronously.
for (int i = 0; i < 3; i++)
{
await bufferBlock.SendAsync(i);
}
// Asynchronously receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(await bufferBlock.ReceiveAsync());
}
// Output:
// 0
// 1
// 2
' Post more messages to the block asynchronously.
For i As Integer = 0 To 2
await bufferBlock.SendAsync(i)
Next i
' Asynchronously receive the messages back from the block.
For i As Integer = 0 To 2
Console.WriteLine(await bufferBlock.ReceiveAsync())
Next i
' Output:
' 0
' 1
' 2
完整範例
下列範例顯示本文的所有程式碼。
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
// Demonstrates a how to write to and read from a dataflow block.
class DataflowReadWrite
{
// Demonstrates asynchronous dataflow operations.
static async Task AsyncSendReceive(BufferBlock<int> bufferBlock)
{
// Post more messages to the block asynchronously.
for (int i = 0; i < 3; i++)
{
await bufferBlock.SendAsync(i);
}
// Asynchronously receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(await bufferBlock.ReceiveAsync());
}
// Output:
// 0
// 1
// 2
}
static async Task Main()
{
var bufferBlock = new BufferBlock<int>();
// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
bufferBlock.Post(i);
}
// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(bufferBlock.Receive());
}
// Output:
// 0
// 1
// 2
// Post more messages to the block.
for (int i = 0; i < 3; i++)
{
bufferBlock.Post(i);
}
// Receive the messages back from the block.
while (bufferBlock.TryReceive(out int value))
{
Console.WriteLine(value);
}
// Output:
// 0
// 1
// 2
// Write to and read from the message block concurrently.
var post01 = Task.Run(() =>
{
bufferBlock.Post(0);
bufferBlock.Post(1);
});
var receive = Task.Run(() =>
{
for (int i = 0; i < 3; i++)
{
Console.WriteLine(bufferBlock.Receive());
}
});
var post2 = Task.Run(() =>
{
bufferBlock.Post(2);
});
await Task.WhenAll(post01, receive, post2);
// Output:
// 0
// 1
// 2
// Demonstrate asynchronous dataflow operations.
await AsyncSendReceive(bufferBlock);
}
}
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow
' Demonstrates a how to write to and read from a dataflow block.
Friend Class DataflowReadWrite
' Demonstrates asynchronous dataflow operations.
Private Shared async Function AsyncSendReceive(ByVal bufferBlock As BufferBlock(Of Integer)) As Task
' Post more messages to the block asynchronously.
For i As Integer = 0 To 2
await bufferBlock.SendAsync(i)
Next i
' Asynchronously receive the messages back from the block.
For i As Integer = 0 To 2
Console.WriteLine(await bufferBlock.ReceiveAsync())
Next i
' Output:
' 0
' 1
' 2
End Function
Shared Sub Main(ByVal args() As String)
Dim bufferBlock = New BufferBlock(Of Integer)()
' Post several messages to the block.
For i As Integer = 0 To 2
bufferBlock.Post(i)
Next i
' Receive the messages back from the block.
For i As Integer = 0 To 2
Console.WriteLine(bufferBlock.Receive())
Next i
' Output:
' 0
' 1
' 2
' Post more messages to the block.
For i As Integer = 0 To 2
bufferBlock.Post(i)
Next i
' Receive the messages back from the block.
Dim value As Integer
Do While bufferBlock.TryReceive(value)
Console.WriteLine(value)
Loop
' Output:
' 0
' 1
' 2
' Write to and read from the message block concurrently.
Dim post01 = Task.Run(Sub()
bufferBlock.Post(0)
bufferBlock.Post(1)
End Sub)
Dim receive = Task.Run(Sub()
For i As Integer = 0 To 2
Console.WriteLine(bufferBlock.Receive())
Next i
End Sub)
Dim post2 = Task.Run(Sub() bufferBlock.Post(2))
Task.WaitAll(post01, receive, post2)
' Output:
' 0
' 1
' 2
' Demonstrate asynchronous dataflow operations.
AsyncSendReceive(bufferBlock).Wait()
End Sub
End Class
下一步
這個範例將示範如何直接讀取和寫入訊息區塊。 您也可以連接資料流程區塊組成「管線」,這是資料流程區塊的線性序列,或是組成「網路」,這是資料流程區塊的圖形。 在管線或網路中,當資料可供使用時,來源會非同步散佈資料至目標。 如需建立基本資料流程管線的範例,請參閱逐步解說:建立資料流程管線。 如需建立更複雜的資料流程網路的範例,請參閱逐步解說:在 Windows Forms 應用程式中使用資料流程。