共用方式為


將資料匯入 ML 管線步驟並在其中來回移動 (Python)

適用於:Python SDK azureml v1

本文提供程式碼,以用於 Azure Machine Learning 管線中的步驟之間匯入、轉換和移動資料。 如需 Azure Machine Learning 中的資料如何運作的概觀,請參閱在 Azure 儲存體服務中存取資料。 關於 Azure Machine Learning 管線的優點和結構,請參閱什麼是 Azure Machine Learning 管線?

本文章說明如何:

  • 針對預先存在的資料使用 Dataset 物件
  • 在步驟中存取資料
  • Dataset 資料分割成子集,例如定型和驗證子集
  • 建立 OutputFileDatasetConfig 物件將資料傳輸至下一個管線步驟
  • 使用 OutputFileDatasetConfig 物件作為管線步驟的輸入
  • 從您想要保存的 OutputFileDatasetConfig 建立新的 Dataset 物件

必要條件

您需要:

針對預先存在的資料使用 Dataset 物件

將資料內嵌至管線的最佳方式是使用 Dataset 物件。 Dataset 物件代表整個工作區可用的持續性資料。

建立和註冊 Dataset 物件有許多種方式。 表格式資料集適用於一個或多個檔案中可用的分隔資料。 檔案資料集適用於二進位資料 (例如影像),或您剖析的資料。 建立 Dataset 物件最簡單的程式設計方式,就是使用工作區儲存體或公用 URL 中現有的 blob:

datastore = Datastore.get(workspace, 'training_data')
iris_dataset = Dataset.Tabular.from_delimited_files(DataPath(datastore, 'iris.csv'))

datastore_path = [
    DataPath(datastore, 'animals/dog/1.jpg'),
    DataPath(datastore, 'animals/dog/2.jpg'),
    DataPath(datastore, 'animals/cat/*.jpg')
]
cats_dogs_dataset = Dataset.File.from_files(path=datastore_path)

關於其他選項,包括以不同選項和不同來源建立資料集、在 Azure Machine Learning UI 中註冊和檢閱資料集、了解資料大小與計算容量如何相互影響,以及資料集版本設定,請參閱建立 Azure Machine Learning 資料集

將資料集傳遞至指令碼

若要將資料集的路徑傳遞至您的指令碼,請使用 Dataset 物件的 as_named_input() 方法。 您可以將產生的 DatasetConsumptionConfig 物件當作引數傳遞至指令碼,或利用管線指令碼的 inputs 引數,以使用 Run.get_context().input_datasets[] 來擷取資料集。

建立具名輸入之後,您可以選擇其存取模式 (僅適用於 FileDataset): as_mount()as_download()。 如果指令碼處理資料集的所有檔案,且計算資源上的磁碟大小足夠容納資料集,則下載存取模式是較佳的選擇。 下載存取模式可避免執行時串流資料的額外負荷。 如果指令碼存取資料集的子集,或對計算而言太大,請使用掛接存取模式。 如需詳細資訊,請閱讀掛接與下載

