Best Practices in the Asynchronous Agents Library
This document describes how to make effective use of the Asynchronous Agents Library. The Agents Library promotes an actor-based programming model and in-process message passing for coarse-grained dataflow and pipelining tasks.
For more information about the Agents Library, see Asynchronous Agents Library.
Sections
This document contains the following sections:
Use Agents to Isolate State
Use a Throttling Mechanism to Limit the Number of Messages in a Data Pipeline
Do Not Perform Fine-Grained Work in a Data Pipeline
Do Not Pass Large Message Payloads by Value
Use shared_ptr in a Data Network When Ownership Is Undefined
Use Agents to Isolate State
The Agents Library provides an alternative to shared state by letting you connect isolated components through an asynchronous message-passing mechanism. Asynchronous agents are most effective when they isolate their internal state from other components. By isolating state, multiple components do not typically act on shared data. State isolation can enable your application to scale because it reduces contention on shared memory. State isolation also reduces the chance of deadlock and race conditions because components do not have to synchronize access to shared data.
You typically isolate the state of an agent by holding data members in the private or protected sections of the agent class and by using message buffers to communicate state changes. The following example shows the basic_agent class, which derives from concurrency::agent. The basic_agent class uses two message buffers to communicate with external components. One message buffer holds incoming messages; the other message buffer holds outgoing messages.
// basic-agent.cpp
// compile with: /c /EHsc
#include <agents.h>

// An agent that uses message buffers to isolate state and communicate
// with other components.
class basic_agent : public concurrency::agent
{
public:
    basic_agent(concurrency::unbounded_buffer<int>& input)
        : _input(input)
    {
    }

    // Retrieves the message buffer that holds output messages.
    concurrency::unbounded_buffer<int>& output()
    {
        return _output;
    }

protected:
    void run()
    {
        while (true)
        {
            // Read from the input message buffer.
            int value = concurrency::receive(_input);

            // TODO: Do something with the value.
            int result = value;

            // Write the result to the output message buffer.
            concurrency::send(_output, result);
        }
        done();
    }

private:
    // Holds incoming messages.
    concurrency::unbounded_buffer<int>& _input;
    // Holds outgoing messages.
    concurrency::unbounded_buffer<int> _output;
};
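Because basic_agent compiles as a standalone fragment (/c) and its run loop never reaches done(), the class is not shown being driven. The following self-contained sketch is hypothetical and not part of the original article; it uses a variant that processes a fixed number of messages so that the caller can start the agent, exchange values only through its message buffers, and then wait for it to reach its final state.
// agent-usage-sketch.cpp (illustrative sketch only, not part of the original article)
// compile with: /EHsc
#include <agents.h>
#include <iostream>

// A hypothetical agent that, unlike basic_agent above, processes a fixed
// number of messages so that it can reach its final state.
class echo_agent : public concurrency::agent
{
public:
    echo_agent(concurrency::unbounded_buffer<int>& input, unsigned int count)
        : _input(input), _count(count)
    {
    }

    // Retrieves the message buffer that holds output messages.
    concurrency::unbounded_buffer<int>& output()
    {
        return _output;
    }

protected:
    void run()
    {
        for (unsigned int i = 0; i < _count; ++i)
        {
            // State changes travel only through the message buffers.
            int value = concurrency::receive(_input);
            concurrency::send(_output, value * 2);
        }
        // Move the agent to its final state.
        done();
    }

private:
    // Holds incoming messages.
    concurrency::unbounded_buffer<int>& _input;
    // Holds outgoing messages.
    concurrency::unbounded_buffer<int> _output;
    // The number of messages to process.
    unsigned int _count;
};

int wmain()
{
    concurrency::unbounded_buffer<int> input;
    echo_agent agent(input, 3);

    // Schedule the agent to run.
    agent.start();

    // Communicate with the agent only through its message buffers.
    for (int i = 0; i < 3; ++i)
    {
        concurrency::send(input, i);
        std::wcout << concurrency::receive(agent.output()) << std::endl;
    }

    // Wait for the agent to reach its final state before it is destroyed.
    concurrency::agent::wait(&agent);
}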
For a complete example of how to define and use an agent, see Walkthrough: Creating an Agent-Based Application and Walkthrough: Creating a Dataflow Agent.
Use a Throttling Mechanism to Limit the Number of Messages in a Data Pipeline
Many message buffer types, such as concurrency::unbounded_buffer, can hold an unlimited number of messages. When a message producer sends messages to a data pipeline faster than the consumer can process those messages, the application can enter a low-memory or out-of-memory state. You can use a throttling mechanism, for example, a semaphore, to limit the number of messages that are concurrently active in a data pipeline.
The following basic example demonstrates how to use a semaphore to limit the number of messages in a data pipeline. The data pipeline uses the concurrency::wait function to simulate an operation that takes at least 100 milliseconds. Because the sender produces messages faster than the consumer can process them, this example defines a semaphore class that enables the application to limit the number of active messages.
// message-throttling.cpp
// compile with: /EHsc
#include <windows.h> // for GetTickCount()
#include <atomic>
#include <agents.h>
#include <concrt.h>
#include <concurrent_queue.h>
#include <sstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
    explicit semaphore(long long capacity)
        : _semaphore_count(capacity)
    {
    }

    // Acquires access to the semaphore.
    void acquire()
    {
        // The capacity of the semaphore is exceeded when the semaphore count
        // falls below zero. When this happens, add the current context to the
        // back of the wait queue and block the current context.
        if (--_semaphore_count < 0)
        {
            _waiting_contexts.push(Context::CurrentContext());
            Context::Block();
        }
    }

    // Releases access to the semaphore.
    void release()
    {
        // If the semaphore count is negative, unblock the first waiting context.
        if (++_semaphore_count <= 0)
        {
            // A call to acquire might have decremented the counter, but has not
            // yet finished adding the context to the queue.
            // Create a spin loop that waits for the context to become available.
            Context* waiting = NULL;
            while (!_waiting_contexts.try_pop(waiting))
            {
                (Context::Yield)(); // <windows.h> defines Yield as a macro. The parentheses around Yield prevent the macro expansion so that Context::Yield() is called.
            }

            // Unblock the context.
            waiting->Unblock();
        }
    }

private:
    // The semaphore count.
    atomic<long long> _semaphore_count;

    // A concurrency-safe queue of contexts that must wait to
    // acquire the semaphore.
    concurrent_queue<Context*> _waiting_contexts;
};

// A synchronization primitive that is signaled when its
// count reaches zero.
class countdown_event
{
public:
    countdown_event(long long count)
        : _current(count)
    {
        // Set the event if the initial count is zero.
        if (_current == 0LL)
            _event.set();
    }

    // Decrements the event counter.
    void signal() {
        if (--_current == 0LL) {
            _event.set();
        }
    }

    // Increments the event counter.
    void add_count() {
        if (++_current == 1LL) {
            _event.reset();
        }
    }

    // Blocks the current context until the event is set.
    void wait() {
        _event.wait();
    }

private:
    // The current count.
    atomic<long long> _current;
    // The event that is set when the counter reaches zero.
    event _event;

    // Disable copy constructor.
    countdown_event(const countdown_event&);
    // Disable assignment.
    countdown_event const & operator=(countdown_event const&);
};

int wmain()
{
    // The number of messages to send to the consumer.
    const long long MessageCount = 5;

    // The number of messages that can be active at the same time.
    const long long ActiveMessages = 2;

    // Used to compute the elapsed time.
    DWORD start_time;

    // Computes the elapsed time, rounded-down to the nearest
    // 100 milliseconds.
    auto elapsed = [&start_time] {
        return (GetTickCount() - start_time)/100*100;
    };

    // Limits the number of active messages.
    semaphore s(ActiveMessages);

    // Enables the consumer message buffer to coordinate completion
    // with the main application.
    countdown_event e(MessageCount);

    // Create a data pipeline that has three stages.

    // The first stage of the pipeline prints a message.
    transformer<int, int> print_message([&elapsed](int n) -> int {
        wstringstream ss;
        ss << elapsed() << L": received " << n << endl;
        wcout << ss.str();

        // Send the input to the next pipeline stage.
        return n;
    });

    // The second stage of the pipeline simulates a
    // time-consuming operation.
    transformer<int, int> long_operation([](int n) -> int {
        wait(100);

        // Send the input to the next pipeline stage.
        return n;
    });

    // The third stage of the pipeline releases the semaphore
    // and signals to the main application that the message has
    // been processed.
    call<int> release_and_signal([&](int unused) {
        // Enable the sender to send the next message.
        s.release();

        // Signal that the message has been processed.
        e.signal();
    });

    // Connect the pipeline.
    print_message.link_target(&long_operation);
    long_operation.link_target(&release_and_signal);

    // Send several messages to the pipeline.
    start_time = GetTickCount();
    for (auto i = 0; i < MessageCount; ++i)
    {
        // Acquire access to the semaphore.
        s.acquire();

        // Print the message to the console.
        wstringstream ss;
        ss << elapsed() << L": sending " << i << L"..." << endl;
        wcout << ss.str();

        // Send the message.
        send(print_message, i);
    }

    // Wait for the consumer to process all messages.
    e.wait();
}
/* Sample output:
0: sending 0...
0: received 0
0: sending 1...
0: received 1
100: sending 2...
100: received 2
200: sending 3...
200: received 3
300: sending 4...
300: received 4
*/
The semaphore object limits the pipeline to processing at most two messages at the same time.
The producer in this example sends relatively few messages to the consumer. Therefore, this example does not demonstrate a potential low-memory or out-of-memory condition. However, this mechanism is useful when a data pipeline contains a relatively high number of messages.
For more information about how to create the semaphore class that is used in this example, see How to: Use the Context Class to Implement a Cooperative Semaphore.
Do Not Perform Fine-Grained Work in a Data Pipeline
The Agents Library is most useful when the work that is performed by the data pipeline is fairly coarse-grained. For example, one application component might read data from a file or a network connection and occasionally send that data to another component. The protocol that the Agents Library uses to propagate messages causes the message-passing mechanism to incur more overhead than the task-parallel constructs that are provided by the Parallel Patterns Library (PPL). Therefore, make sure that the work that is performed by the data pipeline is long enough to offset this overhead.
Although a data pipeline is most effective when its work is coarse-grained, each stage of the data pipeline can use PPL constructs such as task groups and parallel algorithms to perform more fine-grained work. For an example of a coarse-grained data network that uses fine-grained parallelism at each processing stage, see Walkthrough: Creating an Image-Processing Network.
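To illustrate both points, the following hypothetical sketch (not one of the article's samples) makes each message carry a whole batch of values, which keeps the per-message work coarse-grained, while the stage itself uses the PPL concurrency::parallel_for_each algorithm to process the batch with fine-grained parallelism.
// coarse-grained-stage.cpp (illustrative sketch only, not part of the original article)
// compile with: /EHsc
#include <agents.h>
#include <ppl.h>
#include <vector>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
    // A coarse-grained pipeline stage: each message carries a whole batch of
    // values, so the cost of propagating a message is amortized over many items.
    transformer<vector<int>, vector<int>> square_all([](vector<int> batch) -> vector<int> {
        // Inside the stage, use a fine-grained PPL algorithm on the batch.
        parallel_for_each(batch.begin(), batch.end(), [](int& n) {
            n *= n;
        });
        return batch;
    });

    // Collects the processed batches.
    unbounded_buffer<vector<int>> results;
    square_all.link_target(&results);

    // Send one message that carries 1,000 work items instead of
    // sending 1,000 separate messages.
    vector<int> batch(1000);
    for (int i = 0; i < 1000; ++i)
    {
        batch[i] = i;
    }
    send(square_all, batch);

    vector<int> squared = receive(results);
    wcout << L"squared[10] = " << squared[10] << endl; // prints 100
}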
Do Not Pass Large Message Payloads by Value
In some cases, the runtime creates a copy of every message that it passes from one message buffer to another. For example, the concurrency::overwrite_buffer class offers a copy of every message that it receives to each of its targets. The runtime also creates a copy of the message data when you use message-passing functions such as concurrency::send and concurrency::receive to write messages to and read messages from a message buffer. Although this mechanism helps eliminate the risk of concurrently writing to shared data, it can lead to poor memory performance when the message payload is relatively large.
You can use pointers or references to improve memory performance when you pass messages that have a large payload. The following example compares passing large messages by value to passing pointers to the same message type. The example defines two agent types, producer and consumer, that act on message_data objects. The example compares the time that is required for the producer to send several message_data objects to the consumer to the time that is required for the producer agent to send several pointers to message_data objects to the consumer.
// message-payloads.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <string>
#include <iostream>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
    __int64 begin = GetTickCount();
    f();
    return GetTickCount() - begin;
}

// A message structure that contains large payload data.
struct message_data
{
    int id;
    string source;
    unsigned char binary_data[32768];
};

// A basic agent that produces values.
template <typename T>
class producer : public agent
{
public:
    explicit producer(ITarget<T>& target, unsigned int message_count)
        : _target(target)
        , _message_count(message_count)
    {
    }

protected:
    void run();

private:
    // The target buffer to write to.
    ITarget<T>& _target;

    // The number of messages to send.
    unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void producer<message_data>::run()
{
    // Send a number of messages to the target buffer.
    while (_message_count > 0)
    {
        message_data message;
        message.id = _message_count;
        message.source = "Application";

        send(_target, message);
        --_message_count;
    }

    // Set the agent to the finished state.
    done();
}

// Template specialization for message_data*.
template <>
void producer<message_data*>::run()
{
    // Send a number of messages to the target buffer.
    while (_message_count > 0)
    {
        message_data* message = new message_data;
        message->id = _message_count;
        message->source = "Application";

        send(_target, message);
        --_message_count;
    }

    // Set the agent to the finished state.
    done();
}

// A basic agent that consumes values.
template <typename T>
class consumer : public agent
{
public:
    explicit consumer(ISource<T>& source, unsigned int message_count)
        : _source(source)
        , _message_count(message_count)
    {
    }

protected:
    void run();

private:
    // The source buffer to read from.
    ISource<T>& _source;

    // The number of messages to receive.
    unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void consumer<message_data>::run()
{
    // Receive a number of messages from the source buffer.
    while (_message_count > 0)
    {
        message_data message = receive(_source);
        --_message_count;

        // TODO: Do something with the message.
        // ...
    }

    // Set the agent to the finished state.
    done();
}

// Template specialization for message_data*.
template <>
void consumer<message_data*>::run()
{
    // Receive a number of messages from the source buffer.
    while (_message_count > 0)
    {
        message_data* message = receive(_source);
        --_message_count;

        // TODO: Do something with the message.
        // ...

        // Release the memory for the message.
        delete message;
    }

    // Set the agent to the finished state.
    done();
}

int wmain()
{
    // The number of values for the producer agent to send.
    const unsigned int count = 10000;

    __int64 elapsed;

    // Run the producer and consumer agents.
    // This version uses message_data as the message payload type.
    wcout << L"Using message_data..." << endl;
    elapsed = time_call([count] {
        // A message buffer that is shared by the agents.
        unbounded_buffer<message_data> buffer;

        // Create and start the producer and consumer agents.
        producer<message_data> prod(buffer, count);
        consumer<message_data> cons(buffer, count);
        prod.start();
        cons.start();

        // Wait for the agents to finish.
        agent::wait(&prod);
        agent::wait(&cons);
    });
    wcout << L"took " << elapsed << L"ms." << endl;

    // Run the producer and consumer agents a second time.
    // This version uses message_data* as the message payload type.
    wcout << L"Using message_data*..." << endl;
    elapsed = time_call([count] {
        // A message buffer that is shared by the agents.
        unbounded_buffer<message_data*> buffer;

        // Create and start the producer and consumer agents.
        producer<message_data*> prod(buffer, count);
        consumer<message_data*> cons(buffer, count);
        prod.start();
        cons.start();

        // Wait for the agents to finish.
        agent::wait(&prod);
        agent::wait(&cons);
    });
    wcout << L"took " << elapsed << L"ms." << endl;
}
This example produces the following sample output:
Using message_data...
took 437ms.
Using message_data*...
took 47ms.
The version that uses pointers performs better because it eliminates the requirement for the runtime to create a full copy of every message_data object that it passes from the producer to the consumer.
Use shared_ptr in a Data Network When Ownership Is Undefined
When you send messages by pointer through a message-passing pipeline or network, you typically allocate the memory for each message at the front of the network and free that memory at the end of the network. Although this mechanism frequently works well, there are cases in which it is difficult or not possible to use it. For example, consider the case in which the data network contains multiple end nodes. In this case, there is no clear location at which to free the memory for the messages.
To solve this problem, you can use a mechanism, such as std::shared_ptr, that enables a pointer to be owned by multiple components. When the final shared_ptr object that owns a resource is destroyed, the resource is also freed.
The following example demonstrates how to use shared_ptr to share pointer values among multiple message buffers. The example connects a concurrency::overwrite_buffer object to three concurrency::call objects. The overwrite_buffer class offers messages to each of its targets. Because there are multiple owners of the data at the end of the data network, this example uses shared_ptr to enable each call object to share ownership of the messages.
// message-sharing.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
#include <memory>
#include <sstream>

using namespace concurrency;
using namespace std;

// A type that holds a resource.
class resource
{
public:
    resource(int id) : _id(id)
    {
        wcout << L"Creating resource " << _id << L"..." << endl;
    }
    ~resource()
    {
        wcout << L"Destroying resource " << _id << L"..." << endl;
    }

    // Retrieves the identifier for the resource.
    int id() const { return _id; }

    // TODO: Add additional members here.
private:
    // An identifier for the resource.
    int _id;

    // TODO: Add additional members here.
};

int wmain()
{
    // A message buffer that sends messages to each of its targets.
    overwrite_buffer<shared_ptr<resource>> input;

    // Create three call objects that each receive resource objects
    // from the input message buffer.

    call<shared_ptr<resource>> receiver1(
        [](shared_ptr<resource> res) {
            wstringstream ss;
            ss << L"receiver1: received resource " << res->id() << endl;
            wcout << ss.str();
        },
        [](shared_ptr<resource> res) {
            return res != nullptr;
        }
    );

    call<shared_ptr<resource>> receiver2(
        [](shared_ptr<resource> res) {
            wstringstream ss;
            ss << L"receiver2: received resource " << res->id() << endl;
            wcout << ss.str();
        },
        [](shared_ptr<resource> res) {
            return res != nullptr;
        }
    );

    event e;
    call<shared_ptr<resource>> receiver3(
        [&e](shared_ptr<resource> res) {
            e.set();
        },
        [](shared_ptr<resource> res) {
            return res == nullptr;
        }
    );

    // Connect the call objects to the input message buffer.
    input.link_target(&receiver1);
    input.link_target(&receiver2);
    input.link_target(&receiver3);

    // Send a few messages through the network.
    send(input, make_shared<resource>(42));
    send(input, make_shared<resource>(64));
    send(input, shared_ptr<resource>(nullptr));

    // Wait for the receiver that accepts the nullptr value to
    // receive its message.
    e.wait();
}
This example produces the following sample output:
Creating resource 42...
receiver1: received resource 42
Creating resource 64...
receiver2: received resource 42
receiver1: received resource 64
Destroying resource 42...
receiver2: received resource 64
Destroying resource 64...
See also
Concurrency Runtime Best Practices
Asynchronous Agents Library
Walkthrough: Creating an Agent-Based Application
Walkthrough: Creating a Dataflow Agent
Walkthrough: Creating an Image-Processing Network
Best Practices in the Parallel Patterns Library
General Best Practices in the Concurrency Runtime