Использование оператора foreach для удаления элементов из коллекции BlockingCollection
Помимо извлечения элементов из коллекции BlockingCollection<T> с помощью методов Take и TryTake, можно использовать цикл foreach (For Each в Visual Basic) с BlockingCollection<T>.GetConsumingEnumerable для удаления элементов до тех пор, пока добавление не будет завершено и коллекция не станет пустой. Это называется изменяющим перечислением или поглощающим перечислением, поскольку, в отличие от типичного цикла foreach
(For Each
), этот перечислитель изменяет исходную коллекцию путем удаления элементов.
Пример
В приведенном ниже примере показано, как удалить все элементы в классе BlockingCollection<T>, используя цикл foreach
(For Each
).
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
class Example
{
// Limit the collection size to 2000 items at any given time.
// Set itemsToProduce to > 500 to hit the limit.
const int UpperLimit = 1000;
// Adjust this number to see how it impacts the producing-consuming pattern.
const int ItemsToProduce = 100;
static readonly BlockingCollection<long> Collection =
new BlockingCollection<long>(UpperLimit);
// Variables for diagnostic output only.
static readonly Stopwatch Stopwatch = new Stopwatch();
static int TotalAdditions = 0;
static async Task Main()
{
Stopwatch.Start();
// Queue the consumer task.
var consumerTask = Task.Run(() => RunConsumer());
// Queue the producer tasks.
var produceTaskOne = Task.Run(() => RunProducer("A", 0));
var produceTaskTwo = Task.Run(() => RunProducer("B", ItemsToProduce));
var producerTasks = new[] { produceTaskOne , produceTaskTwo };
// Create a cleanup task that will call CompleteAdding after
// all producers are done adding items.
var cleanupTask = Task.Factory.ContinueWhenAll(producerTasks, _ => Collection.CompleteAdding());
// Wait for all tasks to complete
await Task.WhenAll(consumerTask, produceTaskOne, produceTaskTwo, cleanupTask);
// Keep the console window open while the
// consumer thread completes its output.
Console.WriteLine("Press any key to exit");
Console.ReadKey(true);
}
static void RunProducer(string id, int start)
{
var additions = 0;
for (var i = start; i < start + ItemsToProduce; i++)
{
// The data that is added to the collection.
var ticks = Stopwatch.ElapsedTicks;
// Display additions and subtractions.
Console.WriteLine($"{id} adding tick value {ticks}. item# {i}");
if (!Collection.IsAddingCompleted)
{
Collection.Add(ticks);
}
// Counter for demonstration purposes only.
additions++;
// Comment this line to speed up the producer threads.
Thread.SpinWait(100000);
}
Interlocked.Add(ref TotalAdditions, additions);
Console.WriteLine($"{id} is done adding: {additions} items");
}
static void RunConsumer()
{
// GetConsumingEnumerable returns the enumerator for the underlying collection.
var subtractions = 0;
foreach (var item in Collection.GetConsumingEnumerable())
{
Console.WriteLine(
$"Consuming tick value {item:D18} : item# {subtractions++} : current count = {Collection.Count}");
}
Console.WriteLine(
$"Total added: {TotalAdditions} Total consumed: {subtractions} Current count: {Collection.Count}");
Stopwatch.Stop();
}
}
Option Strict On
Option Explicit On
Imports System.Diagnostics
Imports System.Threading
Imports System.Threading.Tasks
Imports System.Collections.Concurrent
Module EnumerateBC
Class Program
' Limit the collection size to 2000 items
' at any given time. Set itemsToProduce to >500
' to hit the limit.
Const upperLimit As Integer = 1000
' Adjust this number to see how it impacts
' the producing-consuming pattern.
Const itemsToProduce As Integer = 100
Shared collection As BlockingCollection(Of Long) = New BlockingCollection(Of Long)(upperLimit)
' Variables for diagnostic output only.
Shared sw As New Stopwatch()
Shared totalAdditions As Integer = 0
' Counter for synchronizing producers.
Shared producersStillRunning As Integer = 2
Shared Sub Main()
' Start the stopwatch.
sw.Start()
' Queue the Producer threads.
Dim task1 = Task.Factory.StartNew(Sub() RunProducer("A", 0))
Dim task2 = Task.Factory.StartNew(Sub() RunProducer("B", itemsToProduce))
' Store in an array for use with ContinueWhenAll
Dim producers() As Task = {task1, task2}
' Create a cleanup task that will call CompleteAdding after
' all producers are done adding items.
Dim cleanup As Task = Task.Factory.ContinueWhenAll(producers, Sub(p) collection.CompleteAdding())
' Queue the Consumer thread. Put this call
' before Parallel.Invoke to begin consuming as soon as
' the producers add items.
Task.Factory.StartNew(Sub() RunConsumer())
' Keep the console window open while the
' consumer thread completes its output.
Console.ReadKey()
End Sub
Shared Sub RunProducer(ByVal ID As String, ByVal start As Integer)
Dim additions As Integer = 0
For i As Integer = start To start + itemsToProduce - 1
' The data that is added to the collection.
Dim ticks As Long = sw.ElapsedTicks
'Display additions and subtractions.
Console.WriteLine("{0} adding tick value {1}. item# {2}", ID, ticks, i)
' Don't try to add item after CompleteAdding
' has been called.
If collection.IsAddingCompleted = False Then
collection.Add(ticks)
End If
' Counter for demonstration purposes only.
additions = additions + 1
' Uncomment this line to
' slow down the producer threads without sleeping.
Thread.SpinWait(100000)
Next
Interlocked.Add(totalAdditions, additions)
Console.WriteLine("{0} is done adding: {1} items", ID, additions)
End Sub
Shared Sub RunConsumer()
' GetConsumingEnumerable returns the enumerator for the
' underlying collection.
Dim subtractions As Integer = 0
For Each item In collection.GetConsumingEnumerable
subtractions = subtractions + 1
Console.WriteLine("Consuming tick value {0} : item# {1} : current count = {2}",
item.ToString("D18"), subtractions, collection.Count)
Next
Console.WriteLine("Total added: {0} Total consumed: {1} Current count: {2} ",
totalAdditions, subtractions, collection.Count())
sw.Stop()
Console.WriteLine("Press any key to exit.")
End Sub
End Class
End Module
Этот пример использует цикл foreach
совместно с методом BlockingCollection<T>.GetConsumingEnumerable в потоке-потребителе, что приводит к удалению каждого элемента из коллекции, как только он перечислен. System.Collections.Concurrent.BlockingCollection<T> ограничивает максимальное количество элементов, находящихся в коллекции в любой момент времени. Выполнение перечисления коллекции подобным образом блокирует поток-потребитель, если доступные элементы отсутствуют или коллекция является пустой. В этом примере блокировка не имеет значения, так как поток-производитель добавляет элементы быстрее, чем их может использовать потребитель.
BlockingCollection<T>.GetConsumingEnumerable возвращает IEnumerable<T>
, поэтому невозможно гарантировать перечисление в том же порядке. Однако в качестве базового типа коллекции внутренним образом используется System.Collections.Concurrent.ConcurrentQueue<T>, который будет выводить объекты из очереди по принципу "первым поступил — первым обслужен" (FIFO). Если выполняются одновременные вызовы BlockingCollection<T>.GetConsumingEnumerable, они будут конкурировать. Один использованный элемент (выведенный из очереди) в одном перечислении не может присутствовать в другом.
Для перечисления коллекции без ее изменения просто используйте оператор foreach
(For Each
) без использования метода GetConsumingEnumerable. Тем не менее важно понимать, что такой тип перечисления представляет снимок коллекции в конкретный момент времени. Если другие потоки добавят или удалят элементы параллельно с выполнением цикла, цикл может не отобразить фактическое состояние коллекции.