パイプラインを SDK v2 にアップグレードする
SDK v2 では、"パイプライン" がジョブに統合されます。
ジョブには種類があります。 ほとんどのジョブは、command
(python main.py
など) を実行するコマンド ジョブです。 ジョブで実行されるものはどのプログラミング言語にも依存しないため、bash
スクリプトの実行、python
インタープリターの呼び出し、一連の curl
コマンドの実行などが行えます。
pipeline
は、もう 1 つの種類のジョブです。これは、入出力リレーションシップを持つ可能性のある子ジョブを定義し、有向非巡回グラフ (DAG) を形成します。
アップグレードするには、パイプラインを定義して SDK v2 に送信するためのコードを変更する必要があります。 子ジョブ "内で" 実行するものを SDK v2 にアップグレードする必要はありません。 ただし、Azure Machine Learning に固有のコードは、モデル トレーニング スクリプトから削除することをお勧めします。 この分離により、ローカルとクラウドの切り替えが容易になり、成熟した MLOps のベスト プラクティスと見なされます。 実際には、これは azureml.*
コード行の削除を意味します。 モデルのログと追跡コードは、MLflow に置き換える必要があります。 詳細については、v2 での MLflow の使用方法に関する記事をご覧ください。
この記事では、SDK v1 と SDK v2 のシナリオの比較を示します。 次の例では、ダミーのパイプライン ジョブ内に 3 つのステップ (トレーニング、スコア付け、評価) を作成します。 この例では、SDK v1 と SDK v2 を使用してパイプライン ジョブを構築する方法と、データを使用してステップ間でデータを転送する方法を示しています。
パイプラインの実行
SDK v1
# import required libraries import os import azureml.core from azureml.core import ( Workspace, Dataset, Datastore, ComputeTarget, Experiment, ScriptRunConfig, ) from azureml.pipeline.steps import PythonScriptStep from azureml.pipeline.core import Pipeline # check core SDK version number print("Azure Machine Learning SDK Version: ", azureml.core.VERSION) # load workspace workspace = Workspace.from_config() print( "Workspace name: " + workspace.name, "Azure region: " + workspace.location, "Subscription id: " + workspace.subscription_id, "Resource group: " + workspace.resource_group, sep="\n", ) # create an ML experiment experiment = Experiment(workspace=workspace, name="train_score_eval_pipeline") # create a directory script_folder = "./src" # create compute from azureml.core.compute import ComputeTarget, AmlCompute from azureml.core.compute_target import ComputeTargetException # Choose a name for your CPU cluster amlcompute_cluster_name = "cpu-cluster" # Verify that cluster does not exist already try: aml_compute = ComputeTarget(workspace=workspace, name=amlcompute_cluster_name) print('Found existing cluster, use it.') except ComputeTargetException: compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS12_V2', max_nodes=4) aml_compute = ComputeTarget.create(ws, amlcompute_cluster_name, compute_config) aml_compute.wait_for_completion(show_output=True) # define data set data_urls = ["wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv"] input_ds = Dataset.File.from_files(data_urls) # define steps in pipeline from azureml.data import OutputFileDatasetConfig model_output = OutputFileDatasetConfig('model_output') train_step = PythonScriptStep( name="train step", script_name="train.py", arguments=['--training_data', input_ds.as_named_input('training_data').as_mount() ,'--max_epocs', 5, '--learning_rate', 0.1,'--model_output', model_output], source_directory=script_folder, compute_target=aml_compute, allow_reuse=True, ) score_output = OutputFileDatasetConfig('score_output') score_step = PythonScriptStep( name="score step", script_name="score.py", arguments=['--model_input',model_output.as_input('model_input'), '--test_data', input_ds.as_named_input('test_data').as_mount(), '--score_output', score_output], source_directory=script_folder, compute_target=aml_compute, allow_reuse=True, ) eval_output = OutputFileDatasetConfig('eval_output') eval_step = PythonScriptStep( name="eval step", script_name="eval.py", arguments=['--scoring_result',score_output.as_input('scoring_result'), '--eval_output', eval_output], source_directory=script_folder, compute_target=aml_compute, allow_reuse=True, ) # built pipeline from azureml.pipeline.core import Pipeline pipeline_steps = [train_step, score_step, eval_step] pipeline = Pipeline(workspace = workspace, steps=pipeline_steps) print("Pipeline is built.") pipeline_run = experiment.submit(pipeline, regenerate_outputs=False) print("Pipeline submitted for execution.")
SDK v2。 完全なサンプルへのリンク
# import required libraries from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential from azure.ai.ml import MLClient, Input from azure.ai.ml.dsl import pipeline try: credential = DefaultAzureCredential() # Check if given credential can get token successfully. credential.get_token("https://management.azure.com/.default") except Exception as ex: # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work credential = InteractiveBrowserCredential() # Get a handle to workspace ml_client = MLClient.from_config(credential=credential) # Retrieve an already attached Azure Machine Learning Compute. cluster_name = "cpu-cluster" print(ml_client.compute.get(cluster_name)) # Import components that are defined with Python function with open("src/components.py") as fin: print(fin.read()) # You need to install mldesigner package to use command_component decorator. # Option 1: install directly # !pip install mldesigner # Option 2: install as an extra dependency of azure-ai-ml # !pip install azure-ai-ml[designer] # import the components as functions from src.components import train_model, score_data, eval_model cluster_name = "cpu-cluster" # define a pipeline with component @pipeline(default_compute=cluster_name) def pipeline_with_python_function_components(input_data, test_data, learning_rate): """E2E dummy train-score-eval pipeline with components defined via Python function components""" # Call component obj as function: apply given inputs & parameters to create a node in pipeline train_with_sample_data = train_model( training_data=input_data, max_epochs=5, learning_rate=learning_rate ) score_with_sample_data = score_data( model_input=train_with_sample_data.outputs.model_output, test_data=test_data ) eval_with_sample_data = eval_model( scoring_result=score_with_sample_data.outputs.score_output ) # Return: pipeline outputs return { "eval_output": eval_with_sample_data.outputs.eval_output, "model_output": train_with_sample_data.outputs.model_output, } pipeline_job = pipeline_with_python_function_components( input_data=Input( path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file" ), test_data=Input( path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file" ), learning_rate=0.1, ) # submit job to workspace pipeline_job = ml_client.jobs.create_or_update( pipeline_job, experiment_name="train_score_eval_pipeline" )
SDK v1 と SDK v2 の主要機能のマッピング
SDK v1 の機能 | SDK v2 での大まかなマッピング |
---|---|
azureml.pipeline.core.Pipeline | azure.ai.ml.dsl.pipeline |
OutputDatasetConfig | 出力 |
dataset as_mount | 入力 |
StepSequence | データの依存関係 |
ステップとジョブ/コンポーネントの種類のマッピング
SDK v1 のステップ | SDK v2 のジョブの種類 | SDK v2 のコンポーネントの種類 |
---|---|---|
adla_step |
なし | なし |
automl_step |
automl ジョブ |
automl コンポーネント |
azurebatch_step |
なし | なし |
command_step |
command 仕事 |
command コンポーネント |
data_transfer_step |
なし | なし |
databricks_step |
なし | なし |
estimator_step |
command 仕事 |
command コンポーネント |
hyper_drive_step |
sweep 仕事 |
なし |
kusto_step |
なし | なし |
module_step |
なし | command コンポーネント |
mpi_step |
command 仕事 |
command コンポーネント |
parallel_run_step |
Parallel ジョブ |
Parallel コンポーネント |
python_script_step |
command 仕事 |
command コンポーネント |
r_script_step |
command 仕事 |
command コンポーネント |
synapse_spark_step |
spark 仕事 |
spark コンポーネント |
公開済みパイプライン
パイプラインを起動して実行すると、パイプラインを発行することができるため、さまざまな入力で実行されます。 これは公開されたパイプラインと呼ばれていました。 バッチ エンドポイントでは、持続的な API で実行されている複数のアセットを処理するための同様のより強力な方法が提案されています。そのため、公開されたパイプラインの機能は、バッチ エンドポイントのパイプライン コンポーネント デプロイに移動されました。
バッチ エンドポイントは、インターフェイス (エンドポイント) を実際の実装 (デプロイ) から切り離し、エンドポイントの既定の実装を提供するデプロイをユーザーが決定できるようにします。 バッチ エンドポイントでパイプライン コンポーネントのデプロイを使用すると、ユーザーはパイプラインではなくパイプライン コンポーネントをデプロイできるようになります。これにより、MLOps プラクティスを効率化しようとしている組織向けに、再利用可能なアセットをより有効に使用できます。
次の表は、各概念の比較を示しています。
概念 | SDK v1 | SDK v2 |
---|---|---|
呼び出し用のパイプラインの REST エンドポイント | パイプライン エンドポイント | バッチ エンドポイント |
エンドポイント下のパイプラインの特定のバージョン | 公開されたパイプライン | パイプライン コンポーネント デプロイ |
呼び出し時のパイプラインの引数 | パイプライン パラメーター | ジョブの入力 |
公開されたパイプラインから生成されるジョブ | パイプライン ジョブ | バッチ ジョブ |
バッチ エンドポイントへの移行方法に関する具体的なガイダンスについては、「パイプライン エンドポイントを SDK v2 にアップグレードする」を参照してください。
関連ドキュメント
詳しくは、こちらのドキュメントをご覧ください。