Condividi tramite


Leggere e scrivere file XML

Importante

Questa funzionalità è disponibile in anteprima pubblica.

Questo articolo descrive come leggere e scrivere file XML.

Extensible Markup Language (XML) è un linguaggio di markup per la formattazione, l'archiviazione e la condivisione dei dati in formato testuale. Stabilisce un "set" di regole per la serializzazione dei dati, da documenti a strutture di dati arbitrarie.

Il supporto del formato di file XML nativo consente l'inserimento, l'esecuzione di query e l'analisi dei dati XML per l'elaborazione batch o lo streaming. Può dedurre ed evolvere automaticamente schema e tipi di dati, supporta espressioni SQL come from_xmle può generate documenti XML. Non richiede file JAR esterni e funziona perfettamente con il caricatore read_files automatico e COPY INTO. Facoltativamente, è possibile convalidare ogni record XML a livello di riga in base a una definizione XSD (XML Schema Definition).

Requisiti

Databricks Runtime 14.3 e versioni successive

Analizzare i record XML

La specifica XML impone una struttura ben formata. Tuttavia, questa specifica non esegue immediatamente il mapping a un formato tabulare. È necessario specificare l'opzione rowTag per indicare l'elemento XML mappato a un oggetto DataFrameRow. L'elemento rowTag diventa l'elemento di primo livello struct. Gli elementi figlio di rowTag diventano i campi del livello structsuperiore.

È possibile specificare il schema per questo record o lasciarlo dedurre automaticamente. Poiché il parser esamina solo gli rowTag elementi, le entità DTD ed esterne vengono filtrate.

Gli esempi seguenti illustrano l'inferenza schema e l'analisi di un file XML usando opzioni diverse di rowTag.

Python

xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""

xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)

Scala

val xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)

Leggere il file XML con rowTag l'opzione "books":

Python

df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)

Scala

val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)

Output:

root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)

+------------------------------------------------------------------------------+
|book                                                                          |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+

Leggere il file XML con rowTag come "book":

Python

df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:

Scala

val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:

Output:

root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)

+-----+-----------+---------------+
|_id  |author     |title          |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+

Opzioni origine dati

È possibile specificare le opzioni dell'origine dati per XML nei modi seguenti:

Per un list di opzioni, vedere Opzioni del caricatore automatico.

Supporto XSD

Facoltativamente, è possibile convalidare ogni record XML a livello di riga da un file XSD (XML Schema Definition). Il file XSD viene specificato nell'opzione rowValidationXSDPath . L'XSD non influisce altrimenti sul schema fornito o dedotto. Un record che non riesce la convalida viene contrassegnato come "danneggiato" e gestito in base all'opzione modalità di gestione dei record danneggiata descritta nella sezione dell'opzione.

È possibile usare XSDToSchema per estrarre un dataframe Spark schema da un file XSD. Supporta solo tipi di sequenza semplici, complessi e supporta solo le funzionalità XSD di base.

import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path

