Procédure : Utiliser JoinBlock pour lire des données de plusieurs sources
Ce document explique comment utiliser la classe JoinBlock<T1,T2> pour effectuer une opération lorsque des données sont disponibles à partir de plusieurs sources. Il présente aussi comment utiliser le mode non gourmand pour permettre à plusieurs blocs de jointure de partager plus efficacement une source de données.
Notes
La bibliothèque de flux de données TPL (espace de noms System.Threading.Tasks.Dataflow) n'est pas distribuée avec .NET. Pour installer l’espace de noms System.Threading.Tasks.Dataflow dans Visual Studio, ouvrez votre projet, choisissez Gérer les packages NuGet dans le menu Projet, puis recherchez en ligne le package System.Threading.Tasks.Dataflow
. Vous pouvez également l’installer à l’aide de l’interface CLI .NET Core en exécutant dotnet add package System.Threading.Tasks.Dataflow
.
Exemple
L'exemple suivant définit trois types de ressources, NetworkResource
, FileResource
et MemoryResource
, et effectue des opérations lorsque les ressources sont disponibles. Cet exemple nécessite une paire NetworkResource
et MemoryResource
pour effectuer la première opération et une paire FileResource
et MemoryResource
pour effectuer la seconde opération. Pour permettre aux opérations de se produire lorsque toutes les ressources requises sont disponibles, cet exemple utilise la classe JoinBlock<T1,T2>. Lorsqu'un objet JoinBlock<T1,T2> reçoit les données de toutes les sources, il envoie ces données à la cible, qui dans cet exemple est un objet ActionBlock<TInput>. Les objets JoinBlock<T1,T2> lisent à partir d'un pool partagé d'objets MemoryResource
.
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to use non-greedy join blocks to distribute
// resources among a dataflow network.
class Program
{
// Represents a resource. A derived class might represent
// a limited resource such as a memory, network, or I/O
// device.
abstract class Resource
{
}
// Represents a memory resource. For brevity, the details of
// this class are omitted.
class MemoryResource : Resource
{
}
// Represents a network resource. For brevity, the details of
// this class are omitted.
class NetworkResource : Resource
{
}
// Represents a file resource. For brevity, the details of
// this class are omitted.
class FileResource : Resource
{
}
static void Main(string[] args)
{
// Create three BufferBlock<T> objects. Each object holds a different
// type of resource.
var networkResources = new BufferBlock<NetworkResource>();
var fileResources = new BufferBlock<FileResource>();
var memoryResources = new BufferBlock<MemoryResource>();
// Create two non-greedy JoinBlock<T1, T2> objects.
// The first join works with network and memory resources;
// the second pool works with file and memory resources.
var joinNetworkAndMemoryResources =
new JoinBlock<NetworkResource, MemoryResource>(
new GroupingDataflowBlockOptions
{
Greedy = false
});
var joinFileAndMemoryResources =
new JoinBlock<FileResource, MemoryResource>(
new GroupingDataflowBlockOptions
{
Greedy = false
});
// Create two ActionBlock<T> objects.
// The first block acts on a network resource and a memory resource.
// The second block acts on a file resource and a memory resource.
var networkMemoryAction =
new ActionBlock<Tuple<NetworkResource, MemoryResource>>(
data =>
{
// Perform some action on the resources.
// Print a message.
Console.WriteLine("Network worker: using resources...");
// Simulate a lengthy operation that uses the resources.
Thread.Sleep(new Random().Next(500, 2000));
// Print a message.
Console.WriteLine("Network worker: finished using resources...");
// Release the resources back to their respective pools.
networkResources.Post(data.Item1);
memoryResources.Post(data.Item2);
});
var fileMemoryAction =
new ActionBlock<Tuple<FileResource, MemoryResource>>(
data =>
{
// Perform some action on the resources.
// Print a message.
Console.WriteLine("File worker: using resources...");
// Simulate a lengthy operation that uses the resources.
Thread.Sleep(new Random().Next(500, 2000));
// Print a message.
Console.WriteLine("File worker: finished using resources...");
// Release the resources back to their respective pools.
fileResources.Post(data.Item1);
memoryResources.Post(data.Item2);
});
// Link the resource pools to the JoinBlock<T1, T2> objects.
// Because these join blocks operate in non-greedy mode, they do not
// take the resource from a pool until all resources are available from
// all pools.
networkResources.LinkTo(joinNetworkAndMemoryResources.Target1);
memoryResources.LinkTo(joinNetworkAndMemoryResources.Target2);
fileResources.LinkTo(joinFileAndMemoryResources.Target1);
memoryResources.LinkTo(joinFileAndMemoryResources.Target2);
// Link the JoinBlock<T1, T2> objects to the ActionBlock<T> objects.
joinNetworkAndMemoryResources.LinkTo(networkMemoryAction);
joinFileAndMemoryResources.LinkTo(fileMemoryAction);
// Populate the resource pools. In this example, network and
// file resources are more abundant than memory resources.
networkResources.Post(new NetworkResource());
networkResources.Post(new NetworkResource());
networkResources.Post(new NetworkResource());
memoryResources.Post(new MemoryResource());
fileResources.Post(new FileResource());
fileResources.Post(new FileResource());
fileResources.Post(new FileResource());
// Allow data to flow through the network for several seconds.
Thread.Sleep(10000);
}
}
/* Sample output:
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
*/
Imports System.Threading
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to use non-greedy join blocks to distribute
' resources among a dataflow network.
Friend Class Program
' Represents a resource. A derived class might represent
' a limited resource such as a memory, network, or I/O
' device.
Private MustInherit Class Resource
End Class
' Represents a memory resource. For brevity, the details of
' this class are omitted.
Private Class MemoryResource
Inherits Resource
End Class
' Represents a network resource. For brevity, the details of
' this class are omitted.
Private Class NetworkResource
Inherits Resource
End Class
' Represents a file resource. For brevity, the details of
' this class are omitted.
Private Class FileResource
Inherits Resource
End Class
Shared Sub Main(ByVal args() As String)
' Create three BufferBlock<T> objects. Each object holds a different
' type of resource.
Dim networkResources = New BufferBlock(Of NetworkResource)()
Dim fileResources = New BufferBlock(Of FileResource)()
Dim memoryResources = New BufferBlock(Of MemoryResource)()
' Create two non-greedy JoinBlock<T1, T2> objects.
' The first join works with network and memory resources;
' the second pool works with file and memory resources.
Dim joinNetworkAndMemoryResources = New JoinBlock(Of NetworkResource, MemoryResource)(New GroupingDataflowBlockOptions With {.Greedy = False})
Dim joinFileAndMemoryResources = New JoinBlock(Of FileResource, MemoryResource)(New GroupingDataflowBlockOptions With {.Greedy = False})
' Create two ActionBlock<T> objects.
' The first block acts on a network resource and a memory resource.
' The second block acts on a file resource and a memory resource.
Dim networkMemoryAction = New ActionBlock(Of Tuple(Of NetworkResource, MemoryResource))(Sub(data)
' Perform some action on the resources.
' Print a message.
' Simulate a lengthy operation that uses the resources.
' Print a message.
' Release the resources back to their respective pools.
Console.WriteLine("Network worker: using resources...")
Thread.Sleep(New Random().Next(500, 2000))
Console.WriteLine("Network worker: finished using resources...")
networkResources.Post(data.Item1)
memoryResources.Post(data.Item2)
End Sub)
Dim fileMemoryAction = New ActionBlock(Of Tuple(Of FileResource, MemoryResource))(Sub(data)
' Perform some action on the resources.
' Print a message.
' Simulate a lengthy operation that uses the resources.
' Print a message.
' Release the resources back to their respective pools.
Console.WriteLine("File worker: using resources...")
Thread.Sleep(New Random().Next(500, 2000))
Console.WriteLine("File worker: finished using resources...")
fileResources.Post(data.Item1)
memoryResources.Post(data.Item2)
End Sub)
' Link the resource pools to the JoinBlock<T1, T2> objects.
' Because these join blocks operate in non-greedy mode, they do not
' take the resource from a pool until all resources are available from
' all pools.
networkResources.LinkTo(joinNetworkAndMemoryResources.Target1)
memoryResources.LinkTo(joinNetworkAndMemoryResources.Target2)
fileResources.LinkTo(joinFileAndMemoryResources.Target1)
memoryResources.LinkTo(joinFileAndMemoryResources.Target2)
' Link the JoinBlock<T1, T2> objects to the ActionBlock<T> objects.
joinNetworkAndMemoryResources.LinkTo(networkMemoryAction)
joinFileAndMemoryResources.LinkTo(fileMemoryAction)
' Populate the resource pools. In this example, network and
' file resources are more abundant than memory resources.
networkResources.Post(New NetworkResource())
networkResources.Post(New NetworkResource())
networkResources.Post(New NetworkResource())
memoryResources.Post(New MemoryResource())
fileResources.Post(New FileResource())
fileResources.Post(New FileResource())
fileResources.Post(New FileResource())
' Allow data to flow through the network for several seconds.
Thread.Sleep(10000)
End Sub
End Class
' Sample output:
'File worker: using resources...
'File worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'File worker: using resources...
'File worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'File worker: using resources...
'File worker: finished using resources...
'File worker: using resources...
'File worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'File worker: using resources...
'
Pour permettre l'utilisation efficace du pool partagé des objets MemoryResource
, cet exemple spécifie un objet GroupingDataflowBlockOptions qui contient le jeu de propriétés Greedy à False
pour créer les objets JoinBlock<T1,T2> qui agissent en mode non gourmand. Un bloc de jointure non-gourmand remet les messages entrants jusqu'à ce qu'il y en ait un disponible à partir de chaque source. Si n'importe quel message remis à plus tard a été reçu par un autre bloc, le bloc de jointure redémarre le processus. Le mode non gourmand permet aux blocs de jointure qui partagent un ou plusieurs blocs sources d'avancer quand les autres blocs attendent des données. Dans cet exemple, si un objet MemoryResource
est ajouté au pool memoryResources
, le premier bloc de jointure peut progresser pour recevoir sa deuxième source de données. Si cet exemple visait à utiliser le mode gourmand, qui est la valeur par défaut, un bloc de jointure peut prendre l'objet MemoryResource
et attendre que la deuxième ressource soit disponible. Toutefois, si l'autre bloc de jointure a sa deuxième source de données disponible, il ne peut pas avancer car l'objet MemoryResource
a été pris par l'autre bloc de jointure.
Programmation fiable
L'utilisation de jointures non gourmandes peut également vous aider à empêcher tout interblocage dans votre application. Dans une application logicielle, un blocage se produit lorsque au moins deux processus comportent chacun une ressource et attendent mutuellement qu’un autre processus en libère une autre. Examinez la requête qui définit deux objets JoinBlock<T1,T2>. Les deux objets lisent chacun des données de deux blocs sources partagés. En mode gourmand, si un bloc de jointure lit depuis la première source et le deuxième bloc de jointure lit depuis la seconde source, l'application peut être interbloquée car les deux blocs de jointure attendent mutuellement l'autre pour libérer sa ressource. En mode non gourmand, chaque bloc de jointure lit uniquement à partir de ses sources lorsque toutes les données sont disponibles, éliminant par conséquent, le risque d'interblocage.