Condividi tramite


Flusso di dati (Task Parallel Library)

La Task Parallel Library (TPL) fornisce componenti del flusso di dati per aumentare l'affidabilità delle applicazioni abilitate per la concorrenza. Questi componenti del flusso di dati vengono definiti collettivamente libreria del flusso di dati TPL. Con questo modello del flusso di dati viene promossa la programmazione basata su attori fornendo il passaggio di messaggi in-process per attività di pipelining e per un flusso di dati con granularità grossolana. I componenti del flusso di dati sono basati sui tipi e sull'infrastruttura di pianificazione della libreria TPL e si integrano con il supporto dei linguaggi C#, Visual Basic e F# per la programmazione asincrona. Questi componenti sono utili qualora siano presenti più operazioni che devono comunicare tra loro in modo asincrono o qualora si desideri elaborare i dati non appena diventano disponibili. Si consideri, ad esempio, un'applicazione tramite cui vengono elaborati i dati immagine di una webcam. Tramite il modello del flusso di dati, l'applicazione è in grado di elaborare i fotogrammi delle immagini quando diventano disponibili. Se tramite l'applicazione vengono migliorati i fotogrammi dell'immagine, ad esempio, eseguendo la correzione della luce o la riduzione occhi rossi, è possibile creare una pipeline di componenti del flusso di dati. In ogni fase della pipeline è possibile utilizzare la funzionalità di parallelismo con maggiore granulosità grossolana, ad esempio la funzionalità fornita dalla libreria TPL, per trasformare l'immagine.

In questo documento viene fornita una panoramica della libreria del flusso di dati TPL. Vengono descritti il modello di programmazione e i tipi di blocchi di flussi di dati predefiniti e viene indicato come configurare i blocchi di flussi di dati per soddisfare specifici requisiti delle applicazioni.

Nota

La libreria del flusso di dati TPL (spazio dei nomi System.Threading.Tasks.Dataflow) non viene distribuita con .NET. Per installare lo spazio dei nomi System.Threading.Tasks.Dataflow in Visual Studio, aprire il progetto in Visual Studio, scegliere Gestisci pacchetti NuGet dal menu Progetto ed eseguire una ricerca online del pacchetto System.Threading.Tasks.Dataflow. In alternativa, per installarlo usando l'interfaccia della riga di comando di .NET Core, eseguire dotnet add package System.Threading.Tasks.Dataflow.

Modello di programmazione

La libreria del flusso di dati TPL fornisce una base per il passaggio dei messaggi e per la parallelizzazione delle applicazioni con utilizzo intensivo di I/O e di CPU con velocità effettiva elevata e bassa latenza. Offre inoltre un controllo esplicito sul modo in cui i dati vengono memorizzati nel buffer e spostati nel sistema. Per comprendere meglio il modello di programmazione del flusso di dati, si consideri un'applicazione tramite cui vengono caricate in modo asincrono le immagini dal disco e viene creata una composizione di queste immagini. Per i modelli di programmazione tradizionali viene in genere richiesto l'utilizzo di callback e oggetti di sincronizzazione, ad esempio blocchi, per coordinare le attività e accedere ai dati condivisi. Tramite il modello di programmazione del flusso di dati è possibile creare oggetti del flusso di dati mediante i quali vengono elaborate le immagini mentre sono lette dal disco. Nel modello del flusso di dati è possibile dichiarare la modalità di gestione dei dati quando disponibili, nonché tutte le dipendenze tra i dati. Poiché le dipendenze tra i dati sono gestite dal runtime, è spesso possibile evitare la necessità di sincronizzare l'accesso ai dati condivisi. Inoltre, dal momento che tramite il runtime il lavoro viene pianificato in base all'arrivo asincrono di dati, con il flusso di dati è possibile migliorare la velocità di risposta e la velocità effettiva gestendo i thread sottostanti in modo efficiente. Per un esempio in cui si usa il modello di programmazione del flusso di dati per implementare l'elaborazione di immagini in un'applicazione Windows Forms, vedere Procedura dettagliata: uso del flusso di dati in un'applicazione Windows Forms.

Origini e destinazioni

La libreria del flusso di dati TPL è costituita da blocchi di flussi di dati, vale a dire strutture dei dati in cui i dati vengono memorizzati nel buffer ed elaborati. Tramite la libreria TPL vengono definiti tre tipi di blocchi di flussi di dati: blocchi di origine, blocchi di destinazione e blocchi di propagazione. Un blocco di origine viene utilizzato come origine di dati da cui è possibile leggere. Un blocco di destinazione viene utilizzato come destinatario di dati in cui è possibile scrivere. Un blocco di propagazione viene utilizzato sia come blocco di origine sia come blocco di destinazione, quindi da cui è possibile leggere e in cui è possibile scrivere. Tramite la libreria TPL vengono definite le seguenti interfacce: System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> per rappresentare le origini, System.Threading.Tasks.Dataflow.ITargetBlock<TInput> per rappresentare le destinazioni e System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput> per rappresentare le propagazioni. L'interfaccia IPropagatorBlock<TInput,TOutput> eredita sia da ISourceBlock<TOutput> sia da ITargetBlock<TInput>.

La libreria del flusso di dati TPL fornisce vari tipi di blocchi di flussi di dati predefiniti tramite cui vengono implementate le interfacce ISourceBlock<TOutput>, ITargetBlock<TInput> e IPropagatorBlock<TInput,TOutput>. Questi tipi di blocchi di flussi di dati sono descritti nella sezione Tipi di blocchi di flussi di dati predefiniti di questo documento.

Connessione di blocchi

È possibile connettere i blocchi di flussi di dati per creare pipeline, che sono sequenze lineari di blocchi di flussi di dati, o reti, che sono grafici di blocchi di flussi di dati. Una pipeline è un tipo di rete. I dati di origini in una pipeline o in una rete vengono propagati nelle destinazioni in modo asincrono quando i dati in questione diventano disponibili. Tramite il metodo ISourceBlock<TOutput>.LinkTo un blocco di flussi di dati di origine viene connesso a un blocco di destinazione. Un'origine può essere collegata a zero o più destinazioni, mentre queste ultime possono essere collegate da zero o più origini. È possibile aggiungere o rimuovere contemporaneamente blocchi di flussi di dati a o da una pipeline o rete. Tramite i tipi di blocchi di flussi di dati predefiniti vengono gestiti tutti gli aspetti di collegamento e scollegamento thread safety.

Per un esempio di connessione di blocchi di flussi di dati per formare una pipeline di base, vedere Procedura dettagliata: creazione di una pipeline del flusso di dati. Per un esempio di connessione di blocchi di flussi di dati per formare una rete più complessa, vedere Procedura dettagliata: uso del flusso di dati in un'applicazione Windows Forms. Per un esempio di scollegamento di una destinazione da un'origine dopo l'offerta di un messaggio alla destinazione da parte dell'origine, vedere Procedura: Scollegare i blocchi di flussi di dati.

Filtro

