Condividi tramite


Modelli comuni di caricamento dei dati

Il caricatore automatico semplifica una serie di attività comuni di inserimento dati. Questo riferimento rapido fornisce esempi per diversi modelli comuni.

Filtro di directory o file usando modelli GLOB

I modelli Glob possono essere usati per filtrare directory e file quando specificati nel percorso.

Modello Descrizione
? Corrisponde a qualsiasi carattere singolo
* Corrisponde a zero o più caratteri
[abc] Trova la corrispondenza di un singolo carattere dal carattere set {a,b,c}.
[a-z] Trova la corrispondenza di un singolo carattere dall'intervallo di caratteri {a... z}.
[^a] Corrisponde a un singolo carattere che non fa parte del carattere set o dell'intervallo {a}. Si noti che il ^ carattere deve essere immediatamente a destra della parentesi aperta.
{ab,cd} Corrisponde a una stringa dalla stringa set {ab, cd}.
{ab,c{de, fh}} Trova la corrispondenza di una stringa dalla stringa set {ab, cde, cfh}.

path Usare per fornire modelli di prefisso, ad esempio:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base-path>/*/files")

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base-path>/*/files")

Importante

È necessario usare l'opzione pathGlobFilter per fornire in modo esplicito modelli di suffisso. Fornisce path solo un filtro di prefisso.

Ad esempio, se si desidera analizzare solo png i file in una directory contenente file con suffissi diversi, è possibile eseguire le operazioni seguenti:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base-path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base-path>)

Nota

Il comportamento predefinito del caricatore automatico è diverso dal comportamento predefinito di altre origini file Spark. Aggiungere .option("cloudFiles.useStrictGlobber", "true") alla lettura per usare il globbing che corrisponde al comportamento predefinito di Spark rispetto alle origini file. Per altre informazioni sul globbing, vedere il table seguente:

Modello Percorso file Globber predefinito Globber rigoroso
/a/b /a/b/c/file.txt
/a/b /a/b_dir/c/file.txt No No
/a/b /a/b.txt No No
/a/b/ /a/b.txt No No
/a/*/c/ /a/b/c/file.txt
/a/*/c/ /a/b/c/d/file.txt
/a/*/c/ /a/b/x/y/c/file.txt No
/a/*/c /a/b/c_file.txt No
/a/*/c/ /a/b/c_file.txt No
/a/*/c/ /a/*/cookie/file.txt No
/a/b* /a/b.txt
/a/b* /a/b/file.txt
/a/{0.txt,1.txt} /a/0.txt
/a/*/{0.txt,1.txt} /a/0.txt No No
/a/b/[cde-h]/i/ /a/b/c/i/file.txt

Abilitare easy ETL

Un modo semplice per get i dati in Delta Lake senza perdere dati consiste nell'usare il modello seguente e abilitare l'inferenza di schema con il caricatore automatico. Databricks consiglia di eseguire il codice seguente in un processo di Azure Databricks per riavviare automaticamente il flusso quando cambia il schema dei dati di origine. Per impostazione predefinita, il schema viene considerato come tipo stringa, eventuali errori di analisi (non dovrebbero verificarsi se tutto rimane come stringa) passeranno a _rescued_datae qualsiasi nuovo columns comprometterà il flusso e comporterà cambiamenti nel schema.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Evitare la perdita di dati in dati ben strutturati

Quando si conosce il schema, ma si vuole sapere ogni volta che si ricevono dati imprevisti, Databricks consiglia di usare il rescuedDataColumn.

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Se si vuole che il flusso interrompa l'elaborazione se viene introdotto un nuovo campo che non corrisponde al schema, è possibile aggiungere:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

Abilitare pipeline di dati semistrutturate flessibili

Quando ricevi dati da un fornitore che introduce nuovi columns nelle informazioni che ti fornisce, potresti non sapere esattamente quando lo fanno, oppure potresti non avere le risorse per update la tua pipeline di dati. È ora possibile sfruttare schema'evoluzione per riavviare il flusso e consentire al caricatore automatico di update il schema dedotto automaticamente. È anche possibile sfruttare schemaHints per alcuni dei campi "senza schema" che il fornitore potrebbe fornire.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Trasformare i dati JSON annidati

Poiché il caricatore automatico deduce il columns JSON di primo livello come stringhe, è possibile rimanere con oggetti JSON annidati che richiedono ulteriori trasformazioni. È possibile usare le API di accesso ai dati semistrutturate per trasformare ulteriormente il contenuto JSON complesso.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<source-data-with-nested-json>") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<source-data-with-nested-json>")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

Dedurre i dati JSON annidati

Quando hai dati annidati, puoi usare l'opzione cloudFiles.inferColumnTypes per dedurre la struttura dei dati annidati e altri tipi di column.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("<source-data-with-nested-json>")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<source-data-with-nested-json>")

Caricare file CSV senza intestazioni

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Applicare un schema ai file CSV con intestazioni

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Inserire dati immagine o binari in Delta Lake per ML

Dopo aver archiviato i dati in Delta Lake, è possibile eseguire l'inferenza distribuita sui dati. Vedere Eseguire l'inferenza distribuita usando la funzione definita dall'utente pandas.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Sintassi del caricatore automatico per DLT

Delta Live Tables fornisce una sintassi Python leggermente modificata per Auto Loader e aggiunge il supporto SQL per quest'ultimo.

Gli esempi seguenti usano Il caricatore automatico per creare set di dati da file CSV e JSON:

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")

È possibile usare le opzioni di formato supportate con Il caricatore automatico. Usando la map() funzione , è possibile passare le opzioni al read_files() metodo . Le opzioni sono coppie chiave-valore, where le chiavi e values sono stringhe. Di seguito viene descritta la sintassi per l'uso del caricatore automatico in SQL:

CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
  FROM read_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

L'esempio seguente legge i dati dai file CSV delimitati da tabulazioni con un'intestazione:

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))

È possibile usare il schema per specificare il formato manualmente; è necessario specificare il schema per i formati che non supportano l'inferenza schema:

Python

@dlt.table
def wiki_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
      .option("cloudFiles.format", "parquet")
      .load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
  )

SQL

CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
  FROM read_files(
    "/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
    "parquet",
    map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
  )

Nota

Delta Live Tables configura e gestisce automaticamente le directory schema e di checkpoint quando si usa Auto Loader per leggere i file. Tuttavia, se si configura manualmente una di queste directory, l'esecuzione di un refresh completo non influisce sul contenuto delle directory configurate. Databricks consiglia di usare le directory configurate automaticamente per evitare effetti collaterali imprevisti durante l'elaborazione.