Partilhar via


read_kafka tablefunção -valorada

Aplica-se a:Marque Sim Databricks SQL Marque Sim Databricks Runtime 13.3 LTS e superior

Lê dados de um cluster Apache Kafka e retorna os dados em forma de tabela.

Pode ler dados de um ou mais tópicos de Kafka. Ele suporta consultas em lote e ingestão de streaming.

Sintaxe

read_kafka([option_key => option_value ] [, ...])

Argumentos

Esta função requer invocação de parâmetro nomeado.

  • option_key: O nome da opção a ser configurada. Você deve usar backticks (') para opções que contêm pontos (.).
  • option_value: Uma expressão constante para definir a opção set. Aceita literais e funções escalares.

Devoluções

Registros lidos de um cluster Apache Kafka com os seguintes schema:

  • key BINARY: A chave do disco de Kafka.
  • value BINARY NOT NULL: O valor do registro de Kafka.
  • topic STRING NOT NULL: O nome do tópico Kafka do qual o registro é lido.
  • partition INT NOT NULL: O ID do registo do Kafka partition de onde o registo é lido.
  • offset BIGINT NOT NULL: O número offset do registo em Kafka TopicPartition.
  • timestamp TIMESTAMP NOT NULL: Um valor de carimbo de data/hora para o registro. O timestampTypecolumn define a que corresponde esse carimbo de data/hora.
  • timestampType INTEGER NOT NULL: O tipo do timestamp especificado no timestampcolumn.
  • headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>: O cabeçalho values fornecido como parte do registro (se habilitado).

Exemplos

-- A batch query to read from a topic.
> SELECT value::string as value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;

-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
  SELECT
    value::string:events,                 -- extract the field `events`
    to_timestamp(value::string:ts) as ts  -- extract the field `ts` and cast to timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  );

Opções

Pode encontrar uma lista list detalhada de opções na documentação do Apache Spark.

Opções necessárias

Forneça a opção abaixo para se conectar ao seu cluster Kafka.

Opção
bootstrapServers

Tipo: String

Uma list separada por vírgulas de pares host/porta apontando para o cluster Kafka.

Valor padrão: Nenhum

Forneça apenas uma das opções abaixo para configurar quais tópicos do Kafka extrair dados.

Opção
assign

Tipo: String

Uma cadeia de caracteres JSON que contém as partições de tópico específicas a serem consumidas. Por exemplo, para '{"topicA":[0,1],"topicB":[2,4]}', as partições 0'th e 1st do topicA serão consumidas a partir de.

Valor padrão: Nenhum
subscribe

Tipo: String

Uma lista de tópicos de Kafka separados por vírgulas (list) dos quais ler.

Valor padrão: Nenhum
subscribePattern

Tipo: String

Uma expressão regular que corresponde aos tópicos para assinar.

Valor padrão: Nenhum

Opções diversas

read_kafka pode ser usado em consultas em lote e em consultas de streaming. As opções abaixo especificam a que tipo de consulta se aplicam.

Opção
endingOffsets

Tipo: Tipo de consulta: String apenas lote

Os deslocamentos a serem lidos até em uma consulta em lotes, "latest" para especificar os registos mais recentes, ou uma string JSON especificando um offset final para cada TopicPartition. No JSON, o -1 como um offset pode ser utilizado para referir-se ao mais recente. -2 (mais cedo) na qualidade de offset não é permitido.

Valor predefinido: "latest"
endingOffsetsByTimestamp

Tipo: Tipo de consulta: String apenas lote

Uma cadeia de caracteres JSON especificando um carimbo de data/hora final para leitura até para cada TopicPartition. Os carimbos de data/hora precisam ser fornecidos como um valor longo do carimbo de data/hora em milissegundos desde 1970-01-01 00:00:00 UTC, por exemplo
1686444353000. Veja a nota abaixo sobre detalhes do comportamento com carimbos de data/hora.
endingOffsetsByTimestamp tem precedência sobre endingOffsets.

Valor padrão: Nenhum
endingTimestamp

Tipo: Tipo de consulta: String apenas lote

Um valor de cadeia de caracteres do carimbo de data/hora em milissegundos desde
1970-01-01 00:00:00 UTC, por exemplo "1686444353000". Se Kafka não devolver o offsetcorrespondente, o offset será set o mais recente. Veja a nota abaixo sobre detalhes do comportamento com carimbos de data/hora. Nota: endingTimestamp tem precedência sobre endingOffsetsByTimestamp e
endingOffsets.

Valor padrão: Nenhum
includeHeaders

Tipo: Tipo de consulta: Boolean streaming e lote

Se os cabeçalhos de Kafka devem ser incluídos na linha.

