MLflow 및 Ray 통합
MLflow는 기계 학습 및 AI 워크로드를 관리하기 위한 오픈 소스 플랫폼입니다. Ray와 MLflow를 결합하면 워크로드를 Ray와 배포하고 MLflow를 사용하여 학습하는 동안 생성된 모델, 메트릭, 매개 변수 및 메타데이터를 추적할 수 있습니다.
이 문서에서는 MLflow를 다음 Ray 구성 요소와 통합하는 방법을 설명합니다.
Ray Core: Ray Tune 및 Ray Train에서 다루지 않는 범용 분산 애플리케이션
Ray Train: 분산 모델 학습
Ray Tune: 분산 하이퍼 매개 변수 튜닝
모델 서비스: 실시간 유추를 위한 모델 배포
Ray Core 및 MLflow 통합
Ray Core는 범용 분산 애플리케이션을 위한 기본 구성 요소를 제공합니다. 이를 통해 여러 노드에서 Python 함수 및 클래스의 크기를 조정할 수 있습니다.
이 섹션에서는 Ray Core 및 MLflow를 통합하는 다음 패턴에 대해 설명합니다.
- Ray 드라이버 프로세스에서 MLflow 모델 기록
- 자식 실행에서 MLflow 모델 기록
Ray 드라이버 프로세스에서 MLflow 기록
일반적으로 작업자 노드가 아닌 드라이버 프로세스에서 MLflow 모델을 기록하는 것이 가장 좋습니다. 이는 상태 저장 참조를 원격 작업자에게 전달하는 복잡성이 추가되기 때문입니다.
예를 들어 MLflow 추적 서버는 작업자 노드 내에서 MLflow Client
를 사용하여 시작되지 않으므로 다음 코드가 실패합니다.
import mlflow
@ray.remote
def example_logging_task(x):
# ...
# This method will fail
mlflow.log_metric("x", x)
return x
with mlflow.start_run() as run:
ray.get([example_logging_task.remote(x) for x in range(10)])
대신, 드라이버 노드에 메트릭을 반환합니다. 메트릭 및 메타데이터는 일반적으로 메모리 문제를 일으키지 않고 드라이버로 다시 전송할 수 있을 만큼 작습니다.
위에 표시된 예제를 가져와서 Ray 작업에서 반환된 메트릭을 기록하도록 업데이트합니다.
import mlflow
@ray.remote
def example_logging_task(x):
# ...
return x
with mlflow.start_run() as run:
results = ray.get([example_logging_task.remote(x) for x in range(10)])
for x in results:
mlflow.log_metric("x", x)
큰 Pandas 테이블, 이미지, 플롯 또는 모델과 같은 큰 아티팩트를 저장해야 하는 작업의 경우 Databricks는 아티팩트를 파일로 유지하는 것이 좋습니다. 그런 다음, 드라이버 컨텍스트 내에서 아티팩트를 다시 로드하거나 저장된 파일의 경로를 지정하여 MLflow를 통해 개체를 직접 기록합니다.
import mlflow
@ray.remote
def example_logging_task(x):
# ...
# Create a large object that needs to be stored
with open("/dbfs/myLargeFilePath.txt", "w") as f:
f.write(myLargeObject)
return x
with mlflow.start_run() as run:
results = ray.get([example_logging_task.remote(x) for x in range(10)])
for x in results:
mlflow.log_metric("x", x)
# Directly log the saved file by specifying the path
mlflow.log_artifact("/dbfs/myLargeFilePath.txt")
MLflow 자식 실행으로 Ray 작업 기록
자식 실행을 사용하여 Ray Core를 MLflow와 통합할 수 있습니다. 여기에는 다음 단계가 포함됩니다.
- 부모 실행 만들기: 드라이버 프로세스에서 부모 실행을 시작합니다. 이 실행은 모든 후속 자식 실행에 대한 계층적 컨테이너 역할을 수행합니다.
- 자식 실행 만들기: 각 Ray 작업 내의 부모 실행에서 자식 실행을 시작합니다. 각 자식 실행은 자체 메트릭을 독립적으로 기록할 수 있습니다.
이 방법을 구현하려면 각 Ray 작업이 필요한 클라이언트 자격 증명과 부모 run_id
를 수신하는지 확인합니다. 이 설정은 실행 간에 계층적 부모-자식 관계를 설정합니다. 다음 코드 조각은 자격 증명을 검색하고 부모 run_id
를 따라 전달하는 방법을 보여 줍니다.
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")
username = "" # Username path
experiment_name = f"/Users/{username}/mlflow_test"
mlflow.set_experiment(experiment_name)
@ray.remote
def ray_task(x, run_id):
import os
# Set the MLflow credentials within the Ray task
os.environ.update(mlflow_db_creds)
# Set the active MLflow experiment within each Ray task
mlflow.set_experiment(experiment_name)
# Create nested child runs associated with the parent run_id
with mlflow.start_run(run_id=run_id, nested=True):
# Log metrics to the child run within the Ray task
mlflow.log_metric("x", x)
return x
# Start parent run on the main driver process
with mlflow.start_run() as run:
# Pass the parent run's run_id to each Ray task
results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])
Ray Train 및 MLflow
Ray Train 모델을 MLflow에 기록하는 가장 간단한 방법은 학습 실행에서 생성된 검사점을 사용하는 것입니다. 학습 실행이 완료되면 원시 딥 러닝 프레임워크(예: PyTorch 또는 TensorFlow)에서 모델을 다시 로드한 다음 해당 MLflow 코드로 기록합니다.
이 방법을 사용하면 모델이 올바르게 저장되고 평가 또는 배포 준비가 됩니다.
다음 코드는 Ray Train 검사점에서 모델을 다시 로드하고 MLflow에 기록합니다.
result = trainer.fit()
checkpoint = result.checkpoint
with checkpoint.as_directory() as checkpoint_dir:
# Change as needed for different DL frameworks
checkpoint_path = f"{checkpoint_dir}/checkpoint.ckpt"
# Load the model from the checkpoint
model = MyModel.load_from_checkpoint(checkpoint_path)
with mlflow.start_run() as run:
# Change the MLflow flavor as needed
mlflow.pytorch.log_model(model, "model")
일반적으로 Ray Train을 사용하여 개체를 드라이버 노드로 다시 보내는 것이 모범 사례이지만 최종 결과를 저장하는 것은 작업자 프로세스의 전체 학습 기록보다 쉽습니다.
학습 실행에서 여러 모델을 저장하려면 ray.train.CheckpointConfig
에서 유지할 검사점 수를 지정합니다. 그런 다음 단일 모델을 저장하는 방식과 동일한 방식으로 모델을 읽고 기록할 수 있습니다.
참고 항목
MLflow는 모델 학습 중에 내결함성을 처리하는 것이 아니라 모델의 수명 주기를 추적합니다. 내결함성은 Ray Train 자체에 의해 관리됩니다.
Ray Train에서 지정한 학습 메트릭을 저장하려면 결과 개체에서 검색하고 MLflow를 사용하여 저장합니다.
result = trainer.fit()
with mlflow.start_run() as run:
mlflow.log_metrics(result.metrics_dataframe.to_dict(orient='dict'))
# Change the MLflow flavor as needed
mlflow.pytorch.log_model(model, "model")
Spark 및 Ray 클러스터를 올바르게 구성하고 리소스 할당 문제를 방지하려면 resources_per_worker
설정을 조정해야 합니다. 특히 각 Ray 작업자의 CPU 수를 Ray 작업자 노드에서 사용할 수 있는 총 CPU 수보다 하나 작게 설정합니다. 이 조정은 트레이너가 Ray 행위자를 위해 사용 가능한 모든 코어를 예약하는 경우 리소스 경합 오류가 발생할 수 있기 때문에 중요합니다.
Ray Tune 및 MLflow
Ray Tune을 MLflow와 통합하면 Databricks 내에서 하이퍼 매개 변수 튜닝 실험을 효율적으로 추적하고 기록할 수 있습니다. 이 통합은 MLflow의 실험 추적 기능을 활용하여 Ray 작업에서 직접 메트릭 및 결과를 기록합니다.
로깅을 위한 자식 실행 방법
Ray Core 작업의 로깅과 마찬가지로 Ray Tune 애플리케이션은 자식 실행 방법을 사용하여 각 평가판 또는 튜닝 반복에서 메트릭을 기록할 수 있습니다. 다음 단계를 사용하여 자식 실행 방법을 구현합니다.
- 부모 실행 만들기: 드라이버 프로세스에서 부모 실행을 시작합니다. 이 실행은 모든 후속 자식 실행에 대한 기본 컨테이너 역할을 합니다.
- 자식 실행 기록: 각 Ray Tune 작업은 부모 실행에서 자식 실행을 만들어 실험 결과의 명확한 계층 구조를 유지합니다.
다음 예제에서는 MLflow를 사용하여 Ray Tune 작업에서 인증하고 기록하는 방법을 보여 줍니다.
import os
import tempfile
import time
import mlflow
from mlflow.utils.databricks_utils import get_databricks_env_vars
from ray import train, tune
from ray.air.integrations.mlflow import MLflowLoggerCallback, setup_mlflow
mlflow_db_creds = get_databricks_env_vars("databricks")
EXPERIMENT_NAME = "/Users/<WORKSPACE_USERNAME>/setup_mlflow_example"
mlflow.set_experiment(EXPERIMENT_NAME)
def evaluation_fn(step, width, height):
return (0.1 + width * step / 100) ** (-1) + height * 0.1
def train_function_mlflow(config, run_id):
os.environ.update(mlflow_db_creds)
mlflow.set_experiment(EXPERIMENT_NAME)
# Hyperparameters
width = config["width"]
height = config["height"]
with mlflow.start_run(run_id=run_id, nested=True):
for step in range(config.get("steps", 100)):
# Iterative training function - can be any arbitrary training procedure
intermediate_score = evaluation_fn(step, width, height)
# Log the metrics to MLflow
mlflow.log_metrics({"iterations": step, "mean_loss": intermediate_score})
# Feed the score back to Tune.
train.report({"iterations": step, "mean_loss": intermediate_score})
time.sleep(0.1)
def tune_with_setup(run_id, finish_fast=True):
os.environ.update(mlflow_db_creds)
# Set the experiment or create a new one if it does not exist.
mlflow.set_experiment(experiment_name=EXPERIMENT_NAME)
tuner = tune.Tuner(
tune.with_parameter(train_function_mlflow, run_id),
tune_config=tune.TuneConfig(num_samples=5),
run_config=train.RunConfig(
name="mlflow",
),
param_space={
"width": tune.randint(10, 100),
"height": tune.randint(0, 100),
"steps": 20 if finish_fast else 100,
},
)
results = tuner.fit()
with mlflow.start_run() as run:
mlflow_tracking_uri = mlflow.get_tracking_uri()
tune_with_setup(run.info.run_id)
모델 서비스
실시간 유추를 위해 Databricks 클러스터에서 Ray Serve를 사용하면 외부 애플리케이션과 상호 작용할 때 네트워크 보안 및 연결 제한으로 인해 문제가 발생합니다.
Databricks는 모델 서비스를 사용하여 프로덕션의 기계 학습 모델을 REST API 엔드포인트에 배포하는 것이 좋습니다. 자세한 내용은 사용자 지정 모델 배포를 참조하세요.