val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
  <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="book">
      <xs:complexType>
        <xs:sequence>
          <xs:element name="author" type="xs:string" />
          <xs:element name="title" type="xs:string" />
          <xs:element name="genre" type="xs:string" />
          <xs:element name="price" type="xs:decimal" />
          <xs:element name="publish_date" type="xs:date" />
          <xs:element name="description" type="xs:string" />
        </xs:sequence>
        <xs:attribute name="id" type="xs:string" use="required" />
      </xs:complexType>
    </xs:element>
  </xs:schema>"""

dbutils.fs.put(xsdPath, xsdString, true)

val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))

L'table seguente illustra la conversione dei tipi di dati XSD in tipi di dati Spark:

Tipi di dati XSD Tipi di dati Spark
boolean BooleanType
decimal DecimalType
unsignedLong DecimalType(38, 0)
double DoubleType
float FloatType
byte ByteType
short, unsignedByte ShortType
integer, negativeInteger, nonNegativeInteger, nonPositiveInteger, positiveIntegerunsignedShort IntegerType
long, unsignedInt LongType
date DateType
dateTime TimestampType
Others StringType

Analizzare xml annidato

I dati XML, rappresentati da column in un dataframe esistente, possono essere analizzati con schema_of_xml e from_xml che restituiscono il schema e i risultati analizzati come nuovi structcolumns. I dati XML passati come argomento a schema_of_xml e from_xml devono essere un singolo record XML ben formato.

schema_of_xml

Sintassi

schema_of_xml(xmlStr [, options] )

Argomenti

  • xmlStr: espressione STRING che specifica un singolo record XML ben formato.
  • options: valore letterale facoltativo MAP<STRING,STRING> che specifica le direttive.

Resi

Stringa contenente una definizione di struct con n campi di stringhe where, i cui nomi column derivano dai nomi di elementi e attributi XML. Il campo values contiene i tipi SQL derivati formattati.

from_xml

Sintassi

from_xml(xmlStr, schema [, options])

Argomenti

  • xmlStr: espressione STRING che specifica un singolo record XML ben formato.
  • schema: espressione STRING o chiamata della schema_of_xml funzione.
  • options: valore letterale facoltativo MAP<STRING,STRING> che specifica le direttive.

Resi

Una struct con nomi di campo e di tipo corrispondenti alla definizione di schema. Schema deve essere definito come coppie nome e tipo di dati delimit column ate da virgole come usato in, ad esempio, CREATE TABLE. La maggior parte delle opzioni visualizzate nelle opzioni dell'origine dati è applicabile con le eccezioni seguenti:

  • rowTag: poiché è presente un solo record XML, l'opzione rowTag non è applicabile.
  • mode (impostazione predefinita: PERMISSIVE): consente una modalità per gestire i record danneggiati durante l'analisi.
    • PERMISSIVE: quando soddisfa un record danneggiato, inserisce la stringa in formato non valido in un campo configurato da columnNameOfCorruptRecorde imposta i campi in formato non valido su null. Per mantenere i record danneggiati, è possibile set un campo di tipo stringa denominato columnNameOfCorruptRecord in un schemadefinito dall'utente. Se un schema non dispone del campo, elimina i record danneggiati durante l'analisi. Quando si deduce un schema, viene aggiunto in modo implicito un campo columnNameOfCorruptRecord in un output schema.
    • FAILFAST: genera un'eccezione quando soddisfa i record danneggiati.

Conversione della struttura

A causa delle differenze di struttura tra DataFrame e XML, esistono alcune regole di conversione da dati XML a DataFrame e da DataFrame a dati XML. Si noti che la gestione degli attributi può essere disabilitata con l'opzione excludeAttribute.

Conversione da XML a dataframe

Attributi: gli attributi vengono convertiti come campi con il prefisso attributePrefixdell'intestazione .

<one myOneAttrib="AAAA">
  <two>two</two>
  <three>three</three>
</one>

produce una schema seguente:

root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)

Dati di tipo carattere in un elemento contenente attributi o elementi figlio: vengono analizzati nel valueTag campo. Se sono presenti più occorrenze di dati di tipo carattere, il valueTag campo viene convertito in un array tipo.

<one>
  <two myTwoAttrib="BBBBB">two</two>
  some value between elements
  <three>three</three>
  some other value between elements
</one>

produce una schema seguente:

root
 |-- _VALUE: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- two: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)

Conversione da dataframe a XML

Elemento come matrice in una matrice: scrittura di un file XML da DataFramehaving un campo ArrayType con il relativo elemento come ArrayType avrebbe un campo annidato aggiuntivo per l'elemento. Ciò non avviene durante la lettura e la scrittura di dati XML, ma la scrittura di una DataFrame lettura da altre origini. Pertanto, il round trip nella lettura e scrittura di file XML ha la stessa struttura, ma la scrittura di una DataFrame lettura da altre origini è possibile avere una struttura diversa.

DataFrame con un schema sotto:

|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)

e con i dati seguenti:

+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

produce un file XML seguente:

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>

Il nome dell'elemento della matrice senza nome in DataFrame è specificato dall'opzione arrayElementName (Default: item).

column dati salvati

I dati salvati column garantiscono che non si perdano né si manchino mai i dati durante l'ETL. È possibile abilitare i dati salvati column per acquisire tutti i dati che non sono stati analizzati perché uno o più campi in un record presentano uno dei problemi seguenti:

  • Assente dal schema fornito
  • Non corrisponde al tipo di dati del schema fornito
  • Ha una mancata corrispondenza tra maiuscole e minuscole con i nomi dei campi nel schema specificato

I dati salvati column vengono restituiti come documento JSON contenente i columns salvati e il percorso del file di origine del record. Per remove il percorso del file di origine dai dati salvati column, è possibile set la configurazione SQL seguente:

Python

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

Scala

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").

È possibile abilitare il column dei dati salvati impostando l'opzione rescuedDataColumn su un nome di column durante la lettura dei dati, ad esempio _rescued_data con spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>).

Il parser XML supporta tre modalità durante l'analisi dei record: PERMISSIVE, DROPMALFORMEDe FAILFAST. Se usato insieme a rescuedDataColumn, le mancate corrispondenze del tipo di dati non causano l'esclusione dei record in modalità DROPMALFORMED oppure generano un errore in modalità FAILFAST. Solo i record danneggiati (XML incompleto o non valido) vengono eliminati o generati errori.

Schema inferenza ed evoluzione in Auto Loader

Per una descrizione dettagliata di questo argomento e delle opzioni applicabili, vedere Configurazione schema di inferenza ed evoluzione in Auto Loader. È possibile configurare il Caricatore Automatico per rilevare automaticamente il schema dei dati XML caricati, consentendo di inizializzare tables senza dichiarare esplicitamente i dati schema ed evolvere il tableschema man mano che vengono introdotti nuovi columns. In questo modo si elimina la necessità di tenere traccia e applicare manualmente le modifiche schema nel tempo.

Per impostazione predefinita, l'inferenza del Caricatore Automatico schema cerca di evitare problemi di evoluzione schema a causa di mancata corrispondenza di tipo. Per i formati che non codificano i tipi di dati (JSON, CSV e XML), il caricatore automatico deduce tutti i columns come stringhe, inclusi i campi annidati nei file XML. Il DataFrameReader Apache Spark utilizza un comportamento diverso per l'inferenza schema, selezionando i tipi di dati per columns nelle origini XML in base ai dati campione. Per abilitare questo comportamento con Auto Loader, set l'opzione cloudFiles.inferColumnTypes per true.

Auto Loader rileva l'aggiunta di nuovi columns man mano che elabora i dati. Quando il caricatore automatico rileva un nuovo column, il flusso si arresta con un UnknownFieldException. Prima che il flusso generi questo errore, Auto Loader esegue l'inferenza schema sul micro-batch di dati più recente e aggiorna la posizione schema con l'ultimo schema aggiungendo i nuovi columns alla fine del schema. I tipi di dati di columns esistenti rimangono invariati. Il caricatore automatico supporta diverse modalità di per l'evoluzione schema, set nell'opzione cloudFiles.schemaEvolutionMode.

È possibile utilizzare i suggerimenti schema per applicare le informazioni schema che conoscete e prevedete su un schemadedotto. Quando si sa che un column è di un tipo di dato specifico, o se si desidera optare per un tipo di dato più generale (ad esempio, un double invece di un integer), è possibile fornire, sotto forma di stringa, un numero arbitrario di indicazioni per i tipi di dato column utilizzando la sintassi di specificazione SQL schema. Quando i dati recuperati column sono abilitati, i campi denominati in un formato diverso da quello di schema vengono caricati nel _rescued_datacolumn. È possibile modificare questo comportamento impostando l'opzione readerCaseSensitive su false, nel qual caso il caricatore automatico legge i dati in modo senza distinzione tra maiuscole e minuscole.

Esempi

Gli esempi in questa sezione usano un file XML disponibile per il download nel repository GitHub di Apache Spark.

Leggere e scrivere codice XML

Python

df = (spark.read
  .format('xml')
  .options(rowTag='book')
  .load(xmlPath))  # books.xml

selected_data = df.select("author", "_id")
(selected_data.write
  .options(rowTag='book', rootTag='books')
  .xml('newbooks.xml'))

Scala

val df = spark.read
  .option("rowTag", "book")
  .xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("newbooks.xml")

R

df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

È possibile specificare manualmente il schema durante la lettura dei dati:

Python

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

custom_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("author", StringType(), True),
    StructField("description", StringType(), True),
    StructField("genre", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("publish_date", StringType(), True),
    StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)

selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')

Scala

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")

R

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

API SQL

L'origine dati XML può dedurre i tipi di dati:

DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;

È possibile specificare anche nomi e tipi column in DDL. In questo caso, il schema non viene dedotto automaticamente.

DROP TABLE IF EXISTS books;

CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");

Carica XML utilizzando COPY INTO

DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;

COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');

Leggere il codice XML con la convalida delle righe

Python

df = (spark.read
    .format("xml")
    .option("rowTag", "book")
    .option("rowValidationXSDPath", xsdPath)
    .load(inputPath))
df.printSchema()

Scala

val df = spark.read
  .option("rowTag", "book")
  .option("rowValidationXSDPath", xsdPath)
  .xml(inputPath)
df.printSchema

Analizzare xml annidato (from_xml e schema_of_xml)

Python

from pyspark.sql.functions import from_xml, schema_of_xml, lit, col

xml_data = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>
"""

df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()

Scala

import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}

val xmlData = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>""".stripMargin

val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()

from_xml e schema_of_xml con l'API SQL

SELECT from_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>',
  schema_of_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>')
);

Caricare codice XML con caricatore automatico

Python

query = (spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", True)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(availableNow=True)
  .toTable("table_name")
)

Scala

val query = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", true)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(Trigger.AvailableNow()
  .toTable("table_name")
  )

Risorse aggiuntive

Leggere e scrivere dati XML usando la libreria spark-xml