Condividi tramite


File Avro

Apache Avro è un sistema di serializzazione dei dati. Avro fornisce:

  • Strutture dei dati arricchiti.
  • Formato di dati binario compatto e veloce.
  • Un file contenitore per archiviare i dati persistenti.
  • Remote procedure call (RPC).
  • Integrazione semplice con linguaggi dinamici. La generazione del codice non è necessaria per leggere o scrivere file di dati né per usare o implementare protocolli RPC. Generazione di codice come ottimizzazione facoltativa: conviene implementarla solo per i linguaggi tipizzati in modo statico.

L'origine dati Avro supporta:

  • Conversione Schema: conversione automatica dei record tra Apache Spark SQL e Avro.
  • Partizionamento: leggere e scrivere facilmente dati partizionati senza alcuna configurazione aggiuntiva.
  • Compressione: compressione da usare durante la scrittura di Avro su disco. I tipi supportati sono uncompressed, snappy e deflate. È anche possibile specificare il livello di deflate.
  • Nomi dei record: nome del record e namespace utilizzando una mappa di parameters con recordName e recordNamespace.

Vedere anche Leggere e scrivere dati Avro in streaming.

Impostazione

È possibile modificare il comportamento di un'origine dati Avro usando varie configurazioni parameters.

Per ignorare i file senza l'estensione .avro durante la lettura, è possibile set il parametro avro.mapred.ignore.inputs.without.extension nella configurazione di Hadoop. Il valore predefinito è false.

spark
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

Per configurare la compressione durante la scrittura, set le proprietà spark seguenti:

  • Codec di compressione: spark.sql.avro.compression.codec. I codec supportati sono snappy e deflate. Il codec predefinito è snappy.
  • Se il codec di compressione è deflate, è possibile set il livello di compressione con: spark.sql.avro.deflate.level. Il livello predefinito è -1.

È possibile set queste proprietà nel cluster configurazione di Spark o in fase di esecuzione usando spark.conf.set(). Ad esempio:

spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

Per Databricks Runtime 9.1 LTS e versioni successive, è possibile modificare il comportamento di inferenza schema predefinito in Avro fornendo l'opzione mergeSchema durante la lettura dei file. L'impostazione di mergeSchema su true dedurrà un schema da un set di file Avro nella directory di destinazione e li unisce anziché dedurre il schema di lettura da un singolo file.

Tipi supportati per Avro -> Conversione spark SQL

Questa libreria supporta la lettura di tutti i tipi Avro. Per abbinare i tipi Avro ai tipi Spark SQL, usa il mapping seguente:

Tipo Avro Tipo SQL Spark
boolean BooleanType
int IntegerType
long LongType
float FloatType
double DoubleType
bytes BinaryType
string StringType
registra StructType
enum StringType
array ArrayType
mappa MapType
fixed BinaryType
union Vedere Tipi unione.

Tipi unione

L'origine dati Avro supporta i tipi di lettura union. Avro presuppone che i tre tipi seguenti siano tipi union:

  • union(int, long) esegue il mapping a LongType.
  • union(float, double) esegue il mapping a DoubleType.
  • union(something, null), wheresomething è qualsiasi tipo Avro supportato. Corrisponde allo stesso tipo Spark SQL di something, con nullableset a true.

Tutti gli altri tipi union sono tipi complessi. I nomi di campo StructType,where, member0, member1e così via vengono mappati in base ai membri del union. Questo comportamento è coerente con il comportamento durante la conversione tra Avro e Parquet.

Tipi logici

L'origine dati Avro supporta la lettura dei seguenti tipi logici Avro:

Tipo logico Avro Tipo Avro Tipo SQL Spark
data int DateType
timestamp-millis long TimestampType
timestamp-micros long TimestampType
decimale fixed DecimalType
decimale bytes DecimalType

Nota

L'origine dati Avro ignora documenti, alias e altre proprietà presenti nel file Avro.

Tipi supportati per Spark SQL -> Conversione Avro

Questa libreria supporta la scrittura di tutti i tipi Spark SQL in Avro. Per la maggior parte dei tipi, il mapping dai tipi Spark ai tipi Avro è semplice (per esempio IntegerType viene convertito in int); segue un list dei pochi casi speciali:

Tipo SQL Spark Tipo Avro Tipo logico Avro
ByteType int
ShortType int
BinaryType bytes
DecimalType fixed decimale
TimestampType long timestamp-micros
DateType int data

È anche possibile specificare l'intero output schema Avro con l'opzione avroSchema, in modo che i tipi Spark SQL possano essere convertiti in altri tipi Avro. Le conversioni seguenti non vengono applicate per impostazione predefinita e richiedono che l'utente specifichi Avro schema:

Tipo SQL Spark Tipo Avro Tipo logico Avro
ByteType fixed
StringType enum
DecimalType bytes decimale
TimestampType long timestamp-millis

Esempi

Questi esempi usano il file episodes.avro.

Scala

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records

val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")

Questo esempio illustra un schemaAvro personalizzato:

import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
  .read
  .format("avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro")
  .show()

Questo esempio illustra le opzioni di compressione Avro:

// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

Questo esempio illustra i record Avro partizionati:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
  ).toDF("year", "month", "title", "rating")

df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")

Questo esempio illustra il nome del record e lo spazio dei nomi:

val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")

Python

# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")

#  Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")

SQL

Per eseguire query sui dati Avro in SQL, registrare il file di dati come table o una vista temporanea.

CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")

SELECT * from episodes

Esempio di notebook: leggere e scrivere file Avro

Il notebook seguente illustra come leggere e scrivere file Avro.

Notebook: Leggere e scrivere file Avro

Get taccuino