Quando si chiama il metodo ISourceBlock<TOutput>.LinkTo per collegare un'origine a una destinazione, è possibile fornire un delegato tramite cui si stabilisce se da parte del blocco di destinazione viene accettato o rifiutato un messaggio in base al valore del messaggio in questione. Questo meccanismo di filtro è utile per garantire la ricezione solo di determinati valori da parte di un blocco di flussi di dati. Per la maggior parte dei tipi di blocchi di flussi di dati predefiniti, se un blocco di origine è connesso a più blocchi di destinazione, quando da parte di un blocco di destinazione viene rifiutato un messaggio, quest'ultimo tramite l'origine viene offerto alla destinazione successiva. L'ordine di offerta dei messaggi da parte dell'origine alle destinazioni viene definito dall'origine e può variare a seconda del tipo di origine. L'offerta di un messaggio da parte della maggior parte dei tipi di blocco di origine viene arrestata dopo l'accettazione del messaggio in questione da una destinazione. Un'eccezione a questa regola è la classe BroadcastBlock<T> mediante la quale ogni messaggio viene offerto a tutte le destinazioni, anche se viene rifiutato da alcune di queste. Per un esempio che usa il filtro per elaborare solo determinati messaggi, vedere Procedura dettagliata: uso del flusso di dati in un'applicazione Windows Forms.

Importante

Poiché ogni tipo di blocco di flussi di dati di origine predefinito garantisce la propagazione all'esterno di questi messaggi nell'ordine in cui vengono ricevuti, ogni messaggio deve essere letto dal blocco di origine prima che da parte di quest'ultimo possa essere elaborato il messaggio successivo. Di conseguenza, quando si utilizzano i filtri per connettere più destinazioni a un'origine, accertarsi che ogni messaggio venga ricevuto da almeno un blocco di destinazione. In caso contrario, si potrebbe verificare un deadlock dell'applicazione.

Passaggio dei messaggi

Il modello di programmazione del flusso di dati è correlato al concetto di passaggio dei messaggi, in base al quale i componenti indipendenti di un programma comunicano con un altro programma inviando messaggi. Un modo per propagare i messaggi tra i componenti dell'applicazione consiste nel chiamare i metodi Post (sincrono) e SendAsync (asincrono) per inviare messaggi ai blocchi di ei flussi di dati di destinazione, e i metodi Receive, ReceiveAsync e TryReceive per ricevere messaggi dai blocchi di origine. È possibile combinare questi metodi con pipeline o reti del flusso di dati inviando i dati di input al nodo principale (blocco di destinazione) e ricevendo i dati di output dal nodo terminale della pipeline o dai nodi terminali della rete (uno o più blocchi di origine). È inoltre possibile utilizzare il metodo Choose per leggere dalla prima delle origini fornite con dati disponibili ed eseguire azioni sui dati in questione.

I blocchi di origine offrono dati ai blocchi di destinazione chiamando il metodo ITargetBlock<TInput>.OfferMessage. La risposta a un messaggio offerto viene fornita dal blocco di destinazione in tre modalità: il messaggio può essere accettato, rifiutato o posticipato. Quando il messaggio viene accettato dalla destinazione, il metodo OfferMessage restituisce Accepted. Quando il messaggio viene rifiutato dalla destinazione, il metodo OfferMessage restituisce Declined. Quando da parte della destinazione viene richiesto di non ricevere più messaggi dall'origine, il metodo OfferMessage restituisce DecliningPermanently. I tipi di blocchi di origine predefiniti non offrono messaggi alle destinazioni collegate dopo la ricezione di un valore restituito di questo tipo e vengono scollegati automaticamente da queste destinazioni.

Quando tramite un blocco di destinazione il messaggio viene posticipato per un utilizzo successivo, il metodo OfferMessage restituisce Postponed. Un blocco di destinazione che posticipa un messaggio può chiamare successivamente il metodo ISourceBlock<TOutput>.ReserveMessage per provare a prenotare il messaggio offerto. A questo punto, il messaggio è ancora disponibile e può essere utilizzato dal blocco di destinazione oppure è stato accettato da un'altra destinazione. Quando il messaggio viene richiesto dal blocco di destinazione in un secondo momento o non è più necessario, viene chiamato rispettivamente il metodo ISourceBlock<TOutput>.ConsumeMessage o ReleaseReservation. La prenotazione dei messaggi viene in genere utilizzata dai tipi di blocchi di flussi di dati che operano in modalità non greedy. La modalità non greedy verrà illustrata più avanti in questo documento. Anziché riservare un messaggio posposto, in un blocco di destinazione può inoltre essere utilizzato il metodo ISourceBlock<TOutput>.ConsumeMessage per tentare di utilizzare direttamente il messaggio posposto.

Completamento dei blocchi di flussi di dati

I blocchi di flussi di dati supportano inoltre il concetto di completamento. Non viene eseguito alcun lavoro ulteriore da parte di un blocco di flussi di dati che si trova nello stato completato. A ogni blocco di flussi di dati è associato un oggetto System.Threading.Tasks.Task, noto come attività di completamento, che rappresenta lo stato di completamento del blocco. Dal momento che è possibile attendere il completamento di un oggetto Task, tramite le attività di completamento è possibile attendere il completamento di uno o più nodi terminali di una rete del flusso di dati. Tramite l'interfaccia IDataflowBlock vengono definiti il metodo Complete, mediante il quale il blocco di flussi di dati viene informato del completamento di una relativa richiesta, e la proprietà Completion, che restituisce l'attività di completamento del blocco di flussi di dati. Sia ISourceBlock<TOutput> sia ITargetBlock<TInput> ereditano l'interfaccia IDataflowBlock.

Sono disponibili due modalità per determinare se un blocco di flussi di dati viene completato correttamente, se tramite esso vengono rilevati uno o più errori o se è stato annullato. La prima consiste nel chiamare il metodo Task.Wait sull'attività di completamento in un blocco try-catch (Try-Catch in Visual Basic). Nell'esempio seguente viene creato un oggetto ActionBlock<TInput> tramite cui viene generato ArgumentOutOfRangeException se il valore di input è minore di zero. AggregateException viene generato quando in questo esempio viene chiamato il metodo Wait sull'attività di completamento. È possibile accedere a ArgumentOutOfRangeException tramite la proprietà InnerExceptions dell'oggetto AggregateException.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine("n = {0}", n);
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine("Encountered {0}: {1}",
         e.GetType().Name, e.Message);
      return true;
   });
}

/* Output:
n = 0
n = -1
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

In questo esempio viene illustrato il caso in cui un'eccezione non gestita viene inserita nel delegato di un blocco di flussi di dati di esecuzione. Si consiglia di gestire le eccezioni nei corpi di blocchi di questo tipo. In caso contrario, tuttavia, il comportamento del blocco è simile a quello di un annullamento e i messaggi in ingresso non vengono elaborati.

Quando un blocco di flussi di dati viene annullato in modo esplicito, nell'oggetto AggregateException è contenuto OperationCanceledException nella proprietà InnerExceptions. Per altre informazioni sull'annullamento del flusso di dati, vedere la sezione Abilitazione dell'annullamento.

La seconda modalità per determinare lo stato di completamento di un blocco di flussi di dati consiste nell'usare una continuazione dell'attività di completamento o le funzionalità asincrone dei linguaggi C# e Visual Basic per attendere in modo asincrono l'attività di completamento. Il delegato fornito al metodo Task.ContinueWith accetta un oggetto Task che rappresenta l'attività precedente. Nel caso della proprietà Completion, il delegato per la continuazione accetta l'attività di completamento stessa. L'esempio seguente è simile al precedente, tranne per il fatto che viene usato anche il metodo ContinueWith per creare un'attività di continuazione che stampa lo stato dell'operazione globale del flusso di dati.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine("n = {0}", n);
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Create a continuation task that prints the overall
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
   Console.WriteLine("The status of the completion task is '{0}'.",
      task.Status);
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine("Encountered {0}: {1}",
         e.GetType().Name, e.Message);
      return true;
   });
}

/* Output:
n = 0
n = -1
The status of the completion task is 'Faulted'.
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Create a continuation task that prints the overall 
' task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(Sub(task) Console.WriteLine("The status of the completion task is '{0}'.", task.Status))

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         The status of the completion task is 'Faulted'.
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

È inoltre possibile utilizzare proprietà come IsCanceled nel corpo dell'attività di continuazione per determinare informazioni aggiuntive sullo stato di completamento di un blocco di flussi di dati. Per altre informazioni sulle attività di continuazione e il modo in cui interagiscono con l'annullamento e la gestione degli errori, vedere Concatenamento di attività tramite attività di continuazione, Annullamento delle attività e Gestione delle eccezioni.

Tipi di blocchi di flussi di dati predefiniti

La libreria del flusso di dati TPL fornisce vari tipi di blocchi di flussi di dati predefiniti. Questi tipi sono suddivisi in tre categorie: blocchi di buffering, blocchi di esecuzione e blocchi di raggruppamento. Nelle sezioni seguenti vengono descritti i tipi di blocchi che compongono queste categorie.

Blocchi di buffering

Nei blocchi di buffering sono contenuti i dati utilizzati dai consumer di dati. La libreria del flusso di dati TPL fornisce tre tipi di blocco di buffering: System.Threading.Tasks.Dataflow.BufferBlock<T>, System.Threading.Tasks.Dataflow.BroadcastBlock<T> e System.Threading.Tasks.Dataflow.WriteOnceBlock<T>.

BufferBlock<T>

La classe BufferBlock<T> rappresenta una struttura di messaggistica asincrona di utilizzo generale. Questa classe archivia una coda di messaggi FIFO (First In, First Out) che possono essere letti da più destinazioni o in cui possono scrivere più origini. Quando da parte di una destinazione viene ricevuto un messaggio da un oggetto BufferBlock<T>, il messaggio viene rimosso dalla coda di messaggi. Pertanto, sebbene a un oggetto BufferBlock<T> possano essere associate più destinazioni, ogni messaggio verrà ricevuto da una sola destinazione. La classe BufferBlock<T> è utile quando si vogliono passare più messaggi a un altro componente e tale componente deve ricevere ogni messaggio.

Nell'esempio di base seguente vengono inseriti alcuni valori Int32 in un oggetto BufferBlock<T> che, successivamente, vengono riletti dall'oggetto in questione.

// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   bufferBlock.Post(i);
}

// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(bufferBlock.Receive());
}

/* Output:
   0
   1
   2
 */
' Create a BufferBlock<int> object.
Dim bufferBlock = New BufferBlock(Of Integer)()

' Post several messages to the block.
For i As Integer = 0 To 2
    bufferBlock.Post(i)
Next i

' Receive the messages back from the block.
For i As Integer = 0 To 2
    Console.WriteLine(bufferBlock.Receive())
Next i

'          Output:
'            0
'            1
'            2
'          

Per un esempio completo in cui viene illustrato come scrivere e leggere i messaggi in un oggetto BufferBlock<T>, vedere Procedura: Scrivere messaggi in un blocco di flussi di dati e leggere messaggi da un blocco di flussi di dati.

BroadcastBlock<T>

La classe BroadcastBlock<T> è utile quando è necessario passare più messaggi a un altro componente, ma il componente necessita solo del valore più recente. Questa classe è utile anche quando si vuole trasmettere un messaggio a più componenti.

Nell'esempio di base seguente viene inserito un valore Double in un oggetto BroadcastBlock<T> che, successivamente, viene riletto più volte dall'oggetto in questione. Dal momento che, dopo essere stati letti, i valori non vengono rimossi dagli oggetti BroadcastBlock<T>, lo stesso valore è disponibile ogni volta.

// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);

// Post a message to the block.
broadcastBlock.Post(Math.PI);

// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(broadcastBlock.Receive());
}

/* Output:
   3.14159265358979
   3.14159265358979
   3.14159265358979
 */
' Create a BroadcastBlock<double> object.
Dim broadcastBlock = New BroadcastBlock(Of Double)(Nothing)

' Post a message to the block.
broadcastBlock.Post(Math.PI)

' Receive the messages back from the block several times.
For i As Integer = 0 To 2
    Console.WriteLine(broadcastBlock.Receive())
Next i

'          Output:
'            3.14159265358979
'            3.14159265358979
'            3.14159265358979
'          

Per un esempio completo che illustra come usare la classe BroadcastBlock<T> per trasmettere un messaggio a più blocchi di destinazione, vedere Procedura: Specificare un'utilità di pianificazione in un blocco di flussi di dati.

WriteOnceBlock<T>

La classe WriteOnceBlock<T> è simile alla classe BroadcastBlock<T>, con la differenza che in un oggetto WriteOnceBlock<T> è possibile scrivere una sola volta. L'oggetto WriteOnceBlock<T> può essere considerato come la parola chiave C# readonly (ReadOnly in Visual Basic), con la differenza che un oggetto WriteOnceBlock<T> diventa non modificabile dopo la ricezione di un valore invece che in fase di costruzione. Come per la classe BroadcastBlock<T>, quando da una destinazione riceve un messaggio da un oggetto WriteOnceBlock<T>, il messaggio viene rimosso dalla coda di messaggi. Pertanto, più destinazioni riceveranno una copia del messaggio. La classe WriteOnceBlock<T> è utile quando si desidera propagare solo il primo di più messaggi.

Nell'esempio di base seguente vengono inseriti valori String in un oggetto WriteOnceBlock<T> che, successivamente, vengono riletti dall'oggetto in questione. Poiché un oggetto WriteOnceBlock<T> può essere scritto una sola volta, dopo la ricezione di un oggetto WriteOnceBlock<T>, i messaggi successivi vengono rimossi.

// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);

// Post several messages to the block in parallel. The first
// message to be received is written to the block.
// Subsequent messages are discarded.
Parallel.Invoke(
   () => writeOnceBlock.Post("Message 1"),
   () => writeOnceBlock.Post("Message 2"),
   () => writeOnceBlock.Post("Message 3"));

// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());

/* Sample output:
   Message 2
 */
' Create a WriteOnceBlock<string> object.
Dim writeOnceBlock = New WriteOnceBlock(Of String)(Nothing)

' Post several messages to the block in parallel. The first 
' message to be received is written to the block. 
' Subsequent messages are discarded.
Parallel.Invoke(Function() writeOnceBlock.Post("Message 1"), Function() writeOnceBlock.Post("Message 2"), Function() writeOnceBlock.Post("Message 3"))

' Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive())

'          Sample output:
'            Message 2
'          

Per un esempio completo in cui viene illustrato l'uso dell'oggetto WriteOnceBlock<T> per ricevere il valore della prima operazione completata, vedere Procedura: Scollegare i blocchi di flussi di dati.

Blocchi di esecuzione

Tramite i blocchi di esecuzione viene chiamato un delegato fornito dall'utente per ogni blocco di dati ricevuti. La libreria del flusso di dati TPL fornisce tre tipi di blocco di esecuzione: ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput> e System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>.

ActionBlock<T>

La classe ActionBlock<TInput> è un blocco di destinazione tramite cui viene chiamato un delegato alla ricezione dei dati. Si consideri un oggetto ActionBlock<TInput> come un delegato eseguito in modo asincrono quando i dati diventano disponibili. Il delegato fornito a un oggetto ActionBlock<TInput> può essere di tipo Action<T> o di tipo System.Func<TInput, Task>. Quando si utilizza un oggetto ActionBlock<TInput> con Action<T>, l'elaborazione di ogni elemento di input viene considerata completata alla restituzione del delegato. Quando si utilizza un oggetto ActionBlock<TInput> con System.Func<TInput, Task>, l'elaborazione di ogni elemento di input viene considerata completata solo quando l'oggetto restituito Task viene completato. Tramite questi due meccanismi è possibile utilizzare ActionBlock<TInput> sia per l'elaborazione sincrona sia per quella asincrona di ogni elemento di input.

Nell'esempio di base seguente vengono inseriti più valori Int32 in un oggetto ActionBlock<TInput>. Tramite l'oggetto ActionBlock<TInput> i valori in questione vengono stampati sulla console. In questo esempio il blocco viene quindi impostato sullo stato completato e attende il completamento di tutte le attività del flusso di dati.

// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   actionBlock.Post(i * 10);
}

// Set the block to the completed state and wait for all
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();

/* Output:
   0
   10
   20
 */
' Create an ActionBlock<int> object that prints values
' to the console.
Dim actionBlock = New ActionBlock(Of Integer)(Function(n) WriteLine(n))

' Post several messages to the block.
For i As Integer = 0 To 2
    actionBlock.Post(i * 10)
Next i

' Set the block to the completed state and wait for all 
' tasks to finish.
actionBlock.Complete()
actionBlock.Completion.Wait()

'          Output:
'            0
'            10
'            20
'          

Per esempi completi in cui viene illustrato l'uso dei delegati con la classe ActionBlock<TInput>, vedere Procedura: Eseguire un'azione alla ricezione di dati in un blocco di flussi di dati.

TransformBlock<TInput, TOutput>

La classe TransformBlock<TInput,TOutput> è simile alla classe ActionBlock<TInput>, con la differenza che viene utilizzata sia come origine sia come destinazione. Il delegato passato a un oggetto TransformBlock<TInput,TOutput> restituisce un valore di tipo TOutput. Il delegato fornito a un oggetto TransformBlock<TInput,TOutput> può essere di tipo System.Func<TInput, TOutput> o di tipo System.Func<TInput, Task<TOutput>>. Quando si utilizza un oggetto TransformBlock<TInput,TOutput> con System.Func<TInput, TOutput>, l'elaborazione di ogni elemento di input viene considerata completata alla restituzione del delegato. Quando si utilizza un oggetto TransformBlock<TInput,TOutput> utilizzato con System.Func<TInput, Task<TOutput>>, l'elaborazione di ogni elemento di input viene considerata completata solo quando l'oggetto restituito Task<TResult> viene completato. Analogamente all'oggetto ActionBlock<TInput>, tramite questi due meccanismi, è possibile utilizzare TransformBlock<TInput,TOutput> sia per l'elaborazione sincrona sia per quella asincrona di ogni elemento di input.

Nell'esempio di base seguente viene creato un oggetto TransformBlock<TInput,TOutput> con cui si calcola la radice quadrata dell'input. L'oggetto TransformBlock<TInput,TOutput> accetta valori Int32 come input e tramite esso vengono generati valori Double come output.

// Create a TransformBlock<int, double> object that
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));

// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);

// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(transformBlock.Receive());
}

/* Output:
   3.16227766016838
   4.47213595499958
   5.47722557505166
 */
' Create a TransformBlock<int, double> object that 
' computes the square root of its input.
Dim transformBlock = New TransformBlock(Of Integer, Double)(Function(n) Math.Sqrt(n))

' Post several messages to the block.
transformBlock.Post(10)
transformBlock.Post(20)
transformBlock.Post(30)

' Read the output messages from the block.
For i As Integer = 0 To 2
    Console.WriteLine(transformBlock.Receive())
Next i

'          Output:
'            3.16227766016838
'            4.47213595499958
'            5.47722557505166
'          

Per esempi completi in cui si usa l'oggetto TransformBlock<TInput,TOutput> in una rete di blocchi di flussi di dati nella quale viene eseguita l'elaborazione di immagini in un'applicazione Windows Forms, vedere Procedura dettagliata: Uso del flusso di dati in un'applicazione Windows Forms.

TransformManyBlock<TInput, TOutput>

La classe TransformManyBlock<TInput,TOutput> è simile alla classe TransformBlock<TInput,TOutput>, ad eccezione del fatto che tramite l'oggetto TransformManyBlock<TInput,TOutput> vengono generati zero o più valori di output per ogni valore di input anziché un solo valore di output per ogni valore di input. Il delegato fornito a un oggetto TransformManyBlock<TInput,TOutput> può essere di tipo System.Func<TInput, IEnumerable<TOutput>> o di tipo System.Func<TInput, Task<IEnumerable<TOutput>>>. Quando si utilizza un oggetto TransformManyBlock<TInput,TOutput> con System.Func<TInput, IEnumerable<TOutput>>, l'elaborazione di ogni elemento di input viene considerata completata alla restituzione del delegato. Quando si utilizza un oggetto TransformManyBlock<TInput,TOutput> con System.Func<TInput, Task<IEnumerable<TOutput>>>, l'elaborazione di ogni elemento di input viene considerata completa solo quando l'oggetto restituito System.Threading.Tasks.Task<IEnumerable<TOutput>> viene completato.

Nell'esempio di base seguente viene creato un oggetto TransformManyBlock<TInput,TOutput> tramite cui le stringhe vengono suddivise in singole sequenze di caratteri. L'oggetto TransformManyBlock<TInput,TOutput> accetta valori String come input e tramite esso vengono generati valori Char come output.

// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
   s => s.ToCharArray());

// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");

// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
   Console.WriteLine(transformManyBlock.Receive());
}

/* Output:
   H
   e
   l
   l
   o
   W
   o
   r
   l
   d
 */
' Create a TransformManyBlock<string, char> object that splits
' a string into its individual characters.
Dim transformManyBlock = New TransformManyBlock(Of String, Char)(Function(s) s.ToCharArray())

' Post two messages to the first block.
transformManyBlock.Post("Hello")
transformManyBlock.Post("World")

' Receive all output values from the block.
For i As Integer = 0 To ("Hello" & "World").Length - 1
    Console.WriteLine(transformManyBlock.Receive())
Next i

'          Output:
'            H
'            e
'            l
'            l
'            o
'            W
'            o
'            r
'            l
'            d
'          

Per esempi completi in cui si usa TransformManyBlock<TInput,TOutput> per generare più output indipendenti per ogni input in una pipeline del flusso di dati, vedere Procedura dettagliata: Creazione di una pipeline del flusso di dati.

Grado di parallelismo

Tramite ogni oggetto ActionBlock<TInput>, TransformBlock<TInput,TOutput> e TransformManyBlock<TInput,TOutput> i messaggi di input vengono inseriti nel buffer finché il blocco non è pronto per elaborarli. Per impostazione predefinita, i messaggi vengono elaborati da queste classi nell'ordine in cui vengono ricevuti, un messaggio alla volta. È inoltre possibile specificare il grado di parallelismo per consentire l'elaborazione simultanea di più messaggi da parte degli oggetti ActionBlock<TInput>, TransformBlock<TInput,TOutput> e TransformManyBlock<TInput,TOutput>. Per ulteriori informazioni sull'esecuzione simultanea, vedere la sezione Specifica del grado di parallelismo più avanti in questo documento. Per un esempio in cui viene impostato il grado di parallelismo per consentire l'elaborazione di più di un messaggio alla volta da parte di un blocco di flussi di dati di esecuzione, vedere Procedura: specificare il grado di parallelismo in un blocco di flussi di dati.

Riepilogo dei tipi delegati

Nella tabella seguente vengono riepilogati i tipi delegati che possono essere forniti agli oggetti ActionBlock<TInput>, TransformBlock<TInput,TOutput> e TransformManyBlock<TInput,TOutput>. Viene inoltre specificato se il tipo delegato viene eseguito in modo sincrono o asincrono.

Type Tipo delegato sincrono Tipo delegato asincrono
ActionBlock<TInput> System.Action System.Func<TInput, Task>
TransformBlock<TInput,TOutput> System.Func<TInput, TOutput> System.Func<TInput, Task<TOutput>>
TransformManyBlock<TInput,TOutput> System.Func<TInput, IEnumerable<TOutput>> System.Func<TInput, Task<IEnumerable<TOutput>>>

È inoltre possibile utilizzare le espressioni lambda quando si utilizzano tipi di blocchi di esecuzione. Per un esempio in cui viene illustrato l'uso di un'espressione lambda con un blocco di esecuzione, vedere Procedura: eseguire un'azione alla ricezione di dati in un blocco di flussi di dati.

Blocchi di raggruppamento

Con i blocchi di raggruppamento è possibile combinare i dati da una o più origini e con vari vincoli. La libreria del flusso di dati TPL fornisce tre tipi di blocchi join: BatchBlock<T>, JoinBlock<T1,T2> e BatchedJoinBlock<T1,T2>.

BatchBlock<T>

Tramite la classe BatchBlock<T> vengono combinati set di dati di input, noti come batch, in matrici di dati di output. È possibile specificare le dimensioni di ciascun batch quando si crea un oggetto BatchBlock<T>. Quando tramite l'oggetto BatchBlock<T> si riceve il numero specificato di elementi di input, viene propagata all'esterno in modo asincrono una matrice contenente gli elementi in questione. Se un oggetto BatchBlock<T> è impostato sullo stato completato ma in esso non sono contenuti elementi sufficienti per formare un batch, viene propagata all'esterno una matrice finale contenente gli elementi di input rimanenti.

La classe BatchBlock<T> viene eseguita in modalità greedy o non greedy. In modalità greedy, vale a dire l'impostazione predefinita, un oggetto BatchBlock<T> accetta ogni messaggio che viene offerto e propaga all'esterno una matrice dopo la ricezione del numero specificato di elementi. In modalità non greedy, tramite un oggetto BatchBlock<T> vengono posticipati tutti i messaggi in ingresso finché da parte di un numero sufficiente di origini non vengono offerti messaggi al blocco per formare un batch. L'esecuzione in modalità greedy è in genere migliore rispetto a quella non greedy dal momento che necessita di un sovraccarico inferiore di elaborazione. Tuttavia, è possibile utilizzare la modalità non greedy quando è necessario coordinare l'utilizzo da più origini in modo atomico. Specificare la modalità non greedy impostando la proprietà Greedy su False nel parametro dataflowBlockOptions nel costruttore BatchBlock<T>.

Nell'esempio di base seguente vengono inseriti alcuni valori Int32 in un oggetto BatchBlock<T> contenente dieci elementi in un batch. Per garantire che tutti i valori vengano propagati all'esterno dell'oggetto BatchBlock<T>, in questo esempio viene chiamato il metodo Complete. Tramite il metodo Complete viene impostato l'oggetto BatchBlock<T> sullo stato completato e, pertanto, tramite l'oggetto BatchBlock<T> vengono propagati all'esterno tutti gli elementi rimanenti come batch finale.

// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);

// Post several values to the block.
for (int i = 0; i < 13; i++)
{
   batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any remaining
// values as a final batch.
batchBlock.Complete();

// Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.",
   batchBlock.Receive().Sum());

Console.WriteLine("The sum of the elements in batch 2 is {0}.",
   batchBlock.Receive().Sum());

/* Output:
   The sum of the elements in batch 1 is 45.
   The sum of the elements in batch 2 is 33.
 */
' Create a BatchBlock<int> object that holds ten
' elements per batch.
Dim batchBlock = New BatchBlock(Of Integer)(10)

' Post several values to the block.
For i As Integer = 0 To 12
    batchBlock.Post(i)
Next i
' Set the block to the completed state. This causes
' the block to propagate out any remaining
' values as a final batch.
batchBlock.Complete()

' Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.", batchBlock.Receive().Sum())

Console.WriteLine("The sum of the elements in batch 2 is {0}.", batchBlock.Receive().Sum())

'          Output:
'            The sum of the elements in batch 1 is 45.
'            The sum of the elements in batch 2 is 33.
'          

Per un esempio completo in cui si usa BatchBlock<T> per migliorare l'efficienza delle operazioni di inserimento del database, vedere Procedura dettagliata: Uso di BatchBlock e BatchedJoinBlock per migliorare l'efficienza.

JoinBlock<T1, T2, ...>

Tramite le classi JoinBlock<T1,T2> e JoinBlock<T1,T2,T3> vengono raccolti elementi di input e propagati all'esterno oggetti System.Tuple<T1,T2> o System.Tuple<T1,T2,T3> contenenti gli elementi in questione. Le classi JoinBlock<T1,T2> e JoinBlock<T1,T2,T3> non ereditano da ITargetBlock<TInput>. Forniscono invece proprietà, Target1, Target2 e Target3, tramite cui viene implementato l'oggetto ITargetBlock<TInput>.

