逐步解說:建立資料流程代理程式
本文件示範如何建立以數據流為基礎的代理程式型應用程式,而不是控制流程。
控制流程 是指程式中作業的執行順序。 控制流程是使用條件語句、迴圈等控制結構進行管制。 或者,數據流是指程序設計模型,只有在所有必要數據都可用時,才會進行計算。 數據流程序設計模型與訊息傳遞的概念有關,其中程式的獨立元件會藉由傳送訊息彼此通訊。
異步代理程式同時支援控制流程和數據流程序設計模型。 雖然在許多情況下,控制流程模型是適當的,但數據流模型適用於其他模型,例如,當代理程式接收數據並執行以該數據承載為基礎的動作時。
必要條件
開始本逐步解說之前,請先閱讀下列檔:
區段
本逐步解說包含下列各節:
建立基本控制流程代理程式
請考慮下列定義 類別的 control_flow_agent
範例。 類別 control_flow_agent
在三個訊息緩衝區上作用:一個輸入緩衝區和兩個輸出緩衝區。 方法 run
會從迴圈中的來源訊息緩衝區讀取,並使用條件語句來引導程序執行的流程。 代理程式會針對非零、負值遞增一個計數器,併為非零、正值遞增另一個計數器。 代理程式收到零的 sentinel 值之後,它會將計數器的值傳送至輸出訊息緩衝區。 和 negatives
positives
方法可讓應用程式從代理程式讀取負值和正值計數。
// A basic agent that uses control-flow to regulate the order of program
// execution. This agent reads numbers from a message buffer and counts the
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
explicit control_flow_agent(ISource<int>& source)
: _source(source)
{
}
// Retrieves the count of negative numbers that the agent received.
size_t negatives()
{
return receive(_negatives);
}
// Retrieves the count of positive numbers that the agent received.
size_t positives()
{
return receive(_positives);
}
protected:
void run()
{
// Counts the number of negative and positive values that
// the agent receives.
size_t negative_count = 0;
size_t positive_count = 0;
// Read from the source buffer until we receive
// the sentinel value of 0.
int value = 0;
while ((value = receive(_source)) != 0)
{
// Send negative values to the first target and
// non-negative values to the second target.
if (value < 0)
++negative_count;
else
++positive_count;
}
// Write the counts to the message buffers.
send(_negatives, negative_count);
send(_positives, positive_count);
// Set the agent to the completed state.
done();
}
private:
// Source message buffer to read from.
ISource<int>& _source;
// Holds the number of negative and positive numbers that the agent receives.
single_assignment<size_t> _negatives;
single_assignment<size_t> _positives;
};
雖然此範例會基本使用代理程式中的控制流程,但它示範以控制流程為基礎的程式設計序列本質。 即使輸入訊息緩衝區中可能有多個訊息,每個訊息都必須循序處理。 數據流模型可讓條件語句的兩個分支同時評估。 數據流模型也可讓您建立更複雜的傳訊網路,以在數據可供使用時採取行動。
[靠上]
建立基本數據流代理程式
本節說明如何將 類別轉換成 control_flow_agent
使用數據流模型來執行相同的工作。
數據流代理程序的運作方式是建立訊息緩衝區網路,每個訊息緩衝區都有特定用途。 某些訊息區塊會使用篩選函式,根據其承載來接受或拒絕訊息。 篩選函式可確保消息塊只接收特定值。
將控制流程代理程式轉換為數據流代理程式
將類別的
control_flow_agent
主體複製到另一個類別,例如dataflow_agent
。 或者,您可以重新命名 類別control_flow_agent
。拿掉從
run
方法呼叫receive
的循環主體。
void run()
{
// Counts the number of negative and positive values that
// the agent receives.
size_t negative_count = 0;
size_t positive_count = 0;
// Write the counts to the message buffers.
send(_negatives, negative_count);
send(_positives, positive_count);
// Set the agent to the completed state.
done();
}
- 在方法中
run
,在變數negative_count
和positive_count
初始化之後,新增countdown_event
對象來追蹤使用中作業計數。
// Tracks the count of active operations.
countdown_event active;
// An event that is set by the sentinel.
event received_sentinel;
本主題稍後會顯示 類別 countdown_event
。
- 建立將參與數據流網路的訊息緩衝區物件。
//
// Create the members of the dataflow network.
//
// Increments the active counter.
transformer<int, int> increment_active(
[&active](int value) -> int {
active.add_count();
return value;
});
// Increments the count of negative values.
call<int> negatives(
[&](int value) {
++negative_count;
// Decrement the active counter.
active.signal();
},
[](int value) -> bool {
return value < 0;
});
// Increments the count of positive values.
call<int> positives(
[&](int value) {
++positive_count;
// Decrement the active counter.
active.signal();
},
[](int value) -> bool {
return value > 0;
});
// Receives only the sentinel value of 0.
call<int> sentinel(
[&](int value) {
// Decrement the active counter.
active.signal();
// Set the sentinel event.
received_sentinel.set();
},
[](int value) -> bool {
return value == 0;
});
// Connects the _source message buffer to the rest of the network.
unbounded_buffer<int> connector;
- 連接訊息緩衝區以形成網路。
//
// Connect the network.
//
// Connect the internal nodes of the network.
connector.link_target(&negatives);
connector.link_target(&positives);
connector.link_target(&sentinel);
increment_active.link_target(&connector);
// Connect the _source buffer to the internal network to
// begin data flow.
_source.link_target(&increment_active);
- 等候
event
設定和countdown event
物件。 這些事件表示代理程式已收到 sentinel 值,且所有作業都已完成。
// Wait for the sentinel event and for all operations to finish.
received_sentinel.wait();
active.wait();
下圖顯示 類別的完整資料串流網路 dataflow_agent
:
下表描述網路的成員。
member | 描述 |
---|---|
increment_active |
並行::transformer 物件,會遞增使用中事件計數器,並將輸入值傳遞至網路的其餘部分。 |
negatives , positives |
concurrency::call 物件,以遞增數位計數並遞減使用中事件計數器。 每個物件都會使用篩選條件來接受負數或正數。 |
sentinel |
concurrency::call 物件,只接受零的 sentinel 值,並遞減使用中事件計數器。 |
connector |
將 來源訊息緩衝區連線到內部網路的並行::unbounded_buffer 物件。 |
run
因為方法是在個別線程上呼叫,其他線程可以在網路完全連線之前,將訊息傳送至網路。 數據 _source
成員是物件 unbounded_buffer
,會緩衝從應用程式傳送至代理程式的所有輸入。 為了確保網路會處理所有輸入訊息,代理程式會先連結網路的內部節點,然後將該網路的 connector
開頭連結至 _source
數據成員。 這可確保訊息不會在形成網路時進行處理。
由於此範例中的網路是以數據流為基礎,而不是以控制流程為基礎,因此網路必須與代理程式通訊,該代理程式已完成處理每個輸入值,而且 sentinel 節點已接收其值。 這個範例會使用 countdown_event
物件來表示已處理所有輸入值,以及 concurrency::event 物件,表示 sentinel 節點已收到其值。 類別 countdown_event
會使用 event
物件,在計數器值達到零時發出訊號。 數據流網路的前端會在每次收到值時遞增計數器。 網路的每個終端節點會在處理輸入值之後遞減計數器。 代理程式形成數據流網路之後,它會等候 sentinel 節點設定 event
物件,並讓 countdown_event
對象發出其計數器已達到零的訊號。
下列範例顯示 control_flow_agent
、 dataflow_agent
和 countdown_event
類別。 函 wmain
式會 control_flow_agent
建立和 dataflow_agent
物件,並使用 函 send_values
式將一系列隨機值傳送給代理程式。
// dataflow-agent.cpp
// compile with: /EHsc
#include <windows.h>
#include <agents.h>
#include <iostream>
#include <random>
using namespace concurrency;
using namespace std;
// A basic agent that uses control-flow to regulate the order of program
// execution. This agent reads numbers from a message buffer and counts the
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
explicit control_flow_agent(ISource<int>& source)
: _source(source)
{
}
// Retrieves the count of negative numbers that the agent received.
size_t negatives()
{
return receive(_negatives);
}
// Retrieves the count of positive numbers that the agent received.
size_t positives()
{
return receive(_positives);
}
protected:
void run()
{
// Counts the number of negative and positive values that
// the agent receives.
size_t negative_count = 0;
size_t positive_count = 0;
// Read from the source buffer until we receive
// the sentinel value of 0.
int value = 0;
while ((value = receive(_source)) != 0)
{
// Send negative values to the first target and
// non-negative values to the second target.
if (value < 0)
++negative_count;
else
++positive_count;
}
// Write the counts to the message buffers.
send(_negatives, negative_count);
send(_positives, positive_count);
// Set the agent to the completed state.
done();
}
private:
// Source message buffer to read from.
ISource<int>& _source;
// Holds the number of negative and positive numbers that the agent receives.
single_assignment<size_t> _negatives;
single_assignment<size_t> _positives;
};
// A synchronization primitive that is signaled when its
// count reaches zero.
class countdown_event
{
public:
countdown_event(unsigned int count = 0L)
: _current(static_cast<long>(count))
{
// Set the event if the initial count is zero.
if (_current == 0L)
_event.set();
}
// Decrements the event counter.
void signal() {
if(InterlockedDecrement(&_current) == 0L) {
_event.set();
}
}
// Increments the event counter.
void add_count() {
if(InterlockedIncrement(&_current) == 1L) {
_event.reset();
}
}
// Blocks the current context until the event is set.
void wait() {
_event.wait();
}
private:
// The current count.
volatile long _current;
// The event that is set when the counter reaches zero.
event _event;
// Disable copy constructor.
countdown_event(const countdown_event&);
// Disable assignment.
countdown_event const & operator=(countdown_event const&);
};
// A basic agent that resembles control_flow_agent, but uses uses dataflow to
// perform computations when data becomes available.
class dataflow_agent : public agent
{
public:
dataflow_agent(ISource<int>& source)
: _source(source)
{
}
// Retrieves the count of negative numbers that the agent received.
size_t negatives()
{
return receive(_negatives);
}
// Retrieves the count of positive numbers that the agent received.
size_t positives()
{
return receive(_positives);
}
protected:
void run()
{
// Counts the number of negative and positive values that
// the agent receives.
size_t negative_count = 0;
size_t positive_count = 0;
// Tracks the count of active operations.
countdown_event active;
// An event that is set by the sentinel.
event received_sentinel;
//
// Create the members of the dataflow network.
//
// Increments the active counter.
transformer<int, int> increment_active(
[&active](int value) -> int {
active.add_count();
return value;
});
// Increments the count of negative values.
call<int> negatives(
[&](int value) {
++negative_count;
// Decrement the active counter.
active.signal();
},
[](int value) -> bool {
return value < 0;
});
// Increments the count of positive values.
call<int> positives(
[&](int value) {
++positive_count;
// Decrement the active counter.
active.signal();
},
[](int value) -> bool {
return value > 0;
});
// Receives only the sentinel value of 0.
call<int> sentinel(
[&](int value) {
// Decrement the active counter.
active.signal();
// Set the sentinel event.
received_sentinel.set();
},
[](int value) -> bool {
return value == 0;
});
// Connects the _source message buffer to the rest of the network.
unbounded_buffer<int> connector;
//
// Connect the network.
//
// Connect the internal nodes of the network.
connector.link_target(&negatives);
connector.link_target(&positives);
connector.link_target(&sentinel);
increment_active.link_target(&connector);
// Connect the _source buffer to the internal network to
// begin data flow.
_source.link_target(&increment_active);
// Wait for the sentinel event and for all operations to finish.
received_sentinel.wait();
active.wait();
// Write the counts to the message buffers.
send(_negatives, negative_count);
send(_positives, positive_count);
// Set the agent to the completed state.
done();
}
private:
// Source message buffer to read from.
ISource<int>& _source;
// Holds the number of negative and positive numbers that the agent receives.
single_assignment<size_t> _negatives;
single_assignment<size_t> _positives;
};
// Sends a number of random values to the provided message buffer.
void send_values(ITarget<int>& source, int sentinel, size_t count)
{
// Send a series of random numbers to the source buffer.
mt19937 rnd(42);
for (size_t i = 0; i < count; ++i)
{
// Generate a random number that is not equal to the sentinel value.
int n;
while ((n = rnd()) == sentinel);
send(source, n);
}
// Send the sentinel value.
send(source, sentinel);
}
int wmain()
{
// Signals to the agent that there are no more values to process.
const int sentinel = 0;
// The number of samples to send to each agent.
const size_t count = 1000000;
// The source buffer that the application writes numbers to and
// the agents read numbers from.
unbounded_buffer<int> source;
//
// Use a control-flow agent to process a series of random numbers.
//
wcout << L"Control-flow agent:" << endl;
// Create and start the agent.
control_flow_agent cf_agent(source);
cf_agent.start();
// Send values to the agent.
send_values(source, sentinel, count);
// Wait for the agent to finish.
agent::wait(&cf_agent);
// Print the count of negative and positive numbers.
wcout << L"There are " << cf_agent.negatives()
<< L" negative numbers."<< endl;
wcout << L"There are " << cf_agent.positives()
<< L" positive numbers."<< endl;
//
// Perform the same task, but this time with a dataflow agent.
//
wcout << L"Dataflow agent:" << endl;
// Create and start the agent.
dataflow_agent df_agent(source);
df_agent.start();
// Send values to the agent.
send_values(source, sentinel, count);
// Wait for the agent to finish.
agent::wait(&df_agent);
// Print the count of negative and positive numbers.
wcout << L"There are " << df_agent.negatives()
<< L" negative numbers."<< endl;
wcout << L"There are " << df_agent.positives()
<< L" positive numbers."<< endl;
}
此範例會產生下列範例輸出:
Control-flow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
Dataflow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
編譯程式碼
複製範例程式代碼,並將其貼到 Visual Studio 專案中,或貼到名為 dataflow-agent.cpp
的檔案中,然後在 Visual Studio 命令提示字元視窗中執行下列命令。
cl.exe /EHsc dataflow-agent.cpp
[靠上]
建立訊息記錄代理程式
下列範例顯示 log_agent
類別,類似於 dataflow_agent
類別。 類別 log_agent
會實作異步記錄代理程式,以將記錄訊息寫入檔案和控制台。 類別 log_agent
可讓應用程式將訊息分類為參考、警告或錯誤。 它也可讓應用程式指定每個記錄類別是否寫入檔案、主控台或兩者。 此範例會將所有記錄訊息寫入檔案,並將錯誤訊息只寫入主控台。
// log-filter.cpp
// compile with: /EHsc
#include <windows.h>
#include <agents.h>
#include <sstream>
#include <fstream>
#include <iostream>
using namespace concurrency;
using namespace std;
// A synchronization primitive that is signaled when its
// count reaches zero.
class countdown_event
{
public:
countdown_event(unsigned int count = 0L)
: _current(static_cast<long>(count))
{
// Set the event if the initial count is zero.
if (_current == 0L)
{
_event.set();
}
}
// Decrements the event counter.
void signal()
{
if(InterlockedDecrement(&_current) == 0L)
{
_event.set();
}
}
// Increments the event counter.
void add_count()
{
if(InterlockedIncrement(&_current) == 1L)
{
_event.reset();
}
}
// Blocks the current context until the event is set.
void wait()
{
_event.wait();
}
private:
// The current count.
volatile long _current;
// The event that is set when the counter reaches zero.
event _event;
// Disable copy constructor.
countdown_event(const countdown_event&);
// Disable assignment.
countdown_event const & operator=(countdown_event const&);
};
// Defines message types for the logger.
enum log_message_type
{
log_info = 0x1,
log_warning = 0x2,
log_error = 0x4,
};
// An asynchronous logging agent that writes log messages to
// file and to the console.
class log_agent : public agent
{
// Holds a message string and its logging type.
struct log_message
{
wstring message;
log_message_type type;
};
public:
log_agent(const wstring& file_path, log_message_type file_messages, log_message_type console_messages)
: _file(file_path)
, _file_messages(file_messages)
, _console_messages(console_messages)
, _active(0)
{
if (_file.bad())
{
throw invalid_argument("Unable to open log file.");
}
}
// Writes the provided message to the log.
void log(const wstring& message, log_message_type type)
{
// Increment the active message count.
_active.add_count();
// Send the message to the network.
log_message msg = { message, type };
send(_log_buffer, msg);
}
void close()
{
// Signal that the agent is now closed.
_closed.set();
}
protected:
void run()
{
//
// Create the dataflow network.
//
// Writes a log message to file.
call<log_message> writer([this](log_message msg)
{
if ((msg.type & _file_messages) != 0)
{
// Write the message to the file.
write_to_stream(msg, _file);
}
if ((msg.type & _console_messages) != 0)
{
// Write the message to the console.
write_to_stream(msg, wcout);
}
// Decrement the active counter.
_active.signal();
});
// Connect _log_buffer to the internal network to begin data flow.
_log_buffer.link_target(&writer);
// Wait for the closed event to be signaled.
_closed.wait();
// Wait for all messages to be processed.
_active.wait();
// Close the log file and flush the console.
_file.close();
wcout.flush();
// Set the agent to the completed state.
done();
}
private:
// Writes a logging message to the specified output stream.
void write_to_stream(const log_message& msg, wostream& stream)
{
// Write the message to the stream.
wstringstream ss;
switch (msg.type)
{
case log_info:
ss << L"info: ";
break;
case log_warning:
ss << L"warning: ";
break;
case log_error:
ss << L"error: ";
}
ss << msg.message << endl;
stream << ss.str();
}
private:
// The file stream to write messages to.
wofstream _file;
// The log message types that are written to file.
log_message_type _file_messages;
// The log message types that are written to the console.
log_message_type _console_messages;
// The head of the network. Propagates logging messages
// to the rest of the network.
unbounded_buffer<log_message> _log_buffer;
// Counts the number of active messages in the network.
countdown_event _active;
// Signals that the agent has been closed.
event _closed;
};
int wmain()
{
// Union of all log message types.
log_message_type log_all = log_message_type(log_info | log_warning | log_error);
// Create a logging agent that writes all log messages to file and error
// messages to the console.
log_agent logger(L"log.txt", log_all, log_error);
// Start the agent.
logger.start();
// Log a few messages.
logger.log(L"===Logging started.===", log_info);
logger.log(L"This is a sample warning message.", log_warning);
logger.log(L"This is a sample error message.", log_error);
logger.log(L"===Logging finished.===", log_info);
// Close the logger and wait for the agent to finish.
logger.close();
agent::wait(&logger);
}
此範例會將下列輸出寫入主控台。
error: This is a sample error message.
此範例也會產生log.txt檔案,其中包含下列文字。
info: ===Logging started.===
warning: This is a sample warning message.
error: This is a sample error message.
info: ===Logging finished.===
編譯程式碼
複製範例程式代碼,並將其貼到 Visual Studio 專案中,或貼到名為 log-filter.cpp
的檔案中,然後在 Visual Studio 命令提示字元視窗中執行下列命令。
cl.exe /EHsc log-filter.cpp
[靠上]