Condividi tramite


Creare modelli con Machine Learning automatizzato (anteprima)

Machine Learning automatizzato (AutoML) include un set di tecniche e strumenti progettati per semplificare il processo di training e l'ottimizzazione dei modelli di Machine Learning con un intervento umano minimo. L'obiettivo principale di AutoML è semplificare e accelerare la selezione del modello di Machine Learning e degli iperparametri più adatti per un determinato set di dati, un'attività che richiede in genere notevoli competenze e risorse di calcolo. All'interno del framework di Infrastruttura, i data scientist possono sfruttare il modulo flaml.AutoML per automatizzare vari aspetti dei flussi di lavoro di Machine Learning.

In questo articolo verrà illustrato il processo di generazione di versioni di valutazione autoML direttamente dal codice usando un set di dati Spark. Verranno inoltre esaminati i metodi per convertire questi dati in un dataframe Pandas e verranno illustrate le tecniche per parallelizzare le prove di sperimentazione.

Importante

Questa funzionalità si trova nell’anteprima.

Prerequisiti

  • Creare un nuovo ambiente Fabric o assicurarsi che l’esecuzione avvenga in Fabric Runtime 1.2 (Spark 3.4 (o versione successiva) e Delta 2.4)
  • Creare un nuovo notebook.
  • Collegare il notebook a un lakehouse. Sul lato sinistro del notebook, selezionare Aggiungi per aggiungere un lakehouse esistente o crearne uno nuovo.

Caricare e preparare i dati

In questa sezione verranno specificate le impostazioni di download per i dati e quindi verranno salvate nel lakehouse.

Scarica dati

Questo blocco di codice scarica i dati da un'origine remota e lo salva nel lakehouse

import os
import requests

IS_CUSTOM_DATA = False  # if TRUE, dataset has to be uploaded manually

if not IS_CUSTOM_DATA:
    # Specify the remote URL where the data is hosted
    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
    
    # List of data files to download
    file_list = ["churn.csv"]
    
    # Define the download path within the lakehouse
    download_path = "/lakehouse/default/Files/churn/raw"
    
    # Check if the lakehouse directory exists; if not, raise an error
    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
    
    # Create the download directory if it doesn't exist
    os.makedirs(download_path, exist_ok=True)
    
    # Download each data file if it doesn't already exist in the lakehouse
    for fname in file_list:
        if not os.path.exists(f"{download_path}/{fname}"):
            r = requests.get(f"{remote_url}/{fname}", timeout=30)
            with open(f"{download_path}/{fname}", "wb") as f:
                f.write(r.content)
    
    print("Downloaded demo data files into lakehouse.")

Caricare i dati in un dataframe Spark

Il blocco di codice seguente carica i dati dal file CSV in un dataframe Spark e lo memorizza nella cache per un'elaborazione efficiente.

df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv("Files/churn/raw/churn.csv")
    .cache()
)

Questo codice presuppone che il file di dati sia stato scaricato e che si trovi nel percorso specificato. Legge il file CSV in un dataframe Spark, deduce lo schema e lo memorizza nella cache per un accesso più rapido durante le operazioni successive.

Preparare i dati

In questa sezione si eseguirà la pulizia dei dati e la progettazione delle funzionalità nel set di dati.

Pulire i dati

Prima di tutto, viene definita una funzione per pulire i dati, che include l'eliminazione di righe con dati mancanti, la rimozione di righe duplicate in base a colonne specifiche e l'eliminazione di colonne non necessarie.

# Define a function to clean the data
def clean_data(df):
    # Drop rows with missing data across all columns
    df = df.dropna(how="all")
    # Drop duplicate rows based on 'RowNumber' and 'CustomerId'
    df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
    # Drop columns: 'RowNumber', 'CustomerId', 'Surname'
    df = df.drop('RowNumber', 'CustomerId', 'Surname')
    return df

# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")

# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)

La funzione clean_data consente di garantire che il set di dati sia privo di valori mancanti e duplicati durante la rimozione di colonne non necessarie.

Progettazione delle caratteristiche

Successivamente, viene eseguita la progettazione delle funzionalità creando colonne fittizie per le colonne 'Geography' e 'Gender' usando la codifica one-hot.

# Import PySpark functions
from pyspark.sql import functions as F

# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
    "*",
    F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
    F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
    F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
    F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
    F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)

# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")

In questo caso viene usata la codifica one-hot per convertire le colonne categoriche in colonne fittizie binarie, rendendole adatte agli algoritmi di Machine Learning.

Visualizzare i dati puliti

Infine, viene visualizzato il set di dati pulito e progettato per le funzionalità usando la funzione di visualizzazione.


display(df_clean)

Questo passaggio consente di esaminare il dataframe risultante con le trasformazioni applicate.

