File Avro
Apache Avro è un sistema di serializzazione dei dati. Avro fornisce:
- Strutture dei dati arricchiti.
- Formato di dati binario compatto e veloce.
- Un file contenitore per archiviare i dati persistenti.
- Remote procedure call (RPC).
- Integrazione semplice con linguaggi dinamici. La generazione del codice non è necessaria per leggere o scrivere file di dati né per usare o implementare protocolli RPC. Generazione di codice come ottimizzazione facoltativa: conviene implementarla solo per i linguaggi tipizzati in modo statico.
L'origine dati Avro supporta:
- Conversione Schema: conversione automatica dei record tra Apache Spark SQL e Avro.
- Partizionamento: leggere e scrivere facilmente dati partizionati senza alcuna configurazione aggiuntiva.
- Compressione: compressione da usare durante la scrittura di Avro su disco. I tipi supportati sono
uncompressed
,snappy
edeflate
. È anche possibile specificare il livello di deflate. - Nomi dei record: nome del record e namespace utilizzando una mappa di parameters con
recordName
erecordNamespace
.
Vedere anche Leggere e scrivere dati Avro in streaming.
Impostazione
È possibile modificare il comportamento di un'origine dati Avro usando varie configurazioni parameters.
Per ignorare i file senza l'estensione .avro
durante la lettura, è possibile set il parametro avro.mapred.ignore.inputs.without.extension
nella configurazione di Hadoop. Il valore predefinito è false
.
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")
Per configurare la compressione durante la scrittura, set le proprietà spark seguenti:
- Codec di compressione:
spark.sql.avro.compression.codec
. I codec supportati sonosnappy
edeflate
. Il codec predefinito èsnappy
. - Se il codec di compressione è
deflate
, è possibile set il livello di compressione con:spark.sql.avro.deflate.level
. Il livello predefinito è-1
.
È possibile set queste proprietà nel cluster configurazione di Spark o in fase di esecuzione usando spark.conf.set()
. Ad esempio:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
Per Databricks Runtime 9.1 LTS e versioni successive, è possibile modificare il comportamento di inferenza schema predefinito in Avro fornendo l'opzione mergeSchema
durante la lettura dei file. L'impostazione di mergeSchema
su true
dedurrà un schema da un set di file Avro nella directory di destinazione e li unisce anziché dedurre il schema di lettura da un singolo file.
Tipi supportati per Avro -> Conversione spark SQL
Questa libreria supporta la lettura di tutti i tipi Avro. Per abbinare i tipi Avro ai tipi Spark SQL, usa il mapping seguente:
Tipo Avro | Tipo SQL Spark |
---|---|
boolean | BooleanType |
int | IntegerType |
long | LongType |
float | FloatType |
double | DoubleType |
bytes | BinaryType |
string | StringType |
registra | StructType |
enum | StringType |
array | ArrayType |
mappa | MapType |
fixed | BinaryType |
union | Vedere Tipi unione. |
Tipi unione
L'origine dati Avro supporta i tipi di lettura union
. Avro presuppone che i tre tipi seguenti siano tipi union
:
-
union(int, long)
esegue il mapping aLongType
. -
union(float, double)
esegue il mapping aDoubleType
. -
union(something, null)
, wheresomething
è qualsiasi tipo Avro supportato. Corrisponde allo stesso tipo Spark SQL disomething
, connullable
set atrue
.
Tutti gli altri tipi union
sono tipi complessi. I nomi di campo StructType
,where, member0
, member1
e così via vengono mappati in base ai membri del union
. Questo comportamento è coerente con il comportamento durante la conversione tra Avro e Parquet.
Tipi logici
L'origine dati Avro supporta la lettura dei seguenti tipi logici Avro:
Tipo logico Avro | Tipo Avro | Tipo SQL Spark |
---|---|---|
data | int | DateType |
timestamp-millis | long | TimestampType |
timestamp-micros | long | TimestampType |
decimale | fixed | DecimalType |
decimale | bytes | DecimalType |
Nota
L'origine dati Avro ignora documenti, alias e altre proprietà presenti nel file Avro.
Tipi supportati per Spark SQL -> Conversione Avro
Questa libreria supporta la scrittura di tutti i tipi Spark SQL in Avro. Per la maggior parte dei tipi, il mapping dai tipi Spark ai tipi Avro è semplice (per esempio IntegerType
viene convertito in int
); segue un list dei pochi casi speciali:
Tipo SQL Spark | Tipo Avro | Tipo logico Avro |
---|---|---|
ByteType | int | |
ShortType | int | |
BinaryType | bytes | |
DecimalType | fixed | decimale |
TimestampType | long | timestamp-micros |
DateType | int | data |
È anche possibile specificare l'intero output schema Avro con l'opzione avroSchema
, in modo che i tipi Spark SQL possano essere convertiti in altri tipi Avro.
Le conversioni seguenti non vengono applicate per impostazione predefinita e richiedono che l'utente specifichi Avro schema:
Tipo SQL Spark | Tipo Avro | Tipo logico Avro |
---|---|---|
ByteType | fixed | |
StringType | enum | |
DecimalType | bytes | decimale |
TimestampType | long | timestamp-millis |
Esempi
Questi esempi usano il file episodes.avro.
Scala
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
Questo esempio illustra un schemaAvro personalizzato:
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(new File("episode.avsc"))
spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()
Questo esempio illustra le opzioni di compressione Avro:
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.format("avro").load("/tmp/episodes.avro")
// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
Questo esempio illustra i record Avro partizionati:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")
df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")
Questo esempio illustra il nome del record e lo spazio dei nomi:
val df = spark.read.format("avro").load("/tmp/episodes.avro")
val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).format("avro").save("/tmp/output")
Python
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")
# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
SQL
Per eseguire query sui dati Avro in SQL, registrare il file di dati come table o una vista temporanea.
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")
SELECT * from episodes
Esempio di notebook: leggere e scrivere file Avro
Il notebook seguente illustra come leggere e scrivere file Avro.