Partilhar via


Stream do Apache Pulsar

Importante

Esta funcionalidade está em Pré-visualização Pública.

No Databricks Runtime 14.1 e superior, você pode usar o Streaming Estruturado para transmitir dados do Apache Pulsar no Azure Databricks.

O Streaming Estruturado fornece semântica de processamento exatamente uma vez para dados lidos de fontes do Pulsar.

Exemplo de sintaxe

A seguir está um exemplo básico do uso do Structured Streaming para ler do Pulsar:

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()

Você sempre deve fornecer uma service.url e uma das seguintes opções para especificar tópicos:

  • topic
  • topics
  • topicsPattern

Para obter uma lista completa de opções, consulte Configurar opções para leitura de streaming do Pulsar.

Autenticar no Pulsar

O Azure Databricks dá suporte à autenticação de armazenamento confiável e keystore para o Pulsar. O Databricks recomenda o uso de segredos ao armazenar detalhes de configuração.

Você pode definir as seguintes opções durante a configuração do fluxo:

  • pulsar.client.authPluginClassName
  • pulsar.client.authParams
  • pulsar.client.useKeyStoreTls
  • pulsar.client.tlsTrustStoreType
  • pulsar.client.tlsTrustStorePath
  • pulsar.client.tlsTrustStorePassword

Se o fluxo usar um PulsarAdmin, defina também o seguinte:

  • pulsar.admin.authPluginClassName
  • pulsar.admin.authParams

O exemplo a seguir demonstra a configuração de opções de autenticação:

val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()

Esquema pulsar

O esquema de registros lidos do Pulsar depende de como os tópicos têm seus esquemas codificados.

  • Para tópicos com esquema Avro ou JSON, os nomes e tipos de campo são preservados no Spark DataFrame resultante.
  • Para tópicos sem esquema ou com um tipo de dados simples no Pulsar, a carga útil é carregada em uma value coluna.
  • Se o leitor estiver configurado para ler vários tópicos com esquemas diferentes, defina allowDifferentTopicSchemas para carregar o conteúdo bruto em uma value coluna.

Os registros Pulsar têm os seguintes campos de metadados:

Column Tipo
__key binary
__topic string
__messageId binary
__publishTime timestamp
__eventTime timestamp
__messageProperties map<String, String>

Configurar opções para leitura de streaming do Pulsar

Todas as opções são configuradas como parte de uma leitura de Streaming Estruturado usando .option("<optionName>", "<optionValue>") sintaxe. Você também pode configurar a autenticação usando opções. Consulte Autenticar no Pulsar.

A tabela a seguir descreve as configurações necessárias para o Pulsar. Você deve especificar apenas uma das opções topicou topics topicsPattern.

Opção Valor predefinido Description
service.url nenhum A configuração Pulsar serviceUrl para o serviço Pulsar.
topic nenhum Uma cadeia de caracteres de nome de tópico para o tópico consumir.
topics nenhum Uma lista separada por vírgulas dos tópicos a consumir.
topicsPattern nenhum Uma string de regex Java para corresponder em tópicos a serem consumidos.

A tabela a seguir descreve outras opções suportadas para o Pulsar:

Opção Valor predefinido Description
predefinedSubscription nenhum O nome de assinatura predefinido usado pelo conector para acompanhar o progresso do aplicativo spark.
subscriptionPrefix nenhum Um prefixo usado pelo conector para gerar uma assinatura aleatória para acompanhar o progresso do aplicativo spark.
pollTimeoutMs 120000 O tempo limite para ler mensagens do Pulsar em milissegundos.
waitingForNonExistedTopic false Se o conector deve esperar até que os tópicos desejados sejam criados.
failOnDataLoss true Controla se uma consulta deve ser falhada quando os dados são perdidos (por exemplo, tópicos são excluídos ou mensagens são excluídas devido à política de retenção).
allowDifferentTopicSchemas false Se vários tópicos com esquemas diferentes forem lidos, use esse parâmetro para desativar a desserialização automática do valor do tópico baseada em esquema. Somente os valores brutos são retornados quando isso é true.
startingOffsets latest Se latesto , o leitor lê os registos mais recentes depois de começar a ser executado. Se earliest, o leitor lê desde a primeira compensação. O usuário também pode especificar uma cadeia de caracteres JSON que especifica um deslocamento específico.
maxBytesPerTrigger nenhum Um limite suave do número máximo de bytes que queremos processar por microlote. Se isso for especificado, admin.url também precisa ser especificado.
admin.url nenhum A configuração Pulsar serviceHttpUrl . Só é necessário quando maxBytesPerTrigger é especificado.

Você também pode especificar qualquer configuração de cliente, administrador e leitor do Pulsar usando os seguintes padrões:

Padrão Link para opções de configuração
pulsar.client.* Configuração do cliente Pulsar
pulsar.admin.* Configuração de administração do Pulsar
pulsar.reader.* Configuração do leitor de pulsar

Construir deslocamentos iniciais JSON

Você pode construir manualmente um ID de mensagem para especificar um deslocamento específico e passá-lo como um JSON para a startingOffsets opção. O exemplo de código a seguir demonstra essa sintaxe:

import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()