Udostępnij za pośrednictwem


Najlepszych praktyk w bibliotece agentów asynchroniczne

Dokument ten opisano efektywnego wykorzystania asynchronicznego biblioteki agentów.Biblioteka agentów promuje aktor na model programowania i wiadomości w trakcie przekazywania dla gruboziarnistym przepływ danych oraz przetwarzanie potokowe zadań.

Aby uzyskać więcej informacji na temat agentów biblioteki, zobacz Biblioteka agentów asynchroniczne.

Sekcje

Ten dokument zawiera następujące sekcje:

  • Użyj agentów do stanu izolat

  • Ogranicz liczbę wiadomości w rurociągu danych za pomocą mechanizmu ograniczania przepustowości

  • Nie należy wykonywać szczegółowymi pracy planowanej danych

  • Nie przechodzi przez wartość ładunki dużych wiadomości

  • Użyj shared_ptr w danych sieci podczas użytkowania jest niezdefiniowana

Użyj agentów do stanu izolat

Biblioteka agentów zawiera alternatyw udostępnione Państwu umożliwiając łączenie składników izolowanych poprzez asynchronicznego mechanizm przekazywania wiadomości.Asynchroniczne agenci są najbardziej efektywne, podczas ich izolowanie ich stan wewnętrzny z innych składników.Izolując Państwo wiele składników nie zazwyczaj działają na udostępnionych danych.Państwo izolacji można włączyć aplikacji do skali, ponieważ pozwala uniknąć rywalizacji w pamięci współużytkowanej.Państwo izolacji zmniejsza szanse warunki zakleszczenia i wyścigu, ponieważ składniki nie mają dostępu do udostępnionych danych synchronizacji.

Zazwyczaj izolowanie Państwo w agenta przez danych członków gospodarstwa private lub protected sekcji klasy agenta i za pomocą informacji o zmianach stanu wiadomości buforów.W poniższym przykładzie basic_agent klasy, która wynika z concurrency::agent.basic_agent Klasy używa dwóch buforów wiadomości do komunikacji z składników zewnętrznych.Jeden bufor przechowuje wiadomości przychodzących; inne buforu wiadomości przechowuje wiadomości wychodzących.

// basic-agent.cpp
// compile with: /c /EHsc
#include <agents.h>

// An agent that uses message buffers to isolate state and communicate
// with other components.
class basic_agent : public concurrency::agent
{
public:
   basic_agent(concurrency::unbounded_buffer<int>& input)
      : _input(input)
   {
   }

   // Retrives the message buffer that holds output messages.
   concurrency::unbounded_buffer<int>& output()
   {
      return _output;
   }

protected:
   void run()
   {
      while (true)
      {
         // Read from the input message buffer.
         int value = concurrency::receive(_input);

         // TODO: Do something with the value.
         int result = value;

         // Write the result to the output message buffer.
         concurrency::send(_output, result);
      }
      done();
   }

private:
   // Holds incoming messages.
   concurrency::unbounded_buffer<int>& _input;
   // Holds outgoing messages.
   concurrency::unbounded_buffer<int> _output;
};

Zakończenie przykłady dotyczące definiowania i używania agentów, zobacz Instruktaż: Tworzenie aplikacji opartych na agenta i Instruktaż: Tworzenie agenta przepływ danych.

Top

Ogranicz liczbę wiadomości w rurociągu danych za pomocą mechanizmu ograniczania przepustowości

Wiele typów bufor komunikatów, takich jak concurrency::unbounded_buffer, może zawierać nieograniczoną liczbę wiadomości.Gdy producent wiadomości szybciej niż konsument może przetwarzać wiadomości wysyła wiadomości do rurociągu danych, aplikacji można wprowadzić stan braku pamięci lub braku pamięci.Mechanizm ograniczania przepustowości, na przykład semafora można ograniczyć liczbę wiadomości, które są jednocześnie aktywnych w rurociągu danych.

Następujące podstawowe przykład ilustruje sposób ograniczyć liczbę wiadomości w rurociągu danych za pomocą semafora.Dane rurociąg zastosowań concurrency::wait funkcji do symulacji operacji, która co najmniej 100 milisekund.Ponieważ nadawca daje szybciej niż konsument może przetwarzać wiadomości te wiadomości, w tym przykładzie definiuje semaphore klasy, aby umożliwić aplikacji ograniczyć liczbę aktywnych wiadomości.

