Udostępnij za pośrednictwem


Interakcyjne rozmieszczanie danych za pomocą platformy Apache Spark w usłudze Azure Machine Learning

Uzdatnianie danych staje się jednym z najważniejszych aspektów projektów uczenia maszynowego. Integracja integracji usługi Azure Machine Learning z usługą Azure Synapse Analytics zapewnia dostęp do puli platformy Apache Spark — wspieranej przez usługę Azure Synapse — na potrzeby interakcyjnego uzdatniania danych korzystającego z notesów usługi Azure Machine Learning.

Z tego artykułu dowiesz się, jak obsługiwać uzdatnianie danych przy użyciu

  • Bezserwerowe obliczenia platformy Spark
  • Dołączona pula platformy Synapse Spark

Wymagania wstępne

Przed rozpoczęciem zadań uzdatniania danych dowiedz się więcej o procesie przechowywania wpisów tajnych

  • Klucz dostępu do konta usługi Azure Blob Storage
  • Token sygnatury dostępu współdzielonego (SAS)
  • Informacje o jednostce usługi Azure Data Lake Storage (ADLS) Gen 2

w usłudze Azure Key Vault. Musisz również wiedzieć, jak obsługiwać przypisania ról na kontach usługi Azure Storage. W poniższych sekcjach w tym dokumencie opisano te pojęcia. Następnie poznamy szczegóły interakcyjnego uzdatniania danych przy użyciu pul platformy Spark w notesach usługi Azure Machine Learning.

Napiwek

Aby dowiedzieć się więcej o konfiguracji przypisywania ról konta usługi Azure Storage lub jeśli uzyskujesz dostęp do danych na kontach magazynu przy użyciu przekazywania tożsamości użytkownika, odwiedź stronę Dodawanie przypisań ról na kontach usługi Azure Storage, aby uzyskać więcej informacji.

Interakcyjne rozmieszczanie danych za pomocą platformy Apache Spark

W przypadku interakcyjnego uzdatniania danych za pomocą platformy Apache Spark w notesach usługi Azure Machine Learning usługa Azure Machine Learning oferuje bezserwerowe obliczenia spark i dołączoną pulę platformy Synapse Spark. Przetwarzanie bezserwerowe platformy Spark nie wymaga tworzenia zasobów w obszarze roboczym usługi Azure Synapse. Zamiast tego w pełni zarządzane bezserwerowe zasoby obliczeniowe platformy Spark stają się dostępne bezpośrednio w notesach usługi Azure Machine Learning. Korzystanie z bezserwerowych obliczeń platformy Spark jest najprostszym sposobem uzyskiwania dostępu do klastra Spark w usłudze Azure Machine Learning.

Bezserwerowe zasoby obliczeniowe platformy Spark w notesach usługi Azure Machine Learning

Bezserwerowe zasoby obliczeniowe platformy Spark są domyślnie dostępne w notesach usługi Azure Machine Learning. Aby uzyskać dostęp do niego w notesie, wybierz pozycję Bezserwerowe obliczenia Spark w obszarze Azure Machine Learning Serverless Spark z menu wyboru Obliczenia .

Interfejs użytkownika notesów udostępnia również opcje konfiguracji sesji platformy Spark dla bezserwerowych obliczeń platformy Spark. Aby skonfigurować sesję platformy Spark:

  1. Wybierz pozycję Konfiguruj sesję w górnej części ekranu.
  2. Wybierz pozycję Wersja platformy Apache Spark z menu rozwijanego.

    Ważne

    Środowisko uruchomieniowe usługi Azure Synapse dla platformy Apache Spark: anonsy

    • Środowisko uruchomieniowe usługi Azure Synapse dla platformy Apache Spark 3.2:
      • Data ogłoszenia EOLA: 8 lipca 2023 r.
      • Data zakończenia wsparcia: 8 lipca 2024 r. Po tej dacie środowisko uruchomieniowe zostanie wyłączone.
    • Apache Spark 3.3:
      • Data ogłoszenia EOLA: 12 lipca 2024 r.
      • Data zakończenia wsparcia technicznego: 31 marca 2025 r. Po tej dacie środowisko uruchomieniowe zostanie wyłączone.
    • Aby zapewnić ciągłą obsługę i optymalną wydajność, zalecamy migrację do platformy Apache Spark 3.4
  3. Wybierz pozycję Typ wystąpienia z menu rozwijanego. Te typy są obecnie obsługiwane:
    • Standard_E4s_v3
    • Standard_E8s_v3
    • Standard_E16s_v3
    • Standard_E32s_v3
    • Standard_E64s_v3
  4. Wprowadź wartość limitu czasu sesji platformy Spark w minutach.
  5. Wybierz, czy chcesz dynamicznie przydzielać funkcje wykonawcze
  6. Wybierz liczbę funkcji wykonawczych dla sesji platformy Spark.
  7. Wybierz pozycję Rozmiar funkcji wykonawczej z menu rozwijanego.
  8. Wybierz pozycję Rozmiar sterownika z menu rozwijanego.
  9. Aby skonfigurować sesję platformy Spark przy użyciu pliku Conda, zaznacz pole wyboru Przekaż plik conda. Następnie wybierz pozycję Przeglądaj i wybierz plik Conda z odpowiednią konfiguracją sesji platformy Spark.
  10. Dodaj właściwości ustawień konfiguracji, wartości wejściowe w polach tekstowych Właściwość i Wartość , a następnie wybierz pozycję Dodaj.
  11. Wybierz Zastosuj.
  12. W oknie podręcznym Konfiguruj nową sesję? wybierz pozycję Zatrzymaj sesję.

