Cenni preliminari su BlockingCollection
BlockingCollection<T> è una classe di insiemi thread-safe che fornisce le funzionalità seguenti:
Implementazione del modello producer-consumer.
Aggiunta e rimozione simultanea di elementi da più thread.
Capacità massima facoltativa.
Operazioni di inserimento e rimozione che si bloccano quando l'insieme è vuoto o pieno.
Operazioni "di tentativo" di inserimento e rimozione che non si bloccano o che si bloccano per un determinato periodo di tempo.
Incapsulamento di qualsiasi tipo di insieme che implementa IProducerConsumerCollection<T>.
Annullamento con i token di annullamento.
Due tipi di enumerazione con foreach (For Each in Visual Basic):
Enumerazione di sola lettura.
Enumerazione che rimuove gli elementi quando vengono enumerati.
Supporto di limitazione e blocco
BlockingCollection<T> supporta la limitazione e il blocco. La limitazione consente di impostare la capacità massima dell'insieme. La limitazione è importante in determinati scenari poiché consente di controllare la dimensione massima dell'insieme in memoria. Inoltre, impedisce ai thread di tipo producer di spostarsi troppo oltre i thread di tipo consumer.
Più thread o attività possono aggiungere contemporaneamente elementi all'insieme. In tal caso, se viene raggiunta la capacità massima specificata per l'insieme, i thread di tipo producer si bloccheranno finché non viene rimosso un elemento. Più consumer possono rimuovere contemporaneamente elementi. In tal caso, se l'insieme diventa vuoto, i thread di tipo consumer si bloccheranno finché un producer non aggiunge un elemento. Un thread di tipo producer può chiamare CompleteAdding per indicare che non verranno aggiunti altri elementi. I consumer monitorano la proprietà IsCompleted per sapere quando l'insieme è vuoto e non verranno aggiunti altri elementi. Nell'esempio seguente viene mostrato un oggetto BlockingCollection semplice con un limite di capacità di 100. Un'attività di tipo producer aggiunge elementi all'insieme finché una condizione esterna è vera, quindi chiama CompleteAdding. L'attività di tipo consumer accetta elementi finché la proprietà IsCompleted non è true.
' A bounded collection. It can hold no more
' than 100 items at once.
Dim dataItems = New BlockingCollection(Of Data)(100)
' A simple blocking consumer with no cancellation.
Task.Factory.StartNew(Sub()
While dataItems.IsCompleted = False
Dim dataItem As Data = Nothing
Try
dataItem = dataItems.Take()
Catch e As InvalidOperationException
' IOE means that Take() was called on a completed collection.
' In this example, we can simply catch the exception since the
' loop will break on the next iteration.
End Try
If (dataItem IsNot Nothing) Then
Process(dataItem)
End If
End While
Console.WriteLine(vbCrLf & "No more items to take.")
End Sub)
' A simple blocking producer with no cancellation.
Task.Factory.StartNew(Sub()
While moreItemsToAdd = True
Dim item As Data = GetData()
' Blocks if numbers.Count = dataItems.BoundedCapacity
dataItems.Add(item)
End While
' Let consumer know we are done.
dataItems.CompleteAdding()
End Sub)
// A bounded collection. It can hold no more
// than 100 items at once.
BlockingCollection<Data> dataItems = new BlockingCollection<Data>(100);
// A simple blocking consumer with no cancellation.
Task.Factory.StartNew(() =>
{
while (!dataItems.IsCompleted)
{
Data data = null;
// Blocks if number.Count == 0
// IOE means that Take() was called on a completed collection.
// Some other thread can call CompleteAdding after we pass the
// IsCompleted check but before we call Take.
// In this example, we can simply catch the exception since the
// loop will break on the next iteration.
try
{
data = dataItems.Take();
}
catch (InvalidOperationException) { }
if (data != null)
{
Process(data);
}
}
Console.WriteLine("\r\nNo more items to take.");
});
// A simple blocking producer with no cancellation.
Task.Factory.StartNew(() =>
{
while (moreItemsToAdd)
{
Data data = GetData();
// Blocks if numbers.Count == dataItems.BoundedCapacity
dataItems.Add(data);
}
// Let consumer know we are done.
dataItems.CompleteAdding();
});
Per un esempio completo, vedere Procedura: aggiungere e rimuovere singoli elementi di un oggetto BlockingCollection.
Operazioni di blocco temporizzate
Nelle operazioni di blocco temporizzate TryAdd e TryTake sugli insiemi limitati, il metodo tenta di aggiungere o rimuovere un elemento. Se è disponibile un elemento, questo viene posizionato nella variabile passata per riferimento e il metodo restituisce true. Se dopo un periodo di timeout specificato non viene recuperato alcun elemento, il metodo restituisce false. Il thread è quindi libero di fare altre operazioni utili prima di ritentare di accedere all'insieme. Per un esempio di accesso con blocco temporizzato, vedere il secondo esempio di Procedura: aggiungere e rimuovere singoli elementi di un oggetto BlockingCollection.
Annullamento di operazioni di aggiunta e rimozione
In genere le operazioni di aggiunta e rimozione vengono eseguite in un ciclo. Per annullare un ciclo è possibile passare un CancellationToken al metodo TryAdd o TryTake, quindi controllare il valore della proprietà IsCancellationRequested del token in ogni iterazione. Se il valore è true è possibile decidere se rispondere alla richiesta di annullamento pulendo qualsiasi risorsa e uscendo dal ciclo. Nell'esempio seguente viene illustrato un overload di TryAdd che accetta un token di annullamento. Viene inoltre mostrato il codice che lo utilizza:
Do While moreItems = True
' Cancellation causes OCE. We know how to handle it.
Try
success = bc.TryAdd(itemToAdd, 2, ct)
Catch ex As OperationCanceledException
bc.CompleteAdding()
Exit Do
End Try
Loop
do
{
// Cancellation causes OCE. We know how to handle it.
try
{
success = bc.TryAdd(itemToAdd, 2, ct);
}
catch (OperationCanceledException)
{
bc.CompleteAdding();
break;
}
//...
} while (moreItems == true);
Per un esempio di come aggiungere il supporto per l'annullamento, vedere il secondo esempio di Procedura: aggiungere e rimuovere singoli elementi di un oggetto BlockingCollection.
Specifica del tipo di insieme
Quando si crea un oggetto BlockingCollection<T>, oltre al limite di capacità è possibile specificare anche il tipo di insieme da utilizzare. Ad esempio, è possibile specificare un oggetto ConcurrentConcurrentQueue per applicare un comportamento first in-first out (FIFO) o un oggetto ConcurrentStack<T> per applicare un comportamento last in-first out (LIFO). È possibile utilizzare una classe di insiemi che implementa l'interfaccia IProducerConsumerCollection<T>. Il tipo di insieme predefinito per BlockingCollection<T> è ConcurrentQueue<T>. Nell'esempio di codice seguente viene mostrato come creare un oggetto BlockingCollection<T> di stringhe avente una capacità di 1000 e che utilizza un oggetto ConcurrentBag:
Dim bc = New BlockingCollection(Of String)(New ConcurrentBag(Of String()), 1000)
BlockingCollection<string> bc = new BlockingCollection<string>(new ConcurrentBag<string>(), 1000 );
Per ulteriori informazioni, vedere Procedura: aggiungere funzionalità di delimitazione e blocco a una classe di insiemi.
Supporto di IEnumerable
BlockingCollection<T> fornisce un metodo GetConsumingEnumerable che consente ai consumer di utilizzare foreach (For Each in Visual Basic) per rimuovere elementi fino al completamento dell'insieme, ovvero finché non è vuoto e non verranno aggiunti altri elementi. Per ulteriori informazioni, vedere Procedura: utilizzare ForEach per rimuovere elementi in un oggetto BlockingCollection.
Utilizzo di molti oggetti BlockingCollections come uno solo
Per gli scenari in cui un consumer deve rimuovere elementi simultaneamente da più insiemi è possibile creare matrici di BlockingCollection<T> e utilizzare i metodi statici quali TakeFromAny e AddToAny che eseguiranno operazioni di aggiunta o rimozione in qualsiasi insieme nella matrice. Se un dato insieme è bloccato, il metodo tenta immediatamente di accedere a un altro insieme finché non ne individua uno che può eseguire l'operazione. Per ulteriori informazioni, vedere Procedura: utilizzare matrici di insiemi di blocco in una pipeline.