연습: 사용자 지정 데이터 흐름 에이전트 만들기
이 문서에서는 제어 흐름 때신 데이터 흐름을 기반으로 하는 에이전트 기반 응용 프로그램을 만드는 방법을 보여 줍니다.
제어 흐름은 프로그램에서 작업의 실행 순서를 말합니다. 제어 흐름은 조건문, 루프 등의 제어 구조를 사용하여 조정됩니다. 그리고 데이터 흐름은 필요한 모든 데이터를 사용할 수 있는 경우에만 계산이 수행되는 프로그래밍 모델을 말합니다. 데이터 흐름 프로그래밍 모델은 프로그램의 개별 구성 요소가 메시지를 보내 다른 구성 요소와 통신하는 메시지 전달 개념과 관련되어 있습니다.
비동기 에이전트는 제어 흐름 및 데이터 흐름 프로그래밍 모델을 지원합니다. 대부분의 경우 제어 흐름 모델이 적합하지만 에이전트가 데이터를 받고 해당 데이터의 페이로드를 기반으로 하는 작업을 수행하는 경우와 같이 데이터 흐름 모델이 적합한 경우도 있습니다.
사전 요구 사항
이 연습을 시작하기 전에 다음 문서를 읽어 보십시오.
단원
이 연습에는 다음 단원이 포함되어 있습니다.
기본 제어 흐름 에이전트 만들기
기본 데이터 흐름 에이전트 만들기
메시지 로깅 에이전트 만들기
기본 제어 흐름 에이전트 만들기
control_flow_agent 클래스를 정의하는 다음 예제를 살펴봅니다. control_flow_agent 클래스는 세 개의 메시지 버퍼(입력 버퍼 하나와 출력 버퍼 두 개)에서 동작합니다. run 메서드는 루프에서 소스 메시지 버퍼를 읽고 조건문을 사용하여 프로그램 실행 흐름을 지시합니다. 에이전트는 0이 아닌 음수에 대한 하나의 카운터를 증가시키고 0이 아닌 양수에 대한 다른 카운터를 증가시킵니다. 에이전트가 센티널 값 0을 받은 후 카운터 값을 출력 메시지 버퍼로 보냅니다. negatives 및 positives 메서드를 사용하면 응용 프로그램이 에이전트에서 음수 및 양수 값 카운터를 읽을 수 있습니다.
// A basic agent that uses control-flow to regulate the order of program
// execution. This agent reads numbers from a message buffer and counts the
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
explicit control_flow_agent(ISource<int>& source)
: _source(source)
{
}
// Retrieves the count of negative numbers that the agent received.
size_t negatives()
{
return receive(_negatives);
}
// Retrieves the count of positive numbers that the agent received.
size_t positives()
{
return receive(_positives);
}
protected:
void run()
{
// Counts the number of negative and positive values that
// the agent receives.
size_t negative_count = 0;
size_t positive_count = 0;
// Read from the source buffer until we receive
// the sentinel value of 0.
int value = 0;
while ((value = receive(_source)) != 0)
{
// Send negative values to the first target and
// non-negative values to the second target.
if (value < 0)
++negative_count;
else
++positive_count;
}
// Write the counts to the message buffers.
send(_negatives, negative_count);
send(_positives, positive_count);
// Set the agent to the completed state.
done();
}
private:
// Source message buffer to read from.
ISource<int>& _source;
// Holds the number of negative and positive numbers that the agent receives.
single_assignment<size_t> _negatives;
single_assignment<size_t> _positives;
};
이 예제에서는 에이전트에서 제어 흐름을 기본적으로 사용하지만 제어 흐름 기반 프로그래밍의 연속 특성을 보여 줍니다. 입력 메시지 버퍼에서 여러 메시지를 사용할 수 있는 경우에도 각 메시지는 순차적으로 처리되어야 합니다. 데이터 흐름 모델을 사용하면 조건문의 두 분기를 사용하여 동시에 평가할 수 있습니다. 그리고 데이터 흐름 모델을 사용하게 될 경우 데이터에 대해 동작하는 더 복잡한 메시징 네트워크를 만들 수도 있습니다.
[맨 위]
기본 데이터 흐름 에이전트 만들기
이 단원에서는 데이터 흐름 모델을 사용하여 동일한 작업을 수행하도록 control_flow_agent 클래스를 변환하는 방법을 보여 줍니다.
데이터 흐름 에이전트는 각각 특정 용도로 사용되는 메시지 버퍼 네트워크를 만들어 작동합니다. 특정 메시지 블록은 필터 함수를 사용하여 해당 페이로드를 기준으로 메시지를 수락하거나 거부합니다. 필터 함수를 사용하면 메시지 블록이 특정 값만 받도록 할 수 있습니다.
제어 흐름 에이전트를 데이터 흐름 에이전트로 변환하려면
control_flow_agent 클래스의 본문을 dataflow_agent와 같은 다른 클래스로 복사합니다. 또는 control_flow_agent 클래스의 이름을 바꿀 수 있습니다.
run 메서드에서 receive를 호출하는 루프의 본문을 제거합니다.
void run() { // Counts the number of negative and positive values that // the agent receives. size_t negative_count = 0; size_t positive_count = 0; // Write the counts to the message buffers. send(_negatives, negative_count); send(_positives, positive_count); // Set the agent to the completed state. done(); }
run 메서드에서 negative_count 및 positive_count 변수의 초기화 후 활성 작업 수를 추적하는 countdown_event 개체를 추가합니다.
// Tracks the count of active operations. countdown_event active; // An event that is set by the sentinel. event received_sentinel;
countdown_event 클래스는 이 항목의 뒷부분에 나옵니다.
데이터 흐름 네트워크에 참여할 메시지 버퍼 개체를 만듭니다.
// // Create the members of the dataflow network. // // Increments the active counter. transformer<int, int> increment_active( [&active](int value) -> int { active.add_count(); return value; }); // Increments the count of negative values. call<int> negatives( [&](int value) { ++negative_count; // Decrement the active counter. active.signal(); }, [](int value) -> bool { return value < 0; }); // Increments the count of positive values. call<int> positives( [&](int value) { ++positive_count; // Decrement the active counter. active.signal(); }, [](int value) -> bool { return value > 0; }); // Receives only the sentinel value of 0. call<int> sentinel( [&](int value) { // Decrement the active counter. active.signal(); // Set the sentinel event. received_sentinel.set(); }, [](int value) { return value == 0; }); // Connects the _source message buffer to the rest of the network. unbounded_buffer<int> connector;
메시지 버퍼를 연결하여 네트워크를 구성합니다.
// // Connect the network. // // Connect the internal nodes of the network. connector.link_target(&negatives); connector.link_target(&positives); connector.link_target(&sentinel); increment_active.link_target(&connector); // Connect the _source buffer to the internal network to // begin data flow. _source.link_target(&increment_active);
event 및 countdown event 개체가 설정될 때까지 기다립니다. 이 이벤트는 에이전트가 센티널 값을 받았고 모든 작업이 완료되었음을 알립니다.
// Wait for the sentinel event and for all operations to finish. received_sentinel.wait(); active.wait();
다음 다이어그램에서는 dataflow_agent 클래스의 전체 데이터 흐름 네트워크를 보여 줍니다.
다음 표에서는 네트워크 멤버에 대해 설명합니다.
멤버 |
설명 |
---|---|
increment_active |
활성 이벤트 카운터를 증가시키고 입력 값을 나머지 네트워크에 전달하는 concurrency::transformer 개체입니다. |
negatives, positives |
숫자 개수를 증가시키고 활성 이벤트 카운터를 감소시키는 concurrency::call 개체입니다. 이 개체는 각각 필터를 사용하여 음수 또는 양수를 수락합니다. |
sentinel |
센티널 수 0만 수락하고 활성 이벤트 카운터를 감소시키는 concurrency::call 개체입니다. |
connector |
소스 메시지 버퍼를 내부 네트워크에 연결하는 concurrency::unbounded_buffer 개체입니다. |
run 메서드는 별도의 스레드에서 호출되므로 네트워크가 완전히 연결되기 전에 다른 스레드에서 메시지를 네트워크로 보낼 수 있습니다. _source 데이터 멤버는 응용 프로그램에서 에이전트로 보내는 모든 입력을 버퍼링하는 unbounded_buffer 개체입니다. 네트워크에서 모든 입력 메시지를 처리하는지 확인하기 위해 에이전트는 먼저 네트워크의 내부 노드를 연결한 다음 해당 네트워크의 시작 부분인 connector를 _source 데이터 멤버에 연결합니다. 그러면 네트워크가 구성 중일 때 메시지가 처리되지 않습니다.
이 예제에서 네트워크는 제어 흐름이 아닌 데이터 흐름을 기반으로 하기 때문에 네트워크는 각 입력 값 처리를 마치고 센티널 노드에서 해당 값을 받은 에이전트와 통신해야 합니다. 이 예제에서는 countdown_event 개체를 사용하여 모든 입력 값이 처리되었음을 알리고 concurrency::event 개체를 사용하여 센티널 노드에서 해당 값을 받았음을 나타냅니다. countdown_event 클래스는 카운터 값이 0이 되면 event 개체를 사용하여 신호를 보낼 수 있습니다. 데이터 흐름 네트워크의 헤드는 값을 받을 때마다 카운터를 증가시킵니다. 네트워크의 모든 터미널 노드는 입력 값을 처리된 후 카운터를 감소시킵니다. 에이전트는 데이터 흐름 네트워크를 구성한 후 센티널 노드에서 event 개체를 설정하고 countdown_event 개체에서 카운터가 0에 도달했음을 알릴 때까지 기다립니다.
다음 예제에서는 control_flow_agent, dataflow_agent 및 countdown_event 클래스를 보여 줍니다. wmain 함수는 control_flow_agent 및 dataflow_agent 개체를 만들고 send_values 함수를 사용하여 일련의 임의 값을 에이전트로 보냅니다.
// dataflow-agent.cpp
// compile with: /EHsc
#include <windows.h>
#include <agents.h>
#include <iostream>
#include <random>
using namespace concurrency;
using namespace std;
// A basic agent that uses control-flow to regulate the order of program
// execution. This agent reads numbers from a message buffer and counts the
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
explicit control_flow_agent(ISource<int>& source)
: _source(source)
{
}
// Retrieves the count of negative numbers that the agent received.
size_t negatives()
{
return receive(_negatives);
}
// Retrieves the count of positive numbers that the agent received.
size_t positives()
{
return receive(_positives);
}
protected:
void run()
{
// Counts the number of negative and positive values that
// the agent receives.
size_t negative_count = 0;
size_t positive_count = 0;
// Read from the source buffer until we receive
// the sentinel value of 0.
int value = 0;
while ((value = receive(_source)) != 0)
{
// Send negative values to the first target and
// non-negative values to the second target.
if (value < 0)
++negative_count;
else
++positive_count;
}
// Write the counts to the message buffers.
send(_negatives, negative_count);
send(_positives, positive_count);
// Set the agent to the completed state.
done();
}
private:
// Source message buffer to read from.
ISource<int>& _source;
// Holds the number of negative and positive numbers that the agent receives.
single_assignment<size_t> _negatives;
single_assignment<size_t> _positives;
};
// A synchronization primitive that is signaled when its
// count reaches zero.
class countdown_event
{
public:
countdown_event(unsigned int count = 0L)
: _current(static_cast<long>(count))
{
// Set the event if the initial count is zero.
if (_current == 0L)
_event.set();
}
// Decrements the event counter.
void signal() {
if(InterlockedDecrement(&_current) == 0L) {
_event.set();
}
}
// Increments the event counter.
void add_count() {
if(InterlockedIncrement(&_current) == 1L) {
_event.reset();
}
}
// Blocks the current context until the event is set.
void wait() {
_event.wait();
}
private:
// The current count.
volatile long _current;
// The event that is set when the counter reaches zero.
event _event;
// Disable copy constructor.
countdown_event(const countdown_event&);
// Disable assignment.
countdown_event const & operator=(countdown_event const&);
};
// A basic agent that resembles control_flow_agent, but uses uses dataflow to
// perform computations when data becomes available.
class dataflow_agent : public agent
{
public:
dataflow_agent(ISource<int>& source)
: _source(source)
{
}
// Retrieves the count of negative numbers that the agent received.
size_t negatives()
{
return receive(_negatives);
}
// Retrieves the count of positive numbers that the agent received.
size_t positives()
{
return receive(_positives);
}
protected:
void run()
{
// Counts the number of negative and positive values that
// the agent receives.
size_t negative_count = 0;
size_t positive_count = 0;
// Tracks the count of active operations.
countdown_event active;
// An event that is set by the sentinel.
event received_sentinel;
//
// Create the members of the dataflow network.
//
// Increments the active counter.
transformer<int, int> increment_active(
[&active](int value) -> int {
active.add_count();
return value;
});
// Increments the count of negative values.
call<int> negatives(
[&](int value) {
++negative_count;
// Decrement the active counter.
active.signal();
},
[](int value) -> bool {
return value < 0;
});
// Increments the count of positive values.
call<int> positives(
[&](int value) {
++positive_count;
// Decrement the active counter.
active.signal();
},
[](int value) -> bool {
return value > 0;
});
// Receives only the sentinel value of 0.
call<int> sentinel(
[&](int value) {
// Decrement the active counter.
active.signal();
// Set the sentinel event.
received_sentinel.set();
},
[](int value) {
return value == 0;
});
// Connects the _source message buffer to the rest of the network.
unbounded_buffer<int> connector;
//
// Connect the network.
//
// Connect the internal nodes of the network.
connector.link_target(&negatives);
connector.link_target(&positives);
connector.link_target(&sentinel);
increment_active.link_target(&connector);
// Connect the _source buffer to the internal network to
// begin data flow.
_source.link_target(&increment_active);
// Wait for the sentinel event and for all operations to finish.
received_sentinel.wait();
active.wait();
// Write the counts to the message buffers.
send(_negatives, negative_count);
send(_positives, positive_count);
// Set the agent to the completed state.
done();
}
private:
// Source message buffer to read from.
ISource<int>& _source;
// Holds the number of negative and positive numbers that the agent receives.
single_assignment<size_t> _negatives;
single_assignment<size_t> _positives;
};
// Sends a number of random values to the provided message buffer.
void send_values(ITarget<int>& source, int sentinel, size_t count)
{
// Send a series of random numbers to the source buffer.
mt19937 rnd(42);
for (size_t i = 0; i < count; ++i)
{
// Generate a random number that is not equal to the sentinel value.
int n;
while ((n = rnd()) == sentinel);
send(source, n);
}
// Send the sentinel value.
send(source, sentinel);
}
int wmain()
{
// Signals to the agent that there are no more values to process.
const int sentinel = 0;
// The number of samples to send to each agent.
const size_t count = 1000000;
// The source buffer that the application writes numbers to and
// the agents read numbers from.
unbounded_buffer<int> source;
//
// Use a control-flow agent to process a series of random numbers.
//
wcout << L"Control-flow agent:" << endl;
// Create and start the agent.
control_flow_agent cf_agent(source);
cf_agent.start();
// Send values to the agent.
send_values(source, sentinel, count);
// Wait for the agent to finish.
agent::wait(&cf_agent);
// Print the count of negative and positive numbers.
wcout << L"There are " << cf_agent.negatives()
<< L" negative numbers."<< endl;
wcout << L"There are " << cf_agent.positives()
<< L" positive numbers."<< endl;
//
// Perform the same task, but this time with a dataflow agent.
//
wcout << L"Dataflow agent:" << endl;
// Create and start the agent.
dataflow_agent df_agent(source);
df_agent.start();
// Send values to the agent.
send_values(source, sentinel, count);
// Wait for the agent to finish.
agent::wait(&df_agent);
// Print the count of negative and positive numbers.
wcout << L"There are " << df_agent.negatives()
<< L" negative numbers."<< endl;
wcout << L"There are " << df_agent.positives()
<< L" positive numbers."<< endl;
}
이 예제를 실행하면 다음과 같은 샘플 결과가 출력됩니다.
코드 컴파일
예제 코드를 복사하여 Visual Studio 프로젝트 또는 dataflow-agent.cpp 파일에 붙여 넣고 Visual Studio 명령 프롬프트 창에서 다음 명령을 실행합니다.
cl.exe /EHsc dataflow-agent.cpp
[맨 위]
메시지 로깅 에이전트 만들기
다음 예제에서는 dataflow_agent 클래스와 유사한 log_agent 클래스를 보여 줍니다. log_agent 클래스는 로그 메시지를 파일 및 콘솔에 쓰는 비동기 로깅 에이전트를 구현합니다. log_agent 클래스를 사용하면 응용 프로그램에서 메시지를 정보, 경고 또는 오류로 분류하도록 할 수 있습니다. 그리고 이 클래스를 사용하면 응용 프로그램에서 각 로그 범주를 파일에 쓸지 콘솔에 쓸지 파일과 콘솔 둘 다에 쓸지 지정하도록 할 수 있습니다. 이 예제에서는 모든 로그 파일을 파일에 쓰고 오류 메시지만 콘솔에 씁니다.
// log-filter.cpp
// compile with: /EHsc
#include <windows.h>
#include <agents.h>
#include <sstream>
#include <fstream>
#include <iostream>
using namespace concurrency;
using namespace std;
// A synchronization primitive that is signaled when its
// count reaches zero.
class countdown_event
{
public:
countdown_event(unsigned int count = 0L)
: _current(static_cast<long>(count))
{
// Set the event if the initial count is zero.
if (_current == 0L)
{
_event.set();
}
}
// Decrements the event counter.
void signal()
{
if(InterlockedDecrement(&_current) == 0L)
{
_event.set();
}
}
// Increments the event counter.
void add_count()
{
if(InterlockedIncrement(&_current) == 1L)
{
_event.reset();
}
}
// Blocks the current context until the event is set.
void wait()
{
_event.wait();
}
private:
// The current count.
volatile long _current;
// The event that is set when the counter reaches zero.
event _event;
// Disable copy constructor.
countdown_event(const countdown_event&);
// Disable assignment.
countdown_event const & operator=(countdown_event const&);
};
// Defines message types for the logger.
enum log_message_type
{
log_info = 0x1,
log_warning = 0x2,
log_error = 0x4,
};
// An asynchronous logging agent that writes log messages to
// file and to the console.
class log_agent : public agent
{
// Holds a message string and its logging type.
struct log_message
{
wstring message;
log_message_type type;
};
public:
log_agent(const wstring& file_path, log_message_type file_messages, log_message_type console_messages)
: _file(file_path)
, _file_messages(file_messages)
, _console_messages(console_messages)
, _active(0)
{
if (_file.bad())
{
throw invalid_argument("Unable to open log file.");
}
}
// Writes the provided message to the log.
void log(const wstring& message, log_message_type type)
{
// Increment the active message count.
_active.add_count();
// Send the message to the network.
log_message msg = { message, type };
send(_log_buffer, msg);
}
void close()
{
// Signal that the agent is now closed.
_closed.set();
}
protected:
void run()
{
//
// Create the dataflow network.
//
// Writes a log message to file.
call<log_message> writer([this](log_message msg)
{
if ((msg.type & _file_messages) != 0)
{
// Write the message to the file.
write_to_stream(msg, _file);
}
if ((msg.type & _console_messages) != 0)
{
// Write the message to the console.
write_to_stream(msg, wcout);
}
// Decrement the active counter.
_active.signal();
});
// Connect _log_buffer to the internal network to begin data flow.
_log_buffer.link_target(&writer);
// Wait for the closed event to be signaled.
_closed.wait();
// Wait for all messages to be processed.
_active.wait();
// Close the log file and flush the console.
_file.close();
wcout.flush();
// Set the agent to the completed state.
done();
}
private:
// Writes a logging message to the specified output stream.
void write_to_stream(const log_message& msg, wostream& stream)
{
// Write the message to the stream.
wstringstream ss;
switch (msg.type)
{
case log_info:
ss << L"info: ";
break;
case log_warning:
ss << L"warning: ";
break;
case log_error:
ss << L"error: ";
}
ss << msg.message << endl;
stream << ss.str();
}
private:
// The file stream to write messages to.
wofstream _file;
// The log message types that are written to file.
log_message_type _file_messages;
// The log message types that are written to the console.
log_message_type _console_messages;
// The head of the network. Propagates logging messages
// to the rest of the network.
unbounded_buffer<log_message> _log_buffer;
// Counts the number of active messages in the network.
countdown_event _active;
// Signals that the agent has been closed.
event _closed;
};
int wmain()
{
// Union of all log message types.
log_message_type log_all = log_message_type(log_info | log_warning | log_error);
// Create a logging agent that writes all log messages to file and error
// messages to the console.
log_agent logger(L"log.txt", log_all, log_error);
// Start the agent.
logger.start();
// Log a few messages.
logger.log(L"===Logging started.===", log_info);
logger.log(L"This is a sample warning message.", log_warning);
logger.log(L"This is a sample error message.", log_error);
logger.log(L"===Logging finished.===", log_info);
// Close the logger and wait for the agent to finish.
logger.close();
agent::wait(&logger);
}
이 예제에서는 다음 출력을 콘솔에 씁니다.
이 예제에서는 다음 텍스트가 포함된 log.txt 파일도 생성합니다.
코드 컴파일
예제 코드를 복사하여 Visual Studio 프로젝트 또는 log-filter.cpp 파일에 붙여 넣고 Visual Studio 명령 프롬프트 창에서 다음 명령을 실행합니다.
cl.exe /EHsc log-filter.cpp
[맨 위]