Condividi tramite


Esercitazione 5: Sviluppare un set di funzionalità con un'origine personalizzata

Un archivio delle funzionalità gestite di Azure Machine Learning consente di individuare, creare e rendere operative le funzionalità. Le funzionalità fungono da tessuto connettivo nel ciclo di vita del Machine Learning, a partire dalla fase di creazione dei prototipi, in cui si sperimentano diverse funzionalità. Questo ciclo di vita continua con la fase di operazionalizzazione, in cui vengono distribuiti i modelli e vengono eseguiti i passaggi di inferenza per cercare i dati delle funzionalità. Per altre informazioni sugli archivi funzionalità, visitare la risorsa concetti relativi all'archivio funzionalità.

Nella prima parte di questa esercitazione è stato illustrato come creare una specifica del set di funzionalità con trasformazioni personalizzate, abilitare la materializzazione ed eseguire un back-fill. Nella parte 2 è stato spiegato come sperimentare le funzionalità nei flussi di sperimentazione e training. Nella parte 3 è stata illustrata la materializzazione ricorrente per il set di funzionalità transactions ed è stato spiegato come eseguire una pipeline di inferenza batch nel modello registrato. Nella parte 4 è stato descritto come eseguire l'inferenza batch.

In questa esercitazione si apprenderà come:

  • Definire la logica per caricare dati da un'origine dati personalizzata.
  • Configurare e registrare un set di funzionalità da utilizzare da questa origine dati personalizzata.
  • Testare il set di funzionalità registrato.

Prerequisiti

Nota

Per questa esercitazione si usa un notebook di Azure Machine Learning con l'ambiente di calcolo Spark serverless.

  • Assicurarsi di completare le esercitazioni precedenti in questa serie. Questa esercitazione riutilizza l'archivio funzionalità e altre risorse create in queste esercitazioni precedenti.

Impostazione

In questa esercitazione si usa l'SDK principale dell'archivio delle funzionalità Python (azureml-featurestore). Python SDK viene usato per operazioni di creazione, lettura, aggiornamento ed eliminazione (CRUD) in archivi funzionalità, set di funzionalità ed entità di archiviazione delle funzionalità.

Non è necessario installare in modo esplicito queste risorse per questa esercitazione, perché il file conda.yml, nelle istruzioni di configurazione illustrate di seguito, le include tutte.

Configurare il notebook Spark di Azure Machine Learning

È possibile creare un nuovo notebook ed eseguire le istruzioni riportate in questa esercitazione, procedura dettagliata. È anche possibile aprire ed eseguire il notebook esistente featurestore_sample/notebooks/sdk_only/5.Develop-feature-set-custom-source.ipynb. Mantenere aperta questa esercitazione e farvi riferimento per i collegamenti alla documentazione e per altre spiegazioni.

  1. Nel menu in alto selezionare Ambiente di calcolo Spark serverless nell'elenco a discesa Ambiente di calcolo in Azure Machine Learning Serverless Spark.

  2. Configurare la sessione:

    1. Selezionare Configura sessione sulla barra di stato in alto
    2. Selezionare la scheda Pacchetti Python, selezionare Carica file Conda
    3. Selezionare Carica file Conda
    4. Caricare il file conda.yml caricato nella prima esercitazione
    5. Facoltativamente, aumentare il timeout della sessione (tempo di inattività) per evitare frequenti riesecuzioni dei prerequisiti

Configurare la directory radice per gli esempi

Questa cella di codice consente di configurare la directory radice per gli esempi. Sono necessari circa 10 minuti per installare tutte le dipendenze e avviare la sessione Spark.

import os

# Please update the dir to ./Users/{your_user_alias} (or any custom directory you uploaded the samples to).
# You can find the name from the directory structure in the left navigation panel.
root_dir = "./Users/<your_user_alias>/featurestore_sample"

if os.path.isdir(root_dir):
    print("The folder exists.")
else:
    print("The folder does not exist. Please create or fix the path")

Inizializzare il client CRUD dell'area di lavoro dell'archivio delle funzionalità

Inizializzare il client MLClient per l'area di lavoro dell'archivio delle funzionalità per gestire le operazioni di creazione, lettura, aggiornamento ed eliminazione (CRUD) in tale area di lavoro.

from azure.ai.ml import MLClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential

# Feature store
featurestore_name = (
    "<FEATURESTORE_NAME>"  # use the same name that was used in the tutorial #1
)
featurestore_subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
featurestore_resource_group_name = os.environ["AZUREML_ARM_RESOURCEGROUP"]