// message-throttling.cpp
// compile with: /EHsc
#include <windows.h> // for GetTickCount()
#include <atomic>
#include <agents.h>
#include <concrt.h>
#include <concurrent_queue.h>
#include <sstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
   explicit semaphore(long long capacity)
      : _semaphore_count(capacity)
   {
   }

   // Acquires access to the semaphore.
   void acquire()
   {
      // The capacity of the semaphore is exceeded when the semaphore count 
      // falls below zero. When this happens, add the current context to the 
      // back of the wait queue and block the current context.
      if (--_semaphore_count < 0)
      {
         _waiting_contexts.push(Context::CurrentContext());
         Context::Block();
      }
   }

   // Releases access to the semaphore.
   void release()
   {
      // If the semaphore count is negative, unblock the first waiting context.
      if (++_semaphore_count <= 0)
      {
         // A call to acquire might have decremented the counter, but has not
         // yet finished adding the context to the queue. 
         // Create a spin loop that waits for the context to become available.
         Context* waiting = NULL;
         while (!_waiting_contexts.try_pop(waiting))
         {
            Context::Yield();
         }

         // Unblock the context.
         waiting->Unblock();
      }
   }

private:
   // The semaphore count.
   atomic<long long> _semaphore_count;

   // A concurrency-safe queue of contexts that must wait to 
   // acquire the semaphore.
   concurrent_queue<Context*> _waiting_contexts;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(long long count)
       : _current(count) 
    {
       // Set the event if the initial count is zero.
       if (_current == 0LL)
          _event.set();
    }

    // Decrements the event counter.
    void signal() {
       if(--_current == 0LL) {
          _event.set();
       }
    }

    // Increments the event counter.
    void add_count() {
       if(++_current == 1LL) {
          _event.reset();
       }
    }

    // Blocks the current context until the event is set.
    void wait() {
       _event.wait();
    }

private:
   // The current count.
   atomic<long long> _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

int wmain()
{
   // The number of messages to send to the consumer.
   const long long MessageCount = 5;

   // The number of messages that can be active at the same time.
   const long long ActiveMessages = 2;

   // Used to compute the elapsed time.
   DWORD start_time;

   // Computes the elapsed time, rounded-down to the nearest
   // 100 milliseconds.
   auto elapsed = [&start_time] {
      return (GetTickCount() - start_time)/100*100;
   };

   // Limits the number of active messages.
   semaphore s(ActiveMessages);

   // Enables the consumer message buffer to coordinate completion
   // with the main application.
   countdown_event e(MessageCount);

   // Create a data pipeline that has three stages.

   // The first stage of the pipeline prints a message.
   transformer<int, int> print_message([&elapsed](int n) -> int {
      wstringstream ss;
      ss << elapsed() << L": received " << n << endl;
      wcout << ss.str();

      // Send the input to the next pipeline stage.
      return n;
   });

   // The second stage of the pipeline simulates a 
   // time-consuming operation.
   transformer<int, int> long_operation([](int n) -> int {
      wait(100);

      // Send the input to the next pipeline stage.
      return n;
   });

   // The third stage of the pipeline releases the semaphore
   // and signals to the main appliation that the message has
   // been processed.
   call<int> release_and_signal([&](int unused) {
      // Enable the sender to send the next message.
      s.release();

      // Signal that the message has been processed.
      e.signal();
   });

   // Connect the pipeline.
   print_message.link_target(&long_operation);
   long_operation.link_target(&release_and_signal);

   // Send several messages to the pipeline.
   start_time = GetTickCount();
   for(auto i = 0; i < MessageCount; ++i)
   {
      // Acquire access to the semaphore.
      s.acquire();

      // Print the message to the console.
      wstringstream ss;
      ss << elapsed() << L": sending " << i << L"..." << endl;
      wcout << ss.str();

      // Send the message.
      send(print_message, i);
   }

   // Wait for the consumer to process all messages.
   e.wait();
}
/* Sample output:
    0: sending 0...
    0: received 0
    0: sending 1...
    0: received 1
    100: sending 2...
    100: received 2
    200: sending 3...
    200: received 3
    300: sending 4...
    300: received 4
*/

semaphore Obiektu ogranicza rurociąg do przetwarzania w tym samym czasie co najwyżej dwa komunikaty.

Producent, w tym przykładzie wysyła komunikaty stosunkowo niewiele do konsumenta.Dlatego w tym przykładzie nie wykazują potencjalne warunku braku pamięci lub braku pamięci.Jednak mechanizm ten jest przydatne, gdy rurociąg danych zawiera stosunkowo wysoką liczbę wiadomości.

