Partager via


Procédure pas à pas : création d’un pipeline de dataflow

Bien que vous puissiez utiliser les méthodes DataflowBlock.Receive, DataflowBlock.ReceiveAsync et DataflowBlock.TryReceive pour recevoir des messages des blocs sources, vous pouvez également connecter des blocs de messages pour former un pipeline de flux de données. Un pipeline de flux de données est une série de composants, ou de blocs de flux de données, qui effectuent chacun une tâche spécifique qui contribue à un plus grand objectif. Chaque bloc de flux de données d'un pipeline de flux de données effectue un travail lorsqu'il reçoit un message d'un autre bloc de flux de données. Ce processus s'apparente à une chaîne de montage en construction automobile. Comme chaque véhicule passe via la ligne de montage, un poste assemble le châssis, le suivant installe le moteur, et ainsi de suite. Étant donné qu'une ligne d'assemblage permet à plusieurs véhicules d'être assemblés en même temps, cela fournit une productivité supérieure à l'assemblage un par un des véhicules.

Ce document montre un pipeline de flux de données qui télécharge le livre L’Illiade d’Homère à partir d’un site web et recherche dans le texte les mots dont l’inversion des caractères permet d’obtenir un autre mot. La formation du pipeline de flux de données dans ce document comprend les étapes suivantes :

  1. Créez des blocs de flux de données qui participent au pipeline.

  2. Connectez chaque bloc de flux de données au bloc suivant dans le pipeline. Chaque bloc reçoit comme entrée la sortie du bloc précédent du pipeline.

  3. Pour chaque bloc de flux de données, créez une tâche de continuation qui définit le bloc suivant à l’état arrêté après que le bloc précédent ait terminé.

  4. Publiez les données au début du pipeline.

  5. Marquez le début du pipeline comme terminé.

  6. Attendez que le pipeline termine tous les travaux.

Prérequis

Lisez la rubrique Flux de données avant de démarrer cette procédure pas à pas.

Création d'une application console

Dans Visual Studio, créez un projet Application console en Visual C# ou en Visual Basic. Installez le package NuGet System.Threading.Tasks.Dataflow.

Notes

La bibliothèque de flux de données TPL (espace de noms System.Threading.Tasks.Dataflow) n'est pas distribuée avec .NET. Pour installer l’espace de noms System.Threading.Tasks.Dataflow dans Visual Studio, ouvrez votre projet, choisissez Gérer les packages NuGet dans le menu Projet, puis recherchez en ligne le package System.Threading.Tasks.Dataflow. Vous pouvez également l’installer à l’aide de l’interface CLI .NET Core en exécutant dotnet add package System.Threading.Tasks.Dataflow.

Ajoutez le code suivant à votre projet pour créer l'application de base.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class Program
{
   static void Main()
   {
   }
}
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
    End Sub

End Module

Création des blocs de flux de données

Ajoutez le code suivant à la méthode Main pour créer des blocs de flux de données qui participent au pipeline. Le tableau suivant résume le rôle de chaque membre du pipeline.

//
// Create the members of the pipeline.
//

// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
   Console.WriteLine("Downloading '{0}'...", uri);

   return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});

// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
   Console.WriteLine("Creating word list...");

   // Remove common punctuation by replacing all non-letter characters
   // with a space character.
   char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
   text = new string(tokens);

   // Separate the text into an array of words.
   return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});

// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
   Console.WriteLine("Filtering word list...");

   return words
      .Where(word => word.Length > 3)
      .Distinct()
      .ToArray();
});

// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
   Console.WriteLine("Finding reversed words...");

   var wordsSet = new HashSet<string>(words);

   return from word in words.AsParallel()
          let reverse = new string(word.Reverse().ToArray())
          where word != reverse && wordsSet.Contains(reverse)
          select word;
});

// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
   Console.WriteLine("Found reversed words {0}/{1}",
      reversedWord, new string(reversedWord.Reverse().ToArray()));
});
'
' Create the members of the pipeline.
' 

' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
    Async Function(uri)
        Console.WriteLine("Downloading '{0}'...", uri)

        Return Await New HttpClient().GetStringAsync(uri)
    End Function)

' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
   Function(text)
       Console.WriteLine("Creating word list...")

     ' Remove common punctuation by replacing all non-letter characters 
     ' with a space character.
     Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
       text = New String(tokens)

     ' Separate the text into an array of words.
     Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
   End Function)

' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
   Function(words)
       Console.WriteLine("Filtering word list...")

       Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
   End Function)

' Finds all words in the specified collection whose reverse also 
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
   Function(words)

       Dim wordsSet = New HashSet(Of String)(words)

       Return From word In words.AsParallel()
              Let reverse = New String(word.Reverse().ToArray())
              Where word <> reverse AndAlso wordsSet.Contains(reverse)
              Select word
   End Function)

' Prints the provided reversed words to the console.    
Dim printReversedWords = New ActionBlock(Of String)(
   Sub(reversedWord)
       Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
   End Sub)
Membre Type Description
downloadString TransformBlock<TInput,TOutput> Télécharge le texte du livre depuis le Web.
createWordList TransformBlock<TInput,TOutput> Sépare le texte du livre dans un tableau de mots.
filterWordList TransformBlock<TInput,TOutput> Supprime les mots courts et les doublons du tableau de mots.
findReversedWords TransformManyBlock<TInput,TOutput> Recherche tous les mots dans la collection filtrée de tableau de mots dont le changement se produit également dans le tableau de mots.
printReversedWords ActionBlock<TInput> Affiche les mots et les mots inversés correspondants dans la console.

Bien que vous puissiez combiner plusieurs étapes de cet exemple dans le pipeline de flux de données en une étape, l’exemple illustre le concept de composition de plusieurs tâches distinctes de flux de données pour effectuer une plus grande tâche. L'exemple utilise TransformBlock<TInput,TOutput> pour permettre à chaque membre du pipeline d'exécuter une opération sur les données d'entrée et d'envoyer les résultats à l'étape suivante dans le pipeline. Le membre findReversedWords du pipeline est un objet TransformManyBlock<TInput,TOutput> car il génère des sorties multiples indépendantes pour chaque entrée. La queue du pipeline, printReversedWords, est un objet ActionBlock<TInput> car elle exécute une action sur son entrée, et ne produit aucun résultat.

Formation du pipeline

Ajoutez le code suivant pour adapter chaque bloc au bloc suivant dans le pipeline.

Lorsque vous appelez la méthode LinkTo pour adapter un bloc source de flux de données à un bloc cible de flux de données, le bloc source de flux de données se propage au bloc cible lorsque les données sont disponibles. Si vous spécifiez également DataflowLinkOptions avec PropagateCompletion défini sur true, la réussite ou l’échec de l’achèvement d’un bloc dans le pipeline entraînera l’achèvement du bloc suivant dans le pipeline.

//
// Connect the dataflow blocks to form a pipeline.
//

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
'
' Connect the dataflow blocks to form a pipeline.
'

Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)

Publication des données du pipeline

Ajoutez le code suivant pour publier l'URL du livre de l'Iliade d'Homère au début du pipeline de flux de données.

// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

Cet exemple utilise DataflowBlock.Post pour envoyer des données de façon synchrone au début du pipeline. Utilisez la méthode DataflowBlock.SendAsync lorsque vous devez envoyer de manière asynchrone des données à un nœud de flux de données.

Fermeture de l'activité du pipeline

Ajoutez le code suivant pour indiquer que le début du pipeline est terminé. Le début du pipeline propage son achèvement lorsqu’il a traité tous les messages mis en mémoire tampon.

// Mark the head of the pipeline as complete.
downloadString.Complete();
' Mark the head of the pipeline as complete.
downloadString.Complete()

Cet exemple envoie une URL via le pipeline de flux de données à traiter. Si vous devez envoyer plusieurs entrées par un pipeline, appelez la méthode IDataflowBlock.Complete après avoir soumis toutes les entrées. Vous pouvez omettre cette étape si votre application n'a pas de points bien définis à partir desquels les données ne sont plus disponibles ou si l'application n'a pas à attendre que le pipeline se termine.

Attente de la fermeture du pipeline

