Instruktaż: Tworzenie agenta przepływ danych
Niniejszy dokument przedstawia sposób tworzenia aplikacji opartych na agent, które są oparte na przepływ danych, zamiast kontroli przepływu.
Sterowanie przepływem odnosi się do kolejności wykonywania operacji w programie.Przepływ sterowania jest regulowana przy użyciu struktur sterujących, takich jak instrukcje warunkowe, pętle i tak dalej.Alternatywnie Przepływ odnosi się do modelu programowania, w którym obliczenia są dokonywane tylko wtedy, gdy wszystkie dane wymagane jest dostępny.Model programowania przepływ danych jest związana z koncepcja przekazywania wiadomości, w którym niezależne składników programu komunikują się ze sobą przez wysłanie wiadomości.
Agenci asynchronicznego obsługuje zarówno kontroli przepływu i modele programowania przepływ danych.Chociaż model przepływ sterowania jest w wielu przypadkach, model przepływ danych jest w innych, na przykład, gdy agent odbiera dane i wykonuje akcję, która jest oparta na ładunek danych.
Wymagania wstępne
Przed rozpoczęciem tego instruktażu, przeczytaj następujące dokumenty:
Sekcje
Ten instruktaż zawiera następujące sekcje:
Tworzenie podstawowego agenta przepływ sterowania
Tworzenie podstawowego agenta przepływ danych
Tworzenie agenta rejestrowania komunikatów
Tworzenie podstawowego agenta przepływ sterowania
Rozważmy następujący przykład definiujący control_flow_agent klasy.control_flow_agent Klasa działa na trzy buforów wiadomości: jeden wejścia buforu i dwa wyjścia buforów.run Metoda odczytuje buforu wiadomości źródłowych w pętli i używa instrukcji warunkowej które kierują strumień wykonywania programu.Agent zwiększa jeden licznik do zera, ujemne wartości i zwiększa licznik innego do zera, dodatnie wartości.Gdy agent otrzyma wskaźnikowych wartości zero, wysyła wartości liczników do buforów wyjściowych wiadomości.negatives i positives metody Włączanie aplikacji odczytać liczniki wartości ujemnych i dodatnich agenta.
// A basic agent that uses control-flow to regulate the order of program
// execution. This agent reads numbers from a message buffer and counts the
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
explicit control_flow_agent(ISource<int>& source)
: _source(source)
{
}
// Retrieves the count of negative numbers that the agent received.
size_t negatives()
{
return receive(_negatives);
}
// Retrieves the count of positive numbers that the agent received.
size_t positives()
{
return receive(_positives);
}
protected:
void run()
{
// Counts the number of negative and positive values that
// the agent receives.
size_t negative_count = 0;
size_t positive_count = 0;
// Read from the source buffer until we receive
// the sentinel value of 0.
int value = 0;
while ((value = receive(_source)) != 0)
{
// Send negative values to the first target and
// non-negative values to the second target.
if (value < 0)
++negative_count;
else
++positive_count;
}
// Write the counts to the message buffers.
send(_negatives, negative_count);
send(_positives, positive_count);
// Set the agent to the completed state.
done();
}
private:
// Source message buffer to read from.
ISource<int>& _source;
// Holds the number of negative and positive numbers that the agent receives.
single_assignment<size_t> _negatives;
single_assignment<size_t> _positives;
};
Chociaż w tym przykładzie powoduje, że podstawowe zastosowanie kontroli przepływu agenta, pokazuje szeregowego charakter Programowanie oparte na kontroli przepływu.Każda wiadomość muszą być przetwarzane sekwencyjnie, mimo że wiele wiadomości mogą być dostępne w buforze komunikat wejściowy.Model przepływ danych umożliwia zarówno oddziałów instrukcji warunkowej do oceny równocześnie.Model przepływ danych umożliwia również tworzenie bardziej złożonych sieci obsługi wiadomości, które działają na danych staje się dostępne.
Top
Tworzenie podstawowego agenta przepływ danych
W tej sekcji przedstawiono sposób konwertowania control_flow_agent klasy do wykonania tego samego zadania za pomocą modelu przepływ danych.
Działa agent umożliwia tworzenie sieci buforów wiadomości, z których każdy pełni określonym celu.Niektórych bloków komunikatów użyć funkcji filtru, aby zaakceptować lub odrzucić wiadomość z jego ładunku.Funkcja filtrowania zapewnia, że bloku komunikatu odbiera tylko niektóre wartości.
Aby przekonwertować agenta przepływ sterowania agenta przepływ danych
Kopiowanie treści control_flow_agent klasy do innej klasy, na przykład, dataflow_agent.Alternatywnie, można zmienić nazwę control_flow_agent klasy.
Usuń treści pętli, który wywołuje receive z run metody.
void run() { // Counts the number of negative and positive values that // the agent receives. size_t negative_count = 0; size_t positive_count = 0; // Write the counts to the message buffers. send(_negatives, negative_count); send(_positives, positive_count); // Set the agent to the completed state. done(); }
W run metoda po zainicjowaniu zmiennych negative_count i positive_count, dodać countdown_event obiekt, który śledzi liczba aktywnych operacji.
// Tracks the count of active operations. countdown_event active; // An event that is set by the sentinel. event received_sentinel;
countdown_event Klasy znajduje się w dalszej części tego tematu.
Utwórz wiadomość buforu obiektów, które będą uczestniczyć w sieci przepływ danych.
// // Create the members of the dataflow network. // // Increments the active counter. transformer<int, int> increment_active( [&active](int value) -> int { active.add_count(); return value; }); // Increments the count of negative values. call<int> negatives( [&](int value) { ++negative_count; // Decrement the active counter. active.signal(); }, [](int value) -> bool { return value < 0; }); // Increments the count of positive values. call<int> positives( [&](int value) { ++positive_count; // Decrement the active counter. active.signal(); }, [](int value) -> bool { return value > 0; }); // Receives only the sentinel value of 0. call<int> sentinel( [&](int value) { // Decrement the active counter. active.signal(); // Set the sentinel event. received_sentinel.set(); }, [](int value) { return value == 0; }); // Connects the _source message buffer to the rest of the network. unbounded_buffer<int> connector;
Połącz z buforów wiadomości do sieci.
// // Connect the network. // // Connect the internal nodes of the network. connector.link_target(&negatives); connector.link_target(&positives); connector.link_target(&sentinel); increment_active.link_target(&connector); // Connect the _source buffer to the internal network to // begin data flow. _source.link_target(&increment_active);
Poczekaj, aż event i countdown event obiektów, należy ustawić.Te zdarzenia sygnału, że otrzymana wartość wskaźnikowych przez przedstawiciela i zakończeniu wszystkich operacji.
// Wait for the sentinel event and for all operations to finish. received_sentinel.wait(); active.wait();
Poniższy diagram przedstawia pełny przepływ danych sieci dataflow_agent klasy:
W poniższej tabeli opisano członków sieci.
Członkowskie |
Opis |
---|---|
increment_active |
A concurrency::transformer obiekt, który zwiększa licznik zdarzeń aktywnych i przekazuje wartość wejściowa do pozostałej części sieci. |
negatives, positives |
CONCURRENCY::Call obiektów, które zwiększa licznika liczby i zmniejsza licznik zdarzeń aktywnych.Obiekty każdego Użyj filtru, aby zaakceptować liczb ujemnych i dodatnich liczb. |
sentinel |
A concurrency::call obiektu, która akceptuje tylko wskaźnikowych wartości zero i zmniejszy licznik zdarzeń aktywnych. |
connector |
A concurrency::unbounded_buffer obiekt, który łączy buforu źródła wiadomości w sieci wewnętrznej. |
Ponieważ run w osobnym wątku wywoływana jest metoda, inne wątki mogą wysyłać wiadomości do sieci, zanim podłączona sieć._source Danych jest unbounded_buffer obiekt buforów jest wysyłana z aplikacji do agenta wszystkich danych wejściowych.Aby upewnić się, że sieci przetwarza wiadomości wszystkich wejściowe, agent najpierw łączy wewnętrznego węzły sieci i następnie łączy start w tej sieci, connector, do _source elementu danych.Gwarantuje to, że wiadomości nie uzyskać przetwarzane jak powstała sieci.
Ponieważ sieci, w tym przykładzie jest oparty na przepływ danych, zamiast na przepływ sterowania, sieć musi przekazać agent że zakończył przetwarzanie każdej wartości wejściowych i że węzeł wskaźnikowych otrzymała jej wartość.W tym przykładzie countdown_event obiekt, aby zasygnalizować, że wszystkich wartości wejściowych zostały przetworzone i concurrency::event obiekt, aby wskazać, że węzeł wskaźnikowych otrzymała jej wartość.countdown_event Klasy zastosowań event obiektu do sygnału, gdy wartość licznika osiągnie zero.Szef sieci przepływ danych zwiększa licznik przy każdym że odbiera wartość.Każdy terminal węzeł Dekrementuje sieci licznika po przetwarza wartości wejściowe.Po agent tworzy sieć przepływ danych, oczekuje na węźle wskaźnikowych ustawić event obiektu i countdown_event obiekt, aby zasygnalizować, że jego licznik osiągnął zero.
W poniższym przykładzie control_flow_agent, dataflow_agent, i countdown_event klasy.wmain Funkcja tworzy control_flow_agent i dataflow_agent obiektu i zastosowań send_values funkcji wysyłanie serii losowych wartości do agentów.
// dataflow-agent.cpp
// compile with: /EHsc
#include <windows.h>
#include <agents.h>
#include <iostream>
#include <random>
using namespace concurrency;
using namespace std;
// A basic agent that uses control-flow to regulate the order of program
// execution. This agent reads numbers from a message buffer and counts the
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
explicit control_flow_agent(ISource<int>& source)
: _source(source)
{
}
// Retrieves the count of negative numbers that the agent received.
size_t negatives()
{
return receive(_negatives);
}
// Retrieves the count of positive numbers that the agent received.
size_t positives()
{
return receive(_positives);
}
protected:
void run()
{
// Counts the number of negative and positive values that
// the agent receives.
size_t negative_count = 0;
size_t positive_count = 0;
// Read from the source buffer until we receive
// the sentinel value of 0.
int value = 0;
while ((value = receive(_source)) != 0)
{
// Send negative values to the first target and
// non-negative values to the second target.
if (value < 0)
++negative_count;
else
++positive_count;
}
// Write the counts to the message buffers.
send(_negatives, negative_count);
send(_positives, positive_count);
// Set the agent to the completed state.
done();
}
private:
// Source message buffer to read from.
ISource<int>& _source;
// Holds the number of negative and positive numbers that the agent receives.
single_assignment<size_t> _negatives;
single_assignment<size_t> _positives;
};
// A synchronization primitive that is signaled when its
// count reaches zero.
class countdown_event
{
public:
countdown_event(unsigned int count = 0L)
: _current(static_cast<long>(count))
{
// Set the event if the initial count is zero.
if (_current == 0L)
_event.set();
}
// Decrements the event counter.
void signal() {
if(InterlockedDecrement(&_current) == 0L) {
_event.set();
}
}
// Increments the event counter.
void add_count() {
if(InterlockedIncrement(&_current) == 1L) {
_event.reset();
}
}
// Blocks the current context until the event is set.
void wait() {
_event.wait();
}
private:
// The current count.
volatile long _current;
// The event that is set when the counter reaches zero.
event _event;
// Disable copy constructor.
countdown_event(const countdown_event&);
// Disable assignment.
countdown_event const & operator=(countdown_event const&);
};
// A basic agent that resembles control_flow_agent, but uses uses dataflow to
// perform computations when data becomes available.
class dataflow_agent : public agent
{
public:
dataflow_agent(ISource<int>& source)
: _source(source)
{
}
// Retrieves the count of negative numbers that the agent received.
size_t negatives()
{
return receive(_negatives);
}
// Retrieves the count of positive numbers that the agent received.
size_t positives()
{
return receive(_positives);
}
protected:
void run()
{
// Counts the number of negative and positive values that
// the agent receives.
size_t negative_count = 0;
size_t positive_count = 0;
// Tracks the count of active operations.
countdown_event active;
// An event that is set by the sentinel.
event received_sentinel;
//
// Create the members of the dataflow network.
//
// Increments the active counter.
transformer<int, int> increment_active(
[&active](int value) -> int {
active.add_count();
return value;
});
// Increments the count of negative values.
call<int> negatives(
[&](int value) {
++negative_count;
// Decrement the active counter.
active.signal();
},
[](int value) -> bool {
return value < 0;
});
// Increments the count of positive values.
call<int> positives(
[&](int value) {
++positive_count;
// Decrement the active counter.
active.signal();
},
[](int value) -> bool {
return value > 0;
});
// Receives only the sentinel value of 0.
call<int> sentinel(
[&](int value) {
// Decrement the active counter.
active.signal();
// Set the sentinel event.
received_sentinel.set();
},
[](int value) {
return value == 0;
});
// Connects the _source message buffer to the rest of the network.
unbounded_buffer<int> connector;
//
// Connect the network.
//
// Connect the internal nodes of the network.
connector.link_target(&negatives);
connector.link_target(&positives);
connector.link_target(&sentinel);
increment_active.link_target(&connector);
// Connect the _source buffer to the internal network to
// begin data flow.
_source.link_target(&increment_active);
// Wait for the sentinel event and for all operations to finish.
received_sentinel.wait();
active.wait();
// Write the counts to the message buffers.
send(_negatives, negative_count);
send(_positives, positive_count);
// Set the agent to the completed state.
done();
}
private:
// Source message buffer to read from.
ISource<int>& _source;
// Holds the number of negative and positive numbers that the agent receives.
single_assignment<size_t> _negatives;
single_assignment<size_t> _positives;
};
// Sends a number of random values to the provided message buffer.
void send_values(ITarget<int>& source, int sentinel, size_t count)
{
// Send a series of random numbers to the source buffer.
mt19937 rnd(42);
for (size_t i = 0; i < count; ++i)
{
// Generate a random number that is not equal to the sentinel value.
int n;
while ((n = rnd()) == sentinel);
send(source, n);
}
// Send the sentinel value.
send(source, sentinel);
}
int wmain()
{
// Signals to the agent that there are no more values to process.
const int sentinel = 0;
// The number of samples to send to each agent.
const size_t count = 1000000;
// The source buffer that the application writes numbers to and
// the agents read numbers from.
unbounded_buffer<int> source;
//
// Use a control-flow agent to process a series of random numbers.
//
wcout << L"Control-flow agent:" << endl;
// Create and start the agent.
control_flow_agent cf_agent(source);
cf_agent.start();
// Send values to the agent.
send_values(source, sentinel, count);
// Wait for the agent to finish.
agent::wait(&cf_agent);
// Print the count of negative and positive numbers.
wcout << L"There are " << cf_agent.negatives()
<< L" negative numbers."<< endl;
wcout << L"There are " << cf_agent.positives()
<< L" positive numbers."<< endl;
//
// Perform the same task, but this time with a dataflow agent.
//
wcout << L"Dataflow agent:" << endl;
// Create and start the agent.
dataflow_agent df_agent(source);
df_agent.start();
// Send values to the agent.
send_values(source, sentinel, count);
// Wait for the agent to finish.
agent::wait(&df_agent);
// Print the count of negative and positive numbers.
wcout << L"There are " << df_agent.negatives()
<< L" negative numbers."<< endl;
wcout << L"There are " << df_agent.positives()
<< L" positive numbers."<< endl;
}
Ten przykład generuje następujące przykładowe dane wyjściowe:
Control-flow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
Dataflow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
Kompilowanie kodu
Skopiuj przykładowy kod i wklej go w projekcie programu Visual Studio lub wkleić go w pliku o nazwie Przepływ agent.cpp , a następnie uruchom następujące polecenie w oknie wiersza polecenia usługi programu Visual Studio.
cl.exe /EHsc dataflow-agent.cpp
Top
Tworzenie agenta rejestrowania komunikatów
W poniższym przykładzie log_agent klasy, która jest podobna do dataflow_agent klasy.log_agent Klasy implementuje agenta rejestrowania asynchronicznych zapisów dziennika wiadomości do pliku i do konsoli.log_agent Klasy umożliwia klasyfikowanie wiadomości jako informacyjny, ostrzeżenie lub błąd aplikacji.Umożliwia także aplikacji określić, czy każdej kategorii dziennika jest zapisywany plik lub konsoli.W tym przykładzie zapisuje wszystkie komunikaty dziennika do pliku i tylko komunikaty o błędach do konsoli.
// log-filter.cpp
// compile with: /EHsc
#include <windows.h>
#include <agents.h>
#include <sstream>
#include <fstream>
#include <iostream>
using namespace concurrency;
using namespace std;
// A synchronization primitive that is signaled when its
// count reaches zero.
class countdown_event
{
public:
countdown_event(unsigned int count = 0L)
: _current(static_cast<long>(count))
{
// Set the event if the initial count is zero.
if (_current == 0L)
{
_event.set();
}
}
// Decrements the event counter.
void signal()
{
if(InterlockedDecrement(&_current) == 0L)
{
_event.set();
}
}
// Increments the event counter.
void add_count()
{
if(InterlockedIncrement(&_current) == 1L)
{
_event.reset();
}
}
// Blocks the current context until the event is set.
void wait()
{
_event.wait();
}
private:
// The current count.
volatile long _current;
// The event that is set when the counter reaches zero.
event _event;
// Disable copy constructor.
countdown_event(const countdown_event&);
// Disable assignment.
countdown_event const & operator=(countdown_event const&);
};
// Defines message types for the logger.
enum log_message_type
{
log_info = 0x1,
log_warning = 0x2,
log_error = 0x4,
};
// An asynchronous logging agent that writes log messages to
// file and to the console.
class log_agent : public agent
{
// Holds a message string and its logging type.
struct log_message
{
wstring message;
log_message_type type;
};
public:
log_agent(const wstring& file_path, log_message_type file_messages, log_message_type console_messages)
: _file(file_path)
, _file_messages(file_messages)
, _console_messages(console_messages)
, _active(0)
{
if (_file.bad())
{
throw invalid_argument("Unable to open log file.");
}
}
// Writes the provided message to the log.
void log(const wstring& message, log_message_type type)
{
// Increment the active message count.
_active.add_count();
// Send the message to the network.
log_message msg = { message, type };
send(_log_buffer, msg);
}
void close()
{
// Signal that the agent is now closed.
_closed.set();
}
protected:
void run()
{
//
// Create the dataflow network.
//
// Writes a log message to file.
call<log_message> writer([this](log_message msg)
{
if ((msg.type & _file_messages) != 0)
{
// Write the message to the file.
write_to_stream(msg, _file);
}
if ((msg.type & _console_messages) != 0)
{
// Write the message to the console.
write_to_stream(msg, wcout);
}
// Decrement the active counter.
_active.signal();
});
// Connect _log_buffer to the internal network to begin data flow.
_log_buffer.link_target(&writer);
// Wait for the closed event to be signaled.
_closed.wait();
// Wait for all messages to be processed.
_active.wait();
// Close the log file and flush the console.
_file.close();
wcout.flush();
// Set the agent to the completed state.
done();
}
private:
// Writes a logging message to the specified output stream.
void write_to_stream(const log_message& msg, wostream& stream)
{
// Write the message to the stream.
wstringstream ss;
switch (msg.type)
{
case log_info:
ss << L"info: ";
break;
case log_warning:
ss << L"warning: ";
break;
case log_error:
ss << L"error: ";
}
ss << msg.message << endl;
stream << ss.str();
}
private:
// The file stream to write messages to.
wofstream _file;
// The log message types that are written to file.
log_message_type _file_messages;
// The log message types that are written to the console.
log_message_type _console_messages;
// The head of the network. Propagates logging messages
// to the rest of the network.
unbounded_buffer<log_message> _log_buffer;
// Counts the number of active messages in the network.
countdown_event _active;
// Signals that the agent has been closed.
event _closed;
};
int wmain()
{
// Union of all log message types.
log_message_type log_all = log_message_type(log_info | log_warning | log_error);
// Create a logging agent that writes all log messages to file and error
// messages to the console.
log_agent logger(L"log.txt", log_all, log_error);
// Start the agent.
logger.start();
// Log a few messages.
logger.log(L"===Logging started.===", log_info);
logger.log(L"This is a sample warning message.", log_warning);
logger.log(L"This is a sample error message.", log_error);
logger.log(L"===Logging finished.===", log_info);
// Close the logger and wait for the agent to finish.
logger.close();
agent::wait(&logger);
}
W tym przykładzie zapisuje następujące dane wyjściowe do konsoli.
error: This is a sample error message.
Ten przykład generuje również plik log.txt, który zawiera następujący tekst.
info: ===Logging started.===
warning: This is a sample warning message.
error: This is a sample error message.
info: ===Logging finished.===
Kompilowanie kodu
Skopiuj przykładowy kod i wklej go w projekcie programu Visual Studio lub wkleić go w pliku o nazwie dziennika filter.cpp , a następnie uruchom następujące polecenie w oknie wiersza polecenia usługi programu Visual Studio.
cl.exe /EHsc log-filter.cpp
Top