將平行執行步驟升級至 SDK v2
在 SDK v2 中,「平行執行步驟」會合併為作業概念。parallel job
平行作業會保留相同的目標,讓用戶能夠在功能強大的多節點計算叢集上散發重複的工作,以加速其作業執行。 在平行執行步驟之上,v2 平行作業可提供額外的優點:
- 彈性介面,可讓使用者為平行作業定義多個自定義輸入和輸出。 您可以使用其他步驟來連接它們,以取用或管理其專案腳本中的內容
- 簡化輸入架構,使用 v2
data asset
概念取代Dataset
為輸入。 您可以輕鬆地使用本機檔案或 Blob 目錄 URI 作為平行作業的輸入。 - 只有 v2 平行作業才開發更強大的功能。 例如,繼續失敗/取消的平行作業,藉由重複使用成功的結果來節省重複的工作,以繼續處理失敗或未處理的迷你批次。
若要將目前的 sdk v1 平行執行步驟升級至 v2,您必須
- 使用
parallel_run_function
來取代ParallelRunConfig
v1 中的和ParallelRunStep
來建立平行作業。 - 將您的 v1 管線升級至 v2。 然後叫用 v2 平行作業作為 v2 管線中的步驟。 如需管線升級的詳細數據,請參閱 如何將管線從 v1 升級至 v2 。
注意:使用者 輸入文本 在 v1 平行執行步驟與 v2 平行作業之間相容。 因此,當您升級平行執行作業時,您可以繼續使用相同的entry_script.py。
本文提供 SDK v1 和 SDK v2 中案例的比較。 在下列範例中,我們將建置平行作業來預測管線作業中的輸入數據。 您將瞭解如何建置平行作業,以及如何在 SDK v1 和 SDK v2 的管線作業中使用。
必要條件
- 準備 SDK v2 環境:安裝適用於 Python 的 Azure 機器學習 SDK v2
- 瞭解 SDK v2 管線的基礎:如何使用 Python SDK v2 建立 Azure 機器學習 管線
建立平行步驟
SDK v1
# Create the configuration to wrap the inference script from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig parallel_run_config = ParallelRunConfig( source_directory=scripts_folder, entry_script=script_file, mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"), error_threshold=10, output_action="append_row", append_row_file_name="mnist_outputs.txt", environment=batch_env, compute_target=compute_target, process_count_per_node=PipelineParameter(name="process_count_param", default_value=2), node_count=2 ) # Create the Parallel run step parallelrun_step = ParallelRunStep( name="predict-digits-mnist", parallel_run_config=parallel_run_config, inputs=[ input_mnist_ds_consumption ], output=output_dir, allow_reuse=False )
SDK v2
# parallel job to process file data file_batch_inference = parallel_run_function( name="file_batch_score", display_name="Batch Score with File Dataset", description="parallel component for batch score", inputs=dict( job_data_path=Input( type=AssetTypes.MLTABLE, description="The data to be split and scored in parallel", ) ), outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)), input_data="${{inputs.job_data_path}}", instance_count=2, mini_batch_size="1", mini_batch_error_threshold=1, max_concurrency_per_instance=1, task=RunFunction( code="./src", entry_script="file_batch_inference.py", program_arguments="--job_output_path ${{outputs.job_output_path}}", environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1", ), )
在管線中使用平行步驟
SDK v1
# Run pipeline with parallel run step from azureml.core import Experiment pipeline = Pipeline(workspace=ws, steps=[parallelrun_step]) experiment = Experiment(ws, 'digit_identification') pipeline_run = experiment.submit(pipeline) pipeline_run.wait_for_completion(show_output=True)
SDK v2
@pipeline() def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model): prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path) # output of file & tabular data should be type MLTable prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE batch_inference_with_file_data = file_batch_inference( job_data_path=prepare_file_tabular_data.outputs.file_output_data ) # use eval_mount mode to handle file data batch_inference_with_file_data.inputs.job_data_path.mode = ( InputOutputModes.EVAL_MOUNT ) batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE batch_inference_with_tabular_data = tabular_batch_inference( job_data_path=prepare_file_tabular_data.outputs.tabular_output_data, score_model=pipeline_score_model, ) # use direct mode to handle tabular data batch_inference_with_tabular_data.inputs.job_data_path.mode = ( InputOutputModes.DIRECT ) return { "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path, "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path, } pipeline_job_data_path = Input( path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT ) pipeline_score_model = Input( path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD ) # create a pipeline pipeline_job = parallel_in_pipeline( pipeline_job_data_path=pipeline_job_data_path, pipeline_score_model=pipeline_score_model, ) pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE # set pipeline level compute pipeline_job.settings.default_compute = "cpu-cluster" # run pipeline job pipeline_job = ml_client.jobs.create_or_update( pipeline_job, experiment_name="pipeline_samples" )
SDK v1 和 SDK v2 中的主要功能對應
SDK v1 中的功能 | SDK v2 中的粗略對應 |
---|---|
azureml.pipeline.steps.parallelrunconfig azureml.pipeline.steps.parallelrunstep |
azure.ai.ml.parallel |
OutputDatasetConfig | 輸出 |
數據集as_mount | 輸入 |
平行作業組態和設定對應
SDK v1 | SDK v2 | 描述 |
---|---|---|
ParallelRunConfig.environment | parallel_run_function.task.environment | 定型作業將在 中執行的環境。 |
ParallelRunConfig.entry_script | parallel_run_function.task.entry_script | 將在多個節點上平行執行的用戶腳本。 |
ParallelRunConfig.error_threshold | parallel_run_function.error_threshold | 此平行作業中可以忽略的失敗迷你批次數目。 如果失敗的迷你批次計數高於此臨界值,平行作業將會標示為失敗。 「-1」是預設數字,表示在平行作業期間忽略所有失敗的迷你批次。 |
ParallelRunConfig.output_action | parallel_run_function.append_row_to | 匯總每個迷你批次回合的所有傳回,並將其輸出到此檔案中。 可以使用運算式 ${{outputs.<output_name>}} 參考其中一個平行作業輸出。 |
ParallelRunConfig.node_count | parallel_run_function.instance_count | 計算目標所使用的實例或節點選擇性數目。 預設值為 1。 |
ParallelRunConfig.process_count_per_node | parallel_run_function.max_concurrency_per_instance | 每個計算實例的最大平行處理原則。 |
ParallelRunConfig.mini_batch_size | parallel_run_function.mini_batch_size | 定義每個迷你批次的大小,以分割輸入。 如果input_data是資料夾或檔案集,此數位會定義每個迷你批次的檔案計數。 例如,10、100。 如果input_data是 來自 mltable 的表格式數據,這個數位會定義每個迷你批次的親和實體大小。 默認單位為 Byte,值可以接受字串,例如 100 kb、100 mb。 |
ParallelRunConfig.source_directory | parallel_run_function.task.code | 指向原始碼的本機或遠端路徑。 |
ParallelRunConfig.description | parallel_run_function.description | 平行的易記描述 |
ParallelRunConfig.logging_level | parallel_run_function.logging_level | 記錄層級名稱的字串,定義於 『logging』 中。 可能的值為 『WARNING』、『INFO』 和 『DEBUG』。 (選擇性,預設值為 'INFO'。)這個值可以透過 PipelineParameter 來設定。 |
ParallelRunConfig.run_invocation_timeout | parallel_run_function.retry_settings.timeout | 執行自定義 run() 函式的逾時以秒為單位。 如果執行時間高於此閾值,便會中止迷你批次,並標示為失敗的迷你批次以觸發重試。 |
ParallelRunConfig.run_max_try | parallel_run_function.retry_settings.max_retries | 迷你批次失敗或逾時時重試次數。 如果所有重試失敗,則迷你批次會標示為無法計算mini_batch_error_threshold計算。 |
ParallelRunConfig.append_row_file_name | parallel_run_function.append_row_to | 結合 append_row_to 設定。 |
ParallelRunConfig.allowed_failed_count | parallel_run_function.mini_batch_error_threshold | 此平行作業中可以忽略的失敗迷你批次數目。 如果失敗的迷你批次計數高於此臨界值,平行作業將會標示為失敗。 「-1」是預設數字,表示在平行作業期間忽略所有失敗的迷你批次。 |
ParallelRunConfig.allowed_failed_percent | parallel_run_function.task.program_arguments set --allowed_failed_percent |
類似於「allowed_failed_count」,但此設定會使用失敗的迷你批次百分比,而不是迷你批次失敗計數。 此設定的範圍是 [0, 100]。 “100” 是預設數位,表示忽略平行作業期間所有失敗的迷你批次。 |
ParallelRunConfig.partition_keys | 開發中。 | |
ParallelRunConfig.environment_variables | parallel_run_function.environment_variables | 環境變數名稱和值的字典。 這些環境變數會在執行使用者文本的程序上設定。 |
ParallelRunStep.name | parallel_run_function.name | 已建立之平行作業或元件的名稱。 |
ParallelRunStep.inputs | parallel_run_function.inputs | 這個平行使用的輸入聽寫。 |
-- | parallel_run_function.input_data | 宣告要以平行方式分割和處理的數據 |
ParallelRunStep.output | parallel_run_function.outputs | 這個平行作業的輸出。 |
ParallelRunStep.side_inputs | parallel_run_function.inputs | 與 inputs 一起定義。 |
ParallelRunStep.arguments | parallel_run_function.task.program_arguments | 平行工作的自變數。 |
ParallelRunStep.allow_reuse | parallel_run_function.is_deterministic | 指定平行處理是否會傳回相同輸入的相同輸出。 |
下一步
如需詳細資訊,請參閱這裡的檔: