チュートリアル 3: 現行の再実体化を有効にしてバッチ推論を実行する
このチュートリアル シリーズでは、プロトタイプ作成、トレーニング、運用という機械学習ライフサイクルのすべてのフェーズを、特徴量によってシームレスに統合する方法について説明します。
最初のチュートリアルでは、カスタム変換を使用して特徴セットの仕様を作成し、その特徴セットを使用してトレーニング データを生成し、実体化を有効にし、バックフィルを実行する方法を示しました。 2 番目のチュートリアルでは、実体化を有効にし、バックフィルを実行する方法を示しました。 また、モデルのパフォーマンスを向上させる方法として、特徴を試す方法についても説明しました。
このチュートリアルでは、次の方法について説明します。
transactions
特徴量セットの反復する具体化を有効にします。- 登録済みモデルでバッチ推論パイプラインを実行します。
前提条件
このチュートリアルに進む前に、シリーズの最初と 2 番目のチュートリアルを完了してください。
設定
Azure Machine Learning Spark ノートブックを構成します。
このチュートリアルを実行するために、新しいノートブックを作成し、順を追って手順を実行できます。 次のような名前の既存のノートブックを開いて実行することもできます。3.Enable recurrent materialization and run batch inference。 そのノートブックと、このシリーズのすべてのノートブックは、"featurestore_sample/notebooks" ディレクトリにあります。 "sdk_only" または "sdk_and_cli" を選択できます。 このチュートリアルは開いたままにしておき、ドキュメントのリンクや追加の説明を参照してください。
上部のナビゲーションの コンピューティング ドロップダウン リストで、Azure Machine Learning サーバーレス Spark の サーバーレス Spark コンピューティング を選択します。
セッションを構成するには、以下を行います。
- 上部のステータス バーで [セッションの構成] を選びます。
- Python パッケージ タブを選択します。
- [Conda ファイルのアップロード] を選びます
- ローカル コンピューターから
azureml-examples/sdk/python/featurestore-sample/project/env/online.yml
ファイルを選択します。 - オプションで、前提条件が頻繁に再実行されないように、セッション タイムアウト (アイドル時間) を増やします。
Spark セッションを開始します。
# run this cell to start the spark session (any code block will start the session ). This can take around 10 mins. print("start spark session")
サンプルのルート ディレクトリを設定します。
import os # please update the dir to ./Users/<your_user_alias> (or any custom directory you uploaded the samples to). # You can find the name from the directory structure in the left nav root_dir = "./Users/<your_user_alias>/featurestore_sample" if os.path.isdir(root_dir): print("The folder exists.") else: print("The folder does not exist. Please create or fix the path")
CLI を設定します。
該当なし。
プロジェクト ワークスペースの CRUD (作成、読み取り、更新、および削除) クライアントを初期化します。
チュートリアル ノートブックは、この現在のワークスペースから実行されます。
### Initialize the MLClient of this project workspace import os from azure.ai.ml import MLClient from azure.ai.ml.identity import AzureMLOnBehalfOfCredential project_ws_sub_id = os.environ["AZUREML_ARM_SUBSCRIPTION"] project_ws_rg = os.environ["AZUREML_ARM_RESOURCEGROUP"] project_ws_name = os.environ["AZUREML_ARM_WORKSPACE_NAME"] # connect to the project workspace ws_client = MLClient( AzureMLOnBehalfOfCredential(), project_ws_sub_id, project_ws_rg, project_ws_name )
特徴量ストア変数を初期化します。
1 つめのチュートリアルで作成した内容を反映するように、
featurestore_name
の値を必ず更新してください。from azure.ai.ml import MLClient from azure.ai.ml.identity import AzureMLOnBehalfOfCredential # feature store featurestore_name = ( "<FEATURESTORE_NAME>" # use the same name from part #1 of the tutorial ) featurestore_subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"] featurestore_resource_group_name = os.environ["AZUREML_ARM_RESOURCEGROUP"] # feature store ml client fs_client = MLClient( AzureMLOnBehalfOfCredential(), featurestore_subscription_id, featurestore_resource_group_name, featurestore_name, )
Feature Store SDK クライアントを初期化します。
# feature store client from azureml.featurestore import FeatureStoreClient from azure.ai.ml.identity import AzureMLOnBehalfOfCredential featurestore = FeatureStoreClient( credential=AzureMLOnBehalfOfCredential(), subscription_id=featurestore_subscription_id, resource_group_name=featurestore_resource_group_name, name=featurestore_name, )
トランザクション特徴量セットで反復する具体化を有効にする
2 つめのチュートリアルでは、具体化を有効にし、transactions
特徴量セットに対してバックフィルを実行しました。 バックフィルは、特徴量値を計算して具体化ストアに配置するオンデマンドの 1 回限りの操作です。
運用環境でモデルの推論を処理するために、反復する具体化ジョブを設定して、具体化ストアを最新の状態に保つことができます。 これらのジョブは、ユーザー定義のスケジュールで実行されます。 繰り返しジョブのスケジュールは次のように機能します。
間隔と頻度の値を指定して、時間枠を定義します。 たとえば、次の値は 3 時間の時間枠を定義します。
interval
=3
frequency
=Hour
最初の時間枠は、
RecurrenceTrigger
に定義されているstart_time
値で始まり、以降同様です。最初の繰り返しジョブは、更新時刻の後、次の時間枠の開始時に送信されます。
以降の反復するジョブは、最初のジョブの後、時間枠ごとに送信されます。
これまでのチュートリアルで説明したように、データが具体化されると (バックフィルまたは反復する具体化)、特徴量取得は、具体化されたデータを既定で使用します。
from datetime import datetime
from azure.ai.ml.entities import RecurrenceTrigger
transactions_fset_config = fs_client.feature_sets.get(name="transactions", version="1")
# create a schedule that runs the materialization job every 3 hours
transactions_fset_config.materialization_settings.schedule = RecurrenceTrigger(
interval=3, frequency="Hour", start_time=datetime(2023, 4, 15, 0, 4, 10, 0)
)
fs_poller = fs_client.feature_sets.begin_create_or_update(transactions_fset_config)
print(fs_poller.result())
(省略可能) 特徴量セット資産の YAML ファイルを保存する
更新された設定を使用して YAML ファイルを保存します。
## uncomment and run
# transactions_fset_config.dump(root_dir + "/featurestore/featuresets/transactions/featureset_asset_offline_enabled_with_schedule.yaml")
バッチ推論パイプラインを実行する
バッチ推論には、次の手順があります。
トレーニング パイプラインで使用したのと同じ組み込みの特徴量取得コンポーネント (3 つめのチュートリアルで説明済み) を使用します。 パイプライン トレーニングでは、コンポーネント入力として特徴量取得仕様を指定しました。 バッチ推論では、登録済みモデルを入力として渡します。 コンポーネントは、モデル アーティファクトで特徴量取得仕様を検索します。
さらに、トレーニングでは、観測データにターゲット変数が含まれました。 一方、バッチ推論の観測データにターゲット変数は含まれません。 特徴量取得手順では、観測データを特徴量と結合し、バッチ推論用のデータを出力します。
パイプラインは、前の手順のバッチ推論入力データを使用して、モデルで推論を実行し、予測値を出力として追加します。
Note
この例では、バッチ推論にジョブを使用しています。 Azure Machine Learning でバッチ エンドポイントを使用することもできます。
from azure.ai.ml import load_job # will be used later # set the batch inference pipeline path batch_inference_pipeline_path = ( root_dir + "/project/fraud_model/pipelines/batch_inference_pipeline.yaml" ) batch_inference_pipeline_definition = load_job(source=batch_inference_pipeline_path) # run the training pipeline batch_inference_pipeline_job = ws_client.jobs.create_or_update( batch_inference_pipeline_definition ) # stream the run logs ws_client.jobs.stream(batch_inference_pipeline_job.name)
バッチ推論用の出力データを検査する
パイプライン ビューで次のようにします。
outputs
カードでinference_step
を選択します。Data
フィールド値をコピーします。 これは、azureml_995abbc2-3171-461e-8214-c3c5d17ede83_output_data_data_with_prediction:1
のように表示されます。Data
フィールド値を、名前とバージョン値を分けて、次のセルに貼り付けます。 前にコロン (:
) が付いた最後の文字がバージョンです。バッチ推論パイプラインによって生成された
predict_is_fraud
列を書き留めます。inference_step
のoutputs
にname
またはversion
の値を指定しなかったため、バッチ推論パイプライン ("/project/fraud_mode/pipelines/batch_inference_pipeline.yaml") 出力に、GUID を名前の値、バージョンを1
にして、追跡されないデータ資産をシステムが作成しました。 このセルに、資産からデータ パスを派生させて表示します。inf_data_output = ws_client.data.get( name="azureml_1c106662-aa5e-4354-b5f9-57c1b0fdb3a7_output_data_data_with_prediction", version="1", ) inf_output_df = spark.read.parquet(inf_data_output.path + "data/*.parquet") display(inf_output_df.head(5))
クリーンアップ
このシリーズの 5 番目のチュートリアルでは、リソースを削除する方法について説明します。
次のステップ
- 特徴量ストアの概念およびマネージド Feature Store の最上位エンティティの詳細。
- マネージド Feature Store の ID とアクセス制御に関する記事をご覧ください。
- マネージド Feature Store のトラブルシューティング ガイドを参照してください。
- YAML リファレンスを参照してください。