Come BatchBlock<T>, JoinBlock<T1,T2> e JoinBlock<T1,T2,T3> operano in modalità greedy o non greedy. In modalità greedy, vale a dire l'impostazione predefinita, un oggetto JoinBlock<T1,T2> o JoinBlock<T1,T2,T3> accetta ogni messaggio che viene offerto e tramite esso viene propagata all'esterno una tupla dopo che da ciascuna delle relative destinazioni viene ricevuto almeno un messaggio. In modalità non greedy, tramite un oggetto JoinBlock<T1,T2> o JoinBlock<T1,T2,T3> vengono posticipati tutti i messaggi in ingresso finché a tutte le destinazioni non sono stati offerti i dati richiesti per creare una tupla. A questo punto, nel blocco viene utilizzato un protocollo di commit a due fasi per recuperare atomicamente tutti gli elementi richiesti dalle origini. Questo rinvio consente a un'altra entità di utilizzare i dati nel frattempo per permettere al sistema complessivo di avanzare.

Nell'esempio di base seguente viene illustrato un caso in cui per un oggetto JoinBlock<T1,T2,T3> sono richiesti più dati per il calcolo di un valore. In questo esempio viene creato un oggetto JoinBlock<T1,T2,T3> per cui sono richiesti due valori Int32 e un valore Char per eseguire un'operazione aritmetica.

// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();

// Post two values to each target of the join.

joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);

joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);

joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');

// Receive each group of values and apply the operator part
// to the number parts.

for (int i = 0; i < 2; i++)
{
   var data = joinBlock.Receive();
   switch (data.Item3)
   {
      case '+':
         Console.WriteLine("{0} + {1} = {2}",
            data.Item1, data.Item2, data.Item1 + data.Item2);
         break;
      case '-':
         Console.WriteLine("{0} - {1} = {2}",
            data.Item1, data.Item2, data.Item1 - data.Item2);
         break;
      default:
         Console.WriteLine("Unknown operator '{0}'.", data.Item3);
         break;
   }
}

/* Output:
   3 + 5 = 8
   6 - 4 = 2
 */
' Create a JoinBlock<int, int, char> object that requires
' two numbers and an operator.
Dim joinBlock = New JoinBlock(Of Integer, Integer, Char)()

' Post two values to each target of the join.

joinBlock.Target1.Post(3)
joinBlock.Target1.Post(6)

joinBlock.Target2.Post(5)
joinBlock.Target2.Post(4)

joinBlock.Target3.Post("+"c)
joinBlock.Target3.Post("-"c)

' Receive each group of values and apply the operator part
' to the number parts.

For i As Integer = 0 To 1
    Dim data = joinBlock.Receive()
    Select Case data.Item3
        Case "+"c
            Console.WriteLine("{0} + {1} = {2}", data.Item1, data.Item2, data.Item1 + data.Item2)
        Case "-"c
            Console.WriteLine("{0} - {1} = {2}", data.Item1, data.Item2, data.Item1 - data.Item2)
        Case Else
            Console.WriteLine("Unknown operator '{0}'.", data.Item3)
    End Select
Next i

'          Output:
'            3 + 5 = 8
'            6 - 4 = 2
'          

Per un esempio completo in cui vengono usati oggetti JoinBlock<T1,T2> in modalità non greedy per condividere una risorsa in modo cooperativo, vedere Procedura: Usare JoinBlock per leggere dati da più origini.

BatchedJoinBlock<T1, T2, ...>

Tramite le classi BatchedJoinBlock<T1,T2> e BatchedJoinBlock<T1,T2,T3> vengono raccolti batch di elementi di input e vengono propagati all'esterno oggetti System.Tuple(IList(T1), IList(T2)) o System.Tuple(IList(T1), IList(T2), IList(T3)) contenenti gli elementi in questione. Si consideri BatchedJoinBlock<T1,T2> come una combinazione di BatchBlock<T> e JoinBlock<T1,T2>. Specificare le dimensioni di ciascun batch quando si crea un oggetto BatchedJoinBlock<T1,T2>. BatchedJoinBlock<T1,T2> fornisce inoltre le proprietà, Target1 e Target2, tramite cui viene implementato ITargetBlock<TInput>. Quando il numero specificato di elementi di input viene ricevuto attraverso tutte le destinazioni, tramite l'oggetto BatchedJoinBlock<T1,T2> viene propagato all'esterno in modo asincrono un oggetto System.Tuple(IList(T1), IList(T2)) contenente gli elementi in questione.

Nell'esempio di base seguente viene creato un oggetto BatchedJoinBlock<T1,T2> contenente i risultati, i valori Int32 e gli errori che sono oggetti Exception. In questo esempio vengono eseguite più operazioni e vengono scritti i risultati nella proprietà Target1 e gli errori nella proprietà Target2 dell'oggetto BatchedJoinBlock<T1,T2>. Poiché il conteggio delle operazioni riuscite e non riuscite non è noto in anticipo, gli oggetti IList<T> consentono a ogni destinazione di ricevere zero o più valori.

// For demonstration, create a Func<int, int> that
// returns its argument, or throws ArgumentOutOfRangeException
// if the argument is less than zero.
Func<int, int> DoWork = n =>
{
   if (n < 0)
      throw new ArgumentOutOfRangeException();
   return n;
};

// Create a BatchedJoinBlock<int, Exception> object that holds
// seven elements per batch.
var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);

// Post several items to the block.
foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
{
   try
   {
      // Post the result of the worker to the
      // first target of the block.
      batchedJoinBlock.Target1.Post(DoWork(i));
   }
   catch (ArgumentOutOfRangeException e)
   {
      // If an error occurred, post the Exception to the
      // second target of the block.
      batchedJoinBlock.Target2.Post(e);
   }
}

// Read the results from the block.
var results = batchedJoinBlock.Receive();

// Print the results to the console.

// Print the results.
foreach (int n in results.Item1)
{
   Console.WriteLine(n);
}
// Print failures.
foreach (Exception e in results.Item2)
{
   Console.WriteLine(e.Message);
}

/* Output:
   5
   6
   13
   55
   0
   Specified argument was out of the range of valid values.
   Specified argument was out of the range of valid values.
 */
' For demonstration, create a Func<int, int> that 
' returns its argument, or throws ArgumentOutOfRangeException
' if the argument is less than zero.
Dim DoWork As Func(Of Integer, Integer) = Function(n)
                                              If n < 0 Then
                                                  Throw New ArgumentOutOfRangeException()
                                              End If
                                              Return n
                                          End Function

' Create a BatchedJoinBlock<int, Exception> object that holds 
' seven elements per batch.
Dim batchedJoinBlock = New BatchedJoinBlock(Of Integer, Exception)(7)

' Post several items to the block.
For Each i As Integer In New Integer() {5, 6, -7, -22, 13, 55, 0}
    Try
        ' Post the result of the worker to the 
        ' first target of the block.
        batchedJoinBlock.Target1.Post(DoWork(i))
    Catch e As ArgumentOutOfRangeException
        ' If an error occurred, post the Exception to the 
        ' second target of the block.
        batchedJoinBlock.Target2.Post(e)
    End Try
Next i

' Read the results from the block.
Dim results = batchedJoinBlock.Receive()

' Print the results to the console.

' Print the results.
For Each n As Integer In results.Item1
    Console.WriteLine(n)
Next n
' Print failures.
For Each e As Exception In results.Item2
    Console.WriteLine(e.Message)
Next e

'          Output:
'            5
'            6
'            13
'            55
'            0
'            Specified argument was out of the range of valid values.
'            Specified argument was out of the range of valid values.
'          