Valor predefinido: false
kafka.<consumer_option>

Tipo: Tipo de consulta: String streaming e lote

Qualquer opção específica do consumidor Kafka pode ser passada com o prefixo kafka. . Essas opções precisam ser cercadas por backticks quando fornecidas, caso contrário, você get um erro de analisador. Você pode encontrar as opções na documentação de Kafka.

Nota: Não deve set as seguintes opções com esta função:
key.deserializer, value.deserializer, bootstrap.servers, group.id

Valor padrão: Nenhum
maxOffsetsPerTrigger

Tipo: Long Tipo de consulta: apenas streaming

Taxa limit no número máximo de compensações ou linhas processadas por intervalo de disparo. O número total especificado de deslocamentos será dividido proporcionalmente em TopicPartitions.

Valor padrão: Nenhum
startingOffsets

Tipo: Tipo de consulta: String streaming e lote

O ponto de partida quando uma consulta é iniciada pode ser "earliest", que é dos primeiros deslocamentos, "latest", que é apenas dos deslocamentos mais recentes, ou uma cadeia de caracteres JSON que especifica um offset inicial para cada TopicPartition. No JSON, -2 como um offset pode ser usado para se referir ao mais antigo, -1 ao mais recente.

Nota: Para consultas em lote, o mais recente (implicitamente ou usando -1 em JSON) não é permitido. Para consultas de streaming, isso só se aplica quando uma nova consulta é iniciada. As consultas de streaming reiniciadas continuarão a partir dos deslocamentos definidos no ponto de verificação da consulta. As partições recém-descobertas durante uma consulta começarão no mínimo.

Valor padrão: "latest" para streaming, "earliest" para lote
startingOffsetsByTimestamp

Tipo: Tipo de consulta: String streaming e lote

Uma cadeia de caracteres JSON especificando um carimbo de data/hora inicial para cada TopicPartition. Os carimbos de data/hora precisam ser fornecidos como um valor longo do carimbo de data/hora em milissegundos desde 1970-01-01 00:00:00 UTC, por exemplo 1686444353000, . Veja a nota abaixo sobre detalhes do comportamento com carimbos de data/hora. Se o Kafka não retornar o offsetcorrespondente, o comportamento seguirá o valor da opção startingOffsetsByTimestampStrategy.
startingOffsetsByTimestamp tem precedência sobre startingOffsets.

Nota: Para consultas de streaming, isso só se aplica quando uma nova consulta é iniciada. As consultas de streaming reiniciadas continuarão a partir dos deslocamentos definidos no ponto de verificação da consulta. As partições recém-descobertas durante uma consulta começarão no mínimo.

Valor padrão: Nenhum
startingOffsetsByTimestampStrategy

Tipo: Tipo de consulta: String streaming e lote

Essa estratégia é usada quando a offset inicial especificada por carimbo de data/hora (global ou por partition) não coincide com a offset retornada por Kafka. As estratégias disponíveis são:

- "error": falhar na consulta
- "latest": atribui as últimas offset a estas partições para que o Spark possa ler registos mais recentes dessas partições em microlotes posteriores.

Valor predefinido: "error"
startingTimestamp

Tipo: Tipo de consulta: String streaming e lote

Um valor de cadeia de caracteres do carimbo de data/hora em milissegundos desde
1970-01-01 00:00:00 UTC, por exemplo "1686444353000". Veja a nota abaixo sobre detalhes do comportamento com carimbos de data/hora. Se Kafka não devolver o offsetcorrespondente, o comportamento será determinado pelo valor da opção startingOffsetsByTimestampStrategy.
startingTimestamp tem precedência sobre startingOffsetsByTimestamp e startingOffsets.

Nota: Para consultas de streaming, isso só se aplica quando uma nova consulta é iniciada. As consultas de streaming reiniciadas continuarão a partir dos deslocamentos definidos no ponto de verificação da consulta. As partições recém-descobertas durante uma consulta serão iniciadas mais cedo.

Valor padrão: Nenhum

Nota

O offset retornado para cada partition é o offset mais antigo cujo carimbo de data/hora é maior ou igual ao carimbo de data/hora dado no partitioncorrespondente. O comportamento varia entre as opções se Kafka não retornar o offset correspondente - verifique a descrição de cada opção.

O Spark simplesmente passa as informações do carimbo de data/hora para KafkaConsumer.offsetsForTimeso , e não interpreta ou raciocina sobre o valor. Para obter mais detalhes sobre KafkaConsumer.offsetsForTimeso , consulte a documentação. Além disso, o significado de carimbo de data/hora aqui pode variar de acordo com a configuração de Kafka (log.message.timestamp.type). Para obter detalhes, consulte a documentação do Apache Kafka.