Zmiany konfiguracji sesji są utrwalane i stają się dostępne dla innej sesji notesu, która została uruchomiona przy użyciu bezserwerowych obliczeń platformy Spark.

Napiwek

Jeśli używasz pakietów Conda na poziomie sesji, możesz poprawić czas zimnego rozpoczęcia sesji platformy Spark, jeśli ustawisz zmienną spark.hadoop.aml.enable_cache konfiguracji na true. Zimny początek sesji z pakietami Conda na poziomie sesji zwykle trwa od 10 do 15 minut, gdy sesja rozpoczyna się po raz pierwszy. Jednak kolejne zimne rozpoczęcie sesji ze zmienną konfiguracji ustawioną na wartość true zwykle trwa od trzech do pięciu minut.

Importowanie i rozmieszczanie danych z usługi Azure Data Lake Storage (ADLS) Gen 2

Dostęp do danych przechowywanych na kontach magazynu usługi Azure Data Lake Storage (ADLS) Gen 2 można uzyskiwać i rozmieszczać przy użyciu abfss:// identyfikatorów URI danych. W tym celu należy postępować zgodnie z jednym z dwóch mechanizmów dostępu do danych:

  • Przekazywanie tożsamości użytkownika
  • Dostęp do danych opartych na jednostce usługi

Napiwek

Przetwarzanie danych za pomocą bezserwerowego przetwarzania platformy Spark i przekazywanie tożsamości użytkownika w celu uzyskania dostępu do danych na koncie magazynu usługi Azure Data Lake Storage (ADLS) Gen 2 wymaga najmniejszej liczby kroków konfiguracji.

Aby rozpocząć interakcyjne uzdatnianie danych za pomocą przekazywania tożsamości użytkownika:

  • Sprawdź, czy tożsamość użytkownika ma przypisanie roli Współautor i Współautor danych obiektu blob usługi Storage na koncie magazynu usługi Azure Data Lake Storage (ADLS) Gen 2.

  • Aby użyć bezserwerowych obliczeń platformy Spark, wybierz pozycję Bezserwerowe obliczenia platformy Spark w obszarze Bezserwerowa platforma Spark w obszarze Azure Machine Learning Serverless Spark z menu wyboru Obliczenia .

  • Aby użyć dołączonej puli platformy Synapse Spark, wybierz dołączoną pulę platformy Synapse Spark w obszarze Pule platformy Synapse Spark z menu wyboru Obliczenia .

  • Ten przykładowy kod uzdatniania danych Titanic pokazuje użycie identyfikatora URI danych w formacie abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA> i pyspark.pandas pyspark.ml.feature.Imputer.

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

    Uwaga

    Ten przykładowy kod w języku Python używa metody pyspark.pandas. Obsługuje to tylko środowisko uruchomieniowe platformy Spark w wersji 3.2 lub nowszej.