Per un esempio completo in cui si usa BatchedJoinBlock<T1,T2> per acquisire sia i risultati che tutte le eccezioni che si verificano durante la lettura da database da parte del programma, vedere Procedura dettagliata: Uso di BatchBlock e BatchedJoinBlock per migliorare l'efficienza.

Configurazione del comportamento dei blocchi di flussi di dati

È possibile abilitare opzioni aggiuntive fornendo un oggetto System.Threading.Tasks.Dataflow.DataflowBlockOptions al costruttore dei tipi di blocchi di flussi di dati. Tramite queste opzioni viene controllato il comportamento dell'utilità di pianificazione mediante la quale vengono gestiti l'attività sottostante e il grado di parallelismo. L'oggetto DataflowBlockOptions dispone inoltre di tipi derivati tramite cui viene fornito il comportamento specifico di determinati tipi di blocchi di flussi di dati. Nella tabella seguente viene riepilogato il tipo di opzioni associato a ogni tipo di blocco di flussi di dati.

Tipo di blocco di flussi di dati Tipo diDataflowBlockOptions
BufferBlock<T> DataflowBlockOptions
BroadcastBlock<T> DataflowBlockOptions
WriteOnceBlock<T> DataflowBlockOptions
ActionBlock<TInput> ExecutionDataflowBlockOptions
TransformBlock<TInput,TOutput> ExecutionDataflowBlockOptions
TransformManyBlock<TInput,TOutput> ExecutionDataflowBlockOptions
BatchBlock<T> GroupingDataflowBlockOptions
JoinBlock<T1,T2> GroupingDataflowBlockOptions
BatchedJoinBlock<T1,T2> GroupingDataflowBlockOptions

Nelle sezioni seguenti vengono fornite informazioni aggiuntive sui principali tipi di opzioni dei blocchi di flussi di dati disponibili nelle classi System.Threading.Tasks.Dataflow.DataflowBlockOptions, System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions e System.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions.

Specifica dell'Utilità di pianificazione

In ogni blocco di flussi di dati predefinito viene utilizzato il meccanismo di pianificazione delle attività TPL per eseguire attività quali la propagazione dei dati in una destinazione, la ricezione dei dati da un'origine e l'esecuzione di delegati definiti dall'utente quando i dati diventano disponibili. TaskScheduler è una classe astratta che rappresenta un'Utilità di pianificazione mediante la quale vengono inserite in coda le attività nei thread. Tramite l'Utilità di pianificazione predefinita, Default, viene utilizzata la classe ThreadPool per l'inserimento in coda e l'esecuzione del lavoro. È possibile eseguire l'override dell'Utilità di pianificazione predefinita impostando la proprietà TaskScheduler quando si costruisce un oggetto del blocco di flussi di dati.

Quando tramite la stessa Utilità di pianificazione vengono gestiti più blocchi di flussi di dati, è possibile applicarvi dei criteri. Ad esempio, se più blocchi di flussi di dati sono configurati ognuno per utilizzare l'utilità di pianificazione in esclusiva dello stesso oggetto ConcurrentExclusiveSchedulerPair, tutto il lavoro eseguito tramite questi blocchi viene serializzato. Analogamente, se questi blocchi sono configurati per utilizzare l'utilità di pianificazione in concorrenza dello stesso oggetto ConcurrentExclusiveSchedulerPair e l'utilità in questione è configurata per disporre di un livello massimo di concorrenza, tutto il lavoro di questi blocchi è limitato a quel numero di operazioni simultanee. Per un esempio in cui si usa la classe ConcurrentExclusiveSchedulerPair per consentire l'esecuzione in parallelo delle operazioni di lettura, mentre le operazioni di scrittura vengono eseguite separatamente da tutte le altre operazioni, vedere Procedura: Specificare un'utilità di pianificazione in un blocco di flussi di dati. Per altre informazioni sulle utilità di pianificazione delle attività in TPL, vedere l'argomento della classe TaskScheduler.

Specifica del grado di parallelismo

Per impostazione predefinita, tramite i tre tipi di blocchi di esecuzione forniti dalla libreria del flusso di dati TPL, ActionBlock<TInput>, TransformBlock<TInput,TOutput> e TransformManyBlock<TInput,TOutput>, viene elaborato un messaggio alla volta. Tramite questi tipi di blocchi di flussi di dati i messaggi vengono inoltre elaborati nell'ordine in cui vengono ricevuti. Per consentire a questi blocchi di flussi di dati di elaborare i messaggi contemporaneamente, impostare la proprietà ExecutionDataflowBlockOptions.MaxDegreeOfParallelism quando si crea l'oggetto del blocco di flussi di dati.

Il valore predefinito di MaxDegreeOfParallelism è 1, che garantisce l'elaborazione di un solo messaggio per volta da parte del blocco di flussi di dati. Impostando questa proprietà su un valore maggiore di 1 possono essere elaborati più messaggi contemporaneamente da parte del blocco di flussi di dati. Impostando questa proprietà su DataflowBlockOptions.Unbounded è possibile la gestione del livello massimo di concorrenza da parte dell'Utilità di pianificazione sottostante.

Importante

Quando si specifica un grado massimo di parallelismo maggiore di 1, vengono elaborati contemporaneamente più messaggi, pertanto questi ultimi potrebbero non essere elaborati nell'ordine in cui vengono ricevuti. L'ordine in cui i messaggi vengono restituiti dal blocco è, tuttavia, lo stesso in cui vengono ricevuti.

Poiché la proprietà MaxDegreeOfParallelism rappresenta il grado massimo di parallelismo, il blocco di flussi di dati può essere eseguito con un grado di parallelismo minore rispetto a quello specificato. È possibile che nel blocco di flussi di dati venga utilizzato un grado di parallelismo minore per soddisfare i requisiti funzionali o perché vi è una mancanza di risorse di sistema disponibili. Da parte di un blocco di flussi di dati non viene mai scelto un parallelismo maggiore di quello specificato.

Il valore della proprietà MaxDegreeOfParallelism è esclusivo per ogni oggetto del blocco di flussi di dati. Ad esempio, se da ognuno dei quattro oggetti del blocco di flussi di dati viene specificato 1 come massimo grado di parallelismo, tutti e quattro gli oggetti del blocco di flussi di dati possono essere potenzialmente eseguiti in parallelo.

Per un esempio in cui si imposta il grado massimo di parallelismo per consentire l'esecuzione in parallelo di operazioni lunghe, vedere Procedura: specificare il grado di parallelismo in un blocco di flussi di dati.

Specifica del numero di messaggi per attività

