Using Async with ManualResetEventSlim

When I first started writing programs many years ago, my computer had only one processor and no notion of threads: there was only one path of execution. Understanding code for it just meant following that single “thread” of execution.

I was trying to create a way to queue items to work on in a different thread than the UI thread. I wanted the client thread to be very fast, allowing the expensive processing to be done in the background.
It’s easy to create your own dedicated thread, and ThreadPool threads are quite handy.
I have some old code that does this by adding items to a queue and then setting a ManualResetEventSlim to the signaled state. A background thread waits for this event and, once it is signaled, goes to work on the items in the queue.
However, this pattern ties up a dedicated or ThreadPool thread just to wait for incoming events. That particular application was an automatically run test app, so nobody complained about its performance.
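
The queue-plus-event pattern described above might look roughly like the following. This is a minimal sketch, not the code from my old test app: the class and member names (BlockingQueueSketch, ConsumerLoop and so on) are made up for illustration. The point to notice is that the consumer thread sits parked in Wait() whenever the queue is empty.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical names throughout: a sketch of the blocking pattern, not the original app.
public static class BlockingQueueSketch
{
    private static readonly ConcurrentQueue<string> queue = new ConcurrentQueue<string>();
    private static readonly ManualResetEventSlim itemAvailable = new ManualResetEventSlim(false);
    private static int processedCount;

    public static int ProcessedCount => Volatile.Read(ref processedCount);

    // Client side: cheap, returns immediately.
    public static void Enqueue(string item)
    {
        queue.Enqueue(item);
        itemAvailable.Set();
    }

    // Background side: occupies one thread for its whole lifetime.
    public static void ConsumerLoop(CancellationToken token)
    {
        try
        {
            while (true)
            {
                itemAvailable.Wait(token);  // the thread is blocked here while the queue is empty
                itemAvailable.Reset();      // reset before draining so a later Set() is not lost
                while (queue.TryDequeue(out string item))
                {
                    Interlocked.Increment(ref processedCount);
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Normal shutdown: Wait(token) throws when the token is canceled.
        }
    }

    public static void Main()
    {
        using (var cts = new CancellationTokenSource())
        {
            var worker = new Thread(() => ConsumerLoop(cts.Token)) { IsBackground = true };
            worker.Start();
            for (int i = 0; i < 5; i++)
            {
                Enqueue("item " + i);
            }
            // Give the consumer up to five seconds to drain the queue.
            SpinWait.SpinUntil(() => ProcessedCount == 5, TimeSpan.FromSeconds(5));
            cts.Cancel();
            worker.Join();
            Console.WriteLine("processed " + ProcessedCount);
        }
    }
}
```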

The blocked thread is easy to see in the debugger’s call stack.
Calling WaitHandle.WaitOne (or a similar blocking wait) on a ThreadPool thread blocks that thread until the event is signaled, so one ThreadPool thread is consumed for the whole wait.
Here, even though the caller uses await, the background task calls WaitOne, which blocks its entire thread:

    var x = new ManualResetEventSlim();
    await Task.Run(() => x.WaitHandle.WaitOne());

mscorlib.dll!System.Threading.WaitHandle.InternalWaitOne(System.Runtime.InteropServices.SafeHandle waitableSafeHandle, long millisecondsTimeout, bool hasThreadAffinity, bool exitContext)    Unknown
mscorlib.dll!System.Threading.WaitHandle.WaitOne(int millisecondsTimeout, bool exitContext)    Unknown
mscorlib.dll!System.Threading.WaitHandle.WaitOne()    Unknown
QueueTest.exe!QueueTest.MainWindow.MainWindow_Loaded.AnonymousMethod__5() Line 89    C#
mscorlib.dll!System.Threading.Tasks.Task<bool>.InnerInvoke()    Unknown
mscorlib.dll!System.Threading.Tasks.Task.Execute()    Unknown
mscorlib.dll!System.Threading.Tasks.Task.ExecutionContextCallback(object obj)    Unknown
mscorlib.dll!System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state, bool preserveSyncCtx)    Unknown
mscorlib.dll!System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state, bool preserveSyncCtx)    Unknown
mscorlib.dll!System.Threading.Tasks.Task.ExecuteWithThreadLocal(ref System.Threading.Tasks.Task currentTaskSlot)    Unknown
mscorlib.dll!System.Threading.Tasks.Task.ExecuteEntry(bool bPreventDoubleExecution)    Unknown
mscorlib.dll!System.Threading.Tasks.Task.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem()    Unknown
mscorlib.dll!System.Threading.ThreadPoolWorkQueue.Dispatch()    Unknown
mscorlib.dll!System.Threading._ThreadPoolWaitCallback.PerformWaitCallback()    Unknown

Consuming a thread includes its entire thread stack (1 MB of address space reserved by default). Each time a thread is created, every DLL in the process (that doesn’t call DisableThreadLibraryCalls) has its DllMain called, which means a lot of processing just to initialize a thread.
A glance at Task Manager or Process Explorer shows many applications with dozens of threads, including Microsoft Edge and Visual Studio.

The number of threads that can run simultaneously is limited to the number of logical processors. All the other threads are dormant, waiting for their turn or for some event. A machine may perform perhaps thousands of thread switches per second, swapping one thread for another.
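
To get a feel for these numbers on your own machine, a small console program along the following lines can print the logical processor count next to the current thread counts. It is only a sketch, the class name ThreadCounts is made up, and the values it prints vary by machine and runtime.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Hypothetical helper: prints how many threads could run at once versus how many exist.
public static class ThreadCounts
{
    public static void Main()
    {
        // Upper bound on the number of threads that can execute at the same instant.
        Console.WriteLine("logical processors: " + Environment.ProcessorCount);

        using (Process current = Process.GetCurrentProcess())
        {
            // OS threads currently alive in this process (most of them are usually waiting).
            Console.WriteLine("threads in this process: " + current.Threads.Count);
        }

        ThreadPool.GetMinThreads(out int minWorkers, out int minIo);
        ThreadPool.GetMaxThreads(out int maxWorkers, out int maxIo);
        Console.WriteLine($"thread pool workers: min {minWorkers}, max {maxWorkers}");
    }
}
```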

With async and await, and without WaitOne() and its ilk, a thread is freed to do other work while the operation is pending.

Threads should be thought of as a shared resource. Using one dedicated to a particular task can be inefficient.
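
As a rough illustration of the difference, the sketch below starts many waiters that all await the same TaskCompletionSource. Each waiter returns to its caller at the await, so while the signal is pending they hold continuations rather than threads. The names (AwaitWithoutBlocking, WaiterAsync, RunAsync) are made up, and I am assuming .NET Framework 4.6 or later for TaskCreationOptions.RunContinuationsAsynchronously.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical demo: many pending awaits, no parked threads.
public static class AwaitWithoutBlocking
{
    // Returns to the caller at the await; resumes on a pool thread once the signal completes.
    private static async Task<int> WaiterAsync(Task signal, int id)
    {
        await signal.ConfigureAwait(false);
        return id;
    }

    public static async Task<int> RunAsync(int waiterCount)
    {
        var signal = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        Task<int>[] waiters = Enumerable.Range(0, waiterCount)
            .Select(i => WaiterAsync(signal.Task, i))
            .ToArray();

        // Every waiter is now suspended at its await; none of them occupies a thread.
        signal.SetResult(true);

        int[] results = await Task.WhenAll(waiters).ConfigureAwait(false);
        return results.Length;
    }

    public static void Main()
    {
        // Blocking here is only for the console demo's entry point.
        int completed = RunAsync(10000).GetAwaiter().GetResult();
        Console.WriteLine("completed waiters: " + completed);
    }
}
```

Doing the same with 10,000 calls to WaitOne would need 10,000 blocked threads.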

Below is a sample using AsyncManualResetEvent, which does not consume any dedicated threads for background processing.

 

See also:
ThreadPool.RegisterWaitForSingleObject Method
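
If you already have a WaitHandle and cannot switch to a different primitive, ThreadPool.RegisterWaitForSingleObject can be used to turn the wait into a Task. With it the thread pool waits on the handle on your behalf, so your code does not park a thread of its own per handle. The extension method below is a sketch under that assumption; WaitOneAsync is a name I made up, not a framework API.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WaitHandleExtensionsSketch
{
    // Hypothetical helper: returns a Task that completes when the handle is signaled.
    public static Task WaitOneAsync(this WaitHandle handle)
    {
        var tcs = new TaskCompletionSource<bool>();

        RegisteredWaitHandle registration = ThreadPool.RegisterWaitForSingleObject(
            handle,
            (state, timedOut) => ((TaskCompletionSource<bool>)state).TrySetResult(true),
            tcs,
            Timeout.Infinite,
            executeOnlyOnce: true);

        // Release the registration once the task has completed.
        tcs.Task.ContinueWith(_ => { registration.Unregister(null); }, TaskScheduler.Default);
        return tcs.Task;
    }

    public static void Main()
    {
        using (var evt = new ManualResetEventSlim(false))
        {
            Task waiting = evt.WaitHandle.WaitOneAsync();
            Console.WriteLine("completed before Set: " + waiting.IsCompleted);

            evt.Set();
            waiting.Wait(TimeSpan.FromSeconds(5)); // blocking only for the console demo
            Console.WriteLine("completed after Set: " + waiting.IsCompleted);
        }
    }
}
```

In an async method you would write await x.WaitHandle.WaitOneAsync() instead of wrapping WaitOne in Task.Run.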

<code>

 
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows;
using System.Windows.Controls;
using System.Windows.Data;
using System.Windows.Documents;
using System.Windows.Input;
using System.Windows.Media;
using System.Windows.Media.Imaging;
using System.Windows.Navigation;
using System.Windows.Shapes;
 
namespace QueueTest
{
    /// <summary>
    /// Interaction logic for MainWindow.xaml
    /// </summary>
    public partial class MainWindow : Window
    {
        public MainWindow()
        {
            InitializeComponent();
            this.Loaded += MainWindow_Loaded;
        }
        private void MainWindow_Loaded(object sender, RoutedEventArgs e)
        {
            try
            {
                var dp = new DockPanel();
                this.Content = dp;
                var spControls = new StackPanel()
                {
                    Orientation = Orientation.Vertical
                };
                dp.Children.Add(spControls);
                var btnGo = new Button()
                {
                    Content = "_Go",
                    HorizontalAlignment = HorizontalAlignment.Left,
                    Width = 100
                };
                spControls.Children.Add(btnGo);
 
                var txtStatus = new TextBox()
                {
                    Margin = new Thickness(10, 0, 0, 0),
                    IsReadOnly = true,
                    //                    Width = 900,
                    //Height = 100,
                    MaxHeight = 800,
                    VerticalScrollBarVisibility = ScrollBarVisibility.Auto
                };
                Action<string> addStatusMsg = (str) =>
                {
                    str = DateTime.Now.ToString("hh:mm:ss.fff") + $" tid= {Thread.CurrentThread.ManagedThreadId} {str}";
                    Dispatcher.BeginInvoke(new Action(
                        () =>
                        {
                            txtStatus.AppendText($"{str}\r\n");
                            txtStatus.ScrollToEnd();
                        }
                        ));
                };
                dp.LastChildFill = true;
                dp.Children.Add(txtStatus);
                addStatusMsg("We have one queue for notifications, drained by a background task");
                var queueNotifications = new ConcurrentQueue<string>();
                CancellationTokenSource tks = null;
                bool fIsRunning = false;
                int itemToQueue = 0;
                var evntNewItemAvailable = new AsyncManualResetEvent();
 
                btnGo.Click += (ob, eb) =>
                {
                    if (!fIsRunning)
                    {
                        tks = new CancellationTokenSource();
                        Task.Run(async () =>
                        {
                            while (!tks.Token.IsCancellationRequested)
                            {
                                addStatusMsg($"awaiting notification");
                                await evntNewItemAvailable.WaitAsync();
                                // Reset before draining, so a Set() that arrives while we are
                                // draining still wakes us up on the next pass
                                evntNewItemAvailable.Reset();
                                while (queueNotifications.TryDequeue(out var result))
                                {
                                    addStatusMsg($"notification processed {result} ");
                                }
                            }
                            addStatusMsg("end task notification");
                        });
                        Task.Run(async () =>
                        {
                            // simulate incoming notifications by putting them into the queue periodically
                            while (!tks.Token.IsCancellationRequested)
                            {
                                await Task.Delay(TimeSpan.FromMilliseconds(1), tks.Token);
                                if (!tks.IsCancellationRequested)
                                {
                                    var newitem = ++itemToQueue;
                                    addStatusMsg($"Queuing new notification {newitem}");
                                    // add the item to the queue, then signal the consumer
                                    queueNotifications.Enqueue(newitem.ToString());
                                    evntNewItemAvailable.Set();
                                }
                            }
                        });
                    }
                    else
                    {
                        // Cancel first, then wake the consumer so that it observes the cancellation
                        tks.Cancel();
                        evntNewItemAvailable.Set();
                        addStatusMsg("Canceling");
                    }
                    fIsRunning = !fIsRunning;
 
                };
            }
            catch (Exception ex)
            {
                this.Content = ex.ToString();
            }
        }
    }
    // https://blogs.msdn.microsoft.com/pfxteam/2012/02/11/building-async-coordination-primitives-part-1-asyncmanualresetevent/
    internal class AsyncManualResetEvent
    {
        private volatile TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
 
        public Task WaitAsync() => this.tcs.Task;
 
        public void Set()
        {
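            // Complete the task on a ThreadPool thread so that awaiting continuations
            // do not run synchronously inside the caller of Set()
            // https://stackoverflow.com/questions/12693046/configuring-the-continuation-behaviour-of-a-taskcompletionsources-task
            var tcs = this.tcs; // capture now, so a later Reset() cannot redirect this Set()
            Task.Run(() => tcs.TrySetResult(true));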
        }
 
        public void Reset()
        {
            while (true)
            {
                var tcs = this.tcs;
                if (!tcs.Task.IsCompleted ||
                    Interlocked.CompareExchange(ref this.tcs, new TaskCompletionSource<bool>(), tcs) == tcs)
                {
                    return;
                }
            }
        }
    }
}
 

</code>