Tutorial: Crear un bloque de mensajes personalizado
En este documento se describe cómo crear un tipo de bloque de mensajes personalizado que ordena los mensajes entrantes por prioridad.
Aunque los tipos integrados de bloques de mensajes proporciona una amplia gama de funcionalidad, puede crear su propio tipo de bloque de mensajes y personalizarlo para satisfacer los requisitos de la aplicación. Para obtener una descripción de los tipos de bloques de mensajes integrados proporcionados por la Biblioteca de agentes asincrónicos, consulte Bloques de mensajes asincrónicos.
Requisitos previos
Lea los documentos siguientes antes de iniciar este tutorial:
Secciones
Este tutorial contiene las siguientes secciones:
Diseño de un bloque de mensajes personalizado
Los bloques de mensajes participan en el acto de enviar y recibir mensajes. Un bloque de mensajes que envía mensajes se conoce como un bloque de origen. Un bloque de mensajes que recibe mensajes se conoce como un bloque de destino. Un bloque de mensajes que envía y recibe mensajes se conoce como un bloque propagador. La Biblioteca de agentes usa la clase abstracta concurrency::ISource para representar bloques de origen y la clase abstracta concurrency::ITarget para representar bloques de destino. Los tipos de bloques de mensajes que actúan como orígenes derivan de ISource
; los tipos de bloques de mensajes que actúan como destinos derivan de ITarget
.
Aunque puede derivar el tipo de bloque de mensajes directamente de ISource
y ITarget
, la Biblioteca de agentes define tres clases base que realizan gran parte de la funcionalidad común a todos los tipos de bloques de mensajes; por ejemplo, control de errores y conexión de los bloques de mensajes de manera segura para simultaneidad. La clase concurrency::source_block deriva de ISource
y envía mensajes a otros bloques. La clase concurrency::target_block deriva de ITarget
y recibe mensajes de otros bloques. La clase concurrency::propagator_block deriva de ISource
y ITarget
, y envía mensajes a otros bloques y recibe mensajes de otros bloques. Se recomienda usar estas tres clases base para controlar los detalles de infraestructura de modo que se pueda centrar en el comportamiento del bloque de mensajes.
Las clases source_block
, target_block
y propagator_block
son plantillas que se parametrizan en un tipo que administra las conexiones, o vínculos, entre los bloques de origen y de destino y en un tipo que administra cómo se procesan los mensajes. La Biblioteca de agentes define dos tipos que realizan administración de vínculos: concurrency::single_link_registry y concurrency::multi_link_registry. La clase single_link_registry
permite vincular un bloque de mensajes a un origen o a un destino. La clase multi_link_registry
permite vincular un bloque de mensajes a varios orígenes o a varios destinos. La Biblioteca de agentes define una clase que realiza administración de mensajes, concurrency::ordered_message_processor. La clase ordered_message_processor
permite que los bloques de mensajes procesen los mensajes en el orden en que se reciben.
Para entender mejor cómo se relacionan los bloques de mensajes con sus orígenes y destinos, considere el ejemplo siguiente. En este ejemplo se muestra la declaración de la clase concurrency::transformer.
template<
class _Input,
class _Output
>
class transformer : public propagator_block<
single_link_registry<ITarget<_Output>>,
multi_link_registry<ISource<_Input>>
>;
La clase transformer
se deriva de propagator_block
y, por tanto, actúa como bloque de origen y como bloque de destino. Acepta mensajes de tipo _Input
y envía mensajes de tipo _Output
. La clase transformer
especifica single_link_registry
como administrador de vínculos para los bloques de destino y multi_link_registry
como administrador de vínculos para los bloques de origen. Por tanto, un objeto transformer
puede tener hasta un destino y un número ilimitado de orígenes.
Una clase derivada de source_block
debe implementar seis métodos:propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message y resume_propagation. Una clase derivada de target_block
debe implementar el método propagate_message y puede implementar opcionalmente el método send_message. Derivar de propagator_block
es funcionalmente equivalente a la derivación de source_block
y target_block
.
El runtime llama al método propagate_to_any_targets
para procesar de forma sincrónica o asincrónica los mensajes entrantes y propagar los mensajes salientes. Los bloques de destino llaman al método accept_message
para aceptar mensajes. Muchos tipos de bloques de mensajes, como unbounded_buffer
, envían mensajes solo al primer destino que los recibiría. Por tanto, transfiere la propiedad del mensaje al destino. Otros tipos de bloques de mensajes, como concurrency::overwrite_buffer, ofrecen mensajes a cada uno de sus bloques de destino. Por tanto, overwrite_buffer
crea una copia del mensaje para cada uno de sus destinos.
Los métodos reserve_message
, consume_message
, release_message
y resume_propagation
permiten a los bloques de mensajes participar en la reserva de mensajes. Los bloques de destino llaman al método reserve_message
cuando se les ofrece un mensaje y tienen que reservar el mensaje para su uso posterior. Después de que un bloque de destino reserva un mensaje, puede llamar al método consume_message
para usar ese mensaje o al método release_message
para cancelar la reserva. Como sucede con el método accept_message
, la implementación de consume_message
puede transferir la propiedad del mensaje o devolver una copia del mensaje. Después de que un bloque de destino usa o libera un mensaje reservado, el runtime llama al método resume_propagation
. Normalmente, este método continúa la propagación de mensajes, comenzando por el siguiente mensaje de la cola.
El runtime llama al método propagate_message
para transferir de forma asincrónica un mensaje de otro bloque al actual. El método send_message
es similar a propagate_message
, excepto en que envía de forma sincrónica, en lugar de asincrónica, el mensaje a los bloques de destino. La implementación predeterminada de send_message
rechaza todos los mensajes entrantes. El runtime no llama a ninguno de estos métodos si el mensaje no supera la función opcional de filtro asociada al bloque de destino. Para más información sobre filtros de mensajes, consulte Bloques de mensajes asincrónicos.
[Arriba]
Definición de la clase priority_buffer
La clase priority_buffer
es un tipo de bloque de mensajes personalizado que ordena los mensajes entrantes primero por prioridad y, a continuación, en el orden en que se reciben los mensajes. La clase priority_buffer
es similar a la clase concurrency::unbounded_buffer porque contiene una cola de mensajes, y también porque actúa como bloque de mensajes de origen y de destino, y puede tener varios orígenes y varios destinos. Sin embargo, unbounded_buffer
basa la propagación de mensaje solo en el orden en que recibe mensajes de sus orígenes.
La clase priority_buffer
recibe mensajes de tipo std::tuple que contienen elementos PriorityType
y Type
. PriorityType
se refiere al tipo que contiene la prioridad de cada mensaje; Type
se refiere a la parte de datos del mensaje. La clase priority_buffer
envía mensajes de tipo Type
. La clase priority_buffer
también administra dos colas de mensajes: un objeto std::priority_queue para los mensajes entrantes y un objeto std::queue para los mensajes salientes. Ordenar los mensajes por prioridad es útil cuando un objeto priority_buffer
recibe varios mensajes simultáneamente o cuando recibe varios mensajes antes de que los consumidores lean cualquier mensaje.
Además de los siete métodos que una clase derivada de propagator_block
debe implementar, la clase priority_buffer
también invalida los métodos link_target_notification
y send_message
. La clase priority_buffer
también define dos métodos del asistente públicos, enqueue
y dequeue
, y un método del asistente privado, propagate_priority_order
.
En el procedimiento siguiente se describe cómo implementar la clase priority_buffer
.
Para definir la clase priority_buffer
Cree el archivo de encabezado de C++ y nómbrelo
priority_buffer.h
. O bien, puede usar un archivo de encabezado existente que forme parte del proyecto.En
priority_buffer.h
, agregue el código siguiente.#pragma once #include <agents.h> #include <queue>
En el espacio de nombres
std
, defina especializaciones de std::less y std::greater que actúen en objetos de concurrency::message.namespace std { // A specialization of less that tests whether the priority element of a // message is less than the priority element of another message. template<class Type, class PriorityType> struct less<concurrency::message<tuple<PriorityType,Type>>*> { typedef concurrency::message<tuple<PriorityType, Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator< to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) < get<0>(right->payload)); } }; // A specialization of less that tests whether the priority element of a // message is greater than the priority element of another message. template<class Type, class PriorityType> struct greater<concurrency::message<tuple<PriorityType, Type>>*> { typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator> to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) > get<0>(right->payload)); } }; }
La clase
priority_buffer
almacena objetosmessage
en un objetopriority_queue
. Estas especializaciones de tipo permiten a la cola de prioridad ordenar los mensajes según su prioridad. La prioridad es el primer elemento del objetotuple
.En el espacio de nombres
concurrencyex
, declare la clasepriority_buffer
.namespace concurrencyex { template<class Type, typename PriorityType = int, typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>> class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>, concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>> { public: protected: private: }; }
La clase
priority_buffer
deriva depropagator_block
. Por tanto, puede enviar y recibir mensajes. La clasepriority_buffer
puede tener varios destinos que reciben mensajes de tipoType
. También puede tener varios orígenes que envían mensajes de tipotuple<PriorityType, Type>
.En la sección
private
de la clasepriority_buffer
, agregue las variables miembro siguientes.// Stores incoming messages. // The type parameter Pr specifies how to order messages by priority. std::priority_queue< concurrency::message<_Source_type>*, std::vector<concurrency::message<_Source_type>*>, Pr > _input_messages; // Synchronizes access to the input message queue. concurrency::critical_section _input_lock; // Stores outgoing messages. std::queue<concurrency::message<_Target_type>*> _output_messages;
El objeto
priority_queue
contiene mensajes entrantes; el objetoqueue
contiene mensajes salientes. Un objetopriority_buffer
puede recibir varios mensajes simultáneamente; el objetocritical_section
sincroniza el acceso a la cola de mensajes entrantes.En la sección
private
, defina el constructor de copias y el operador de asignación. Esto impide que los objetospriority_queue
sean asignables.// Hide assignment operator and copy constructor. priority_buffer const &operator =(priority_buffer const&); priority_buffer(priority_buffer const &);
En la sección
public
, defina los constructores que son comunes a muchos tipos de bloques de mensajes. Defina también el destructor.// Constructs a priority_buffer message block. priority_buffer() { initialize_source_and_target(); } // Constructs a priority_buffer message block with the given filter function. priority_buffer(filter_method const& filter) { initialize_source_and_target(); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler) { initialize_source_and_target(&scheduler); } // Constructs a priority_buffer message block with the given filter function // and uses the provided Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) { initialize_source_and_target(&scheduler); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group) { initialize_source_and_target(NULL, &schedule_group); } // Constructs a priority_buffer message block with the given filter function // and uses the provided SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter) { initialize_source_and_target(NULL, &schedule_group); register_filter(filter); } // Destroys the message block. ~priority_buffer() { // Remove all links. remove_network_links(); }
En la sección
public
, defina los métodosenqueue
ydequeue
. Estos métodos del asistente proporcionan una manera alternativa de enviar mensajes a y recibir mensajes de un objetopriority_buffer
.// Sends an item to the message block. bool enqueue(Type const& item) { return concurrency::asend<Type>(this, item); } // Receives an item from the message block. Type dequeue() { return receive<Type>(this); }
En la sección
protected
, defina el métodopropagate_to_any_targets
.// Transfers the message at the front of the input queue to the output queue // and propagates out all messages in the output queue. virtual void propagate_to_any_targets(concurrency::message<_Target_type>*) { // Retrieve the message from the front of the input queue. concurrency::message<_Source_type>* input_message = NULL; { concurrency::critical_section::scoped_lock lock(_input_lock); if (_input_messages.size() > 0) { input_message = _input_messages.top(); _input_messages.pop(); } } // Move the message to the output queue. if (input_message != NULL) { // The payload of the output message does not contain the // priority of the message. concurrency::message<_Target_type>* output_message = new concurrency::message<_Target_type>(get<1>(input_message->payload)); _output_messages.push(output_message); // Free the memory for the input message. delete input_message; // Do not propagate messages if the new message is not the head message. // In this case, the head message is reserved by another message block. if (_output_messages.front()->msg_id() != output_message->msg_id()) { return; } } // Propagate out the output messages. propagate_priority_order(); }
El método
propagate_to_any_targets
transfiere a la cola de salida el mensaje que está al principio de la cola de entrada y propaga todos los mensajes de la cola de salida.En la sección
protected
, defina el métodoaccept_message
.// Accepts a message that was offered by this block by transferring ownership // to the caller. virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id) { concurrency::message<_Target_type>* message = NULL; // Transfer ownership if the provided message identifier matches // the identifier of the front of the output message queue. if (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id) { message = _output_messages.front(); _output_messages.pop(); } return message; }
Cuando un bloque de destino llama al método
accept_message
, la clasepriority_buffer
transfiere la propiedad del mensaje al primer bloque de destino que lo acepta. (Esto es similar al comportamiento deunbounded_buffer
.)En la sección
protected
, defina el métodoreserve_message
.// Reserves a message that was previously offered by this block. virtual bool reserve_message(concurrency::runtime_object_identity msg_id) { // Allow the message to be reserved if the provided message identifier // is the message identifier of the front of the message queue. return (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id); }
La clase
priority_buffer
permite a un bloque de destino reservar un mensaje cuando el identificador de mensaje proporcionado coincide con el identificador de mensaje que está en el principio de la cola. Es decir, un destino puede reservar el mensaje si el objetopriority_buffer
aún no ha recibido un mensaje adicional y aún no ha propagado el actual.En la sección
protected
, defina el métodoconsume_message
.// Transfers the message that was previously offered by this block // to the caller. The caller of this method is the target block that // reserved the message. virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id) { // Transfer ownership of the message to the caller. return accept_message(msg_id); }
Un bloque de destino llama a
consume_message
para transferir la propiedad del mensaje reservado.En la sección
protected
, defina el métodorelease_message
.// Releases a previous message reservation. virtual void release_message(concurrency::runtime_object_identity msg_id) { // The head message must be the one that is reserved. if (_output_messages.empty() || _output_messages.front()->msg_id() != msg_id) { throw message_not_found(); } }
Un bloque de destino llama a
release_message
para cancelar la reserva un mensaje.En la sección
protected
, defina el métodoresume_propagation
.// Resumes propagation after a reservation has been released. virtual void resume_propagation() { // Propagate out any messages in the output queue. if (_output_messages.size() > 0) { async_send(NULL); } }
El runtime llama a
resume_propagation
después de que un bloque de destino use o libere un mensaje reservado. Este método propaga cualquier mensaje que esté en la cola de salida.En la sección
protected
, defina el métodolink_target_notification
.// Notifies this block that a new target has been linked to it. virtual void link_target_notification(concurrency::ITarget<_Target_type>*) { // Do not propagate messages if a target block reserves // the message at the front of the queue. if (_M_pReservedFor != NULL) { return; } // Propagate out any messages that are in the output queue. propagate_priority_order(); }
La variable miembro
_M_pReservedFor
está definida por la clase base,source_block
. Esta variable miembro apunta al bloque de destino, si existe, que mantiene una reserva del mensaje que está al principio de la cola de salida. El runtime llama alink_target_notification
cuando un nuevo destino se vincula al objetopriority_buffer
. Este método propaga cualquier mensaje que está en la cola de salida si ningún destino mantiene una reserva.En la sección
private
, defina el métodopropagate_priority_order
.// Propagates messages in priority order. void propagate_priority_order() { // Cancel propagation if another block reserves the head message. if (_M_pReservedFor != NULL) { return; } // Propagate out all output messages. // Because this block preserves message ordering, stop propagation // if any of the messages are not accepted by a target block. while (!_output_messages.empty()) { // Get the next message. concurrency::message<_Target_type> * message = _output_messages.front(); concurrency::message_status status = declined; // Traverse each target in the order in which they are connected. for (target_iterator iter = _M_connectedTargets.begin(); *iter != NULL; ++iter) { // Propagate the message to the target. concurrency::ITarget<_Target_type>* target = *iter; status = target->propagate(message, this); // If the target accepts the message then ownership of message has // changed. Do not propagate this message to any other target. if (status == accepted) { break; } // If the target only reserved this message, we must wait until the // target accepts the message. if (_M_pReservedFor != NULL) { break; } } // If status is anything other than accepted, then the head message // was not propagated out. To preserve the order in which output // messages are propagated, we must stop propagation until the head // message is accepted. if (status != accepted) { break; } } }
Este método propaga todos los mensajes de la cola de salida. Todos los mensajes de la cola se ofrecen a todos los bloques de destino hasta que uno de los bloques de destino acepta el mensaje. La clase
priority_buffer
conserva el orden de los mensajes salientes. Por tanto, un bloque de destino debe aceptar el primer mensaje de la cola de salida antes de que este método ofrezca ningún otro mensaje a los bloques de destino.En la sección
protected
, defina el métodopropagate_message
.// Asynchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::propagate. virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Asynchronously send the message to the target blocks. async_send(NULL); return accepted; } else { return missed; } }
El método
propagate_message
permite que la clasepriority_buffer
actúe como receptor de mensajes o destino. Este método recibe el mensaje ofrecido por el bloque de origen proporcionado e inserta ese mensaje en la cola de prioridad. El métodopropagate_message
envía de forma asincrónica todos los mensajes de salida a los bloques de destino.El runtime llama a este método cuando se llama a la función concurrency::asend o cuando el bloque de mensajes está conectado con otros bloques de mensajes.
En la sección
protected
, defina el métodosend_message
.// Synchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::send. virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Synchronously send the message to the target blocks. sync_send(NULL); return accepted; } else { return missed; } }
El método
send_message
es similar apropagate_message
. Sin embargo, envía los mensajes de salida sincrónicamente en lugar de hacerlo de forma asincrónica.El runtime llama a este método durante una operación sincrónica de envío, como cuando llama a la función concurrency::send.
La clase priority_buffer
contiene sobrecargas del constructor que son típicas en muchos tipos de bloques de mensajes. Algunas sobrecargas de constructor toman objetos concurrency::Scheduler o concurrency::ScheduleGroup, que permiten que un programador de tareas específico administre el bloque de mensajes. Otras sobrecargas del constructor toman una función de filtro. Las funciones de filtro permiten a los bloques de mensajes aceptar o rechazar un mensaje en función de su carga. Para más información sobre filtros de mensajes, consulte Bloques de mensajes asincrónicos. Para obtener más información sobre los programadores de tareas, consulte Programador de tareas.
Puesto que la clase priority_buffer
ordena los mensajes por prioridad y, a continuación, por el orden en que se reciben, esta clase es más útil cuando recibe los mensajes de forma asincrónica, por ejemplo, cuando se llama a la función concurrency::asend o cuando el bloque de mensajes está conectado a otros bloques de mensajes.
[Arriba]
Ejemplo completo
En el ejemplo siguiente se muestra la definición completa de la clase priority_buffer
.
// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>
namespace std
{
// A specialization of less that tests whether the priority element of a
// message is less than the priority element of another message.
template<class Type, class PriorityType>
struct less<concurrency::message<tuple<PriorityType,Type>>*>
{
typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator< to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) < get<0>(right->payload));
}
};
// A specialization of less that tests whether the priority element of a
// message is greater than the priority element of another message.
template<class Type, class PriorityType>
struct greater<concurrency::message<tuple<PriorityType, Type>>*>
{
typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator> to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) > get<0>(right->payload));
}
};
}
namespace concurrencyex
{
// A message block type that orders incoming messages first by priority,
// and then by the order in which messages are received.
template<class Type,
typename PriorityType = int,
typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
{
public:
// Constructs a priority_buffer message block.
priority_buffer()
{
initialize_source_and_target();
}
// Constructs a priority_buffer message block with the given filter function.
priority_buffer(filter_method const& filter)
{
initialize_source_and_target();
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler)
{
initialize_source_and_target(&scheduler);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter)
{
initialize_source_and_target(&scheduler);
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group)
{
initialize_source_and_target(NULL, &schedule_group);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
{
initialize_source_and_target(NULL, &schedule_group);
register_filter(filter);
}
// Destroys the message block.
~priority_buffer()
{
// Remove all links.
remove_network_links();
}
// Sends an item to the message block.
bool enqueue(Type const& item)
{
return concurrency::asend<Type>(this, item);
}
// Receives an item from the message block.
Type dequeue()
{
return receive<Type>(this);
}
protected:
// Asynchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::propagate.
virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Asynchronously send the message to the target blocks.
async_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Synchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::send.
virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Synchronously send the message to the target blocks.
sync_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Accepts a message that was offered by this block by transferring ownership
// to the caller.
virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
{
concurrency::message<_Target_type>* message = NULL;
// Transfer ownership if the provided message identifier matches
// the identifier of the front of the output message queue.
if (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id)
{
message = _output_messages.front();
_output_messages.pop();
}
return message;
}
// Reserves a message that was previously offered by this block.
virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
{
// Allow the message to be reserved if the provided message identifier
// is the message identifier of the front of the message queue.
return (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id);
}
// Transfers the message that was previously offered by this block
// to the caller. The caller of this method is the target block that
// reserved the message.
virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
{
// Transfer ownership of the message to the caller.
return accept_message(msg_id);
}
// Releases a previous message reservation.
virtual void release_message(concurrency::runtime_object_identity msg_id)
{
// The head message must be the one that is reserved.
if (_output_messages.empty() ||
_output_messages.front()->msg_id() != msg_id)
{
throw message_not_found();
}
}
// Resumes propagation after a reservation has been released.
virtual void resume_propagation()
{
// Propagate out any messages in the output queue.
if (_output_messages.size() > 0)
{
async_send(NULL);
}
}
// Notifies this block that a new target has been linked to it.
virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
{
// Do not propagate messages if a target block reserves
// the message at the front of the queue.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out any messages that are in the output queue.
propagate_priority_order();
}
// Transfers the message at the front of the input queue to the output queue
// and propagates out all messages in the output queue.
virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
{
// Retrieve the message from the front of the input queue.
concurrency::message<_Source_type>* input_message = NULL;
{
concurrency::critical_section::scoped_lock lock(_input_lock);
if (_input_messages.size() > 0)
{
input_message = _input_messages.top();
_input_messages.pop();
}
}
// Move the message to the output queue.
if (input_message != NULL)
{
// The payload of the output message does not contain the
// priority of the message.
concurrency::message<_Target_type>* output_message =
new concurrency::message<_Target_type>(get<1>(input_message->payload));
_output_messages.push(output_message);
// Free the memory for the input message.
delete input_message;
// Do not propagate messages if the new message is not the head message.
// In this case, the head message is reserved by another message block.
if (_output_messages.front()->msg_id() != output_message->msg_id())
{
return;
}
}
// Propagate out the output messages.
propagate_priority_order();
}
private:
// Propagates messages in priority order.
void propagate_priority_order()
{
// Cancel propagation if another block reserves the head message.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out all output messages.
// Because this block preserves message ordering, stop propagation
// if any of the messages are not accepted by a target block.
while (!_output_messages.empty())
{
// Get the next message.
concurrency::message<_Target_type> * message = _output_messages.front();
concurrency::message_status status = declined;
// Traverse each target in the order in which they are connected.
for (target_iterator iter = _M_connectedTargets.begin();
*iter != NULL;
++iter)
{
// Propagate the message to the target.
concurrency::ITarget<_Target_type>* target = *iter;
status = target->propagate(message, this);
// If the target accepts the message then ownership of message has
// changed. Do not propagate this message to any other target.
if (status == accepted)
{
break;
}
// If the target only reserved this message, we must wait until the
// target accepts the message.
if (_M_pReservedFor != NULL)
{
break;
}
}
// If status is anything other than accepted, then the head message
// was not propagated out. To preserve the order in which output
// messages are propagated, we must stop propagation until the head
// message is accepted.
if (status != accepted)
{
break;
}
}
}
private:
// Stores incoming messages.
// The type parameter Pr specifies how to order messages by priority.
std::priority_queue<
concurrency::message<_Source_type>*,
std::vector<concurrency::message<_Source_type>*>,
Pr
> _input_messages;
// Synchronizes access to the input message queue.
concurrency::critical_section _input_lock;
// Stores outgoing messages.
std::queue<concurrency::message<_Target_type>*> _output_messages;
private:
// Hide assignment operator and copy constructor.
priority_buffer const &operator =(priority_buffer const&);
priority_buffer(priority_buffer const &);
};
}
En el ejemplo siguiente se realiza simultáneamente una serie de operaciones asend
y concurrency::receive sobre un objeto priority_buffer
.
// priority_buffer.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"
using namespace concurrency;
using namespace concurrencyex;
using namespace std;
int wmain()
{
// Concurrently perform a number of asend and receive operations
// on a priority_buffer object.
priority_buffer<int> pb;
parallel_invoke(
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
[&pb] {
for (int i = 0; i < 75; ++i) {
wcout << receive(pb) << L' ';
if ((i+1) % 25 == 0)
wcout << endl;
}
}
);
}
Este ejemplo genera la siguiente salida de ejemplo.
36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12
La clase priority_buffer
ordena los mensajes primero por prioridad y, a continuación, por el orden en que recibe los mensajes. En este ejemplo, los mensajes con mayor prioridad numérica se insertan al principio de la cola.
[Arriba]
Compilar el código
Copie el código de ejemplo y péguelo en un proyecto de Visual Studio, o pegue la definición de la clase priority_buffer
en un archivo denominado priority_buffer.h
, el programa de prueba en un archivo denominado priority_buffer.cpp
y, a continuación, ejecute el comando siguiente en una ventana del símbolo del sistema de Visual Studio.
cl.exe /EHsc priority_buffer.cpp
Consulte también
Tutoriales del Runtime de simultaneidad
Bloques de mensajes asincrónicos
Funciones que pasan mensajes