Aby uzyskać więcej informacji na temat tworzenia klasy semafora, który jest używany w tym przykładzie, zobacz Jak: klasa kontekstu służy do wprowadzenia w życie wspólnych semafora.

Top

Nie należy wykonywać szczegółowymi pracy planowanej danych

Biblioteka agentów jest najbardziej użyteczna, gdy pracę wykonywaną przez rurociąg danych jest dość gruboziarnistym.Na przykład jeden składnik aplikacji może odczytywać dane z pliku lub połączenia sieciowego i czasami wysyłania danych do innego składnika.Protokół, który używa biblioteki agentów do propagowania wiadomości powoduje, że mechanizm przekazywania wiadomości mają większe obciążenie niż konstrukcje zadania równolegle, dostarczanych przez Równolegle desenie biblioteki (PPL).Dlatego upewnij się, że pracę wykonywaną przez rurociąg danych jest wystarczająco długi zrównoważyć obciążenie to.

Chociaż rurociąg danych jest najbardziej efektywna, gdy jego zadania są gruboziarnistym, każdy etap potoku danych można użyć PPL konstrukcje grup zadań jak algorytmy równoległe wykonywanie bardziej szczegółowymi pracy.Przykładem sieci gruboziarnistym danych używane szczegółowymi równoległości na każdym etapie przetwarzania, zobacz Instruktaż: Tworzenie sieci przetwarzania obrazu.

Top

Nie przechodzi przez wartość ładunki dużych wiadomości

W niektórych przypadkach wykonawczym tworzy kopię każdej wiadomości przesyłane z buforu wiadomości do innego buforu wiadomości.Na przykład concurrency::overwrite_buffer klasy oferuje kopia każdej wiadomości odbierające każdemu swoich celów.Podczas takich jak używać funkcji przekazywania wiadomości wykonawczym również tworzy kopię danych wiadomości concurrency::send i concurrency::receive do pisania wiadomości i odczytywać wiadomości z buforu wiadomości.Chociaż ten mechanizm ułatwia wyeliminowanie ryzyka pisanie jednocześnie do udostępnionych danych, może prowadzić do pamięci słabej wydajności, gdy ładunek wiadomości jest stosunkowo duże.

Można użyć wskaźników lub odwołania do poprawy wydajności pamięci podczas przekazywania wiadomości, które mają duże ładunku.Poniższy przykład porównuje passing dużych wiadomości przez wartość przekazywanie wskaźników do tego samego typu wiadomości.W przykładzie zdefiniowano dwa typy agenta, producer i consumer, który działa na message_data obiektów.Przykład porównuje czas wymagany dla producenta wysłać kilka message_data obiekty do konsumenta do czasu wymaganego dla agenta producentów do wysyłania kilku wskaźników do message_data obiekty do konsumenta.

// message-payloads.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// A message structure that contains large payload data.
struct message_data
{
   int id;
   string source;
   unsigned char binary_data[32768];
};

// A basic agent that produces values.
template <typename T>
class producer : public agent
{
public:
   explicit producer(ITarget<T>& target, unsigned int message_count)
      : _target(target)
      , _message_count(message_count)
   {
   }
protected:
   void run();

private:
   // The target buffer to write to.
   ITarget<T>& _target;
   // The number of messages to send.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void producer<message_data>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data message;
      message.id = _message_count;
      message.source = "Application";

      send(_target, message);
      --_message_count;
   }

   // Set the agent to the finished state.
   done();
}

// Template specialization for message_data*.
template <>
void producer<message_data*>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data* message = new message_data;
      message->id = _message_count;
      message->source = "Application";

      send(_target, message);
      --_message_count;
   }

   // Set the agent to the finished state.
   done();
}

// A basic agent that consumes values.
template <typename T>
class consumer : public agent
{
public:
   explicit consumer(ISource<T>& source, unsigned int message_count)
      : _source(source)
      , _message_count(message_count)
   {
   }

protected:
   void run();

private:
   // The source buffer to read from.
   ISource<T>& _source;
   // The number of messages to receive.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void consumer<message_data>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data message = receive(_source);
      --_message_count;

      // TODO: Do something with the message. 
      // ...
   }

   // Set the agent to the finished state.
   done();
}

template <>
void consumer<message_data*>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data* message = receive(_source);
      --_message_count;

      // TODO: Do something with the message.
      // ...

      // Release the memory for the message.
      delete message;     
   }

   // Set the agent to the finished state.
   done();
}

