Файл Avro
Apache Avro — это система сериализации данных. Avro предоставляет следующие возможности:
- Обширные структуры данных.
- Компактный и быстрый двоичный формат данных.
- Файл контейнера для хранения постоянных данных.
- Удаленный вызов процедур (RPC).
- Простая интеграция с динамическими языками. Создание кода не требуется для чтения или записи файлов данных, а также для использования или реализации протоколов RPC. Создание кода в качестве дополнительной оптимизации, имеет смысл реализовать только для языков со статической типизацией.
Источник данных Avro поддерживает:
- Преобразование схемы: автоматическое преобразование между Apache Spark SQL и записями Avro.
- Секционирование: простое чтение и запись секционированных данных без дополнительной настройки.
- Сжатие: сжатие, используемое при записи Avro на диск. Поддерживаемые типы данных:
uncompressed
,snappy
иdeflate
. Можно также указать уровень сжатия. - Имена записей: имя записи и пространство имен путем передачи схемы параметров с
recordName
иrecordNamespace
.
См. также Потоковые операции чтения и записи данных Avro.
Настройка
Поведение источника данных Avro можно изменить с помощью различных параметров конфигурации.
Чтобы игнорировать файлы без расширения .avro
при чтении, можно задать параметр avro.mapred.ignore.inputs.without.extension
в конфигурации Hadoop. Значение по умолчанию — false
.
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")
Чтобы настроить сжатие при записи, задайте следующие свойства Spark:
- Кодек сжатия:
spark.sql.avro.compression.codec
. Поддерживаемые кодеки:snappy
иdeflate
. Кодек по умолчанию —snappy
. - Если используется кодек сжатия
deflate
, можно задать уровень сжатия следующим образом:spark.sql.avro.deflate.level
. По умолчанию используется уровень-1
.
Эти свойства можно задать в конфигурации Spark кластера или во время выполнения с помощью spark.conf.set()
. Например:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
Для Databricks Runtime 9.1 LTS и более поздних версий можно изменить поведение вывода схемы по умолчанию в Avro, указав параметр mergeSchema
при чтении файлов. Если для mergeSchema
задано значение true
, не будет происходить вывод схемы чтения из одного файла.Вместо этого схема будет выводиться из набора файлов Avro в целевом каталоге и объединять их.
Поддерживаемые типы для типа Avro — > Преобразование Spark SQL
Эта библиотека поддерживает чтение всех типов Avro. В ней используется следующее сопоставление типов Avro с типами Spark SQL:
Тип Avro | Тип Spark SQL |
---|---|
boolean | BooleanType |
INT | IntegerType |
длинный | LongType |
с плавающей запятой | FloatType |
двойной точности | DoubleType |
байт | BinaryType |
строка | StringType |
record | StructType |
перечисление | StringType |
array | ArrayType |
map | MapType |
fixed | BinaryType |
union | См Типы объединения. |
Типы объединения
Источник данных Avro поддерживает чтение типов union
. Avro рассматривает следующие три типа как типы union
:
union(int, long)
сопоставляетсяLongType
.union(float, double)
сопоставляетсяDoubleType
.union(something, null)
, гдеsomething
— любой поддерживаемый тип Avro. Он сопоставляется с тем же типом Spark SQL, что иsomething
, при этом параметруnullable
присваивается значениеtrue
.
Все остальные типы union
являются сложными типами. Они сопоставляются с StructType
, где именами полей являются member0
, member1
и т. д. в соответствии с членами union
. Этот процесс соответствует поведению при преобразовании между Avro и Parquet.
Логические типы
Источник данных Avro поддерживает чтение следующих логических типов Avro:
Логический тип Avro | Тип Avro | Тип Spark SQL |
---|---|---|
Дата | INT | DateType |
timestamp-millis | длинный | TimestampType |
timestamp-micros | длинный | TimestampType |
десятичное | fixed | DecimalType |
десятичное | байт | DecimalType |
Примечание.
Источник данных Avro игнорирует документы, псевдонимы и другие свойства, имеющиеся в файле Avro.
Поддерживаемые типы для Spark SQL > Преобразование Avro
Эта библиотека поддерживает запись всех типов Spark SQL в Avro. Для большинства типов сопоставление типов Spark с типами Avro происходит просто (например, IntegerType
преобразуется в int
). Ниже приведен список некоторых особых случаев.
Тип Spark SQL | Тип Avro | Логический тип Avro |
---|---|---|
ByteType | INT | |
ShortType | INT | |
BinaryType | байт | |
DecimalType | fixed | десятичное |
TimestampType | длинный | timestamp-micros |
DateType | INT | Дата |
Можно также указать всю выходную схему Avro с помощью параметра avroSchema
, чтобы типы Spark SQL можно было преобразовать в другие типы Avro.
Следующие преобразования не применяются по умолчанию, и для них требуется определенная пользователем схема Avro:
Тип Spark SQL | Тип Avro | Логический тип Avro |
---|---|---|
ByteType | fixed | |
StringType | перечисление | |
DecimalType | байт | десятичное |
TimestampType | длинный | timestamp-millis |
Примеры
В этих примерах используется файл episodes.avro.
Scala
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
В этом примере показана пользовательская схема Avro:
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(new File("episode.avsc"))
spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()
В этом примере показаны параметры сжатия Avro:
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.format("avro").load("/tmp/episodes.avro")
// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
В этом примере показаны секционированные записи Avro:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")
df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")
В этом примере показаны имя записи и пространство имен:
val df = spark.read.format("avro").load("/tmp/episodes.avro")
val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).format("avro").save("/tmp/output")
Python
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")
# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
SQL
Чтобы запросить данные Avro в SQL, зарегистрируйте файл данных как таблицу или временное представление:
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")
SELECT * from episodes
Пример записной книжки: чтение и запись файлов Avro
В следующей записной книжке показано, как читать и записывать файлы Avro.