Leggere e scrivere dati Avro in streaming
Apache Avro è un sistema di serializzazione dei dati comunemente usato nel mondo di streaming. Una soluzione tipica consiste nell'inserire i dati in formato Avro in Apache Kafka, i metadati nel Confluent Schema Registrye quindi eseguire query con un framework di streaming che si connette sia a Kafka che al Schema Registry.
Azure Databricks supporta le funzioni di from_avro
e to_avro
per creare pipeline di streaming con dati Avro in Kafka e metadati in Schema Registry. La funzione to_avro
codifica un column come binario in formato Avro e from_avro
decodifica i dati binari Avro in un column. Entrambe le funzioni trasformano un column in un altro columne il tipo di dati SQL di input/output può essere un tipo complesso o un tipo primitivo.
Nota
Funzioni from_avro
e to_avro
:
- Disponibili in Phyton, Scala e Java.
- È possibile passare alle funzioni SQL sia in batch che in query di streaming.
Vedere anche Origine dati del file Avro.
Esempio di schema specificato manualmente
Analogamente a from_json e to_json, è possibile usare from_avro
e to_avro
con qualsiasi columnbinario. È possibile specificare manualmente l'Avro schema, come nell'esempio seguente:
import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder
// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
from_avro($"value", SchemaBuilder.builder().intType()).as("value"))
// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
.select(
to_avro($"key").as("key"),
to_avro($"value").as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Esempio jsonFormatSchema
È anche possibile specificare un schema come stringa JSON. Ad esempio, se /tmp/user.avsc
è:
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
È possibile creare una stringa JSON:
from pyspark.sql.avro.functions import from_avro, to_avro
jsonFormatSchema = open("/tmp/user.avsc", "r").read()
Usare quindi il schema in from_avro
:
# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.
output = df\
.select(from_avro("value", jsonFormatSchema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))
Esempio con registro Schema
Se il cluster dispone di un servizio di registrazione Schema, è possibile utilizzare from_avro
in modo che non sia necessario specificare manualmente il schema Avro.
Nell'esempio seguente è mostrata la lettura di un topic Kafka "t", presupponendo che la chiave e il valore siano già registrati nel registro Schema come voci "t-key" e "t-value" di tipi STRING
e INT
:
import org.apache.spark.sql.avro.functions._
val schemaRegistryAddr = "https://myhost:8081"
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr).as("value"))
Per to_avro
, l'output predefinito Avro schema potrebbe non corrispondere al schema dell'oggetto di destinazione nel servizio Registro Schema per i motivi seguenti:
- La mappatura dal tipo Spark SQL ad Avro schema non è univoca. Vedere Tipi supportati per Spark SQL -> Conversione Avro.
- Se l'output convertito avro schema è di tipo record, il nome del record è
topLevelRecord
e non esiste alcuno spazio dei nomi per impostazione predefinita.
Se l'output predefinito schema di to_avro
corrisponde al schema dell'oggetto di destinazione, è possibile eseguire le operazioni seguenti:
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
In caso contrario, è necessario specificare il schema dell'oggetto di destinazione nella funzione to_avro
:
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Eseguire l'autenticazione in un registro di Schema confluente esterno
In Databricks Runtime 12.2 LTS e versioni successive è possibile eseguire l'autenticazione in un registro confluente esterno Schema. Gli esempi seguenti illustrano come configurare le opzioni del Registro di sistema schema in modo da includere credentials di autenticazione e chiavi API.
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key"),
from_avro(
data = col("value"),
options = schema_registry_options,
subject = "t-value",
schemaRegistryAddress = schema_registry_address
).alias("value")
)
)
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options,
jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Usare file truststore e keystore in Unity Catalogvolumes
In Databricks Runtime 14.3 LTS e versioni successive, è possibile utilizzare i file truststore e keystore in Unity Catalogvolumes per autenticarsi a un Registry Confluent Schema. Update la configurazione nell'esempio precedente usando la sintassi seguente:
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
"confluent.schema.registry.ssl.key.password" -> "keyPassword")
Usare la modalità di evoluzione schema con from_avro
In Databricks Runtime 14.2 e versioni successive, è possibile usare la modalità di evoluzione schema con from_avro
. L'abilitazione della modalità di evoluzione schema provoca che il processo generi un UnknownFieldException
dopo aver rilevato l'evoluzione schema. Databricks consiglia di configurare i processi con la modalità di evoluzione schema per il riavvio automatico in caso di errore dell'attività. Vedere Considerazioni sulla produzione per Structured Streaming.
Schema: l'evoluzione è utile se si prevede che il schema dei dati di origine si evolva nel tempo e acquisisca tutti i campi dalla tua fonte di dati. Se le query specificano già in modo esplicito quali campi interrogare nell'origine dati, i campi aggiunti vengono ignorati indipendentemente dall'evoluzione schema.
Utilizzare l'opzione avroSchemaEvolutionMode
per abilitare l'evoluzione schema. La seguente table descrive le opzioni per la modalità di evoluzione di schema:
Opzione | Comportamento |
---|---|
none |
Valore predefinito. Ignora l'evoluzione del schema e l'attività continua. |
restart |
Genera un UnknownFieldException quando si rileva l'evoluzione di schema. Richiede il riavvio. |
Nota
È possibile modificare questa configurazione tra processi di streaming e riutilizzare lo stesso checkpoint. La disabilitazione dell'evoluzione schema può comportare l'eliminazione di columns.
Configurare la modalità di analisi
È possibile configurare la modalità di analisi per determinare se si desidera interrompere o generare record null quando la modalità di evoluzione schema è disabilitata e la schema si evolve in modo non retrocompatibile. Con le impostazioni predefinite, from_avro
fallisce quando rileva modifiche schema non compatibili.
Usare l'opzione mode
per specificare la modalità di analisi. L'table seguente descrive l'opzione per la modalità di analisi:
Opzione | Comportamento |
---|---|
FAILFAST |
Valore predefinito. Un errore di analisi genera un SparkException con un errorClass di MALFORMED_AVRO_MESSAGE . |
PERMISSIVE |
Un errore di analisi viene ignorato e viene generato un record Null. |
Nota
Quando l'evoluzione schema è abilitata, FAILFAST
genera eccezioni solo se un record è danneggiato.
Esempio di utilizzo dell'evoluzione schema e della configurazione della modalità di analisi
L'esempio seguente illustra come abilitare l'evoluzione schema e specificare la modalità di analisi FAILFAST
con un Registro di Confluent Schema.
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
"avroSchemaEvolutionMode" -> "restart",
"mode" -> "FAILFAST")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
// We read the "key" binary column from the subject "t-key" in the schema
// registry at schemaRegistryAddr. We provide schemaRegistryOptions,
// which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
// to fail the query if the schema for the subject t-key evolves.
from_avro(
$"key",
"t-key",
schemaRegistryAddr,
schemaRegistryOptions.asJava).as("key"))
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
"avroSchemaEvolutionMode": "restart",
"mode": "FAILFAST",
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key")
)
)