팬아웃/팬인은 여러 함수를 동시에 실행한 다음 결과에 대해 일부 집계를 수행하는 패턴을 나타냅니다. 이 문서에서는 지속성 함수를 사용하여 팬인/팬아웃 시나리오를 구현하는 샘플에 대해 설명합니다. 샘플은 앱의 사이트 콘텐츠 전부 또는 일부를 Azure Storage에 백업하는 지속성 함수입니다.
참고 항목
Azure Functions용 Node.js 프로그래밍 모델 버전 4가 일반 공급됩니다. 새로운 v4 모델은 JavaScript 및 TypeScript 개발자에게 보다 유연하고 직관적인 환경을 제공하도록 설계되었습니다. 마이그레이션 가이드에서 v3과 v4의 차이점에 대해 자세히 알아봅니다.
다음 코드 조각에서 JavaScript(PM4)는 새로운 환경인 프로그래밍 모델 V4를 나타냅니다.
이 샘플에서 함수는 지정한 디렉터리 아래의 모든 파일을 Blob Storage에 재귀적으로 업로드합니다. 또한 업로드된 총 바이트 수를 계산합니다.
모든 작업을 처리하는 단일 함수를 작성할 수 있습니다. 실행 시의 주요 문제는 확장성입니다. 단일 함수 실행은 단일 가상 머신에서만 실행될 수 있으므로 처리량은 해당 단일 VM의 처리량으로 제한됩니다. 또 하나의 문제는 안정성입니다. 중간에 오류가 있거나 전체 프로세스에 5분 넘게 걸리는 경우 부분적으로 완료된 상태에서 백업이 실패할 수 있습니다. 그러면 다시 시작해야 합니다.
더 강력한 방법은 다음 두 가지 일반 함수를 작성하는 것입니다. 하나는 파일을 열거하고 큐에 파일 이름을 추가하며, 다른 하나는 파일을 큐에서 읽고 Blob Storage에 업로드합니다. 이 접근 방식은 처리량과 안정성 면에서 더 효율적이지만 큐를 프로비전하고 관리해야 합니다. 더 중요한 것은 업로드된 총 바이트 수의 보고와 같이 더 많은 작업을 수행하려는 경우 상태 관리 및 조정과 관련하여 상당한 복잡성이 도입된다는 것입니다.
지속성 함수 방법은 언급된 모든 이점을 매우 낮은 오버헤드로 제공합니다.
함수
이 문서에서는 샘플 앱의 다음 함수에 대해 설명합니다.
E2_BackupSiteContent: E2_GetFileList를 호출하여 백업할 파일 목록을 얻은 후 E2_CopyFileToBlob을 호출하여 각 파일을 백업하는 오케스트레이터 함수.
[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
[OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
string rootDirectory = backupContext.GetInput<string>()?.Trim();
if (string.IsNullOrEmpty(rootDirectory))
{
rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
}
string[] files = await backupContext.CallActivityAsync<string[]>(
"E2_GetFileList",
rootDirectory);
var tasks = new Task<long>[files.Length];
for (int i = 0; i < files.Length; i++)
{
tasks[i] = backupContext.CallActivityAsync<long>(
"E2_CopyFileToBlob",
files[i]);
}
await Task.WhenAll(tasks);
long totalBytes = tasks.Sum(t => t.Result);
return totalBytes;
}
await Task.WhenAll(tasks); 줄에 유의하세요. E2_CopyFileToBlob 함수에 대한 모든 개별 호출은 대기하지 않았으며 병렬로 실행할 수 있습니다. 이 작업 배열을 Task.WhenAll에 전달하면 모든 복사 작업이 완료될 때까지 완료되지 않는 작업을 다시 가져옵니다. .NET의 TPL(작업 병렬 라이브러리)에 익숙하다면 이러한 작업은 새로운 것이 아닙니다. 차이점은 이러한 작업이 여러 가상 머신에서 동시에 실행될 수 있으며, Durable Functions 확장은 엔드투엔드 실행이 프로세스 재활용에 탄력적으로 수행되도록 보장한다는 것입니다.
Task.WhenAll에서 기다린 후에 모든 함수 호출이 완료되고 값을 다시 반환했습니다. E2_CopyFileToBlob을 호출할 때마다 업로드된 바이트 수가 반환되므로 총 바이트 수를 계산하는 것은 이러한 반환 값을 모두 추가하는 문제입니다.
const df = require("durable-functions");
module.exports = df.orchestrator(function* (context) {
const rootDirectory = context.df.getInput();
if (!rootDirectory) {
throw new Error("A directory path is required as an input.");
}
const files = yield context.df.callActivity("E2_GetFileList", rootDirectory);
// Backup Files and save Promises into array
const tasks = [];
for (const file of files) {
tasks.push(context.df.callActivity("E2_CopyFileToBlob", file));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results.reduce((prev, curr) => prev + curr, 0);
// return results;
return totalBytes;
});
yield context.df.Task.all(tasks); 줄에 유의하세요. E2_CopyFileToBlob 함수에 대한 모든 개별 호출은 일시 중단되지 않았으며 병렬로 실행할 수 있습니다. 이 작업 배열을 context.df.Task.all에 전달하면 모든 복사 작업이 완료될 때까지 완료되지 않는 작업을 다시 가져옵니다. JavaScript의 Promise.all에 대해 잘 알고 있는 경우 낯설지 않을 것입니다. 차이점은 이러한 작업이 여러 가상 머신에서 동시에 실행될 수 있으며, Durable Functions 확장은 엔드투엔드 실행이 프로세스 재활용에 탄력적으로 수행되도록 보장한다는 것입니다.
참고 항목
작업은 개념상 JavaScript 프라미스와 비슷하지만 오케스트레이터 함수는 Promise.all 및 Promise.race 대신 context.df.Task.all 및 context.df.Task.any를 사용하여 작업 병렬 처리를 관리해야 합니다.
context.df.Task.all에서 일시 중단된 후에 모든 함수 호출이 완료되고 값을 다시 반환했습니다. E2_CopyFileToBlob을 호출할 때마다 업로드된 바이트 수가 반환되므로 총 바이트 수를 계산하는 것은 이러한 반환 값을 모두 추가하는 문제입니다.
다음은 오케스트레이터 함수를 구현하는 코드입니다.
const df = require("durable-functions");
const path = require("path");
const getFileListActivityName = "getFileList";
const copyFileToBlobActivityName = "copyFileToBlob";
df.app.orchestration("backupSiteContent", function* (context) {
const rootDir = context.df.getInput();
if (!rootDir) {
throw new Error("A directory path is required as an input.");
}
const rootDirAbs = path.resolve(rootDir);
const files = yield context.df.callActivity(getFileListActivityName, rootDirAbs);
// Backup Files and save Tasks into array
const tasks = [];
for (const file of files) {
const input = {
backupPath: path.relative(rootDirAbs, file).replace("\\", "/"),
filePath: file,
};
tasks.push(context.df.callActivity(copyFileToBlobActivityName, input));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results ? results.reduce((prev, curr) => prev + curr, 0) : 0;
// return results;
return totalBytes;
});
yield context.df.Task.all(tasks); 줄에 유의하세요. copyFileToBlob 함수에 대한 모든 개별 호출은 일시 중단되지 않았으며 병렬로 실행할 수 있습니다. 이 작업 배열을 context.df.Task.all에 전달하면 모든 복사 작업이 완료될 때까지 완료되지 않는 작업을 다시 가져옵니다. JavaScript의 Promise.all에 대해 잘 알고 있는 경우 낯설지 않을 것입니다. 차이점은 이러한 작업이 여러 가상 머신에서 동시에 실행될 수 있으며, Durable Functions 확장은 엔드투엔드 실행이 프로세스 재활용에 탄력적으로 수행되도록 보장한다는 것입니다.
참고 항목
작업은 개념상 JavaScript 프라미스와 비슷하지만 오케스트레이터 함수는 Promise.all 및 Promise.race 대신 context.df.Task.all 및 context.df.Task.any를 사용하여 작업 병렬 처리를 관리해야 합니다.
context.df.Task.all에서 일시 중단된 후에 모든 함수 호출이 완료되고 값을 다시 반환했습니다. copyFileToBlob을 호출할 때마다 업로드된 바이트 수가 반환되므로 총 바이트 수를 계산하는 것은 이러한 반환 값을 모두 추가하는 문제입니다.
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
root_directory: str = context.get_input()
if not root_directory:
raise Exception("A directory path is required as input")
files = yield context.call_activity("E2_GetFileList", root_directory)
tasks = []
for file in files:
tasks.append(context.call_activity("E2_CopyFileToBlob", file))
results = yield context.task_all(tasks)
total_bytes = sum(results)
return total_bytes
main = df.Orchestrator.create(orchestrator_function)
yield context.task_all(tasks); 줄에 유의하세요. E2_CopyFileToBlob 함수에 대한 모든 개별 호출은 일시 중단되지 않았으며 병렬로 실행할 수 있습니다. 이 작업 배열을 context.task_all에 전달하면 모든 복사 작업이 완료될 때까지 완료되지 않는 작업을 다시 가져옵니다. Python의 asyncio.gather에 대해 잘 알고 있는 경우 낯설지 않을 것입니다. 차이점은 이러한 작업이 여러 가상 머신에서 동시에 실행될 수 있으며, Durable Functions 확장은 엔드투엔드 실행이 프로세스 재활용에 탄력적으로 수행되도록 보장한다는 것입니다.
참고 항목
작업은 개념적으로 Python awaitables와 유사하지만 오케스트레이터 함수는 yield뿐만 아니라 context.task_all 및 context.task_any API를 사용하여 작업 병렬화를 관리해야 합니다.
context.task_all에서 일시 중단된 후에 모든 함수 호출이 완료되고 값을 다시 반환했습니다. E2_CopyFileToBlob에 대한 각 호출은 업로드된 바이트 수를 반환하므로 모든 반환 값을 함께 추가하여 총 바이트 수를 계산할 수 있습니다.
도우미 작업 함수
도우미 작업 함수는 다른 샘플과 마찬가지로 activityTrigger 트리거 바인딩을 사용하는 일반 함수일 뿐입니다.
import os
from os.path import dirname
from typing import List
def main(rootDirectory: str) -> List[str]:
all_file_paths = []
# We walk the file system
for path, _, files in os.walk(rootDirectory):
# We copy the code for activities and orchestrators
if "E2_" in path:
# For each file, we add their full-path to the list
for name in files:
if name == "__init__.py" or name == "function.json":
file_path = os.path.join(path, name)
all_file_paths.append(file_path)
return all_file_paths
참고 항목
오케스트레이터 함수에 이 코드를 직접 배치할 수 없는 이유가 궁금할 수도 있습니다. 그렇게 할 수도 있지만, 로컬 파일 시스템 액세스를 포함하여 I/O 작업을 수행하지 않아야 하는 오케스트레이터 함수의 기본 규칙 중 하나가 손상될 수 있습니다. 자세한 내용은 오케스트레이터 함수 코드 제약 조건을 참조하세요.
import os
import pathlib
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceExistsError
connect_str = os.getenv('AzureWebJobsStorage')
def main(filePath: str) -> str:
# Create the BlobServiceClient object which will be used to create a container client
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
# Create a unique name for the container
container_name = "backups"
# Create the container if it does not exist
try:
blob_service_client.create_container(container_name)
except ResourceExistsError:
pass
# Create a blob client using the local file name as the name for the blob
parent_dir, fname = pathlib.Path(filePath).parts[-2:] # Get last two path components
blob_name = parent_dir + "_" + fname
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
# Count bytes in file
byte_count = os.path.getsize(filePath)
# Upload the created file
with open(filePath, "rb") as data:
blob_client.upload_blob(data)
return byte_count
이 구현은 디스크에서 파일을 로드하고 "backups" 컨테이너에서 동일한 이름의 Blob에 콘텐츠를 비동기적으로 스트림합니다. 반환 값은 스토리지에 복사된 바이트 수이며 오케스트레이터 함수에서 집계 합계를 계산하는 데 사용됩니다.
참고 항목
이 예제는 I/O 작업을 activityTrigger 함수로 이동하는 완벽한 예제입니다. 작업을 여러 머신에 분산할 수 있을 뿐만 아니라 진행 상황에 대한 검사점 설정의 이점을 얻을 수도 있습니다. 어떤 이유로든 호스트 프로세스가 종료되면 이미 완료된 업로드를 알 수 있습니다.
샘플 실행
Windows에서 다음 HTTP POST 요청을 전송하여 오케스트레이션을 시작할 수 있습니다.
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"D:\\home\\LogFiles"
또는 Linux 함수 앱(Python은 현재 Linux for App Service에서만 실행됨)에서 다음과 같이 오케스트레이션을 시작할 수 있습니다.
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"/home/site/wwwroot"
참고 항목
호출하는 HttpStart 함수는 JSON 형식의 콘텐츠에서만 작동합니다. 이러한 이유로 Content-Type: application/json 헤더가 필요하며 디렉터리 경로는 JSON 문자열로 인코딩됩니다. 또한 HTTP 코드 조각에서는 모든 HTTP 트리거 함수 URL에서 기본 api/ 접두사를 제거하는 항목이 host.json 파일에 있다고 가정합니다. 샘플의 host.json 파일에서 이 구성에 대한 변경 내용을 찾을 수 있습니다.
이 HTTP 요청은 E2_BackupSiteContent 오케스트레이터를 트리거하고 D:\home\LogFiles 문자열을 매개 변수로 전달합니다. 응답에서는 이 백업 작업의 상태를 가져오는 링크를 제공합니다.
함수 앱에 있는 로그 파일의 수에 따라 이 작업을 완료하는 데 몇 분이 걸릴 수 있습니다. 이전 HTTP 202 응답의 Location 헤더에 있는 URL을 쿼리하여 최신 상태를 가져올 수 있습니다.
GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
이 경우 함수는 계속 실행 중입니다. 오케스트레이터 상태와 마지막으로 업데이트된 시간에 저장된 입력을 확인할 수 있습니다. Location 헤더 값을 계속 사용하여 완료를 폴링할 수 있습니다. 상태가 "Completed"이면 다음과 비슷한 HTTP 응답 값이 표시됩니다.
HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8
{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}
이제 오케스트레이션이 완료되었으며 완료하는 데 걸린 시간을 확인할 수 있습니다. output 필드의 값도 표시되며, 여기서는 약 450KB의 로그가 업로드되었음을 나타냅니다.
다음 단계
이 예제는 팬아웃/팬인 패턴의 구현 방법을 보여 줍니다. 다음 샘플은 지속성 타이머를 사용하여 모니터링 패턴을 구현하는 방법을 보여줍니다.