Buffers de protocolo de leitura e gravação
O Azure Databricks fornece suporte nativo para serialização e desserialização entre estruturas do Apache Spark e buffers de protocolo (protobuf). O suporte ao Protobuf é implementado como um transformador Apache Spark DataFrame e pode ser usado com Streaming Estruturado ou para operações em lote.
Como desserializar e serializar buffers de protocolo
No Databricks Runtime 12.2 LTS e superior, você pode usar from_protobuf
e to_protobuf
funções para serializar e desserializar dados. A serialização Protobuf é comumente usada em cargas de trabalho de streaming.
A sintaxe básica para funções protobuf é semelhante para funções de leitura e gravação. Você deve importar essas funções antes de usar.
from_protobuf
converte um column binário para um struct, e to_protobuf
converte um struct column para binário. Você deve fornecer um registro schema especificado com o argumento options
ou um arquivo descritor identificado pelo argumento descFilePath
.
Python
from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
Scala
// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
Os exemplos a seguir ilustram o processamento de registros de protobuf binário com from_protobuf()
e a conversão do Spark SQL struct em protobuf binário com to_protobuf()
.
Use protobuf com o Registo Schema Confluent
O Azure Databricks dá suporte ao uso do Confluent Schema Registry para definir o Protobuf.
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://schema-registry:8081/"
}
# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
input_df
.select(
from_protobuf("proto_bytes", options = schema_registry_options)
.alias("proto_event")
)
)
# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
proto_events_df
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf("event", options = schema_registry_options)
.alias("proto_bytes")
)
)
Scala
import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://schema-registry:8081/"
)
// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
.select(
from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
.as("proto_event")
)
// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf($"event", options = schemaRegistryOptions.asJava)
.as("proto_bytes")
)
Autenticar-se num Registro de Schema Confluent Externo
Para autenticar em um Registro de Schema Confluent externo, update suas opções de registro schema para incluir credentials de autenticação e chaves de API.
Python
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
}
Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)
Utilize os ficheiros truststore e keystore no Unity Catalogvolumes
No Databricks Runtime 14.3 LTS e versões superiores, você pode usar arquivos truststore e keystore no Unity Catalogvolumes para autenticar num Confluent Schema Registry. Update suas opções de registro schema de acordo com o exemplo a seguir:
Python
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" : "<password>",
"confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" : "<password>",
"confluent.schema.registry.ssl.key.password" : "<password>"
}
Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "<password>",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" -> "<password>",
"confluent.schema.registry.ssl.key.password" -> "<password>"
)
Use o Protobuf com um arquivo descritor
Você também pode fazer referência a um arquivo descritor protobuf que está disponível para seu cluster de computação. Certifique-se de que tem as permissões adequadas para ler o ficheiro, dependendo da sua localização.
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
descriptor_file = "/path/to/proto_descriptor.desc"
proto_events_df = (
input_df.select(
from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
)
)
proto_binary_df = (
proto_events_df
.select(
to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
)
)
Scala
import org.apache.spark.sql.protobuf.functions._
val descriptorFile = "/path/to/proto_descriptor.desc"
val protoEventsDF = inputDF
.select(
from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
)
val protoBytesDF = protoEventsDF
.select(
to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
)
Opções suportadas nas funções do Protobuf
As seguintes opções são suportadas nas funções do Protobuf.
-
mode: Determina como os erros durante a desserialização de registros Protobuf são tratados. Os erros podem ser causados por vários tipos de registros malformados, incluindo uma incompatibilidade entre o schema real do registro e o schema esperado fornecido no
from_protobuf()
.-
Values:
-
FAILFAST
(padrão): um erro é gerado quando um registro malformado é encontrado e a tarefa falha. -
PERMISSIVE
: Um NULL é retornado para registros malformados. Use esta opção com cuidado, pois pode resultar na queda de muitos registros. Isso é útil quando uma pequena fração dos registros na fonte está incorreta.
-
-
Values:
-
recursive.fields.max.depth: Adiciona suporte para campos recursivos. Os esquemas do Spark SQL não oferecem suporte a campos recursivos. Quando esta opção não é especificada, os campos recursivos não são permitidos. A fim de suportar campos recursivos em Protobufs, eles precisam estar se expandindo para uma profundidade especificada.
Values:
-1 (padrão): Campos recursivos não são permitidos.
0: Os campos recursivos são descartados.
1: Permite um único nível de recursão.
[2-10]: Especifique um limite para recursão múltipla, até 10 níveis.
Definir um valor como maior que 0 permite campos recursivos expandindo os campos aninhados para a profundidade configurada. Values maiores que 10 não são permitidos para evitar a criação inadvertida de esquemas muito grandes. Se uma mensagem Protobuf tiver profundidade além do limitconfigurado, a estrutura do Spark retornada será truncada após a recursão limit.
Exemplo: Considere um Protobuf com o seguinte campo recursivo:
message Person { string name = 1; Person friend = 2; }
A seguir está listado o schema final com values diferentes para essa configuração:
- Opção set a 1:
STRUCT<name: STRING>
- Opção set a 2:
STRUCT<name STRING, friend: STRUCT<name: STRING>>
- Opção set a 3:
STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
- Opção set a 1:
-
convert.any.fields.to.json: Esta opção permite converter campos Protobuf Any para JSON. Este recurso deve ser ativado com cuidado. A conversão e o processamento JSON são ineficientes. Além disso, o campo de string JSON perde a segurança do Protobuf schema, tornando o processamento a jusante propenso a erros.
Values:
- Falso (padrão): No tempo de execução, esses campos curinga podem conter mensagens arbitrárias do Protobuf como dados binários. Por padrão, esses campos são tratados como uma mensagem Protobuf normal. Tem dois campos com schema
(STRUCT<type_url: STRING, value: BINARY>)
. Por padrão, o campo bináriovalue
não é interpretado de forma alguma. Mas os dados binários podem não ser convenientes na prática para trabalhar em alguns aplicativos. - True: Definir esse valor como True permite converter
Any
campos em cadeias de caracteres JSON em tempo de execução. Com essa opção, o binário é analisado e a mensagem Protobuf é desserializada em uma cadeia de caracteres JSON.
- Falso (padrão): No tempo de execução, esses campos curinga podem conter mensagens arbitrárias do Protobuf como dados binários. Por padrão, esses campos são tratados como uma mensagem Protobuf normal. Tem dois campos com schema
Exemplo: Considere dois tipos de Protobuf definidos da seguinte forma:
message ProtoWithAny { string event_name = 1; google.protobuf.Any details = 2; } message Person { string name = 1; int32 id = 2; }
Com esta opção ativada, a schema para
from_protobuf("col", messageName ="ProtoWithAny")
seria:STRUCT<event_name: STRING, details: STRING>
.Em tempo de execução, se
details
o campo contiverPerson
a mensagem Protobuf, o valor retornado terá esta aparência:('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}')
.Requisitos:
- As definições para todos os possíveis tipos de Protobuf que são usados em
Any
campos devem estar disponíveis no arquivo descritor Protobuf passado parafrom_protobuf()
. - Se
Any
o Protobuf não for encontrado, isso resultará em um erro para esse registro. - Este recurso não é suportado atualmente com schema-registry.
- As definições para todos os possíveis tipos de Protobuf que são usados em
-
emit.default.values: Permite renderizar campos com valor zero values ao desserializar o Protobuf para uma estrutura Spark. Esta opção deve ser utilizada com moderação. Geralmente, não é aconselhável depender de diferenças semânticas tão finas.
Values
- Falso (padrão): Quando um campo está vazio no Protobuf serializado, o campo resultante na estrutura do Spark é, por padrão, nulo. É mais simples não habilitar essa opção e tratar
null
como o valor padrão. - True: Quando esta opção está ativada, esses campos são preenchidos com a valuespadrão correspondente.
- Falso (padrão): Quando um campo está vazio no Protobuf serializado, o campo resultante na estrutura do Spark é, por padrão, nulo. É mais simples não habilitar essa opção e tratar
Exemplo: Considere o seguinte Protobuf com o Protobuf construído como
Person(age=0, middle_name="")
:syntax = "proto3"; message Person { string name = 1; int64 age = 2; optional string middle_name = 3; optional int64 salary = 4; }
- Com esta opção set definida para False, a estrutura do Spark após chamar
from_protobuf()
seria toda nula:{"name": null, "age": null, "middle_name": "", "salary": null}
. Apesar de dois campos (age
emiddle_name
) terem valuesset, o Protobuf não os inclui no formato wire, uma vez que são valuespadrão . - Com esta opção set definida para True, a estrutura do Spark após a chamada de
from_protobuf()
seria:{"name": "", "age": 0, "middle_name": "", "salary": null}
. O camposalary
permanece nulo, uma vez que é explicitamente declaradooptional
e não é set no registro de entrada.
- Com esta opção set definida para False, a estrutura do Spark após chamar
-
enums.as.ints: Quando ativado, os campos enum no Protobuf são renderizados como campos inteiros no Spark.
Values
- Falso (predefinição)
- True: Quando ativado, os campos enum no Protobuf são renderizados como campos inteiros no Spark.
Exemplo: Considere o seguinte Protobuf:
syntax = "proto3"; message Person { enum Job { NONE = 0; ENGINEER = 1; DOCTOR = 2; NURSE = 3; } Job job = 1; }
Dada uma mensagem Protobuf como
Person(job = ENGINEER)
:- Com esta opção desativada, a estrutura do Spark correspondente seria
{"job": "ENGINEER"}
. - Com essa opção ativada, a estrutura do Spark correspondente seria
{"job": 1}
.
Observe que a schema para esses campos é diferente em cada caso (inteiro em vez de cadeia de caracteres padrão). Esta alteração pode afetar a schema de tablesa jusante.
- Com esta opção desativada, a estrutura do Spark correspondente seria
Schema Opções do Registo
As seguintes opções de registo schema são relevantes ao utilizar o registo schema com funções Protobuf.
-
schema.registo.assunto
- Necessário
- Especifica o assunto para schema no Schema Registo, como "evento-cliente"
-
schema.registry.address
- Necessário
- URL para schema registro, como
https://schema-registry.example.com:8081
-
schema.registry.protobuf.name
- Opcional
- Padrão:
<NONE>
. - Uma entrada schema-registry para um assunto pode conter várias definições de Protobuf, assim como um único arquivo
proto
. Quando esta opção não é especificada, o primeiro Protobuf é usado para o schema. Especifique o nome da mensagem Protobuf quando ela não for a primeira na entrada. Por exemplo, considere uma entrada com duas definições de Protobuf: "Pessoa" e "Local" nessa ordem. Se o fluxo corresponder a "Local" em vez de "Pessoa", set esta opção para "Local" (ou seu nome completo, incluindo o pacote "com.example.protos.Location").
-
schema.registry.schema.evolution.mode
- Padrão: "reiniciar".
- Modos suportados:
- "Reiniciar"
- "nenhuma"
- Esta opção define o modo de evolução schemapara
from_protobuf()
. No início de uma consulta, o Spark regista o id mais recente schemapara determinado assunto. Isso determina a schema parafrom_protobuf()
. Uma nova schema pode ser publicada no registro schema após o início da consulta. Quando um schema-id mais recente é notado em um registro de entrada, isso indica uma alteração no schema. Esta opção determina como essa alteração no schema é tratada:-
reiniciar (padrão): ativa um
UnknownFieldException
quando é detetado um schema-id mais recente. Isso encerra a consulta. A Databricks recomenda configurar trabalhos para recomeçar em caso de falha na consulta para aplicar alterações schema. - nenhuma: Schema-id alterações são ignoradas. Os registos com schema-id mais recente são analisados com o mesmo schema que estava a ser observado no início da consulta. Espera-se que as definições mais recentes do Protobuf sejam compatíveis com versões anteriores e que novos campos sejam ignorados.
-
reiniciar (padrão): ativa um
-
confluentes.schema.registo.
<schema-registy-client-option>
- Opcional
-
Schema-registry se conecta ao Confluent schema-registry usando o cliente Confluent Schema Registry. Todas as opções de configuração suportadas pelo cliente podem ser especificadas com o prefixo "confluent.schema.registry". Por exemplo, as duas configurações a seguir fornecem autenticação "USER_INFO" credentials:
- "Confluente.schema.registry.basic.auth.credentials.source": 'USER_INFO'
- "Confluente.schema.registry.basic.auth.user.info": "
<KEY>
:<SECRET>
"