Поделиться через


Общие сведения о коллекции BlockingCollection

BlockingCollection<T> — это потокобезопасный класс коллекции, обеспечивающий следующие возможности:

  • реализует шаблон "производитель-получатель";

  • поддерживает параллельное добавление и извлечение элементов из нескольких потоков;

  • допускает указание максимальной емкости;

  • поддерживает операции вставки и удаления, блокирующиеся при опустошении или заполнении коллекции;

  • поддерживает условные операции вставки и удалении, не блокирующиеся или блокирующиеся лишь на определенное время;

  • инкапсулирует все типы коллекций, реализующие интерфейс IProducerConsumerCollection<T>;

  • поддерживает отмену с помощью токенов отмены;

  • поддерживает два вида перечисления с помощью оператора foreach (For Each в Visual Basic):

    1. перечисление "только для чтения";

    2. перечисление, при котором элементы по мере перечисления удаляются.

Поддержка границ и блокировки

BlockingCollection<T> поддерживает границы и блокировку. Поддержка границ означает возможность задать максимальную емкость коллекции. Границы важны в ряде сценариев, поскольку они позволяют контролировать максимальный размер коллекции в памяти, предотвращая ситуации, в которых потоки-создатели слишком сильно обгоняют потоки-потребители.

Элементы коллекции могут параллельно добавляться из нескольких задач или потоков. Если коллекция достигает максимальной емкости, то потоки-создатели перейдут в состояние блокировки, пока не будет удален хотя бы один элемент. Элементы коллекции могут параллельно удаляться несколькими потребителями. Если коллекция становится пустой, то потоки-потребители перейдут в состояние блокировки, пока поток-создатель не добавит хотя бы один элемент. Поток-создатель может вызвать метод CompleteAdding, чтобы указать, что он больше не будет добавлять элементы. Потребители могут отслеживать свойство IsCompleted, позволяющее определить, что коллекция опустела, а новые элементы добавляться не будут. В следующем примере демонстрируется простая коллекция BlockingCollection с максимальной емкостью в 100 элементов. Задача-создатель добавляет элементы в коллекцию на протяжении всего времени сохранения истинности некоторого внешнего условия, а после этого вызывает метод CompleteAdding. Задача-потребитель извлекает элементы, пока свойство IsCompleted не примет логическое значение "true".

// A bounded collection. It can hold no more
// than 100 items at once.
BlockingCollection<Data> dataItems = new BlockingCollection<Data>(100);

// A simple blocking consumer with no cancellation.
Task.Run(() =>
{
    while (!dataItems.IsCompleted)
    {

        Data data = null;
        // Blocks if dataItems.Count == 0.
        // IOE means that Take() was called on a completed collection.
        // Some other thread can call CompleteAdding after we pass the
        // IsCompleted check but before we call Take.
        // In this example, we can simply catch the exception since the
        // loop will break on the next iteration.
        try
        {
            data = dataItems.Take();
        }
        catch (InvalidOperationException) { }

        if (data != null)
        {
            Process(data);
        }
    }
    Console.WriteLine("\r\nNo more items to take.");
});

// A simple blocking producer with no cancellation.
Task.Run(() =>
{
    while (moreItemsToAdd)
    {
        Data data = GetData();
        // Blocks if numbers.Count == dataItems.BoundedCapacity
        dataItems.Add(data);
    }
    // Let consumer know we are done.
    dataItems.CompleteAdding();
});

' A bounded collection. It can hold no more 
' than 100 items at once.
Dim dataItems = New BlockingCollection(Of Data)(100)

' A simple blocking consumer with no cancellation.
Task.Factory.StartNew(Sub()
                          While dataItems.IsCompleted = False
                              Dim dataItem As Data = Nothing
                              Try
                                  dataItem = dataItems.Take()
                              Catch e As InvalidOperationException
                                  ' IOE means that Take() was called on a completed collection.
                                  ' In this example, we can simply catch the exception since the 
                                  ' loop will break on the next iteration.
                              End Try
                              If (dataItem IsNot Nothing) Then
                                  Process(dataItem)
                              End If
                          End While
                          Console.WriteLine(vbCrLf & "No more items to take.")
                      End Sub)

