Walkthrough: Creating a Custom Dataflow Block Type

Although the TPL Dataflow Library provides several dataflow block types that cover a wide range of functionality, you can also create custom block types. This document describes how to create a dataflow block type that implements custom behavior.

Prerequisites

Read Dataflow before you read this document.

Note

The TPL Dataflow Library (the System.Threading.Tasks.Dataflow namespace) is not distributed with .NET. To install the System.Threading.Tasks.Dataflow namespace in Visual Studio, open your project, choose Manage NuGet Packages from the Project menu, and search online for the System.Threading.Tasks.Dataflow package. Alternatively, to install it with the .NET Core CLI, run dotnet add package System.Threading.Tasks.Dataflow.

Defining the Sliding Window Dataflow Block

Consider a dataflow application that requires input values to be buffered and then output in a sliding window manner. For example, for the input values {0, 1, 2, 3, 4, 5} and a window size of three, a sliding window dataflow block produces the output arrays {0, 1, 2}, {1, 2, 3}, {2, 3, 4}, and {3, 4, 5}. The following sections describe two ways to create a dataflow block type that implements this custom behavior. The first technique uses the Encapsulate method to combine the functionality of an ISourceBlock<TOutput> object and an ITargetBlock<TInput> object into one propagator block. The second technique defines a class that implements IPropagatorBlock<TInput,TOutput> and combines existing functionality to produce the custom behavior.
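The window semantics described above can be sketched without any dataflow types. The following is a minimal illustration, not part of the TPL Dataflow Library; the names SlidingWindowSketch and Windows are hypothetical and chosen only for this example. It applies the same enqueue, trim, and emit steps that the dataflow blocks in the later sections perform for each incoming item.

```csharp
using System;
using System.Collections.Generic;

public static class SlidingWindowSketch
{
    // Hypothetical helper: returns every full window of size windowSize,
    // in the order in which the windows become available.
    public static List<T[]> Windows<T>(IEnumerable<T> values, int windowSize)
    {
        var queue = new Queue<T>();
        var windows = new List<T[]>();

        foreach (T value in values)
        {
            // Add the newest item.
            queue.Enqueue(value);

            // Drop the oldest item once the queue outgrows the window.
            if (queue.Count > windowSize)
                queue.Dequeue();

            // Emit a snapshot whenever the queue holds a full window.
            if (queue.Count == windowSize)
                windows.Add(queue.ToArray());
        }

        return windows;
    }

    public static void Main()
    {
        foreach (int[] window in Windows(new[] { 0, 1, 2, 3, 4, 5 }, 3))
            Console.WriteLine("{" + string.Join(", ", window) + "}");
    }
}
```

Unlike the dataflow blocks defined later, this sketch emits nothing when the input holds fewer items than the window size; the blocks post that partial window when they complete.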

Using the Encapsulate Method to Define the Sliding Window Dataflow Block

The following example uses the Encapsulate method to create a propagator block from a target and a source. A propagator block acts as both a receiver (target) and a sender (source) of data.

This technique is useful when you require custom dataflow functionality, but you do not require a type that provides additional methods, properties, or fields.

// Creates an IPropagatorBlock<T, T[]> object that propagates data in a
// sliding window fashion.
public static IPropagatorBlock<T, T[]> CreateSlidingWindow<T>(int windowSize)
{
   // Create a queue to hold messages.
   var queue = new Queue<T>();

   // The source part of the propagator holds arrays of size windowSize
   // and propagates data out to any connected targets.
   var source = new BufferBlock<T[]>();

   // The target part receives data and adds them to the queue.
   var target = new ActionBlock<T>(item =>
   {
      // Add the item to the queue.
      queue.Enqueue(item);
      // Remove the oldest item when the queue size exceeds the window size.
      if (queue.Count > windowSize)
         queue.Dequeue();
      // Post the data in the queue to the source block when the queue size
      // equals the window size.
      if (queue.Count == windowSize)
         source.Post(queue.ToArray());
   });

   // When the target is set to the completed state, propagate out any
   // remaining data and set the source to the completed state.
   target.Completion.ContinueWith(delegate
   {
      if (queue.Count > 0 && queue.Count < windowSize)
         source.Post(queue.ToArray());
      source.Complete();
   });

   // Return an IPropagatorBlock<T, T[]> object that encapsulates the
   // target and source blocks.
   return DataflowBlock.Encapsulate(target, source);
}
' Creates an IPropagatorBlock<T, T[]> object that propagates data in a
' sliding window fashion.
Public Shared Function CreateSlidingWindow(Of T)(ByVal windowSize As Integer) As IPropagatorBlock(Of T, T())
    ' Create a queue to hold messages.
    Dim queue = New Queue(Of T)()

    ' The source part of the propagator holds arrays of size windowSize
    ' and propagates data out to any connected targets.
    Dim source = New BufferBlock(Of T())()

    ' The target part receives data and adds them to the queue.
    Dim target = New ActionBlock(Of T)(Sub(item)
                                           ' Add the item to the queue.
                                           queue.Enqueue(item)
                                           ' Remove the oldest item when the queue size exceeds the window size.
                                           If queue.Count > windowSize Then
                                               queue.Dequeue()
                                           End If
                                           ' Post the data in the queue to the source block when the queue size
                                           ' equals the window size.
                                           If queue.Count = windowSize Then
                                               source.Post(queue.ToArray())
                                           End If
                                       End Sub)

    ' When the target is set to the completed state, propagate out any
    ' remaining data and set the source to the completed state.
    target.Completion.ContinueWith(Sub()
                                       If queue.Count > 0 AndAlso queue.Count < windowSize Then
                                           source.Post(queue.ToArray())
                                       End If
                                       source.Complete()
                                   End Sub)

    ' Return an IPropagatorBlock<T, T[]> object that encapsulates the
    ' target and source blocks.
    Return DataflowBlock.Encapsulate(target, source)
End Function
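As a smaller illustration of the same Encapsulate pattern, the following sketch wraps an ActionBlock<int> target and a BufferBlock<int> source into one propagator and reads its output through a linked block. The names EncapsulateSketch, CreateDoubler, and Run are hypothetical and exist only for this example; doubling a value would normally be done with a TransformBlock, and is used here only to keep the focus on how the two halves are wired together.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class EncapsulateSketch
{
    // Hypothetical helper: a propagator that doubles each input value.
    public static IPropagatorBlock<int, int> CreateDoubler()
    {
        var source = new BufferBlock<int>();
        var target = new ActionBlock<int>(n => source.Post(n * 2));

        // Encapsulate does not connect the completion of the two halves,
        // so the source is completed by hand when the target finishes.
        target.Completion.ContinueWith(_ => source.Complete());

        return DataflowBlock.Encapsulate(target, source);
    }

    // Posts the inputs to the propagator and collects everything it emits.
    public static List<int> Run(IEnumerable<int> inputs)
    {
        IPropagatorBlock<int, int> doubler = CreateDoubler();

        var results = new List<int>();
        var collector = new ActionBlock<int>(n => results.Add(n));

        // PropagateCompletion completes the collector once the
        // propagator's source half has completed.
        doubler.LinkTo(collector,
            new DataflowLinkOptions { PropagateCompletion = true });

        foreach (int n in inputs)
            doubler.Post(n);
        doubler.Complete();

        // Wait until every emitted value has been collected.
        collector.Completion.Wait();
        return results;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", Run(new[] { 1, 2, 3 })));
        // prints "2,4,6"
    }
}
```

Calling Complete on the encapsulated block completes only the target half, which is why the continuation on target.Completion is needed in this sketch, just as it is in CreateSlidingWindow above.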

Deriving from IPropagatorBlock to Define the Sliding Window Dataflow Block

The following example shows the SlidingWindowBlock class. This class implements IPropagatorBlock<TInput,TOutput> so that it can act as both a source and a target of data. As in the previous example, the SlidingWindowBlock class is built on existing dataflow block types. However, the SlidingWindowBlock class also implements the methods that are required by the ISourceBlock<TOutput>, ITargetBlock<TInput>, and IDataflowBlock interfaces. These methods all forward work to the predefined dataflow block type members. For example, the OfferMessage method, which the Post extension method calls, forwards work to the m_target data member, which is also an ITargetBlock<TInput> object.

This technique is useful when you require custom dataflow functionality and also require a type that provides additional methods, properties, or fields. For example, the SlidingWindowBlock class also implements IReceivableSourceBlock<TOutput> so that it can provide the TryReceive and TryReceiveAll methods. The SlidingWindowBlock class also demonstrates extensibility by providing the WindowSize property, which retrieves the number of elements in the sliding window.

// Propagates data in a sliding window fashion.
public class SlidingWindowBlock<T> : IPropagatorBlock<T, T[]>,
                                     IReceivableSourceBlock<T[]>
{
   // The size of the window.
   private readonly int m_windowSize;
   // The target part of the block.
   private readonly ITargetBlock<T> m_target;
   // The source part of the block.
   private readonly IReceivableSourceBlock<T[]> m_source;

   // Constructs a SlidingWindowBlock object.
   public SlidingWindowBlock(int windowSize)
   {
      // Create a queue to hold messages.
      var queue = new Queue<T>();

      // The source part of the propagator holds arrays of size windowSize
      // and propagates data out to any connected targets.
      var source = new BufferBlock<T[]>();

      // The target part receives data and adds them to the queue.
      var target = new ActionBlock<T>(item =>
      {
         // Add the item to the queue.
         queue.Enqueue(item);
         // Remove the oldest item when the queue size exceeds the window size.
         if (queue.Count > windowSize)
            queue.Dequeue();
         // Post the data in the queue to the source block when the queue size
         // equals the window size.
         if (queue.Count == windowSize)
            source.Post(queue.ToArray());
      });

      // When the target is set to the completed state, propagate out any
      // remaining data and set the source to the completed state.
      target.Completion.ContinueWith(delegate
      {
         if (queue.Count > 0 && queue.Count < windowSize)
            source.Post(queue.ToArray());
         source.Complete();
      });

      m_windowSize = windowSize;
      m_target = target;
      m_source = source;
   }

   // Retrieves the size of the window.
   public int WindowSize { get { return m_windowSize; } }

   #region IReceivableSourceBlock<TOutput> members

   // Attempts to synchronously receive an item from the source.
   public bool TryReceive(Predicate<T[]> filter, out T[] item)
   {
      return m_source.TryReceive(filter, out item);
   }

   // Attempts to remove all available elements from the source into a new
   // array that is returned.
   public bool TryReceiveAll(out IList<T[]> items)
   {
      return m_source.TryReceiveAll(out items);
   }

   #endregion

   #region ISourceBlock<TOutput> members

   // Links this dataflow block to the provided target.
   public IDisposable LinkTo(ITargetBlock<T[]> target, DataflowLinkOptions linkOptions)
   {
      return m_source.LinkTo(target, linkOptions);
   }

   // Called by a target to reserve a message previously offered by a source
   // but not yet consumed by this target.
   bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader,
      ITargetBlock<T[]> target)
   {
      return m_source.ReserveMessage(messageHeader, target);
   }

   // Called by a target to consume a previously offered message from a source.
   T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader,
      ITargetBlock<T[]> target, out bool messageConsumed)
   {
      return m_source.ConsumeMessage(messageHeader,
         target, out messageConsumed);
   }

   // Called by a target to release a previously reserved message from a source.
   void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader,
      ITargetBlock<T[]> target)
   {
      m_source.ReleaseReservation(messageHeader, target);
   }

   #endregion

   #region ITargetBlock<TInput> members

   // Asynchronously passes a message to the target block, giving the target the
   // opportunity to consume the message.
   DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader,
      T messageValue, ISourceBlock<T> source, bool consumeToAccept)
   {
      return m_target.OfferMessage(messageHeader,
         messageValue, source, consumeToAccept);
   }

   #endregion

   #region IDataflowBlock members

   // Gets a Task that represents the completion of this dataflow block.
   public Task Completion { get { return m_source.Completion; } }

   // Signals to this target block that it should not accept any more messages,
   // nor consume postponed messages.
   public void Complete()
   {
      m_target.Complete();
   }

   public void Fault(Exception error)
   {
      m_target.Fault(error);
   }

   #endregion
}
    ' Propagates data in a sliding window fashion.
    Public Class SlidingWindowBlock(Of T)
        Implements IPropagatorBlock(Of T, T()), IReceivableSourceBlock(Of T())
        ' The size of the window.
        Private ReadOnly m_windowSize As Integer
        ' The target part of the block.
        Private ReadOnly m_target As ITargetBlock(Of T)
        ' The source part of the block.
        Private ReadOnly m_source As IReceivableSourceBlock(Of T())

        ' Constructs a SlidingWindowBlock object.
        Public Sub New(ByVal windowSize As Integer)
            ' Create a queue to hold messages.
            Dim queue = New Queue(Of T)()

            ' The source part of the propagator holds arrays of size windowSize
            ' and propagates data out to any connected targets.
            Dim source = New BufferBlock(Of T())()

            ' The target part receives data and adds them to the queue.
            Dim target = New ActionBlock(Of T)(Sub(item)
                                                   ' Add the item to the queue.
                                                   queue.Enqueue(item)
                                                   ' Remove the oldest item when the queue size exceeds the window size.
                                                   If queue.Count > windowSize Then
                                                       queue.Dequeue()
                                                   End If
                                                   ' Post the data in the queue to the source block when the queue size
                                                   ' equals the window size.
                                                   If queue.Count = windowSize Then
                                                       source.Post(queue.ToArray())
                                                   End If
                                               End Sub)

            ' When the target is set to the completed state, propagate out any
            ' remaining data and set the source to the completed state.
            target.Completion.ContinueWith(Sub()
                                               If queue.Count > 0 AndAlso queue.Count < windowSize Then
                                                   source.Post(queue.ToArray())
                                               End If
                                               source.Complete()
                                           End Sub)

            m_windowSize = windowSize
            m_target = target
            m_source = source
        End Sub

        ' Retrieves the size of the window.
        Public ReadOnly Property WindowSize() As Integer
            Get
                Return m_windowSize
            End Get
        End Property

        '#Region "IReceivableSourceBlock<TOutput> members"

        ' Attempts to synchronously receive an item from the source.
        Public Function TryReceive(ByVal filter As Predicate(Of T()), <System.Runtime.InteropServices.Out()> ByRef item() As T) As Boolean Implements IReceivableSourceBlock(Of T()).TryReceive
            Return m_source.TryReceive(filter, item)
        End Function

        ' Attempts to remove all available elements from the source into a new 
        ' array that is returned.
        Public Function TryReceiveAll(<System.Runtime.InteropServices.Out()> ByRef items As IList(Of T())) As Boolean Implements IReceivableSourceBlock(Of T()).TryReceiveAll
            Return m_source.TryReceiveAll(items)
        End Function

        '#End Region

#Region "ISourceBlock<TOutput> members"

        ' Links this dataflow block to the provided target.
        Public Function LinkTo(ByVal target As ITargetBlock(Of T()), ByVal linkOptions As DataflowLinkOptions) As IDisposable Implements ISourceBlock(Of T()).LinkTo
            Return m_source.LinkTo(target, linkOptions)
        End Function

        ' Called by a target to reserve a message previously offered by a source 
        ' but not yet consumed by this target.
        Private Function ReserveMessage(ByVal messageHeader As DataflowMessageHeader, ByVal target As ITargetBlock(Of T())) As Boolean Implements ISourceBlock(Of T()).ReserveMessage
            Return m_source.ReserveMessage(messageHeader, target)
        End Function

        ' Called by a target to consume a previously offered message from a source.
        Private Function ConsumeMessage(ByVal messageHeader As DataflowMessageHeader, ByVal target As ITargetBlock(Of T()), ByRef messageConsumed As Boolean) As T() Implements ISourceBlock(Of T()).ConsumeMessage
            Return m_source.ConsumeMessage(messageHeader, target, messageConsumed)
        End Function

        ' Called by a target to release a previously reserved message from a source.
        Private Sub ReleaseReservation(ByVal messageHeader As DataflowMessageHeader, ByVal target As ITargetBlock(Of T())) Implements ISourceBlock(Of T()).ReleaseReservation
            m_source.ReleaseReservation(messageHeader, target)
        End Sub

#End Region

#Region "ITargetBlock<TInput> members"

        ' Asynchronously passes a message to the target block, giving the target the 
        ' opportunity to consume the message.
        Private Function OfferMessage(ByVal messageHeader As DataflowMessageHeader, ByVal messageValue As T, ByVal source As ISourceBlock(Of T), ByVal consumeToAccept As Boolean) As DataflowMessageStatus Implements ITargetBlock(Of T).OfferMessage
            Return m_target.OfferMessage(messageHeader, messageValue, source, consumeToAccept)
        End Function

#End Region

#Region "IDataflowBlock members"

        ' Gets a Task that represents the completion of this dataflow block.
        Public ReadOnly Property Completion() As Task Implements IDataflowBlock.Completion
            Get
                Return m_source.Completion
            End Get
        End Property

        ' Signals to this target block that it should not accept any more messages, 
        ' nor consume postponed messages. 
        Public Sub Complete() Implements IDataflowBlock.Complete
            m_target.Complete()
        End Sub

        Public Sub Fault(ByVal [error] As Exception) Implements IDataflowBlock.Fault
            m_target.Fault([error])
        End Sub

#End Region
    End Class
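Because SlidingWindowBlock forwards TryReceive and TryReceiveAll to the BufferBlock<T[]> it wraps, the behavior of those two members can be sketched on a BufferBlock<int[]> directly. The following is a minimal illustration under that assumption; the names TryReceiveSketch and Run are hypothetical, and the arrays posted here stand in for windows that a SlidingWindowBlock would have produced.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

public static class TryReceiveSketch
{
    public static (int FirstStart, int RestCount, bool EmptyAfter) Run()
    {
        // Stand-in for the source half of a SlidingWindowBlock<int>.
        var buffer = new BufferBlock<int[]>();
        buffer.Post(new[] { 0, 1, 2 });
        buffer.Post(new[] { 1, 2, 3 });
        buffer.Post(new[] { 2, 3, 4 });

        IReceivableSourceBlock<int[]> source = buffer;

        // TryReceive removes the oldest buffered item without blocking.
        // A null filter accepts any item.
        int firstStart = -1;
        if (source.TryReceive(null, out int[] first))
            firstStart = first[0];

        // TryReceiveAll removes every item that is still buffered.
        int restCount = 0;
        if (source.TryReceiveAll(out IList<int[]> rest))
            restCount = rest.Count;

        // Nothing is left, so another TryReceive returns false.
        bool emptyAfter = !source.TryReceive(null, out _);

        return (firstStart, restCount, emptyAfter);
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine("{0} {1} {2}",
            result.FirstStart, result.RestCount, result.EmptyAfter);
    }
}
```

Both methods return immediately instead of waiting for data, which makes them suited to polling a block that has no linked targets.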

The Complete Example

The following example shows the complete code for this walkthrough. It also demonstrates how to use both sliding window blocks in a method that writes to the block, reads from it, and prints the results to the console.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a custom dataflow block type.
class Program
{
   // Creates an IPropagatorBlock<T, T[]> object that propagates data in a
   // sliding window fashion.
   public static IPropagatorBlock<T, T[]> CreateSlidingWindow<T>(int windowSize)
   {
      // Create a queue to hold messages.
      var queue = new Queue<T>();

      // The source part of the propagator holds arrays of size windowSize
      // and propagates data out to any connected targets.
      var source = new BufferBlock<T[]>();

      // The target part receives data and adds them to the queue.
      var target = new ActionBlock<T>(item =>
      {
         // Add the item to the queue.
         queue.Enqueue(item);
         // Remove the oldest item when the queue size exceeds the window size.
         if (queue.Count > windowSize)
            queue.Dequeue();
         // Post the data in the queue to the source block when the queue size
         // equals the window size.
         if (queue.Count == windowSize)
            source.Post(queue.ToArray());
      });

      // When the target is set to the completed state, propagate out any
      // remaining data and set the source to the completed state.
      target.Completion.ContinueWith(delegate
      {
         if (queue.Count > 0 && queue.Count < windowSize)
            source.Post(queue.ToArray());
         source.Complete();
      });

      // Return an IPropagatorBlock<T, T[]> object that encapsulates the
      // target and source blocks.
      return DataflowBlock.Encapsulate(target, source);
   }

   // Propagates data in a sliding window fashion.
   public class SlidingWindowBlock<T> : IPropagatorBlock<T, T[]>,
                                        IReceivableSourceBlock<T[]>
   {
      // The size of the window.
      private readonly int m_windowSize;
      // The target part of the block.
      private readonly ITargetBlock<T> m_target;
      // The source part of the block.
      private readonly IReceivableSourceBlock<T[]> m_source;

      // Constructs a SlidingWindowBlock object.
      public SlidingWindowBlock(int windowSize)
      {
         // Create a queue to hold messages.
         var queue = new Queue<T>();

         // The source part of the propagator holds arrays of size windowSize
         // and propagates data out to any connected targets.
         var source = new BufferBlock<T[]>();

         // The target part receives data and adds them to the queue.
         var target = new ActionBlock<T>(item =>
         {
            // Add the item to the queue.
            queue.Enqueue(item);
            // Remove the oldest item when the queue size exceeds the window size.
            if (queue.Count > windowSize)
               queue.Dequeue();
            // Post the data in the queue to the source block when the queue size
            // equals the window size.
            if (queue.Count == windowSize)
               source.Post(queue.ToArray());
         });

         // When the target is set to the completed state, propagate out any
         // remaining data and set the source to the completed state.
         target.Completion.ContinueWith(delegate
         {
            if (queue.Count > 0 && queue.Count < windowSize)
               source.Post(queue.ToArray());
            source.Complete();
         });

         m_windowSize = windowSize;
         m_target = target;
         m_source = source;
      }

      // Retrieves the size of the window.
      public int WindowSize { get { return m_windowSize; } }

      #region IReceivableSourceBlock<TOutput> members

      // Attempts to synchronously receive an item from the source.
      public bool TryReceive(Predicate<T[]> filter, out T[] item)
      {
         return m_source.TryReceive(filter, out item);
      }

      // Attempts to remove all available elements from the source into a new
      // array that is returned.
      public bool TryReceiveAll(out IList<T[]> items)
      {
         return m_source.TryReceiveAll(out items);
      }

      #endregion

      #region ISourceBlock<TOutput> members

      // Links this dataflow block to the provided target.
      public IDisposable LinkTo(ITargetBlock<T[]> target, DataflowLinkOptions linkOptions)
      {
         return m_source.LinkTo(target, linkOptions);
      }

      // Called by a target to reserve a message previously offered by a source
      // but not yet consumed by this target.
      bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader,
         ITargetBlock<T[]> target)
      {
         return m_source.ReserveMessage(messageHeader, target);
      }

      // Called by a target to consume a previously offered message from a source.
      T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader,
         ITargetBlock<T[]> target, out bool messageConsumed)
      {
         return m_source.ConsumeMessage(messageHeader,
            target, out messageConsumed);
      }

      // Called by a target to release a previously reserved message from a source.
      void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader,
         ITargetBlock<T[]> target)
      {
         m_source.ReleaseReservation(messageHeader, target);
      }

      #endregion

      #region ITargetBlock<TInput> members

      // Asynchronously passes a message to the target block, giving the target the
      // opportunity to consume the message.
      DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader,
         T messageValue, ISourceBlock<T> source, bool consumeToAccept)
      {
         return m_target.OfferMessage(messageHeader,
            messageValue, source, consumeToAccept);
      }

      #endregion

      #region IDataflowBlock members

      // Gets a Task that represents the completion of this dataflow block.
      public Task Completion { get { return m_source.Completion; } }

      // Signals to this target block that it should not accept any more messages,
      // nor consume postponed messages.
      public void Complete()
      {
         m_target.Complete();
      }

      public void Fault(Exception error)
      {
         m_target.Fault(error);
      }

      #endregion
   }

   // Demonstrates usage of the sliding window block by sending the provided
   // values to the provided propagator block and printing the output of
   // that block to the console.
   static void DemonstrateSlidingWindow<T>(IPropagatorBlock<T, T[]> slidingWindow,
      IEnumerable<T> values)
   {
      // Create an action block that prints arrays of data to the console.
      string windowComma = string.Empty;
      var printWindow = new ActionBlock<T[]>(window =>
      {
         Console.Write(windowComma);
         Console.Write("{");

         string comma = string.Empty;
         foreach (T item in window)
         {
            Console.Write(comma);
            Console.Write(item);
            comma = ",";
         }
         Console.Write("}");

         windowComma = ", ";
      });

      // Link the printer block to the sliding window block.
      slidingWindow.LinkTo(printWindow);

      // Set the printer block to the completed state when the sliding window
      // block completes.
      slidingWindow.Completion.ContinueWith(delegate { printWindow.Complete(); });

      // Print an additional newline to the console when the printer block completes.
      var completion = printWindow.Completion.ContinueWith(delegate { Console.WriteLine(); });

      // Post the provided values to the sliding window block and then wait
      // for the sliding window block to complete.
      foreach (T value in values)
      {
         slidingWindow.Post(value);
      }
      slidingWindow.Complete();

      // Wait for the printer to complete and perform its final action.
      completion.Wait();
   }

   static void Main(string[] args)
   {

      Console.Write("Using the DataflowBlock.Encapsulate method ");
      Console.WriteLine("(T=int, windowSize=3):");
      DemonstrateSlidingWindow(CreateSlidingWindow<int>(3), Enumerable.Range(0, 10));

      Console.WriteLine();

      var slidingWindow = new SlidingWindowBlock<char>(4);

      Console.Write("Using SlidingWindowBlock<T> ");
      Console.WriteLine("(T=char, windowSize={0}):", slidingWindow.WindowSize);
      DemonstrateSlidingWindow(slidingWindow, from n in Enumerable.Range(65, 10)
                                              select (char)n);
   }
}

/* Output:
Using the DataflowBlock.Encapsulate method (T=int, windowSize=3):
{0,1,2}, {1,2,3}, {2,3,4}, {3,4,5}, {4,5,6}, {5,6,7}, {6,7,8}, {7,8,9}

Using SlidingWindowBlock<T> (T=char, windowSize=4):
{A,B,C,D}, {B,C,D,E}, {C,D,E,F}, {D,E,F,G}, {E,F,G,H}, {F,G,H,I}, {G,H,I,J}
 */
Imports System.Collections.Generic
Imports System.Linq
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a custom dataflow block type.
Friend Class Program
    ' Creates an IPropagatorBlock<T, T[]> object that propagates data in a
    ' sliding window fashion.
    Public Shared Function CreateSlidingWindow(Of T)(ByVal windowSize As Integer) As IPropagatorBlock(Of T, T())
        ' Create a queue to hold messages.
        Dim queue = New Queue(Of T)()

        ' The source part of the propagator holds arrays of size windowSize
        ' and propagates data out to any connected targets.
        Dim source = New BufferBlock(Of T())()

        ' The target part receives data and adds them to the queue.
        Dim target = New ActionBlock(Of T)(Sub(item)
                                               ' Add the item to the queue.
                                               queue.Enqueue(item)
                                               ' Remove the oldest item when the queue size exceeds the window size.
                                               If queue.Count > windowSize Then
                                                   queue.Dequeue()
                                               End If
                                               ' Post the data in the queue to the source block when the queue size
                                               ' equals the window size.
                                               If queue.Count = windowSize Then
                                                   source.Post(queue.ToArray())
                                               End If
                                           End Sub)

        ' When the target is set to the completed state, propagate out any
        ' remaining data and set the source to the completed state.
        target.Completion.ContinueWith(Sub()
                                           If queue.Count > 0 AndAlso queue.Count < windowSize Then
                                               source.Post(queue.ToArray())
                                           End If
                                           source.Complete()
                                       End Sub)

        ' Return an IPropagatorBlock<T, T[]> object that encapsulates the
        ' target and source blocks.
        Return DataflowBlock.Encapsulate(target, source)
    End Function

    ' Propagates data in a sliding window fashion.
    Public Class SlidingWindowBlock(Of T)
        Implements IPropagatorBlock(Of T, T()), IReceivableSourceBlock(Of T())
        ' The size of the window.
        Private ReadOnly m_windowSize As Integer
        ' The target part of the block.
        Private ReadOnly m_target As ITargetBlock(Of T)
        ' The source part of the block.
        Private ReadOnly m_source As IReceivableSourceBlock(Of T())

        ' Constructs a SlidingWindowBlock object.
        Public Sub New(ByVal windowSize As Integer)
            ' Create a queue to hold messages.
            Dim queue = New Queue(Of T)()

            ' The source part of the propagator holds arrays of size windowSize
            ' and propagates data out to any connected targets.
            Dim source = New BufferBlock(Of T())()

            ' The target part receives data and adds them to the queue.
            Dim target = New ActionBlock(Of T)(Sub(item)
                                                   ' Add the item to the queue.
                                                   queue.Enqueue(item)
                                                   ' Remove the oldest item when the queue size exceeds the window size.
                                                   If queue.Count > windowSize Then
                                                       queue.Dequeue()
                                                   End If
                                                   ' Post the data in the queue to the source block when the queue size
                                                   ' equals the window size.
                                                   If queue.Count = windowSize Then
                                                       source.Post(queue.ToArray())
                                                   End If
                                               End Sub)

            ' When the target is set to the completed state, propagate out any
            ' remaining data and set the source to the completed state.
            target.Completion.ContinueWith(Sub()
                                               If queue.Count > 0 AndAlso queue.Count < windowSize Then
                                                   source.Post(queue.ToArray())
                                               End If
                                               source.Complete()
                                           End Sub)

            m_windowSize = windowSize
            m_target = target
            m_source = source
        End Sub

        ' Retrieves the size of the window.
        Public ReadOnly Property WindowSize() As Integer
            Get
                Return m_windowSize
            End Get
        End Property

#Region "IReceivableSourceBlock<TOutput> members"

        ' Attempts to synchronously receive an item from the source.
        Public Function TryReceive(ByVal filter As Predicate(Of T()), <System.Runtime.InteropServices.Out()> ByRef item() As T) As Boolean Implements IReceivableSourceBlock(Of T()).TryReceive
            Return m_source.TryReceive(filter, item)
        End Function

        ' Attempts to remove all available elements from the source into a new
        ' list that is returned through the items parameter.
        Public Function TryReceiveAll(<System.Runtime.InteropServices.Out()> ByRef items As IList(Of T())) As Boolean Implements IReceivableSourceBlock(Of T()).TryReceiveAll
            Return m_source.TryReceiveAll(items)
        End Function

#End Region

#Region "ISourceBlock<TOutput> members"

        ' Links this dataflow block to the provided target.
        Public Function LinkTo(ByVal target As ITargetBlock(Of T()), ByVal linkOptions As DataflowLinkOptions) As IDisposable Implements ISourceBlock(Of T()).LinkTo
            Return m_source.LinkTo(target, linkOptions)
        End Function

        ' Called by a target to reserve a message previously offered by a source 
        ' but not yet consumed by this target.
        Private Function ReserveMessage(ByVal messageHeader As DataflowMessageHeader, ByVal target As ITargetBlock(Of T())) As Boolean Implements ISourceBlock(Of T()).ReserveMessage
            Return m_source.ReserveMessage(messageHeader, target)
        End Function

        ' Called by a target to consume a previously offered message from a source.
        Private Function ConsumeMessage(ByVal messageHeader As DataflowMessageHeader, ByVal target As ITargetBlock(Of T()), ByRef messageConsumed As Boolean) As T() Implements ISourceBlock(Of T()).ConsumeMessage
            Return m_source.ConsumeMessage(messageHeader, target, messageConsumed)
        End Function

        ' Called by a target to release a previously reserved message from a source.
        Private Sub ReleaseReservation(ByVal messageHeader As DataflowMessageHeader, ByVal target As ITargetBlock(Of T())) Implements ISourceBlock(Of T()).ReleaseReservation
            m_source.ReleaseReservation(messageHeader, target)
        End Sub

#End Region

#Region "ITargetBlock<TInput> members"

        ' Asynchronously passes a message to the target block, giving the target the 
        ' opportunity to consume the message.
        Private Function OfferMessage(ByVal messageHeader As DataflowMessageHeader, ByVal messageValue As T, ByVal source As ISourceBlock(Of T), ByVal consumeToAccept As Boolean) As DataflowMessageStatus Implements ITargetBlock(Of T).OfferMessage
            Return m_target.OfferMessage(messageHeader, messageValue, source, consumeToAccept)
        End Function

#End Region

#Region "IDataflowBlock members"

        ' Gets a Task that represents the completion of this dataflow block.
        Public ReadOnly Property Completion() As Task Implements IDataflowBlock.Completion
            Get
                Return m_source.Completion
            End Get
        End Property

        ' Signals to this target block that it should not accept any more messages, 
        ' nor consume postponed messages. 
        Public Sub Complete() Implements IDataflowBlock.Complete
            m_target.Complete()
        End Sub

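        ' Causes this dataflow block to complete in a faulted state by
        ' forwarding the exception to the target part.
        Public Sub Fault(ByVal [error] As Exception) Implements IDataflowBlock.Fault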
            m_target.Fault([error])
        End Sub

#End Region
    End Class

    ' Demonstrates usage of the sliding window block by sending the provided
    ' values to the provided propagator block and printing the output of 
    ' that block to the console.
    Private Shared Sub DemonstrateSlidingWindow(Of T)(ByVal slidingWindow As IPropagatorBlock(Of T, T()), ByVal values As IEnumerable(Of T))
        ' Create an action block that prints arrays of data to the console.
        Dim windowComma As String = String.Empty
        Dim printWindow = New ActionBlock(Of T())(Sub(window)
                                                      Console.Write(windowComma)
                                                      Console.Write("{")
                                                      Dim comma As String = String.Empty
                                                      For Each item As T In window
                                                          Console.Write(comma)
                                                          Console.Write(item)
                                                          comma = ","
                                                      Next item
                                                      Console.Write("}")
                                                      windowComma = ", "
                                                  End Sub)

        ' Link the printer block to the sliding window block.
        slidingWindow.LinkTo(printWindow)

        ' Set the printer block to the completed state when the sliding window
        ' block completes.
        slidingWindow.Completion.ContinueWith(Sub() printWindow.Complete())

        ' Print an additional newline to the console when the printer block completes.
        Dim completion = printWindow.Completion.ContinueWith(Sub() Console.WriteLine())

        ' Post the provided values to the sliding window block and then signal
        ' that no more values will be posted.
        For Each value As T In values
            slidingWindow.Post(value)
        Next value
        slidingWindow.Complete()

        ' Wait for the printer to complete and perform its final action.
        completion.Wait()
    End Sub

    Shared Sub Main(ByVal args() As String)

        Console.Write("Using the DataflowBlockExtensions.Encapsulate method ")
        Console.WriteLine("(T=int, windowSize=3):")
        DemonstrateSlidingWindow(CreateSlidingWindow(Of Integer)(3), Enumerable.Range(0, 10))

        Console.WriteLine()

        Dim slidingWindow = New SlidingWindowBlock(Of Char)(4)

        Console.Write("Using SlidingWindowBlock<T> ")
        Console.WriteLine("(T=char, windowSize={0}):", slidingWindow.WindowSize)
        DemonstrateSlidingWindow(slidingWindow, _
            From n In Enumerable.Range(65, 10) _
            Select ChrW(n))
    End Sub
End Class

' Output:
'Using the DataflowBlockExtensions.Encapsulate method (T=int, windowSize=3):
'{0,1,2}, {1,2,3}, {2,3,4}, {3,4,5}, {4,5,6}, {5,6,7}, {6,7,8}, {7,8,9}
'
'Using SlidingWindowBlock<T> (T=char, windowSize=4):
'{A,B,C,D}, {B,C,D,E}, {C,D,E,F}, {D,E,F,G}, {E,F,G,H}, {F,G,H,I}, {G,H,I,J}
' 

See also