Walkthrough: Creating a Custom Message Block
This document describes how to create a custom message block type that orders incoming messages by priority.
Although the built-in message block types provide a wide range of functionality, you can create your own message block type and customize it to meet the requirements of your application. For a description of the built-in message block types that are provided by the Asynchronous Agents Library, see Asynchronous Message Blocks.
Prerequisites
Read the following documents before you start this walkthrough:
Sections
This walkthrough contains the following sections:
Designing a Custom Message Block
Defining the priority_buffer Class
The Complete Example
Designing a Custom Message Block
Message blocks participate in the act of sending and receiving messages. A message block that sends messages is known as a source block. A message block that receives messages is known as a target block. A message block that both sends and receives messages is known as a propagator block. The Agents Library uses the abstract class concurrency::ISource to represent source blocks and the abstract class concurrency::ITarget to represent target blocks. Message block types that act as sources derive from `ISource`; message block types that act as targets derive from `ITarget`.
Although you can derive your message block type directly from `ISource` and `ITarget`, the Agents Library defines three base classes that perform much of the functionality that is common to all message block types, for example, handling errors and connecting message blocks together in a concurrency-safe manner. The concurrency::source_block class derives from `ISource` and sends messages to other blocks. The concurrency::target_block class derives from `ITarget` and receives messages from other blocks. The concurrency::propagator_block class derives from both `ISource` and `ITarget`; it sends messages to other blocks and receives messages from other blocks. We recommend that you use these three base classes to handle infrastructure details so that you can focus on the behavior of your message block.
The `source_block`, `target_block`, and `propagator_block` classes are templates that are parameterized on a type that manages the connections, or links, between source and target blocks and on a type that manages how messages are processed. The Agents Library defines two types that perform link management, concurrency::single_link_registry and concurrency::multi_link_registry. The `single_link_registry` class enables a message block to be linked to one source or to one target. The `multi_link_registry` class enables a message block to be linked to multiple sources or multiple targets. The Agents Library defines one class that performs message management, concurrency::ordered_message_processor. The `ordered_message_processor` class enables message blocks to process messages in the order in which they receive them.
To better understand how message blocks relate to their sources and targets, consider the following example. This example shows the declaration of the concurrency::transformer class.
template<
class _Input,
class _Output
>
class transformer : public propagator_block<
single_link_registry<ITarget<_Output>>,
multi_link_registry<ISource<_Input>>
>;
The `transformer` class derives from `propagator_block`, and therefore acts as both a source block and as a target block. It accepts messages of type `_Input` and sends messages of type `_Output`. The `transformer` class specifies `single_link_registry` as the link manager for any target blocks and `multi_link_registry` as the link manager for any source blocks. Therefore, a `transformer` object can have up to one target and an unlimited number of sources.
A class that derives from `source_block` must implement six methods: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message, and resume_propagation. A class that derives from `target_block` must implement the propagate_message method and can optionally implement the send_message method. Deriving from `propagator_block` is functionally equivalent to deriving from both `source_block` and `target_block`.
The `propagate_to_any_targets` method is called by the runtime to asynchronously or synchronously process any incoming messages and propagate out any outgoing messages. The `accept_message` method is called by target blocks to accept messages. Many message block types, such as `unbounded_buffer`, send each message only to the first target that would receive it. Therefore, they transfer ownership of the message to that target. Other message block types, such as concurrency::overwrite_buffer, offer each message to all of their target blocks. Therefore, `overwrite_buffer` creates a copy of the message for each of its targets.
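The difference between transferring ownership and handing out copies can be illustrated without the Agents Library. The following is a minimal sketch in standard C++ only; `Message`, `TransferringSource`, and `CopyingSource` are hypothetical names invented for this illustration and are not part of the library.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <queue>
#include <string>
#include <utility>

// Hypothetical stand-in for a message; not the concurrency::message class.
struct Message {
    int id;
    std::string payload;
};

// Models a block that gives each message to the first taker only
// (the ownership-transfer policy described for unbounded_buffer).
class TransferringSource {
public:
    void push(Message m) {
        queue_.push(std::make_unique<Message>(std::move(m)));
    }

    // Hands ownership to the caller. After a successful call the source
    // no longer holds the message, so a second caller gets nullptr.
    std::unique_ptr<Message> accept(int id) {
        if (queue_.empty() || queue_.front()->id != id) return nullptr;
        std::unique_ptr<Message> m = std::move(queue_.front());
        queue_.pop();
        return m;
    }

    std::size_t size() const { return queue_.size(); }

private:
    std::queue<std::unique_ptr<Message>> queue_;
};

// Models a block that keeps its current message and gives every taker
// its own copy (the copy policy described for overwrite_buffer).
class CopyingSource {
public:
    void set(Message m) {
        current_ = std::make_unique<Message>(std::move(m));
    }

    // Returns a fresh copy; the stored message stays available.
    std::unique_ptr<Message> accept(int id) const {
        if (!current_ || current_->id != id) return nullptr;
        return std::make_unique<Message>(*current_);
    }

private:
    std::unique_ptr<Message> current_;
};
```

With the transferring policy, a second `accept` call for the same identifier returns a null pointer; with the copying policy, every caller receives a distinct object with the same payload.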
The `reserve_message`, `consume_message`, `release_message`, and `resume_propagation` methods enable message blocks to participate in message reservation. Target blocks call the `reserve_message` method when they are offered a message and have to reserve the message for later use. After a target block reserves a message, it can call the `consume_message` method to consume that message or the `release_message` method to cancel the reservation. As with the `accept_message` method, the implementation of `consume_message` can either transfer ownership of the message or return a copy of the message. After a target block either consumes or releases a reserved message, the runtime calls the `resume_propagation` method. Typically, this method continues message propagation, starting with the next message in the queue.
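The reserve, consume, and release sequence described above can be modeled as a small state machine. The sketch below is a single-threaded illustration in standard C++; `ReservableSource` and its members are hypothetical names, and message identifiers are plain integers rather than `runtime_object_identity` values.

```cpp
#include <cassert>
#include <cstddef>
#include <queue>
#include <stdexcept>

// Hypothetical single-threaded model of a source block's reservation methods.
class ReservableSource {
public:
    // Adds a message identifier to the back of the output queue.
    void offer(int msg_id) { output_.push(msg_id); }

    // A target may reserve only the head message, and only while
    // no other reservation is outstanding.
    bool reserve(int msg_id) {
        if (reserved_ || output_.empty() || output_.front() != msg_id) {
            return false;
        }
        reserved_ = true;
        return true;
    }

    // Consuming a reserved message removes it from the queue and
    // ends the reservation.
    int consume(int msg_id) {
        check_reserved(msg_id);
        output_.pop();
        reserved_ = false;
        return msg_id;
    }

    // Releasing cancels the reservation; the message stays at the head
    // so that it can be offered again.
    void release(int msg_id) {
        check_reserved(msg_id);
        reserved_ = false;
    }

    bool is_reserved() const { return reserved_; }
    std::size_t pending() const { return output_.size(); }

private:
    void check_reserved(int msg_id) const {
        if (!reserved_ || output_.empty() || output_.front() != msg_id) {
            throw std::logic_error("message is not reserved");
        }
    }

    std::queue<int> output_;
    bool reserved_ = false;
};
```

In this model, the point at which `consume` or `release` returns corresponds to the moment at which the runtime would call `resume_propagation` on a real message block.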
The runtime calls the `propagate_message` method to asynchronously transfer a message from another block to the current one. The `send_message` method resembles `propagate_message`, except that it synchronously, instead of asynchronously, sends the message to the target blocks. The default implementation of `send_message` rejects all incoming messages. The runtime does not call either of these methods if the message does not pass the optional filter function that is associated with the target block. For more information about message filters, see Asynchronous Message Blocks.
Defining the priority_buffer Class
The `priority_buffer` class is a custom message block type that orders incoming messages first by priority, and then by the order in which messages are received. The `priority_buffer` class resembles the concurrency::unbounded_buffer class because it holds a queue of messages, and also because it acts as both a source and a target message block and can have both multiple sources and multiple targets. However, `unbounded_buffer` bases message propagation only on the order in which it receives messages from its sources.
The `priority_buffer` class receives messages of type std::tuple that contain `PriorityType` and `Type` elements. `PriorityType` refers to the type that holds the priority of each message; `Type` refers to the data portion of the message. The `priority_buffer` class sends messages of type `Type`. The `priority_buffer` class also manages two message queues: a std::priority_queue object for incoming messages and a std::queue object for outgoing messages. Ordering messages by priority is useful when a `priority_buffer` object receives multiple messages simultaneously or when it receives multiple messages before any messages are read by consumers.
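To make the two-queue design concrete, the following sketch models it with standard containers only. It is an illustration under simplifying assumptions (single-threaded, values instead of `message` pointers); `TwoQueueModel`, `ByPriority`, and the member names are hypothetical and do not appear in the Agents Library.

```cpp
#include <cassert>
#include <queue>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

// Orders (priority, data) tuples by the priority element only.
struct ByPriority {
    template <class T>
    bool operator()(const T& left, const T& right) const {
        return std::get<0>(left) < std::get<0>(right);
    }
};

// Hypothetical, single-threaded model of the two-queue design.
template <class Type, class PriorityType = int>
class TwoQueueModel {
public:
    using Incoming = std::tuple<PriorityType, Type>;

    // Incoming messages carry a priority and land in the priority queue.
    void receive_from_source(Incoming msg) { input_.push(std::move(msg)); }

    // Moves the highest-priority input message to the output queue.
    // The priority is dropped; only the data portion moves on.
    bool stage_next() {
        if (input_.empty()) return false;
        output_.push(std::get<1>(input_.top()));
        input_.pop();
        return true;
    }

    // Hands the oldest staged message to a consumer.
    // Precondition: has_output() is true.
    Type send_to_target() {
        Type value = output_.front();
        output_.pop();
        return value;
    }

    bool has_output() const { return !output_.empty(); }

private:
    std::priority_queue<Incoming, std::vector<Incoming>, ByPriority> input_;
    std::queue<Type> output_;
};
```

For example, if messages with priorities 0, 2, and 1 arrive before any are read, staging all of them and then reading the output queue yields the data in the order 2, 1, 0.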
In addition to the seven methods that a class that derives from `propagator_block` must implement, the `priority_buffer` class also overrides the `link_target_notification` and `send_message` methods. The `priority_buffer` class also defines two public helper methods, `enqueue` and `dequeue`, and a private helper method, `propagate_priority_order`.
The following procedure describes how to implement the `priority_buffer` class.
To define the priority_buffer class
- Create a C++ header file and name it `priority_buffer.h`. Alternatively, you can use an existing header file that is part of your project.
- In `priority_buffer.h`, add the following code.
#pragma once
#include <agents.h>
#include <queue>
- In the `std` namespace, define specializations of std::less and std::greater that act on concurrency::message objects.
namespace std
{
// A specialization of less that tests whether the priority element of a
// message is less than the priority element of another message.
template<class Type, class PriorityType>
struct less<concurrency::message<tuple<PriorityType,Type>>*>
{
typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator< to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) < get<0>(right->payload));
}
};
// A specialization of greater that tests whether the priority element of a
// message is greater than the priority element of another message.
template<class Type, class PriorityType>
struct greater<concurrency::message<tuple<PriorityType, Type>>*>
{
typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator> to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) > get<0>(right->payload));
}
};
}
The `priority_buffer` class stores `message` objects in a `priority_queue` object. These type specializations enable the priority queue to sort messages according to their priority. The priority is the first element of the `tuple` object.
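The effect of choosing one comparison over the other can be seen with a plain `std::priority_queue` of pointers. The sketch below uses ordinary function objects instead of specializing `std::less` and `std::greater`; `FakeMessage`, `PriorityLess`, `PriorityGreater`, and `drain_order` are hypothetical names used only for this illustration.

```cpp
#include <cassert>
#include <queue>
#include <tuple>
#include <vector>

// Hypothetical stand-in for concurrency::message<std::tuple<int, int>>.
struct FakeMessage {
    std::tuple<int, int> payload;  // (priority, data)
};

// Compares the priority element only, like the less specialization.
struct PriorityLess {
    bool operator()(const FakeMessage* left, const FakeMessage* right) const {
        return std::get<0>(left->payload) < std::get<0>(right->payload);
    }
};

// Compares the priority element only, like the greater specialization.
struct PriorityGreater {
    bool operator()(const FakeMessage* left, const FakeMessage* right) const {
        return std::get<0>(left->payload) > std::get<0>(right->payload);
    }
};

// Pushes pointers to every message into a priority queue that uses Compare,
// then returns the data elements in the order in which they are popped.
template <class Compare>
std::vector<int> drain_order(const std::vector<FakeMessage>& messages) {
    std::priority_queue<const FakeMessage*,
                        std::vector<const FakeMessage*>,
                        Compare> pq;
    for (const FakeMessage& m : messages) {
        pq.push(&m);
    }
    std::vector<int> order;
    while (!pq.empty()) {
        order.push_back(std::get<1>(pq.top()->payload));
        pq.pop();
    }
    return order;
}
```

Because `std::priority_queue` keeps the element that compares greatest under its comparison at the top, a less-than comparison pops the largest priority value first, and a greater-than comparison pops the smallest priority value first.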
- In the `concurrencyex` namespace, declare the `priority_buffer` class.
namespace concurrencyex
{
template<class Type,
typename PriorityType = int,
typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
{
public:
protected:
private:
};
}
The `priority_buffer` class derives from `propagator_block`. Therefore, it can both send and receive messages. The `priority_buffer` class can have multiple targets that receive messages of type `Type`. It can also have multiple sources that send messages of type `tuple<PriorityType, Type>`.
- In the `private` section of the `priority_buffer` class, add the following member variables.
// Stores incoming messages.
// The type parameter Pr specifies how to order messages by priority.
std::priority_queue<
concurrency::message<_Source_type>*,
std::vector<concurrency::message<_Source_type>*>,
Pr
> _input_messages;
// Synchronizes access to the input message queue.
concurrency::critical_section _input_lock;
// Stores outgoing messages.
std::queue<concurrency::message<_Target_type>*> _output_messages;
The `priority_queue` object holds incoming messages; the `queue` object holds outgoing messages. A `priority_buffer` object can receive multiple messages simultaneously; the `critical_section` object synchronizes access to the queue of input messages.
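The locking pattern used for the input queue is the familiar scoped-lock idiom. As a portable sketch of the same idea, the following wrapper uses `std::mutex` and `std::lock_guard` in place of `concurrency::critical_section` and its `scoped_lock`; `LockedPriorityQueue` is a hypothetical class written for this illustration.

```cpp
#include <cassert>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical thread-safe wrapper around std::priority_queue.
// std::mutex plays the role that critical_section plays in priority_buffer.
template <class T>
class LockedPriorityQueue {
public:
    // Inserts a value while holding the lock.
    void push(T value) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push(std::move(value));
    }

    // Copies the top element into 'out' and removes it.
    // Returns false, leaving 'out' untouched, when the queue is empty.
    bool try_pop(T& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty()) return false;
        out = queue_.top();
        queue_.pop();
        return true;
    }

private:
    std::mutex mutex_;
    std::priority_queue<T> queue_;
};
```

The emptiness check and the removal happen under the same lock, which mirrors how `propagate_to_any_targets` tests the size of the input queue and pops the top message inside one locked scope.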
- In the `private` section, declare the copy constructor and the assignment operator. This prevents `priority_buffer` objects from being copied or assigned.
// Hide assignment operator and copy constructor.
priority_buffer const &operator =(priority_buffer const&);
priority_buffer(priority_buffer const &);
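Declaring these members as private without defining them is the pre-C++11 idiom. With a compiler that supports C++11 or later, the same intent can be stated with deleted functions. The following sketch shows the alternative on a hypothetical class, `NonCopyableBlock`; it is not a change that the walkthrough requires.

```cpp
#include <cassert>
#include <type_traits>

// Hypothetical class that forbids copying by deleting the copy operations.
class NonCopyableBlock {
public:
    NonCopyableBlock() = default;

    // Any attempt to copy or assign is rejected at compile time.
    NonCopyableBlock(const NonCopyableBlock&) = delete;
    NonCopyableBlock& operator=(const NonCopyableBlock&) = delete;
};

// The type traits confirm that the class can be neither copied nor assigned.
static_assert(!std::is_copy_constructible<NonCopyableBlock>::value,
              "NonCopyableBlock must not be copy constructible");
static_assert(!std::is_copy_assignable<NonCopyableBlock>::value,
              "NonCopyableBlock must not be copy assignable");
```

Deleted functions produce a compile-time error at the point of the attempted copy, including inside member functions, whereas private undefined members can defer the error to link time in that case.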
- In the `public` section, define the constructors that are common to many message block types. Also define the destructor.
// Constructs a priority_buffer message block.
priority_buffer()
{
initialize_source_and_target();
}
// Constructs a priority_buffer message block with the given filter function.
priority_buffer(filter_method const& filter)
{
initialize_source_and_target();
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler)
{
initialize_source_and_target(&scheduler);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter)
{
initialize_source_and_target(&scheduler);
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// ScheduleGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group)
{
initialize_source_and_target(NULL, &schedule_group);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided ScheduleGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
{
initialize_source_and_target(NULL, &schedule_group);
register_filter(filter);
}
// Destroys the message block.
~priority_buffer()
{
// Remove all links.
remove_network_links();
}
- In the `public` section, define the methods `enqueue` and `dequeue`. These helper methods provide an alternative way to send messages to and receive messages from a `priority_buffer` object.
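// Sends an item to the message block. Because this block receives
// messages of type std::tuple<PriorityType, Type>, the item pairs
// the priority with the data.
bool enqueue(_Source_type const& item)
{
return concurrency::asend<_Source_type>(this, item);
}
// Receives an item from the message block.
Type dequeue()
{
return concurrency::receive<Type>(this);
}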
- In the `protected` section, define the `propagate_to_any_targets` method.
// Transfers the message at the front of the input queue to the output queue
// and propagates out all messages in the output queue.
virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
{
// Retrieve the message from the front of the input queue.
concurrency::message<_Source_type>* input_message = NULL;
{
concurrency::critical_section::scoped_lock lock(_input_lock);
if (_input_messages.size() > 0)
{
input_message = _input_messages.top();
_input_messages.pop();
}
}
// Move the message to the output queue.
if (input_message != NULL)
{
// The payload of the output message does not contain the
// priority of the message.
concurrency::message<_Target_type>* output_message =
new concurrency::message<_Target_type>(get<1>(input_message->payload));
_output_messages.push(output_message);
// Free the memory for the input message.
delete input_message;
// Do not propagate messages if the new message is not the head message.
// In this case, the head message is reserved by another message block.
if (_output_messages.front()->msg_id() != output_message->msg_id())
{
return;
}
}
// Propagate out the output messages.
propagate_priority_order();
}
The `propagate_to_any_targets` method transfers the message that is at the front of the input queue to the output queue and propagates out all messages in the output queue.
- In the `protected` section, define the `accept_message` method.
// Accepts a message that was offered by this block by transferring ownership
// to the caller.
virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
{
concurrency::message<_Target_type>* message = NULL;
// Transfer ownership if the provided message identifier matches
// the identifier of the front of the output message queue.
if (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id)
{
message = _output_messages.front();
_output_messages.pop();
}
return message;
}
When a target block calls the `accept_message` method, the `priority_buffer` class transfers ownership of the message to the first target block that accepts it. (This resembles the behavior of `unbounded_buffer`.)
- In the `protected` section, define the `reserve_message` method.
// Reserves a message that was previously offered by this block.
virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
{
// Allow the message to be reserved if the provided message identifier
// is the message identifier of the front of the message queue.
return (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id);
}
The `priority_buffer` class permits a target block to reserve a message when the provided message identifier matches the identifier of the message that is at the front of the queue. In other words, a target can reserve the message if the `priority_buffer` object has not yet received an additional message and has not yet propagated out the current one.
- In the `protected` section, define the `consume_message` method.
// Transfers the message that was previously offered by this block
// to the caller. The caller of this method is the target block that
// reserved the message.
virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
{
// Transfer ownership of the message to the caller.
return accept_message(msg_id);
}
A target block calls `consume_message` to transfer ownership of the message that it reserved.
- In the `protected` section, define the `release_message` method.
// Releases a previous message reservation.
virtual void release_message(concurrency::runtime_object_identity msg_id)
{
// The head message must be the one that is reserved.
if (_output_messages.empty() ||
_output_messages.front()->msg_id() != msg_id)
{
throw message_not_found();
}
}
A target block calls `release_message` to cancel its reservation to a message.
- In the `protected` section, define the `resume_propagation` method.
// Resumes propagation after a reservation has been released.
virtual void resume_propagation()
{
// Propagate out any messages in the output queue.
if (_output_messages.size() > 0)
{
async_send(NULL);
}
}
The runtime calls `resume_propagation` after a target block either consumes or releases a reserved message. This method propagates out any messages that are in the output queue.
- In the `protected` section, define the `link_target_notification` method.
// Notifies this block that a new target has been linked to it.
virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
{
// Do not propagate messages if a target block reserves
// the message at the front of the queue.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out any messages that are in the output queue.
propagate_priority_order();
}
The `_M_pReservedFor` member variable is defined by the base class, `source_block`. This member variable points to the target block, if any, that is holding a reservation to the message that is at the front of the output queue. The runtime calls `link_target_notification` when a new target is linked to the `priority_buffer` object. This method propagates out any messages that are in the output queue if no target is holding a reservation.
- In the `private` section, define the `propagate_priority_order` method.
// Propagates messages in priority order.
void propagate_priority_order()
{
// Cancel propagation if another block reserves the head message.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out all output messages.
// Because this block preserves message ordering, stop propagation
// if any of the messages are not accepted by a target block.
while (!_output_messages.empty())
{
// Get the next message.
concurrency::message<_Target_type> * message = _output_messages.front();
concurrency::message_status status = declined;
// Traverse each target in the order in which they are connected.
for (target_iterator iter = _M_connectedTargets.begin();
*iter != NULL;
++iter)
{
// Propagate the message to the target.
concurrency::ITarget<_Target_type>* target = *iter;
status = target->propagate(message, this);
// If the target accepts the message then ownership of message has
// changed. Do not propagate this message to any other target.
if (status == accepted)
{
break;
}
// If the target only reserved this message, we must wait until the
// target accepts the message.
if (_M_pReservedFor != NULL)
{
break;
}
}
// If status is anything other than accepted, then the head message
// was not propagated out. To preserve the order in which output
// messages are propagated, we must stop propagation until the head
// message is accepted.
if (status != accepted)
{
break;
}
}
}
This method propagates out all messages from the output queue. Each message is offered to the target blocks, in the order in which they are connected, until one of them accepts it. The `priority_buffer` class preserves the order of the outgoing messages. Therefore, the first message in the output queue must be accepted by a target block before this method offers any other message to the target blocks.
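The ordering rule can be sketched independently of the Agents Library. In the following illustration, targets are plain callables and the helper removes a message from the queue itself once a target accepts it (in the real class, the accepting target removes it through `accept_message`). The names `Status`, `Target`, and `propagate_in_order` are hypothetical, and reservations are not modeled.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <vector>

enum class Status { accepted, declined };

// A hypothetical target: a callable that inspects a message and
// reports whether it took it.
using Target = std::function<Status(int)>;

// Offers each message, head first, to the targets in connection order.
// Stops at the first message that no target accepts, so that later
// messages cannot overtake it. Returns the number of delivered messages.
std::size_t propagate_in_order(std::deque<int>& output,
                               const std::vector<Target>& targets) {
    std::size_t delivered = 0;
    while (!output.empty()) {
        Status status = Status::declined;
        for (const Target& target : targets) {
            status = target(output.front());
            if (status == Status::accepted) {
                break;  // Ownership moved; do not offer to other targets.
            }
        }
        if (status != Status::accepted) {
            break;  // The head message stays; preserve output order.
        }
        output.pop_front();
        ++delivered;
    }
    return delivered;
}
```

If the head message is declined by every target, it remains at the front of the queue and nothing behind it is offered until a later propagation attempt succeeds.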
- In the `protected` section, define the `propagate_message` method.
// Asynchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::propagate.
virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Asynchronously send the message to the target blocks.
async_send(NULL);
return accepted;
}
else
{
return missed;
}
}
The `propagate_message` method enables the `priority_buffer` class to act as a message receiver, or target. This method receives the message that is offered by the provided source block and inserts that message into the priority queue. The `propagate_message` method then asynchronously sends all output messages to the target blocks.
The runtime calls this method when you call the concurrency::asend function or when the message block is connected to other message blocks.
- In the `protected` section, define the `send_message` method.
// Synchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::send.
virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Synchronously send the message to the target blocks.
sync_send(NULL);
return accepted;
}
else
{
return missed;
}
}
The `send_message` method resembles `propagate_message`. However, it sends the output messages synchronously instead of asynchronously.
The runtime calls this method during a synchronous send operation, such as when you call the concurrency::send function.
The `priority_buffer` class contains constructor overloads that are typical in many message block types. Some constructor overloads take concurrency::Scheduler or concurrency::ScheduleGroup objects, which enable the message block to be managed by a specific task scheduler. Other constructor overloads take a filter function. Filter functions enable message blocks to accept or reject a message on the basis of its payload. For more information about message filters, see Asynchronous Message Blocks. For more information about task schedulers, see Task Scheduler.
Because the `priority_buffer` class orders messages by priority and then by the order in which messages are received, this class is most useful when it receives messages asynchronously, for example, when you call the concurrency::asend function or when the message block is connected to other message blocks.
The Complete Example
The following example shows the complete definition of the `priority_buffer` class.
// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>
namespace std
{
// A specialization of less that tests whether the priority element of a
// message is less than the priority element of another message.
template<class Type, class PriorityType>
struct less<concurrency::message<tuple<PriorityType,Type>>*>
{
typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator< to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) < get<0>(right->payload));
}
};
// A specialization of greater that tests whether the priority element of a
// message is greater than the priority element of another message.
template<class Type, class PriorityType>
struct greater<concurrency::message<tuple<PriorityType, Type>>*>
{
typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator> to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) > get<0>(right->payload));
}
};
}
namespace concurrencyex
{
// A message block type that orders incoming messages first by priority,
// and then by the order in which messages are received.
template<class Type,
typename PriorityType = int,
typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
{
public:
// Constructs a priority_buffer message block.
priority_buffer()
{
initialize_source_and_target();
}
// Constructs a priority_buffer message block with the given filter function.
priority_buffer(filter_method const& filter)
{
initialize_source_and_target();
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler)
{
initialize_source_and_target(&scheduler);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter)
{
initialize_source_and_target(&scheduler);
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// ScheduleGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group)
{
initialize_source_and_target(NULL, &schedule_group);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided ScheduleGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
{
initialize_source_and_target(NULL, &schedule_group);
register_filter(filter);
}
// Destroys the message block.
~priority_buffer()
{
// Remove all links.
remove_network_links();
}
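// Sends an item to the message block. Because this block receives
// messages of type std::tuple<PriorityType, Type>, the item pairs
// the priority with the data.
bool enqueue(_Source_type const& item)
{
return concurrency::asend<_Source_type>(this, item);
}
// Receives an item from the message block.
Type dequeue()
{
return concurrency::receive<Type>(this);
}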
protected:
// Asynchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::propagate.
virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Asynchronously send the message to the target blocks.
async_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Synchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::send.
virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Synchronously send the message to the target blocks.
sync_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Accepts a message that was offered by this block by transferring ownership
// to the caller.
virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
{
concurrency::message<_Target_type>* message = NULL;
// Transfer ownership if the provided message identifier matches
// the identifier of the front of the output message queue.
if (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id)
{
message = _output_messages.front();
_output_messages.pop();
}
return message;
}
// Reserves a message that was previously offered by this block.
virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
{
// Allow the message to be reserved if the provided message identifier
// is the message identifier of the front of the message queue.
return (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id);
}
// Transfers the message that was previously offered by this block
// to the caller. The caller of this method is the target block that
// reserved the message.
virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
{
// Transfer ownership of the message to the caller.
return accept_message(msg_id);
}
// Releases a previous message reservation.
virtual void release_message(concurrency::runtime_object_identity msg_id)
{
// The head message must be the one that is reserved.
if (_output_messages.empty() ||
_output_messages.front()->msg_id() != msg_id)
{
throw message_not_found();
}
}
// Resumes propagation after a reservation has been released.
virtual void resume_propagation()
{
// Propagate out any messages in the output queue.
if (_output_messages.size() > 0)
{
async_send(NULL);
}
}
// Notifies this block that a new target has been linked to it.
virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
{
// Do not propagate messages if a target block reserves
// the message at the front of the queue.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out any messages that are in the output queue.
propagate_priority_order();
}
// Transfers the message at the front of the input queue to the output queue
// and propagates out all messages in the output queue.
virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
{
// Retrieve the message from the front of the input queue.
concurrency::message<_Source_type>* input_message = NULL;
{
concurrency::critical_section::scoped_lock lock(_input_lock);
if (_input_messages.size() > 0)
{
input_message = _input_messages.top();
_input_messages.pop();
}
}
// Move the message to the output queue.
if (input_message != NULL)
{
// The payload of the output message does not contain the
// priority of the message.
concurrency::message<_Target_type>* output_message =
new concurrency::message<_Target_type>(get<1>(input_message->payload));
_output_messages.push(output_message);
// Free the memory for the input message.
delete input_message;
// Do not propagate messages if the new message is not the head message.
// In this case, the head message is reserved by another message block.
if (_output_messages.front()->msg_id() != output_message->msg_id())
{
return;
}
}
// Propagate out the output messages.
propagate_priority_order();
}
private:
// Propagates messages in priority order.
void propagate_priority_order()
{
// Cancel propagation if another block reserves the head message.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out all output messages.
// Because this block preserves message ordering, stop propagation
// if any of the messages are not accepted by a target block.
while (!_output_messages.empty())
{
// Get the next message.
concurrency::message<_Target_type> * message = _output_messages.front();
concurrency::message_status status = declined;
// Traverse each target in the order in which they are connected.
for (target_iterator iter = _M_connectedTargets.begin();
*iter != NULL;
++iter)
{
// Propagate the message to the target.
concurrency::ITarget<_Target_type>* target = *iter;
status = target->propagate(message, this);
// If the target accepts the message then ownership of message has
// changed. Do not propagate this message to any other target.
if (status == accepted)
{
break;
}
// If the target only reserved this message, we must wait until the
// target accepts the message.
if (_M_pReservedFor != NULL)
{
break;
}
}
// If status is anything other than accepted, then the head message
// was not propagated out. To preserve the order in which output
// messages are propagated, we must stop propagation until the head
// message is accepted.
if (status != accepted)
{
break;
}
}
}
private:
// Stores incoming messages.
// The type parameter Pr specifies how to order messages by priority.
std::priority_queue<
concurrency::message<_Source_type>*,
std::vector<concurrency::message<_Source_type>*>,
Pr
> _input_messages;
// Synchronizes access to the input message queue.
concurrency::critical_section _input_lock;
// Stores outgoing messages.
std::queue<concurrency::message<_Target_type>*> _output_messages;
private:
// Hide assignment operator and copy constructor.
priority_buffer const &operator =(priority_buffer const&);
priority_buffer(priority_buffer const &);
};
}
The following example concurrently performs a number of `asend` and concurrency::receive operations on a `priority_buffer` object.
// priority_buffer.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"
using namespace concurrency;
using namespace concurrencyex;
using namespace std;
int wmain()
{
// Concurrently perform a number of asend and receive operations
// on a priority_buffer object.
priority_buffer<int> pb;
parallel_invoke(
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
[&pb] {
for (int i = 0; i < 75; ++i) {
wcout << receive(pb) << L' ';
if ((i+1) % 25 == 0)
wcout << endl;
}
}
);
}
This example produces the following sample output.
36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12
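The `priority_buffer` class orders messages first by priority and then by the order in which it receives messages. In this example, messages with greater numerical priority are inserted towards the front of the queue because the default ordering type, `Pr`, is `std::less`, and `std::priority_queue` keeps the greatest element at the top.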
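Note that `std::priority_queue` by itself makes no promise about the relative order of elements that compare equal. If strict arrival order within one priority level matters to your application, one common approach is to add a sequence number as a tie-breaker. The following sketch shows that idea in standard C++; `StablePriorityQueue` and its members are hypothetical and are not part of the `priority_buffer` class shown in this walkthrough.

```cpp
#include <cassert>
#include <cstdint>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical priority queue that pops higher priorities first and,
// within one priority, pops elements in arrival order.
template <class T>
class StablePriorityQueue {
public:
    // Stamps each element with the next sequence number on arrival.
    void push(int priority, T value) {
        queue_.push(Entry{priority, next_sequence_++, std::move(value)});
    }

    // Removes and returns the next element.
    // Precondition: empty() is false.
    T pop() {
        T value = queue_.top().value;
        queue_.pop();
        return value;
    }

    bool empty() const { return queue_.empty(); }

private:
    struct Entry {
        int priority;
        std::uint64_t sequence;
        T value;
    };

    struct Order {
        // Returns true when 'left' should be popped after 'right'.
        bool operator()(const Entry& left, const Entry& right) const {
            if (left.priority != right.priority) {
                return left.priority < right.priority;
            }
            // Same priority: the earlier arrival is popped first.
            return left.sequence > right.sequence;
        }
    };

    std::priority_queue<Entry, std::vector<Entry>, Order> queue_;
    std::uint64_t next_sequence_ = 0;
};
```

The same technique could be applied to a message block by storing the sequence number next to each message pointer, at the cost of a slightly larger queue entry.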
Compiling the Code
Copy the example code and paste it in a Visual Studio project, or paste the definition of the `priority_buffer` class in a file that is named `priority_buffer.h` and the test program in a file that is named `priority_buffer.cpp`, and then run the following command in a Visual Studio Command Prompt window.
cl.exe /EHsc priority_buffer.cpp
See Also
Concurrency Runtime Walkthroughs
Asynchronous Message Blocks
Message Passing Functions