Partilhar via


Como: Especificar o grau de paralelismo em um bloco de fluxo de dados

Este documento descreve como definir a ExecutionDataflowBlockOptions.MaxDegreeOfParallelism propriedade para permitir que um bloco de fluxo de dados de execução processe mais de uma mensagem de cada vez. Fazer isso é útil quando você tem um bloco de fluxo de dados que executa um cálculo de longa duração e pode se beneficiar do processamento de mensagens em paralelo. Este exemplo usa a System.Threading.Tasks.Dataflow.ActionBlock<TInput> classe para executar várias operações de fluxo de dados simultaneamente, no entanto, você pode especificar o grau máximo de paralelismo em qualquer um dos tipos de bloco de execução predefinidos que a Biblioteca de Fluxo de Dados TPL fornece, ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>e System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>.

Nota

A biblioteca de fluxo de dados TPL (o namespace) não é distribuída com o System.Threading.Tasks.Dataflow .NET. Para instalar o System.Threading.Tasks.Dataflow namespace no Visual Studio, abra seu projeto, escolha Gerenciar pacotes NuGet no menu Projeto e pesquise o System.Threading.Tasks.Dataflow pacote online. Como alternativa, para instalá-lo usando a CLI do .NET Core, execute dotnet add package System.Threading.Tasks.Dataflow.

Exemplo

O exemplo a seguir executa dois cálculos de fluxo de dados e imprime o tempo decorrido necessário para cada cálculo. O primeiro cálculo especifica um grau máximo de paralelismo de 1, que é o padrão. Um grau máximo de paralelismo de 1 faz com que o bloco de fluxo de dados processe mensagens em série. O segundo cálculo se assemelha ao primeiro, exceto que especifica um grau máximo de paralelismo que é igual ao número de processadores disponíveis. Isso permite que o bloco de fluxo de dados execute várias operações em paralelo.

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to specify the maximum degree of parallelism
// when using dataflow.
class Program
{
   // Performs several computations by using dataflow and returns the elapsed
   // time required to perform the computations.
   static TimeSpan TimeDataflowComputations(int maxDegreeOfParallelism,
      int messageCount)
   {
      // Create an ActionBlock<int> that performs some work.
      var workerBlock = new ActionBlock<int>(
         // Simulate work by suspending the current thread.
         millisecondsTimeout => Thread.Sleep(millisecondsTimeout),
         // Specify a maximum degree of parallelism.
         new ExecutionDataflowBlockOptions
         {
            MaxDegreeOfParallelism = maxDegreeOfParallelism
         });

      // Compute the time that it takes for several messages to
      // flow through the dataflow block.

      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();

      for (int i = 0; i < messageCount; i++)
      {
         workerBlock.Post(1000);
      }
      workerBlock.Complete();

      // Wait for all messages to propagate through the network.
      workerBlock.Completion.Wait();

      // Stop the timer and return the elapsed number of milliseconds.
      stopwatch.Stop();
      return stopwatch.Elapsed;
   }
   static void Main(string[] args)
   {
      int processorCount = Environment.ProcessorCount;
      int messageCount = processorCount;

      // Print the number of processors on this computer.
      Console.WriteLine("Processor count = {0}.", processorCount);

      TimeSpan elapsed;

      // Perform two dataflow computations and print the elapsed
      // time required for each.

      // This call specifies a maximum degree of parallelism of 1.
      // This causes the dataflow block to process messages serially.
      elapsed = TimeDataflowComputations(1, messageCount);
      Console.WriteLine("Degree of parallelism = {0}; message count = {1}; " +
         "elapsed time = {2}ms.", 1, messageCount, (int)elapsed.TotalMilliseconds);

      // Perform the computations again. This time, specify the number of
      // processors as the maximum degree of parallelism. This causes
      // multiple messages to be processed in parallel.
      elapsed = TimeDataflowComputations(processorCount, messageCount);
      Console.WriteLine("Degree of parallelism = {0}; message count = {1}; " +
         "elapsed time = {2}ms.", processorCount, messageCount, (int)elapsed.TotalMilliseconds);
   }
}

