Condividi tramite


Leggere e scrivere dati XML usando la spark-xml libreria

Importante

Questa documentazione è stata ritirata e potrebbe non essere aggiornata. I prodotti, i servizi o le tecnologie menzionati int il suo contenuto non sono ufficialmente approvati o testati da Databricks.

Il supporto del formato di file XML nativo è disponibile come anteprima pubblica. Vedere Leggere e scrivere file XML.

Questo articolo descrive come leggere e scrivere un file XML come origine dati Apache Spark.

Requisiti

  1. Creare la spark-xml libreria come libreria Maven. Per la coordinata Maven, specificare:

    • Databricks Runtime 7.x e versioni successive: com.databricks:spark-xml_2.12:<release>

    Vedere spark-xml Versioni per la versione più recente di <release>.

  2. Installare la libreria in un cluster.

Esempio

Nell'esempio riportato in questa sezione viene utilizzato il file XML dei libri .

  1. Recuperare il file XML della documentazione:

    $ wget https://github.com/databricks/spark-xml/raw/master/src/test/resources/books.xml
    
  2. Caricare il file in DBFS.

Leggere e scrivere dati XML

SQL

/*Infer schema*/

CREATE TABLE books
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")

/*Specify column names and types*/

CREATE TABLE books (author string, description string, genre string, _id string, price double, publish_date string, title string)
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")

Scala

// Infer schema

import com.databricks.spark.xml._ // Add the DataFrame.read.xml() method

val df = spark.read
  .option("rowTag", "book")
  .xml("dbfs:/books.xml")

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("dbfs:/newbooks.xml")

// Specify schema

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))

val df = spark.read
  .option("rowTag", "book")
  .schema(customSchema)
  .xml("books.xml")

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("dbfs:/newbooks.xml")

R

# Infer schema

library(SparkR)

sparkR.session("local[4]", sparkPackages = c("com.databricks:spark-xml_2.12:<release>"))

df <- read.df("dbfs:/books.xml", source = "xml", rowTag = "book")

# Default `rootTag` and `rowTag`
write.df(df, "dbfs:/newbooks.xml", "xml")

# Specify schema

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- read.df("dbfs:/books.xml", source = "xml", schema = customSchema, rowTag = "book")

# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
write.df(df, "dbfs:/newbooks.xml", "xml", "overwrite")

Opzioni

  • Lettura
    • path: percorso dei file XML. Accetta espressioni globbing Hadoop standard.
    • rowTag: tag di riga da considerare come riga. In questo codice XML <books><book><book>...</books>, ad esempio, il valore sarà book. Il valore predefinito è ROW.
    • samplingRatio: rapporto di campionamento per l'inferenza dello schema (0,0 ~ 1). Il valore predefinito è 1. I tipi possibili sono StructType, StringTypeArrayType, LongTypeTimestampType DoubleTypeBooleanTypee NullType, a meno che non si fornisca uno schema.
    • excludeAttribute: indica se escludere gli attributi negli elementi. Il valore predefinito è false.
    • nullValue: valore da considerare come null valore. Il valore predefinito è "".
    • mode: modalità per la gestione dei record danneggiati. Il valore predefinito è PERMISSIVE.
      • PERMISSIVE:
        • Quando rileva un record danneggiato, imposta tutti i campi su null e inserisce la stringa in formato non valido in un nuovo campo configurato da columnNameOfCorruptRecord.
        • Quando rileva un campo del tipo di dati errato, imposta il campo che causa l'errore su null.
      • DROPMALFORMED: ignora i record danneggiati.
      • FAILFAST: genera un'eccezione quando rileva record danneggiati.
    • inferSchema: se true, tenta di dedurre un tipo appropriato per ogni colonna dataframe risultante, ad esempio un tipo booleano, numerico o date. Se false, tutte le colonne risultanti sono di tipo stringa. Il valore predefinito è true.
    • columnNameOfCorruptRecord: nome del nuovo campo in cui vengono archiviate stringhe in formato non valido. Il valore predefinito è _corrupt_record.
    • attributePrefix: prefisso per gli attributi in modo da distinguere attributi ed elementi. Si tratta del prefisso per i nomi dei campi. Il valore predefinito è _.
    • valueTag: tag usato per il valore quando sono presenti attributi in un elemento senza elementi figlio. Il valore predefinito è _VALUE.
    • charset: il valore predefinito è UTF-8 ma può essere impostato su altri nomi di set di caratteri validi.
    • ignoreSurroundingSpaces: indica se gli spazi vuoti circostanti devono essere ignorati. Il valore predefinito è false.
    • rowValidationXSDPath: percorso di un file XSD usato per convalidare il codice XML per ogni riga. Le righe che non riescono a convalidare vengono considerate come gli errori di analisi come sopra. L'XSD non influisce in caso contrario sullo schema fornito o dedotto. Se lo stesso percorso locale non è già visibile negli executor nel cluster, il file XSD e gli altri da cui dipende devono essere aggiunti agli executor Spark con SparkContext.addFile. In questo caso, per usare XSD /foo/bar.xsdlocale, chiamare addFile("/foo/bar.xsd") e passare "bar.xsd" come rowValidationXSDPath.
  • Scrittura
    • path: percorso in cui scrivere file.
    • rowTag: tag di riga da considerare come riga. In questo codice XML <books><book><book>...</books>, ad esempio, il valore sarà book. Il valore predefinito è ROW.
    • rootTag: tag radice da considerare come radice. In questo codice XML <books><book><book>...</books>, ad esempio, il valore sarà books. Il valore predefinito è ROWS.
    • nullValue: valore da scrivere null . Il valore predefinito è la stringa "null". Quando "null", non scrive attributi ed elementi per i campi.
    • attributePrefix: prefisso per gli attributi per distinguere attributi ed elementi. Si tratta del prefisso per i nomi dei campi. Il valore predefinito è _.
    • valueTag: tag usato per il valore quando sono presenti attributi in un elemento senza elementi figlio. Il valore predefinito è _VALUE.
    • compression: codec di compressione da usare durante il salvataggio nel file. Deve essere il nome completo di una classe che implementa org.apache.hadoop.io.compress.CompressionCodec o uno dei nomi brevi senza distinzione tra maiuscole e minuscole (bzip2, gzip, lz4e snappy). Il valore predefinito non è una compressione.