若要將資料集傳遞至管線步驟:

  1. 使用 TabularDataset.as_named_input()FileDataset.as_named_input() (結尾沒有 ‘s') 來建立 DatasetConsumptionConfig 物件
  2. 僅限 FileDataset:。 使用 as_mount()as_download() 來設定存取模式。 TabularDataset 不會支援設定存取模式。
  3. 使用 argumentsinputs 引數將資料集傳遞至管線步驟

下列程式碼片段顯示使用 iris_dataset 在 PythonScriptStep 建構函式中結合這些步驟的常見模式 (TabularDataset):


train_step = PythonScriptStep(
    name="train_data",
    script_name="train.py",
    compute_target=cluster,
    inputs=[iris_dataset.as_named_input('iris')]
)

注意

您必須將所有這些引數 (也就是 "train_data""train.py"clusteriris_dataset) 的值換成您自己的資料。 上述程式碼片段只是顯示呼叫的形式,不是 Microsoft 範例的一部分。

您也可以使用 random_split()take_sample() 等方法來建立多個輸入,或減少傳遞至管線步驟的資料量:

seed = 42 # PRNG seed
smaller_dataset = iris_dataset.take_sample(0.1, seed=seed) # 10%
train, test = smaller_dataset.random_split(percentage=0.8, seed=seed)

train_step = PythonScriptStep(
    name="train_data",
    script_name="train.py",
    compute_target=cluster,
    inputs=[train.as_named_input('train'), test.as_named_input('test')]
)

在指令碼內存取資料集

管線步驟指令碼的具名輸入可作為 Run 物件內的字典。 使用 Run.get_context() 擷取作用中 Run 物件,然後使用 input_datasets 擷取具名輸入的字典。 如果您使用 arguments 引數而非 inputs 引數來傳遞物件 DatasetConsumptionConfig,請使用 ArgParser 程式碼來存取資料。 下列程式碼片段示範這兩種技巧:

管線定義指令碼

# Code for demonstration only: It would be very confusing to split datasets between `arguments` and `inputs`
train_step = PythonScriptStep(
    name="train_data",
    script_name="train.py",
    compute_target=cluster,
    # datasets passed as arguments
    arguments=['--training-folder', train.as_named_input('train').as_download()],
    # datasets passed as inputs
    inputs=[test.as_named_input('test').as_download()]
)

從 PythonScriptStep 參考的 train.py 指令碼

# In pipeline script
parser = argparse.ArgumentParser()
# Retreive the dataset passed as an argument
parser.add_argument('--training-folder', type=str, dest='train_folder', help='training data folder mounting point')
args = parser.parse_args()
training_data_folder = args.train_folder
# Retrieve the dataset passed as an input
testing_data_folder = Run.get_context().input_datasets['test']

傳遞的值是資料集檔案的路徑。

也可以直接存取已註冊的 Dataset。 由於註冊的資料集在整個工作區持續且共用,因此可以直接擷取:

run = Run.get_context()
ws = run.experiment.workspace
ds = Dataset.get_by_name(workspace=ws, name='mnist_opendataset')

注意

上述程式碼片段顯示呼叫的形式,不是 Microsoft 範例的一部分。 您必須將各種引數換成您自己專案的值。

針對中繼資料使用 OutputFileDatasetConfig

雖然 Dataset 物件只代表持續性資料,但 OutputFileDatasetConfig 物件可用於管線步驟的暫存資料輸出持續性輸出資料。 OutputFileDatasetConfig 支援將資料寫入 blob 儲存體、檔案共用、adlsgen1 或 adlsgen2。 支援掛接模式和上傳模式。 在掛接模式中,寫入掛接目錄的檔案在檔案關閉時永久儲存。 在上傳模式中,寫入輸出目錄的檔案在作業結束時上傳。 如果作業失敗或取消,則不會上傳輸出目錄。

OutputFileDatasetConfig 物件的預設行為是寫入工作區的預設資料存放區。 使用 arguments 參數將 OutputFileDatasetConfig 物件傳遞至 PythonScriptStep

from azureml.data import OutputFileDatasetConfig
dataprep_output = OutputFileDatasetConfig()
input_dataset = Dataset.get_by_name(workspace, 'raw_data')

dataprep_step = PythonScriptStep(
    name="prep_data",
    script_name="dataprep.py",
    compute_target=cluster,
    arguments=[input_dataset.as_named_input('raw_data').as_mount(), dataprep_output]
    )

注意

同時寫入 OutputFileDatasetConfig 會失敗。 請勿嘗試同時使用單一 OutputFileDatasetConfig。 請勿在多處理情況下共用單一 OutputFileDatasetConfig,例如使用分散式定型時。

使用 OutputFileDatasetConfig 作為定型步驟的輸出

在管線的 PythonScriptStep 中,您可以使用程式的引數來擷取可用的輸出路徑。 如果這是第一個步驟,而且這個步驟將初始化輸出資料,則必須在指定的路徑中建立目錄。 接著,您可以撰寫想要納入 OutputFileDatasetConfig 中的任何檔案。

parser = argparse.ArgumentParser()
parser.add_argument('--output_path', dest='output_path', required=True)
args = parser.parse_args()

# Make directory for file
os.makedirs(os.path.dirname(args.output_path), exist_ok=True)
with open(args.output_path, 'w') as f:
    f.write("Step 1's output")

讀取 OutputFileDatasetConfig 作為非初始步驟的輸入

在初始管線步驟將部分資料寫入 OutputFileDatasetConfig 路徑,且變成該初始步驟的輸出之後,就可以作為稍後步驟的輸入。

在下列程式碼中:

  • step1_output_data 表示 PythonScriptStep 的輸出 step1 在上傳存取模式中寫入 ADLS Gen 2 資料存放區 my_adlsgen2。 深入了解如何設定角色權限,以便將資料寫回 ADLS Gen 2 資料存放區。

  • step1 完成且輸出寫入 step1_output_data 指出的目的地之後,step2 就可以使用 step1_output_data 作為輸入。

# get adls gen 2 datastore already registered with the workspace
datastore = workspace.datastores['my_adlsgen2']
step1_output_data = OutputFileDatasetConfig(name="processed_data", destination=(datastore, "mypath/{run-id}/{output-name}")).as_upload()

step1 = PythonScriptStep(
    name="generate_data",
    script_name="step1.py",
    runconfig = aml_run_config,
    arguments = ["--output_path", step1_output_data]
)

step2 = PythonScriptStep(
    name="read_pipeline_data",
    script_name="step2.py",
    compute_target=compute,
    runconfig = aml_run_config,
    arguments = ["--pd", step1_output_data.as_input()]

)

pipeline = Pipeline(workspace=ws, steps=[step1, step2])

提示

讀取 Python 指令碼 step2.py 中的資料,與先前於在指令碼內存取資料集中所記載的相同; 使用 ArgumentParser 在指令碼中新增 --pd 的引數來存取資料。

註冊 OutputFileDatasetConfig 物件供重複使用

如果要在實驗期間過後仍可使用 OutputFileDatasetConfig,請註冊到工作區,以跨實驗共用及重複使用。

step1_output_ds = step1_output_data.register_on_complete(
    name='processed_data', 
    description = 'files from step1'
)

刪除不再需要的 OutputFileDatasetConfig 內容

Azure 不會自動刪除以 OutputFileDatasetConfig 撰寫的中繼資料。 為了避免大量不必要的資料產生儲存體費用,您應該執行下列其中一項:

警告

只有在資料上次變更日期的 30 天後才刪除中繼資料。 刪除先前的資料可能會導致管線執行失敗,因為管線會假設中繼資料在 30 天內會存在以供重複使用。

  • 在管線工作執行結束時,以程式設計方式刪除不再需要的中繼資料。
  • 針對中繼資料使用 blob 儲存體搭配短期儲存原則 (請參閱自動化 Azure Blob 儲存體存取層以最佳化成本)。 此原則只能設定為工作區的非預設資料存放區。 使用 OutputFileDatasetConfig 將中繼資料匯出至非預設值的另一個資料存放區。
    # Get adls gen 2 datastore already registered with the workspace
    datastore = workspace.datastores['my_adlsgen2']
    step1_output_data = OutputFileDatasetConfig(name="processed_data", destination=(datastore, "mypath/{run-id}/{output-name}")).as_upload()
    
  • 定期檢閱並刪除不再需要的資料。

如需詳細資訊,請參閱規劃和管理 Azure Machine Learning 的成本

下一步