Creare modelli con Machine Learning automatizzato (anteprima)
Machine Learning automatizzato (AutoML) include un set di tecniche e strumenti progettati per semplificare il processo di training e l'ottimizzazione dei modelli di Machine Learning con un intervento umano minimo. L'obiettivo principale di AutoML è semplificare e accelerare la selezione del modello di Machine Learning e degli iperparametri più adatti per un determinato set di dati, un'attività che richiede in genere notevoli competenze e risorse di calcolo. All'interno del framework di Infrastruttura, i data scientist possono sfruttare il modulo flaml.AutoML
per automatizzare vari aspetti dei flussi di lavoro di Machine Learning.
In questo articolo verrà illustrato il processo di generazione di versioni di valutazione autoML direttamente dal codice usando un set di dati Spark. Verranno inoltre esaminati i metodi per convertire questi dati in un dataframe Pandas e verranno illustrate le tecniche per parallelizzare le prove di sperimentazione.
Importante
Questa funzionalità si trova nell’anteprima.
Prerequisiti
Ottenere una sottoscrizione di Microsoft Fabric. In alternativa, iscriversi per ottenere una versione di valutazione di Microsoft Fabric gratuita.
Accedere a Microsoft Fabric.
Usare l’opzione esperienza sul lato sinistro della home page per passare all'esperienza Science di Synapse.
- Creare un nuovo ambiente Fabric o assicurarsi che l’esecuzione avvenga in Fabric Runtime 1.2 (Spark 3.4 (o versione successiva) e Delta 2.4)
- Creare un nuovo notebook.
- Collegare il notebook a un lakehouse. Sul lato sinistro del notebook, selezionare Aggiungi per aggiungere un lakehouse esistente o crearne uno nuovo.
Caricare e preparare i dati
In questa sezione verranno specificate le impostazioni di download per i dati e quindi verranno salvate nel lakehouse.
Scarica dati
Questo blocco di codice scarica i dati da un'origine remota e lo salva nel lakehouse
import os
import requests
IS_CUSTOM_DATA = False # if TRUE, dataset has to be uploaded manually
if not IS_CUSTOM_DATA:
# Specify the remote URL where the data is hosted
remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
# List of data files to download
file_list = ["churn.csv"]
# Define the download path within the lakehouse
download_path = "/lakehouse/default/Files/churn/raw"
# Check if the lakehouse directory exists; if not, raise an error
if not os.path.exists("/lakehouse/default"):
raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
# Create the download directory if it doesn't exist
os.makedirs(download_path, exist_ok=True)
# Download each data file if it doesn't already exist in the lakehouse
for fname in file_list:
if not os.path.exists(f"{download_path}/{fname}"):
r = requests.get(f"{remote_url}/{fname}", timeout=30)
with open(f"{download_path}/{fname}", "wb") as f:
f.write(r.content)
print("Downloaded demo data files into lakehouse.")
Caricare i dati in un dataframe Spark
Il blocco di codice seguente carica i dati dal file CSV in un dataframe Spark e lo memorizza nella cache per un'elaborazione efficiente.
df = (
spark.read.option("header", True)
.option("inferSchema", True)
.csv("Files/churn/raw/churn.csv")
.cache()
)
Questo codice presuppone che il file di dati sia stato scaricato e che si trovi nel percorso specificato. Legge il file CSV in un dataframe Spark, deduce lo schema e lo memorizza nella cache per un accesso più rapido durante le operazioni successive.
Preparare i dati
In questa sezione si eseguirà la pulizia dei dati e la progettazione delle funzionalità nel set di dati.
Pulire i dati
Prima di tutto, viene definita una funzione per pulire i dati, che include l'eliminazione di righe con dati mancanti, la rimozione di righe duplicate in base a colonne specifiche e l'eliminazione di colonne non necessarie.
# Define a function to clean the data
def clean_data(df):
# Drop rows with missing data across all columns
df = df.dropna(how="all")
# Drop duplicate rows based on 'RowNumber' and 'CustomerId'
df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
# Drop columns: 'RowNumber', 'CustomerId', 'Surname'
df = df.drop('RowNumber', 'CustomerId', 'Surname')
return df
# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")
# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)
La funzione clean_data
consente di garantire che il set di dati sia privo di valori mancanti e duplicati durante la rimozione di colonne non necessarie.
Progettazione delle caratteristiche
Successivamente, viene eseguita la progettazione delle funzionalità creando colonne fittizie per le colonne 'Geography' e 'Gender' usando la codifica one-hot.
# Import PySpark functions
from pyspark.sql import functions as F
# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
"*",
F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)
# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")
In questo caso viene usata la codifica one-hot per convertire le colonne categoriche in colonne fittizie binarie, rendendole adatte agli algoritmi di Machine Learning.
Visualizzare i dati puliti
Infine, viene visualizzato il set di dati pulito e progettato per le funzionalità usando la funzione di visualizzazione.
display(df_clean)
Questo passaggio consente di esaminare il dataframe risultante con le trasformazioni applicate.
Salva in lakehouse
Ora si salverà il set di dati pulito e progettato per le funzionalità nel lakehouse.
# Create PySpark DataFrame from Pandas
df_clean.write.mode("overwrite").format("delta").save(f"Tables/churn_data_clean")
print(f"Spark dataframe saved to delta table: churn_data_clean")
In questo caso, prendiamo il dataframe PySpark pulito e trasformato, df_clean
, e lo salviamo come tabella Delta denominata "churn_data_clean" nella lakehouse. Viene usato il formato Delta per il controllo delle versioni e la gestione efficienti del set di dati. mode("overwrite")
garantisce che qualsiasi tabella esistente con lo stesso nome venga sovrascritta e venga creata una nuova versione della tabella.
Creare set di dati di training e di test
Verranno quindi creati i set di dati di test e training dai dati puliti e progettati dalle funzionalità.
Nella sezione del codice fornita viene caricato un set di dati pulito e progettato dalle funzionalità dal lakehouse usando il formato Delta, suddiviso in set di training e test con un rapporto 80-20 e si preparano i dati per l'apprendimento automatico. Questa preparazione comporta l'importazione di VectorAssembler
da PySpark ML per combinare le colonne delle funzionalità in una singola colonna "features". Successivamente, viene usato VectorAssembler
per trasformare i set di dati di training e test, generando i dataframe train_data
e test_data
che contengono la variabile di destinazione "Exited" e i vettori di funzionalità. Questi set di dati sono ora pronti per l'uso nella compilazione e nella valutazione di modelli di Machine Learning.
# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler
# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")
# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)
# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]
# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw)["Exited", "features"]
test_data = featurizer.transform(test_raw)["Exited", "features"]
Eseguire il training di un modello di base
Usando i dati con funzionalità, si eseguirà il training di un modello di Machine Learning di base, si configurerà MLflow per il rilevamento dell'esperimento, si definirà una funzione di stima per il calcolo delle metriche e infine si visualizzerà e si renderà registrare il punteggio AUC ROC risultante.
Impostare il livello di registrazione
In questo caso viene configurato il livello di registrazione per eliminare l'output non necessario dal catalogo Synapse.ml, mantenendo i log più puliti.
import logging
logging.getLogger('synapse.ml').setLevel(logging.ERROR)
Configurare MLflow
In questa sezione viene configurato MLflow per il rilevamento dell'esperimento. Il nome dell'esperimento viene impostato su "automl_sample" per organizzare le esecuzioni. Inoltre, viene abilitata la registrazione automatica, assicurando che i parametri del modello, le metriche e gli artefatti vengano registrati automaticamente in MLflow.
import mlflow
# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)
Eseguire il training e valutare il modello
Infine, viene eseguito il training di un modello LightGBMClassifier sui dati di training forniti. Il modello è configurato con le impostazioni necessarie per la classificazione binaria e la gestione dello squilibrio. Si usa quindi il modello sottoposto a training per effettuare previsioni sui dati di test. Vengono estratte le probabilità stimate per la classe positiva e le etichette vere dai dati di test. Successivamente, viene calcolato il punteggio dell'AUC ROC usando la funzione roc_auc_score
di sklearn.
from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score
# Assuming you have already defined 'train_data' and 'test_data'
with mlflow.start_run(run_name="default") as run:
# Create a LightGBMClassifier model with specified settings
model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited", dataTransferMode="bulk")
# Fit the model to the training data
model = model.fit(train_data)
# Get the predictions
predictions = model.transform(test_data)
# Extract the predicted probabilities for the positive class
y_pred = predictions.select("probability").rdd.map(lambda x: x[0][1]).collect()
# Extract the true labels from the 'test_data' DataFrame
y_true = test_data.select("Exited").rdd.map(lambda x: x[0]).collect()
# Compute the ROC AUC score
roc_auc = roc_auc_score(y_true, y_pred)
# Log the ROC AUC score with MLflow
mlflow.log_metric("ROC_AUC", roc_auc)
# Print or log the ROC AUC score
print("ROC AUC Score:", roc_auc)
Da qui è possibile notare che il modello risultante ottiene un punteggio ROC AUC pari all'84%.
Creare una versione di valutazione di AutoML con FLAML
In questa sezione si creerà una versione di valutazione autoML usando il pacchetto FLAML, si configureranno le impostazioni di valutazione, si convertirà il set di dati Spark in un set di dati Pandas in Spark, si eseguirà la versione di valutazione autoML e si visualizzeranno le metriche risultanti.
Configurare la versione di valutazione AutoML
In questo caso vengono importate le classi e i moduli necessari dal pacchetto FLAML e viene creata un'istanza di AutoML, che verrà usata per automatizzare la pipeline di Machine Learning.
# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark
# Create an AutoML instance
automl = AutoML()
Configurare le impostazioni
In questa sezione vengono definite le impostazioni di configurazione per la versione di valutazione di AutoML.
# Define AutoML settings
settings = {
"time_budget": 250, # Total running time in seconds
"metric": 'roc_auc', # Optimization metric (ROC AUC in this case)
"task": 'classification', # Task type (classification)
"log_file_name": 'flaml_experiment.log', # FLAML log file
"seed": 41, # Random seed
"force_cancel": True, # Force stop training once time_budget is used up
"mlflow_exp_name": "automl_sample" # MLflow experiment name
}
Eseguire la conversione in Pandas in Spark
Per eseguire AutoML con un set di dati basato su Spark, è necessario convertirlo in un set di dati Pandas in Spark usando la funzione to_pandas_on_spark
. Ciò consente a FLAML di lavorare con i dati in modo efficiente.
# Convert the Spark training dataset to a Pandas on Spark dataset
df_automl = to_pandas_on_spark(train_data)
Eseguire la versione di valutazione di AutoML
A questo momento, viene eseguita la versione di valutazione di AutoML. Viene usata un'esecuzione MLflow annidata per tenere traccia dell'esperimento all'interno del contesto di esecuzione MLflow esistente. La versione di valutazione autoML viene eseguita sul set di dati Pandas in Spark (df_automl
) con la variabile di destinazione "Exited
e le impostazioni definite vengono passate alla funzione fit
per la configurazione.
'''The main flaml automl API'''
with mlflow.start_run(nested=True):
automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)
Visualizzare le metriche risultanti
In questa sezione finale vengono recuperati e visualizzati i risultati della versione di valutazione di AutoML. Queste metriche forniscono informazioni dettagliate sulle prestazioni e sulla configurazione del modello AutoML nel set di dati specificato.
# Retrieve and display the best hyperparameter configuration and metrics
print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))
Parallelizzare la versione di valutazione di AutoML con Apache Spark
Negli scenari in cui il set di dati può essere inserito in un singolo nodo e si vuole sfruttare la potenza di Spark per l'esecuzione simultanea di più versioni di valutazione autoML parallele, è possibile seguire questa procedura:
Convertire in frame di dati Pandas
Per abilitare la parallelizzazione, i dati devono prima essere convertiti in un dataframe Pandas.
pandas_df = train_raw.toPandas()
In questo caso, si converte il dataframe Spark train_raw
in un dataframe Pandas denominato pandas_df
per renderlo adatto per l'elaborazione parallela.
Configurare le impostazioni di parallelizzazione
Impostare use_spark
su True
per abilitare il parallelismo basato su Spark. Per impostazione predefinita, FLAML avvierà una versione di valutazione per ogni executor. È possibile personalizzare il numero di versioni di valutazione simultanee usando l'argomento n_concurrent_trials
.
settings = {
"time_budget": 250, # Total running time in seconds
"metric": 'roc_auc', # Optimization metric (ROC AUC in this case)
"task": 'classification', # Task type (classification)
"seed": 41, # Random seed
"use_spark": True, # Enable Spark-based parallelism
"n_concurrent_trials": 3, # Number of concurrent trials to run
"force_cancel": True, # Force stop training once time_budget is used up
"mlflow_exp_name": "automl_sample" # MLflow experiment name
}
In queste impostazioni si specifica che si vuole usare Spark per il parallelismo impostando use_spark
su True
. È stato anche impostato il numero di versioni di valutazione simultanee su 3, ovvero tre versioni di valutazione verranno eseguite in parallelo in Spark.
Per maggiori informazioni su come sincronizzare i percorsi AutoML, consulta la documentazione FLAML per i processi Spark paralleli.
Eseguire la versione di valutazione di AutoML in parallelo
Ora verrà eseguita la versione di valutazione di AutoML in parallelo con le impostazioni specificate. Verrà usata un'esecuzione MLflow annidata per tenere traccia dell'esperimento all'interno del contesto di esecuzione MLflow esistente.
'''The main FLAML AutoML API'''
with mlflow.start_run(nested=True, run_name="parallel_trial"):
automl.fit(dataframe=pandas_df, label='Exited', **settings)
Verrà ora eseguita la versione di valutazione di AutoML con la parallelizzazione abilitata. L'argomento dataframe
viene impostato sul dataframe Pandas pandas_df
, e altre impostazioni vengono passate alla funzione fit
per l'esecuzione parallela.
Visualizzare le metriche
Dopo aver eseguito la versione di valutazione parallela di AutoML, recuperare e visualizzare i risultati, tra cui la configurazione migliore degli iperparametri, l'AUC ROC sui dati di convalida e la durata del training dell'esecuzione con prestazioni migliori.
''' retrieve best config'''
print('Best hyperparmeter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))