Modelli comuni di caricamento dei dati
Il caricatore automatico semplifica una serie di attività comuni di inserimento dati. Questo riferimento rapido fornisce esempi per diversi modelli comuni.
Filtro di directory o file usando modelli GLOB
I modelli Glob possono essere usati per filtrare directory e file quando specificati nel percorso.
Modello | Descrizione |
---|---|
? |
Corrisponde a qualsiasi carattere singolo |
* |
Corrisponde a zero o più caratteri |
[abc] |
Trova la corrispondenza di un singolo carattere dal carattere set {a,b,c}. |
[a-z] |
Trova la corrispondenza di un singolo carattere dall'intervallo di caratteri {a... z}. |
[^a] |
Corrisponde a un singolo carattere che non fa parte del carattere set o dell'intervallo {a}. Si noti che il ^ carattere deve essere immediatamente a destra della parentesi aperta. |
{ab,cd} |
Corrisponde a una stringa dalla stringa set {ab, cd}. |
{ab,c{de, fh}} |
Trova la corrispondenza di una stringa dalla stringa set {ab, cde, cfh}. |
path
Usare per fornire modelli di prefisso, ad esempio:
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", <format>) \
.schema(schema) \
.load("<base-path>/*/files")
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", <format>)
.schema(schema)
.load("<base-path>/*/files")
Importante
È necessario usare l'opzione pathGlobFilter
per fornire in modo esplicito modelli di suffisso. Fornisce path
solo un filtro di prefisso.
Ad esempio, se si desidera analizzare solo png
i file in una directory contenente file con suffissi diversi, è possibile eseguire le operazioni seguenti:
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.option("pathGlobfilter", "*.png") \
.load(<base-path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.option("pathGlobfilter", "*.png")
.load(<base-path>)
Nota
Il comportamento predefinito del caricatore automatico è diverso dal comportamento predefinito di altre origini file Spark. Aggiungere .option("cloudFiles.useStrictGlobber", "true")
alla lettura per usare il globbing che corrisponde al comportamento predefinito di Spark rispetto alle origini file. Per altre informazioni sul globbing, vedere il table seguente:
Modello | Percorso file | Globber predefinito | Globber rigoroso |
---|---|---|---|
/a/b | /a/b/c/file.txt | Sì | Sì |
/a/b | /a/b_dir/c/file.txt | No | No |
/a/b | /a/b.txt | No | No |
/a/b/ | /a/b.txt | No | No |
/a/*/c/ | /a/b/c/file.txt | Sì | Sì |
/a/*/c/ | /a/b/c/d/file.txt | Sì | Sì |
/a/*/c/ | /a/b/x/y/c/file.txt | Sì | No |
/a/*/c | /a/b/c_file.txt | Sì | No |
/a/*/c/ | /a/b/c_file.txt | Sì | No |
/a/*/c/ | /a/*/cookie/file.txt | Sì | No |
/a/b* | /a/b.txt | Sì | Sì |
/a/b* | /a/b/file.txt | Sì | Sì |
/a/{0.txt,1.txt} | /a/0.txt | Sì | Sì |
/a/*/{0.txt,1.txt} | /a/0.txt | No | No |
/a/b/[cde-h]/i/ | /a/b/c/i/file.txt | Sì | Sì |
Abilitare easy ETL
Un modo semplice per get i dati in Delta Lake senza perdere dati consiste nell'usare il modello seguente e abilitare l'inferenza di schema con il caricatore automatico. Databricks consiglia di eseguire il codice seguente in un processo di Azure Databricks per riavviare automaticamente il flusso quando cambia il schema dei dati di origine. Per impostazione predefinita, il schema viene considerato come tipo stringa, eventuali errori di analisi (non dovrebbero verificarsi se tutto rimane come stringa) passeranno a _rescued_data
e qualsiasi nuovo columns comprometterà il flusso e comporterà cambiamenti nel schema.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
.load("<path-to-source-data>") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Evitare la perdita di dati in dati ben strutturati
Quando si conosce il schema, ma si vuole sapere ogni volta che si ricevono dati imprevisti, Databricks consiglia di usare il rescuedDataColumn
.
Python
spark.readStream.format("cloudFiles") \
.schema(expected_schema) \
.option("cloudFiles.format", "json") \
# will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.schema(expected_schema)
.option("cloudFiles.format", "json")
// will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Se si vuole che il flusso interrompa l'elaborazione se viene introdotto un nuovo campo che non corrisponde al schema, è possibile aggiungere:
.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")
Abilitare pipeline di dati semistrutturate flessibili
Quando ricevi dati da un fornitore che introduce nuovi columns nelle informazioni che ti fornisce, potresti non sapere esattamente quando lo fanno, oppure potresti non avere le risorse per update la tua pipeline di dati. È ora possibile sfruttare schema'evoluzione per riavviare il flusso e consentire al caricatore automatico di update il schema dedotto automaticamente. È anche possibile sfruttare schemaHints
per alcuni dei campi "senza schema" che il fornitore potrebbe fornire.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT") \
.load("/api/requests") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT")
.load("/api/requests")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Trasformare i dati JSON annidati
Poiché il caricatore automatico deduce il columns JSON di primo livello come stringhe, è possibile rimanere con oggetti JSON annidati che richiedono ulteriori trasformazioni. È possibile usare le API di accesso ai dati semistrutturate per trasformare ulteriormente il contenuto JSON complesso.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.load("<source-data-with-nested-json>") \
.selectExpr(
"*",
"tags:page.name", # extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" # extracts {"tags":{"eventType":...}}
)
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<source-data-with-nested-json>")
.selectExpr(
"*",
"tags:page.name", // extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", // extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" // extracts {"tags":{"eventType":...}}
)
Dedurre i dati JSON annidati
Quando hai dati annidati, puoi usare l'opzione cloudFiles.inferColumnTypes
per dedurre la struttura dei dati annidati e altri tipi di column.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.option("cloudFiles.inferColumnTypes", "true") \
.load("<source-data-with-nested-json>")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.option("cloudFiles.inferColumnTypes", "true")
.load("<source-data-with-nested-json>")
Caricare file CSV senza intestazioni
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
Applicare un schema ai file CSV con intestazioni
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("header", "true") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", "true")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
Inserire dati immagine o binari in Delta Lake per ML
Dopo aver archiviato i dati in Delta Lake, è possibile eseguire l'inferenza distribuita sui dati. Vedere Eseguire l'inferenza distribuita usando la funzione definita dall'utente pandas.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Sintassi del caricatore automatico per DLT
Delta Live Tables fornisce una sintassi Python leggermente modificata per Auto Loader e aggiunge il supporto SQL per quest'ultimo.
Gli esempi seguenti usano Il caricatore automatico per creare set di dati da file CSV e JSON:
Python
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")
È possibile usare le opzioni di formato supportate con Il caricatore automatico. Usando la map()
funzione , è possibile passare le opzioni al read_files()
metodo . Le opzioni sono coppie chiave-valore, where le chiavi e values sono stringhe. Di seguito viene descritta la sintassi per l'uso del caricatore automatico in SQL:
CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
FROM read_files(
"<file-path>",
"<file-format>",
map(
"<option-key>", "<option_value",
"<option-key>", "<option_value",
...
)
)
L'esempio seguente legge i dati dai file CSV delimitati da tabulazioni con un'intestazione:
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))
È possibile usare il schema
per specificare il formato manualmente; è necessario specificare il schema
per i formati che non supportano l'inferenza schema:
Python
@dlt.table
def wiki_raw():
return (
spark.readStream.format("cloudFiles")
.schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
.option("cloudFiles.format", "parquet")
.load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
)
SQL
CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
FROM read_files(
"/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
"parquet",
map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
)
Nota
Delta Live Tables configura e gestisce automaticamente le directory schema e di checkpoint quando si usa Auto Loader per leggere i file. Tuttavia, se si configura manualmente una di queste directory, l'esecuzione di un refresh completo non influisce sul contenuto delle directory configurate. Databricks consiglia di usare le directory configurate automaticamente per evitare effetti collaterali imprevisti durante l'elaborazione.