Ajoutez le code suivant pour attendre que le pipeline se termine. L’opération globale se termine lorsque la fin du pipeline est atteinte.

// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()

Vous pouvez attendre la fin de flux de données de tous les threads ou de plusieurs threads simultanément.

Exemple complet

L'exemple suivant présente le code complet pour cette visite.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class DataflowReversedWords
{
   static void Main()
   {
      //
      // Create the members of the pipeline.
      //

      // Downloads the requested resource as a string.
      var downloadString = new TransformBlock<string, string>(async uri =>
      {
         Console.WriteLine("Downloading '{0}'...", uri);

         return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
      });

      // Separates the specified text into an array of words.
      var createWordList = new TransformBlock<string, string[]>(text =>
      {
         Console.WriteLine("Creating word list...");

         // Remove common punctuation by replacing all non-letter characters
         // with a space character.
         char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
         text = new string(tokens);

         // Separate the text into an array of words.
         return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
      });

      // Removes short words and duplicates.
      var filterWordList = new TransformBlock<string[], string[]>(words =>
      {
         Console.WriteLine("Filtering word list...");

         return words
            .Where(word => word.Length > 3)
            .Distinct()
            .ToArray();
      });

      // Finds all words in the specified collection whose reverse also
      // exists in the collection.
      var findReversedWords = new TransformManyBlock<string[], string>(words =>
      {
         Console.WriteLine("Finding reversed words...");

         var wordsSet = new HashSet<string>(words);

         return from word in words.AsParallel()
                let reverse = new string(word.Reverse().ToArray())
                where word != reverse && wordsSet.Contains(reverse)
                select word;
      });

      // Prints the provided reversed words to the console.
      var printReversedWords = new ActionBlock<string>(reversedWord =>
      {
         Console.WriteLine("Found reversed words {0}/{1}",
            reversedWord, new string(reversedWord.Reverse().ToArray()));
      });

      //
      // Connect the dataflow blocks to form a pipeline.
      //

      var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

      downloadString.LinkTo(createWordList, linkOptions);
      createWordList.LinkTo(filterWordList, linkOptions);
      filterWordList.LinkTo(findReversedWords, linkOptions);
      findReversedWords.LinkTo(printReversedWords, linkOptions);

      // Process "The Iliad of Homer" by Homer.
      downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");

      // Mark the head of the pipeline as complete.
      downloadString.Complete();

      // Wait for the last block in the pipeline to process all messages.
      printReversedWords.Completion.Wait();
   }
}
/* Sample output:
   Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
   Creating word list...
   Filtering word list...
   Finding reversed words...
   Found reversed words doom/mood
   Found reversed words draw/ward
   Found reversed words aera/area
   Found reversed words seat/taes
   Found reversed words live/evil
   Found reversed words port/trop
   Found reversed words sleek/keels
   Found reversed words area/aera
   Found reversed words tops/spot
   Found reversed words evil/live
   Found reversed words mood/doom
   Found reversed words speed/deeps
   Found reversed words moor/room
   Found reversed words trop/port
   Found reversed words spot/tops
   Found reversed words spots/stops
   Found reversed words stops/spots
   Found reversed words reed/deer
   Found reversed words keels/sleek
   Found reversed words deeps/speed
   Found reversed words deer/reed
   Found reversed words taes/seat
   Found reversed words room/moor
   Found reversed words ward/draw
*/
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
        '
        ' Create the members of the pipeline.
        ' 

        ' Downloads the requested resource as a string.
        Dim downloadString = New TransformBlock(Of String, String)(
            Async Function(uri)
                Console.WriteLine("Downloading '{0}'...", uri)

                Return Await New HttpClient().GetStringAsync(uri)
            End Function)

        ' Separates the specified text into an array of words.
        Dim createWordList = New TransformBlock(Of String, String())(
           Function(text)
               Console.WriteLine("Creating word list...")

             ' Remove common punctuation by replacing all non-letter characters 
             ' with a space character.
             Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
               text = New String(tokens)

             ' Separate the text into an array of words.
             Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
           End Function)

        ' Removes short words and duplicates.
        Dim filterWordList = New TransformBlock(Of String(), String())(
           Function(words)
               Console.WriteLine("Filtering word list...")

               Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
           End Function)

        ' Finds all words in the specified collection whose reverse also 
        ' exists in the collection.
        Dim findReversedWords = New TransformManyBlock(Of String(), String)(
           Function(words)

               Dim wordsSet = New HashSet(Of String)(words)

               Return From word In words.AsParallel()
                      Let reverse = New String(word.Reverse().ToArray())
                      Where word <> reverse AndAlso wordsSet.Contains(reverse)
                      Select word
           End Function)

        ' Prints the provided reversed words to the console.    
        Dim printReversedWords = New ActionBlock(Of String)(
           Sub(reversedWord)
               Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
           End Sub)

        '
        ' Connect the dataflow blocks to form a pipeline.
        '

        Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

        downloadString.LinkTo(createWordList, linkOptions)
        createWordList.LinkTo(filterWordList, linkOptions)
        filterWordList.LinkTo(findReversedWords, linkOptions)
        findReversedWords.LinkTo(printReversedWords, linkOptions)

        ' Process "The Iliad of Homer" by Homer.
        downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

        ' Mark the head of the pipeline as complete.
        downloadString.Complete()

        ' Wait for the last block in the pipeline to process all messages.
        printReversedWords.Completion.Wait()
    End Sub

