並列実行ステップを SDK v2 にアップグレードする
SDK v2 では、"並列実行ステップ" は parallel job
としてジョブの概念に統合されます。 ユーザーは、並列ジョブを使うことで、同じターゲットを維持し、繰り返されるタスクを強力なマルチノード コンピューティング クラスターに分散させて、ジョブの実行を高速化できます。 並列実行ステップの上に、v2 並列ジョブには追加のベネフィットがあります:
- 柔軟なインターフェイス。これにより、ユーザーは並列ジョブの複数のカスタム入力と出力を定義できます。 他のステップに接続して、エントリ スクリプトでコンテンツを使用または管理できます
- v2
data asset
の概念を使用して入力としてDataset
を置き換える入力スキーマを簡略化します。 ローカル ファイルまたは BLOB ディレクトリ URI を並列ジョブへの入力として簡単に使用できます。 - より強力なフィーチャーは、v2 並列ジョブでのみ開発されています。 たとえば、失敗または取り消された並列ジョブを再開し、成功した結果を再利用して重複する作業量を節約することで、失敗または未処理のミニバッチのプロセス処理を続行します。
現在の sdk v1 並列実行ステップを v2 にアップグレードするには、次の手順を実行する必要があります
- v1 の
ParallelRunConfig
とParallelRunStep
に置き換えて並列ジョブを作成するためparallel_run_function
を使用します。 - v1 パイプラインを v2 にアップグレードする 次に、v2 パイプラインのステップとして v2 並列ジョブを呼び出します。 パイプラインのアップグレードの詳細については、パイプラインを v1 から v2 に移行する方法に関する記事を参照してください。
注: ユーザー 入力スクリプト は、v1 並列実行ステップと v2 並列ジョブの間で互換性があります。 そのため、並列実行ジョブをアップグレードするときに、同じ entry_script.py を使用し続けることができます。
この記事では、SDK v1 と SDK v2 のシナリオの比較を示します。 次の例では、パイプライン ジョブ内の入力データを予測する並列ジョブをビルドします。 並列ジョブをビルドする方法と、SDK v1 と SDK v2 の両方のパイプライン ジョブでそれを使用する方法について説明します。
前提条件
- SDK v2 環境を準備する: Azure Machine Learning SDK v2 for Python をインストールする
- SDK v2 パイプラインの基礎を理解する: Python SDK v2 を使用して Azure Machine Learning パイプラインを作成する方法
並列ステップの作成
SDK v1
# Create the configuration to wrap the inference script from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig parallel_run_config = ParallelRunConfig( source_directory=scripts_folder, entry_script=script_file, mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"), error_threshold=10, output_action="append_row", append_row_file_name="mnist_outputs.txt", environment=batch_env, compute_target=compute_target, process_count_per_node=PipelineParameter(name="process_count_param", default_value=2), node_count=2 ) # Create the Parallel run step parallelrun_step = ParallelRunStep( name="predict-digits-mnist", parallel_run_config=parallel_run_config, inputs=[ input_mnist_ds_consumption ], output=output_dir, allow_reuse=False )
SDK v2
# parallel job to process file data file_batch_inference = parallel_run_function( name="file_batch_score", display_name="Batch Score with File Dataset", description="parallel component for batch score", inputs=dict( job_data_path=Input( type=AssetTypes.MLTABLE, description="The data to be split and scored in parallel", ) ), outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)), input_data="${{inputs.job_data_path}}", instance_count=2, mini_batch_size="1", mini_batch_error_threshold=1, max_concurrency_per_instance=1, task=RunFunction( code="./src", entry_script="file_batch_inference.py", program_arguments="--job_output_path ${{outputs.job_output_path}}", environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1", ), )
パイプラインで並列ステップを使用する
SDK v1
# Run pipeline with parallel run step from azureml.core import Experiment pipeline = Pipeline(workspace=ws, steps=[parallelrun_step]) experiment = Experiment(ws, 'digit_identification') pipeline_run = experiment.submit(pipeline) pipeline_run.wait_for_completion(show_output=True)
SDK v2
@pipeline() def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model): prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path) # output of file & tabular data should be type MLTable prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE batch_inference_with_file_data = file_batch_inference( job_data_path=prepare_file_tabular_data.outputs.file_output_data ) # use eval_mount mode to handle file data batch_inference_with_file_data.inputs.job_data_path.mode = ( InputOutputModes.EVAL_MOUNT ) batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE batch_inference_with_tabular_data = tabular_batch_inference( job_data_path=prepare_file_tabular_data.outputs.tabular_output_data, score_model=pipeline_score_model, ) # use direct mode to handle tabular data batch_inference_with_tabular_data.inputs.job_data_path.mode = ( InputOutputModes.DIRECT ) return { "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path, "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path, } pipeline_job_data_path = Input( path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT ) pipeline_score_model = Input( path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD ) # create a pipeline pipeline_job = parallel_in_pipeline( pipeline_job_data_path=pipeline_job_data_path, pipeline_score_model=pipeline_score_model, ) pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE # set pipeline level compute pipeline_job.settings.default_compute = "cpu-cluster" # run pipeline job pipeline_job = ml_client.jobs.create_or_update( pipeline_job, experiment_name="pipeline_samples" )
SDK v1 と SDK v2 の主要機能のマッピング
SDK v1 の機能 | SDK v2 での大まかなマッピング |
---|---|
azureml.pipeline.steps.parallelrunconfig azureml.pipeline.steps.parallelrunstep |
azure.ai.ml.parallel |
OutputDatasetConfig | 出力 |
dataset as_mount | 入力 |
並列ジョブの構成と設定のマッピング
SDK v1 | SDK v2 | 説明 |
---|---|---|
ParallelRunConfig.environment | parallel_run_function.task.environment | トレーニング ジョブが実行される環境。 |
ParallelRunConfig.entry_script | parallel_run_function.task.entry_script | 複数のノードで並列に実行されるユーザー スクリプト。 |
ParallelRunConfig.error_threshold | parallel_run_function.error_threshold | この並列ジョブで無視できる失敗したミニバッチの数。 失敗したミニバッチの数がこのしきい値を超えた場合、並列ジョブは失敗としてマークされます。 既定値は "-1" で、並列ジョブの間に失敗したすべてのミニバッチを無視することを意味します。 |
ParallelRunConfig.output_action | parallel_run_function.append_row_to | ミニバッチの各実行から戻されたすべての値を集約して、このファイルに出力します。 ${{outputs.<output_name>}} というを使って、並列ジョブの出力の 1 つを参照できます |
ParallelRunConfig.node_count | parallel_run_function.instance_count | コンピューティング 先で使用されるインスタンスまたはノードの数 (省略可能)。 既定値は 1 です。 |
ParallelRunConfig.process_count_per_node | parallel_run_function.max_concurrency_per_instance | 各コンピューティング インスタンスが持つ最大並列処理。 |
ParallelRunConfig.mini_batch_size | parallel_run_function.mini_batch_size | 入力を分割する各ミニバッチのサイズを定義します。 input_data がフォルダーまたは一連のファイルの場合、この数値は各ミニバッチの ファイル数 を定義します。 たとえば、10、100 などです。 input_dataが mltable からの表形式のデータ の場合、この数値は各ミニバッチの近接物理サイズを定義します。 既定の単位は Byte で、値は 100 kb、100 mb などの文字列を受け取ります。 |
ParallelRunConfig.source_directory | parallel_run_function.task.code | ソース コードを指すローカルパスまたはリモート パス。 |
ParallelRunConfig.description | parallel_run_function.description | 並行プログラムのわかりやすい説明です |
ParallelRunConfig.logging_level | parallel_run_function.logging_level | ログ レベル名の文字列。'logging' で定義されます。 指定可能な値は、'WARNING'、'INFO'、'DEBUG' です。 (省略可能。既定値は 'INFO' です。)この値は PipelineParameter を使用して設定できます。 |
ParallelRunConfig.run_invocation_timeout | parallel_run_function.retry_settings.timeout | カスタム run() 関数の実行での秒単位のタイムアウト。 実行時間がこのしきい値を超えた場合、そのミニバッチは中止され、失敗とマークされて再試行がトリガーされます。 |
ParallelRunConfig.run_max_try | parallel_run_function.retry_settings.max_retries | ミニバッチが失敗またはタイムアウトしたときの再試行回数。 すべての再試行が失敗した場合、そのミニバッチは失敗とマークされて mini_batch_error_しきい値 の計算でカウントされます。 |
ParallelRunConfig.append_row_file_name | parallel_run_function.append_row_to | append_row_to 設定と 組み合わせる。 |
ParallelRunConfig.allowed_failed_count | parallel_run_function.mini_batch_error_threshold | この並列ジョブで無視できる失敗したミニバッチの数。 失敗したミニバッチの数がこのしきい値を超えた場合、並列ジョブは失敗としてマークされます。 既定値は "-1" で、並列ジョブの間に失敗したすべてのミニバッチを無視することを意味します。 |
ParallelRunConfig.allowed_failed_percent | parallel_run_function.task.program_arguments セット --allowed_failed_percent |
"allowed_failed_count"に似ているが、この設定はミニバッチの失敗アカウントの代わりに失敗したミニバッチのパーセントを使用します。 この設定の範囲は [0, 100] です。 既定値は "100" で、並列ジョブの間に失敗したすべてのミニバッチを無視することを意味します。 |
ParallelRunConfig.partition_keys | 開発中。 | |
ParallelRunConfig.environment_variables | parallel_run_function.environment_variables | 環境変数の名前と値のディクショナリ。 これらの環境変数は、ユーザー スクリプトが実行されるプロセスで設定されます。 |
ParallelRunStep.name | parallel_run_function.name | 作成された並列ジョブまたはコンポーネントの名前。 |
ParallelRunStep.inputs | parallel_run_function.inputs | この並列で使用される入力のディクテーション。 |
-- | parallel_run_function.input_data | 並列で分割および処理するデータを宣言する |
ParallelRunStep.output | parallel_run_function.outputs | この並列ジョブの出力。 |
ParallelRunStep.side_inputs | parallel_run_function.inputs | inputs と共に定義されます。 |
ParallelRunStep.arguments | parallel_run_function.task.program_arguments | 並列タスクの引数。 |
ParallelRunStep.allow_reuse | parallel_run_function.is_deterministic | 並列が同じ入力を指定して同じ出力を返すかどうかを指定します。 |
次のステップ
詳しくは、こちらのドキュメントをご覧ください。