# Feature store ml client
fs_client = MLClient(
    AzureMLOnBehalfOfCredential(),
    featurestore_subscription_id,
    featurestore_resource_group_name,
    featurestore_name,
)

Inizializzare il client SDK principale dell'archivio delle funzionalità.

Come accennato in precedenza, per questa esercitazione si usa l'SDK principale dell'archivio delle funzionalità Python (azureml-featurestore). Questo client SDK inizializzato gestisce le operazioni di creazione, lettura, aggiornamento ed eliminazione (CRUD) negli archivi delle funzionalità, nei set di funzionalità e nelle entità dell'archivio delle funzionalità.

from azureml.featurestore import FeatureStoreClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential

featurestore = FeatureStoreClient(
    credential=AzureMLOnBehalfOfCredential(),
    subscription_id=featurestore_subscription_id,
    resource_group_name=featurestore_resource_group_name,
    name=featurestore_name,
)

Definizione dell'origine personalizzata

È possibile definire la logica di caricamento dell'origine personalizzata da qualsiasi risorsa di archiviazione dei dati che includa una definizione dell'origine personalizzata. Per usare questa funzionalità, implementare una classe di funzione definita dall'utente del processore di origine (CustomSourceTransformer in questa esercitazione). Questa classe deve definire una funzione __init__(self, **kwargs) e una funzione process(self, start_time, end_time, **kwargs). Come parte della definizione della specifica del set di funzionalità viene fornito il dizionario kwargs. Questa definizione viene quindi passata alla funzione definita dall'utente. I parametri start_time e end_time vengono calcolati e passati alla funzione definita dall'utente.

Questo è il codice di esempio per la classe di funzione definita dall'utente del processore di origine:

from datetime import datetime

class CustomSourceTransformer:
    def __init__(self, **kwargs):
        self.path = kwargs.get("source_path")
        self.timestamp_column_name = kwargs.get("timestamp_column_name")
        if not self.path:
            raise Exception("`source_path` is not provided")
        if not self.timestamp_column_name:
            raise Exception("`timestamp_column_name` is not provided")

    def process(
        self, start_time: datetime, end_time: datetime, **kwargs
    ) -> "pyspark.sql.DataFrame":
        from pyspark.sql import SparkSession
        from pyspark.sql.functions import col, lit, to_timestamp

        spark = SparkSession.builder.getOrCreate()
        df = spark.read.json(self.path)

        if start_time:
            df = df.filter(col(self.timestamp_column_name) >= to_timestamp(lit(start_time)))

        if end_time:
            df = df.filter(col(self.timestamp_column_name) < to_timestamp(lit(end_time)))

        return df

Creare una specifica del set di funzionalità con un'origine personalizzata e sperimentarla in locale

Creare ora una specifica del set di funzionalità con una definizione dell'origine personalizzata e usarla nell'ambiente di sviluppo per sperimentare il set di funzionalità. Il notebook dell'esercitazione collegato all'ambiente di calcolo Spark serverless funge da ambiente di sviluppo.

from azureml.featurestore import create_feature_set_spec
from azureml.featurestore.feature_source import CustomFeatureSource
from azureml.featurestore.contracts import (
    SourceProcessCode,
    TransformationCode,
    Column,
    ColumnType,
    DateTimeOffset,
    TimestampColumn,
)

transactions_source_process_code_path = (
    root_dir
    + "/featurestore/featuresets/transactions_custom_source/source_process_code"
)
transactions_feature_transform_code_path = (
    root_dir
    + "/featurestore/featuresets/transactions_custom_source/feature_process_code"
)

udf_featureset_spec = create_feature_set_spec(
    source=CustomFeatureSource(
        kwargs={
            "source_path": "wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source-json/*.json",
            "timestamp_column_name": "timestamp",
        },
        timestamp_column=TimestampColumn(name="timestamp"),
        source_delay=DateTimeOffset(days=0, hours=0, minutes=20),
        source_process_code=SourceProcessCode(
            path=transactions_source_process_code_path,
            process_class="source_process.CustomSourceTransformer",
        ),
    ),
    feature_transformation=TransformationCode(
        path=transactions_feature_transform_code_path,
        transformer_class="transaction_transform.TransactionFeatureTransformer",
    ),
    index_columns=[Column(name="accountID", type=ColumnType.string)],
    source_lookback=DateTimeOffset(days=7, hours=0, minutes=0),
    temporal_join_lookback=DateTimeOffset(days=1, hours=0, minutes=0),
    infer_schema=True,
)

