Leggere e scrivere dati XML usando la spark-xml
libreria
Importante
Questa documentazione è stata ritirata e potrebbe non essere aggiornata. I prodotti, i servizi o le tecnologie menzionati int il suo contenuto non sono ufficialmente approvati o testati da Databricks.
Il supporto del formato di file XML nativo è disponibile come anteprima pubblica. Vedere Leggere e scrivere file XML.
Questo articolo descrive come leggere e scrivere un file XML come origine dati Apache Spark.
Requisiti
Creare la
spark-xml
libreria come libreria Maven. Per la coordinata Maven, specificare:- Databricks Runtime 7.x e versioni successive:
com.databricks:spark-xml_2.12:<release>
Vedere
spark-xml
Versioni per la versione più recente di<release>
.- Databricks Runtime 7.x e versioni successive:
Installare la libreria in un cluster.
Esempio
Nell'esempio riportato in questa sezione viene utilizzato il file XML dei libri .
Recuperare il file XML della documentazione:
$ wget https://github.com/databricks/spark-xml/raw/master/src/test/resources/books.xml
Caricare il file in DBFS.
Leggere e scrivere dati XML
SQL
/*Infer schema*/
CREATE TABLE books
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")
/*Specify column names and types*/
CREATE TABLE books (author string, description string, genre string, _id string, price double, publish_date string, title string)
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")
Scala
// Infer schema
import com.databricks.spark.xml._ // Add the DataFrame.read.xml() method
val df = spark.read
.option("rowTag", "book")
.xml("dbfs:/books.xml")
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("dbfs:/newbooks.xml")
// Specify schema
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read
.option("rowTag", "book")
.schema(customSchema)
.xml("books.xml")
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("dbfs:/newbooks.xml")
R
# Infer schema
library(SparkR)
sparkR.session("local[4]", sparkPackages = c("com.databricks:spark-xml_2.12:<release>"))
df <- read.df("dbfs:/books.xml", source = "xml", rowTag = "book")
# Default `rootTag` and `rowTag`
write.df(df, "dbfs:/newbooks.xml", "xml")
# Specify schema
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- read.df("dbfs:/books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
write.df(df, "dbfs:/newbooks.xml", "xml", "overwrite")
Opzioni
- Lettura
path
: percorso dei file XML. Accetta espressioni globbing Hadoop standard.rowTag
: tag di riga da considerare come riga. In questo codice XML<books><book><book>...</books>
, ad esempio, il valore saràbook
. Il valore predefinito èROW
.samplingRatio
: rapporto di campionamento per l'inferenza dello schema (0,0 ~ 1). Il valore predefinito è 1. I tipi possibili sonoStructType
,StringType
ArrayType
,LongType
TimestampType
DoubleType
BooleanType
eNullType
, a meno che non si fornisca uno schema.excludeAttribute
: indica se escludere gli attributi negli elementi. Il valore predefinito è false.nullValue
: valore da considerare comenull
valore. Il valore predefinito è""
.mode
: modalità per la gestione dei record danneggiati. Il valore predefinito èPERMISSIVE
.PERMISSIVE
:- Quando rileva un record danneggiato, imposta tutti i campi su
null
e inserisce la stringa in formato non valido in un nuovo campo configurato dacolumnNameOfCorruptRecord
. - Quando rileva un campo del tipo di dati errato, imposta il campo che causa l'errore su
null
.
- Quando rileva un record danneggiato, imposta tutti i campi su
DROPMALFORMED
: ignora i record danneggiati.FAILFAST
: genera un'eccezione quando rileva record danneggiati.
inferSchema
: setrue
, tenta di dedurre un tipo appropriato per ogni colonna dataframe risultante, ad esempio un tipo booleano, numerico o date. Sefalse
, tutte le colonne risultanti sono di tipo stringa. Il valore predefinito ètrue
.columnNameOfCorruptRecord
: nome del nuovo campo in cui vengono archiviate stringhe in formato non valido. Il valore predefinito è_corrupt_record
.attributePrefix
: prefisso per gli attributi in modo da distinguere attributi ed elementi. Si tratta del prefisso per i nomi dei campi. Il valore predefinito è_
.valueTag
: tag usato per il valore quando sono presenti attributi in un elemento senza elementi figlio. Il valore predefinito è_VALUE
.charset
: il valore predefinito èUTF-8
ma può essere impostato su altri nomi di set di caratteri validi.ignoreSurroundingSpaces
: indica se gli spazi vuoti circostanti devono essere ignorati. Il valore predefinito è false.rowValidationXSDPath
: percorso di un file XSD usato per convalidare il codice XML per ogni riga. Le righe che non riescono a convalidare vengono considerate come gli errori di analisi come sopra. L'XSD non influisce in caso contrario sullo schema fornito o dedotto. Se lo stesso percorso locale non è già visibile negli executor nel cluster, il file XSD e gli altri da cui dipende devono essere aggiunti agli executor Spark con SparkContext.addFile. In questo caso, per usare XSD/foo/bar.xsd
locale, chiamareaddFile("/foo/bar.xsd")
e passare"bar.xsd"
comerowValidationXSDPath
.
- Scrittura
path
: percorso in cui scrivere file.rowTag
: tag di riga da considerare come riga. In questo codice XML<books><book><book>...</books>
, ad esempio, il valore saràbook
. Il valore predefinito èROW
.rootTag
: tag radice da considerare come radice. In questo codice XML<books><book><book>...</books>
, ad esempio, il valore saràbooks
. Il valore predefinito èROWS
.nullValue
: valore da scriverenull
. Il valore predefinito è la stringa"null"
. Quando"null"
, non scrive attributi ed elementi per i campi.attributePrefix
: prefisso per gli attributi per distinguere attributi ed elementi. Si tratta del prefisso per i nomi dei campi. Il valore predefinito è_
.valueTag
: tag usato per il valore quando sono presenti attributi in un elemento senza elementi figlio. Il valore predefinito è_VALUE
.compression
: codec di compressione da usare durante il salvataggio nel file. Deve essere il nome completo di una classe che implementaorg.apache.hadoop.io.compress.CompressionCodec
o uno dei nomi brevi senza distinzione tra maiuscole e minuscole (bzip2
,gzip
,lz4
esnappy
). Il valore predefinito non è una compressione.
Supporta l'utilizzo abbreviato dei nomi; È possibile usare xml
anziché com.databricks.spark.xml
.
Supporto XSD
È possibile convalidare singole righe rispetto a uno schema XSD usando rowValidationXSDPath
.
Usare l'utilità com.databricks.spark.xml.util.XSDToSchema
per estrarre uno schema di dataframe Spark da alcuni file XSD. Supporta solo tipi semplici, complessi e di sequenza, solo funzionalità XSD di base ed è sperimentale.
import com.databricks.spark.xml.util.XSDToSchema
import java.nio.file.Paths
val schema = XSDToSchema.read(Paths.get("/path/to/your.xsd"))
val df = spark.read.schema(schema)....xml(...)
Analizzare xml annidato
Sebbene venga usato principalmente per convertire un file XML in un dataframe, è anche possibile usare il from_xml
metodo per analizzare il codice XML in una colonna con valori di stringa in un dataframe esistente e aggiungerlo come nuova colonna con risultati analizzati come struct con:
import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._
val df = ... /// DataFrame with XML in column 'payload'
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))
Nota
mode
:- Se impostato su
PERMISSIVE
, il valore predefinito, la modalità di analisi viene invece impostata suDROPMALFORMED
. Se si include una colonna nello schema perfrom_xml
che corrisponde a ,PERMISSIVE
lacolumnNameOfCorruptRecord
modalità restituisce record in formato non valido a tale colonna nello struct risultante. - Se impostato su
DROPMALFORMED
, i valori XML che non analizzano correttamente generano unnull
valore per la colonna. Nessuna riga viene eliminata.
- Se impostato su
from_xml
converte matrici di stringhe contenenti XML in matrici di struct analizzati. Utilizzare inveceschema_of_xml_array
.from_xml_string
è un'alternativa per l'uso in funzioni definite dall'utente che operano direttamente su una stringa anziché su una colonna.
Regole di conversione
A causa delle differenze strutturali tra dataframe e XML, esistono alcune regole di conversione da dati XML a dataframe e da dataframe a dati XML. È possibile disabilitare la gestione degli attributi con l'opzione excludeAttribute
.
Convertire XML in dataframe
Attributi: gli attributi vengono convertiti come campi con il prefisso specificato nell'opzione
attributePrefix
. SeattributePrefix
è_
, il documento<one myOneAttrib="AAAA"> <two>two</two> <three>three</three> </one>
produce lo schema:
root |-- _myOneAttrib: string (nullable = true) |-- two: string (nullable = true) |-- three: string (nullable = true)
Se un elemento ha attributi ma nessun elemento figlio, il valore dell'attributo viene inserito in un campo separato specificato nell'opzione
valueTag
. SevalueTag
è_VALUE
, il documento<one> <two myTwoAttrib="BBBBB">two</two> <three>three</three> </one>
produce lo schema:
root |-- two: struct (nullable = true) | |-- _VALUE: string (nullable = true) | |-- _myTwoAttrib: string (nullable = true) |-- three: string (nullable = true)
Convertire un dataframe in XML
Scrittura di un file XML dal dataframe con un campo ArrayType
con il relativo elemento come ArrayType
se fosse presente un campo annidato aggiuntivo per l'elemento. Ciò non avviene durante la lettura e la scrittura di dati XML, ma nella scrittura di un dataframe letto da altre origini. Pertanto, il round trip in lettura e scrittura di file XML ha la stessa struttura, ma la scrittura di un dataframe letto da altre origini è possibile avere una struttura diversa.
DataFrame con lo schema:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
e dati:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
produce il file XML:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>