演练:创建数据流代理
本文档演示如何创建基于数据流,而不是基于控制流的基于代理的应用程序。
控制流指的是程序中操作的执行顺序。 控制流是通过使用条件语句、循环等控制结构调节的。 此外,数据流指的是一种编程模型,在这种模型中,只有当所有需要的数据都可用时,才会进行计算。 数据流编程模型与消息传递这一概念相关,其中程序的独立组件通过发送消息相互通信。
异步代理同时支持控制流和数据流编程模型。 尽管控制流模型适用于许多情况,但数据流模型也适用于其他一些情况,例如,当代理接收数据并执行基于该数据的有效负载的操作时。
先决条件
在开始操作本演练之前,请阅读以下文档:
部分
本演练包含以下各节:
创建基本控制流代理
请看下面的示例,该示例定义了 control_flow_agent
类。 control_flow_agent
类对三个消息缓冲区(一个输入缓冲区和两个输出缓冲区)执行操作。 run
方法在一个循环中从源消息缓冲区读取,并使用条件语句来引导程序执行流。 代理针对非零负值递增一个计数器值,并为非零正值递增另一个计数器值。 代理收到为零的 Sentinel 值后,它会将计数器的值发送到输出消息缓冲区。 通过 negatives
和 positives
方法,应用程序可以从代理读取负值和正值的计数。
// A basic agent that uses control-flow to regulate the order of program
// execution. This agent reads numbers from a message buffer and counts the
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
explicit control_flow_agent(ISource<int>& source)
: _source(source)
{
}
// Retrieves the count of negative numbers that the agent received.
size_t negatives()
{
return receive(_negatives);
}
// Retrieves the count of positive numbers that the agent received.
size_t positives()
{
return receive(_positives);
}
protected:
void run()
{
// Counts the number of negative and positive values that
// the agent receives.
size_t negative_count = 0;
size_t positive_count = 0;
// Read from the source buffer until we receive
// the sentinel value of 0.
int value = 0;
while ((value = receive(_source)) != 0)
{
// Send negative values to the first target and
// non-negative values to the second target.
if (value < 0)
++negative_count;
else
++positive_count;
}
// Write the counts to the message buffers.
send(_negatives, negative_count);
send(_positives, positive_count);
// Set the agent to the completed state.
done();
}
private:
// Source message buffer to read from.
ISource<int>& _source;
// Holds the number of negative and positive numbers that the agent receives.
single_assignment<size_t> _negatives;
single_assignment<size_t> _positives;
};
虽然在此示例中看到的是代理中的控制流的基本用法,但它展示了基于控制流的编程的连续性。 必须按顺序处理每个消息,即使在输入消息缓冲区中可能有多个消息。 数据流模型使条件语句的两个分支能同时求值。 数据流模型还使你能够创建更复杂的消息传送网络,在数据可用时对其执行操作。
[返回页首]
创建基本数据流代理
本部分介绍如何转换 control_flow_agent
类以使用数据流模型执行相同的任务。
数据流代理的工作方式是创建一个消息缓冲区网络,每个缓冲区都有特定用途。 某些消息块使用筛选功能,根据消息的有效负载接受或拒绝消息。 筛选器函数可确保消息块仅接收特定值。
将控制流代理转换为数据流代理
将
control_flow_agent
类的主体复制到另一个类,例如dataflow_agent
。 或者,可以重命名control_flow_agent
类。从
run
方法移除用来调用receive
的循环主体。
void run()
{
// Counts the number of negative and positive values that
// the agent receives.
size_t negative_count = 0;
size_t positive_count = 0;
// Write the counts to the message buffers.
send(_negatives, negative_count);
send(_positives, positive_count);
// Set the agent to the completed state.
done();
}
- 在
run
方法中,在变量negative_count
和positive_count
初始化后,添加countdown_event
对象,用于跟踪活动操作的计数。
// Tracks the count of active operations.
countdown_event active;
// An event that is set by the sentinel.
event received_sentinel;
本主题稍后会对 countdown_event
类进行介绍。
- 创建将参与数据流网络的消息缓冲区对象。
//
// Create the members of the dataflow network.
//
// Increments the active counter.
transformer<int, int> increment_active(
[&active](int value) -> int {
active.add_count();
return value;
});
// Increments the count of negative values.
call<int> negatives(
[&](int value) {
++negative_count;
// Decrement the active counter.
active.signal();
},
[](int value) -> bool {
return value < 0;
});
// Increments the count of positive values.
call<int> positives(
[&](int value) {
++positive_count;
// Decrement the active counter.
active.signal();
},
[](int value) -> bool {
return value > 0;
});
// Receives only the sentinel value of 0.
call<int> sentinel(
[&](int value) {
// Decrement the active counter.
active.signal();
// Set the sentinel event.
received_sentinel.set();
},
[](int value) -> bool {
return value == 0;
});
// Connects the _source message buffer to the rest of the network.
unbounded_buffer<int> connector;
- 连接消息缓冲区以构成网络。
//
// Connect the network.
//
// Connect the internal nodes of the network.
connector.link_target(&negatives);
connector.link_target(&positives);
connector.link_target(&sentinel);
increment_active.link_target(&connector);
// Connect the _source buffer to the internal network to
// begin data flow.
_source.link_target(&increment_active);
- 等待
event
和countdown event
对象设置完成。 这些事件表明代理已收到 Sentinel 值,并且所有操作都已完成。
// Wait for the sentinel event and for all operations to finish.
received_sentinel.wait();
active.wait();
下图显示了 dataflow_agent
类的完整数据流网络:
下表描述了网络的成员。
成员 | 说明 |
---|---|
increment_active |
一个 concurrency::transformer 对象,用于递增活动事件计数器值,并将输入值传递到网络的其余部分。 |
negatives , positives |
concurrency::call 对象,用于递增数字计数值并递减活动事件计数器值。 每个对象都使用筛选器来接受负数或正数。 |
sentinel |
一个 concurrency::call 对象,用于仅接受为零的 Sentinel 值并递减活动事件计数器值。 |
connector |
一个 concurrency::unbounded_buffer 对象,用于将源消息缓冲区连接到内部网络。 |
由于 run
方法是在一个单独的线程上调用的,因此在完全连接网络之前,其他线程可以将消息发送到网络。 _source
数据成员是一个 unbounded_buffer
对象,用于缓冲从应用程序发送到代理的所有输入。 为了确保网络能够处理所有输入消息,代理会首先链接网络的内部节点,然后将该网络的起点 connector
链接到 _source
数据成员。 这可以保证在形成网络的过程中不会处理消息。
由于此示例中的网络是基于数据流,而不是基于控制流的,网络必须向代理传达它已经完成了对每个输入值的处理,并且 Sentinel 节点也已接收到它的值。 此示例使用 countdown_event
对象来指示所有输入值均已经过处理,使用 concurrency::event 对象指示 Sentinel 节点已接收到它的值。 countdown_event
类使用 event
对象指示计数器值达到零。 每当数据流网络的头收到一个值时,都会递增计数器值。 在处理输入值后,网络的每个终端节点都会递减计数器值。 代理形成数据流网络后,它会等待 Sentinel 节点设置 event
对象,还会等待 countdown_event
对象指示其计数器值已达到零。
下面的示例展示了 control_flow_agent
、dataflow_agent
和 countdown_event
类。 wmain
函数创建了 control_flow_agent
和 dataflow_agent
对象,并使用 send_values
函数将一系列随机值发送到代理。
// dataflow-agent.cpp
// compile with: /EHsc
#include <windows.h>
#include <agents.h>
#include <iostream>
#include <random>
using namespace concurrency;
using namespace std;
// A basic agent that uses control-flow to regulate the order of program
// execution. This agent reads numbers from a message buffer and counts the
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
explicit control_flow_agent(ISource<int>& source)
: _source(source)
{
}
// Retrieves the count of negative numbers that the agent received.
size_t negatives()
{
return receive(_negatives);
}
// Retrieves the count of positive numbers that the agent received.
size_t positives()
{
return receive(_positives);
}
protected:
void run()
{
// Counts the number of negative and positive values that
// the agent receives.
size_t negative_count = 0;
size_t positive_count = 0;
// Read from the source buffer until we receive
// the sentinel value of 0.
int value = 0;
while ((value = receive(_source)) != 0)
{
// Send negative values to the first target and
// non-negative values to the second target.
if (value < 0)
++negative_count;
else
++positive_count;
}
// Write the counts to the message buffers.
send(_negatives, negative_count);
send(_positives, positive_count);
// Set the agent to the completed state.
done();
}
private:
// Source message buffer to read from.
ISource<int>& _source;
// Holds the number of negative and positive numbers that the agent receives.
single_assignment<size_t> _negatives;
single_assignment<size_t> _positives;
};
// A synchronization primitive that is signaled when its
// count reaches zero.
class countdown_event
{
public:
countdown_event(unsigned int count = 0L)
: _current(static_cast<long>(count))
{
// Set the event if the initial count is zero.
if (_current == 0L)
_event.set();
}
// Decrements the event counter.
void signal() {
if(InterlockedDecrement(&_current) == 0L) {
_event.set();
}
}
// Increments the event counter.
void add_count() {
if(InterlockedIncrement(&_current) == 1L) {
_event.reset();
}
}
// Blocks the current context until the event is set.
void wait() {
_event.wait();
}
private:
// The current count.
volatile long _current;
// The event that is set when the counter reaches zero.
event _event;
// Disable copy constructor.
countdown_event(const countdown_event&);
// Disable assignment.
countdown_event const & operator=(countdown_event const&);
};
// A basic agent that resembles control_flow_agent, but uses uses dataflow to
// perform computations when data becomes available.
class dataflow_agent : public agent
{
public:
dataflow_agent(ISource<int>& source)
: _source(source)
{
}
// Retrieves the count of negative numbers that the agent received.
size_t negatives()
{
return receive(_negatives);
}
// Retrieves the count of positive numbers that the agent received.
size_t positives()
{
return receive(_positives);
}
protected:
void run()
{
// Counts the number of negative and positive values that
// the agent receives.
size_t negative_count = 0;
size_t positive_count = 0;
// Tracks the count of active operations.
countdown_event active;
// An event that is set by the sentinel.
event received_sentinel;
//
// Create the members of the dataflow network.
//
// Increments the active counter.
transformer<int, int> increment_active(
[&active](int value) -> int {
active.add_count();
return value;
});
// Increments the count of negative values.
call<int> negatives(
[&](int value) {
++negative_count;
// Decrement the active counter.
active.signal();
},
[](int value) -> bool {
return value < 0;
});
// Increments the count of positive values.
call<int> positives(
[&](int value) {
++positive_count;
// Decrement the active counter.
active.signal();
},
[](int value) -> bool {
return value > 0;
});
// Receives only the sentinel value of 0.
call<int> sentinel(
[&](int value) {
// Decrement the active counter.
active.signal();
// Set the sentinel event.
received_sentinel.set();
},
[](int value) -> bool {
return value == 0;
});
// Connects the _source message buffer to the rest of the network.
unbounded_buffer<int> connector;
//
// Connect the network.
//
// Connect the internal nodes of the network.
connector.link_target(&negatives);
connector.link_target(&positives);
connector.link_target(&sentinel);
increment_active.link_target(&connector);
// Connect the _source buffer to the internal network to
// begin data flow.
_source.link_target(&increment_active);
// Wait for the sentinel event and for all operations to finish.
received_sentinel.wait();
active.wait();
// Write the counts to the message buffers.
send(_negatives, negative_count);
send(_positives, positive_count);
// Set the agent to the completed state.
done();
}
private:
// Source message buffer to read from.
ISource<int>& _source;
// Holds the number of negative and positive numbers that the agent receives.
single_assignment<size_t> _negatives;
single_assignment<size_t> _positives;
};
// Sends a number of random values to the provided message buffer.
void send_values(ITarget<int>& source, int sentinel, size_t count)
{
// Send a series of random numbers to the source buffer.
mt19937 rnd(42);
for (size_t i = 0; i < count; ++i)
{
// Generate a random number that is not equal to the sentinel value.
int n;
while ((n = rnd()) == sentinel);
send(source, n);
}
// Send the sentinel value.
send(source, sentinel);
}
int wmain()
{
// Signals to the agent that there are no more values to process.
const int sentinel = 0;
// The number of samples to send to each agent.
const size_t count = 1000000;
// The source buffer that the application writes numbers to and
// the agents read numbers from.
unbounded_buffer<int> source;
//
// Use a control-flow agent to process a series of random numbers.
//
wcout << L"Control-flow agent:" << endl;
// Create and start the agent.
control_flow_agent cf_agent(source);
cf_agent.start();
// Send values to the agent.
send_values(source, sentinel, count);
// Wait for the agent to finish.
agent::wait(&cf_agent);
// Print the count of negative and positive numbers.
wcout << L"There are " << cf_agent.negatives()
<< L" negative numbers."<< endl;
wcout << L"There are " << cf_agent.positives()
<< L" positive numbers."<< endl;
//
// Perform the same task, but this time with a dataflow agent.
//
wcout << L"Dataflow agent:" << endl;
// Create and start the agent.
dataflow_agent df_agent(source);
df_agent.start();
// Send values to the agent.
send_values(source, sentinel, count);
// Wait for the agent to finish.
agent::wait(&df_agent);
// Print the count of negative and positive numbers.
wcout << L"There are " << df_agent.negatives()
<< L" negative numbers."<< endl;
wcout << L"There are " << df_agent.positives()
<< L" positive numbers."<< endl;
}
此示例产生以下示例输出:
Control-flow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
Dataflow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
编译代码
复制示例代码,并将它粘贴到 Visual Studio 项目中,或粘贴到名为 dataflow-agent.cpp
的文件中,再在 Visual Studio 命令提示符窗口中运行以下命令。
cl.exe /EHsc dataflow-agent.cpp
[返回页首]
创建消息记录代理
以下示例展示了 log_agent
类,它类似于 dataflow_agent
类。 log_agent
类实现异步记录代理,用于将日志消息写入文件和控制台。 log_agent
类使应用程序能够将消息分类为信息性、警告或错误消息。 它还使应用程序能够指定每个日志类别是写入文件、控制台还是同时写入这两者。 本示例将所有日志消息写入文件,并仅将错误消息写入控制台。
// log-filter.cpp
// compile with: /EHsc
#include <windows.h>
#include <agents.h>
#include <sstream>
#include <fstream>
#include <iostream>
using namespace concurrency;
using namespace std;
// A synchronization primitive that is signaled when its
// count reaches zero.
class countdown_event
{
public:
countdown_event(unsigned int count = 0L)
: _current(static_cast<long>(count))
{
// Set the event if the initial count is zero.
if (_current == 0L)
{
_event.set();
}
}
// Decrements the event counter.
void signal()
{
if(InterlockedDecrement(&_current) == 0L)
{
_event.set();
}
}
// Increments the event counter.
void add_count()
{
if(InterlockedIncrement(&_current) == 1L)
{
_event.reset();
}
}
// Blocks the current context until the event is set.
void wait()
{
_event.wait();
}
private:
// The current count.
volatile long _current;
// The event that is set when the counter reaches zero.
event _event;
// Disable copy constructor.
countdown_event(const countdown_event&);
// Disable assignment.
countdown_event const & operator=(countdown_event const&);
};
// Defines message types for the logger.
enum log_message_type
{
log_info = 0x1,
log_warning = 0x2,
log_error = 0x4,
};
// An asynchronous logging agent that writes log messages to
// file and to the console.
class log_agent : public agent
{
// Holds a message string and its logging type.
struct log_message
{
wstring message;
log_message_type type;
};
public:
log_agent(const wstring& file_path, log_message_type file_messages, log_message_type console_messages)
: _file(file_path)
, _file_messages(file_messages)
, _console_messages(console_messages)
, _active(0)
{
if (_file.bad())
{
throw invalid_argument("Unable to open log file.");
}
}
// Writes the provided message to the log.
void log(const wstring& message, log_message_type type)
{
// Increment the active message count.
_active.add_count();
// Send the message to the network.
log_message msg = { message, type };
send(_log_buffer, msg);
}
void close()
{
// Signal that the agent is now closed.
_closed.set();
}
protected:
void run()
{
//
// Create the dataflow network.
//
// Writes a log message to file.
call<log_message> writer([this](log_message msg)
{
if ((msg.type & _file_messages) != 0)
{
// Write the message to the file.
write_to_stream(msg, _file);
}
if ((msg.type & _console_messages) != 0)
{
// Write the message to the console.
write_to_stream(msg, wcout);
}
// Decrement the active counter.
_active.signal();
});
// Connect _log_buffer to the internal network to begin data flow.
_log_buffer.link_target(&writer);
// Wait for the closed event to be signaled.
_closed.wait();
// Wait for all messages to be processed.
_active.wait();
// Close the log file and flush the console.
_file.close();
wcout.flush();
// Set the agent to the completed state.
done();
}
private:
// Writes a logging message to the specified output stream.
void write_to_stream(const log_message& msg, wostream& stream)
{
// Write the message to the stream.
wstringstream ss;
switch (msg.type)
{
case log_info:
ss << L"info: ";
break;
case log_warning:
ss << L"warning: ";
break;
case log_error:
ss << L"error: ";
}
ss << msg.message << endl;
stream << ss.str();
}
private:
// The file stream to write messages to.
wofstream _file;
// The log message types that are written to file.
log_message_type _file_messages;
// The log message types that are written to the console.
log_message_type _console_messages;
// The head of the network. Propagates logging messages
// to the rest of the network.
unbounded_buffer<log_message> _log_buffer;
// Counts the number of active messages in the network.
countdown_event _active;
// Signals that the agent has been closed.
event _closed;
};
int wmain()
{
// Union of all log message types.
log_message_type log_all = log_message_type(log_info | log_warning | log_error);
// Create a logging agent that writes all log messages to file and error
// messages to the console.
log_agent logger(L"log.txt", log_all, log_error);
// Start the agent.
logger.start();
// Log a few messages.
logger.log(L"===Logging started.===", log_info);
logger.log(L"This is a sample warning message.", log_warning);
logger.log(L"This is a sample error message.", log_error);
logger.log(L"===Logging finished.===", log_info);
// Close the logger and wait for the agent to finish.
logger.close();
agent::wait(&logger);
}
本示例将以下输出写入控制台。
error: This is a sample error message.
此示例还生成 log.txt 文件,其中包含以下文本。
info: ===Logging started.===
warning: This is a sample warning message.
error: This is a sample error message.
info: ===Logging finished.===
编译代码
复制示例代码,并将它粘贴到 Visual Studio 项目中,或粘贴到名为 log-filter.cpp
的文件中,再在 Visual Studio 命令提示符窗口中运行以下命令。
cl.exe /EHsc log-filter.cpp
[返回页首]