udf_featureset_spec

Definire quindi una finestra delle funzionalità e visualizzare i valori delle funzionalità in tale finestra.

from datetime import datetime

st = datetime(2023, 1, 1)
et = datetime(2023, 6, 1)

display(
    udf_featureset_spec.to_spark_dataframe(
        feature_window_start_date_time=st, feature_window_end_date_time=et
    )
)

Esportare come specifica del set di funzionalità.

Per registrare la specifica del set di funzionalità con l'archivio delle funzionalità, salvare tale specifica in un formato particolare. Esaminare la specifica del set di funzionalità transactions_custom_source generata. Aprire questo file dall'albero dei file per visualizzare la specifica: featurestore/featuresets/transactions_custom_source/spec/FeaturesetSpec.yaml.

La specifica include questi elementi:

  • features: un elenco delle funzionalità e dei relativi tipi di dati.
  • index_columns: le chiavi di join necessarie per accedere ai valori dal set di funzionalità.

Per altre informazioni sulla specifica, vedere Informazioni sulle entità di primo livello nelle risorse dello schema YAML archivio delle funzionalità gestite e dell'interfaccia della riga di comando (v2).

La persistenza delle specifiche del set di funzionalità offre un altro vantaggio: la specifica del set di funzionalità può essere inclusa nel controllo del codice sorgente.

feature_spec_folder = (
    root_dir + "/featurestore/featuresets/transactions_custom_source/spec"
)

udf_featureset_spec.dump(feature_spec_folder)

Registrare il set di funzionalità transaction con l'archivio delle funzionalità

Usare questo codice per registrare un asset del set di funzionalità caricato dall'origine personalizzata con l'archivio delle funzionalità. È quindi possibile riutilizzare tale asset e condividerlo facilmente. La registrazione di un asset del set di funzionalità offre funzionalità gestite, tra cui il controllo delle versioni e la materializzazione.

from azure.ai.ml.entities import FeatureSet, FeatureSetSpecification

transaction_fset_config = FeatureSet(
    name="transactions_custom_source",
    version="1",
    description="transactions feature set loaded from custom source",
    entities=["azureml:account:1"],
    stage="Development",
    specification=FeatureSetSpecification(path=feature_spec_folder),
    tags={"data_type": "nonPII"},
)

poller = fs_client.feature_sets.begin_create_or_update(transaction_fset_config)
print(poller.result())

Ottenere il set di funzionalità registrato e stampare le informazioni correlate.

# Look up the feature set by providing name and version
transactions_fset_config = featurestore.feature_sets.get(
    name="transactions_custom_source", version="1"
)
# Print feature set information
print(transactions_fset_config)

Testare la generazione di funzionalità dal set di funzionalità registrato

Usare la funzione to_spark_dataframe() del set di funzionalità per testare la generazione di funzionalità dal set di funzionalità registrato e visualizzare le funzionalità. print-txn-fset-sample-values

df = transactions_fset_config.to_spark_dataframe()
display(df)

Dovrebbe essere possibile recuperare correttamente il set di funzionalità registrato come dataframe Spark e quindi visualizzarlo. È ora possibile usare queste funzionalità per un join temporizzato con i dati di osservazione e i passaggi successivi nella pipeline di Machine Learning.

Eseguire la pulizia

Se è stato creato un gruppo di risorse per l'esercitazione, è possibile eliminarlo per eliminare tutte le risorse associate a questa esercitazione. In caso contrario, è possibile eliminare le risorse singolarmente:

  • Per eliminare l'archivio delle funzionalità, aprire il gruppo di risorse nel portale di Azure, selezionare l'archivio ed eliminarlo.
  • L'identità gestita assegnata dall'utente assegnata all'area di lavoro dell'archivio delle funzionalità non viene eliminata quando si elimina l'archivio. Per eliminare l'identità gestita assegnata dall'utente, seguire queste istruzioni.
  • Per eliminare un archivio offline di tipo account di archiviazione, aprire il gruppo di risorse nel portale di Azure, selezionare la risorsa di archiviazione creata ed eliminarla.
  • Per eliminare un'istanza della cache di Azure per Redis, aprire il gruppo di risorse nel portale di Azure, selezionare l'istanza creata ed eliminarla.

Passaggi successivi