Aby wrangle danych przez dostęp za pośrednictwem jednostki usługi:

  1. Sprawdź, czy jednostka usługi ma przypisanie roli Współautor i Współautor danych obiektu blob usługi Storage na koncie magazynu usługi Azure Data Lake Storage (ADLS) Gen 2.

  2. Utwórz wpisy tajne usługi Azure Key Vault dla identyfikatora dzierżawy jednostki usługi, identyfikatora klienta i wartości wpisów tajnych klienta.

  3. W menu wyboru Obliczenia wybierz pozycję Przetwarzanie bezserwerowe Spark w obszarze Azure Machine Learning Serverless Spark. Możesz również wybrać dołączoną pulę platformy Synapse Spark w obszarze Pule platformy Synapse Spark z menu wyboru Obliczenia .

  4. Ustaw identyfikator dzierżawy jednostki usługi, identyfikator klienta i wartości wpisów tajnych klienta w konfiguracji i wykonaj poniższy przykładowy kod.

    • Wywołanie get_secret() w kodzie zależy od nazwy usługi Azure Key Vault oraz nazw wpisów tajnych usługi Azure Key Vault utworzonych dla identyfikatora dzierżawy jednostki usługi, identyfikatora klienta i klucza tajnego klienta. Ustaw odpowiednią nazwę/wartości właściwości w konfiguracji:

      • Właściwość Identyfikator klienta: fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Właściwość wpisu tajnego klienta: fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Właściwość identyfikatora dzierżawy: fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Wartość identyfikatora dzierżawy: https://login.microsoftonline.com/<TENANT_ID>/oauth2/token
      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      
      # Set up service principal tenant ID, client ID and secret from Azure Key Vault
      client_id = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_ID_SECRET_NAME>")
      tenant_id = token_library.getSecret("<KEY_VAULT_NAME>", "<TENANT_ID_SECRET_NAME>")
      client_secret = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_SECRET_NAME>")
      
      # Set up service principal which has access of the data
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.auth.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "OAuth"
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth.provider.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          client_id,
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          client_secret,
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token",
      )
      
  5. Za pomocą danych Titanic zaimportuj i wrangle dane przy użyciu identyfikatora URI danych w abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA> formacie, jak pokazano w przykładzie kodu.

Importowanie i rozmieszczanie danych z usługi Azure Blob Storage

Dostęp do danych usługi Azure Blob Storage można uzyskać przy użyciu klucza dostępu konta magazynu lub tokenu sygnatury dostępu współdzielonego (SAS). Te poświadczenia należy przechowywać w usłudze Azure Key Vault jako wpis tajny i ustawiać je jako właściwości w konfiguracji sesji.

Aby rozpocząć interakcyjne uzdatnianie danych:

  1. W lewym panelu usługi Azure Machine Learning Studio wybierz pozycję Notesy.

  2. W menu wyboru Obliczenia wybierz pozycję Przetwarzanie bezserwerowe Spark w obszarze Azure Machine Learning Serverless Spark. Możesz również wybrać dołączoną pulę platformy Synapse Spark w obszarze Pule platformy Synapse Spark z menu wyboru Obliczenia .

  3. Aby skonfigurować klucz dostępu do konta magazynu lub token sygnatury dostępu współdzielonego (SAS) na potrzeby dostępu do danych w notesach usługi Azure Machine Learning:

    • Dla klucza dostępu ustaw fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net właściwość , jak pokazano w tym fragmencie kodu:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      access_key = token_library.getSecret("<KEY_VAULT_NAME>", "<ACCESS_KEY_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net", access_key
      )
      
    • Dla tokenu fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net SAS ustaw właściwość, jak pokazano w tym fragmencie kodu:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      sas_token = token_library.getSecret("<KEY_VAULT_NAME>", "<SAS_TOKEN_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net",
          sas_token,
      )
      

      Uwaga

      Wywołania get_secret() we wcześniejszych fragmentach kodu wymagają nazwy usługi Azure Key Vault oraz nazw wpisów tajnych utworzonych dla klucza dostępu konta usługi Azure Blob Storage lub tokenu SAS.

  4. Wykonaj kod uzdatniania danych w tym samym notesie. Sformatuj identyfikator URI danych jako wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/<PATH_TO_DATA>, podobnie jak pokazano w tym fragmencie kodu:

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

    Uwaga

    Ten przykładowy kod w języku Python używa metody pyspark.pandas. Obsługuje to tylko środowisko uruchomieniowe platformy Spark w wersji 3.2 lub nowszej.

Importowanie i rozmieszczanie danych z magazynu danych usługi Azure Machine Learning