int wmain()
{
   // The number of values for the producer agent to send.
   const unsigned int count = 10000;

   __int64 elapsed;

   // Run the producer and consumer agents.
   // This version uses message_data as the message payload type.

   wcout << L"Using message_data..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data> prod(buffer, count);
      consumer<message_data> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;

   // Run the producer and consumer agents a second time.
   // This version uses message_data* as the message payload type.

   wcout << L"Using message_data*..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data*> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data*> prod(buffer, count);
      consumer<message_data*> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;
}

Ten przykład generuje następujące przykładowe dane wyjściowe:

Using message_data...
took 437ms.
Using message_data*...
took 47ms.

Wersji, która używa wskaźników działa lepiej, ponieważ eliminuje wymóg Runtime utworzyć pełną kopię każdego message_data obiekt przechodzi od producenta do konsumenta.

Top

Użyj shared_ptr w danych sieci podczas użytkowania jest niezdefiniowana

Podczas wysyłania wiadomości przez wskaźnik przez rurociąg przekazywania wiadomości lub sieci, zazwyczaj przydzielić pamięci dla każdej wiadomości z przodu sieci i zwolnić pamięć, że na końcu sieci.Chociaż często dobrze działa na ten mechanizm, istnieją przypadki, w których jest trudne lub nie można go używać.Na przykład należy rozważyć przypadek, w którym sieci danych zawiera wiele węzłów w celu.W tym przypadku jest lokalizacja nie Wyczyść, aby zwolnić pamięć dla wiadomości.

Aby rozwiązać ten problem, można użyć mechanizmu, na przykład std::shared_ptr, umożliwiająca wskaźnik należące do wielu składników.Kiedy ostatni shared_ptr niszczony jest obiekt, który jest właścicielem zasobu, zasób jest również zwolniony.

Poniższy przykład demonstruje, jak używać shared_ptr udostępnianie wartości wskaźnika wśród wielu wiadomości buforów.Przykład łączy concurrency::overwrite_buffer obiektu do trzech concurrency::call obiektów.overwrite_buffer Klasy oferuje wiadomości do każdego z jej celów.Ponieważ wielu właścicieli danych na końcu sieci danych, w tym przykładzie shared_ptr umożliwiające call obiekt na własność komunikaty akcji.

// message-sharing.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
#include <sstream>

using namespace concurrency;
using namespace std;

// A type that holds a resource.
class resource
{
public:
   resource(int id) : _id(id)
   { 
      wcout << L"Creating resource " << _id << L"..." << endl;
   }
   ~resource()
   { 
      wcout << L"Destroying resource " << _id << L"..." << endl;
   }

   // Retrieves the identifier for the resource.
   int id() const { return _id; }

   // TODO: Add additional members here.
private:
   // An identifier for the resource.
   int _id;

   // TODO: Add additional members here.
};

int wmain()
{   
   // A message buffer that sends messages to each of its targets.
   overwrite_buffer<shared_ptr<resource>> input;

   // Create three call objects that each receive resource objects
   // from the input message buffer.

   call<shared_ptr<resource>> receiver1(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver1: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   call<shared_ptr<resource>> receiver2(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver2: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   event e;
   call<shared_ptr<resource>> receiver3(
      [&e](shared_ptr<resource> res) {
         e.set();
      },
      [](shared_ptr<resource> res) { 
         return res == nullptr; 
      }
   );

   // Connect the call objects to the input message buffer.
   input.link_target(&receiver1);
   input.link_target(&receiver2);
   input.link_target(&receiver3);

   // Send a few messages through the network.
   send(input, make_shared<resource>(42));
   send(input, make_shared<resource>(64));
   send(input, shared_ptr<resource>(nullptr));

   // Wait for the receiver that accepts the nullptr value to 
   // receive its message.
   e.wait();
}

Ten przykład generuje następujące przykładowe dane wyjściowe:

Creating resource 42...
receiver1: received resource 42
Creating resource 64...
receiver2: received resource 42
receiver1: received resource 64
Destroying resource 42...
receiver2: received resource 64
Destroying resource 64...

Zobacz też

Zadania

Instruktaż: Tworzenie aplikacji opartych na agenta

Instruktaż: Tworzenie agenta przepływ danych

Instruktaż: Tworzenie sieci przetwarzania obrazu

Koncepcje

Biblioteka agentów asynchroniczne

Najlepszych praktyk w bibliotece desenie równoległe

Najważniejsze wskazówki ogólne w czasie wykonywania współbieżności

Inne zasoby

Współbieżność Runtime najlepszych praktyk.