İzlenecek Yol: Özel bir İleti Bloğu Oluşturma
Bu belgede, gelen iletileri önceliğe göre sıralayan özel bir ileti bloğu türünün nasıl oluşturulacağı açıklanır.
Yerleşik ileti bloğu türleri çok çeşitli işlevler sağlasa da, kendi ileti bloğu türünüzü oluşturabilir ve uygulamanızın gereksinimlerini karşılayacak şekilde özelleştirebilirsiniz. Zaman Uyumsuz Aracılar Kitaplığı tarafından sağlanan yerleşik ileti bloğu türlerinin açıklaması için bkz . Zaman Uyumsuz İleti Blokları.
Önkoşullar
Bu izlenecek yolu başlatmadan önce aşağıdaki belgeleri okuyun:
Bölümler
Bu izlenecek yol aşağıdaki bölümleri içerir:
Özel İleti Bloğu Tasarlama
İleti blokları, ileti gönderme ve alma eylemine katılır. İleti gönderen bir ileti bloğu, kaynak blok olarak bilinir. İletileri alan bir ileti bloğu hedef blok olarak bilinir. İletileri hem gönderen hem de alan bir ileti bloğu, bir yayıcı bloğu olarak bilinir. Aracılar Kitaplığı, kaynak blokları temsil etmek için soyut sınıf eşzamanlılığı::ISource ve hedef blokları temsil etmek için soyut sınıf eşzamanlılığı::ITarget kullanır. Kaynak olarak davranan ileti bloğu türleri ; ISource
hedef olarak davranan ileti bloğu türleri adresinden ITarget
türetilir.
İleti bloğu türünüzü doğrudan ve ITarget
öğesinden ISource
türetebilirsiniz ancak Aracılar Kitaplığı, tüm ileti bloğu türleri için ortak olan işlevlerin çoğunu gerçekleştiren üç temel sınıf tanımlar; örneğin, hataları işleme ve ileti bloklarını eşzamanlılık açısından güvenli bir şekilde birbirine bağlama. concurrency::source_block sınıfından ISource
türetilir ve diğer bloklara ileti gönderir. concurrency::target_block sınıfı diğer bloklardan ITarget
türetilir ve ileti alır. concurrency::p ropagator_block sınıfı, diğer bloklardan ISource
ITarget
türetilir ve diğer bloklara ileti gönderir ve diğer bloklardan iletiler alır. İleti bloğunuzun davranışına odaklanabilmeniz için altyapı ayrıntılarını işlemek için bu üç temel sınıfı kullanmanızı öneririz.
source_block
, target_block
ve sınıfları, kaynak ve propagator_block
hedef bloklar arasındaki bağlantıları veya bağlantıları yöneten bir türde ve iletilerin nasıl işlendiğini yöneten bir tür üzerinde parametreleştirilmiş şablonlardır. Aracılar Kitaplığı, bağlantı yönetimi gerçekleştiren iki tür tanımlar: eşzamanlılık::single_link_registry ve eşzamanlılık::multi_link_registry. sınıfı, single_link_registry
bir ileti bloğunun bir kaynağa veya bir hedefe bağlanmasına olanak tanır. sınıfı, multi_link_registry
bir ileti bloğunun birden çok kaynağa veya birden çok hedefe bağlanmasına olanak tanır. Aracılar Kitaplığı, ileti yönetimi gerçekleştiren bir sınıf tanımlar: eşzamanlılık::ordered_message_processor. sınıfı, ordered_message_processor
ileti bloklarının iletileri aldığı sırayla işlemesini sağlar.
İleti bloklarının kaynakları ve hedefleri ile ilişkisini daha iyi anlamak için aşağıdaki örneği göz önünde bulundurun. Bu örnekte concurrency::transformer sınıfının bildirimi gösterilir.
template<
class _Input,
class _Output
>
class transformer : public propagator_block<
single_link_registry<ITarget<_Output>>,
multi_link_registry<ISource<_Input>>
>;
transformer
sınıfı öğesinden propagator_block
türetilir ve bu nedenle hem kaynak blok hem de hedef blok olarak davranır. türünde _Input
iletileri kabul eder ve türünde _Output
iletiler gönderir. transformer
sınıfı, herhangi bir hedef blok için bağlantı yöneticisi ve multi_link_registry
herhangi bir kaynak blok için bağlantı yöneticisi olarak belirtirsingle_link_registry
. Bu nedenle, bir transformer
nesnenin en fazla bir hedefi ve sınırsız sayıda kaynağı olabilir.
öğesinden source_block
türetilen bir sınıf altı yöntem uygulamalıdır: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message ve resume_propagation. öğesinden target_block
türetilen bir sınıf propagate_message yöntemini uygulamalıdır ve isteğe bağlı olarak send_message yöntemini uygulayabilir. 'den propagator_block
türetme işlevi, hem target_block
hem de source_block
'den türetmeye eşdeğerdir.
yöntemi propagate_to_any_targets
, çalışma zamanı tarafından zaman uyumsuz veya zaman uyumlu olarak gelen iletileri işlemek ve giden iletileri yaymak için çağrılır. yöntemi accept_message
, iletileri kabul etmek için hedef bloklar tarafından çağrılır. gibi unbounded_buffer
birçok ileti bloğu türü, iletileri yalnızca alacak ilk hedefe gönderir. Bu nedenle, iletinin sahipliğini hedefe aktarır. Concurrency::overwrite_buffer gibi diğer ileti bloğu türleri, hedef bloklarının her birine ileti sunar. Bu nedenle, overwrite_buffer
hedeflerinin her biri için iletinin bir kopyasını oluşturur.
reserve_message
, consume_message
, release_message
ve resume_propagation
yöntemleri, ileti bloklarının ileti ayırmaya katılmasını sağlar. Hedef bloklar, bir ileti sunulduğunda yöntemini çağırır reserve_message
ve iletiyi daha sonra kullanmak üzere ayırmaları gerekir. Hedef blok bir iletiyi ayırdıktan sonra, bu iletiyi kullanmak için yöntemini veya release_message
rezervasyonu iptal etmek için yöntemini çağırabilirconsume_message
. yönteminde accept_message
olduğu gibi, uygulaması consume_message
iletinin sahipliğini aktarabilir veya iletinin bir kopyasını döndürebilir. Hedef blok ayrılmış bir iletiyi tükettiğinde veya serbest bıraktığında çalışma zamanı yöntemini çağırır resume_propagation
. Bu yöntem genellikle kuyruktaki bir sonraki iletiden başlayarak ileti yaymaya devam eder.
Çalışma zamanı, bir iletiyi başka bir bloktan geçerli bir bloka zaman uyumsuz olarak aktarmak için yöntemini çağırır propagate_message
. send_message
yöntemine benzerpropagate_message
, ancak zaman uyumsuz olarak değil zaman uyumlu olarak iletiyi hedef bloklara gönderir. varsayılan uygulaması send_message
tüm gelen iletileri reddeder. İleti hedef blokla ilişkili isteğe bağlı filtre işlevini geçirmezse çalışma zamanı bu yöntemlerden herhangi birini çağırmaz. İleti filtreleri hakkında daha fazla bilgi için bkz . Zaman Uyumsuz İleti Blokları.
[Üst]
priority_buffer Sınıfını Tanımlama
priority_buffer
sınıfı, gelen iletileri önce iletilerin alınma sırasına göre sıralayan özel bir ileti bloğu türüdür. sınıfı priority_buffer
eşzamanlılık::unbounded_buffer sınıfına benzer çünkü bir ileti kuyruğu barındırır ve hem kaynak hem de hedef ileti bloğu işlevi görür ve hem birden çok kaynağa hem de birden çok hedefe sahip olabilir. Bununla birlikte, unbounded_buffer
ileti yayma işlemini yalnızca kaynaklarından ileti alma sırasına göre temel alır.
sınıfı, priority_buffer
ve Type
öğelerini içeren PriorityType
std::tuple türünde iletiler alır. PriorityType
her iletinin önceliğini tutan türü ifade eder; Type
iletinin veri bölümünü ifade eder. priority_buffer
sınıfı türünde Type
iletiler gönderir. sınıfı priority_buffer
iki ileti kuyruğu da yönetir: gelen iletiler için std ::p riority_queue nesnesi ve giden iletiler için std::queue nesnesi. Bir nesne aynı anda birden çok ileti aldığında veya tüketiciler tarafından okunmadan önce birden çok ileti aldığında iletileri önceliğe göre sıralamak yararlı olur priority_buffer
.
türetilen propagator_block
bir sınıfın uygulaması gereken yedi yönteme ek olarak, priority_buffer
sınıfı ve send_message
yöntemlerini de geçersiz kılarlink_target_notification
. priority_buffer
sınıfı ayrıca iki ortak yardımcı yöntemi enqueue
ve ile dequeue
özel yardımcı yöntemi olarak propagate_priority_order
tanımlar.
Aşağıdaki yordamda sınıfın priority_buffer
nasıl uygulandığı açıklanmaktadır.
Priority_buffer sınıfını tanımlamak için
Bir C++ üst bilgi dosyası oluşturun ve adını verin
priority_buffer.h
. Alternatif olarak, projenizin parçası olan mevcut bir üst bilgi dosyasını kullanabilirsiniz.içine
priority_buffer.h
aşağıdaki kodu ekleyin.#pragma once #include <agents.h> #include <queue>
std
Ad alanında, eşzamanlılık::ileti nesneleri üzerinde hareket eden std::less ve std::greater özelleştirmelerini tanımlayın.namespace std { // A specialization of less that tests whether the priority element of a // message is less than the priority element of another message. template<class Type, class PriorityType> struct less<concurrency::message<tuple<PriorityType,Type>>*> { typedef concurrency::message<tuple<PriorityType, Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator< to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) < get<0>(right->payload)); } }; // A specialization of less that tests whether the priority element of a // message is greater than the priority element of another message. template<class Type, class PriorityType> struct greater<concurrency::message<tuple<PriorityType, Type>>*> { typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator> to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) > get<0>(right->payload)); } }; }
priority_buffer
sınıfı nesneleri birpriority_queue
nesnede depolarmessage
. Bu tür özelleştirmeleri, öncelik kuyruğunun iletileri önceliklerine göre sıralamasını sağlar. Öncelik, nesnenin ilk öğesidirtuple
.Ad alanında
concurrencyex
sınıfınıpriority_buffer
bildirin.namespace concurrencyex { template<class Type, typename PriorityType = int, typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>> class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>, concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>> { public: protected: private: }; }
priority_buffer
sınıfı öğesindenpropagator_block
türetilir. Bu nedenle, hem ileti gönderebilir hem de alabilir. sınıfı,priority_buffer
türündeType
iletiler alan birden çok hedefe sahip olabilir. Ayrıca, türündetuple<PriorityType, Type>
iletiler gönderen birden çok kaynağı da olabilir.private
sınıfının bölümündepriority_buffer
aşağıdaki üye değişkenlerini ekleyin.// Stores incoming messages. // The type parameter Pr specifies how to order messages by priority. std::priority_queue< concurrency::message<_Source_type>*, std::vector<concurrency::message<_Source_type>*>, Pr > _input_messages; // Synchronizes access to the input message queue. concurrency::critical_section _input_lock; // Stores outgoing messages. std::queue<concurrency::message<_Target_type>*> _output_messages;
Nesne
priority_queue
gelen iletileri, nesne isequeue
giden iletileri barındırıyor. Birpriority_buffer
nesne aynı anda birden çok ileti alabilir;critical_section
nesne giriş iletileri kuyruğuna erişimi eşitler.private
bölümünde kopyalama oluşturucuyu ve atama işlecini tanımlayın. Bu, nesnelerin atanabilir olmasını engellerpriority_queue
.// Hide assignment operator and copy constructor. priority_buffer const &operator =(priority_buffer const&); priority_buffer(priority_buffer const &);
public
bölümünde, birçok ileti bloğu türü için ortak olan oluşturucuları tanımlayın. Yıkıcıyı da tanımlayın.// Constructs a priority_buffer message block. priority_buffer() { initialize_source_and_target(); } // Constructs a priority_buffer message block with the given filter function. priority_buffer(filter_method const& filter) { initialize_source_and_target(); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler) { initialize_source_and_target(&scheduler); } // Constructs a priority_buffer message block with the given filter function // and uses the provided Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) { initialize_source_and_target(&scheduler); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group) { initialize_source_and_target(NULL, &schedule_group); } // Constructs a priority_buffer message block with the given filter function // and uses the provided SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter) { initialize_source_and_target(NULL, &schedule_group); register_filter(filter); } // Destroys the message block. ~priority_buffer() { // Remove all links. remove_network_links(); }
public
bölümünde vedequeue
yöntemlerinienqueue
tanımlayın. Bu yardımcı yöntemler, bir nesneye ileti göndermek ve nesneden ileti almak için alternatif birpriority_buffer
yol sağlar.// Sends an item to the message block. bool enqueue(Type const& item) { return concurrency::asend<Type>(this, item); } // Receives an item from the message block. Type dequeue() { return receive<Type>(this); }
protected
bölümünde yöntemini tanımlayınpropagate_to_any_targets
.// Transfers the message at the front of the input queue to the output queue // and propagates out all messages in the output queue. virtual void propagate_to_any_targets(concurrency::message<_Target_type>*) { // Retrieve the message from the front of the input queue. concurrency::message<_Source_type>* input_message = NULL; { concurrency::critical_section::scoped_lock lock(_input_lock); if (_input_messages.size() > 0) { input_message = _input_messages.top(); _input_messages.pop(); } } // Move the message to the output queue. if (input_message != NULL) { // The payload of the output message does not contain the // priority of the message. concurrency::message<_Target_type>* output_message = new concurrency::message<_Target_type>(get<1>(input_message->payload)); _output_messages.push(output_message); // Free the memory for the input message. delete input_message; // Do not propagate messages if the new message is not the head message. // In this case, the head message is reserved by another message block. if (_output_messages.front()->msg_id() != output_message->msg_id()) { return; } } // Propagate out the output messages. propagate_priority_order(); }
yöntemi,
propagate_to_any_targets
giriş kuyruğunun önündeki iletiyi çıkış kuyruğuna aktarır ve çıkış kuyruğundaki tüm iletileri yayılır.protected
bölümünde yöntemini tanımlayınaccept_message
.// Accepts a message that was offered by this block by transferring ownership // to the caller. virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id) { concurrency::message<_Target_type>* message = NULL; // Transfer ownership if the provided message identifier matches // the identifier of the front of the output message queue. if (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id) { message = _output_messages.front(); _output_messages.pop(); } return message; }
Hedef blok yöntemini çağırdığında
accept_message
,priority_buffer
sınıf iletinin sahipliğini bunu kabul eden ilk hedef bloğuna aktarır. (Bu, .) davranışınaunbounded_buffer
benzer.protected
bölümünde yöntemini tanımlayınreserve_message
.// Reserves a message that was previously offered by this block. virtual bool reserve_message(concurrency::runtime_object_identity msg_id) { // Allow the message to be reserved if the provided message identifier // is the message identifier of the front of the message queue. return (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id); }
sınıfı,
priority_buffer
sağlanan ileti tanımlayıcısı kuyruğun önündeki iletinin tanımlayıcısı ile eşleştiğinde hedef bloğun bir iletiyi ayırmasına izin verir. Başka bir deyişle, nesne henüz ek bir ileti almadıysapriority_buffer
ve geçerli iletiyi henüz yaymadıysa, hedef iletiyi ayırabilir.protected
bölümünde yöntemini tanımlayınconsume_message
.// Transfers the message that was previously offered by this block // to the caller. The caller of this method is the target block that // reserved the message. virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id) { // Transfer ownership of the message to the caller. return accept_message(msg_id); }
Hedef blok, ayrıldığı iletinin sahipliğini aktarmak için çağrılar
consume_message
.protected
bölümünde yöntemini tanımlayınrelease_message
.// Releases a previous message reservation. virtual void release_message(concurrency::runtime_object_identity msg_id) { // The head message must be the one that is reserved. if (_output_messages.empty() || _output_messages.front()->msg_id() != msg_id) { throw message_not_found(); } }
Bir iletiye yapılan rezervasyon iptal etmek için hedef blok çağrıları
release_message
.protected
bölümünde yöntemini tanımlayınresume_propagation
.// Resumes propagation after a reservation has been released. virtual void resume_propagation() { // Propagate out any messages in the output queue. if (_output_messages.size() > 0) { async_send(NULL); } }
Çalışma zamanı, bir hedef bloğun ayrılmış bir iletiyi kullanmasından veya serbest bırakmasından sonra çağırır
resume_propagation
. Bu yöntem çıkış kuyruğundaki tüm iletileri yayılım.protected
bölümünde yöntemini tanımlayınlink_target_notification
.// Notifies this block that a new target has been linked to it. virtual void link_target_notification(concurrency::ITarget<_Target_type>*) { // Do not propagate messages if a target block reserves // the message at the front of the queue. if (_M_pReservedFor != NULL) { return; } // Propagate out any messages that are in the output queue. propagate_priority_order(); }
_M_pReservedFor
Üye değişkeni, temel sınıfısource_block
tarafından tanımlanır. Bu üye değişkeni, varsa çıkış kuyruğunun önündeki iletiye rezervasyon tutan hedef bloğu gösterir. Nesneyepriority_buffer
yeni bir hedef bağlandığında çalışma zamanı çağırırlink_target_notification
. Bu yöntem, rezervasyon tutan bir hedef yoksa çıkış kuyruğundaki tüm iletileri yayılır.private
bölümünde yöntemini tanımlayınpropagate_priority_order
.// Propagates messages in priority order. void propagate_priority_order() { // Cancel propagation if another block reserves the head message. if (_M_pReservedFor != NULL) { return; } // Propagate out all output messages. // Because this block preserves message ordering, stop propagation // if any of the messages are not accepted by a target block. while (!_output_messages.empty()) { // Get the next message. concurrency::message<_Target_type> * message = _output_messages.front(); concurrency::message_status status = declined; // Traverse each target in the order in which they are connected. for (target_iterator iter = _M_connectedTargets.begin(); *iter != NULL; ++iter) { // Propagate the message to the target. concurrency::ITarget<_Target_type>* target = *iter; status = target->propagate(message, this); // If the target accepts the message then ownership of message has // changed. Do not propagate this message to any other target. if (status == accepted) { break; } // If the target only reserved this message, we must wait until the // target accepts the message. if (_M_pReservedFor != NULL) { break; } } // If status is anything other than accepted, then the head message // was not propagated out. To preserve the order in which output // messages are propagated, we must stop propagation until the head // message is accepted. if (status != accepted) { break; } } }
Bu yöntem çıkış kuyruğundaki tüm iletileri yayılım. Hedef bloklardan biri iletiyi kabul edene kadar kuyruktaki her ileti her hedef bloğuna sunulur. sınıfı giden
priority_buffer
iletilerin sırasını korur. Bu nedenle, bu yöntem hedef bloklara başka bir ileti sunmadan önce çıkış kuyruğundaki ilk ileti bir hedef blok tarafından kabul edilmelidir.protected
bölümünde yöntemini tanımlayınpropagate_message
.// Asynchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::propagate. virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Asynchronously send the message to the target blocks. async_send(NULL); return accepted; } else { return missed; } }
yöntemi,
propagate_message
sınıfınpriority_buffer
ileti alıcısı veya hedef olarak davranmasını sağlar. Bu yöntem, sağlanan kaynak bloğu tarafından sunulan iletiyi alır ve bu iletiyi öncelik kuyruğuna ekler. Yöntemipropagate_message
daha sonra tüm çıkış iletilerini zaman uyumsuz olarak hedef bloklara gönderir.eşzamanlılık::asend işlevini çağırdığınızda veya ileti bloğu diğer ileti bloklarına bağlandığında çalışma zamanı bu yöntemi çağırır.
protected
bölümünde yöntemini tanımlayınsend_message
.// Synchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::send. virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Synchronously send the message to the target blocks. sync_send(NULL); return accepted; } else { return missed; } }
send_message
yöntemine benzer.propagate_message
Ancak çıkış iletilerini zaman uyumsuz yerine zaman uyumlu olarak gönderir.Çalışma zamanı zaman uyumlu gönderme işlemi sırasında bu yöntemi çağırır, örneğin concurrency::send işlevini çağırdığınızda.
sınıfı, priority_buffer
birçok ileti bloğu türünde tipik olan oluşturucu aşırı yüklemeleri içerir. Bazı oluşturucu aşırı yüklemeleri eşzamanlılık::Scheduler veya concurrency::ScheduleGroup nesnelerini alır ve bu da ileti bloğunun belirli bir görev zamanlayıcı tarafından yönetilmesini sağlar. Diğer oluşturucu aşırı yüklemeleri bir filtre işlevi alır. Filtre işlevleri, ileti bloklarının yükü temelinde bir iletiyi kabul etmelerini veya reddetmelerini sağlar. İleti filtreleri hakkında daha fazla bilgi için bkz . Zaman Uyumsuz İleti Blokları. Görev zamanlayıcıları hakkında daha fazla bilgi için bkz . Görev Zamanlayıcı.
priority_buffer
Sınıf iletileri önceliğe ve sonra iletilerin alınma sırasına göre sıraladığından, bu sınıf en çok iletileri zaman uyumsuz olarak aldığında, örneğin eşzamanlılık::asend işlevini çağırdığınızda veya ileti bloğu diğer ileti bloklarına bağlandığında kullanışlıdır.
[Üst]
Tam Örnek
Aşağıdaki örnekte sınıfın tam tanımı gösterilmektedir priority_buffer
.
// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>
namespace std
{
// A specialization of less that tests whether the priority element of a
// message is less than the priority element of another message.
template<class Type, class PriorityType>
struct less<concurrency::message<tuple<PriorityType,Type>>*>
{
typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator< to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) < get<0>(right->payload));
}
};
// A specialization of less that tests whether the priority element of a
// message is greater than the priority element of another message.
template<class Type, class PriorityType>
struct greater<concurrency::message<tuple<PriorityType, Type>>*>
{
typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator> to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) > get<0>(right->payload));
}
};
}
namespace concurrencyex
{
// A message block type that orders incoming messages first by priority,
// and then by the order in which messages are received.
template<class Type,
typename PriorityType = int,
typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
{
public:
// Constructs a priority_buffer message block.
priority_buffer()
{
initialize_source_and_target();
}
// Constructs a priority_buffer message block with the given filter function.
priority_buffer(filter_method const& filter)
{
initialize_source_and_target();
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler)
{
initialize_source_and_target(&scheduler);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter)
{
initialize_source_and_target(&scheduler);
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group)
{
initialize_source_and_target(NULL, &schedule_group);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
{
initialize_source_and_target(NULL, &schedule_group);
register_filter(filter);
}
// Destroys the message block.
~priority_buffer()
{
// Remove all links.
remove_network_links();
}
// Sends an item to the message block.
bool enqueue(Type const& item)
{
return concurrency::asend<Type>(this, item);
}
// Receives an item from the message block.
Type dequeue()
{
return receive<Type>(this);
}
protected:
// Asynchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::propagate.
virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Asynchronously send the message to the target blocks.
async_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Synchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::send.
virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Synchronously send the message to the target blocks.
sync_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Accepts a message that was offered by this block by transferring ownership
// to the caller.
virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
{
concurrency::message<_Target_type>* message = NULL;
// Transfer ownership if the provided message identifier matches
// the identifier of the front of the output message queue.
if (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id)
{
message = _output_messages.front();
_output_messages.pop();
}
return message;
}
// Reserves a message that was previously offered by this block.
virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
{
// Allow the message to be reserved if the provided message identifier
// is the message identifier of the front of the message queue.
return (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id);
}
// Transfers the message that was previously offered by this block
// to the caller. The caller of this method is the target block that
// reserved the message.
virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
{
// Transfer ownership of the message to the caller.
return accept_message(msg_id);
}
// Releases a previous message reservation.
virtual void release_message(concurrency::runtime_object_identity msg_id)
{
// The head message must be the one that is reserved.
if (_output_messages.empty() ||
_output_messages.front()->msg_id() != msg_id)
{
throw message_not_found();
}
}
// Resumes propagation after a reservation has been released.
virtual void resume_propagation()
{
// Propagate out any messages in the output queue.
if (_output_messages.size() > 0)
{
async_send(NULL);
}
}
// Notifies this block that a new target has been linked to it.
virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
{
// Do not propagate messages if a target block reserves
// the message at the front of the queue.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out any messages that are in the output queue.
propagate_priority_order();
}
// Transfers the message at the front of the input queue to the output queue
// and propagates out all messages in the output queue.
virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
{
// Retrieve the message from the front of the input queue.
concurrency::message<_Source_type>* input_message = NULL;
{
concurrency::critical_section::scoped_lock lock(_input_lock);
if (_input_messages.size() > 0)
{
input_message = _input_messages.top();
_input_messages.pop();
}
}
// Move the message to the output queue.
if (input_message != NULL)
{
// The payload of the output message does not contain the
// priority of the message.
concurrency::message<_Target_type>* output_message =
new concurrency::message<_Target_type>(get<1>(input_message->payload));
_output_messages.push(output_message);
// Free the memory for the input message.
delete input_message;
// Do not propagate messages if the new message is not the head message.
// In this case, the head message is reserved by another message block.
if (_output_messages.front()->msg_id() != output_message->msg_id())
{
return;
}
}
// Propagate out the output messages.
propagate_priority_order();
}
private:
// Propagates messages in priority order.
void propagate_priority_order()
{
// Cancel propagation if another block reserves the head message.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out all output messages.
// Because this block preserves message ordering, stop propagation
// if any of the messages are not accepted by a target block.
while (!_output_messages.empty())
{
// Get the next message.
concurrency::message<_Target_type> * message = _output_messages.front();
concurrency::message_status status = declined;
// Traverse each target in the order in which they are connected.
for (target_iterator iter = _M_connectedTargets.begin();
*iter != NULL;
++iter)
{
// Propagate the message to the target.
concurrency::ITarget<_Target_type>* target = *iter;
status = target->propagate(message, this);
// If the target accepts the message then ownership of message has
// changed. Do not propagate this message to any other target.
if (status == accepted)
{
break;
}
// If the target only reserved this message, we must wait until the
// target accepts the message.
if (_M_pReservedFor != NULL)
{
break;
}
}
// If status is anything other than accepted, then the head message
// was not propagated out. To preserve the order in which output
// messages are propagated, we must stop propagation until the head
// message is accepted.
if (status != accepted)
{
break;
}
}
}
private:
// Stores incoming messages.
// The type parameter Pr specifies how to order messages by priority.
std::priority_queue<
concurrency::message<_Source_type>*,
std::vector<concurrency::message<_Source_type>*>,
Pr
> _input_messages;
// Synchronizes access to the input message queue.
concurrency::critical_section _input_lock;
// Stores outgoing messages.
std::queue<concurrency::message<_Target_type>*> _output_messages;
private:
// Hide assignment operator and copy constructor.
priority_buffer const &operator =(priority_buffer const&);
priority_buffer(priority_buffer const &);
};
}
Aşağıdaki örnek, bir priority_buffer
nesne üzerinde eşzamanlı olarak bir dizi asend
ve eşzamanlılık::alma işlemi gerçekleştirir.
// priority_buffer.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"
using namespace concurrency;
using namespace concurrencyex;
using namespace std;
int wmain()
{
// Concurrently perform a number of asend and receive operations
// on a priority_buffer object.
priority_buffer<int> pb;
parallel_invoke(
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
[&pb] {
for (int i = 0; i < 75; ++i) {
wcout << receive(pb) << L' ';
if ((i+1) % 25 == 0)
wcout << endl;
}
}
);
}
Bu örnek aşağıdaki örnek çıktıyı oluşturur.
36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12
priority_buffer
sınıfı iletileri önce önce iletileri aldığı sırayla sıralar. Bu örnekte, daha yüksek sayısal önceliğe sahip iletiler kuyruğun önüne eklenir.
[Üst]
Kod Derleniyor
Örnek kodu kopyalayıp bir Visual Studio projesine yapıştırın veya sınıfın priority_buffer
tanımını adlandırılmış bir dosyaya ve test programını adlandırılmış priority_buffer.h
priority_buffer.cpp
bir dosyaya yapıştırın ve ardından visual studio komut istemi penceresinde aşağıdaki komutu çalıştırın.
cl.exe /EHsc priority_buffer.cpp
Ayrıca bkz.
Eşzamanlılık Çalışma Zamanı İzlenecek Yollar
Zaman Uyumsuz İleti Blokları
İleti Geçirme İşlevleri