Supporta l'utilizzo abbreviato dei nomi; È possibile usare xml anziché com.databricks.spark.xml.

Supporto XSD

È possibile convalidare singole righe rispetto a uno schema XSD usando rowValidationXSDPath.

Usare l'utilità com.databricks.spark.xml.util.XSDToSchema per estrarre uno schema di dataframe Spark da alcuni file XSD. Supporta solo tipi semplici, complessi e di sequenza, solo funzionalità XSD di base ed è sperimentale.

import com.databricks.spark.xml.util.XSDToSchema
import java.nio.file.Paths

val schema = XSDToSchema.read(Paths.get("/path/to/your.xsd"))
val df = spark.read.schema(schema)....xml(...)

Analizzare xml annidato

Sebbene venga usato principalmente per convertire un file XML in un dataframe, è anche possibile usare il from_xml metodo per analizzare il codice XML in una colonna con valori di stringa in un dataframe esistente e aggiungerlo come nuova colonna con risultati analizzati come struct con:

import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._

val df = ... /// DataFrame with XML in column 'payload'
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))

Nota

  • mode:
    • Se impostato su PERMISSIVE, il valore predefinito, la modalità di analisi viene invece impostata su DROPMALFORMED. Se si include una colonna nello schema per from_xml che corrisponde a , PERMISSIVE la columnNameOfCorruptRecordmodalità restituisce record in formato non valido a tale colonna nello struct risultante.
    • Se impostato su DROPMALFORMED, i valori XML che non analizzano correttamente generano un null valore per la colonna. Nessuna riga viene eliminata.
  • from_xml converte matrici di stringhe contenenti XML in matrici di struct analizzati. Utilizzare invece schema_of_xml_array.
  • from_xml_string è un'alternativa per l'uso in funzioni definite dall'utente che operano direttamente su una stringa anziché su una colonna.

Regole di conversione

A causa delle differenze strutturali tra dataframe e XML, esistono alcune regole di conversione da dati XML a dataframe e da dataframe a dati XML. È possibile disabilitare la gestione degli attributi con l'opzione excludeAttribute.

Convertire XML in dataframe

  • Attributi: gli attributi vengono convertiti come campi con il prefisso specificato nell'opzione attributePrefix . Se attributePrefix è _, il documento

    <one myOneAttrib="AAAA">
        <two>two</two>
        <three>three</three>
    </one>
    

    produce lo schema:

    root
    |-- _myOneAttrib: string (nullable = true)
    |-- two: string (nullable = true)
    |-- three: string (nullable = true)
    
  • Se un elemento ha attributi ma nessun elemento figlio, il valore dell'attributo viene inserito in un campo separato specificato nell'opzione valueTag . Se valueTag è _VALUE, il documento

    <one>
        <two myTwoAttrib="BBBBB">two</two>
        <three>three</three>
    </one>
    

    produce lo schema:

    root
    |-- two: struct (nullable = true)
    |    |-- _VALUE: string (nullable = true)
    |    |-- _myTwoAttrib: string (nullable = true)
    |-- three: string (nullable = true)
    

Convertire un dataframe in XML

Scrittura di un file XML dal dataframe con un campo ArrayType con il relativo elemento come ArrayType se fosse presente un campo annidato aggiuntivo per l'elemento. Ciò non avviene durante la lettura e la scrittura di dati XML, ma nella scrittura di un dataframe letto da altre origini. Pertanto, il round trip in lettura e scrittura di file XML ha la stessa struttura, ma la scrittura di un dataframe letto da altre origini è possibile avere una struttura diversa.

DataFrame con lo schema:

 |-- a: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

e dati:

+------------------------------------+
|                                   a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

produce il file XML:

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>