Condividi tramite


Leggere e scrivere dati Avro in streaming

Apache Avro è un sistema di serializzazione dei dati comunemente usato nel mondo di streaming. Una soluzione tipica consiste nell'inserire i dati in formato Avro in Apache Kafka, metadati nel Registro schemi Confluent e quindi eseguire query con un framework di streaming che si connette sia a Kafka che a Registro schemi.

Azure Databricks supporta le funzioni from_avroe to_avro per creare pipeline di streaming con dati Avro in Kafka e metadati nel Registro schemi. La funzione to_avro codifica una colonna come binaria in formato Avro e la from_avro decodifica i dati binari Avro in una colonna. Entrambe le funzioni trasformano una colonna in un'altra colonna e il tipo di dati SQL di input/output può essere un tipo complesso o un tipo primitivo.

Nota

Funzioni from_avro e to_avro:

  • Disponibili in Phyton, Scala e Java.
  • È possibile passare alle funzioni SQL sia in batch che in query di streaming.

Vedere anche Origine dati del file Avro.

Esempio di schema specificato manualmente

Analogamente a from_json e to_json, è possibile usare from_avro e to_avrocon qualsiasi colonna binaria. È possibile specificare manualmente lo schema Avro, come nell'esempio seguente:

import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder

// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))

// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
  .select(
    to_avro($"key").as("key"),
    to_avro($"value").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .start()

Esempio jsonFormatSchema

È anche possibile specificare uno schema come stringa JSON. Ad esempio, se /tmp/user.avsc è:

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}

È possibile creare una stringa JSON:

from pyspark.sql.avro.functions import from_avro, to_avro

jsonFormatSchema = open("/tmp/user.avsc", "r").read()

Usare quindi lo schema in from_avro:

# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.

output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

Esempio con registro schemi

Se il cluster dispone di un servizio Registro schemi, from_avro può usarlo in modo che non sia necessario specificare manualmente lo schema Avro.

Nell'esempio seguente viene illustrata la lettura di un argomento Kafka "t", presupponendo che la chiave e il valore siano già registrati nel Registro schemi come soggetti "t-key" e "t-value" di tipo STRING e INT:

import org.apache.spark.sql.avro.functions._

val schemaRegistryAddr = "https://myhost:8081"
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr).as("value"))

Per to_avro, lo schema Avro di output predefinito potrebbe non corrispondere allo schema dell'oggetto di destinazione nel servizio Registro schemi per i motivi seguenti:

  • Il mapping dal tipo Spark SQL allo schema Avro non è uno-a-uno. Vedere Tipi supportati per Spark SQL -> Conversione Avro.
  • Se lo schema Avro di output convertito è di tipo record, il nome del record è topLevelRecord e per impostazione predefinita non c’è alcuno spazio dei nomi.

Se lo schema di output predefinito di to_avro corrisponde allo schema del soggetto di destinazione, è possibile procedere come segue:

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

In caso contrario, è necessario specificare lo schema dell'oggetto di destinazione nella funzione to_avro:

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Eseguire l'autenticazione in un registro schemi Confluent esterno

In Databricks Runtime 12.2 LTS e versioni successive è possibile eseguire l'autenticazione in un registro schemi Confluent esterno. Gli esempi seguenti illustrano come configurare le opzioni del registro schemi in modo da includere le credenziali di autenticazione e le chiavi API.

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key"),
    from_avro(
      data = col("value"),
      options = schema_registry_options,
      subject = "t-value",
      schemaRegistryAddress = schema_registry_address
    ).alias("value")
  )
)

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("value")
  )
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options,
      jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Usare i file truststore e archivio chiavi nei volumi di Unity Catalog

In Databricks Runtime 14.3 LTS e versioni successive è possibile usare i file truststore e archivio chiavi nei volumi di Unity Catalog per eseguire l'autenticazione in un Registro schemi Confluent. Aggiornare la configurazione nell'esempio precedente usando la sintassi seguente:

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
      "confluent.schema.registry.ssl.key.password" -> "keyPassword")

Usare la modalità di evoluzione dello schema con from_avro

In Databricks Runtime 14.2 e versioni successive è possibile usare la modalità di evoluzione dello schema con from_avro. L'abilitazione della modalità di evoluzione dello schema fa sì che il processo generi UnknownFieldException dopo aver rilevato l'evoluzione dello schema. Databricks consiglia di configurare i processi con la modalità di evoluzione dello schema per il riavvio automatico in caso di errore del task. Vedere Considerazioni sulla produzione per Structured Streaming.

L'evoluzione dello schema è utile se si prevede che lo schema dei dati di origine si evolva nel tempo e inserisca tutti i campi dall'origine dati. Se le query specificano già in modo esplicito in quali campi eseguire query nell'origine dati, i campi aggiunti vengono ignorati indipendentemente dall'evoluzione dello schema.

Usare l'opzione avroSchemaEvolutionMode per abilitare l'evoluzione dello schema. La tabella seguente descrive le opzioni della modalità di evoluzione dello schema:

Opzione Comportamento
none Valore predefinito. Ignora l'evoluzione dello schema e il processo continua.
restart Genera un UnknownFieldException quando rileva l’evoluzione dello schema. Richiede il riavvio.

Nota

È possibile modificare questa configurazione tra processi di streaming e riutilizzare lo stesso checkpoint. La disabilitazione dell'evoluzione dello schema può comportare l'eliminazione di colonne.

Configurare la modalità di analisi

È possibile configurare la modalità di analisi per determinare se si desidera interrompere o generare record Null quando la modalità di evoluzione dello schema è disabilitata e lo schema si evolve in modo non compatibile con le versioni precedenti. Con le impostazioni predefinite, from_avro ha esito negativo quando trova modifiche dello schema incompatibili.

Usare l'opzione mode per specificare la modalità di analisi. La tabella seguente descrive le opzioni per la modalità di analisi:

Opzione Comportamento
FAILFAST Valore predefinito. Un errore di analisi genera un SparkException con un errorClass di MALFORMED_AVRO_MESSAGE.
PERMISSIVE Un errore di analisi viene ignorato e viene generato un record Null.

Nota

Con l'evoluzione dello schema abilitata, FAILFAST genera eccezioni solo se un record è danneggiato.

Esempio di uso dell'evoluzione dello schema e dell'impostazione della modalità di analisi

Nell'esempio seguente viene illustrata l'abilitazione dell'evoluzione dello schema e la specifica della FAILFAST modalità di analisi con un Registro schemi Confluent:

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
      "avroSchemaEvolutionMode" -> "restart",
      "mode" -> "FAILFAST")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    // We read the "key" binary column from the subject "t-key" in the schema
    // registry at schemaRegistryAddr. We provide schemaRegistryOptions,
    // which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
    // to fail the query if the schema for the subject t-key evolves.
    from_avro(
            $"key",
            "t-key",
            schemaRegistryAddr,
            schemaRegistryOptions.asJava).as("key"))

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
  "avroSchemaEvolutionMode": "restart",
  "mode": "FAILFAST",
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key")
  )
)