Procédure pas à pas : création d'un bloc de message personnalisé
Ce document explique comment créer un type de bloc de messages personnalisé qui trie les messages entrants par priorité.
Bien que les types de blocs de messages intégrés fournissent un large éventail de fonctionnalités, vous pouvez créer votre propre type de bloc de message et le personnaliser pour répondre aux exigences de votre application. Pour obtenir une description des types de blocs de messages intégrés fournis par la bibliothèque d’agents asynchrones, consultez blocs de messages asynchrones.
Prérequis
Lisez les documents suivants avant de commencer cette procédure pas à pas :
Sections
Cette procédure pas à pas contient les sections suivantes :
Conception d’un bloc de messages personnalisé
Les blocs de messages participent à l’acte d’envoi et de réception de messages. Un bloc de messages qui envoie des messages est appelé bloc source. Un bloc de messages qui reçoit des messages est appelé bloc cible. Un bloc de messages qui envoie et reçoit des messages est appelé bloc de propagation. La bibliothèque agents utilise la concurrence de classe abstraite ::ISource pour représenter les blocs sources et la concurrence de classe abstraite ::ITarget pour représenter les blocs cibles. Types de blocs de message qui agissent en tant que sources dérivent ISource
; les types de blocs de messages qui agissent en tant que cibles dérivent de ITarget
.
Bien que vous puissiez dériver directement votre type de ISource
bloc de message et ITarget
, la bibliothèque d’agents définit trois classes de base qui effectuent une grande partie des fonctionnalités communes à tous les types de blocs de messages, par exemple, la gestion des erreurs et la connexion de blocs de messages ensemble d’une manière sécurisée par accès concurrentiel. La classe concurrency ::source_block dérive et ISource
envoie des messages à d’autres blocs. La classe concurrency ::target_block dérive et ITarget
reçoit des messages d’autres blocs. La classe concurrency ::p ropagator_block dérive et ISource
ITarget
envoie des messages à d’autres blocs et reçoit des messages d’autres blocs. Nous vous recommandons d’utiliser ces trois classes de base pour gérer les détails de l’infrastructure afin que vous puissiez vous concentrer sur le comportement de votre bloc de messages.
Les source_block
classes et propagator_block
les modèles target_block
sont des modèles paramétrés sur un type qui gère les connexions ou les liens, entre les blocs source et cible et sur un type qui gère le traitement des messages. La bibliothèque agents définit deux types qui effectuent la gestion des liens, concurrency ::single_link_registry et concurrency ::multi_link_registry. La single_link_registry
classe permet à un bloc de message d’être lié à une source ou à une cible. La multi_link_registry
classe permet à un bloc de message d’être lié à plusieurs sources ou à plusieurs cibles. La bibliothèque agents définit une classe qui effectue la gestion des messages, concurrency ::ordered_message_processor. La ordered_message_processor
classe permet aux blocs de messages de traiter les messages dans l’ordre dans lequel il les reçoit.
Pour mieux comprendre comment les blocs de messages sont liés à leurs sources et cibles, tenez compte de l’exemple suivant. Cet exemple montre la déclaration de la classe concurrency ::transformer .
template<
class _Input,
class _Output
>
class transformer : public propagator_block<
single_link_registry<ITarget<_Output>>,
multi_link_registry<ISource<_Input>>
>;
La transformer
classe dérive de propagator_block
, et agit donc comme un bloc source et comme un bloc cible. Il accepte les messages de type _Input
et envoie des messages de type _Output
. La transformer
classe spécifie single_link_registry
comme gestionnaire de liens pour tous les blocs cibles et multi_link_registry
comme gestionnaire de liens pour tous les blocs sources. Par conséquent, un transformer
objet peut avoir jusqu’à une cible et un nombre illimité de sources.
Une classe dérivée de source_block
doit implémenter six méthodes : propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message et resume_propagation. Une classe dérivée de target_block
doit implémenter la méthode propagate_message et peut éventuellement implémenter la méthode send_message . La dérivation est propagator_block
fonctionnellement équivalente à la dérivation des deux source_block
et target_block
.
La propagate_to_any_targets
méthode est appelée par le runtime pour traiter de manière asynchrone ou synchrone tous les messages entrants et propager tous les messages sortants. La accept_message
méthode est appelée par les blocs cibles pour accepter les messages. De nombreux types de blocs de messages, tels que unbounded_buffer
, envoient des messages uniquement à la première cible qui le recevrait. Par conséquent, il transfère la propriété du message à la cible. D’autres types de blocs de messages, tels que concurrency ::overwrite_buffer, offrent des messages à chacun de ses blocs cibles. Par conséquent, overwrite_buffer
crée une copie du message pour chacune de ses cibles.
Les reserve_message
méthodes , et les resume_propagation
consume_message
release_message
méthodes permettent aux blocs de messages de participer à la réservation de messages. Les blocs cibles appellent la reserve_message
méthode lorsqu’elles sont proposées à un message et doivent réserver le message pour une utilisation ultérieure. Une fois qu’un bloc cible réserve un message, il peut appeler la consume_message
méthode pour consommer ce message ou la release_message
méthode pour annuler la réservation. Comme avec la accept_message
méthode, l’implémentation de consume_message
peut transférer la propriété du message ou retourner une copie du message. Une fois qu’un bloc cible consomme ou libère un message réservé, le runtime appelle la resume_propagation
méthode. En règle générale, cette méthode poursuit la propagation des messages, en commençant par le message suivant dans la file d’attente.
Le runtime appelle la propagate_message
méthode pour transférer de façon asynchrone un message d’un autre bloc vers celui actuel. La send_message
méthode ressemble propagate_message
, sauf qu’elle est synchrone, au lieu de manière asynchrone, envoie le message aux blocs cibles. L’implémentation par défaut de send_message
rejette tous les messages entrants. Le runtime n’appelle pas l’une de ces méthodes si le message ne transmet pas la fonction de filtre facultative associée au bloc cible. Pour plus d’informations sur les filtres de messages, consultez Blocs de messages asynchrones.
[Haut]
Définition de la classe priority_buffer
La priority_buffer
classe est un type de bloc de messages personnalisé qui commande d’abord les messages entrants par priorité, puis par l’ordre dans lequel les messages sont reçus. La priority_buffer
classe ressemble à la classe concurrency ::unbounded_buffer , car elle contient une file d’attente de messages, et également parce qu’elle agit à la fois comme une source et un bloc de messages cible et peut avoir à la fois plusieurs sources et plusieurs cibles. Toutefois, unbounded_buffer
base la propagation des messages uniquement sur l’ordre dans lequel il reçoit des messages de ses sources.
La priority_buffer
classe reçoit les messages de type std ::tuple qui contiennent et contiennent PriorityType
des Type
éléments. PriorityType
fait référence au type qui contient la priorité de chaque message ; Type
fait référence à la partie données du message. La priority_buffer
classe envoie des messages de type Type
. La priority_buffer
classe gère également deux files d’attente de messages : un objet std ::p riority_queue pour les messages entrants et un objet std ::queue pour les messages sortants. L’ordre des messages par priorité est utile lorsqu’un priority_buffer
objet reçoit plusieurs messages simultanément ou lorsqu’il reçoit plusieurs messages avant que les messages ne soient lus par les consommateurs.
Outre les sept méthodes qu’une classe dérivée de propagator_block
doit implémenter, la priority_buffer
classe remplace également les méthodes et send_message
les link_target_notification
méthodes. La priority_buffer
classe définit également deux méthodes d’assistance publique et dequeue
enqueue
une méthode d’assistance privée. propagate_priority_order
La procédure suivante décrit comment implémenter la priority_buffer
classe.
Pour définir la classe priority_buffer
Créez un fichier d’en-tête C++ et nommez-le
priority_buffer.h
. Vous pouvez également utiliser un fichier d’en-tête existant qui fait partie de votre projet.Dans
priority_buffer.h
, ajoutez le code suivant.#pragma once #include <agents.h> #include <queue>
Dans l’espace
std
de noms, définissez des spécialisations de std ::less et std ::greater qui agissent sur les objets concurrency ::message .namespace std { // A specialization of less that tests whether the priority element of a // message is less than the priority element of another message. template<class Type, class PriorityType> struct less<concurrency::message<tuple<PriorityType,Type>>*> { typedef concurrency::message<tuple<PriorityType, Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator< to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) < get<0>(right->payload)); } }; // A specialization of less that tests whether the priority element of a // message is greater than the priority element of another message. template<class Type, class PriorityType> struct greater<concurrency::message<tuple<PriorityType, Type>>*> { typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator> to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) > get<0>(right->payload)); } }; }
La
priority_buffer
classe stocke desmessage
objets dans unpriority_queue
objet. Ces spécialisations de type permettent à la file d’attente de priorité de trier les messages en fonction de leur priorité. La priorité est le premier élément de l’objettuple
.Dans l’espace
concurrencyex
de noms, déclarez lapriority_buffer
classe.namespace concurrencyex { template<class Type, typename PriorityType = int, typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>> class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>, concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>> { public: protected: private: }; }
La classe
priority_buffer
est dérivée depropagator_block
. Par conséquent, il peut envoyer et recevoir des messages. Lapriority_buffer
classe peut avoir plusieurs cibles qui reçoivent des messages de typeType
. Il peut également avoir plusieurs sources qui envoient des messages de typetuple<PriorityType, Type>
.Dans la
private
section de lapriority_buffer
classe, ajoutez les variables membres suivantes.// Stores incoming messages. // The type parameter Pr specifies how to order messages by priority. std::priority_queue< concurrency::message<_Source_type>*, std::vector<concurrency::message<_Source_type>*>, Pr > _input_messages; // Synchronizes access to the input message queue. concurrency::critical_section _input_lock; // Stores outgoing messages. std::queue<concurrency::message<_Target_type>*> _output_messages;
L’objet
priority_queue
contient des messages entrants ; l’objetqueue
contient des messages sortants. Unpriority_buffer
objet peut recevoir plusieurs messages simultanément ; l’objetcritical_section
synchronise l’accès à la file d’attente des messages d’entrée.Dans la
private
section, définissez le constructeur de copie et l’opérateur d’affectation. Cela empêche lespriority_queue
objets d’être assignables.// Hide assignment operator and copy constructor. priority_buffer const &operator =(priority_buffer const&); priority_buffer(priority_buffer const &);
Dans la
public
section, définissez les constructeurs qui sont communs à de nombreux types de blocs de messages. Définissez également le destructeur.// Constructs a priority_buffer message block. priority_buffer() { initialize_source_and_target(); } // Constructs a priority_buffer message block with the given filter function. priority_buffer(filter_method const& filter) { initialize_source_and_target(); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler) { initialize_source_and_target(&scheduler); } // Constructs a priority_buffer message block with the given filter function // and uses the provided Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) { initialize_source_and_target(&scheduler); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group) { initialize_source_and_target(NULL, &schedule_group); } // Constructs a priority_buffer message block with the given filter function // and uses the provided SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter) { initialize_source_and_target(NULL, &schedule_group); register_filter(filter); } // Destroys the message block. ~priority_buffer() { // Remove all links. remove_network_links(); }
Dans la
public
section, définissez les méthodesenqueue
etdequeue
. Ces méthodes d’assistance offrent une autre façon d’envoyer des messages à unpriority_buffer
objet et de les recevoir.// Sends an item to the message block. bool enqueue(Type const& item) { return concurrency::asend<Type>(this, item); } // Receives an item from the message block. Type dequeue() { return receive<Type>(this); }
Dans la
protected
section, définissez lapropagate_to_any_targets
méthode.// Transfers the message at the front of the input queue to the output queue // and propagates out all messages in the output queue. virtual void propagate_to_any_targets(concurrency::message<_Target_type>*) { // Retrieve the message from the front of the input queue. concurrency::message<_Source_type>* input_message = NULL; { concurrency::critical_section::scoped_lock lock(_input_lock); if (_input_messages.size() > 0) { input_message = _input_messages.top(); _input_messages.pop(); } } // Move the message to the output queue. if (input_message != NULL) { // The payload of the output message does not contain the // priority of the message. concurrency::message<_Target_type>* output_message = new concurrency::message<_Target_type>(get<1>(input_message->payload)); _output_messages.push(output_message); // Free the memory for the input message. delete input_message; // Do not propagate messages if the new message is not the head message. // In this case, the head message is reserved by another message block. if (_output_messages.front()->msg_id() != output_message->msg_id()) { return; } } // Propagate out the output messages. propagate_priority_order(); }
La
propagate_to_any_targets
méthode transfère le message qui se trouve à l’avant de la file d’attente d’entrée vers la file d’attente de sortie et propage tous les messages dans la file d’attente de sortie.Dans la
protected
section, définissez laaccept_message
méthode.// Accepts a message that was offered by this block by transferring ownership // to the caller. virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id) { concurrency::message<_Target_type>* message = NULL; // Transfer ownership if the provided message identifier matches // the identifier of the front of the output message queue. if (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id) { message = _output_messages.front(); _output_messages.pop(); } return message; }
Lorsqu’un bloc cible appelle la
accept_message
méthode, lapriority_buffer
classe transfère la propriété du message au premier bloc cible qui l’accepte. (Cela ressemble au comportement deunbounded_buffer
.)Dans la
protected
section, définissez lareserve_message
méthode.// Reserves a message that was previously offered by this block. virtual bool reserve_message(concurrency::runtime_object_identity msg_id) { // Allow the message to be reserved if the provided message identifier // is the message identifier of the front of the message queue. return (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id); }
La
priority_buffer
classe permet à un bloc cible de réserver un message lorsque l’identificateur de message fourni correspond à l’identificateur du message qui se trouve à l’avant de la file d’attente. En d’autres termes, une cible peut réserver le message si l’objetpriority_buffer
n’a pas encore reçu de message supplémentaire et n’a pas encore propagé le message actuel.Dans la
protected
section, définissez laconsume_message
méthode.// Transfers the message that was previously offered by this block // to the caller. The caller of this method is the target block that // reserved the message. virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id) { // Transfer ownership of the message to the caller. return accept_message(msg_id); }
Un bloc cible appelle
consume_message
pour transférer la propriété du message qu’il a réservé.Dans la
protected
section, définissez larelease_message
méthode.// Releases a previous message reservation. virtual void release_message(concurrency::runtime_object_identity msg_id) { // The head message must be the one that is reserved. if (_output_messages.empty() || _output_messages.front()->msg_id() != msg_id) { throw message_not_found(); } }
Un bloc cible appelle
release_message
pour annuler sa réservation à un message.Dans la
protected
section, définissez laresume_propagation
méthode.// Resumes propagation after a reservation has been released. virtual void resume_propagation() { // Propagate out any messages in the output queue. if (_output_messages.size() > 0) { async_send(NULL); } }
Le runtime appelle
resume_propagation
une fois qu’un bloc cible consomme ou libère un message réservé. Cette méthode propage tous les messages qui se trouvent dans la file d’attente de sortie.Dans la
protected
section, définissez lalink_target_notification
méthode.// Notifies this block that a new target has been linked to it. virtual void link_target_notification(concurrency::ITarget<_Target_type>*) { // Do not propagate messages if a target block reserves // the message at the front of the queue. if (_M_pReservedFor != NULL) { return; } // Propagate out any messages that are in the output queue. propagate_priority_order(); }
La
_M_pReservedFor
variable membre est définie par la classe de base.source_block
Cette variable membre pointe vers le bloc cible, le cas échéant, qui contient une réservation au message qui se trouve à l’avant de la file d’attente de sortie. Le runtime appellelink_target_notification
lorsqu’une nouvelle cible est liée à l’objetpriority_buffer
. Cette méthode propage tous les messages qui se trouvent dans la file d’attente de sortie si aucune cible ne contient de réservation.Dans la
private
section, définissez lapropagate_priority_order
méthode.// Propagates messages in priority order. void propagate_priority_order() { // Cancel propagation if another block reserves the head message. if (_M_pReservedFor != NULL) { return; } // Propagate out all output messages. // Because this block preserves message ordering, stop propagation // if any of the messages are not accepted by a target block. while (!_output_messages.empty()) { // Get the next message. concurrency::message<_Target_type> * message = _output_messages.front(); concurrency::message_status status = declined; // Traverse each target in the order in which they are connected. for (target_iterator iter = _M_connectedTargets.begin(); *iter != NULL; ++iter) { // Propagate the message to the target. concurrency::ITarget<_Target_type>* target = *iter; status = target->propagate(message, this); // If the target accepts the message then ownership of message has // changed. Do not propagate this message to any other target. if (status == accepted) { break; } // If the target only reserved this message, we must wait until the // target accepts the message. if (_M_pReservedFor != NULL) { break; } } // If status is anything other than accepted, then the head message // was not propagated out. To preserve the order in which output // messages are propagated, we must stop propagation until the head // message is accepted. if (status != accepted) { break; } } }
Cette méthode propage tous les messages de la file d’attente de sortie. Chaque message de la file d’attente est proposé à chaque bloc cible jusqu’à ce qu’un des blocs cibles accepte le message. La
priority_buffer
classe conserve l’ordre des messages sortants. Par conséquent, le premier message de la file d’attente de sortie doit être accepté par un bloc cible avant que cette méthode n’offre tout autre message aux blocs cibles.Dans la
protected
section, définissez lapropagate_message
méthode.// Asynchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::propagate. virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Asynchronously send the message to the target blocks. async_send(NULL); return accepted; } else { return missed; } }
La
propagate_message
méthode permet à lapriority_buffer
classe d’agir en tant que récepteur de messages ou cible. Cette méthode reçoit le message proposé par le bloc source fourni et insère ce message dans la file d’attente prioritaire. Lapropagate_message
méthode envoie ensuite de manière asynchrone tous les messages de sortie aux blocs cibles.Le runtime appelle cette méthode lorsque vous appelez la fonction concurrency ::asend ou lorsque le bloc de messages est connecté à d’autres blocs de messages.
Dans la
protected
section, définissez lasend_message
méthode.// Synchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::send. virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Synchronously send the message to the target blocks. sync_send(NULL); return accepted; } else { return missed; } }
La
send_message
méthode ressemblepropagate_message
à . Toutefois, il envoie les messages de sortie de manière synchrone au lieu de manière asynchrone.Le runtime appelle cette méthode pendant une opération d’envoi synchrone, par exemple lorsque vous appelez la fonction concurrency ::send .
La priority_buffer
classe contient des surcharges de constructeur qui sont typiques de nombreux types de blocs de messages. Certaines surcharges de constructeur prennent l’accès concurrentiel ::Scheduler ou les objets concurrency ::ScheduleGroup, ce qui permet au bloc de message d’être géré par un planificateur de tâches spécifique. D’autres surcharges de constructeur prennent une fonction de filtre. Les fonctions de filtre permettent aux blocs de messages d’accepter ou de rejeter un message en fonction de sa charge utile. Pour plus d’informations sur les filtres de messages, consultez Blocs de messages asynchrones. Pour plus d’informations sur les planificateurs de tâches, consultez Planificateur de tâches.
Étant donné que la priority_buffer
classe trie les messages par priorité, puis par l’ordre dans lequel les messages sont reçus, cette classe est la plus utile lorsqu’elle reçoit des messages de manière asynchrone, par exemple lorsque vous appelez la fonction concurrency ::asend ou lorsque le bloc de messages est connecté à d’autres blocs de messages.
[Haut]
Exemple complet
L’exemple suivant montre la définition complète de la priority_buffer
classe.
// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>
namespace std
{
// A specialization of less that tests whether the priority element of a
// message is less than the priority element of another message.
template<class Type, class PriorityType>
struct less<concurrency::message<tuple<PriorityType,Type>>*>
{
typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator< to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) < get<0>(right->payload));
}
};
// A specialization of less that tests whether the priority element of a
// message is greater than the priority element of another message.
template<class Type, class PriorityType>
struct greater<concurrency::message<tuple<PriorityType, Type>>*>
{
typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator> to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) > get<0>(right->payload));
}
};
}
namespace concurrencyex
{
// A message block type that orders incoming messages first by priority,
// and then by the order in which messages are received.
template<class Type,
typename PriorityType = int,
typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
{
public:
// Constructs a priority_buffer message block.
priority_buffer()
{
initialize_source_and_target();
}
// Constructs a priority_buffer message block with the given filter function.
priority_buffer(filter_method const& filter)
{
initialize_source_and_target();
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler)
{
initialize_source_and_target(&scheduler);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter)
{
initialize_source_and_target(&scheduler);
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group)
{
initialize_source_and_target(NULL, &schedule_group);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
{
initialize_source_and_target(NULL, &schedule_group);
register_filter(filter);
}
// Destroys the message block.
~priority_buffer()
{
// Remove all links.
remove_network_links();
}
// Sends an item to the message block.
bool enqueue(Type const& item)
{
return concurrency::asend<Type>(this, item);
}
// Receives an item from the message block.
Type dequeue()
{
return receive<Type>(this);
}
protected:
// Asynchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::propagate.
virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Asynchronously send the message to the target blocks.
async_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Synchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::send.
virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Synchronously send the message to the target blocks.
sync_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Accepts a message that was offered by this block by transferring ownership
// to the caller.
virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
{
concurrency::message<_Target_type>* message = NULL;
// Transfer ownership if the provided message identifier matches
// the identifier of the front of the output message queue.
if (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id)
{
message = _output_messages.front();
_output_messages.pop();
}
return message;
}
// Reserves a message that was previously offered by this block.
virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
{
// Allow the message to be reserved if the provided message identifier
// is the message identifier of the front of the message queue.
return (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id);
}
// Transfers the message that was previously offered by this block
// to the caller. The caller of this method is the target block that
// reserved the message.
virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
{
// Transfer ownership of the message to the caller.
return accept_message(msg_id);
}
// Releases a previous message reservation.
virtual void release_message(concurrency::runtime_object_identity msg_id)
{
// The head message must be the one that is reserved.
if (_output_messages.empty() ||
_output_messages.front()->msg_id() != msg_id)
{
throw message_not_found();
}
}
// Resumes propagation after a reservation has been released.
virtual void resume_propagation()
{
// Propagate out any messages in the output queue.
if (_output_messages.size() > 0)
{
async_send(NULL);
}
}
// Notifies this block that a new target has been linked to it.
virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
{
// Do not propagate messages if a target block reserves
// the message at the front of the queue.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out any messages that are in the output queue.
propagate_priority_order();
}
// Transfers the message at the front of the input queue to the output queue
// and propagates out all messages in the output queue.
virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
{
// Retrieve the message from the front of the input queue.
concurrency::message<_Source_type>* input_message = NULL;
{
concurrency::critical_section::scoped_lock lock(_input_lock);
if (_input_messages.size() > 0)
{
input_message = _input_messages.top();
_input_messages.pop();
}
}
// Move the message to the output queue.
if (input_message != NULL)
{
// The payload of the output message does not contain the
// priority of the message.
concurrency::message<_Target_type>* output_message =
new concurrency::message<_Target_type>(get<1>(input_message->payload));
_output_messages.push(output_message);
// Free the memory for the input message.
delete input_message;
// Do not propagate messages if the new message is not the head message.
// In this case, the head message is reserved by another message block.
if (_output_messages.front()->msg_id() != output_message->msg_id())
{
return;
}
}
// Propagate out the output messages.
propagate_priority_order();
}
private:
// Propagates messages in priority order.
void propagate_priority_order()
{
// Cancel propagation if another block reserves the head message.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out all output messages.
// Because this block preserves message ordering, stop propagation
// if any of the messages are not accepted by a target block.
while (!_output_messages.empty())
{
// Get the next message.
concurrency::message<_Target_type> * message = _output_messages.front();
concurrency::message_status status = declined;
// Traverse each target in the order in which they are connected.
for (target_iterator iter = _M_connectedTargets.begin();
*iter != NULL;
++iter)
{
// Propagate the message to the target.
concurrency::ITarget<_Target_type>* target = *iter;
status = target->propagate(message, this);
// If the target accepts the message then ownership of message has
// changed. Do not propagate this message to any other target.
if (status == accepted)
{
break;
}
// If the target only reserved this message, we must wait until the
// target accepts the message.
if (_M_pReservedFor != NULL)
{
break;
}
}
// If status is anything other than accepted, then the head message
// was not propagated out. To preserve the order in which output
// messages are propagated, we must stop propagation until the head
// message is accepted.
if (status != accepted)
{
break;
}
}
}
private:
// Stores incoming messages.
// The type parameter Pr specifies how to order messages by priority.
std::priority_queue<
concurrency::message<_Source_type>*,
std::vector<concurrency::message<_Source_type>*>,
Pr
> _input_messages;
// Synchronizes access to the input message queue.
concurrency::critical_section _input_lock;
// Stores outgoing messages.
std::queue<concurrency::message<_Target_type>*> _output_messages;
private:
// Hide assignment operator and copy constructor.
priority_buffer const &operator =(priority_buffer const&);
priority_buffer(priority_buffer const &);
};
}
L’exemple suivant effectue simultanément un certain nombre d’opérations asend
concurrency ::receive sur un priority_buffer
objet.
// priority_buffer.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"
using namespace concurrency;
using namespace concurrencyex;
using namespace std;
int wmain()
{
// Concurrently perform a number of asend and receive operations
// on a priority_buffer object.
priority_buffer<int> pb;
parallel_invoke(
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
[&pb] {
for (int i = 0; i < 75; ++i) {
wcout << receive(pb) << L' ';
if ((i+1) % 25 == 0)
wcout << endl;
}
}
);
}
Cet exemple génère l’exemple de sortie suivant.
36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12
La priority_buffer
classe commande d’abord les messages par priorité, puis par l’ordre dans lequel il reçoit des messages. Dans cet exemple, les messages avec une priorité numérique supérieure sont insérés vers le devant de la file d’attente.
[Haut]
Compilation du code
Copiez l’exemple de code et collez-le dans un projet Visual Studio, ou collez la définition de la priority_buffer
classe dans un fichier nommé priority_buffer.h
et le programme de test dans un fichier nommé priority_buffer.cpp
, puis exécutez la commande suivante dans une fenêtre d’invite de commandes Visual Studio.
cl.exe /EHsc priority_buffer.cpp
Voir aussi
Procédures pas à pas relatives au runtime d’accès concurrentiel
Blocs de messages asynchrones
Fonctions de passage de messages