逐步解說:建立自訂訊息區
本文件說明如何建立自定義消息區塊類型,以依優先順序排序傳入訊息。
雖然內建的訊息區塊類型提供廣泛的功能,但您可以建立自己的訊息區塊類型並加以自定義,以符合應用程式的需求。 如需異步代理程序連結庫所提供之內建消息塊類型的描述,請參閱 異步消息塊。
必要條件
開始本逐步解說之前,請先閱讀下列檔:
區段
本逐步解說包含下列各節:
設計自訂消息塊
消息塊會參與傳送和接收訊息的動作。 傳送訊息的訊息區塊稱為 來源區塊。 接收訊息的訊息區塊稱為 目標區塊。 傳送和接收訊息的訊息區塊稱為 傳播器區塊。 Agents Library 會使用抽象類 concurrency::ISource 來代表來源區塊,而抽象類 並行::ITarget 則代表目標區塊。 做為來源的訊息區塊類型衍生自 ISource
;做為目標的訊息區塊類型衍生自 ITarget
。
雖然您可以直接從 ISource
和 ITarget
衍生訊息區塊類型,但代理程序連結庫會定義三個基類,這些基類會執行所有訊息區塊類型通用的大部分功能,例如,以並行安全的方式處理錯誤,並將訊息區塊連接在一起。 concurrency ::source_block 類別衍生自 ISource
,並將訊息傳送至其他區塊。 concurrency ::target_block 類別衍生自 ITarget
,並從其他區塊接收訊息。 concurrency::p ropagator_block 類別衍生自 ISource
和 ITarget
,並將訊息傳送至其他區塊,並接收來自其他區塊的訊息。 我們建議您使用這三個基類來處理基礎結構詳細數據,以便專注於訊息區塊的行為。
source_block
、 target_block
和 propagator_block
類別是範本,這些範本會參數化於管理來源和目標區塊之間的連線或連結的類型上,以及管理訊息處理方式的類型。 代理程式連結庫會定義兩種執行連結管理的類型: 並行::single_link_registry 和 並行::multi_link_registry。 類別 single_link_registry
可讓訊息區塊鏈接至一個來源或一個目標。 類別 multi_link_registry
可讓訊息區塊鏈接至多個來源或多個目標。 Agents Library 會定義一個類別來執行訊息管理、 並行::ordered_message_processor。 類別 ordered_message_processor
可讓訊息區塊按照接收訊息的順序來處理訊息。
若要進一步瞭解訊息區塊與其來源和目標的關係,請考慮下列範例。 此範例顯示並行::transformer 類別的宣告。
template<
class _Input,
class _Output
>
class transformer : public propagator_block<
single_link_registry<ITarget<_Output>>,
multi_link_registry<ISource<_Input>>
>;
類別 transformer
衍生自 propagator_block
,因此可作為來源區塊和目標區塊。 它會接受 類型 _Input
為的訊息,並傳送 類型 _Output
為的訊息。 類別 transformer
會 single_link_registry
指定為任何目標區塊的連結管理員,以及 multi_link_registry
作為任何來源區塊的連結管理員。 因此, transformer
物件最多可以有一個目標和無限數目的來源。
衍生自 source_block
的類別必須實作六種方法:propagate_to_any_targets、accept_message、reserve_message、consume_message、release_message和resume_propagation。 衍生自 target_block
的 類別必須實作 propagate_message 方法,而且可以選擇性地實 作 send_message 方法。 衍生自 propagator_block
的功能相當於從和target_block
衍生。source_block
運行 propagate_to_any_targets
時間會呼叫 方法,以異步或同步方式處理任何傳入訊息,並傳播任何傳出訊息。 目標 accept_message
區塊會呼叫 方法以接受訊息。 許多訊息區塊類型,例如 unbounded_buffer
,只會將訊息傳送至接收它的第一個目標。 因此,它會將訊息的擁有權轉移至目標。 其他訊息區塊類型,例如 並行::overwrite_buffer,會提供訊息給其每個目標區塊。 因此, overwrite_buffer
為每個目標建立訊息的複本。
reserve_message
、 consume_message
release_message
和 resume_propagation
方法可讓訊息區塊參與訊息保留。 目標區塊會在提供訊息時呼叫 reserve_message
方法,而且必須保留訊息以供稍後使用。 在目標區塊保留訊息之後,它可以呼叫 consume_message
方法來取用該訊息或 release_message
取消保留的方法。 如同 accept_message
方法,的 consume_message
實作可以傳輸訊息的擁有權,或傳回訊息的複本。 在目標區塊取用或釋放保留訊息之後,運行時間會呼叫 resume_propagation
方法。 一般而言,此方法會繼續傳播訊息,從佇列中的下一個訊息開始。
運行時間會呼叫 propagate_message
方法,以異步方式將訊息從另一個區塊傳送至目前的區塊。 方法 send_message
類似 propagate_message
,不同之處在於它會以同步方式,而不是以異步方式將訊息傳送至目標區塊。 的預設實作 send_message
會拒絕所有傳入訊息。 如果訊息未傳遞與目標區塊相關聯的選擇性篩選函式,則運行時間不會呼叫上述任一方法。 如需訊息篩選的詳細資訊,請參閱 異步消息塊。
[靠上]
定義 priority_buffer 類別
類別 priority_buffer
是自定義訊息區塊類型,會先依優先順序排序傳入訊息,然後依接收訊息的順序排序。 類別 priority_buffer
類似於 concurrency::unbounded_buffer 類別,因為它保存訊息佇列,也因為它同時作為來源和目標消息塊,而且可以同時擁有多個來源和多個目標。 不過, unbounded_buffer
只會根據從其來源接收訊息的順序,以訊息傳播為基礎。
類別 priority_buffer
會接收 std::tuple 類型的訊息,其中包含 PriorityType
和 Type
元素。 PriorityType
是指保留每個訊息優先順序的類型; Type
是指訊息的數據部分。 類別 priority_buffer
會傳送 類型 Type
為的訊息。 類別 priority_buffer
也會管理兩個 消息佇列:傳入訊息的 std::p riority_queue 物件,以及傳出訊息的 std::queue 物件。 當物件同時接收多個訊息,或在取用者讀取任何訊息之前收到多個訊息時,依優先順序排序訊息會很有用 priority_buffer
。
除了衍生自 propagator_block
的類別必須實作的七種方法之外,類別 priority_buffer
也會覆寫 link_target_notification
和 send_message
方法。 類別 priority_buffer
也會定義兩個公用協助程式方法, enqueue
以及 dequeue
和私用協助程式方法 propagate_priority_order
。
下列程式描述如何實作 priority_buffer
類別。
定義priority_buffer類別
建立C++標頭檔,並將它命名為
priority_buffer.h
。 或者,您可以使用屬於專案一部分的現有頭檔。在 中
priority_buffer.h
,新增下列程序代碼。#pragma once #include <agents.h> #include <queue>
在 命名空間中
std
,定義 std::less 和 std::greater 在並行::message 物件上作用的特製化。namespace std { // A specialization of less that tests whether the priority element of a // message is less than the priority element of another message. template<class Type, class PriorityType> struct less<concurrency::message<tuple<PriorityType,Type>>*> { typedef concurrency::message<tuple<PriorityType, Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator< to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) < get<0>(right->payload)); } }; // A specialization of less that tests whether the priority element of a // message is greater than the priority element of another message. template<class Type, class PriorityType> struct greater<concurrency::message<tuple<PriorityType, Type>>*> { typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator> to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) > get<0>(right->payload)); } }; }
類別
priority_buffer
會將message
物件儲存在物件中priority_queue
。 這些類型特製化可讓優先順序佇列根據其優先順序來排序訊息。 優先順序是物件的第一個專案tuple
。在命名空間中
concurrencyex
,宣告 類別priority_buffer
。namespace concurrencyex { template<class Type, typename PriorityType = int, typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>> class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>, concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>> { public: protected: private: }; }
priority_buffer
類別衍生自propagator_block
。 因此,它可以同時傳送和接收訊息。 類別priority_buffer
可以有多個目標可接收 類型Type
為的訊息。 它也可以有多個來源可傳送 類型tuple<PriorityType, Type>
為的訊息。在 類別的
priority_buffer
區private
段中,新增下列成員變數。// Stores incoming messages. // The type parameter Pr specifies how to order messages by priority. std::priority_queue< concurrency::message<_Source_type>*, std::vector<concurrency::message<_Source_type>*>, Pr > _input_messages; // Synchronizes access to the input message queue. concurrency::critical_section _input_lock; // Stores outgoing messages. std::queue<concurrency::message<_Target_type>*> _output_messages;
物件
priority_queue
會保存傳入訊息;queue
物件會保存傳出訊息。priority_buffer
物件可以同時接收多個訊息;critical_section
物件會同步處理輸入訊息佇列的存取。在區
private
段中,定義複製建構函式和指派運算符。 這可防止priority_queue
物件被指派。// Hide assignment operator and copy constructor. priority_buffer const &operator =(priority_buffer const&); priority_buffer(priority_buffer const &);
在區
public
段中,定義許多訊息區塊類型通用的建構函式。 同時定義解構函式。// Constructs a priority_buffer message block. priority_buffer() { initialize_source_and_target(); } // Constructs a priority_buffer message block with the given filter function. priority_buffer(filter_method const& filter) { initialize_source_and_target(); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler) { initialize_source_and_target(&scheduler); } // Constructs a priority_buffer message block with the given filter function // and uses the provided Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) { initialize_source_and_target(&scheduler); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group) { initialize_source_and_target(NULL, &schedule_group); } // Constructs a priority_buffer message block with the given filter function // and uses the provided SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter) { initialize_source_and_target(NULL, &schedule_group); register_filter(filter); } // Destroys the message block. ~priority_buffer() { // Remove all links. remove_network_links(); }
在區
public
段中,定義 方法和enqueue
dequeue
。 這些協助程式方法提供將訊息傳送至物件及接收訊息的priority_buffer
替代方式。// Sends an item to the message block. bool enqueue(Type const& item) { return concurrency::asend<Type>(this, item); } // Receives an item from the message block. Type dequeue() { return receive<Type>(this); }
在區
protected
段中,定義propagate_to_any_targets
方法。// Transfers the message at the front of the input queue to the output queue // and propagates out all messages in the output queue. virtual void propagate_to_any_targets(concurrency::message<_Target_type>*) { // Retrieve the message from the front of the input queue. concurrency::message<_Source_type>* input_message = NULL; { concurrency::critical_section::scoped_lock lock(_input_lock); if (_input_messages.size() > 0) { input_message = _input_messages.top(); _input_messages.pop(); } } // Move the message to the output queue. if (input_message != NULL) { // The payload of the output message does not contain the // priority of the message. concurrency::message<_Target_type>* output_message = new concurrency::message<_Target_type>(get<1>(input_message->payload)); _output_messages.push(output_message); // Free the memory for the input message. delete input_message; // Do not propagate messages if the new message is not the head message. // In this case, the head message is reserved by another message block. if (_output_messages.front()->msg_id() != output_message->msg_id()) { return; } } // Propagate out the output messages. propagate_priority_order(); }
方法會將
propagate_to_any_targets
位於輸入佇列前端的訊息傳送至輸出佇列,並傳播輸出佇列中的所有訊息。在區
protected
段中,定義accept_message
方法。// Accepts a message that was offered by this block by transferring ownership // to the caller. virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id) { concurrency::message<_Target_type>* message = NULL; // Transfer ownership if the provided message identifier matches // the identifier of the front of the output message queue. if (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id) { message = _output_messages.front(); _output_messages.pop(); } return message; }
當目標區塊呼叫
accept_message
方法時,priority_buffer
類別會將訊息的擁有權傳送至接受訊息的第一個目標區塊。 (這類似於 的行為unbounded_buffer
。在區
protected
段中,定義reserve_message
方法。// Reserves a message that was previously offered by this block. virtual bool reserve_message(concurrency::runtime_object_identity msg_id) { // Allow the message to be reserved if the provided message identifier // is the message identifier of the front of the message queue. return (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id); }
類別
priority_buffer
允許目標區塊在提供的訊息標識碼符合佇列前端的訊息識別碼時保留訊息。 換句話說,如果priority_buffer
物件尚未收到其他訊息,且尚未傳播出目前訊息,目標可以保留訊息。在區
protected
段中,定義consume_message
方法。// Transfers the message that was previously offered by this block // to the caller. The caller of this method is the target block that // reserved the message. virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id) { // Transfer ownership of the message to the caller. return accept_message(msg_id); }
目標區塊會呼叫
consume_message
,以傳輸其保留之訊息的擁有權。在區
protected
段中,定義release_message
方法。// Releases a previous message reservation. virtual void release_message(concurrency::runtime_object_identity msg_id) { // The head message must be the one that is reserved. if (_output_messages.empty() || _output_messages.front()->msg_id() != msg_id) { throw message_not_found(); } }
目標區塊會呼叫
release_message
,以取消對訊息的保留。在區
protected
段中,定義resume_propagation
方法。// Resumes propagation after a reservation has been released. virtual void resume_propagation() { // Propagate out any messages in the output queue. if (_output_messages.size() > 0) { async_send(NULL); } }
運行時間會在目標區塊取用或釋放保留訊息之後呼叫
resume_propagation
。 這個方法會傳播輸出佇列中的任何訊息。在區
protected
段中,定義link_target_notification
方法。// Notifies this block that a new target has been linked to it. virtual void link_target_notification(concurrency::ITarget<_Target_type>*) { // Do not propagate messages if a target block reserves // the message at the front of the queue. if (_M_pReservedFor != NULL) { return; } // Propagate out any messages that are in the output queue. propagate_priority_order(); }
成員
_M_pReservedFor
變數是由基類source_block
所定義。 這個成員變數指向目標區塊,如果有的話,該區塊會保留輸出佇列前端的訊息。 當新的目標連結至priority_buffer
物件時,執行時間會呼叫link_target_notification
。 如果沒有任何目標保留,這個方法會傳播輸出佇列中的任何訊息。在區
private
段中,定義propagate_priority_order
方法。// Propagates messages in priority order. void propagate_priority_order() { // Cancel propagation if another block reserves the head message. if (_M_pReservedFor != NULL) { return; } // Propagate out all output messages. // Because this block preserves message ordering, stop propagation // if any of the messages are not accepted by a target block. while (!_output_messages.empty()) { // Get the next message. concurrency::message<_Target_type> * message = _output_messages.front(); concurrency::message_status status = declined; // Traverse each target in the order in which they are connected. for (target_iterator iter = _M_connectedTargets.begin(); *iter != NULL; ++iter) { // Propagate the message to the target. concurrency::ITarget<_Target_type>* target = *iter; status = target->propagate(message, this); // If the target accepts the message then ownership of message has // changed. Do not propagate this message to any other target. if (status == accepted) { break; } // If the target only reserved this message, we must wait until the // target accepts the message. if (_M_pReservedFor != NULL) { break; } } // If status is anything other than accepted, then the head message // was not propagated out. To preserve the order in which output // messages are propagated, we must stop propagation until the head // message is accepted. if (status != accepted) { break; } } }
此方法會從輸出佇列傳播所有訊息。 佇列中的每個訊息都會提供給每個目標區塊,直到其中一個目標區塊接受訊息為止。 類別
priority_buffer
會保留傳出訊息的順序。 因此,在此方法將任何其他訊息提供給目標區塊之前,輸出佇列中的第一個訊息必須由目標區塊接受。在區
protected
段中,定義propagate_message
方法。// Asynchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::propagate. virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Asynchronously send the message to the target blocks. async_send(NULL); return accepted; } else { return missed; } }
propagate_message
方法可讓priority_buffer
類別做為訊息接收者或目標。 此方法會接收所提供來源區塊所提供的訊息,並將該訊息插入優先順序佇列中。 方法propagate_message
接著會以異步方式將所有輸出訊息傳送至目標區塊。當您呼叫 concurrency::asend 函式或訊息區塊連接到其他消息區塊時,運行時間會呼叫這個方法。
在區
protected
段中,定義send_message
方法。// Synchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::send. virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Synchronously send the message to the target blocks. sync_send(NULL); return accepted; } else { return missed; } }
方法
send_message
類似propagate_message
。 不過,它會以同步方式傳送輸出訊息,而不是以異步方式傳送。運行時間會在同步傳送作業期間呼叫這個方法,例如當您呼叫 concurrency::send 函式時。
類別 priority_buffer
包含許多訊息區塊類型中典型的建構函式多載。 某些建構函式多載會採用 並行::排程器 或 並行::ScheduleGroup 物件,這可讓訊息區塊由特定工作排程器管理。 其他建構函式多載會採用篩選函式。 篩選函式可讓訊息區塊根據其承載接受或拒絕訊息。 如需訊息篩選的詳細資訊,請參閱 異步消息塊。 如需工作排程器的詳細資訊,請參閱 工作排程器。
priority_buffer
因為類別會依優先順序排序訊息,然後依接收訊息的順序排序,因此當這個類別以異步方式接收訊息時最有用,例如,當您呼叫並行函式::asend 函式或訊息區塊連線到其他消息區塊時。
[靠上]
完整範例
下列範例顯示 類別的完整定義 priority_buffer
。
// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>
namespace std
{
// A specialization of less that tests whether the priority element of a
// message is less than the priority element of another message.
template<class Type, class PriorityType>
struct less<concurrency::message<tuple<PriorityType,Type>>*>
{
typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator< to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) < get<0>(right->payload));
}
};
// A specialization of less that tests whether the priority element of a
// message is greater than the priority element of another message.
template<class Type, class PriorityType>
struct greater<concurrency::message<tuple<PriorityType, Type>>*>
{
typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator> to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) > get<0>(right->payload));
}
};
}
namespace concurrencyex
{
// A message block type that orders incoming messages first by priority,
// and then by the order in which messages are received.
template<class Type,
typename PriorityType = int,
typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
{
public:
// Constructs a priority_buffer message block.
priority_buffer()
{
initialize_source_and_target();
}
// Constructs a priority_buffer message block with the given filter function.
priority_buffer(filter_method const& filter)
{
initialize_source_and_target();
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler)
{
initialize_source_and_target(&scheduler);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter)
{
initialize_source_and_target(&scheduler);
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group)
{
initialize_source_and_target(NULL, &schedule_group);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
{
initialize_source_and_target(NULL, &schedule_group);
register_filter(filter);
}
// Destroys the message block.
~priority_buffer()
{
// Remove all links.
remove_network_links();
}
// Sends an item to the message block.
bool enqueue(Type const& item)
{
return concurrency::asend<Type>(this, item);
}
// Receives an item from the message block.
Type dequeue()
{
return receive<Type>(this);
}
protected:
// Asynchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::propagate.
virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Asynchronously send the message to the target blocks.
async_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Synchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::send.
virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Synchronously send the message to the target blocks.
sync_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Accepts a message that was offered by this block by transferring ownership
// to the caller.
virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
{
concurrency::message<_Target_type>* message = NULL;
// Transfer ownership if the provided message identifier matches
// the identifier of the front of the output message queue.
if (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id)
{
message = _output_messages.front();
_output_messages.pop();
}
return message;
}
// Reserves a message that was previously offered by this block.
virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
{
// Allow the message to be reserved if the provided message identifier
// is the message identifier of the front of the message queue.
return (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id);
}
// Transfers the message that was previously offered by this block
// to the caller. The caller of this method is the target block that
// reserved the message.
virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
{
// Transfer ownership of the message to the caller.
return accept_message(msg_id);
}
// Releases a previous message reservation.
virtual void release_message(concurrency::runtime_object_identity msg_id)
{
// The head message must be the one that is reserved.
if (_output_messages.empty() ||
_output_messages.front()->msg_id() != msg_id)
{
throw message_not_found();
}
}
// Resumes propagation after a reservation has been released.
virtual void resume_propagation()
{
// Propagate out any messages in the output queue.
if (_output_messages.size() > 0)
{
async_send(NULL);
}
}
// Notifies this block that a new target has been linked to it.
virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
{
// Do not propagate messages if a target block reserves
// the message at the front of the queue.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out any messages that are in the output queue.
propagate_priority_order();
}
// Transfers the message at the front of the input queue to the output queue
// and propagates out all messages in the output queue.
virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
{
// Retrieve the message from the front of the input queue.
concurrency::message<_Source_type>* input_message = NULL;
{
concurrency::critical_section::scoped_lock lock(_input_lock);
if (_input_messages.size() > 0)
{
input_message = _input_messages.top();
_input_messages.pop();
}
}
// Move the message to the output queue.
if (input_message != NULL)
{
// The payload of the output message does not contain the
// priority of the message.
concurrency::message<_Target_type>* output_message =
new concurrency::message<_Target_type>(get<1>(input_message->payload));
_output_messages.push(output_message);
// Free the memory for the input message.
delete input_message;
// Do not propagate messages if the new message is not the head message.
// In this case, the head message is reserved by another message block.
if (_output_messages.front()->msg_id() != output_message->msg_id())
{
return;
}
}
// Propagate out the output messages.
propagate_priority_order();
}
private:
// Propagates messages in priority order.
void propagate_priority_order()
{
// Cancel propagation if another block reserves the head message.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out all output messages.
// Because this block preserves message ordering, stop propagation
// if any of the messages are not accepted by a target block.
while (!_output_messages.empty())
{
// Get the next message.
concurrency::message<_Target_type> * message = _output_messages.front();
concurrency::message_status status = declined;
// Traverse each target in the order in which they are connected.
for (target_iterator iter = _M_connectedTargets.begin();
*iter != NULL;
++iter)
{
// Propagate the message to the target.
concurrency::ITarget<_Target_type>* target = *iter;
status = target->propagate(message, this);
// If the target accepts the message then ownership of message has
// changed. Do not propagate this message to any other target.
if (status == accepted)
{
break;
}
// If the target only reserved this message, we must wait until the
// target accepts the message.
if (_M_pReservedFor != NULL)
{
break;
}
}
// If status is anything other than accepted, then the head message
// was not propagated out. To preserve the order in which output
// messages are propagated, we must stop propagation until the head
// message is accepted.
if (status != accepted)
{
break;
}
}
}
private:
// Stores incoming messages.
// The type parameter Pr specifies how to order messages by priority.
std::priority_queue<
concurrency::message<_Source_type>*,
std::vector<concurrency::message<_Source_type>*>,
Pr
> _input_messages;
// Synchronizes access to the input message queue.
concurrency::critical_section _input_lock;
// Stores outgoing messages.
std::queue<concurrency::message<_Target_type>*> _output_messages;
private:
// Hide assignment operator and copy constructor.
priority_buffer const &operator =(priority_buffer const&);
priority_buffer(priority_buffer const &);
};
}
下列範例會同時對 對象執行一些 asend
和 并行::receive 作業 priority_buffer
。
// priority_buffer.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"
using namespace concurrency;
using namespace concurrencyex;
using namespace std;
int wmain()
{
// Concurrently perform a number of asend and receive operations
// on a priority_buffer object.
priority_buffer<int> pb;
parallel_invoke(
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
[&pb] {
for (int i = 0; i < 75; ++i) {
wcout << receive(pb) << L' ';
if ((i+1) % 25 == 0)
wcout << endl;
}
}
);
}
此範例會產生下列範例輸出。
36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12
類別 priority_buffer
會先依優先順序排序訊息,然後依接收訊息的順序排序訊息。 在此範例中,具有較大數值優先順序的訊息會插入佇列前端。
[靠上]
編譯程式碼
複製範例程式代碼,並將其貼到 Visual Studio 專案中,或將 類別的定義 priority_buffer
貼到名為 的檔案中,並將測試程式貼到名為 priority_buffer.h
priority_buffer.cpp
的檔案中,然後在 Visual Studio 命令提示字元視窗中執行下列命令。
cl.exe /EHsc priority_buffer.cpp