Aggiornare il passaggio di esecuzione parallela all'SDK v2
In SDK v2, "Passaggio di esecuzione parallela" viene consolidato nel concetto di processo come parallel job
. Il processo parallelo mantiene la stessa destinazione per consentire agli utenti di accelerare l'esecuzione del processo distribuendo attività ripetute in cluster di calcolo multi-nodi potenti. Oltre al passaggio di esecuzione parallela, il processo parallelo v2 offre vantaggi aggiuntivi:
- Interfaccia flessibile, che consente all'utente di definire più input e output personalizzati per il processo parallelo. È possibile connetterli con altri passaggi per utilizzare o gestire il contenuto nello script di immissione
- Semplificare lo schema di input, che sostituisce
Dataset
come input usando il concetto v2data asset
. È possibile usare facilmente i file locali o l'URI della directory BLOB come input per il processo parallelo. - Le funzionalità più potenti sono sviluppate solo in processi paralleli v2. Ad esempio, riprendere il processo parallelo non riuscito/annullato per continuare a elaborare i mini batch non elaborati o non elaborati riutilizzando il risultato riuscito per risparmiare lavoro duplicato.
Per aggiornare il passaggio di esecuzione parallela dell'SDK v1 corrente alla versione 2, è necessario
- Usare
parallel_run_function
per creare un processo parallelo sostituendoParallelRunConfig
eParallelRunStep
in v1. - Aggiornare la pipeline v1 alla versione 2. Richiamare quindi il processo parallelo v2 come passaggio nella pipeline v2. Per informazioni dettagliate sull'aggiornamento della pipeline, vedere come aggiornare la pipeline dalla versione 1 alla versione 2 .
Nota: lo script di immissione utente è compatibile tra il passaggio di esecuzione parallela v1 e il processo parallelo v2. È quindi possibile continuare a usare lo stesso entry_script.py quando si aggiorna il processo di esecuzione parallela.
Questo articolo offre un confronto tra scenari in SDK v1 e SDK v2. Negli esempi seguenti verrà creato un processo parallelo per stimare i dati di input in un processo di pipeline. Si vedrà come creare un processo parallelo e come usarlo in un processo della pipeline per SDK v1 e SDK v2.
Prerequisiti
- Preparare l'ambiente SDK v2: installare Azure Machine Learning SDK v2 per Python
- Informazioni sulla base della pipeline SDK v2: Come creare una pipeline di Azure Machine Learning con Python SDK v2
Creare un passaggio parallelo
SDK v1
# Create the configuration to wrap the inference script from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig parallel_run_config = ParallelRunConfig( source_directory=scripts_folder, entry_script=script_file, mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"), error_threshold=10, output_action="append_row", append_row_file_name="mnist_outputs.txt", environment=batch_env, compute_target=compute_target, process_count_per_node=PipelineParameter(name="process_count_param", default_value=2), node_count=2 ) # Create the Parallel run step parallelrun_step = ParallelRunStep( name="predict-digits-mnist", parallel_run_config=parallel_run_config, inputs=[ input_mnist_ds_consumption ], output=output_dir, allow_reuse=False )
SDK v2
# parallel job to process file data file_batch_inference = parallel_run_function( name="file_batch_score", display_name="Batch Score with File Dataset", description="parallel component for batch score", inputs=dict( job_data_path=Input( type=AssetTypes.MLTABLE, description="The data to be split and scored in parallel", ) ), outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)), input_data="${{inputs.job_data_path}}", instance_count=2, mini_batch_size="1", mini_batch_error_threshold=1, max_concurrency_per_instance=1, task=RunFunction( code="./src", entry_script="file_batch_inference.py", program_arguments="--job_output_path ${{outputs.job_output_path}}", environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1", ), )
Usare il passaggio parallelo nella pipeline
SDK v1
# Run pipeline with parallel run step from azureml.core import Experiment pipeline = Pipeline(workspace=ws, steps=[parallelrun_step]) experiment = Experiment(ws, 'digit_identification') pipeline_run = experiment.submit(pipeline) pipeline_run.wait_for_completion(show_output=True)
SDK v2
@pipeline() def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model): prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path) # output of file & tabular data should be type MLTable prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE batch_inference_with_file_data = file_batch_inference( job_data_path=prepare_file_tabular_data.outputs.file_output_data ) # use eval_mount mode to handle file data batch_inference_with_file_data.inputs.job_data_path.mode = ( InputOutputModes.EVAL_MOUNT ) batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE batch_inference_with_tabular_data = tabular_batch_inference( job_data_path=prepare_file_tabular_data.outputs.tabular_output_data, score_model=pipeline_score_model, ) # use direct mode to handle tabular data batch_inference_with_tabular_data.inputs.job_data_path.mode = ( InputOutputModes.DIRECT ) return { "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path, "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path, } pipeline_job_data_path = Input( path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT ) pipeline_score_model = Input( path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD ) # create a pipeline pipeline_job = parallel_in_pipeline( pipeline_job_data_path=pipeline_job_data_path, pipeline_score_model=pipeline_score_model, ) pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE # set pipeline level compute pipeline_job.settings.default_compute = "cpu-cluster" # run pipeline job pipeline_job = ml_client.jobs.create_or_update( pipeline_job, experiment_name="pipeline_samples" )
Mapping delle funzionalità chiave in SDK v1 e SDK v2
Funzionalità nell'SDK v1 | Mapping approssimativo in SDK v2 |
---|---|
azureml.pipeline.steps.parallelrunconfig azureml.pipeline.steps.parallelrunstep |
azure.ai.ml.parallel |
OutputDatasetConfig | Output |
set di dati as_mount | Input |
Mapping di impostazioni e configurazioni dei processi paralleli
SDK v1 | SDK v2 | Descrizione |
---|---|---|
ParallelRunConfig.environment | parallel_run_function.task.environment | Ambiente in cui verrà eseguito il processo di training. |
ParallelRunConfig.entry_script | parallel_run_function.task.entry_script | Script utente che verrà eseguito in parallelo su più nodi. |
ParallelRunConfig.error_threshold | parallel_run_function.error_threshold | Numero di mini batch non riusciti che potrebbero essere ignorati in questo processo parallelo. Se il numero di mini batch non riusciti è superiore a questa soglia, il processo in parallelo verrà contrassegnato come non riuscito. "-1" è il numero predefinito e indica che tutti i mini batch non riusciti durante il processo in parallelo verranno ignorati. |
ParallelRunConfig.output_action | parallel_run_function.append_row_to | Aggrega tutti i valori restituiti da ogni esecuzione di mini batch e li restituisce come output in questo file. Può fare riferimento a uno degli output del processo in parallelo usando l'espressione ${{outputs.<output_name>}} |
ParallelRunConfig.node_count | parallel_run_function.instance_count | Numero facoltativo di istanze o nodi usati dalla destinazione di calcolo. Assume il valore predefinito 1. |
ParallelRunConfig.process_count_per_node | parallel_run_function.max_concurrency_per_instance | Parallelismo massimo di ogni istanza di calcolo. |
ParallelRunConfig.mini_batch_size | parallel_run_function.mini_batch_size | Definire le dimensioni di ogni mini batch per suddividere l'input. Se il input_data è una cartella o un set di file, questo numero definisce il numero di file per ogni mini batch. Ad esempio, 10, 100. Se il input_data è dati tabulari da mltable , questo numero definisce le dimensioni fisiche prossime per ogni mini batch. L'unità predefinita è Byte e il valore può accettare una stringa come 100 kb, 100 mb. |
ParallelRunConfig.source_directory | parallel_run_function.task.code | Percorso locale o remoto che punta al codice sorgente. |
ParallelRunConfig.description | parallel_run_function.description | Descrizione descrittiva del parallelismo |
ParallelRunConfig.logging_level | parallel_run_function.logging_level | Stringa del nome del livello di registrazione, definito in 'logging'. I valori possibili sono 'WARNING', 'INFO' e 'DEBUG'. (facoltativo, il valore predefinito è 'INFO'). Questo valore può essere impostato tramite PipelineParameter. |
ParallelRunConfig.run_invocation_timeout | parallel_run_function.retry_settings.timeout | Timeout in secondi per l'esecuzione di una funzione run() personalizzata. Se il tempo di esecuzione è superiore a questa soglia, il mini batch verrà interrotto e contrassegnato come mini batch non riuscito per attivare nuovi tentativi. |
ParallelRunConfig.run_max_try | parallel_run_function.retry_settings.max_tentativi | Numero di tentativi quando mini-batch non è riuscito o timeout. Se tutti i tentativi non sono riusciti, il mini batch verrà contrassegnato come non riuscito per il calcolo mini_batch_error_threshold. |
ParallelRunConfig.append_row_file_name | parallel_run_function.append_row_to | Combinato con append_row_to l'impostazione. |
ParallelRunConfig.allowed_failed_count | parallel_run_function.mini_batch_error_threshold | Numero di mini batch non riusciti che potrebbero essere ignorati in questo processo parallelo. Se il numero di mini batch non riusciti è superiore a questa soglia, il processo in parallelo verrà contrassegnato come non riuscito. "-1" è il numero predefinito e indica che tutti i mini batch non riusciti durante il processo in parallelo verranno ignorati. |
ParallelRunConfig.allowed_failed_percent | parallel_run_function.task.program_arguments impostato --allowed_failed_percent |
Analogamente a "allowed_failed_count", ma questa impostazione usa la percentuale di mini batch non riusciti anziché il numero di errori del mini batch. L'intervallo di questa impostazione è [0, 100]. "100" è il numero predefinito, ovvero ignorare tutti i mini batch non riusciti durante il processo parallelo. |
ParallelRunConfig.partition_keys | In fase di sviluppo. | |
ParallelRunConfig.environment_variables | parallel_run_function.environment_variables | Dizionario di nomi e valori delle variabili di ambiente. Queste variabili di ambiente vengono impostate nel processo in cui viene eseguito lo script utente. |
ParallelRunStep.name | parallel_run_function.name | Nome del processo o del componente parallelo creato. |
ParallelRunStep.inputs | parallel_run_function.inputs | Una deviazione di input usata da questo parallelo. |
-- | parallel_run_function.input_data | Dichiarare i dati da suddividere ed elaborare con parallelismo |
ParallelRunStep.output | parallel_run_function.outputs | Output di questo processo parallelo. |
ParallelRunStep.side_inputs | parallel_run_function.inputs | Definito insieme a inputs . |
ParallelRunStep.arguments | parallel_run_function.task.program_arguments | Argomenti dell'attività parallela. |
ParallelRunStep.allow_reuse | parallel_run_function.is_deterministic | Specificare se il parallel restituirà lo stesso output dato lo stesso input. |
Passaggi successivi
Per altre informazioni, vedere la documentazione qui: