Procedura: scollegare i blocchi di flussi di dati
In questo documento viene descritto come scollegare un blocco di flussi di dati di destinazione dalla relativa origine.
Nota
La libreria del flusso di dati TPL (spazio dei nomi System.Threading.Tasks.Dataflow) non viene distribuita con .NET. Per installare lo spazio dei nomi System.Threading.Tasks.Dataflow in Visual Studio, aprire il progetto in Visual Studio, scegliere Gestisci pacchetti NuGet dal menu Progetto ed eseguire una ricerca online del pacchetto System.Threading.Tasks.Dataflow
. In alternativa, per installarlo usando l'interfaccia della riga di comando di .NET Core, eseguire dotnet add package System.Threading.Tasks.Dataflow
.
Esempio
Nell'esempio seguente vengono creati tre oggetti TransformBlock<TInput,TOutput>, tramite ognuno dei quali viene chiamato il metodo TrySolution
per calcolare un valore. Per questo esempio è necessario solo il risultato della prima chiamata a TrySolution
per il completamento.
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to unlink dataflow blocks.
class DataflowReceiveAny
{
// Receives the value from the first provided source that has
// a message.
public static T ReceiveFromAny<T>(params ISourceBlock<T>[] sources)
{
// Create a WriteOnceBlock<T> object and link it to each source block.
var writeOnceBlock = new WriteOnceBlock<T>(e => e);
foreach (var source in sources)
{
// Setting MaxMessages to one instructs
// the source block to unlink from the WriteOnceBlock<T> object
// after offering the WriteOnceBlock<T> object one message.
source.LinkTo(writeOnceBlock, new DataflowLinkOptions { MaxMessages = 1 });
}
// Return the first value that is offered to the WriteOnceBlock object.
return writeOnceBlock.Receive();
}
// Demonstrates a function that takes several seconds to produce a result.
static int TrySolution(int n, CancellationToken ct)
{
// Simulate a lengthy operation that completes within three seconds
// or when the provided CancellationToken object is cancelled.
SpinWait.SpinUntil(() => ct.IsCancellationRequested,
new Random().Next(3000));
// Return a value.
return n + 42;
}
static void Main(string[] args)
{
// Create a shared CancellationTokenSource object to enable the
// TrySolution method to be cancelled.
var cts = new CancellationTokenSource();
// Create three TransformBlock<int, int> objects.
// Each TransformBlock<int, int> object calls the TrySolution method.
Func<int, int> action = n => TrySolution(n, cts.Token);
var trySolution1 = new TransformBlock<int, int>(action);
var trySolution2 = new TransformBlock<int, int>(action);
var trySolution3 = new TransformBlock<int, int>(action);
// Post data to each TransformBlock<int, int> object.
trySolution1.Post(11);
trySolution2.Post(21);
trySolution3.Post(31);
// Call the ReceiveFromAny<T> method to receive the result from the
// first TransformBlock<int, int> object to finish.
int result = ReceiveFromAny(trySolution1, trySolution2, trySolution3);
// Cancel all calls to TrySolution that are still active.
cts.Cancel();
// Print the result to the console.
Console.WriteLine("The solution is {0}.", result);
cts.Dispose();
}
}
/* Sample output:
The solution is 53.
*/
Imports System.Threading
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to unlink dataflow blocks.
Friend Class DataflowReceiveAny
' Receives the value from the first provided source that has
' a message.
Public Shared Function ReceiveFromAny(Of T)(ParamArray ByVal sources() As ISourceBlock(Of T)) As T
' Create a WriteOnceBlock<T> object and link it to each source block.
Dim writeOnceBlock = New WriteOnceBlock(Of T)(Function(e) e)
For Each source In sources
' Setting MaxMessages to one instructs
' the source block to unlink from the WriteOnceBlock<T> object
' after offering the WriteOnceBlock<T> object one message.
source.LinkTo(writeOnceBlock, New DataflowLinkOptions With {.MaxMessages = 1})
Next source
' Return the first value that is offered to the WriteOnceBlock object.
Return writeOnceBlock.Receive()
End Function
' Demonstrates a function that takes several seconds to produce a result.
Private Shared Function TrySolution(ByVal n As Integer, ByVal ct As CancellationToken) As Integer
' Simulate a lengthy operation that completes within three seconds
' or when the provided CancellationToken object is cancelled.
SpinWait.SpinUntil(Function() ct.IsCancellationRequested, New Random().Next(3000))
' Return a value.
Return n + 42
End Function
Shared Sub Main(ByVal args() As String)
' Create a shared CancellationTokenSource object to enable the
' TrySolution method to be cancelled.
Dim cts = New CancellationTokenSource()
' Create three TransformBlock<int, int> objects.
' Each TransformBlock<int, int> object calls the TrySolution method.
Dim action As Func(Of Integer, Integer) = Function(n) TrySolution(n, cts.Token)
Dim trySolution1 = New TransformBlock(Of Integer, Integer)(action)
Dim trySolution2 = New TransformBlock(Of Integer, Integer)(action)
Dim trySolution3 = New TransformBlock(Of Integer, Integer)(action)
' Post data to each TransformBlock<int, int> object.
trySolution1.Post(11)
trySolution2.Post(21)
trySolution3.Post(31)
' Call the ReceiveFromAny<T> method to receive the result from the
' first TransformBlock<int, int> object to finish.
Dim result As Integer = ReceiveFromAny(trySolution1, trySolution2, trySolution3)
' Cancel all calls to TrySolution that are still active.
cts.Cancel()
' Print the result to the console.
Console.WriteLine("The solution is {0}.", result)
cts.Dispose()
End Sub
End Class
' Sample output:
'The solution is 53.
'
Per ricevere il valore dal primo oggetto TransformBlock<TInput,TOutput> completato, nell'esempio viene definito il metodo ReceiveFromAny(T)
. Il metodo ReceiveFromAny(T)
accetta una matrice di oggetti ISourceBlock<TOutput> e tramite esso viene collegato ognuno di questi oggetti a un oggetto WriteOnceBlock<T>. Quando si utilizza il metodo LinkTo per collegare un blocco di flussi di dati di origine a un blocco di destinazione, tramite l'origine i messaggi vengono propagati nella destinazione quando i dati diventano disponibili. Poiché la classe WriteOnceBlock<T> accetta solo il primo messaggio offerto, tramite il metodo ReceiveFromAny(T)
viene generato il relativo risultato chiamando il metodo Receive. In questo modo viene generato il primo messaggio offerto all'oggetto WriteOnceBlock<T>. Il metodo LinkTo ha una versione sottoposta a overload che accetta un oggetto DataflowLinkOptions con una proprietà MaxMessages che, quando viene impostata su 1
, indica al blocco di origine di scollegarsi dalla destinazione dopo la ricezione di un messaggio dall'origine da parte della destinazione. È importante per l'oggetto WriteOnceBlock<T> scollegarsi dalle relative origini perché la relazione tra la matrice delle origini e l'oggetto WriteOnceBlock<T> non è più richiesta dopo la ricezione di un messaggio da parte dell'oggetto WriteOnceBlock<T>.
Per abilitare il completamento delle chiamate rimanenti a TrySolution
dopo che tramite una di esse viene calcolato un valore, il metodo TrySolution
accetta un oggetto CancellationToken che viene annullato dopo che la chiamata a ReceiveFromAny(T)
ha restituito un valore. Tramite il metodo SpinUntil viene restituito un valore quando questo oggetto CancellationToken viene annullato.