Leggere e scrivere file XML
Importante
Questa funzionalità è disponibile in anteprima pubblica.
Questo articolo descrive come leggere e scrivere file XML.
Extensible Markup Language (XML) è un linguaggio di markup per la formattazione, l'archiviazione e la condivisione dei dati in formato testuale. Stabilisce un "set" di regole per la serializzazione dei dati, da documenti a strutture di dati arbitrarie.
Il supporto del formato di file XML nativo consente l'inserimento, l'esecuzione di query e l'analisi dei dati XML per l'elaborazione batch o lo streaming. Può dedurre ed evolvere automaticamente schema e tipi di dati, supporta espressioni SQL come from_xml
e può generate documenti XML. Non richiede file JAR esterni e funziona perfettamente con il caricatore read_files
automatico e COPY INTO
. Facoltativamente, è possibile convalidare ogni record XML a livello di riga in base a una definizione XSD (XML Schema Definition).
Requisiti
Databricks Runtime 14.3 e versioni successive
Analizzare i record XML
La specifica XML impone una struttura ben formata. Tuttavia, questa specifica non esegue immediatamente il mapping a un formato tabulare. È necessario specificare l'opzione rowTag
per indicare l'elemento XML mappato a un oggetto DataFrame
Row
. L'elemento rowTag
diventa l'elemento di primo livello struct
. Gli elementi figlio di rowTag
diventano i campi del livello struct
superiore.
È possibile specificare il schema per questo record o lasciarlo dedurre automaticamente. Poiché il parser esamina solo gli rowTag
elementi, le entità DTD ed esterne vengono filtrate.
Gli esempi seguenti illustrano l'inferenza schema e l'analisi di un file XML usando opzioni diverse di rowTag
.
Python
xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)
Scala
val xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)
Leggere il file XML con rowTag
l'opzione "books":
Python
df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)
Scala
val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)
Output:
root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)
+------------------------------------------------------------------------------+
|book |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+
Leggere il file XML con rowTag
come "book":
Python
df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:
Scala
val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:
Output:
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)
+-----+-----------+---------------+
|_id |author |title |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+
Opzioni origine dati
È possibile specificare le opzioni dell'origine dati per XML nei modi seguenti:
- Metodi
.option/.options
dei seguenti:- DataFrameReader
- DataFrameWriter
- DataStreamReader
- DataStreamWriter
- Le funzioni predefinite seguenti:
- Clausola
OPTIONS
di CREATE TABLE UTILIZZANDO DATA_SOURCE
Per un list di opzioni, vedere Opzioni del caricatore automatico.
Supporto XSD
Facoltativamente, è possibile convalidare ogni record XML a livello di riga da un file XSD (XML Schema Definition). Il file XSD viene specificato nell'opzione rowValidationXSDPath
. L'XSD non influisce altrimenti sul schema fornito o dedotto. Un record che non riesce la convalida viene contrassegnato come "danneggiato" e gestito in base all'opzione modalità di gestione dei record danneggiata descritta nella sezione dell'opzione.
È possibile usare XSDToSchema
per estrarre un dataframe Spark schema da un file XSD. Supporta solo tipi di sequenza semplici, complessi e supporta solo le funzionalità XSD di base.
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path
val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="book">
<xs:complexType>
<xs:sequence>
<xs:element name="author" type="xs:string" />
<xs:element name="title" type="xs:string" />
<xs:element name="genre" type="xs:string" />
<xs:element name="price" type="xs:decimal" />
<xs:element name="publish_date" type="xs:date" />
<xs:element name="description" type="xs:string" />
</xs:sequence>
<xs:attribute name="id" type="xs:string" use="required" />
</xs:complexType>
</xs:element>
</xs:schema>"""
dbutils.fs.put(xsdPath, xsdString, true)
val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))
L'table seguente illustra la conversione dei tipi di dati XSD in tipi di dati Spark:
Tipi di dati XSD | Tipi di dati Spark |
---|---|
boolean |
BooleanType |
decimal |
DecimalType |
unsignedLong |
DecimalType(38, 0) |
double |
DoubleType |
float |
FloatType |
byte |
ByteType |
short , unsignedByte |
ShortType |
integer , negativeInteger , nonNegativeInteger , nonPositiveInteger , positiveInteger unsignedShort |
IntegerType |
long , unsignedInt |
LongType |
date |
DateType |
dateTime |
TimestampType |
Others |
StringType |
Analizzare xml annidato
I dati XML, rappresentati da column in un dataframe esistente, possono essere analizzati con schema_of_xml
e from_xml
che restituiscono il schema e i risultati analizzati come nuovi struct
columns. I dati XML passati come argomento a schema_of_xml
e from_xml
devono essere un singolo record XML ben formato.
schema_of_xml
Sintassi
schema_of_xml(xmlStr [, options] )
Argomenti
-
xmlStr
: espressione STRING che specifica un singolo record XML ben formato. -
options
: valore letterale facoltativoMAP<STRING,STRING>
che specifica le direttive.
Resi
Stringa contenente una definizione di struct con n campi di stringhe where, i cui nomi column derivano dai nomi di elementi e attributi XML. Il campo values contiene i tipi SQL derivati formattati.
from_xml
Sintassi
from_xml(xmlStr, schema [, options])
Argomenti
-
xmlStr
: espressione STRING che specifica un singolo record XML ben formato. -
schema
: espressione STRING o chiamata dellaschema_of_xml
funzione. -
options
: valore letterale facoltativoMAP<STRING,STRING>
che specifica le direttive.
Resi
Una struct con nomi di campo e di tipo corrispondenti alla definizione di schema.
Schema deve essere definito come coppie nome e tipo di dati delimit column ate da virgole come usato in, ad esempio, CREATE TABLE
. La maggior parte delle opzioni visualizzate nelle opzioni dell'origine dati è applicabile con le eccezioni seguenti:
-
rowTag
: poiché è presente un solo record XML, l'opzionerowTag
non è applicabile. -
mode
(impostazione predefinita:PERMISSIVE
): consente una modalità per gestire i record danneggiati durante l'analisi.-
PERMISSIVE
: quando soddisfa un record danneggiato, inserisce la stringa in formato non valido in un campo configurato dacolumnNameOfCorruptRecord
e imposta i campi in formato non valido sunull
. Per mantenere i record danneggiati, è possibile set un campo di tipo stringa denominatocolumnNameOfCorruptRecord
in un schemadefinito dall'utente. Se un schema non dispone del campo, elimina i record danneggiati durante l'analisi. Quando si deduce un schema, viene aggiunto in modo implicito un campocolumnNameOfCorruptRecord
in un output schema. -
FAILFAST
: genera un'eccezione quando soddisfa i record danneggiati.
-
Conversione della struttura
A causa delle differenze di struttura tra DataFrame e XML, esistono alcune regole di conversione da dati XML a DataFrame
e da DataFrame
a dati XML. Si noti che la gestione degli attributi può essere disabilitata con l'opzione excludeAttribute
.
Conversione da XML a dataframe
Attributi: gli attributi vengono convertiti come campi con il prefisso attributePrefix
dell'intestazione .
<one myOneAttrib="AAAA">
<two>two</two>
<three>three</three>
</one>
produce una schema seguente:
root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)
Dati di tipo carattere in un elemento contenente attributi o elementi figlio: vengono analizzati nel valueTag
campo. Se sono presenti più occorrenze di dati di tipo carattere, il valueTag
campo viene convertito in un array
tipo.
<one>
<two myTwoAttrib="BBBBB">two</two>
some value between elements
<three>three</three>
some other value between elements
</one>
produce una schema seguente:
root
|-- _VALUE: array (nullable = true)
| |-- element: string (containsNull = true)
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)
Conversione da dataframe a XML
Elemento come matrice in una matrice: scrittura di un file XML da DataFrame
having un campo ArrayType
con il relativo elemento come ArrayType
avrebbe un campo annidato aggiuntivo per l'elemento. Ciò non avviene durante la lettura e la scrittura di dati XML, ma la scrittura di una DataFrame
lettura da altre origini. Pertanto, il round trip nella lettura e scrittura di file XML ha la stessa struttura, ma la scrittura di una DataFrame
lettura da altre origini è possibile avere una struttura diversa.
DataFrame con un schema sotto:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
e con i dati seguenti:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
produce un file XML seguente:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>
Il nome dell'elemento della matrice senza nome in DataFrame
è specificato dall'opzione arrayElementName
(Default: item
).
column dati salvati
I dati salvati column garantiscono che non si perdano né si manchino mai i dati durante l'ETL. È possibile abilitare i dati salvati column per acquisire tutti i dati che non sono stati analizzati perché uno o più campi in un record presentano uno dei problemi seguenti:
- Assente dal schema fornito
- Non corrisponde al tipo di dati del schema fornito
- Ha una mancata corrispondenza tra maiuscole e minuscole con i nomi dei campi nel schema specificato
I dati salvati column vengono restituiti come documento JSON contenente i columns salvati e il percorso del file di origine del record. Per remove il percorso del file di origine dai dati salvati column, è possibile set la configurazione SQL seguente:
Python
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
Scala
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").
È possibile abilitare il column dei dati salvati impostando l'opzione rescuedDataColumn
su un nome di column durante la lettura dei dati, ad esempio _rescued_data
con spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>)
.
Il parser XML supporta tre modalità durante l'analisi dei record: PERMISSIVE
, DROPMALFORMED
e FAILFAST
. Se usato insieme a rescuedDataColumn
, le mancate corrispondenze del tipo di dati non causano l'esclusione dei record in modalità DROPMALFORMED
oppure generano un errore in modalità FAILFAST
. Solo i record danneggiati (XML incompleto o non valido) vengono eliminati o generati errori.
Schema inferenza ed evoluzione in Auto Loader
Per una descrizione dettagliata di questo argomento e delle opzioni applicabili, vedere Configurazione schema di inferenza ed evoluzione in Auto Loader. È possibile configurare il Caricatore Automatico per rilevare automaticamente il schema dei dati XML caricati, consentendo di inizializzare tables senza dichiarare esplicitamente i dati schema ed evolvere il tableschema man mano che vengono introdotti nuovi columns. In questo modo si elimina la necessità di tenere traccia e applicare manualmente le modifiche schema nel tempo.
Per impostazione predefinita, l'inferenza del Caricatore Automatico schema cerca di evitare problemi di evoluzione schema a causa di mancata corrispondenza di tipo. Per i formati che non codificano i tipi di dati (JSON, CSV e XML), il caricatore automatico deduce tutti i columns come stringhe, inclusi i campi annidati nei file XML. Il DataFrameReader
Apache Spark utilizza un comportamento diverso per l'inferenza schema, selezionando i tipi di dati per columns nelle origini XML in base ai dati campione. Per abilitare questo comportamento con Auto Loader, set l'opzione cloudFiles.inferColumnTypes
per true
.
Auto Loader rileva l'aggiunta di nuovi columns man mano che elabora i dati. Quando il caricatore automatico rileva un nuovo column, il flusso si arresta con un UnknownFieldException
. Prima che il flusso generi questo errore, Auto Loader esegue l'inferenza schema sul micro-batch di dati più recente e aggiorna la posizione schema con l'ultimo schema aggiungendo i nuovi columns alla fine del schema. I tipi di dati di columns esistenti rimangono invariati. Il caricatore automatico supporta diverse modalità di per l'evoluzione schema, set nell'opzione cloudFiles.schemaEvolutionMode
.
È possibile utilizzare i suggerimenti schema per applicare le informazioni schema che conoscete e prevedete su un schemadedotto. Quando si sa che un column è di un tipo di dato specifico, o se si desidera optare per un tipo di dato più generale (ad esempio, un double invece di un integer), è possibile fornire, sotto forma di stringa, un numero arbitrario di indicazioni per i tipi di dato column utilizzando la sintassi di specificazione SQL schema. Quando i dati recuperati column sono abilitati, i campi denominati in un formato diverso da quello di schema vengono caricati nel _rescued_data
column. È possibile modificare questo comportamento impostando l'opzione readerCaseSensitive
su false
, nel qual caso il caricatore automatico legge i dati in modo senza distinzione tra maiuscole e minuscole.
Esempi
Gli esempi in questa sezione usano un file XML disponibile per il download nel repository GitHub di Apache Spark.
Leggere e scrivere codice XML
Python
df = (spark.read
.format('xml')
.options(rowTag='book')
.load(xmlPath)) # books.xml
selected_data = df.select("author", "_id")
(selected_data.write
.options(rowTag='book', rootTag='books')
.xml('newbooks.xml'))
Scala
val df = spark.read
.option("rowTag", "book")
.xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("newbooks.xml")
R
df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
È possibile specificare manualmente il schema durante la lettura dei dati:
Python
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
custom_schema = StructType([
StructField("_id", StringType(), True),
StructField("author", StringType(), True),
StructField("description", StringType(), True),
StructField("genre", StringType(), True),
StructField("price", DoubleType(), True),
StructField("publish_date", StringType(), True),
StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)
selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')
Scala
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")
R
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
API SQL
L'origine dati XML può dedurre i tipi di dati:
DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;
È possibile specificare anche nomi e tipi column in DDL. In questo caso, il schema non viene dedotto automaticamente.
DROP TABLE IF EXISTS books;
CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");
Carica XML utilizzando COPY INTO
DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;
COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');
Leggere il codice XML con la convalida delle righe
Python
df = (spark.read
.format("xml")
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.load(inputPath))
df.printSchema()
Scala
val df = spark.read
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.xml(inputPath)
df.printSchema
Analizzare xml annidato (from_xml e schema_of_xml)
Python
from pyspark.sql.functions import from_xml, schema_of_xml, lit, col
xml_data = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>
"""
df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()
Scala
import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}
val xmlData = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>""".stripMargin
val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()
from_xml e schema_of_xml con l'API SQL
SELECT from_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>',
schema_of_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>')
);
Caricare codice XML con caricatore automatico
Python
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(availableNow=True)
.toTable("table_name")
)
Scala
val query = spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(Trigger.AvailableNow()
.toTable("table_name")
)