Esercitazione: COPY INTO con Spark SQL
Databricks consiglia di usare il comando COPY INTO per il caricamento incrementale e bulk dei dati per le origini dati che contengono migliaia di file. Databricks consiglia di usare il caricatore automatico per casi d'uso avanzati.
In questa esercitazione si usa il COPY INTO
comando per caricare i dati dall'archiviazione di oggetti cloud in una tabella nell'area di lavoro di Azure Databricks.
Requisiti
- Una sottoscrizione di Azure, un'area di lavoro di Azure Databricks in tale sottoscrizione e un cluster in tale area di lavoro. Per creare questi elementi, vedere Avvio rapido: Eseguire un processo Spark nell'area di lavoro di Azure Databricks usando il portale di Azure. Se si segue questa guida introduttiva, non è necessario seguire le istruzioni nella sezione Eseguire un processo Spark SQL.
- Un cluster all-purpose nell'area di lavoro che esegue Databricks Runtime 11.3 LTS o versione successiva. Per creare un cluster all-purpose, vedere Informazioni di riferimento sulla configurazione di calcolo.
- Familiarità con l'interfaccia utente dell'area di lavoro di Azure Databricks. Vedere Esplorare l'area di lavoro.
- Familiarità con i notebook di Databricks.
- Una posizione in cui è possibile scrivere i dati; questa demo usa la radice DBFS come esempio, ma Databricks consiglia un percorso di archiviazione esterno configurato con Unity Catalog.
Passaggio 1: Configurare l'ambiente e creare un generatore di dati
Questa esercitazione presuppone una conoscenza di base di Azure Databricks e di una configurazione predefinita dell'area di lavoro. Se non è possibile eseguire il codice specificato, contattare l'amministratore dell'area di lavoro per assicurarsi di avere accesso alle risorse di calcolo e a una posizione in cui è possibile scrivere i dati.
Si noti che il codice fornito usa un source
parametro per specificare il percorso che verrà configurato come COPY INTO
origine dati. Come scritto, questo codice punta a una posizione nella radice DBFS. Se si dispone delle autorizzazioni di scrittura per un percorso di archiviazione di oggetti esterno, sostituire la dbfs:/
parte della stringa di origine con il percorso dell'archivio oggetti. Poiché questo blocco di codice esegue anche un'eliminazione ricorsiva per reimpostare questa demo, assicurarsi di non puntare ai dati di produzione e di mantenere la /user/{username}/copy-into-demo
directory annidata per evitare la sovrascrittura o l'eliminazione di dati esistenti.
Creare un nuovo notebook SQL e collegarlo a un cluster che esegue Databricks Runtime 11.3 LTS o versione successiva.
Copiare ed eseguire il codice seguente per reimpostare il percorso di archiviazione e il database usati in questa esercitazione:
%python # Set parameters for isolation in workspace and reset demo username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0] database = f"copyinto_{username}_db" source = f"dbfs:/user/{username}/copy-into-demo" spark.sql(f"SET c.username='{username}'") spark.sql(f"SET c.database={database}") spark.sql(f"SET c.source='{source}'") spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") spark.sql("CREATE DATABASE ${c.database}") spark.sql("USE ${c.database}") dbutils.fs.rm(source, True)
Copiare ed eseguire il codice seguente per configurare alcune tabelle e funzioni che verranno usate per generare dati in modo casuale:
-- Configure random data generator CREATE TABLE user_ping_raw (user_id STRING, ping INTEGER, time TIMESTAMP) USING json LOCATION ${c.source}; CREATE TABLE user_ids (user_id STRING); INSERT INTO user_ids VALUES ("potato_luver"), ("beanbag_lyfe"), ("default_username"), ("the_king"), ("n00b"), ("frodo"), ("data_the_kid"), ("el_matador"), ("the_wiz"); CREATE FUNCTION get_ping() RETURNS INT RETURN int(rand() * 250); CREATE FUNCTION is_active() RETURNS BOOLEAN RETURN CASE WHEN rand() > .25 THEN true ELSE false END;
Passaggio 2: Scrivere i dati di esempio nell'archiviazione cloud
La scrittura in formati di dati diversi da Delta Lake è rara in Azure Databricks. Il codice fornito qui scrive in JSON, simulando un sistema esterno che potrebbe eseguire il dump dei risultati da un altro sistema nell'archiviazione di oggetti.
Copiare ed eseguire il codice seguente per scrivere un batch di dati JSON non elaborati:
-- Write a new batch of data to the data source INSERT INTO user_ping_raw SELECT *, get_ping() ping, current_timestamp() time FROM user_ids WHERE is_active()=true;
Passaggio 3: Usare COPY INTO per caricare i dati JSON in modo idempotente
Prima di poter usare COPY INTO
, è necessario creare una tabella Delta Lake di destinazione. In Databricks Runtime 11.3 LTS e versioni successive non è necessario specificare altro nome di tabella nell'istruzione CREATE TABLE
. Per le versioni precedenti di Databricks Runtime, è necessario specificare uno schema durante la creazione di una tabella vuota.
Copiare ed eseguire il codice seguente per creare la tabella Delta di destinazione e caricare i dati dall'origine:
-- Create target table and load data CREATE TABLE IF NOT EXISTS user_ping_target; COPY INTO user_ping_target FROM ${c.source} FILEFORMAT = JSON FORMAT_OPTIONS ("mergeSchema" = "true") COPY_OPTIONS ("mergeSchema" = "true")
Poiché questa azione è idempotente, è possibile eseguirla più volte, ma i dati verranno caricati una sola volta.
Passaggio 4: Visualizzare in anteprima il contenuto della tabella
È possibile eseguire una semplice query SQL per esaminare manualmente il contenuto di questa tabella.
Copiare ed eseguire il codice seguente per visualizzare in anteprima la tabella:
-- Review updated table SELECT * FROM user_ping_target
Passaggio 5: Caricare più dati e visualizzare in anteprima i risultati
È possibile eseguire nuovamente i passaggi da 2 a 4 volte per trasferire nuovi batch di dati JSON non elaborati casuali nell'origine, caricarli in modo idempotente in Delta Lake con COPY INTO
e visualizzare in anteprima i risultati. Provare a eseguire questi passaggi non in ordine o più volte per simulare più batch di dati non elaborati scritti o eseguiti COPY INTO
più volte senza che siano arrivati nuovi dati.
Passaggio 6: Esercitazione sulla pulizia
Al termine di questa esercitazione, è possibile pulire le risorse associate se non si vogliono più mantenerle.
Copiare ed eseguire il codice seguente per eliminare il database, le tabelle e rimuovere tutti i dati:
%python # Drop database and tables and remove data spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") dbutils.fs.rm(source, True)
Per arrestare la risorsa di calcolo, passare alla scheda Cluster e Terminare il cluster.
Risorse aggiuntive
- Articolo di riferimento su COPY INTO