' A simple blocking producer with no cancellation.
Task.Factory.StartNew(Sub()
                          While moreItemsToAdd = True
                              Dim item As Data = GetData()

                              ' Blocks if dataItems.Count = dataItems.BoundedCapacity.
                              dataItems.Add(item)
                          End While

                          ' Let consumer know we are done.
                          dataItems.CompleteAdding()
                      End Sub)

Полный пример см. в разделе Практическое руководство. Добавление и удаление отдельных элементов коллекции BlockingCollection.

Операции с временной блокировкой

При использовании операций TryAdd и TryTake ограниченных коллекций, использующих временную блокировку, метод предпринимает попытку добавления или удаления элемента. Если элемент доступен, то он заносится в указанную по ссылке переменную, после чего метод возвращает значение true. Если по прошествии определенного времени ожидания элемент так и не удается извлечь, метод возвращает значение false. После этого поток может перейти к другой полезной работе, прежде чем повторить попытку обращения к коллекции. Пример доступа с временной блокировкой см. во втором примере из раздела Практическое руководство. Добавление и удаление отдельных элементов коллекции BlockingCollection.

Отмена операций Add и Take

Операции Add и Take обычно выполняются в цикле. Отменить цикл можно, передав в метод TryAdd или TryTake токен CancellationToken и проверяя значение его свойства IsCancellationRequested на каждой итерации. Если значение имеет значение true, то вы можете ответить на запрос отмены, очищая все ресурсы и выход из цикла. В следующем примере показана перегрузка метода TryAdd, которая принимает токен отмены, а также код, использующий эту перегрузку:

do
{
    // Cancellation causes OCE. We know how to handle it.
    try
    {
        success = bc.TryAdd(itemToAdd, 2, ct);
    }
    catch (OperationCanceledException)
    {
        bc.CompleteAdding();
        break;
    }
    //...
} while (moreItems == true);
Do While moreItems = True
    ' Cancellation causes OCE. We know how to handle it.
    Try
        success = bc.TryAdd(itemToAdd, 2, ct)
    Catch ex As OperationCanceledException
        bc.CompleteAdding()
        Exit Do
    End Try
Loop

Пример добавления поддержки отмены см. во втором примере из статьи Практическое руководство. Добавление и удаление отдельных элементов коллекции BlockingCollection.

Указание типа коллекции

При создании класса BlockingCollection<T> можно указать не только ограничиваемую емкость, но и тип используемой коллекции. Например, можно задать объект ConcurrentQueue<T> для использования принципа "первым поступил — первым обслужен" или объект ConcurrentStack<T> для использования принципа "последним поступил — первым обслужен". Использовать можно любой класс коллекции, реализующий интерфейс IProducerConsumerCollection<T>. Тип коллекции, используемый в классе BlockingCollection<T> по умолчанию — это ConcurrentQueue<T>. В следующем примере кода показано, как создать строковую коллекцию BlockingCollection<T> емкостью в 1000 элементов, которая использует класс ConcurrentBag<T>:

Dim bc = New BlockingCollection(Of String)(New ConcurrentBag(Of String()), 1000)  
BlockingCollection<string> bc = new BlockingCollection<string>(new ConcurrentBag<string>(), 1000 );  

Дополнительные сведения см. в статье Практическое руководство. Добавление функций границы и блокировки в коллекцию.

Поддержка интерфейса IEnumerable

BlockingCollection<T> предоставляет метод GetConsumingEnumerable, позволяющий потребителям использовать оператор foreach (For Each в Visual Basic) для удаления элементов коллекции до тех пор, пока коллекция не будет исчерпана (то есть достигнет состояния, в котором коллекция пуста и новые элементы не будут добавляться). Дополнительные сведения см. в разделе Практическое руководство. Использование оператора ForEach для удаления элементов в коллекции BlockingCollection.

Использование нескольких коллекций BlockingCollection в качестве одной коллекции

В сценариях, где потребителю необходимо одновременно извлекать элементы из нескольких коллекций, можно создавать массивы BlockingCollection<T> и использовать такие статические методы, как TakeFromAny и AddToAny, позволяющие добавлять и извлекать элементы любой коллекции в массиве. Если одна из коллекций заблокирована, метод немедленно переходит к другой, пока не найдет коллекцию, способную выполнить операцию. Дополнительные сведения см. в разделе Практическое руководство. Использование массивов для блокировки коллекций в конвейере.

См. также