同時実行ランタイムでの例外処理
更新 : 2011 年 3 月
同時実行ランタイムは、C++ 例外処理を使用してさまざまなエラーを通知します。 そのエラーには、ランタイムの不適切な使用、リソースの取得の失敗などのランタイム エラー、タスク グループに提供した処理関数で生じるエラーなどがあります。 このトピックでは、タスク グループ、軽量タスク、および非同期エージェントによってスローされた例外をランタイムが処理するしくみと、アプリケーションで例外に応答する方法を説明します。
セクション
タスク グループと並列アルゴリズム
軽量タスク
非同期エージェント
一般的な例外
概要
タスク グループと並列アルゴリズム
ここでは、タスク グループによってスローされた例外をランタイムが処理するしくみについて説明します。 このセクションの内容は、Concurrency::parallel_for をはじめとする並列アルゴリズムにも当てはまります。これらのアルゴリズムはタスク グループに基づいて作成されているためです。
ヒント
例外が依存タスクに及ぼす影響を十分に理解しておいてください。 タスクまたは並列アルゴリズムによる例外処理を使用する場合のベスト プラクティスについては、「並列パターン ライブラリに関するベスト プラクティス」の「取り消し処理および例外処理がオブジェクトの破棄に及ぼす影響について」を参照してください。
タスク グループの詳細については、「タスクの並列化 (同時実行ランタイム)」を参照してください。 並列アルゴリズムの詳細については、「並列アルゴリズム」を参照してください。
処理関数によってスローされる例外
処理関数は、ランタイムに渡されるラムダ関数、関数オブジェクト、または関数ポインターです。 処理関数をタスク グループに渡すと、ランタイムがその処理関数を独立したコンテキストで実行します。
Concurrency::task_group オブジェクトまたは Concurrency::structured_task_group オブジェクトに渡す処理関数の本体で例外をスローすると、ランタイムはその例外を保存し、Concurrency::task_group::wait、Concurrency::structured_task_group::wait、Concurrency::task_group::run_and_wait、または Concurrency::structured_task_group::run_and_wait を呼び出すコンテキストにその例外をマーシャリングします。 また、ランタイムは、タスク グループ内のすべてのアクティブ タスク (子タスク グループ内のタスクも含む) を中止すると共に、開始されていないすべてのタスクを破棄します。
次の例は、例外をスローする処理関数の基本的な構造を示しています。 この例では、task_group オブジェクトを使用して 2 つの point オブジェクトの値を並列的に出力します。 print_point 処理関数は point オブジェクトの値をコンソールに出力します。 入力値が NULL の場合、処理関数は例外をスローします。 ランタイムはこの例外を保存し、task_group::wait を呼び出すコンテキストにその例外をマーシャリングします。
// eh-task-group.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include <sstream>
using namespace Concurrency;
using namespace std;
// Defines a basic point with X and Y coordinates.
struct point
{
int X;
int Y;
};
// Prints the provided point object to the console.
void print_point(point* pt)
{
// Throw an exception if the value is NULL.
if (pt == NULL)
{
throw exception("point is NULL.");
}
// Otherwise, print the values of the point.
wstringstream ss;
ss << L"X = " << pt->X << L", Y = " << pt->Y << endl;
wcout << ss.str();
}
int wmain()
{
// Create a few point objects.
point pt = {15, 30};
point* pt1 = &pt;
point* pt2 = NULL;
// Use a task group to print the values of the points.
task_group tasks;
tasks.run([&] {
print_point(pt1);
});
tasks.run([&] {
print_point(pt2);
});
// Wait for the tasks to finish. If any task throws an exception,
// the runtime marshals it to the call to wait.
try
{
tasks.wait();
}
catch (const exception& e)
{
wcerr << L"Caught exception: " << e.what() << endl;
}
}
この例を実行すると、次の出力が生成されます。
X = 15, Y = 30
Caught exception: point is NULL.
タスク グループの例外処理を使用した詳細な例については、「方法: 例外処理を使用して並列ループを中断する」を参照してください。
ランタイムによってスローされる例外
処理関数に加えて、ランタイムの呼び出しから例外が発生することもあります。 ランタイムによってスローされる例外のほとんどは、プログラミング エラーの存在を示しています。 一般にこれらのエラーは回復不能であるため、アプリケーション コードではキャッチまたは処理できません。 ただし、ランタイムで定義されている例外の種類を把握しておけば、プログラミング エラーを診断するときに役立ちます。 一般的な例外とその発生条件については、「一般的な例外」セクションを参照してください。
ランタイムによってスローされる例外の例外処理機構は、処理関数によってスローされる例外の例外処理機構と同じです。 たとえば、Concurrency::receive 関数は、指定の時間内にメッセージを受信しなかった場合、operation_timed_out をスローします。 タスク グループに渡す処理関数で receive が例外をスローすると、ランタイムはその例外を保存して、task_group::wait、structured_task_group::wait、task_group::run_and_wait、または structured_task_group::run_and_wait を呼び出すコンテキストにその例外をマーシャリングします。
次の例では、Concurrency::parallel_invoke アルゴリズムを使用して、2 つのタスクを並列に実行します。 1 つ目のタスクは 5 秒間待機した後、メッセージをメッセージ バッファーに送信します。 2 つ目のタスクは receive 関数を使用して 3 秒間待機し、同じメッセージ バッファーからメッセージを受信します。 receive 関数は、時間内にメッセージを受信しなかった場合、operation_timed_out をスローします。
// eh-time-out.cpp
// compile with: /EHsc
#include <agents.h>
#include <ppl.h>
#include <iostream>
using namespace Concurrency;
using namespace std;
int wmain()
{
single_assignment<int> buffer;
int result;
try
{
// Run two tasks in parallel.
parallel_invoke(
// This task waits 5 seconds and then sends a message to
// the message buffer.
[&] {
wait(5000);
send(buffer, 42);
},
// This task waits 3 seconds to receive a message.
// The receive function throws operation_timed_out if it does
// not receive a message in the specified time period.
[&] {
result = receive(buffer, 3000);
}
);
// Print the result.
wcout << L"The result is " << result << endl;
}
catch (operation_timed_out&)
{
wcout << L"The operation timed out." << endl;
}
}
この例を実行すると、次の出力が生成されます。
The operation timed out.
アプリケーションの異常終了を防ぐために、コードでランタイムを呼び出す場合は例外が処理されるようにしてください。 また、サードパーティのライブラリなど、同時実行ランタイムを使用する外部コードを呼び出す場合にも、例外を処理する必要があります。
複数の例外
タスクまたは並列アルゴリズムが複数の例外を受け取った場合、ランタイムはそのいずれか 1 つだけを呼び出し元のコンテキストにマーシャリングします。 どの例外がマーシャリングされるかは、任意です。
次の例では、parallel_for アルゴリズムを使用して、数値をコンソールに出力します。 入力値が指定の最小値未満であるか、最大値を超えている場合、例外がスローされます。 この例では、複数の処理関数が例外をスローする可能性があります。
// eh-multiple.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include <sstream>
using namespace Concurrency;
using namespace std;
int wmain()
{
const int min = 0;
const int max = 10;
// Print values in a parallel_for loop. Use a try-catch block to
// handle any exceptions that occur in the loop.
try
{
parallel_for(-5, 20, [min,max](int i)
{
// Throw an exeception if the input value is less than the
// minimum or greater than the maximum.
// Otherwise, print the value to the console.
if (i < min)
{
stringstream ss;
ss << i << ": the value is less than the minimum.";
throw exception(ss.str().c_str());
}
else if (i > max)
{
stringstream ss;
ss << i << ": the value is greater than than the maximum.";
throw exception(ss.str().c_str());
}
else
{
wstringstream ss;
ss << i << endl;
wcout << ss.str();
}
});
}
catch (exception& e)
{
// Print the error to the console.
wcerr << L"Caught exception: " << e.what() << endl;
}
}
この例のサンプル出力を次に示します。
8
2
9
3
10
4
5
6
7
Caught exception: -5: the value is less than the minimum.
キャンセル
すべての例外がエラーの存在を示すわけではありません。 たとえば、検索アルゴリズムは結果を検出したときに、例外処理を使用して、関連付けられているタスクを中止することがあります。 取り消しの機構の使用方法の詳細については、「PPL における取り消し処理」を参照してください。
[ページのトップへ]
軽量タスク
軽量タスクは、Concurrency::Scheduler オブジェクトから直接スケジュールするタスクです。 軽量タスクは、通常のタスクよりもオーバーヘッドが小さくなります。 ただし、ランタイムは軽量タスクによってスローされた例外をキャッチしません。 代わりに、ハンドルされない例外のハンドラーが例外をキャッチし、既定ではプロセスを終了します。 したがって、アプリケーションで適切なエラー処理機構を使用する必要があります。 軽量タスクの詳細については、「タスク スケジューラ (同時実行ランタイム)」を参照してください。
[ページのトップへ]
非同期エージェント
軽量タスクと同様に、ランタイムは非同期エージェントによってスローされた例外を管理しません。
Concurrency::agent から派生するクラスで例外を処理する方法の一例を次に示します。 この例では、points_agent クラスを定義しています。 points_agent::run メソッドはメッセージ バッファーから point オブジェクトを読み取り、それをコンソールに出力します。 run メソッドは、NULL ポインターを受け取った場合に例外をスローします。
run メソッドでは、すべての処理が try-catch ブロックに内包されています。 catch ブロックは、例外をメッセージ バッファーに格納します。 アプリケーションは、エージェントの終了後にこのバッファーから例外を読み取ることで、エージェントでのエラーの有無をチェックします。
// eh-agents.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
using namespace Concurrency;
using namespace std;
// Defines a point with x and y coordinates.
struct point
{
int X;
int Y;
};
// Informs the agent to end processing.
point sentinel = {0,0};
// An agent that prints point objects to the console.
class point_agent : public agent
{
public:
explicit point_agent(unbounded_buffer<point*>& points)
: _points(points)
{
}
// Retrieves any exception that occured in the agent.
bool get_error(exception& e)
{
return try_receive(_error, e);
}
protected:
// Performs the work of the agent.
void run()
{
// Perform processing in a try block.
try
{
// Read from the buffer until we reach the sentinel value.
while (true)
{
// Read a value from the message buffer.
point* r = receive(_points);
// In this example, it is an error to receive a
// NULL point pointer. In this case, throw an exception.
if (r == NULL)
{
throw exception("point must not be NULL");
}
// Break from the loop if we receive the
// sentinel value.
else if (r == &sentinel)
{
break;
}
// Otherwise, do something with the point.
else
{
// Print the point to the console.
wcout << L"X: " << r->X << L" Y: " << r->Y << endl;
}
}
}
// Store the error in the message buffer.
catch (exception& e)
{
send(_error, e);
}
// Set the agent status to done.
done();
}
private:
// A message buffer that receives point objects.
unbounded_buffer<point*>& _points;
// A message buffer that stores error information.
single_assignment<exception> _error;
};
int wmain()
{
// Create a message buffer so that we can communicate with
// the agent.
unbounded_buffer<point*> buffer;
// Create and start a point_agent object.
point_agent a(buffer);
a.start();
// Send several points to the agent.
point r1 = {10, 20};
point r2 = {20, 30};
point r3 = {30, 40};
send(buffer, &r1);
send(buffer, &r2);
// To illustrate exception handling, send the NULL pointer to the agent.
send(buffer, reinterpret_cast<point*>(NULL));
send(buffer, &r3);
send(buffer, &sentinel);
// Wait for the agent to finish.
agent::wait(&a);
// Check whether the agent encountered an error.
exception e;
if (a.get_error(e))
{
cout << "error occured in agent: " << e.what() << endl;
}
// Print out agent status.
wcout << L"the status of the agent is: ";
switch (a.status())
{
case agent_created:
wcout << L"created";
break;
case agent_runnable:
wcout << L"runnable";
break;
case agent_started:
wcout << L"started";
break;
case agent_done:
wcout << L"done";
break;
case agent_canceled:
wcout << L"canceled";
break;
default:
wcout << L"unknown";
break;
}
wcout << endl;
}
この例を実行すると、次の出力が生成されます。
X: 10 Y: 20
X: 20 Y: 30
error occured in agent: point must not be NULL
the status of the agent is: done
try-catch ブロックは while ループの外側にあるため、エージェントは最初のエラーを検出した時点で処理を終了します。 try-catch ブロックが while ループの内側にある場合は、エージェントはエラーの発生後も処理を継続できます。
この例では例外がメッセージ バッファーに格納されるため、別のコンポーネントが実行中のエージェントのエラーを監視できます。 この例では、エラーを保存するのに Concurrency::single_assignment オブジェクトを使用します。 エージェントが複数の例外を処理する場合、single_assignment クラスは渡された最初のメッセージだけを保存します。 最後の例外だけを保存するには、Concurrency::overwrite_buffer クラスを使用します。 例外をすべて保存するには、Concurrency::unbounded_buffer クラスを使用します。 これらのメッセージ ブロックの詳細については、「非同期メッセージ ブロック」を参照してください。
非同期エージェントの詳細については、「非同期エージェント」を参照してください。
[ページのトップへ]
一般的な例外
次の表に、同時実行ランタイムの一般的な例外クラスと、例外がスローされる条件を示します。 operation_timed_out と unsupported_os を除くほとんどの種類の例外は、プログラミング エラーが存在することを示します。 一般にこれらのエラーは回復不能であるため、アプリケーション コードではキャッチまたは処理できません。 プログラミング エラーを診断する必要がある場合にのみ、回復不能なエラーをアプリケーション コードでキャッチまたは処理することをお勧めします。
例外クラス |
状態 |
---|---|
メッセージ ブロックに渡したポインターが無効です。 |
|
コンテキストがそれ自体のブロックを解除しようとしました。 |
|
ランタイムがコンテキストのブロックを解除しようとしましたが、そのコンテキストのブロックは既に解除されていました。 |
|
既定のスケジューラの設定が試みられましたが、既定のスケジューラは既に存在しています。 |
|
ロックが不適切に取得されました。 |
|
コンテキストが同じスケジューラに複数回アタッチされました。 |
|
ランタイムによって内部で管理されているコンテキストがそのスケジューラからデタッチされたか、スケジューラにアタッチされていません。 |
|
終了中のスケジューラの参照カウンターをコンテキストがインクリメントしました。さらに、そのコンテキストがスケジューラの内部にありません。 |
|
同じオブジェクトがメッセージ ブロックに複数回リンクされています。 |
|
未完了のタスクが複数回スケジュールされました。 |
|
ランタイムが無効な操作を実行しました。 |
|
有効化されていないオーバーサブスクリプションが無効化されました。 |
|
無効なポリシー キーが Concurrency::SchedulerPolicy オブジェクトに渡されました。 |
|
SchedulerPolicy オブジェクトの最大同時実行レベルが最小同時実行レベル未満に設定されています。 |
|
無効なポリシーの値が SchedulerPolicy オブジェクトに渡されました。 |
|
メッセージ ブロックが要求されたメッセージを見つけることができませんでした。 |
|
Concurrency::task_group::wait メソッドまたは Concurrency::structured_task_group::wait メソッドが呼び出される前にタスク グループ オブジェクトが破棄されました。 |
|
入れ子のスケジューラがそれ自体を親から適切にデタッチしませんでした。 |
|
指定された期間内に操作が完了しませんでした。 |
|
コンテキストがそのスケジューラからデタッチしようとしましたが、そのコンテキストはスケジューラにアタッチされていませんでした。 |
|
必要なリソース (オペレーティング システムから提供されたリソースなど) をランタイムが取得しませんでした。 |
|
現在のオペレーティング システムでランタイムがサポートされていません。 |
[ページのトップへ]
概要
タスクによって例外がスローされると、ランタイムはその例外を保持し、タスク グループの完了を待機するコンテキストにマーシャリングします。 軽量タスクやエージェントなどのコンポーネントの例外は、ランタイムによって自動的には管理されません。 そのため、独自の例外処理機構を実装する必要があります。
[ページのトップへ]
参照
概念
その他の技術情報
履歴の変更
日付 |
履歴 |
理由 |
---|---|---|
2011 年 3 月 |
例外が依存タスクに及ぼす影響に関する注意書きを追加。 |
情報の拡充 |