Condividi tramite


Connettore azure Esplora dati per Apache Spark

Apache Spark è un motore di analisi unificato per l'elaborazione di dati su larga scala. Esplora dati di Azure è un servizio di analisi dei dati veloce e completamente gestito per l'analisi in tempo reale di volumi elevati di dati.

Il connettore Kusto per Spark è un progetto open source che può essere eseguito in qualsiasi cluster Spark. Implementa l'origine dati e il sink di dati per lo spostamento di dati tra cluster di Esplora dati di Azure e Spark. Usando Azure Esplora dati e Apache Spark, è possibile creare applicazioni veloci e scalabili destinate a scenari basati sui dati. Ad esempio, Machine Learning (ML), Extract-Transform-Load (ETL) e Log Analytics. Con il connettore, Azure Esplora dati diventa un archivio dati valido per le operazioni di origine e sink Spark standard, ad esempio scrittura, lettura e scritturaStream.

È possibile scrivere in Azure Esplora dati tramite l'inserimento in coda o l'inserimento in streaming. La lettura da Azure Esplora dati supporta l'eliminazione delle colonne e il pushdown del predicato, che filtra i dati in Azure Esplora dati, riducendo il volume dei dati trasferiti.

Nota

Per informazioni sull'uso del connettore Synapse Spark per Azure Esplora dati, vedere Connettersi ad Azure Esplora dati con Apache Spark per Azure Synapse Analytics.

Questo argomento descrive come installare e configurare il connettore Spark di Azure Esplora dati e spostare i dati tra cluster Di Azure Esplora dati e Apache Spark.

Nota

Anche se alcuni degli esempi seguenti fanno riferimento a un cluster Spark di Azure Databricks, azure Esplora dati connettore Spark non accetta dipendenze dirette da Databricks o da altre distribuzioni Spark.

Prerequisiti

Suggerimento

Sono supportate anche le versioni di Spark 2.3.x, ma potrebbero essere necessarie alcune modifiche nelle dipendenze pom.xml.

Come creare il connettore Spark

A partire dalla versione 2.3.0 sono stati introdotti nuovi ID artefatti che vanno a sostituire spark-kusto-connector: kusto-spark_3.0_2.12 destinati a Spark 3.x e Scala 2.12.

Nota

Le versioni precedenti alla 2.5.1 non funzionano più per l'inserimento in una tabella esistente. Si prega di fare l’aggiornamento a una versione successiva. Questo passaggio è facoltativo. Se si usano librerie predefinite, ad esempio Maven, vedere Configurazione del cluster Spark.

Prerequisiti di creazione

  1. Fare riferimento a questa origine per la compilazione del connettore Spark.

  2. Per le applicazioni Scala e Java che usano le definizioni del progetto Maven, collegare l'applicazione con l'elemento seguente. Trovare l'elemento più recente in Maven Central.

    For more information, see [https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12](https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12).
    
    
  3. Se non si usano librerie predefinite, è necessario installare le librerie elencate nelle dipendenze, incluse le librerie Kusto Java SDK seguenti. Per trovare la versione corretta da installare, consultare il pom della versione pertinente:

    1. Per compilare file con estensione jar ed eseguire tutti i test:

      mvn clean package -DskipTests
      
    2. Per compilare file con estensione jar, eseguire tutti i test e installare i file con estensione jar nel repository Maven locale:

      mvn clean install -DskipTests
      

Per maggiori informazioni, vedere Uso del connettore.

Configurazione del cluster Spark

Nota

È consigliabile usare la versione più recente del connettore Spark Kusto quando si eseguono i passaggi seguenti.

  1. Configurare le seguenti impostazioni del cluster Spark, in base al cluster Azure Databricks Spark 3.0.1 e Scala 2.12:

    Impostazioni del cluster Databricks.

  2. Installare la libreria spark-kusto-connector più recente da Maven:

    Importare le librerie.Selezionare Spark-Kusto-Connector.

  3. Verificare che tutte le librerie necessarie siano installate:

    Verificare le librerie installate.

  4. Per l'installazione con un file JAR, verificare che siano state installate altre dipendenze:

    Aggiungere le dipendenze.

Autenticazione

Il connettore Kusto Spark consente di eseguire l'autenticazione con l’ID Microsoft Entra usando uno dei metodi seguenti:

Autenticazione dell’applicazione Microsoft Entra

L'autenticazione dell'applicazione Microsoft Entra è il metodo di autenticazione più semplice e comune ed è consigliato per il connettore Kusto Spark.

  1. Accedere alla sottoscrizione di Azure usando l'interfaccia della riga di comando di Azure. Eseguire quindi l'autenticazione nel browser.

    az login
    
  2. Scegliere la sottoscrizione per ospitare l'entità di sicurezza. Questo passaggio è necessario quando si hanno più sottoscrizioni.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Creare l'entità servizio. In questo esempio l'entità servizio viene chiamata my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Dai dati JSON restituiti copiare appId, password e tenant per un uso futuro.

    {
      "appId": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444"
    }
    

L'applicazione Microsoft Entra e l'entità servizio sono state create.

Il connettore Spark usa le seguenti proprietà dell'app Entra per l'autenticazione:

Proprietà Stringa opzione Descrizione
KUSTO_AAD_APP_ID kustoAadAppId (Client) identifier dell’Applicazione Microsoft Entra.
KUSTO_AAD_AUTHORITY_ID kustoAadAuthorityID Autorità di autenticazione di Microsoft Entra. ID (tenant) di Microsoft Entra directory. Facoltativo - il valore predefinito è microsoft.com. Per maggiori informazioni, vedere Microsoft Entra authority.
KUSTO_AAD_APP_SECRET kustoAadAppSecret Application key per il client Microsoft Entra.
KUSTO_ACCESS_TOKEN kustoAccessToken Se si ha già un accessToken creato con accesso a Kusto, che può essere usato anche dal connettore per l'autenticazione.

Nota

Le versioni precedenti dell'API (prima della 2.0.0) hanno la seguente denominazione: "kustoAADClientID", "kustoClientAADClientPassword", "kustoAADAuthorityID"

Privilegi Kusto

Concedere i seguenti privilegi sul lato kusto in base all'operazione Spark che si vuole eseguire.

Operazione Spark Privilegi
Lettura - Modalità singola Lettore
Lettura - forzare la modalità distribuita Lettore
Write - modalità in coda con l'opzione createTableIfNotExist table create Amministratore
Scrittura - modalità in coda con l'opzione di creazione della tabella FailIfNotExist Ingestor
Scrittura - TransactionalMode Amministratore

Per maggiori informazioni sui ruoli principali, vedere Che cos'è il controllo degli accessi in base al ruolo?. Per la gestione dei ruoli di sicurezza, vedere Gestione dei ruoli di sicurezza.

Sink Spark: scrittura in Kusto

  1. Impostazione dei parametri sink:

    val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId")
    val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey")
    
    val appId = KustoSparkTestAppId
    val appKey = KustoSparkTestAppKey
    val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com
    val cluster = "Sparktest.eastus2"
    val database = "TestDb"
    val table = "StringAndIntTable"
    
  2. Scrivere un dataframe Spark nel cluster Kusto come batch:

    import com.microsoft.kusto.spark.datasink.KustoSinkOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    df.write
      .format("com.microsoft.kusto.spark.datasource")
      .option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
      .option(KustoSinkOptions.KUSTO_DATABASE, database)
      .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark")
      .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
      .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
      .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
      .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
      .mode(SaveMode.Append)
      .save()  
    

    In alternativa, usare la sintassi semplificata:

    import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed
    df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
    
  3. Scrivere dati di streaming:

    import org.apache.spark.sql.streaming.Trigger
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger
    
    // Set up a checkpoint and disable codeGen. 
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint")
    
    // Write to a Kusto table from a streaming source
    val kustoQ = df
      .writeStream
      .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
      .options(conf) 
      .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database
      .start()
    

Origine Spark: lettura da Kusto

  1. Durante la lettura di piccole quantità di dati, definire la query sui dati:

    import com.microsoft.kusto.spark.datasource.KustoSourceOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    import com.microsoft.azure.kusto.data.ClientRequestProperties
    
    val query = s"$table | where (ColB % 1000 == 0) | distinct ColA"
    val conf: Map[String, String] = Map(
          KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
          KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
        )
    
    val df = spark.read.format("com.microsoft.kusto.spark.datasource").
      options(conf).
      option(KustoSourceOptions.KUSTO_QUERY, query).
      option(KustoSourceOptions.KUSTO_DATABASE, database).
      option(KustoSourceOptions.KUSTO_CLUSTER, cluster).
      load()
    
    // Simplified syntax flavor
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val cpr: Option[ClientRequestProperties] = None // Optional
    val df2 = spark.read.kusto(cluster, database, query, conf, cpr)
    display(df2)
    
  2. Facoltativo: se si fornisce l'archivio BLOB temporaneo (e non Kusto) i BLOB vengono creati sotto la responsabilità del chiamante. Ciò comprende il provisioning dell'archiviazione, la rotazione delle chiavi di accesso e l'eliminazione di elementi temporanei. Il modulo KustoBlobStorageUtils contiene funzioni helper per l'eliminazione di BLOB in base alle coordinate di account e contenitori e alle credenziali dell'account oppure a un URL SAS con autorizzazioni di scrittura, lettura ed elenco. Quando il set di dati RDD corrispondente non è più necessario, ogni transazione archivia gli artefatti BLOB temporanei in una directory separata. Questa directory viene acquisita come parte dei log delle informazioni sulle transazioni di lettura segnalati nel nodo Driver Spark.

    // Use either container/account-key/account name, or container SaS
    val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer")
    val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey")
    val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName")
    // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
    

    Nell'esempio precedente, l'insieme di credenziali delle chiavi non è accessibile usando l'interfaccia del connettore; viene usato un metodo più semplice per l'uso dei segreti di Databricks.

  3. Leggere da Kusto.

    • Se si fornisce l'archivio BLOB temporaneo, leggere da Kusto come segue:

       val conf3 = Map(
            KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
            KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
            KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      
    • Se Kusto fornisce l'archivio BLOB temporaneo, leggere da Kusto come segue:

      val conf3 = Map(
        KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
        KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)