Integrare MLflow e Ray
MLflow è una piattaforma open source per la gestione dei processi di Machine Learning e IA. La combinazione di Ray con MLflow consente di distribuire i carichi di lavoro con Ray e tenere traccia di modelli, metriche, parameterse metadati generati durante il training con MLflow.
Questo articolo illustra come integrare MLflow con i componenti Ray seguenti:
Ray Core: applicazioni distribuite per utilizzo generico non coperte da Ray Tune e Ray Train
Ray Train: training del modello distribuito
Ray Tune: ottimizzazione degli iperparametri distribuiti
Servizio modelli: distribuzione di modelli per l'inferenza in tempo reale
Integrare Ray Core e MLflow
Ray Core offre i blocchi predefiniti fondamentali per le applicazioni distribuite per utilizzo generico. Consente di ridimensionare le funzioni e le classi Python in più nodi.
Questa sezione descrive i modelli seguenti per integrare Ray Core e MLflow:
- Registrare i modelli MLflow dal processo del driver Ray
- Registrare modelli MLflow da esecuzioni figlio
Log MLflow dal processo del driver Ray
In genere è consigliabile registrare i modelli MLflow dal processo del driver anziché dai nodi di lavoro. Ciò è dovuto alla complessità aggiunta del passaggio di riferimenti con stato ai lavoratori remoti.
Ad esempio, il codice seguente ha esito negativo perché il Server di rilevamento MLflow non viene inizializzato usando MLflow Client
dall'interno dei nodi di lavoro.
import mlflow
@ray.remote
def example_logging_task(x):
# ...
# This method will fail
mlflow.log_metric("x", x)
return x
with mlflow.start_run() as run:
ray.get([example_logging_task.remote(x) for x in range(10)])
Restituire invece le metriche al nodo driver. Le metriche e i metadati sono in genere sufficientemente piccoli per il trasferimento al driver senza causare problemi di memoria.
Prendi l'esempio indicato sopra e eseguilo con update per registrare le metriche restituite da un'attività di Ray.
import mlflow
@ray.remote
def example_logging_task(x):
# ...
return x
with mlflow.start_run() as run:
results = ray.get([example_logging_task.remote(x) for x in range(10)])
for x in results:
mlflow.log_metric("x", x)
Per le attività che richiedono il salvataggio di artefatti di grandi dimensioni, ad esempio pandas di grandi dimensioni table, immagini, tracciati o modelli, Databricks consiglia di rendere persistente l'artefatto come file. Quindi, ricaricare l'artefatto all'interno del contesto del driver o registrare direttamente l'oggetto con MLflow specificando il percorso del file salvato.
import mlflow
@ray.remote
def example_logging_task(x):
# ...
# Create a large object that needs to be stored
with open("/dbfs/myLargeFilePath.txt", "w") as f:
f.write(myLargeObject)
return x
with mlflow.start_run() as run:
results = ray.get([example_logging_task.remote(x) for x in range(10)])
for x in results:
mlflow.log_metric("x", x)
# Directly log the saved file by specifying the path
mlflow.log_artifact("/dbfs/myLargeFilePath.txt")
Registrare le attività Ray come esecuzioni figlio di MLflow
È possibile integrare Ray Core con MLflow usando le esecuzioni figlio. Questo include i passaggi seguenti:
- Creare un'esecuzione padre: inizializzare un'esecuzione padre nel processo del driver. Questa esecuzione funge da contenitore gerarchico per tutte le esecuzioni figlio successive.
- Crea esecuzioni figlio: all'interno di ogni attività Ray, avviare un'esecuzione figlio nell'esecuzione padre. Ogni esecuzione figlio può registrare in modo indipendente le proprie metriche.
Per implementare questo approccio, assicurarsi che ogni compito di Ray riceva il client necessario credentials e il padre run_id
. Questa configurazione stabilisce la relazione padre-figlio gerarchica tra le esecuzioni. Il frammento di codice seguente illustra come recuperare il credentials e passare il run_id
padre :
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")
username = "" # Username path
experiment_name = f"/Users/{username}/mlflow_test"
mlflow.set_experiment(experiment_name)
@ray.remote
def ray_task(x, run_id):
import os
# Set the MLflow credentials within the Ray task
os.environ.update(mlflow_db_creds)
# Set the active MLflow experiment within each Ray task
mlflow.set_experiment(experiment_name)
# Create nested child runs associated with the parent run_id
with mlflow.start_run(run_id=run_id, nested=True):
# Log metrics to the child run within the Ray task
mlflow.log_metric("x", x)
return x
# Start parent run on the main driver process
with mlflow.start_run() as run:
# Pass the parent run's run_id to each Ray task
results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])
Ray Train e MLflow
Il modo più semplice per registrare i modelli Ray Train in MLflow consiste nell'usare il checkpoint generato dall'esecuzione del training. Dopo il completamento dell'esecuzione del training, ricaricare il modello nel framework di Deep Learning nativo (ad esempio PyTorch o TensorFlow), quindi registrarlo con il codice MLflow corrispondente.
Questo approccio garantisce che il modello venga archiviato correttamente e pronto per la valutazione o la distribuzione.
Il codice seguente ricarica un modello da un checkpoint Ray Train e lo registra in MLflow:
result = trainer.fit()
checkpoint = result.checkpoint
with checkpoint.as_directory() as checkpoint_dir:
# Change as needed for different DL frameworks
checkpoint_path = f"{checkpoint_dir}/checkpoint.ckpt"
# Load the model from the checkpoint
model = MyModel.load_from_checkpoint(checkpoint_path)
with mlflow.start_run() as run:
# Change the MLflow flavor as needed
mlflow.pytorch.log_model(model, "model")
Anche se in genere è consigliabile inviare oggetti al nodo del driver, con Ray Train, il salvataggio dei risultati finali è più semplice rispetto all'intera cronologia di training dal processo di lavoro.
Per archiviare più modelli da un'esecuzione di training, specificare il numero di checkpoint da mantenere in ray.train.CheckpointConfig
. I modelli possono quindi essere letti e registrati allo stesso modo dell'archiviazione di un singolo modello.
Nota
MLflow non è responsabile della gestione della tolleranza di errore durante il training del modello, ma piuttosto per tenere traccia del ciclo di vita del modello. La tolleranza di errore viene invece gestita da Ray Train.
Per archiviare le metriche di training specificate da Ray Train, recuperarle dall'oggetto risultato e archiviarle usando MLflow.
result = trainer.fit()
with mlflow.start_run() as run:
mlflow.log_metrics(result.metrics_dataframe.to_dict(orient='dict'))
# Change the MLflow flavor as needed
mlflow.pytorch.log_model(model, "model")
Per configurare correttamente i cluster Spark e Ray e prevenire i problemi di allocazione delle risorse, è necessario modificare l'impostazione resources_per_worker
. In particolare, set il numero di CPU per ogni lavoratore Ray sia uno in meno rispetto al numero totale di CPU disponibili in un nodo lavoratore Ray. Questa regolazione è fondamentale perché se il formatore riserva tutti i core disponibili per gli attori Ray, può causare errori di contesa delle risorse.
Ray Tune e MLflow
L'integrazione di Ray Tune con MLflow consente di tenere traccia e registrare in modo efficiente gli esperimenti di ottimizzazione degli iperparametri all'interno di Databricks. Questa integrazione sfrutta le funzionalità di rilevamento degli esperimenti di MLflow per registrare metriche e risultati direttamente dalle attività Ray.
Approccio di esecuzione figlio per la registrazione
Analogamente alla registrazione da attività Ray Core, le applicazioni Ray Tune possono usare un approccio di esecuzione figlio per registrare le metriche da ogni versione di valutazione o ottimizzazione dell'iterazione. Usare la procedura seguente per implementare un approccio a esecuzione figlio:
- Creare un'esecuzione padre: inizializzare un'esecuzione padre nel processo del driver. Questa esecuzione funge da contenitore principale per tutte le esecuzioni figlio successive.
- Esecuzioni figlio del log: ogni attività Ray Tune crea un'esecuzione figlio nell'esecuzione padre, mantenendo una gerarchia chiara dei risultati dell'esperimento.
L'esempio seguente illustra come eseguire l'autenticazione e la registrazione dalle attività di Ray Tune usando MLflow.
import os
import tempfile
import time
import mlflow
from mlflow.utils.databricks_utils import get_databricks_env_vars
from ray import train, tune
from ray.air.integrations.mlflow import MLflowLoggerCallback, setup_mlflow
mlflow_db_creds = get_databricks_env_vars("databricks")
EXPERIMENT_NAME = "/Users/<WORKSPACE_USERNAME>/setup_mlflow_example"
mlflow.set_experiment(EXPERIMENT_NAME)
def evaluation_fn(step, width, height):
return (0.1 + width * step / 100) ** (-1) + height * 0.1
def train_function_mlflow(config, run_id):
os.environ.update(mlflow_db_creds)
mlflow.set_experiment(EXPERIMENT_NAME)
# Hyperparameters
width = config["width"]
height = config["height"]
with mlflow.start_run(run_id=run_id, nested=True):
for step in range(config.get("steps", 100)):
# Iterative training function - can be any arbitrary training procedure
intermediate_score = evaluation_fn(step, width, height)
# Log the metrics to MLflow
mlflow.log_metrics({"iterations": step, "mean_loss": intermediate_score})
# Feed the score back to Tune.
train.report({"iterations": step, "mean_loss": intermediate_score})
time.sleep(0.1)
def tune_with_setup(run_id, finish_fast=True):
os.environ.update(mlflow_db_creds)
# Set the experiment or create a new one if it does not exist.
mlflow.set_experiment(experiment_name=EXPERIMENT_NAME)
tuner = tune.Tuner(
tune.with_parameter(train_function_mlflow, run_id),
tune_config=tune.TuneConfig(num_samples=5),
run_config=train.RunConfig(
name="mlflow",
),
param_space={
"width": tune.randint(10, 100),
"height": tune.randint(0, 100),
"steps": 20 if finish_fast else 100,
},
)
results = tuner.fit()
with mlflow.start_run() as run:
mlflow_tracking_uri = mlflow.get_tracking_uri()
tune_with_setup(run.info.run_id)
Gestione dei modelli
L'uso di Ray Serve nei cluster Databricks per l'inferenza in tempo reale pone problemi a causa delle limitazioni di sicurezza e connettività di rete durante l'interazione con applicazioni esterne.
Databricks consiglia di usare Model Serving per distribuire modelli di Machine Learning nell'ambiente di produzione in un endpoint dell'API REST. Per altre informazioni, vedere Implementare modelli personalizzati.