End Module

' Sample output:
'Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
'Creating word list...
'Filtering word list...
'Finding reversed words...
'Found reversed words aera/area
'Found reversed words doom/mood
'Found reversed words draw/ward
'Found reversed words live/evil
'Found reversed words seat/taes
'Found reversed words area/aera
'Found reversed words port/trop
'Found reversed words sleek/keels
'Found reversed words tops/spot
'Found reversed words evil/live
'Found reversed words speed/deeps
'Found reversed words mood/doom
'Found reversed words moor/room
'Found reversed words spot/tops
'Found reversed words spots/stops
'Found reversed words trop/port
'Found reversed words stops/spots
'Found reversed words reed/deer
'Found reversed words deeps/speed
'Found reversed words deer/reed
'Found reversed words taes/seat
'Found reversed words keels/sleek
'Found reversed words room/moor
'Found reversed words ward/draw

Étapes suivantes

Cet exemple envoie une URL à traiter via le pipeline de flux de données. Si vous devez envoyer plusieurs valeurs d'entrée via le pipeline, vous pouvez introduire un forme de parallélisme dans votre application similaire à la façon dont des parties peuvent parcourir une fabrique d'automobiles. Lorsque le premier membre du pipeline envoie son résultat au deuxième membre, il peut traiter un autre élément en parallèle alors que le deuxième membre traite le premier résultat.

Le parallélisme qui est effectué à l’aide de pipelines de flux de données s’appelle le parallélisme de granularité grossière parce qu’il comprend généralement moins de tâches, mais plus grosses. Vous pouvez également utiliser le parallélisme de granularité fine de plus petites tâches de courte durée dans un pipeline de flux de données. Dans cet exemple, le membre findReversedWords du pipeline utilise PLINQ pour traiter plusieurs éléments dans la liste des travaux en parallèle. L'utilisation du parallélisme de granularité fine dans un pipeline de granularité grossière peut améliorer le débit global.

Vous pouvez également adapter un bloc de flux de données source à plusieurs blocs cibles pour créer un réseau de flux de données. La version surchargée de la méthode LinkTo accepte un objet Predicate<T> qui définit si le bloc cible reçoit les messages en fonction de sa valeur. La plupart des types de bloc de flux de données qui agissent comme sources envoient des messages à toutes les blocs cibles connectés, dans l'ordre dans lequel ils ont été connectés, jusqu'à ce que l'un des blocs reçoive ce message. En utilisant ce mécanisme de filtrage, vous pouvez créer des systèmes de blocs de flux de données connectés qui dirigent certaines données via un seul tracé et d’autres données via un autre tracé. Pour obtenir un exemple qui utilise le filtrage afin de créer un réseau de flux de données, consultez Procédure pas à pas : utilisation de flux de données dans une application Windows Forms.

Voir aussi