How to: Use the Context Class to Implement a Cooperative Semaphore
This topic shows how to use the Concurrency::Context class to implement a cooperative semaphore class.
The Context class lets you block or yield the current execution context. Blocking or yielding the current context is useful when the current context cannot proceed because a resource is not available. A semaphore is an example of one situation where the current execution context must wait for a resource to become available. A semaphore, like a critical section object, is a synchronization object that enables code in one context to have exclusive access to a resource. However, unlike a critical section object, a semaphore enables more than one context to access the resource concurrently. If the maximum number of contexts holds a semaphore lock, each additional context must wait for another context to release the lock.
To implement the semaphore class
Declare a class that is named semaphore. Add public and private sections to this class.
// A semaphore type that uses cooperative blocking semantics. class semaphore { public: private: };
In the private section of the semaphore class, declare a variable of type LONG that holds the semaphore count, and a Concurrency::concurrent_queue object that holds the contexts that must wait to acquire the semaphore.
// The semaphore count. LONG _semaphore_count; // A concurrency-safe queue of contexts that must wait to // acquire the semaphore. concurrent_queue<Context*> _waiting_contexts;
In the public section of the semaphore class, implement the constructor. The constructor takes a LONG value that specifies the maximum number of contexts that can concurrently hold the lock.
explicit semaphore(LONG capacity) : _semaphore_count(capacity) { }
In the public section of the semaphore class, implement the acquire method. This method decrements the semaphore count as an atomic operation. If the semaphore count becomes negative, add the current context to the end of the wait queue and call the Concurrency::Context::Block method to block the current context.
// Acquires access to the semaphore. void acquire() { // The capacity of the semaphore is exceeded when the semaphore count // falls below zero. When this happens, add the current context to the // back of the wait queue and block the current context. if (InterlockedDecrement(&_semaphore_count) < 0) { _waiting_contexts.push(Context::CurrentContext()); Context::Block(); } }
In the public section of the semaphore class, implement the release method. This method increments the semaphore count as an atomic operation. If the semaphore count is negative before the increment operation, there is at least one context that is waiting for the lock. In this case, unblock the context that is at the front of the wait queue.
// Releases access to the semaphore. void release() { // If the semaphore count is negative, unblock the first waiting context. if (InterlockedIncrement(&_semaphore_count) <= 0) { // A call to acquire might have decremented the counter, but has not // yet finished adding the context to the queue. // Create a spin loop that waits for the context to become available. Context* waiting = NULL; while (!_waiting_contexts.try_pop(waiting)) { Context::Yield(); } // Unblock the context. waiting->Unblock(); } }
Example
The semaphore class in this example behaves cooperatively because the Context::Block and Context::Yield methods yield execution so that the runtime can perform other tasks.
The acquire method decrements the counter, but it might not finish adding the context to the wait queue before another context calls the release method. To account for this, the release method uses a spin loop that calls the Concurrency::Context::Yield method to wait for the acquire method to finish adding the context.
The release method can call the Context::Unblock method before the acquire method calls the Context::Block method. You do not have to protect against this race condition because the runtime allows for these methods to be called in any order. If the release method calls Context::Unblock before the acquire method calls Context::Block for the same context, that context remains unblocked. The runtime only requires that each call to Context::Block is matched with a corresponding call to Context::Unblock.
The following example shows the complete semaphore class. The wmain function shows basic usage of this class. The wmain function uses the Concurrency::parallel_for algorithm to create several tasks that require access to the semaphore. Because three threads can hold the lock at any time, some tasks must wait for another task to finish and release the lock.
// cooperative-semaphore.cpp
// compile with: /EHsc
#include <windows.h>
#include <concrt.h>
#include <ppl.h>
#include <concurrent_queue.h>
#include <iostream>
#include <sstream>
using namespace Concurrency;
using namespace std;
// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
explicit semaphore(LONG capacity)
: _semaphore_count(capacity)
{
}
// Acquires access to the semaphore.
void acquire()
{
// The capacity of the semaphore is exceeded when the semaphore count
// falls below zero. When this happens, add the current context to the
// back of the wait queue and block the current context.
if (InterlockedDecrement(&_semaphore_count) < 0)
{
_waiting_contexts.push(Context::CurrentContext());
Context::Block();
}
}
// Releases access to the semaphore.
void release()
{
// If the semaphore count is negative, unblock the first waiting context.
if (InterlockedIncrement(&_semaphore_count) <= 0)
{
// A call to acquire might have decremented the counter, but has not
// yet finished adding the context to the queue.
// Create a spin loop that waits for the context to become available.
Context* waiting = NULL;
while (!_waiting_contexts.try_pop(waiting))
{
Context::Yield();
}
// Unblock the context.
waiting->Unblock();
}
}
private:
// The semaphore count.
LONG _semaphore_count;
// A concurrency-safe queue of contexts that must wait to
// acquire the semaphore.
concurrent_queue<Context*> _waiting_contexts;
};
int wmain()
{
// Create a semaphore that allows at most three threads to
// hold the lock.
semaphore s(3);
parallel_for(0, 10, [&](int i) {
// Acquire the lock.
s.acquire();
// Print a message to the console.
wstringstream ss;
ss << L"In loop iteration " << i << L"..." << endl;
wcout << ss.str();
// Simulate work by waiting for two seconds.
wait(2000);
// Release the lock.
s.release();
});
}
This example produces the following sample output.
In loop iteration 5...
In loop iteration 0...
In loop iteration 6...
In loop iteration 1...
In loop iteration 2...
In loop iteration 7...
In loop iteration 3...
In loop iteration 8...
In loop iteration 9...
In loop iteration 4...
For more information about the concurrent_queue class, see Parallel Containers and Objects. For more information about the parallel_for algorithm, see Parallel Algorithms.
Compiling the Code
Copy the example code and paste it in a Visual Studio project, or paste it in a file that is named cooperative-semaphore.cpp and then run the following command in a Visual Studio 2010 Command Prompt window.
cl.exe /EHsc cooperative-semaphore.cpp
Robust Programming
You can use the Resource Acquisition Is Initialization (RAII) pattern to limit access to a semaphore object to a given scope. Under the RAII pattern, a data structure is allocated on the stack. That data structure initializes or acquires a resource when it is created and destroys or releases that resource when the data structure is destroyed. The RAII pattern guarantees that the destructor is called before the enclosing scope exits. Therefore, the resource is correctly managed when an exception is thrown or when a function contains multiple return statements.
The following example defines a class that is named scoped_lock, which is defined in the public section of the semaphore class. The scoped_lock class resembles the Concurrency::critical_section::scoped_lock and Concurrency::reader_writer_lock::scoped_lock classes. The constructor of the semaphore::scoped_lock class acquires access to the given semaphore object and the destructor releases access to that object.
// An exception-safe RAII wrapper for the semaphore class.
class scoped_lock
{
public:
// Acquires access to the semaphore.
scoped_lock(semaphore& s)
: _s(s)
{
_s.acquire();
}
// Releases access to the semaphore.
~scoped_lock()
{
_s.release();
}
private:
semaphore& _s;
};
The following example modifies the body of the work function that is passed to the parallel_for algorithm so that it uses RAII to ensure that the semaphore is released before the function returns. This technique ensures that the work function is exception-safe.
parallel_for(0, 10, [&](int i) {
// Create an exception-safe scoped_lock object that holds the lock
// for the duration of the current scope.
semaphore::scoped_lock auto_lock(s);
// Print a message to the console.
wstringstream ss;
ss << L"In loop iteration " << i << L"..." << endl;
wcout << ss.str();
// Simulate work by waiting for two seconds.
wait(2000);
});