BlockingCollection — Przegląd
BlockingCollection<T> to klasa kolekcji bezpieczna wątkowo, która udostępnia następujące funkcje:
Implementacja wzorca producent-konsument.
Równoczesne dodawanie i pobieranie elementów z wielu wątków.
Opcjonalna maksymalna pojemność.
Operacje wstawiania i usuwania, które blokują, gdy kolekcja jest pusta lub pełna.
Operacje wstawiania i usuwania "try", które nie blokują lub blokują maksymalnie określony okres czasu.
Hermetyzuje dowolny typ kolekcji, który implementuje IProducerConsumerCollection<T>
Anulowanie z tokenami anulowania.
Dwa rodzaje wyliczenia z
foreach
(For Each
w Visual Basic):Wyliczenie tylko do odczytu.
Wyliczenie, które usuwa elementy podczas wyliczania.
Obsługa ograniczenia i blokowania
BlockingCollection<T> obsługuje ograniczenia i blokowanie. Ograniczenie oznacza, że można ustawić maksymalną pojemność kolekcji. Ograniczenie jest ważne w niektórych scenariuszach, ponieważ umożliwia kontrolowanie maksymalnego rozmiaru kolekcji w pamięci i uniemożliwia tworzenie wątków zbyt daleko przed wątkami zużywającym.
Wiele wątków lub zadań może jednocześnie dodawać elementy do kolekcji, a jeśli kolekcja osiągnie określoną maksymalną pojemność, wątki tworzące będą blokowane do momentu usunięcia elementu. Wielu użytkowników może jednocześnie usuwać elementy, a jeśli kolekcja stanie się pusta, wątki zużywające będą blokowane, dopóki producent nie doda elementu. Wątek tworzący może wywołać CompleteAdding polecenie , aby wskazać, że nie zostaną dodane żadne elementy. Konsumenci monitorują IsCompleted właściwość, aby wiedzieć, kiedy kolekcja jest pusta i nie zostaną dodane żadne elementy. W poniższym przykładzie pokazano prostą wartość BlockingCollection z ograniczoną pojemnością 100. Zadanie producenta dodaje elementy do kolekcji, o ile jakiś warunek zewnętrzny jest spełniony, a następnie wywołuje metodę CompleteAdding. Zadanie odbiorcy przyjmuje elementy do momentu, gdy IsCompleted właściwość ma wartość true.
// A bounded collection. It can hold no more
// than 100 items at once.
BlockingCollection<Data> dataItems = new BlockingCollection<Data>(100);
// A simple blocking consumer with no cancellation.
Task.Run(() =>
{
while (!dataItems.IsCompleted)
{
Data data = null;
// Blocks if dataItems.Count == 0.
// IOE means that Take() was called on a completed collection.
// Some other thread can call CompleteAdding after we pass the
// IsCompleted check but before we call Take.
// In this example, we can simply catch the exception since the
// loop will break on the next iteration.
try
{
data = dataItems.Take();
}
catch (InvalidOperationException) { }
if (data != null)
{
Process(data);
}
}
Console.WriteLine("\r\nNo more items to take.");
});
// A simple blocking producer with no cancellation.
Task.Run(() =>
{
while (moreItemsToAdd)
{
Data data = GetData();
// Blocks if numbers.Count == dataItems.BoundedCapacity
dataItems.Add(data);
}
// Let consumer know we are done.
dataItems.CompleteAdding();
});
' A bounded collection. It can hold no more
' than 100 items at once.
Dim dataItems = New BlockingCollection(Of Data)(100)
' A simple blocking consumer with no cancellation.
Task.Factory.StartNew(Sub()
While dataItems.IsCompleted = False
Dim dataItem As Data = Nothing
Try
dataItem = dataItems.Take()
Catch e As InvalidOperationException
' IOE means that Take() was called on a completed collection.
' In this example, we can simply catch the exception since the
' loop will break on the next iteration.
End Try
If (dataItem IsNot Nothing) Then
Process(dataItem)
End If
End While
Console.WriteLine(vbCrLf & "No more items to take.")
End Sub)
' A simple blocking producer with no cancellation.
Task.Factory.StartNew(Sub()
While moreItemsToAdd = True
Dim item As Data = GetData()
' Blocks if dataItems.Count = dataItems.BoundedCapacity.
dataItems.Add(item)
End While
' Let consumer know we are done.
dataItems.CompleteAdding()
End Sub)
Pełny przykład można znaleźć w temacie How to: Add and Take Items Individually from a BlockingCollection (Jak dodawać i przyjmować elementy indywidualnie z kolekcji BlockingCollection).
Operacje blokowania czasowego
W przypadku czasowego blokowania TryAdd i TryTake operacji w powiązanych kolekcjach metoda próbuje dodać lub podjąć element. Jeśli element jest dostępny, jest umieszczany w zmiennej, która została przekazana przez odwołanie, a metoda zwraca wartość true. Jeśli żaden element nie zostanie pobrany po upływie określonego limitu czasu, metoda zwraca wartość false. Następnie wątek jest bezpłatny, aby wykonać inną przydatną pracę przed ponowną próbą uzyskania dostępu do kolekcji. Aby zapoznać się z przykładem czasu blokowania dostępu, zobacz drugi przykład w temacie How to: Add and Take Items Individually from a BlockingCollection (Jak dodawać i przyjmować elementy indywidualnie z kolekcji BlockingCollection).
Anulowanie operacji dodawania i wykonywania
Operacje dodawania i wykonywania są zwykle wykonywane w pętli. Pętlę można anulować, przekazując CancellationToken element do TryAdd metody lub TryTake , a następnie sprawdzając wartość właściwości tokenu IsCancellationRequested w każdej iteracji. Jeśli wartość ma wartość true, oznacza to, że odpowiadasz na żądanie anulowania, czyszcząc wszystkie zasoby i zamykając pętlę. W poniższym przykładzie pokazano przeciążenie TryAdd , które przyjmuje token anulowania i kod, który go używa:
do
{
// Cancellation causes OCE. We know how to handle it.
try
{
success = bc.TryAdd(itemToAdd, 2, ct);
}
catch (OperationCanceledException)
{
bc.CompleteAdding();
break;
}
//...
} while (moreItems == true);
Do While moreItems = True
' Cancellation causes OCE. We know how to handle it.
Try
success = bc.TryAdd(itemToAdd, 2, ct)
Catch ex As OperationCanceledException
bc.CompleteAdding()
Exit Do
End Try
Loop
Aby zapoznać się z przykładem dodawania obsługi anulowania, zobacz drugi przykład w temacie How to: Add and Take Items Individually from a BlockingCollection (Jak dodawać i przyjmować elementy indywidualnie z kolekcji BlockingCollection).
Określanie typu kolekcji
Podczas tworzenia obiektu BlockingCollection<T>można określić nie tylko powiązaną pojemność, ale także typ kolekcji do użycia. Można na przykład określić ConcurrentQueue<T> zachowanie pierwszego wyjścia (FIFO, first in-first out) lub ConcurrentStack<T> zachowanie ostatniego wyjścia (LIFO). Możesz użyć dowolnej klasy kolekcji, która implementuje IProducerConsumerCollection<T> interfejs. Domyślnym typem kolekcji dla elementu BlockingCollection<T> jest ConcurrentQueue<T>. Poniższy przykład kodu pokazuje, jak utworzyć BlockingCollection<T> ciągi o pojemności 1000 i używa elementu ConcurrentBag<T>:
Dim bc = New BlockingCollection(Of String)(New ConcurrentBag(Of String()), 1000)
BlockingCollection<string> bc = new BlockingCollection<string>(new ConcurrentBag<string>(), 1000 );
Aby uzyskać więcej informacji, zobacz How to: Add Bounding and Blocking Functionality to a Collection (Instrukcje: dodawanie funkcji ograniczenia i blokowania do kolekcji).
Obsługa IEnumerable
BlockingCollection<T> Udostępnia metodę umożliwiającą GetConsumingEnumerable użytkownikom używanie foreach
(For Each
w Visual Basic) usuwania elementów do momentu ukończenia kolekcji, co oznacza, że jest ona pusta i nie zostaną dodane żadne elementy. Aby uzyskać więcej informacji, zobacz How to: Use ForEach to Remove Items in a BlockingCollection (Instrukcje: usuwanie elementów forEach w kolekcji BlockingCollection).
Używanie wielu kolekcji BlockingCollections jako jednego
W przypadku scenariuszy, w których użytkownik musi pobrać elementy z wielu kolekcji jednocześnie, można utworzyć tablice BlockingCollection<T> i użyć metod statycznych, takich jak TakeFromAny i AddToAny które zostaną dodane do dowolnej kolekcji w tablicy lub do niego pobrać. Jeśli jedna kolekcja blokuje, metoda natychmiast próbuje innego, dopóki nie znajdzie tej, która może wykonać operację. Aby uzyskać więcej informacji, zobacz How to: Use Arrays of Blocking Collections in a Pipeline (Instrukcje: używanie tablic kolekcji blokujących w potoku).