Nei tipi di blocchi di flussi di dati predefiniti vengono utilizzate attività per elaborare più elementi di input. In questo modo è possibile ridurre il numero di oggetti di attività necessari per elaborare i dati, consentendo alle applicazioni un'esecuzione più efficiente. Tuttavia, quando tramite le attività di un set di blocchi di flussi di dati vengono elaborati i dati, è possibile che le attività di altri blocchi di flussi di dati debbano attendere il tempo di elaborazione inserendo in coda i messaggi. Per abilitare la migliore equità tra attività del flusso di dati, impostare la proprietà MaxMessagesPerTask. Quando la proprietà MaxMessagesPerTask è impostata su DataflowBlockOptions.Unbounded, vale a dire l'impostazione predefinita, tramite l'attività utilizzata da un blocco di flussi di dati vengono elaborati tutti i messaggi che sono disponibili. Quando la proprietà MaxMessagesPerTask è impostata su un valore diverso da Unbounded, tramite il blocco di flusso di dati viene elaborato al massimo questo numero di messaggi per l'oggetto Task. Sebbene l'impostazione della proprietà MaxMessagesPerTask possa aumentare l'equità tra le attività, può causare la generazione di più attività rispetto al necessario, che possono ridurre le prestazioni.

Abilitazione dell'annullamento

TPL fornisce un meccanismo che consente alle attività di coordinare l'annullamento in modo cooperativo. Per consentire ai blocchi di flussi di dati di far parte di questo meccanismo di annullamento, impostare la proprietà CancellationToken. Quando l'oggetto CancellationToken è impostato sullo stato di annullamento, da parte di tutti i blocchi di flussi di dati che controllano questo token viene terminata l'esecuzione dell'elemento corrente ma non viene avviata l'elaborazione degli elementi successivi. Tramite questi blocchi di flussi di dati vengono inoltre cancellati tutti i messaggi memorizzati nel buffer, vengono rilasciate le connessioni a tutti i blocchi di origine e di destinazione e viene eseguita la transizione allo stato di annullamento. Con la transizione allo stato di annullamento, alla proprietà Completion è associata la proprietà Status impostata su Canceled, a meno che non venga generata un'eccezione durante l'elaborazione. In questo caso, la proprietà Status è impostata su Faulted.

Per un esempio che illustri l'uso dell'annullamento in un'applicazione Windows Forms, vedere Procedura: annullare un blocco di flussi di dati. Per altre informazioni sull'annullamento in TPL, vedere Annullamento delle attività.

Specifica del comportamento greedy e non greedy a confronto

Diversi tipi di blocchi di flussi di dati di raggruppamento possono essere eseguiti in modalità greedy o non-greedy. Per impostazione predefinita, i tipi di blocchi di flussi di dati predefiniti vengono eseguiti in modalità greedy.

Per i tipi di blocchi join come JoinBlock<T1,T2>, la modalità greedy indica che il blocco accetta immediatamente i dati anche se quelli corrispondenti con cui è stato creato un join non sono ancora disponibili. La modalità non greedy indica che nel blocco vengono posticipati tutti i messaggi in ingresso finché non ne è disponibile uno per ciascuna delle relative destinazioni per completare il join. Se uno dei messaggi posposti non è più disponibile, tramite il blocco join vengono rilasciati tutti i messaggi posposti e viene riavviato il processo. Per la classe BatchBlock<T>, il comportamento greedy e non greedy è simile, ad eccezione del fatto che nella modalità non greedy, tramite un oggetto BatchBlock<T> vengono posticipati tutti i messaggi in ingresso finché non è disponibile un numero sufficiente da origini diverse per completare un batch.

Per specificare la modalità non greedy per un blocco di flussi di dati, impostare la proprietà Greedy su False. Per un esempio in cui viene illustrato l'uso della modalità non-greedy per consentire la condivisione di un'origine dati in modo più efficiente da parte di più blocchi join, vedere Procedura: usare JoinBlock per leggere dati da più origini.

Blocchi di flussi di dati personalizzati

Anche se la libreria del flusso di dati TPL fornisce molti tipi di blocchi predefiniti, è possibile crearne degli altri tramite cui eseguire comportamenti personalizzati. Implementare le interfacce ISourceBlock<TOutput> o ITargetBlock<TInput> direttamente o utilizzare il metodo Encapsulate per compilare un blocco complesso in cui viene incapsulato il comportamento di tipi di blocchi esistenti. Per esempi in cui viene illustrata l'implementazione delle funzionalità personalizzate dei blocchi di flussi di dati, vedere Procedura dettagliata: creazione di un tipo di blocco di flussi di dati personalizzato.

Posizione Descrizione
Procedura: scrivere messaggi in un blocco di flussi di dati e leggere messaggi da un blocco di flussi di dati Viene illustrato come scrivere messaggi in un oggetto BufferBlock<T> e come leggerli da quest'ultimo.
Procedura: implementare un modello di flusso di dati Producer-Consumer Viene descritto come utilizzare il modello di flusso di dati per implementare uno schema producer-consumer, in cui il producer invia messaggi a un blocco di flussi di dati e il consumer legge i messaggi dal blocco in questione.
Procedura: eseguire un'azione alla ricezione di dati in un blocco di flussi di dati Viene descritto come fornire delegati ai tipi di blocchi di flussi di dati di esecuzione, ActionBlock<TInput>, TransformBlock<TInput,TOutput> e TransformManyBlock<TInput,TOutput>.
Procedura dettagliata: creazione di una pipeline del flusso di dati Viene descritto come creare una pipeline di flusso di dati tramite cui viene scaricato un testo dal Web e vengono eseguite operazioni su questo testo.
Procedura: scollegare i blocchi di flussi di dati Illustra come usare il metodo LinkTo per scollegare un blocco di destinazione dall'origine dopo che l'origine ha offerto un messaggio alla destinazione.
Procedura dettagliata: uso del flusso di dati in un'applicazione Windows Forms Viene illustrato come creare una rete di blocchi di flussi di dati tramite cui viene eseguita l'elaborazione di immagini in un'applicazione Windows Form.
Procedura: annullare un blocco di flussi di dati Viene illustrato come utilizzare l'annullamento in un'applicazione Windows Form.
Procedura: usare JoinBlock per leggere dati da più origini Viene illustrato come utilizzare la classe JoinBlock<T1,T2> per eseguire un'operazione quando i dati sono disponibili da più origini e come utilizzare la modalità non greedy per consentire a più blocchi join di condividere in modo più efficiente un'origine dati.
Procedura: specificare il grado di parallelismo in un blocco di flussi di dati Viene illustrato come impostare la proprietà MaxDegreeOfParallelism per consentire a un blocco di flussi di dati di esecuzione di elaborare più messaggi alla volta.
Procedura: specificare un'utilità di pianificazione in un blocco di flussi di dati Viene illustrato come associare una specifica Utilità di pianificazione quando si utilizza il flusso di dati nell'applicazione.
Procedura dettagliata: uso di BatchBlock e BatchedJoinBlock per migliorare l'efficienza Viene descritto come utilizzare la classe BatchBlock<T> per migliorare l'efficienza delle operazioni di inserimento nel database e come utilizzare la classe BatchedJoinBlock<T1,T2> per acquisire i risultati e tutte le eccezioni che si verificano durante la lettura da un database da parte del programma.
Procedura dettagliata: creazione di un tipo di blocco di flussi di dati personalizzato Vengono illustrati due modi per creare un tipo di blocco di flussi di dati tramite cui viene implementato il comportamento personalizzato.
Task Parallel Library (TPL) Viene introdotta TPL, una libreria tramite cui viene semplificata la programmazione parallela e concorrente nelle applicazioni .NET Framework.