Поделиться через


Файл Avro

Apache Avro — это система сериализации данных. Avro предоставляет следующие возможности:

  • Обширные структуры данных.
  • Компактный и быстрый двоичный формат данных.
  • Файл контейнера для хранения постоянных данных.
  • Удаленный вызов процедур (RPC).
  • Простая интеграция с динамическими языками. Создание кода не требуется для чтения или записи файлов данных, а также для использования или реализации протоколов RPC. Создание кода в качестве дополнительной оптимизации, имеет смысл реализовать только для языков со статической типизацией.

Источник данных Avro поддерживает:

  • Преобразование схемы: автоматическое преобразование между Apache Spark SQL и записями Avro.
  • Секционирование: простое чтение и запись секционированных данных без дополнительной настройки.
  • Сжатие: сжатие, используемое при записи Avro на диск. Поддерживаемые типы данных:uncompressed, snappy и deflate. Можно также указать уровень сжатия.
  • Имена записей: имя записи и пространство имен путем передачи схемы параметров с recordName и recordNamespace.

См. также Потоковые операции чтения и записи данных Avro.

Настройка

Поведение источника данных Avro можно изменить с помощью различных параметров конфигурации.

Чтобы игнорировать файлы без расширения .avro при чтении, можно задать параметр avro.mapred.ignore.inputs.without.extension в конфигурации Hadoop. Значение по умолчанию — false.

spark
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

Чтобы настроить сжатие при записи, задайте следующие свойства Spark:

  • Кодек сжатия: spark.sql.avro.compression.codec. Поддерживаемые кодеки: snappy и deflate. Кодек по умолчанию — snappy.
  • Если используется кодек сжатия deflate, можно задать уровень сжатия следующим образом: spark.sql.avro.deflate.level. По умолчанию используется уровень -1.

Эти свойства можно задать в конфигурации Spark кластера или во время выполнения с помощью spark.conf.set(). Например:

spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

Для Databricks Runtime 9.1 LTS и более поздних версий можно изменить поведение вывода схемы по умолчанию в Avro, указав параметр mergeSchema при чтении файлов. Если для mergeSchema задано значение true, не будет происходить вывод схемы чтения из одного файла.Вместо этого схема будет выводиться из набора файлов Avro в целевом каталоге и объединять их.

Поддерживаемые типы для типа Avro — > Преобразование Spark SQL

Эта библиотека поддерживает чтение всех типов Avro. В ней используется следующее сопоставление типов Avro с типами Spark SQL:

Тип Avro Тип Spark SQL
boolean BooleanType
INT IntegerType
длинный LongType
с плавающей запятой FloatType
двойной точности DoubleType
байт BinaryType
строка StringType
record StructType
перечисление StringType
array ArrayType
map MapType
fixed BinaryType
union См Типы объединения.

Типы объединения

Источник данных Avro поддерживает чтение типов union. Avro рассматривает следующие три типа как типы union:

  • union(int, long) сопоставляется LongType.
  • union(float, double) сопоставляется DoubleType.
  • union(something, null), где something — любой поддерживаемый тип Avro. Он сопоставляется с тем же типом Spark SQL, что и something, при этом параметру nullable присваивается значение true.

Все остальные типы union являются сложными типами. Они сопоставляются с StructType, где именами полей являются member0, member1 и т. д. в соответствии с членами union. Этот процесс соответствует поведению при преобразовании между Avro и Parquet.

Логические типы

Источник данных Avro поддерживает чтение следующих логических типов Avro:

Логический тип Avro Тип Avro Тип Spark SQL
Дата INT DateType
timestamp-millis длинный TimestampType
timestamp-micros длинный TimestampType
десятичное fixed DecimalType
десятичное байт DecimalType

Примечание.

Источник данных Avro игнорирует документы, псевдонимы и другие свойства, имеющиеся в файле Avro.

Поддерживаемые типы для Spark SQL > Преобразование Avro

Эта библиотека поддерживает запись всех типов Spark SQL в Avro. Для большинства типов сопоставление типов Spark с типами Avro происходит просто (например, IntegerType преобразуется в int). Ниже приведен список некоторых особых случаев.

Тип Spark SQL Тип Avro Логический тип Avro
ByteType INT
ShortType INT
BinaryType байт
DecimalType fixed десятичное
TimestampType длинный timestamp-micros
DateType INT Дата

Можно также указать всю выходную схему Avro с помощью параметра avroSchema, чтобы типы Spark SQL можно было преобразовать в другие типы Avro. Следующие преобразования не применяются по умолчанию, и для них требуется определенная пользователем схема Avro:

Тип Spark SQL Тип Avro Логический тип Avro
ByteType fixed
StringType перечисление
DecimalType байт десятичное
TimestampType длинный timestamp-millis

Примеры

В этих примерах используется файл episodes.avro.

Scala

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records

val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")

В этом примере показана пользовательская схема Avro:

import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
  .read
  .format("avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro")
  .show()

В этом примере показаны параметры сжатия Avro:

// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

В этом примере показаны секционированные записи Avro:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
  ).toDF("year", "month", "title", "rating")

df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")

В этом примере показаны имя записи и пространство имен:

val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")

Python

# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")

#  Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")

SQL

Чтобы запросить данные Avro в SQL, зарегистрируйте файл данных как таблицу или временное представление:

CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")

SELECT * from episodes

Пример записной книжки: чтение и запись файлов Avro

В следующей записной книжке показано, как читать и записывать файлы Avro.

Чтение и запись в записной книжке файлов Avro

Получить записную книжку