read_kafka
tablefunção -valorada
Aplica-se a: Databricks SQL Databricks Runtime 13.3 LTS e superior
Lê dados de um cluster Apache Kafka e retorna os dados em forma de tabela.
Pode ler dados de um ou mais tópicos de Kafka. Ele suporta consultas em lote e ingestão de streaming.
Sintaxe
read_kafka([option_key => option_value ] [, ...])
Argumentos
Esta função requer invocação de parâmetro nomeado.
-
option_key
: O nome da opção a ser configurada. Você deve usar backticks (') para opções que contêm pontos (.
). -
option_value
: Uma expressão constante para definir a opção set. Aceita literais e funções escalares.
Devoluções
Registros lidos de um cluster Apache Kafka com os seguintes schema:
-
key BINARY
: A chave do disco de Kafka. -
value BINARY NOT NULL
: O valor do registro de Kafka. -
topic STRING NOT NULL
: O nome do tópico Kafka do qual o registro é lido. -
partition INT NOT NULL
: O ID do registo do Kafka partition de onde o registo é lido. -
offset BIGINT NOT NULL
: O número offset do registo em KafkaTopicPartition
. -
timestamp TIMESTAMP NOT NULL
: Um valor de carimbo de data/hora para o registro. OtimestampType
column define a que corresponde esse carimbo de data/hora. -
timestampType INTEGER NOT NULL
: O tipo do timestamp especificado notimestamp
column. -
headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>
: O cabeçalho values fornecido como parte do registro (se habilitado).
Exemplos
-- A batch query to read from a topic.
> SELECT value::string as value
FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
) LIMIT 10;
-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
SELECT
value::string:events, -- extract the field `events`
to_timestamp(value::string:ts) as ts -- extract the field `ts` and cast to timestamp
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
);
Opções
Pode encontrar uma lista list detalhada de opções na documentação do Apache Spark.
Opções necessárias
Forneça a opção abaixo para se conectar ao seu cluster Kafka.
Opção |
---|
bootstrapServers Tipo: String Uma list separada por vírgulas de pares host/porta apontando para o cluster Kafka. Valor padrão: Nenhum |
Forneça apenas uma das opções abaixo para configurar quais tópicos do Kafka extrair dados.
Opção |
---|
assign Tipo: String Uma cadeia de caracteres JSON que contém as partições de tópico específicas a serem consumidas. Por exemplo, para '{"topicA":[0,1],"topicB":[2,4]}' , as partições 0'th e 1st do topicA serão consumidas a partir de.Valor padrão: Nenhum |
subscribe Tipo: String Uma lista de tópicos de Kafka separados por vírgulas (list) dos quais ler. Valor padrão: Nenhum |
subscribePattern Tipo: String Uma expressão regular que corresponde aos tópicos para assinar. Valor padrão: Nenhum |
Opções diversas
read_kafka
pode ser usado em consultas em lote e em consultas de streaming. As opções abaixo especificam a que tipo de consulta se aplicam.
Opção |
---|
endingOffsets Tipo: Tipo de consulta: String apenas loteOs deslocamentos a serem lidos até em uma consulta em lotes, "latest" para especificar os registos mais recentes, ou uma string JSON especificando um offset final para cada TopicPartition. No JSON, o -1 como um offset pode ser utilizado para referir-se ao mais recente.
-2 (mais cedo) na qualidade de offset não é permitido.Valor predefinido: "latest" |
endingOffsetsByTimestamp Tipo: Tipo de consulta: String apenas loteUma cadeia de caracteres JSON especificando um carimbo de data/hora final para leitura até para cada TopicPartition. Os carimbos de data/hora precisam ser fornecidos como um valor longo do carimbo de data/hora em milissegundos desde 1970-01-01 00:00:00 UTC , por exemplo1686444353000 . Veja a nota abaixo sobre detalhes do comportamento com carimbos de data/hora.endingOffsetsByTimestamp tem precedência sobre endingOffsets .Valor padrão: Nenhum |
endingTimestamp Tipo: Tipo de consulta: String apenas loteUm valor de cadeia de caracteres do carimbo de data/hora em milissegundos desde 1970-01-01 00:00:00 UTC , por exemplo "1686444353000" . Se Kafka não devolver o offsetcorrespondente, o offset será set o mais recente. Veja a nota abaixo sobre detalhes do comportamento com carimbos de data/hora. Nota: endingTimestamp tem precedência sobre endingOffsetsByTimestamp eendingOffsets .Valor padrão: Nenhum |
includeHeaders Tipo: Tipo de consulta: Boolean streaming e loteSe os cabeçalhos de Kafka devem ser incluídos na linha. Valor predefinido: false |
kafka.<consumer_option> Tipo: Tipo de consulta: String streaming e loteQualquer opção específica do consumidor Kafka pode ser passada com o prefixo kafka. . Essas opções precisam ser cercadas por backticks quando fornecidas, caso contrário, você get um erro de analisador. Você pode encontrar as opções na documentação de Kafka.Nota: Não deve set as seguintes opções com esta função: key.deserializer , value.deserializer , bootstrap.servers , group.id Valor padrão: Nenhum |
maxOffsetsPerTrigger Tipo: Long Tipo de consulta: apenas streamingTaxa limit no número máximo de compensações ou linhas processadas por intervalo de disparo. O número total especificado de deslocamentos será dividido proporcionalmente em TopicPartitions. Valor padrão: Nenhum |
startingOffsets Tipo: Tipo de consulta: String streaming e loteO ponto de partida quando uma consulta é iniciada pode ser "earliest" , que é dos primeiros deslocamentos, "latest" , que é apenas dos deslocamentos mais recentes, ou uma cadeia de caracteres JSON que especifica um offset inicial para cada TopicPartition. No JSON, -2 como um offset pode ser usado para se referir ao mais antigo, -1 ao mais recente.Nota: Para consultas em lote, o mais recente (implicitamente ou usando -1 em JSON) não é permitido. Para consultas de streaming, isso só se aplica quando uma nova consulta é iniciada. As consultas de streaming reiniciadas continuarão a partir dos deslocamentos definidos no ponto de verificação da consulta. As partições recém-descobertas durante uma consulta começarão no mínimo. Valor padrão: "latest" para streaming, "earliest" para lote |
startingOffsetsByTimestamp Tipo: Tipo de consulta: String streaming e loteUma cadeia de caracteres JSON especificando um carimbo de data/hora inicial para cada TopicPartition. Os carimbos de data/hora precisam ser fornecidos como um valor longo do carimbo de data/hora em milissegundos desde 1970-01-01 00:00:00 UTC , por exemplo 1686444353000 , . Veja a nota abaixo sobre detalhes do comportamento com carimbos de data/hora. Se o Kafka não retornar o offsetcorrespondente, o comportamento seguirá o valor da opção startingOffsetsByTimestampStrategy .startingOffsetsByTimestamp tem precedência sobre startingOffsets .Nota: Para consultas de streaming, isso só se aplica quando uma nova consulta é iniciada. As consultas de streaming reiniciadas continuarão a partir dos deslocamentos definidos no ponto de verificação da consulta. As partições recém-descobertas durante uma consulta começarão no mínimo. Valor padrão: Nenhum |
startingOffsetsByTimestampStrategy Tipo: Tipo de consulta: String streaming e loteEssa estratégia é usada quando a offset inicial especificada por carimbo de data/hora (global ou por partition) não coincide com a offset retornada por Kafka. As estratégias disponíveis são: - "error" : falhar na consulta- "latest" : atribui as últimas offset a estas partições para que o Spark possa ler registos mais recentes dessas partições em microlotes posteriores.Valor predefinido: "error" |
startingTimestamp Tipo: Tipo de consulta: String streaming e loteUm valor de cadeia de caracteres do carimbo de data/hora em milissegundos desde 1970-01-01 00:00:00 UTC , por exemplo "1686444353000" . Veja a nota abaixo sobre detalhes do comportamento com carimbos de data/hora. Se Kafka não devolver o offsetcorrespondente, o comportamento será determinado pelo valor da opção startingOffsetsByTimestampStrategy .startingTimestamp tem precedência sobre startingOffsetsByTimestamp e startingOffsets .Nota: Para consultas de streaming, isso só se aplica quando uma nova consulta é iniciada. As consultas de streaming reiniciadas continuarão a partir dos deslocamentos definidos no ponto de verificação da consulta. As partições recém-descobertas durante uma consulta serão iniciadas mais cedo. Valor padrão: Nenhum |
Nota
O offset retornado para cada partition é o offset mais antigo cujo carimbo de data/hora é maior ou igual ao carimbo de data/hora dado no partitioncorrespondente. O comportamento varia entre as opções se Kafka não retornar o offset correspondente - verifique a descrição de cada opção.
O Spark simplesmente passa as informações do carimbo de data/hora para KafkaConsumer.offsetsForTimes
o , e não interpreta ou raciocina sobre o valor. Para obter mais detalhes sobre KafkaConsumer.offsetsForTimes
o , consulte a documentação. Além disso, o significado de carimbo de data/hora aqui pode variar de acordo com a configuração de Kafka (log.message.timestamp.type
). Para obter detalhes, consulte a documentação do Apache Kafka.