共用方式為


Avro 檔案

Apache Avro 是資料序列化系統。 Avro 提供:

  • 豐富的資料結構。
  • 壓縮、快速的二進位資料格式。
  • 用來儲存持續性資料的容器檔案。
  • 遠端程序呼叫 (RPC)。
  • 與動態語言的簡單整合。 不需要產生代碼即可讀取或寫入資料檔案,也不需要使用或實施 RPC 通訊協定。 代碼產生為選擇性最佳化,僅值得實施於靜態類型語言。

Avro資料來源支援:

  • 結構描述轉換:Apache Spark SQL 與 Avro 記錄之間的自動轉換。
  • 資料分割:輕鬆讀取和寫入資料分割資料,而不需要任何額外的設定。
  • 壓縮:將 Avro 寫入磁碟時要使用的壓縮。 支援的類型為 uncompressedsnappydeflate。 您也可以指定壓抑的層級。
  • 記錄名稱:使用 recordNamerecordNamespace 傳遞參數的對應來記錄名稱和命名空間。

也請參閱讀取和寫入串流 Avro 資料

組態

您可以使用各種組態參數來變更 Avro 資料來源的行為。

若要在讀取時略過沒有 .avro 延伸項目的檔案,您可以在 Hadoop 組態中設定參數 avro.mapred.ignore.inputs.without.extension。 預設值為 false

spark
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

若要在寫入時設定壓縮,請設定下列 Spark 屬性:

  • 壓縮編解碼器:spark.sql.avro.compression.codec。 支援的編解碼器為 snappydeflate。 預設編解碼器為 snappy
  • 如果壓縮編解碼器是 deflate,您可以使用下列來設定壓縮層級:spark.sql.avro.deflate.level。 預設層級是 -1

您可以在叢集 Spark 組態或執行階段使用 spark.conf.set() 來設定這些屬性。 例如:

spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

針對 Databricks Runtime 9.1 LTS 和更新版本,您可以在讀取檔案時在 Avro 中提供mergeSchema 選項,以變更 Avro 中的預設結構描述推斷行為。 將 mergeSchema 設定為 true 會從目標目錄中的一組 Avro 檔案推斷結構描述,並加以合併,而不是從單一檔案推斷讀取結構描述。

Avro 支援的類型 -> Spark SQL 轉換

此程式庫支援讀取所有 Avro 類型。 它會使用下列從 Avro 類型到 Spark SQL 類型的對應:

Avro 類型 Spark SQL 類型
boolean BooleanType
int IntegerType
long LongType
float FloatType
double DoubleType
bytes BinaryType
字串 StringType
記錄 StructType
enum StringType
陣列 ArrayType
map MapType
fixed BinaryType
union 請參閱等位型別

等位型別

Avro 資料來源支援讀取 union 類型。 Avro 會將下列三種類型視為 union 類型:

  • union(int, long) 對應至 LongType
  • union(float, double) 對應至 DoubleType
  • union(something, null),其中 something 是任何支援的 Avro 類型。 這會對應至與 something 相同的 Spark SQL 類型,並將 nullable 設定為 true

所有其他的 union 類型都是複雜類型。 它們會對應至 StructType,而其欄位名稱為 member0member1 等位置,且將依照 union 的成員進行對應。 這與 Avro 與 Parquet 之間轉換時的行為一致。

邏輯類型

Avro 資料來源支援讀取下列 Avro 邏輯類型

Avro 邏輯類型 Avro 類型 Spark SQL 類型
date int DateType
timestamp-millis long TimestampType
timestamp-micros long TimestampType
decimal fixed DecimalType
decimal bytes DecimalType

注意

Avro 資料來源略過 Avro 檔案中存在的文件、別名和其他屬性。

Spark SQL 支援的類型 -> Avro 轉換

此程式庫支援將所有 Spark SQL類型寫入 Avro。 對大部分類型而言,從 Spark 類型到 Avro 類型的對應很簡單(例如 IntegerType 會轉換成 int):下列是少數特殊案例的清單:

Spark SQL 類型 Avro 類型 Avro 邏輯類型
ByteType int
ShortType int
BinaryType bytes
DecimalType fixed decimal
TimestampType long timestamp-micros
DateType int date

您也可以使用選項 avroSchema 來指定整個輸出的 Avro 結構描述,讓 Spark SQL 類型可以轉換成其他 Avro 類型。 預設不會套用下列轉換,而且需要使用者指定的 Avro 結構描述:

Spark SQL 類型 Avro 類型 Avro 邏輯類型
ByteType fixed
StringType enum
DecimalType bytes decimal
TimestampType long timestamp-millis

範例

這些範例會使用 episodes.avro 檔案。

Scala

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records

val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")

此範例示範自訂的 Avro 結構描述:

import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
  .read
  .format("avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro")
  .show()

此範例示範 Avro 壓縮選項:

// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

此範例示範分割的 Avro 記錄:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
  ).toDF("year", "month", "title", "rating")

df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")

此範例示範記錄名稱與命名空間:

val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")

Python

# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")

#  Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")

SQL

若要在 SQL 中查詢 Avro 資料,請將資料檔案註冊為資料表或暫存檢視:

CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")

SELECT * from episodes

筆記本範例:讀取和寫入 Avro 檔案

下列筆記本示範如何讀取和寫入 Avro 檔案。

讀取和寫入 Avro 檔案筆記本

取得筆記本