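How to: Use JoinBlock to Read Data From Multiple Sources
This document explains how to use the JoinBlock<T1,T2> class to perform an operation when data is available from multiple sources. It also shows how to use non-greedy mode so that multiple join blocks can share a data source more efficiently.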
Note
The TPL Dataflow Library (the System.Threading.Tasks.Dataflow namespace) is not distributed with .NET. To install the System.Threading.Tasks.Dataflow namespace in Visual Studio, open your project, choose Manage NuGet Packages from the Project menu, and search online for the System.Threading.Tasks.Dataflow package. Alternatively, install it with the .NET Core CLI by running dotnet add package System.Threading.Tasks.Dataflow.
Example
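The following example defines three resource types, NetworkResource, FileResource, and MemoryResource, and performs operations when resources become available. The first operation requires a NetworkResource and MemoryResource pair; the second requires a FileResource and MemoryResource pair. To make each operation run only when all of its required resources are available, the example uses the JoinBlock<T1,T2> class. When a JoinBlock<T1,T2> object has received data from all of its sources, it propagates that data to its target, which in this example is an ActionBlock<TInput> object. Both JoinBlock<T1,T2> objects read from a shared pool of MemoryResource objects.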
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to use non-greedy join blocks to distribute
// resources among a dataflow network.
class Program
{
   // Represents a resource. A derived class might represent
   // a limited resource such as a memory, network, or I/O
   // device.
   abstract class Resource
   {
   }

   // Represents a memory resource. For brevity, the details of
   // this class are omitted.
   class MemoryResource : Resource
   {
   }

   // Represents a network resource. For brevity, the details of
   // this class are omitted.
   class NetworkResource : Resource
   {
   }

   // Represents a file resource. For brevity, the details of
   // this class are omitted.
   class FileResource : Resource
   {
   }

   static void Main(string[] args)
   {
      // Create three BufferBlock<T> objects. Each object holds a different
      // type of resource.
      var networkResources = new BufferBlock<NetworkResource>();
      var fileResources = new BufferBlock<FileResource>();
      var memoryResources = new BufferBlock<MemoryResource>();

      // Create two non-greedy JoinBlock<T1, T2> objects.
      // The first join works with network and memory resources;
      // the second join works with file and memory resources.
      var joinNetworkAndMemoryResources =
         new JoinBlock<NetworkResource, MemoryResource>(
            new GroupingDataflowBlockOptions
            {
               Greedy = false
            });

      var joinFileAndMemoryResources =
         new JoinBlock<FileResource, MemoryResource>(
            new GroupingDataflowBlockOptions
            {
               Greedy = false
            });

      // Create two ActionBlock<T> objects.
      // The first block acts on a network resource and a memory resource.
      // The second block acts on a file resource and a memory resource.
      var networkMemoryAction =
         new ActionBlock<Tuple<NetworkResource, MemoryResource>>(
            data =>
            {
               // Perform some action on the resources.

               // Print a message.
               Console.WriteLine("Network worker: using resources...");

               // Simulate a lengthy operation that uses the resources.
               Thread.Sleep(new Random().Next(500, 2000));

               // Print a message.
               Console.WriteLine("Network worker: finished using resources...");

               // Release the resources back to their respective pools.
               networkResources.Post(data.Item1);
               memoryResources.Post(data.Item2);
            });

      var fileMemoryAction =
         new ActionBlock<Tuple<FileResource, MemoryResource>>(
            data =>
            {
               // Perform some action on the resources.

               // Print a message.
               Console.WriteLine("File worker: using resources...");

               // Simulate a lengthy operation that uses the resources.
               Thread.Sleep(new Random().Next(500, 2000));

               // Print a message.
               Console.WriteLine("File worker: finished using resources...");

               // Release the resources back to their respective pools.
               fileResources.Post(data.Item1);
               memoryResources.Post(data.Item2);
            });

      // Link the resource pools to the JoinBlock<T1, T2> objects.
      // Because these join blocks operate in non-greedy mode, they do not
      // take a resource from a pool until resources are available from
      // all pools.
      networkResources.LinkTo(joinNetworkAndMemoryResources.Target1);
      memoryResources.LinkTo(joinNetworkAndMemoryResources.Target2);

      fileResources.LinkTo(joinFileAndMemoryResources.Target1);
      memoryResources.LinkTo(joinFileAndMemoryResources.Target2);

      // Link the JoinBlock<T1, T2> objects to the ActionBlock<T> objects.
      joinNetworkAndMemoryResources.LinkTo(networkMemoryAction);
      joinFileAndMemoryResources.LinkTo(fileMemoryAction);

      // Populate the resource pools. In this example, network and
      // file resources are more abundant than memory resources.
      networkResources.Post(new NetworkResource());
      networkResources.Post(new NetworkResource());
      networkResources.Post(new NetworkResource());

      memoryResources.Post(new MemoryResource());

      fileResources.Post(new FileResource());
      fileResources.Post(new FileResource());
      fileResources.Post(new FileResource());

      // Allow data to flow through the network for several seconds.
      Thread.Sleep(10000);
   }
}

/* Sample output:
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
*/
Imports System.Threading
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to use non-greedy join blocks to distribute
' resources among a dataflow network.
Friend Class Program
    ' Represents a resource. A derived class might represent
    ' a limited resource such as a memory, network, or I/O
    ' device.
    Private MustInherit Class Resource
    End Class

    ' Represents a memory resource. For brevity, the details of
    ' this class are omitted.
    Private Class MemoryResource
        Inherits Resource
    End Class

    ' Represents a network resource. For brevity, the details of
    ' this class are omitted.
    Private Class NetworkResource
        Inherits Resource
    End Class

    ' Represents a file resource. For brevity, the details of
    ' this class are omitted.
    Private Class FileResource
        Inherits Resource
    End Class

    Shared Sub Main(ByVal args() As String)
        ' Create three BufferBlock(Of T) objects. Each object holds a different
        ' type of resource.
        Dim networkResources = New BufferBlock(Of NetworkResource)()
        Dim fileResources = New BufferBlock(Of FileResource)()
        Dim memoryResources = New BufferBlock(Of MemoryResource)()

        ' Create two non-greedy JoinBlock(Of T1, T2) objects.
        ' The first join works with network and memory resources;
        ' the second join works with file and memory resources.
        Dim joinNetworkAndMemoryResources = New JoinBlock(Of NetworkResource, MemoryResource)(
            New GroupingDataflowBlockOptions With {.Greedy = False})

        Dim joinFileAndMemoryResources = New JoinBlock(Of FileResource, MemoryResource)(
            New GroupingDataflowBlockOptions With {.Greedy = False})

        ' Create two ActionBlock(Of T) objects.
        ' The first block acts on a network resource and a memory resource.
        ' The second block acts on a file resource and a memory resource.
        Dim networkMemoryAction = New ActionBlock(Of Tuple(Of NetworkResource, MemoryResource))(
            Sub(data)
                ' Perform some action on the resources.

                ' Print a message.
                Console.WriteLine("Network worker: using resources...")

                ' Simulate a lengthy operation that uses the resources.
                Thread.Sleep(New Random().Next(500, 2000))

                ' Print a message.
                Console.WriteLine("Network worker: finished using resources...")

                ' Release the resources back to their respective pools.
                networkResources.Post(data.Item1)
                memoryResources.Post(data.Item2)
            End Sub)

        Dim fileMemoryAction = New ActionBlock(Of Tuple(Of FileResource, MemoryResource))(
            Sub(data)
                ' Perform some action on the resources.

                ' Print a message.
                Console.WriteLine("File worker: using resources...")

                ' Simulate a lengthy operation that uses the resources.
                Thread.Sleep(New Random().Next(500, 2000))

                ' Print a message.
                Console.WriteLine("File worker: finished using resources...")

                ' Release the resources back to their respective pools.
                fileResources.Post(data.Item1)
                memoryResources.Post(data.Item2)
            End Sub)

        ' Link the resource pools to the JoinBlock(Of T1, T2) objects.
        ' Because these join blocks operate in non-greedy mode, they do not
        ' take a resource from a pool until resources are available from
        ' all pools.
        networkResources.LinkTo(joinNetworkAndMemoryResources.Target1)
        memoryResources.LinkTo(joinNetworkAndMemoryResources.Target2)

        fileResources.LinkTo(joinFileAndMemoryResources.Target1)
        memoryResources.LinkTo(joinFileAndMemoryResources.Target2)

        ' Link the JoinBlock(Of T1, T2) objects to the ActionBlock(Of T) objects.
        joinNetworkAndMemoryResources.LinkTo(networkMemoryAction)
        joinFileAndMemoryResources.LinkTo(fileMemoryAction)

        ' Populate the resource pools. In this example, network and
        ' file resources are more abundant than memory resources.
        networkResources.Post(New NetworkResource())
        networkResources.Post(New NetworkResource())
        networkResources.Post(New NetworkResource())

        memoryResources.Post(New MemoryResource())

        fileResources.Post(New FileResource())
        fileResources.Post(New FileResource())
        fileResources.Post(New FileResource())

        ' Allow data to flow through the network for several seconds.
        Thread.Sleep(10000)
    End Sub
End Class

' Sample output:
'File worker: using resources...
'File worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'File worker: using resources...
'File worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'File worker: using resources...
'File worker: finished using resources...
'File worker: using resources...
'File worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'File worker: using resources...
'
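To use the shared pool of MemoryResource objects efficiently, this example passes a GroupingDataflowBlockOptions object whose Greedy property is set to False, which creates JoinBlock<T1,T2> objects that run in non-greedy mode. A non-greedy join block postpones all incoming messages until one is available from each source. If any of the postponed messages has been accepted by another block in the meantime, the join block restarts the process. Non-greedy mode lets join blocks that share one or more source blocks make forward progress while other blocks wait for data. In this example, when a MemoryResource object is added to the memoryResources pool, the first join block to have its other data source available can make forward progress. If this example used greedy mode, which is the default, one join block might take the MemoryResource object and then wait for its other resource to become available. Meanwhile, the other join block could not make forward progress even if its own other data source were available, because the MemoryResource object would already be held by the first join block.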
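The difference between the two modes can be observed on a single shared source. The following is a minimal sketch and not part of the original sample: the names GreedyDemo and CountLeftInPool, the string payload, and the 500 ms wait are assumptions chosen for illustration. It links one BufferBlock<T> to the second target of a join block whose first target never receives data, and then checks whether the item is still in the buffer.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class; not part of the original sample.
public class GreedyDemo
{
    // Returns the number of items left in the shared pool after a join
    // block that has only one of its two inputs available has been
    // offered the item.
    public static async Task<int> CountLeftInPool(bool greedy)
    {
        var pool = new BufferBlock<string>();
        var join = new JoinBlock<int, string>(
            new GroupingDataflowBlockOptions { Greedy = greedy });

        // Only the second target is fed; the first never receives data,
        // so the join can never produce a pair.
        pool.LinkTo(join.Target2);
        pool.Post("memory");

        // Assumption: 500 ms is ample time for the offer to propagate.
        await Task.Delay(500);
        return pool.Count;
    }

    public static async Task Main()
    {
        // A greedy join is expected to take the item out of the pool;
        // a non-greedy join is expected to leave it for other consumers.
        Console.WriteLine($"Greedy:     {await CountLeftInPool(true)} item(s) left in pool");
        Console.WriteLine($"Non-greedy: {await CountLeftInPool(false)} item(s) left in pool");
    }
}
```

In the greedy case the item sits inside the join block and is unavailable to any other block linked to the same pool, which is the situation the paragraph above describes.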
Robust Programming
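Using non-greedy joins can also help you prevent deadlock in your application. In a software application, deadlock occurs when two or more processes each hold a resource and each waits for another process to release some other resource. Consider an application that defines two JoinBlock<T1,T2> objects, both of which read data from the same two shared source blocks. In greedy mode, if one join block reads from the first source and the other join block reads from the second source, the application might deadlock, because each join block waits for the other to release its resource. In non-greedy mode, each join block reads from its sources only when all of the data is available, which eliminates the risk of deadlock.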
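The scenario above can be sketched as follows. This is an illustrative example under stated assumptions, not code from the original sample: the class name SharedSourcesDemo, the method RunAsync, the payload values, and the 500 ms wait are all hypothetical. Two non-greedy join blocks are linked to the same two sources, and each source holds a single item, so only one complete pair can ever be formed.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class; not part of the original sample.
public class SharedSourcesDemo
{
    // Lets two non-greedy joins compete for one item in each of two
    // shared sources, and returns how many pairs each join produced.
    public static async Task<(int first, int second)> RunAsync()
    {
        var sourceA = new BufferBlock<int>();
        var sourceB = new BufferBlock<string>();

        var options = new GroupingDataflowBlockOptions { Greedy = false };
        var join1 = new JoinBlock<int, string>(options);
        var join2 = new JoinBlock<int, string>(options);

        // Both joins read from both shared sources.
        sourceA.LinkTo(join1.Target1);
        sourceB.LinkTo(join1.Target2);
        sourceA.LinkTo(join2.Target1);
        sourceB.LinkTo(join2.Target2);

        // One item per source: enough for exactly one pair.
        sourceA.Post(1);
        sourceB.Post("one");

        // Assumption: 500 ms is ample time for the messages to propagate.
        await Task.Delay(500);
        return (join1.OutputCount, join2.OutputCount);
    }

    public static async Task Main()
    {
        var (first, second) = await RunAsync();
        Console.WriteLine($"join1 pairs: {first}, join2 pairs: {second}");
    }
}
```

Because a non-greedy join consumes its inputs only once a message is available on every target, the two items should end up together in one join block rather than being split between the two. Which of the two blocks receives the pair is not something this sketch relies on.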