Salva in lakehouse

Ora si salverà il set di dati pulito e progettato per le funzionalità nel lakehouse.

# Create PySpark DataFrame from Pandas
df_clean.write.mode("overwrite").format("delta").save(f"Tables/churn_data_clean")
print(f"Spark dataframe saved to delta table: churn_data_clean")

In questo caso, prendiamo il dataframe PySpark pulito e trasformato, df_clean, e lo salviamo come tabella Delta denominata "churn_data_clean" nella lakehouse. Viene usato il formato Delta per il controllo delle versioni e la gestione efficienti del set di dati. mode("overwrite") garantisce che qualsiasi tabella esistente con lo stesso nome venga sovrascritta e venga creata una nuova versione della tabella.

Creare set di dati di training e di test

Verranno quindi creati i set di dati di test e training dai dati puliti e progettati dalle funzionalità.

Nella sezione del codice fornita viene caricato un set di dati pulito e progettato dalle funzionalità dal lakehouse usando il formato Delta, suddiviso in set di training e test con un rapporto 80-20 e si preparano i dati per l'apprendimento automatico. Questa preparazione comporta l'importazione di VectorAssembler da PySpark ML per combinare le colonne delle funzionalità in una singola colonna "features". Successivamente, viene usato VectorAssembler per trasformare i set di dati di training e test, generando i dataframe train_data e test_data che contengono la variabile di destinazione "Exited" e i vettori di funzionalità. Questi set di dati sono ora pronti per l'uso nella compilazione e nella valutazione di modelli di Machine Learning.

# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler

# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")

# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)

# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]

# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw)["Exited", "features"]
test_data = featurizer.transform(test_raw)["Exited", "features"]

Eseguire il training di un modello di base

Usando i dati con funzionalità, si eseguirà il training di un modello di Machine Learning di base, si configurerà MLflow per il rilevamento dell'esperimento, si definirà una funzione di stima per il calcolo delle metriche e infine si visualizzerà e si renderà registrare il punteggio AUC ROC risultante.

Impostare il livello di registrazione

In questo caso viene configurato il livello di registrazione per eliminare l'output non necessario dal catalogo Synapse.ml, mantenendo i log più puliti.

import logging
 
logging.getLogger('synapse.ml').setLevel(logging.ERROR)

Configurare MLflow

In questa sezione viene configurato MLflow per il rilevamento dell'esperimento. Il nome dell'esperimento viene impostato su "automl_sample" per organizzare le esecuzioni. Inoltre, viene abilitata la registrazione automatica, assicurando che i parametri del modello, le metriche e gli artefatti vengano registrati automaticamente in MLflow.

import mlflow

# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)

Eseguire il training e valutare il modello

Infine, viene eseguito il training di un modello LightGBMClassifier sui dati di training forniti. Il modello è configurato con le impostazioni necessarie per la classificazione binaria e la gestione dello squilibrio. Si usa quindi il modello sottoposto a training per effettuare previsioni sui dati di test. Vengono estratte le probabilità stimate per la classe positiva e le etichette vere dai dati di test. Successivamente, viene calcolato il punteggio dell'AUC ROC usando la funzione roc_auc_score di sklearn.

from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score

# Assuming you have already defined 'train_data' and 'test_data'

with mlflow.start_run(run_name="default") as run:
    # Create a LightGBMClassifier model with specified settings
    model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited", dataTransferMode="bulk")
    
    # Fit the model to the training data
    model = model.fit(train_data)

    # Get the predictions
    predictions = model.transform(test_data)

    # Extract the predicted probabilities for the positive class
    y_pred = predictions.select("probability").rdd.map(lambda x: x[0][1]).collect()

    # Extract the true labels from the 'test_data' DataFrame
    y_true = test_data.select("Exited").rdd.map(lambda x: x[0]).collect()

    # Compute the ROC AUC score
    roc_auc = roc_auc_score(y_true, y_pred)

    # Log the ROC AUC score with MLflow
    mlflow.log_metric("ROC_AUC", roc_auc)

    # Print or log the ROC AUC score
    print("ROC AUC Score:", roc_auc)

Da qui è possibile notare che il modello risultante ottiene un punteggio ROC AUC pari all'84%.

Creare una versione di valutazione di AutoML con FLAML

In questa sezione si creerà una versione di valutazione autoML usando il pacchetto FLAML, si configureranno le impostazioni di valutazione, si convertirà il set di dati Spark in un set di dati Pandas in Spark, si eseguirà la versione di valutazione autoML e si visualizzeranno le metriche risultanti.

Configurare la versione di valutazione AutoML

In questo caso vengono importate le classi e i moduli necessari dal pacchetto FLAML e viene creata un'istanza di AutoML, che verrà usata per automatizzare la pipeline di Machine Learning.

# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark

# Create an AutoML instance
automl = AutoML()

Configurare le impostazioni

In questa sezione vengono definite le impostazioni di configurazione per la versione di valutazione di AutoML.

# Define AutoML settings
settings = {
    "time_budget": 250,         # Total running time in seconds
    "metric": 'roc_auc',       # Optimization metric (ROC AUC in this case)
    "task": 'classification',  # Task type (classification)
    "log_file_name": 'flaml_experiment.log',  # FLAML log file
    "seed": 41,                # Random seed
    "force_cancel": True,      # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"      # MLflow experiment name
}

Eseguire la conversione in Pandas in Spark

Per eseguire AutoML con un set di dati basato su Spark, è necessario convertirlo in un set di dati Pandas in Spark usando la funzione to_pandas_on_spark. Ciò consente a FLAML di lavorare con i dati in modo efficiente.

# Convert the Spark training dataset to a Pandas on Spark dataset
df_automl = to_pandas_on_spark(train_data)

Eseguire la versione di valutazione di AutoML

A questo momento, viene eseguita la versione di valutazione di AutoML. Viene usata un'esecuzione MLflow annidata per tenere traccia dell'esperimento all'interno del contesto di esecuzione MLflow esistente. La versione di valutazione autoML viene eseguita sul set di dati Pandas in Spark (df_automl) con la variabile di destinazione "Exited e le impostazioni definite vengono passate alla funzione fit per la configurazione.

'''The main flaml automl API'''

with mlflow.start_run(nested=True):
    automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)

Visualizzare le metriche risultanti

In questa sezione finale vengono recuperati e visualizzati i risultati della versione di valutazione di AutoML. Queste metriche forniscono informazioni dettagliate sulle prestazioni e sulla configurazione del modello AutoML nel set di dati specificato.

# Retrieve and display the best hyperparameter configuration and metrics
print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))

Parallelizzare la versione di valutazione di AutoML con Apache Spark

Negli scenari in cui il set di dati può essere inserito in un singolo nodo e si vuole sfruttare la potenza di Spark per l'esecuzione simultanea di più versioni di valutazione autoML parallele, è possibile seguire questa procedura:

Convertire in frame di dati Pandas

Per abilitare la parallelizzazione, i dati devono prima essere convertiti in un dataframe Pandas.

pandas_df = train_raw.toPandas()

In questo caso, si converte il dataframe Spark train_raw in un dataframe Pandas denominato pandas_df per renderlo adatto per l'elaborazione parallela.

Configurare le impostazioni di parallelizzazione

Impostare use_spark su True per abilitare il parallelismo basato su Spark. Per impostazione predefinita, FLAML avvierà una versione di valutazione per ogni executor. È possibile personalizzare il numero di versioni di valutazione simultanee usando l'argomento n_concurrent_trials.

settings = {
    "time_budget": 250,           # Total running time in seconds
    "metric": 'roc_auc',         # Optimization metric (ROC AUC in this case)
    "task": 'classification',    # Task type (classification)
    "seed": 41,                  # Random seed
    "use_spark": True,           # Enable Spark-based parallelism
    "n_concurrent_trials": 3,    # Number of concurrent trials to run
    "force_cancel": True,        # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"  # MLflow experiment name

}

In queste impostazioni si specifica che si vuole usare Spark per il parallelismo impostando use_spark su True. È stato anche impostato il numero di versioni di valutazione simultanee su 3, ovvero tre versioni di valutazione verranno eseguite in parallelo in Spark.

Per maggiori informazioni su come sincronizzare i percorsi AutoML, consulta la documentazione FLAML per i processi Spark paralleli.

Eseguire la versione di valutazione di AutoML in parallelo

Ora verrà eseguita la versione di valutazione di AutoML in parallelo con le impostazioni specificate. Verrà usata un'esecuzione MLflow annidata per tenere traccia dell'esperimento all'interno del contesto di esecuzione MLflow esistente.

'''The main FLAML AutoML API'''
with mlflow.start_run(nested=True, run_name="parallel_trial"):
    automl.fit(dataframe=pandas_df, label='Exited', **settings)

Verrà ora eseguita la versione di valutazione di AutoML con la parallelizzazione abilitata. L'argomento dataframe viene impostato sul dataframe Pandas pandas_df, e altre impostazioni vengono passate alla funzione fit per l'esecuzione parallela.

Visualizzare le metriche

Dopo aver eseguito la versione di valutazione parallela di AutoML, recuperare e visualizzare i risultati, tra cui la configurazione migliore degli iperparametri, l'AUC ROC sui dati di convalida e la durata del training dell'esecuzione con prestazioni migliori.

''' retrieve best config'''
print('Best hyperparmeter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))