Aggiornare le pipeline all'SDK v2
In SDK v2 le "pipeline" vengono consolidate in processi.
Un processo ha un tipo. La maggior parte dei processi sono processi di comando che eseguono un command
oggetto , ad esempio python main.py
. Ciò che viene eseguito in un processo è indipendente da qualsiasi linguaggio di programmazione, quindi è possibile eseguire script bash
, richiamare interpreti python
, eseguire una serie di comandi curl
o qualsiasi altra operazione.
Un pipeline
è un altro tipo di processo, che definisce processi figlio che possono avere relazioni di input/output, formando un grafo aciclico diretto (DAG).
Per eseguire l'aggiornamento, è necessario modificare il codice per definire e inviare le pipeline all'SDK v2. L'esecuzione all'interno del processo figlio non deve essere aggiornata all'SDK v2. È tuttavia consigliabile rimuovere qualsiasi codice specifico per Azure Machine Learning dagli script di training del modello. Questa separazione consente una transizione più semplice tra locale e cloud ed è considerata una procedura consigliata per MLOps maturi. In pratica, ciò significa rimuovere azureml.*
righe di codice. La registrazione del modello e il codice di rilevamento devono essere sostituiti con MLflow. Per altre informazioni, vedere come usare MLflow nella versione 2.
Questo articolo offre un confronto tra scenari in SDK v1 e SDK v2. Negli esempi seguenti verranno compilati tre passaggi (training, punteggio e valutazione) in un processo fittizio della pipeline. In questo modo viene illustrato come compilare processi della pipeline usando SDK v1 e SDK v2 e come usare i dati e trasferire i dati tra i passaggi.
Eseguire una pipeline
SDK v1
# import required libraries import os import azureml.core from azureml.core import ( Workspace, Dataset, Datastore, ComputeTarget, Experiment, ScriptRunConfig, ) from azureml.pipeline.steps import PythonScriptStep from azureml.pipeline.core import Pipeline # check core SDK version number print("Azure Machine Learning SDK Version: ", azureml.core.VERSION) # load workspace workspace = Workspace.from_config() print( "Workspace name: " + workspace.name, "Azure region: " + workspace.location, "Subscription id: " + workspace.subscription_id, "Resource group: " + workspace.resource_group, sep="\n", ) # create an ML experiment experiment = Experiment(workspace=workspace, name="train_score_eval_pipeline") # create a directory script_folder = "./src" # create compute from azureml.core.compute import ComputeTarget, AmlCompute from azureml.core.compute_target import ComputeTargetException # Choose a name for your CPU cluster amlcompute_cluster_name = "cpu-cluster" # Verify that cluster does not exist already try: aml_compute = ComputeTarget(workspace=workspace, name=amlcompute_cluster_name) print('Found existing cluster, use it.') except ComputeTargetException: compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS12_V2', max_nodes=4) aml_compute = ComputeTarget.create(ws, amlcompute_cluster_name, compute_config) aml_compute.wait_for_completion(show_output=True) # define data set data_urls = ["wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv"] input_ds = Dataset.File.from_files(data_urls) # define steps in pipeline from azureml.data import OutputFileDatasetConfig model_output = OutputFileDatasetConfig('model_output') train_step = PythonScriptStep( name="train step", script_name="train.py", arguments=['--training_data', input_ds.as_named_input('training_data').as_mount() ,'--max_epocs', 5, '--learning_rate', 0.1,'--model_output', model_output], source_directory=script_folder, compute_target=aml_compute, allow_reuse=True, ) score_output = OutputFileDatasetConfig('score_output') score_step = PythonScriptStep( name="score step", script_name="score.py", arguments=['--model_input',model_output.as_input('model_input'), '--test_data', input_ds.as_named_input('test_data').as_mount(), '--score_output', score_output], source_directory=script_folder, compute_target=aml_compute, allow_reuse=True, ) eval_output = OutputFileDatasetConfig('eval_output') eval_step = PythonScriptStep( name="eval step", script_name="eval.py", arguments=['--scoring_result',score_output.as_input('scoring_result'), '--eval_output', eval_output], source_directory=script_folder, compute_target=aml_compute, allow_reuse=True, ) # built pipeline from azureml.pipeline.core import Pipeline pipeline_steps = [train_step, score_step, eval_step] pipeline = Pipeline(workspace = workspace, steps=pipeline_steps) print("Pipeline is built.") pipeline_run = experiment.submit(pipeline, regenerate_outputs=False) print("Pipeline submitted for execution.")
SDK v2. Collegamento completo di esempio
# import required libraries from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential from azure.ai.ml import MLClient, Input from azure.ai.ml.dsl import pipeline try: credential = DefaultAzureCredential() # Check if given credential can get token successfully. credential.get_token("https://management.azure.com/.default") except Exception as ex: # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work credential = InteractiveBrowserCredential() # Get a handle to workspace ml_client = MLClient.from_config(credential=credential) # Retrieve an already attached Azure Machine Learning Compute. cluster_name = "cpu-cluster" print(ml_client.compute.get(cluster_name)) # Import components that are defined with Python function with open("src/components.py") as fin: print(fin.read()) # You need to install mldesigner package to use command_component decorator. # Option 1: install directly # !pip install mldesigner # Option 2: install as an extra dependency of azure-ai-ml # !pip install azure-ai-ml[designer] # import the components as functions from src.components import train_model, score_data, eval_model cluster_name = "cpu-cluster" # define a pipeline with component @pipeline(default_compute=cluster_name) def pipeline_with_python_function_components(input_data, test_data, learning_rate): """E2E dummy train-score-eval pipeline with components defined via Python function components""" # Call component obj as function: apply given inputs & parameters to create a node in pipeline train_with_sample_data = train_model( training_data=input_data, max_epochs=5, learning_rate=learning_rate ) score_with_sample_data = score_data( model_input=train_with_sample_data.outputs.model_output, test_data=test_data ) eval_with_sample_data = eval_model( scoring_result=score_with_sample_data.outputs.score_output ) # Return: pipeline outputs return { "eval_output": eval_with_sample_data.outputs.eval_output, "model_output": train_with_sample_data.outputs.model_output, } pipeline_job = pipeline_with_python_function_components( input_data=Input( path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file" ), test_data=Input( path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file" ), learning_rate=0.1, ) # submit job to workspace pipeline_job = ml_client.jobs.create_or_update( pipeline_job, experiment_name="train_score_eval_pipeline" )
Mapping delle funzionalità chiave in SDK v1 e SDK v2
Funzionalità nell'SDK v1 | Mapping approssimativo in SDK v2 |
---|---|
azureml.pipeline.core.Pipeline | azure.ai.ml.dsl.pipeline |
OutputDatasetConfig | Output |
set di dati as_mount | Input |
StepSequence | Dipendenza dei dati |
Mapping dei tipi di passaggio e processo/componente
passaggio in SDK v1 | tipo di processo nell'SDK v2 | tipo di componente nell'SDK v2 |
---|---|---|
adla_step |
None | None |
automl_step |
automl lavoro |
Componente automl |
azurebatch_step |
None | None |
command_step |
command lavoro |
command componente |
data_transfer_step |
None | None |
databricks_step |
None | None |
estimator_step |
command lavoro |
command componente |
hyper_drive_step |
sweep lavoro |
None |
kusto_step |
None | None |
module_step |
None | command componente |
mpi_step |
command lavoro |
command componente |
parallel_run_step |
Parallel lavoro |
Componente Parallel |
python_script_step |
command lavoro |
command componente |
r_script_step |
command lavoro |
command componente |
synapse_spark_step |
spark lavoro |
spark componente |
Pipeline pubblicate
Dopo che una pipeline è in esecuzione, è possibile pubblicare una pipeline in modo che venga eseguita con input diversi. Questa operazione è nota come Pipeline pubblicate. L'endpoint batch propone un modo ancora più efficace per gestire più asset in esecuzione in un'API durevole, motivo per cui la funzionalità Pipeline pubblicate è stata spostata nelle distribuzioni dei componenti della pipeline negli endpoint batch.
Gli endpoint batch separano l'interfaccia (endpoint) dall'implementazione effettiva (distribuzione) e consentono all'utente di decidere quale distribuzione serve l'implementazione predefinita dell'endpoint. Le distribuzioni dei componenti della pipeline negli endpoint batch consentono agli utenti di distribuire componenti della pipeline invece delle pipeline, che usano meglio gli asset riutilizzabili per tali organizzazioni cercando di semplificare la procedura MLOps.
La tabella seguente illustra un confronto tra ognuno dei concetti:
Concetto | SDK v1 | SDK v2 |
---|---|---|
Endpoint REST della pipeline per la chiamata | Endpoint della pipeline | Endpoint batch |
Versione specifica della pipeline nell'endpoint | Pipeline pubblicata | Distribuzione di componenti della pipeline |
Argomenti della pipeline sulla chiamata | Parametro della pipeline | Input del processo |
Processo generato da una pipeline pubblicata | Processo della pipeline | Processo batch |
Per indicazioni specifiche su come eseguire la migrazione agli endpoint batch, vedere Aggiornare gli endpoint della pipeline all'SDK v2 .
Documenti correlati
Per altre informazioni, vedere la documentazione qui: