活動協調器範例專案
這個簡單的活動協調器範例示範如何在符合系統條件時,利用API在背景中重新定型模型。
範例專案概觀
讓我們考慮音樂編輯應用程式的情況。 此應用程式具有高優先順序的背景工作,可服務使用者要求,例如將內容發佈至雲端記憶體。 也有支援用戶互動的低優先順序背景工作,例如提供自動建議來改善編輯時組合。 最後,有一組延遲的工作不需要在任何特定時間發生,而不需要使用者的要求,這是我們在此範例中的焦點。 特別是,當用戶影響最小時,我們想要定期重新定型建議模型。 我們可以使用活動協調器 API 來達成此目的。
在此案例中,我們想要在使用者不存在時重新定型模型。 此案例中的重新定型工作流程也是 GPU 取用者,因此我們也想要在使用 GPU 的好時機執行。 我們可以使用活動協調器原則來指定這些需求。 活動協調器 API 會使用我們的原則來判斷何時符合需求,並針對何時啟動或停止執行我們的工作傳送通知。
在此情況下, GOOD 原則範本會符合我們大部分的需求,因為它會追蹤 CPU、記憶體、系統磁碟、電源和用戶閑置。 我們只需要明確設定 GPU 的條件。 請務必記住,即使我們的工作負載主要會利用 GPU,但我們的活動執行仍然原本就耗用 CPU、記憶體、磁碟和電源。 我們對這些資源的影響也可能在系統組態之間有很大的差異。 例如,較快的 GPU 可能會導致 CPU 花費更多時間將數據饋送 GPU,這可能會導致更多數據讀取或儲存到磁碟。 此磁碟的速度也可能以類似的方式影響CPU耗用量。 藉由設定我們影響的所有資源,我們可以確定不會不小心干擾用戶體驗或降低系統效能。 此外,工作本身已細分成在小塊中發生,因此我們可以適當地回應協調通知,以避免在所需的條件之外執行。
為了示範開發人員如何變更或降級原則,我們也新增了我們想要在 48 小時內完成重新定型的需求。 前24小時,我們的軟期限,我們試圖以我們的理想政策運行,最後24小時,我們降級為較小的政策。
範例項目程序代碼
下列程式代碼是音樂編輯範例應用程式。 它會利用活動協調器 API 來執行背景工作,如概觀中所述。
#include <chrono>
#include <mutex>
#include <condition_variable>
#include <Windows.h>
#include <ActivityCoordinator.h>
#include <wil/resource.h>
// To use ActivityCoordinator, we must link to the OneCoreUAP library.
#pragma comment(lib, "OneCoreUAP.lib")
using namespace std;
using namespace chrono;
using namespace wil;
// Declare RAII wrappers for the Activity Coordinator policy and subscription.
// These behave like traditional smart pointers and will call their associated
// API cleanup functions when they go out of scope.
typedef wil::unique_any<
ACTIVITY_COORDINATOR_POLICY,
decltype(&DestroyActivityCoordinatorPolicy),
DestroyActivityCoordinatorPolicy>
unique_policy;
typedef wil::unique_any<
ACTIVITY_COORDINATOR_SUBSCRIPTION,
decltype(&UnsubscribeActivityCoordinatorPolicy),
UnsubscribeActivityCoordinatorPolicy>
unique_subscription;
struct WORKER_CONTEXT {
mutex ContextLock;
unique_threadpool_work Worker;
bool ShouldRun;
bool IsRunning;
bool IsComplete;
std::condition_variable CompletionSignal;
};
_Requires_lock_held_(workerContext->ContextLock)
void
ResumeWorker(
_In_ WORKER_CONTEXT* workerContext
)
{
workerContext->ShouldRun = true;
if (!workerContext->IsRunning && !workerContext->IsComplete) {
// No active workers, so start a new one.
workerContext->IsRunning = true;
SubmitThreadpoolWork(workerContext->Worker.get());
}
}
void
DeferredWorkEventCallback(
_In_ ACTIVITY_COORDINATOR_NOTIFICATION notificationType,
_In_ void* callbackContext
)
{
WORKER_CONTEXT* workerContext = reinterpret_cast<WORKER_CONTEXT*>(callbackContext);
// Use this callback thread to dispatch notifications to a worker thread
// about whether or not it should process the next chunk of deferred work.
// Note: Do not use this thread to perform your activity's workload.
lock_guard<mutex> scopedLock(workerContext->ContextLock);
switch (notificationType) {
case ACTIVITY_COORDINATOR_NOTIFICATION_RUN:
// Allow deferred work to be processed.
ResumeWorker(workerContext);
break;
case ACTIVITY_COORDINATOR_NOTIFICATION_STOP:
// Stop processing deferred work.
workerContext->ShouldRun = false;
break;
default:
FAIL_FAST();
break;
}
}
bool
TrainNextModelChunk(
)
{
//
// Returns true if all work is completed, or false if there is more work.
//
return false;
}
void
DeferredModelTrainingWorker(
_Inout_ PTP_CALLBACK_INSTANCE callbackInstance,
_Inout_opt_ PVOID callbackContext,
_Inout_ PTP_WORK work
)
{
// Threadpool callback instance and work are not needed for this sample.
UNREFERENCED_PARAMETER(callbackInstance);
UNREFERENCED_PARAMETER(work);
WORKER_CONTEXT* workerContext = reinterpret_cast<WORKER_CONTEXT*>(callbackContext);
bool workComplete = false;
// Keep processing work until being told to stop or all work has been completed.
while (true) {
{
lock_guard<mutex> scopedLock(workerContext->ContextLock);
if (workComplete) {
workerContext->IsComplete = true;
}
if (!workerContext->ShouldRun || workerContext->IsComplete) {
workerContext->IsRunning = false;
break;
}
}
// TrainNextModelChunk returns true when there is no more work to do.
workComplete = TrainNextModelChunk();
}
workerContext->CompletionSignal.notify_all();
}
int
__cdecl
wmain(
)
{
WORKER_CONTEXT workerContext;
workerContext.ShouldRun = false;
workerContext.IsRunning = false;
workerContext.IsComplete = false;
// Create the worker that will be started by our subscription callback.
workerContext.Worker.reset(CreateThreadpoolWork(
DeferredModelTrainingWorker,
&workerContext,
nullptr));
RETURN_LAST_ERROR_IF_NULL(workerContext.Worker);
// Allocate a policy suited for tasks that are best run when unlikely
// to cause impact to the user or system performance.
unique_policy policy;
RETURN_IF_FAILED(CreateActivityCoordinatorPolicy(
ACTIVITY_COORDINATOR_POLICY_TEMPLATE_GOOD,
&policy));
// The model training in this sample consumes GPU.
// The GOOD policy template doesn't currently include the GPU resource. We
// therefore customize the policy to include good GPU conditions to minimize
// the impact of running our work.
RETURN_IF_FAILED(SetActivityCoordinatorPolicyResourceCondition(
policy.get(),
ACTIVITY_COORDINATOR_RESOURCE_GPU,
ACTIVITY_COORDINATOR_CONDITION_GOOD));
// Subscribe to the policy for coordination notifications.
unique_subscription subscription;
RETURN_IF_FAILED(SubscribeActivityCoordinatorPolicy(
policy.get(),
DeferredWorkEventCallback,
&workerContext,
&subscription));;
// Destroy the policy because we no longer need it.
policy.reset();
// We want our task to complete within 48h, so we allocate 24h under our
// ideal policy and before falling back to a downgraded policy.
bool workerCompleted;
{
unique_lock<mutex> scopedLock(workerContext.ContextLock);
workerCompleted = workerContext.CompletionSignal.wait_for(
scopedLock,
hours(24),
[&workerContext] { return workerContext.IsComplete; });
}
if (workerCompleted) {
// Since our work is complete, we should clean up our subscription by
// unsubscribing. This would normally be handled quietly by our RAII
// types, but we release them explicitly to demonstrate API flow for
// developers manually managing resources.
subscription.reset();
return S_OK;
}
// We passed our soft deadline, so downgrade the policy and wait the
// remaining 24h until our hard deadline has been reached. Since
// Subscriptions and policies are independent of each other, we need to
// create a new subscription with our downgraded policy to receive
// notifications based on its configuration.
//
// The downgraded policy uses medium conditions for all needed resources.
// This gives us the best chance to run while helping to prevent us from
// critically degrading the user experience, which we are more likely to do
// when falling back to manual execution.
RETURN_IF_FAILED(CreateActivityCoordinatorPolicy(
ACTIVITY_COORDINATOR_POLICY_TEMPLATE_MEDIUM,
&policy));
RETURN_IF_FAILED(SetActivityCoordinatorPolicyResourceCondition(
policy.get(),
ACTIVITY_COORDINATOR_RESOURCE_GPU,
ACTIVITY_COORDINATOR_CONDITION_MEDIUM));
subscription.reset();
RETURN_IF_FAILED(SubscribeActivityCoordinatorPolicy(
policy.get(),
DeferredWorkEventCallback,
&workerContext,
&subscription));
{
unique_lock<mutex> scopedLock(workerContext.ContextLock);
workerCompleted = workerContext.CompletionSignal.wait_for(
scopedLock,
hours(24),
[&workerContext] { return workerContext.IsComplete; });
}
// We passed our deadline, so unsubscribe and manually resume our task.
subscription.reset();
ResumeWorker(&workerContext);
// We destroyed our subscription, so we wait indefinitely for completion as
// there's nothing to pause execution of our task.
unique_lock<mutex> scopedLock(workerContext.ContextLock);
workerContext.CompletionSignal.wait(
scopedLock,
[&workerContext] { return workerContext.IsComplete; });
return S_OK;
}