meilleures pratiques pour la bibliothèque d’agents asynchrones
Ce document explique comment utiliser efficacement la bibliothèque d’agents asynchrones. La bibliothèque d’agents promeut un modèle de programmation basé sur un acteur et un passage de message in-process pour les tâches de flux de données grossières et de pipeline.
Pour plus d’informations sur la bibliothèque d’agents, consultez Bibliothèque d’agents asynchrones.
Sections
Ce document contient les sections suivantes :
Utiliser un mécanisme de limitation pour limiter le nombre de messages dans un pipeline de données
Ne pas effectuer de travail précis dans un pipeline de données
Ne pas passer de charges utiles de message volumineuses par valeur
Utiliser shared_ptr dans un réseau de données lorsque la propriété n’est pas définie
Utiliser des agents pour isoler l’état
La bibliothèque d’agents offre des alternatives à l’état partagé en vous permettant de connecter des composants isolés via un mécanisme asynchrone de transmission de messages. Les agents asynchrones sont les plus efficaces lorsqu’ils isolent leur état interne d’autres composants. En isolant l’état, plusieurs composants n’agissent généralement pas sur les données partagées. L’isolation de l’état peut permettre à votre application de mettre à l’échelle, car elle réduit la contention sur la mémoire partagée. L’isolation de l’état réduit également le risque d’interblocage et de conditions de concurrence, car les composants n’ont pas besoin de synchroniser l’accès aux données partagées.
En règle générale, vous isolez l’état dans un agent en maintenant les membres de données dans les sections de protected
la private
classe d’agent et en utilisant des mémoires tampons de message pour communiquer les modifications d’état. L’exemple suivant montre la basic_agent
classe, qui dérive de concurrency ::agent. La basic_agent
classe utilise deux mémoires tampons de message pour communiquer avec des composants externes. Une mémoire tampon de messages contient les messages entrants ; l’autre mémoire tampon de message contient les messages sortants.
// basic-agent.cpp
// compile with: /c /EHsc
#include <agents.h>
// An agent that uses message buffers to isolate state and communicate
// with other components.
class basic_agent : public concurrency::agent
{
public:
basic_agent(concurrency::unbounded_buffer<int>& input)
: _input(input)
{
}
// Retrieves the message buffer that holds output messages.
concurrency::unbounded_buffer<int>& output()
{
return _output;
}
protected:
void run()
{
while (true)
{
// Read from the input message buffer.
int value = concurrency::receive(_input);
// TODO: Do something with the value.
int result = value;
// Write the result to the output message buffer.
concurrency::send(_output, result);
}
done();
}
private:
// Holds incoming messages.
concurrency::unbounded_buffer<int>& _input;
// Holds outgoing messages.
concurrency::unbounded_buffer<int> _output;
};
Pour obtenir des exemples complets sur la définition et l’utilisation d’agents, consultez Procédure pas à pas : création d’une application basée sur un agent et procédure pas à pas : création d’un agent de flux de données.
[Haut]
Utiliser un mécanisme de limitation pour limiter le nombre de messages dans un pipeline de données
De nombreux types de mémoires tampons de messages, tels que concurrency ::unbounded_buffer, peuvent contenir un nombre illimité de messages. Lorsqu’un producteur de messages envoie des messages à un pipeline de données plus rapidement que le consommateur peut traiter ces messages, l’application peut entrer un état de mémoire insuffisante ou hors mémoire. Vous pouvez utiliser un mécanisme de limitation, par exemple un sémaphore, pour limiter le nombre de messages actifs simultanément dans un pipeline de données.
L’exemple de base suivant montre comment utiliser un sémaphore pour limiter le nombre de messages dans un pipeline de données. Le pipeline de données utilise la fonction concurrency ::wait pour simuler une opération qui prend au moins 100 millisecondes. Étant donné que l’expéditeur produit des messages plus rapidement que le consommateur peut traiter ces messages, cet exemple définit la semaphore
classe pour permettre à l’application de limiter le nombre de messages actifs.
// message-throttling.cpp
// compile with: /EHsc
#include <windows.h> // for GetTickCount()
#include <atomic>
#include <agents.h>
#include <concrt.h>
#include <concurrent_queue.h>
#include <sstream>
#include <iostream>
using namespace concurrency;
using namespace std;
// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
explicit semaphore(long long capacity)
: _semaphore_count(capacity)
{
}
// Acquires access to the semaphore.
void acquire()
{
// The capacity of the semaphore is exceeded when the semaphore count
// falls below zero. When this happens, add the current context to the
// back of the wait queue and block the current context.
if (--_semaphore_count < 0)
{
_waiting_contexts.push(Context::CurrentContext());
Context::Block();
}
}
// Releases access to the semaphore.
void release()
{
// If the semaphore count is negative, unblock the first waiting context.
if (++_semaphore_count <= 0)
{
// A call to acquire might have decremented the counter, but has not
// yet finished adding the context to the queue.
// Create a spin loop that waits for the context to become available.
Context* waiting = NULL;
while (!_waiting_contexts.try_pop(waiting))
{
(Context::Yield)(); // <windows.h> defines Yield as a macro. The parenthesis around Yield prevent the macro expansion so that Context::Yield() is called.
}
// Unblock the context.
waiting->Unblock();
}
}
private:
// The semaphore count.
atomic<long long> _semaphore_count;
// A concurrency-safe queue of contexts that must wait to
// acquire the semaphore.
concurrent_queue<Context*> _waiting_contexts;
};
// A synchronization primitive that is signaled when its
// count reaches zero.
class countdown_event
{
public:
countdown_event(long long count)
: _current(count)
{
// Set the event if the initial count is zero.
if (_current == 0LL)
_event.set();
}
// Decrements the event counter.
void signal() {
if(--_current == 0LL) {
_event.set();
}
}
// Increments the event counter.
void add_count() {
if(++_current == 1LL) {
_event.reset();
}
}
// Blocks the current context until the event is set.
void wait() {
_event.wait();
}
private:
// The current count.
atomic<long long> _current;
// The event that is set when the counter reaches zero.
event _event;
// Disable copy constructor.
countdown_event(const countdown_event&);
// Disable assignment.
countdown_event const & operator=(countdown_event const&);
};
int wmain()
{
// The number of messages to send to the consumer.
const long long MessageCount = 5;
// The number of messages that can be active at the same time.
const long long ActiveMessages = 2;
// Used to compute the elapsed time.
DWORD start_time;
// Computes the elapsed time, rounded-down to the nearest
// 100 milliseconds.
auto elapsed = [&start_time] {
return (GetTickCount() - start_time)/100*100;
};
// Limits the number of active messages.
semaphore s(ActiveMessages);
// Enables the consumer message buffer to coordinate completion
// with the main application.
countdown_event e(MessageCount);
// Create a data pipeline that has three stages.
// The first stage of the pipeline prints a message.
transformer<int, int> print_message([&elapsed](int n) -> int {
wstringstream ss;
ss << elapsed() << L": received " << n << endl;
wcout << ss.str();
// Send the input to the next pipeline stage.
return n;
});
// The second stage of the pipeline simulates a
// time-consuming operation.
transformer<int, int> long_operation([](int n) -> int {
wait(100);
// Send the input to the next pipeline stage.
return n;
});
// The third stage of the pipeline releases the semaphore
// and signals to the main appliation that the message has
// been processed.
call<int> release_and_signal([&](int unused) {
// Enable the sender to send the next message.
s.release();
// Signal that the message has been processed.
e.signal();
});
// Connect the pipeline.
print_message.link_target(&long_operation);
long_operation.link_target(&release_and_signal);
// Send several messages to the pipeline.
start_time = GetTickCount();
for(auto i = 0; i < MessageCount; ++i)
{
// Acquire access to the semaphore.
s.acquire();
// Print the message to the console.
wstringstream ss;
ss << elapsed() << L": sending " << i << L"..." << endl;
wcout << ss.str();
// Send the message.
send(print_message, i);
}
// Wait for the consumer to process all messages.
e.wait();
}
/* Sample output:
0: sending 0...
0: received 0
0: sending 1...
0: received 1
100: sending 2...
100: received 2
200: sending 3...
200: received 3
300: sending 4...
300: received 4
*/
L’objet semaphore
limite le pipeline à traiter au maximum deux messages en même temps.
Le producteur de cet exemple envoie relativement peu de messages au consommateur. Par conséquent, cet exemple ne montre pas de condition potentielle de faible mémoire ou d’absence de mémoire. Toutefois, ce mécanisme est utile lorsqu’un pipeline de données contient un nombre relativement élevé de messages.
Pour plus d’informations sur la création de la classe sémaphore utilisée dans cet exemple, consultez Guide pratique pour utiliser la classe de contexte pour implémenter un sémaphore coopératif.
[Haut]
Ne pas effectuer de travail précis dans un pipeline de données
La bibliothèque d’agents est la plus utile lorsque le travail effectué par un pipeline de données est assez grossière. Par exemple, un composant d’application peut lire des données à partir d’un fichier ou d’une connexion réseau et envoyer occasionnellement ces données à un autre composant. Le protocole utilisé par la bibliothèque d’agents pour propager les messages entraîne la surcharge du mécanisme de transmission de messages que les constructions parallèles de tâche fournies par la bibliothèque de modèles parallèles (PPL). Par conséquent, assurez-vous que le travail effectué par un pipeline de données est suffisamment long pour compenser cette surcharge.
Bien qu’un pipeline de données soit le plus efficace lorsque ses tâches sont grossières, chaque étape du pipeline de données peut utiliser des constructions PPL telles que des groupes de tâches et des algorithmes parallèles pour effectuer un travail plus précis. Pour obtenir un exemple de réseau de données grossière qui utilise un parallélisme affiné à chaque étape de traitement, consultez Procédure pas à pas : Création d’un réseau de traitement d’images.
[Haut]
Ne pas passer de charges utiles de message volumineuses par valeur
Dans certains cas, le runtime crée une copie de chaque message qu’il passe d’une mémoire tampon de message à une autre mémoire tampon de message. Par exemple, la classe concurrency ::overwrite_buffer offre une copie de chaque message qu’elle reçoit à chacune de ses cibles. Le runtime crée également une copie des données de message lorsque vous utilisez des fonctions de transmission de messages telles que concurrency ::send et concurrency ::receive pour écrire des messages dans et lire des messages à partir d’une mémoire tampon de message. Bien que ce mécanisme permet d’éliminer le risque d’écriture simultanée dans des données partagées, cela peut entraîner des performances de mémoire médiocres lorsque la charge utile du message est relativement importante.
Vous pouvez utiliser des pointeurs ou des références pour améliorer les performances de la mémoire lorsque vous transmettez des messages qui ont une charge utile importante. L’exemple suivant compare le passage de messages volumineux par valeur au passage de pointeurs vers le même type de message. L’exemple définit deux types d’agent et producer
consumer
qui agissent sur message_data
des objets. L’exemple compare le temps nécessaire au producteur pour envoyer plusieurs message_data
objets au consommateur à l’heure requise pour que l’agent de producteur envoie plusieurs pointeurs vers message_data
des objets au consommateur.
// message-payloads.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <iostream>
using namespace concurrency;
using namespace std;
// Calls the provided work function and returns the number of milliseconds
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
__int64 begin = GetTickCount();
f();
return GetTickCount() - begin;
}
// A message structure that contains large payload data.
struct message_data
{
int id;
string source;
unsigned char binary_data[32768];
};
// A basic agent that produces values.
template <typename T>
class producer : public agent
{
public:
explicit producer(ITarget<T>& target, unsigned int message_count)
: _target(target)
, _message_count(message_count)
{
}
protected:
void run();
private:
// The target buffer to write to.
ITarget<T>& _target;
// The number of messages to send.
unsigned int _message_count;
};
// Template specialization for message_data.
template <>
void producer<message_data>::run()
{
// Send a number of messages to the target buffer.
while (_message_count > 0)
{
message_data message;
message.id = _message_count;
message.source = "Application";
send(_target, message);
--_message_count;
}
// Set the agent to the finished state.
done();
}
// Template specialization for message_data*.
template <>
void producer<message_data*>::run()
{
// Send a number of messages to the target buffer.
while (_message_count > 0)
{
message_data* message = new message_data;
message->id = _message_count;
message->source = "Application";
send(_target, message);
--_message_count;
}
// Set the agent to the finished state.
done();
}
// A basic agent that consumes values.
template <typename T>
class consumer : public agent
{
public:
explicit consumer(ISource<T>& source, unsigned int message_count)
: _source(source)
, _message_count(message_count)
{
}
protected:
void run();
private:
// The source buffer to read from.
ISource<T>& _source;
// The number of messages to receive.
unsigned int _message_count;
};
// Template specialization for message_data.
template <>
void consumer<message_data>::run()
{
// Receive a number of messages from the source buffer.
while (_message_count > 0)
{
message_data message = receive(_source);
--_message_count;
// TODO: Do something with the message.
// ...
}
// Set the agent to the finished state.
done();
}
template <>
void consumer<message_data*>::run()
{
// Receive a number of messages from the source buffer.
while (_message_count > 0)
{
message_data* message = receive(_source);
--_message_count;
// TODO: Do something with the message.
// ...
// Release the memory for the message.
delete message;
}
// Set the agent to the finished state.
done();
}
int wmain()
{
// The number of values for the producer agent to send.
const unsigned int count = 10000;
__int64 elapsed;
// Run the producer and consumer agents.
// This version uses message_data as the message payload type.
wcout << L"Using message_data..." << endl;
elapsed = time_call([count] {
// A message buffer that is shared by the agents.
unbounded_buffer<message_data> buffer;
// Create and start the producer and consumer agents.
producer<message_data> prod(buffer, count);
consumer<message_data> cons(buffer, count);
prod.start();
cons.start();
// Wait for the agents to finish.
agent::wait(&prod);
agent::wait(&cons);
});
wcout << L"took " << elapsed << L"ms." << endl;
// Run the producer and consumer agents a second time.
// This version uses message_data* as the message payload type.
wcout << L"Using message_data*..." << endl;
elapsed = time_call([count] {
// A message buffer that is shared by the agents.
unbounded_buffer<message_data*> buffer;
// Create and start the producer and consumer agents.
producer<message_data*> prod(buffer, count);
consumer<message_data*> cons(buffer, count);
prod.start();
cons.start();
// Wait for the agents to finish.
agent::wait(&prod);
agent::wait(&cons);
});
wcout << L"took " << elapsed << L"ms." << endl;
}
Cet exemple produit l’exemple de sortie suivant :
Using message_data...
took 437ms.
Using message_data*...
took 47ms.
La version qui utilise des pointeurs fonctionne mieux, car elle élimine la nécessité pour le runtime de créer une copie complète de chaque message_data
objet qu’il transmet du producteur au consommateur.
[Haut]
Utiliser shared_ptr dans un réseau de données lorsque la propriété n’est pas définie
Lorsque vous envoyez des messages par le biais d’un pipeline ou d’un réseau de transmission de messages, vous allouez généralement la mémoire pour chaque message à l’avant du réseau et libérez cette mémoire à la fin du réseau. Bien que ce mécanisme fonctionne fréquemment bien, il existe des cas où il est difficile ou impossible de l’utiliser. Par exemple, considérez le cas dans lequel le réseau de données contient plusieurs nœuds finaux. Dans ce cas, il n’existe aucun emplacement clair pour libérer la mémoire des messages.
Pour résoudre ce problème, vous pouvez utiliser un mécanisme, par exemple, std ::shared_ptr, qui permet à un pointeur d’appartenir à plusieurs composants. Lorsque l’objet final shared_ptr
propriétaire d’une ressource est détruit, la ressource est également libérée.
L’exemple suivant montre comment utiliser shared_ptr
pour partager des valeurs de pointeur entre plusieurs mémoires tampons de messages. L’exemple connecte un objet concurrency ::overwrite_buffer à trois objets concurrency ::call . La overwrite_buffer
classe propose des messages à chacune de ses cibles. Étant donné qu’il existe plusieurs propriétaires des données à la fin du réseau de données, cet exemple utilise shared_ptr
pour permettre à chaque call
objet de partager la propriété des messages.
// message-sharing.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
#include <sstream>
using namespace concurrency;
using namespace std;
// A type that holds a resource.
class resource
{
public:
resource(int id) : _id(id)
{
wcout << L"Creating resource " << _id << L"..." << endl;
}
~resource()
{
wcout << L"Destroying resource " << _id << L"..." << endl;
}
// Retrieves the identifier for the resource.
int id() const { return _id; }
// TODO: Add additional members here.
private:
// An identifier for the resource.
int _id;
// TODO: Add additional members here.
};
int wmain()
{
// A message buffer that sends messages to each of its targets.
overwrite_buffer<shared_ptr<resource>> input;
// Create three call objects that each receive resource objects
// from the input message buffer.
call<shared_ptr<resource>> receiver1(
[](shared_ptr<resource> res) {
wstringstream ss;
ss << L"receiver1: received resource " << res->id() << endl;
wcout << ss.str();
},
[](shared_ptr<resource> res) {
return res != nullptr;
}
);
call<shared_ptr<resource>> receiver2(
[](shared_ptr<resource> res) {
wstringstream ss;
ss << L"receiver2: received resource " << res->id() << endl;
wcout << ss.str();
},
[](shared_ptr<resource> res) {
return res != nullptr;
}
);
event e;
call<shared_ptr<resource>> receiver3(
[&e](shared_ptr<resource> res) {
e.set();
},
[](shared_ptr<resource> res) {
return res == nullptr;
}
);
// Connect the call objects to the input message buffer.
input.link_target(&receiver1);
input.link_target(&receiver2);
input.link_target(&receiver3);
// Send a few messages through the network.
send(input, make_shared<resource>(42));
send(input, make_shared<resource>(64));
send(input, shared_ptr<resource>(nullptr));
// Wait for the receiver that accepts the nullptr value to
// receive its message.
e.wait();
}
Cet exemple produit l’exemple de sortie suivant :
Creating resource 42...
receiver1: received resource 42
Creating resource 64...
receiver2: received resource 42
receiver1: received resource 64
Destroying resource 42...
receiver2: received resource 64
Destroying resource 64...
Voir aussi
Bonnes pratiques sur le runtime d’accès concurrentiel
Bibliothèque d’agents asynchrones
Procédure pas à pas : création d’une application basée sur un agent
Procédure pas à pas : création des agents de flux de données
Procédure pas à pas : création d’un réseau de traitement d’image
Bonnes pratiques de la Bibliothèque de modèles parallèles
Bonnes pratiques en général du runtime d’accès concurrentiel