Aby uzyskać dostęp do danych z magazynu danych usługi Azure Machine Learning, zdefiniuj ścieżkę do danych w magazynie danych przy użyciu formatu azureml://datastores/<DATASTORE_NAME>/paths/<PATH_TO_DATA>URI . Aby interaktywnie rozmieścić dane z magazynu danych usługi Azure Machine Learning w sesji notesów:

  1. Wybierz pozycję Bezserwerowe zasoby obliczeniowe platformy Spark w obszarze Bezserwerowa platforma Spark w usłudze Azure Machine Learning z menu wyboru Obliczenia lub wybierz dołączoną pulę platformy Synapse Spark w obszarze Pule platformy Synapse Spark z menu wyboru Obliczenia .

  2. W tym przykładzie kodu pokazano, jak odczytywać i wrangle danych Titanic z magazynu danych usługi Azure Machine Learning przy użyciu azureml:// identyfikatora URI magazynu danych, pyspark.pandasi pyspark.ml.feature.Imputer.

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "azureml://datastores/workspaceblobstore/paths/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "azureml://datastores/workspaceblobstore/paths/data/wrangled",
        index_col="PassengerId",
    )
    

    Uwaga

    Ten przykładowy kod w języku Python używa metody pyspark.pandas. Obsługuje to tylko środowisko uruchomieniowe platformy Spark w wersji 3.2 lub nowszej.

Magazyny danych usługi Azure Machine Learning mogą uzyskiwać dostęp do danych przy użyciu poświadczeń konta usługi Azure Storage

  • klucz dostępu
  • Token SAS
  • jednostka usługi

lub używają dostępu do danych bez poświadczeń. W zależności od typu magazynu danych i bazowego typu konta usługi Azure Storage wybierz odpowiedni mechanizm uwierzytelniania, aby zapewnić dostęp do danych. Ta tabela zawiera podsumowanie mechanizmów uwierzytelniania w celu uzyskania dostępu do danych w magazynach danych usługi Azure Machine Learning:

Storage account type Dostęp do danych bez poświadczeń Mechanizm dostępu do danych Przypisania ról
Azure Blob Nie. Klucz dostępu lub token SAS Brak wymaganych przypisań ról
Azure Blob Tak Przekazywanie tożsamości użytkownika* Tożsamość użytkownika powinna mieć odpowiednie przypisania ról na koncie usługi Azure Blob Storage
Azure Data Lake Storage (ADLS) Gen 2 Nie. Jednostka usługi Jednostka usługi powinna mieć odpowiednie przypisania ról na koncie magazynu usługi Azure Data Lake Storage (ADLS) Gen 2
Azure Data Lake Storage (ADLS) Gen 2 Tak Przekazywanie tożsamości użytkownika Tożsamość użytkownika powinna mieć odpowiednie przypisania ról na koncie magazynu usługi Azure Data Lake Storage (ADLS) Gen 2

* Przekazywanie tożsamości użytkownika działa w przypadku magazynów danych bez poświadczeń wskazujących konta usługi Azure Blob Storage tylko wtedy, gdy nie włączono usuwania nietrwałego.

Uzyskiwanie dostępu do danych w domyślnym udziale plików

Domyślny udział plików jest instalowany zarówno do bezserwerowych obliczeń platformy Spark, jak i dołączonych pul platformy Synapse Spark.

Zrzut ekranu przedstawiający użycie udziału plików.

W usłudze Azure Machine Learning Studio pliki w domyślnym udziale plików są wyświetlane w drzewie katalogów na karcie Pliki . Kod notesu może uzyskiwać bezpośredni dostęp do plików przechowywanych w tym udziale plików wraz ze ścieżką file:// bezwzględną pliku bez większej liczby konfiguracji. Ten fragment kodu pokazuje, jak uzyskać dostęp do pliku przechowywanego w domyślnym udziale plików:

import os
import pyspark.pandas as pd
from pyspark.ml.feature import Imputer

abspath = os.path.abspath(".")
file = "file://" + abspath + "/Users/<USER>/data/titanic.csv"
print(file)
df = pd.read_csv(file, index_col="PassengerId")
imputer = Imputer(
    inputCols=["Age"],
    outputCol="Age").setStrategy("mean") # Replace missing values in Age column with the mean value
df.fillna(value={"Cabin" : "None"}, inplace=True) # Fill Cabin column with value "None" if missing
df.dropna(inplace=True) # Drop the rows which still have any missing value
output_path = "file://" + abspath + "/Users/<USER>/data/wrangled"
df.to_csv(output_path, index_col="PassengerId")

Uwaga

Ten przykładowy kod w języku Python używa metody pyspark.pandas. Obsługuje to tylko środowisko uruchomieniowe platformy Spark w wersji 3.2 lub nowszej.

Następne kroki