Общие сведения о коллекции BlockingCollection
BlockingCollection<T> — это потокобезопасный класс коллекции, обеспечивающий следующие возможности:
реализует шаблон "производитель-получатель";
поддерживает параллельное добавление и извлечение элементов из нескольких потоков;
допускает указание максимальной емкости;
поддерживает операции вставки и удаления, блокирующиеся при опустошении или заполнении коллекции;
поддерживает условные операции вставки и удалении, не блокирующиеся или блокирующиеся лишь на определенное время;
инкапсулирует все типы коллекций, реализующие интерфейс IProducerConsumerCollection<T>;
поддерживает отмену с помощью токенов отмены;
поддерживает два вида перечисления с помощью оператора
foreach
(For Each
в Visual Basic):перечисление "только для чтения";
перечисление, при котором элементы по мере перечисления удаляются.
Поддержка границ и блокировки
BlockingCollection<T> поддерживает границы и блокировку. Поддержка границ означает возможность задать максимальную емкость коллекции. Границы важны в ряде сценариев, поскольку они позволяют контролировать максимальный размер коллекции в памяти, предотвращая ситуации, в которых потоки-создатели слишком сильно обгоняют потоки-потребители.
Элементы коллекции могут параллельно добавляться из нескольких задач или потоков. Если коллекция достигает максимальной емкости, то потоки-создатели перейдут в состояние блокировки, пока не будет удален хотя бы один элемент. Элементы коллекции могут параллельно удаляться несколькими потребителями. Если коллекция становится пустой, то потоки-потребители перейдут в состояние блокировки, пока поток-создатель не добавит хотя бы один элемент. Поток-создатель может вызвать метод CompleteAdding, чтобы указать, что он больше не будет добавлять элементы. Потребители могут отслеживать свойство IsCompleted, позволяющее определить, что коллекция опустела, а новые элементы добавляться не будут. В следующем примере демонстрируется простая коллекция BlockingCollection с максимальной емкостью в 100 элементов. Задача-создатель добавляет элементы в коллекцию на протяжении всего времени сохранения истинности некоторого внешнего условия, а после этого вызывает метод CompleteAdding. Задача-потребитель извлекает элементы, пока свойство IsCompleted не примет логическое значение "true".
// A bounded collection. It can hold no more
// than 100 items at once.
BlockingCollection<Data> dataItems = new BlockingCollection<Data>(100);
// A simple blocking consumer with no cancellation.
Task.Run(() =>
{
while (!dataItems.IsCompleted)
{
Data data = null;
// Blocks if dataItems.Count == 0.
// IOE means that Take() was called on a completed collection.
// Some other thread can call CompleteAdding after we pass the
// IsCompleted check but before we call Take.
// In this example, we can simply catch the exception since the
// loop will break on the next iteration.
try
{
data = dataItems.Take();
}
catch (InvalidOperationException) { }
if (data != null)
{
Process(data);
}
}
Console.WriteLine("\r\nNo more items to take.");
});
// A simple blocking producer with no cancellation.
Task.Run(() =>
{
while (moreItemsToAdd)
{
Data data = GetData();
// Blocks if numbers.Count == dataItems.BoundedCapacity
dataItems.Add(data);
}
// Let consumer know we are done.
dataItems.CompleteAdding();
});
' A bounded collection. It can hold no more
' than 100 items at once.
Dim dataItems = New BlockingCollection(Of Data)(100)
' A simple blocking consumer with no cancellation.
Task.Factory.StartNew(Sub()
While dataItems.IsCompleted = False
Dim dataItem As Data = Nothing
Try
dataItem = dataItems.Take()
Catch e As InvalidOperationException
' IOE means that Take() was called on a completed collection.
' In this example, we can simply catch the exception since the
' loop will break on the next iteration.
End Try
If (dataItem IsNot Nothing) Then
Process(dataItem)
End If
End While
Console.WriteLine(vbCrLf & "No more items to take.")
End Sub)
' A simple blocking producer with no cancellation.
Task.Factory.StartNew(Sub()
While moreItemsToAdd = True
Dim item As Data = GetData()
' Blocks if dataItems.Count = dataItems.BoundedCapacity.
dataItems.Add(item)
End While
' Let consumer know we are done.
dataItems.CompleteAdding()
End Sub)
Полный пример см. в разделе Практическое руководство. Добавление и удаление отдельных элементов коллекции BlockingCollection.
Операции с временной блокировкой
При использовании операций TryAdd и TryTake ограниченных коллекций, использующих временную блокировку, метод предпринимает попытку добавления или удаления элемента. Если элемент доступен, то он заносится в указанную по ссылке переменную, после чего метод возвращает значение true. Если по прошествии определенного времени ожидания элемент так и не удается извлечь, метод возвращает значение false. После этого поток может перейти к другой полезной работе, прежде чем повторить попытку обращения к коллекции. Пример доступа с временной блокировкой см. во втором примере из раздела Практическое руководство. Добавление и удаление отдельных элементов коллекции BlockingCollection.
Отмена операций Add и Take
Операции Add и Take обычно выполняются в цикле. Отменить цикл можно, передав в метод TryAdd или TryTake токен CancellationToken и проверяя значение его свойства IsCancellationRequested на каждой итерации. Если значение имеет значение true, то вы можете ответить на запрос отмены, очищая все ресурсы и выход из цикла. В следующем примере показана перегрузка метода TryAdd, которая принимает токен отмены, а также код, использующий эту перегрузку:
do
{
// Cancellation causes OCE. We know how to handle it.
try
{
success = bc.TryAdd(itemToAdd, 2, ct);
}
catch (OperationCanceledException)
{
bc.CompleteAdding();
break;
}
//...
} while (moreItems == true);
Do While moreItems = True
' Cancellation causes OCE. We know how to handle it.
Try
success = bc.TryAdd(itemToAdd, 2, ct)
Catch ex As OperationCanceledException
bc.CompleteAdding()
Exit Do
End Try
Loop
Пример добавления поддержки отмены см. во втором примере из статьи Практическое руководство. Добавление и удаление отдельных элементов коллекции BlockingCollection.
Указание типа коллекции
При создании класса BlockingCollection<T> можно указать не только ограничиваемую емкость, но и тип используемой коллекции. Например, можно задать объект ConcurrentQueue<T> для использования принципа "первым поступил — первым обслужен" или объект ConcurrentStack<T> для использования принципа "последним поступил — первым обслужен". Использовать можно любой класс коллекции, реализующий интерфейс IProducerConsumerCollection<T>. Тип коллекции, используемый в классе BlockingCollection<T> по умолчанию — это ConcurrentQueue<T>. В следующем примере кода показано, как создать строковую коллекцию BlockingCollection<T> емкостью в 1000 элементов, которая использует класс ConcurrentBag<T>:
Dim bc = New BlockingCollection(Of String)(New ConcurrentBag(Of String()), 1000)
BlockingCollection<string> bc = new BlockingCollection<string>(new ConcurrentBag<string>(), 1000 );
Дополнительные сведения см. в статье Практическое руководство. Добавление функций границы и блокировки в коллекцию.
Поддержка интерфейса IEnumerable
BlockingCollection<T> предоставляет метод GetConsumingEnumerable, позволяющий потребителям использовать оператор foreach
(For Each
в Visual Basic) для удаления элементов коллекции до тех пор, пока коллекция не будет исчерпана (то есть достигнет состояния, в котором коллекция пуста и новые элементы не будут добавляться). Дополнительные сведения см. в разделе Практическое руководство. Использование оператора ForEach для удаления элементов в коллекции BlockingCollection.
Использование нескольких коллекций BlockingCollection в качестве одной коллекции
В сценариях, где потребителю необходимо одновременно извлекать элементы из нескольких коллекций, можно создавать массивы BlockingCollection<T> и использовать такие статические методы, как TakeFromAny и AddToAny, позволяющие добавлять и извлекать элементы любой коллекции в массиве. Если одна из коллекций заблокирована, метод немедленно переходит к другой, пока не найдет коллекцию, способную выполнить операцию. Дополнительные сведения см. в разделе Практическое руководство. Использование массивов для блокировки коллекций в конвейере.