/* Sample output:
Processor count = 4.
Degree of parallelism = 1; message count = 4; elapsed time = 4032ms.
Degree of parallelism = 4; message count = 4; elapsed time = 1001ms.
*/
Imports System.Diagnostics
Imports System.Threading
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to specify the maximum degree of parallelism 
' when using dataflow.
Friend Class Program
    ' Performs several computations by using dataflow and returns the elapsed
    ' time required to perform the computations.
    Private Shared Function TimeDataflowComputations(ByVal maxDegreeOfParallelism As Integer, ByVal messageCount As Integer) As TimeSpan
        ' Create an ActionBlock<int> that performs some work.
        Dim workerBlock = New ActionBlock(Of Integer)(Function(millisecondsTimeout) Pause(millisecondsTimeout), New ExecutionDataflowBlockOptions() With {.MaxDegreeOfParallelism = maxDegreeOfParallelism})
        ' Simulate work by suspending the current thread.
        ' Specify a maximum degree of parallelism.

        ' Compute the time that it takes for several messages to 
        ' flow through the dataflow block.

        Dim stopwatch As New Stopwatch()
        stopwatch.Start()

        For i As Integer = 0 To messageCount - 1
            workerBlock.Post(1000)
        Next i
        workerBlock.Complete()

        ' Wait for all messages to propagate through the network.
        workerBlock.Completion.Wait()

        ' Stop the timer and return the elapsed number of milliseconds.
        stopwatch.Stop()
        Return stopwatch.Elapsed
    End Function

    Private Shared Function Pause(ByVal obj As Object)
        Thread.Sleep(obj)
        Return Nothing
    End Function
    Shared Sub Main(ByVal args() As String)
        Dim processorCount As Integer = Environment.ProcessorCount
        Dim messageCount As Integer = processorCount

        ' Print the number of processors on this computer.
        Console.WriteLine("Processor count = {0}.", processorCount)

        Dim elapsed As TimeSpan

        ' Perform two dataflow computations and print the elapsed
        ' time required for each.

        ' This call specifies a maximum degree of parallelism of 1.
        ' This causes the dataflow block to process messages serially.
        elapsed = TimeDataflowComputations(1, messageCount)
        Console.WriteLine("Degree of parallelism = {0}; message count = {1}; " & "elapsed time = {2}ms.", 1, messageCount, CInt(Fix(elapsed.TotalMilliseconds)))

        ' Perform the computations again. This time, specify the number of 
        ' processors as the maximum degree of parallelism. This causes
        ' multiple messages to be processed in parallel.
        elapsed = TimeDataflowComputations(processorCount, messageCount)
        Console.WriteLine("Degree of parallelism = {0}; message count = {1}; " & "elapsed time = {2}ms.", processorCount, messageCount, CInt(Fix(elapsed.TotalMilliseconds)))
    End Sub
End Class

' Sample output:
'Processor count = 4.
'Degree of parallelism = 1; message count = 4; elapsed time = 4032ms.
'Degree of parallelism = 4; message count = 4; elapsed time = 1001ms.
'

Programação robusta

Por padrão, cada bloco de fluxo de dados predefinido propaga mensagens na ordem em que as mensagens são recebidas. Embora várias mensagens sejam processadas simultaneamente quando você especifica um grau máximo de paralelismo maior que 1, elas ainda são propagadas na ordem em que são recebidas.

Como a MaxDegreeOfParallelism propriedade representa o grau máximo de paralelismo, o bloco de fluxo de dados pode ser executado com um grau menor de paralelismo do que o especificado. O bloco de fluxo de dados pode usar um grau menor de paralelismo para atender aos seus requisitos funcionais ou para explicar a falta de recursos disponíveis do sistema. Um bloco de fluxo de dados nunca escolhe um grau maior de paralelismo do que o especificado.

Consulte também