Use foreach para remover itens em um BlockingCollection
Além de tirar itens de um BlockingCollection<T> usando o Take método and TryTake , você também pode usar um foreach (For Each no Visual Basic) com o para remover itens até que a BlockingCollection<T>.GetConsumingEnumerable adição seja concluída e a coleção esteja vazia. Isso é chamado de enumeração mutante ou enumeração de consumo porque, ao contrário de um loop (For Each
) típico foreach
, esse enumerador modifica a coleção de origem removendo itens.
Exemplo
O exemplo a seguir mostra como remover todos os itens em um BlockingCollection<T> usando um foreach
loop (For Each
).
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
class Example
{
// Limit the collection size to 2000 items at any given time.
// Set itemsToProduce to > 500 to hit the limit.
const int UpperLimit = 1000;
// Adjust this number to see how it impacts the producing-consuming pattern.
const int ItemsToProduce = 100;
static readonly BlockingCollection<long> Collection =
new BlockingCollection<long>(UpperLimit);
// Variables for diagnostic output only.
static readonly Stopwatch Stopwatch = new Stopwatch();
static int TotalAdditions = 0;
static async Task Main()
{
Stopwatch.Start();
// Queue the consumer task.
var consumerTask = Task.Run(() => RunConsumer());
// Queue the producer tasks.
var produceTaskOne = Task.Run(() => RunProducer("A", 0));
var produceTaskTwo = Task.Run(() => RunProducer("B", ItemsToProduce));
var producerTasks = new[] { produceTaskOne , produceTaskTwo };
// Create a cleanup task that will call CompleteAdding after
// all producers are done adding items.
var cleanupTask = Task.Factory.ContinueWhenAll(producerTasks, _ => Collection.CompleteAdding());
// Wait for all tasks to complete
await Task.WhenAll(consumerTask, produceTaskOne, produceTaskTwo, cleanupTask);
// Keep the console window open while the
// consumer thread completes its output.
Console.WriteLine("Press any key to exit");
Console.ReadKey(true);
}
static void RunProducer(string id, int start)
{
var additions = 0;
for (var i = start; i < start + ItemsToProduce; i++)
{
// The data that is added to the collection.
var ticks = Stopwatch.ElapsedTicks;
// Display additions and subtractions.
Console.WriteLine($"{id} adding tick value {ticks}. item# {i}");
if (!Collection.IsAddingCompleted)
{
Collection.Add(ticks);
}
// Counter for demonstration purposes only.
additions++;
// Comment this line to speed up the producer threads.
Thread.SpinWait(100000);
}
Interlocked.Add(ref TotalAdditions, additions);
Console.WriteLine($"{id} is done adding: {additions} items");
}
static void RunConsumer()
{
// GetConsumingEnumerable returns the enumerator for the underlying collection.
var subtractions = 0;
foreach (var item in Collection.GetConsumingEnumerable())
{
Console.WriteLine(
$"Consuming tick value {item:D18} : item# {subtractions++} : current count = {Collection.Count}");
}
Console.WriteLine(
$"Total added: {TotalAdditions} Total consumed: {subtractions} Current count: {Collection.Count}");
Stopwatch.Stop();
}
}
Option Strict On
Option Explicit On
Imports System.Diagnostics
Imports System.Threading
Imports System.Threading.Tasks
Imports System.Collections.Concurrent
Module EnumerateBC
Class Program
' Limit the collection size to 2000 items
' at any given time. Set itemsToProduce to >500
' to hit the limit.
Const upperLimit As Integer = 1000
' Adjust this number to see how it impacts
' the producing-consuming pattern.
Const itemsToProduce As Integer = 100
Shared collection As BlockingCollection(Of Long) = New BlockingCollection(Of Long)(upperLimit)
' Variables for diagnostic output only.
Shared sw As New Stopwatch()
Shared totalAdditions As Integer = 0
' Counter for synchronizing producers.
Shared producersStillRunning As Integer = 2
Shared Sub Main()
' Start the stopwatch.
sw.Start()
' Queue the Producer threads.
Dim task1 = Task.Factory.StartNew(Sub() RunProducer("A", 0))
Dim task2 = Task.Factory.StartNew(Sub() RunProducer("B", itemsToProduce))
' Store in an array for use with ContinueWhenAll
Dim producers() As Task = {task1, task2}
' Create a cleanup task that will call CompleteAdding after
' all producers are done adding items.
Dim cleanup As Task = Task.Factory.ContinueWhenAll(producers, Sub(p) collection.CompleteAdding())
' Queue the Consumer thread. Put this call
' before Parallel.Invoke to begin consuming as soon as
' the producers add items.
Task.Factory.StartNew(Sub() RunConsumer())
' Keep the console window open while the
' consumer thread completes its output.
Console.ReadKey()
End Sub
Shared Sub RunProducer(ByVal ID As String, ByVal start As Integer)
Dim additions As Integer = 0
For i As Integer = start To start + itemsToProduce - 1
' The data that is added to the collection.
Dim ticks As Long = sw.ElapsedTicks
'Display additions and subtractions.
Console.WriteLine("{0} adding tick value {1}. item# {2}", ID, ticks, i)
' Don't try to add item after CompleteAdding
' has been called.
If collection.IsAddingCompleted = False Then
collection.Add(ticks)
End If
' Counter for demonstration purposes only.
additions = additions + 1
' Uncomment this line to
' slow down the producer threads without sleeping.
Thread.SpinWait(100000)
Next
Interlocked.Add(totalAdditions, additions)
Console.WriteLine("{0} is done adding: {1} items", ID, additions)
End Sub
Shared Sub RunConsumer()
' GetConsumingEnumerable returns the enumerator for the
' underlying collection.
Dim subtractions As Integer = 0
For Each item In collection.GetConsumingEnumerable
subtractions = subtractions + 1
Console.WriteLine("Consuming tick value {0} : item# {1} : current count = {2}",
item.ToString("D18"), subtractions, collection.Count)
Next
Console.WriteLine("Total added: {0} Total consumed: {1} Current count: {2} ",
totalAdditions, subtractions, collection.Count())
sw.Stop()
Console.WriteLine("Press any key to exit.")
End Sub
End Class
End Module
Este exemplo usa um foreach
loop com o BlockingCollection<T>.GetConsumingEnumerable método no thread de consumo, que faz com que cada item seja removido da coleção conforme é enumerado. System.Collections.Concurrent.BlockingCollection<T> limita o número máximo de itens que estão na coleção a qualquer momento. Enumerar a coleção dessa maneira bloqueia o thread do consumidor se nenhum item estiver disponível ou se a coleção estiver vazia. Neste exemplo, o bloqueio não é uma preocupação porque o thread do produtor adiciona itens mais rápido do que eles podem ser consumidos.
As BlockingCollection<T>.GetConsumingEnumerable devoluções de um IEnumerable<T>
, portanto, a ordem não pode ser garantida. No entanto, internamente a System.Collections.Concurrent.ConcurrentQueue<T> é usado como o tipo de coleção subjacente - que irá retirar objetos da fila após a ordem FIFO (first-in-first-out). Se forem feitas chamadas BlockingCollection<T>.GetConsumingEnumerable simultâneas, eles competirão. Um item consumido (retirado da fila) em uma enumeração não pode ser observado na outra.
Para enumerar a coleção sem modificá-la, basta usar foreach
(For Each
) sem o GetConsumingEnumerable método. No entanto, é importante entender que esse tipo de enumeração representa um instantâneo da coleção em um ponto preciso no tempo. Se outros threads estiverem adicionando ou removendo itens simultaneamente enquanto você estiver executando o loop, o loop pode não representar o estado real da coleção.