O streaming Delta table lê e grava
O Delta Lake está profundamente integrado com o Spark Structured Streaming através readStream
e writeStream
. O Delta Lake supera muitas das limitações tipicamente associadas a sistemas de transmissão em fluxo e ficheiros, incluindo:
- Coalescência de pequenos arquivos produzidos por ingestão de baixa latência.
- Manter o processamento "exatamente uma vez" com mais de um fluxo (ou trabalhos em lote simultâneos).
- Descobrir com eficiência quais arquivos são novos ao usar arquivos como fonte para um fluxo.
Nota
Este artigo descreve o uso do Delta Lake tables como fontes e destinos de streaming. Para saber como carregar dados usando streaming tables no Databricks SQL, consulte Carregar dados usando streaming tables no Databricks SQL.
Para obter informações sobre junções estáticas de fluxo com o Delta Lake, consulte Junções estáticas de fluxo.
Delta table como fonte
O Streaming estruturado lê incrementalmente o Delta tables. Enquanto uma consulta de streaming está ativa num tableDelta, novos registos são processados de forma idempotente à medida que novas versões de table são confirmadas na fonte table.
Os exemplos de código a seguir mostram a configuração de uma leitura de streaming usando o nome do table ou o caminho do arquivo.
Python
spark.readStream.table("table_name")
spark.readStream.load("/path/to/table")
Scala
spark.readStream.table("table_name")
spark.readStream.load("/path/to/table")
Importante
Se a schema de um table Delta mudar depois que uma leitura de streaming começar em relação ao table, a consulta falhará. Para a maioria das alterações schema, pode reiniciar o stream para resolver a incompatibilidade schema e continuar o processamento.
No Databricks Runtime 12.2 LTS e inferior, não é possível transmitir a partir de um table Delta com mapeamento column ativado que tenha sofrido uma evolução de schema não aditiva, como o renomeamento ou a remoção de columns. Para obter detalhes, consulte Streaming com mapeamento de column e alterações de schema.
Limit taxa de entrada
As seguintes opções estão disponíveis para controlar microlotes:
-
maxFilesPerTrigger
: Quantos novos arquivos devem ser considerados em cada microlote. A predefinição é 1000. -
maxBytesPerTrigger
: Quantidade de dados processados em cada microlote. Esta opção define um *soft max*, o que significa que um lote processa aproximadamente essa quantidade de dados e pode processar mais do que o limit para permitir que a consulta de transmissão avance nos casos em que a menor unidade de entrada é maior do que essa limit. Isso não é set por padrão.
Se você usar maxBytesPerTrigger
em conjunto com maxFilesPerTrigger
, o microlote processará dados até que o maxFilesPerTrigger
ou maxBytesPerTrigger
limit seja alcançado.
Nota
Nos casos em que as transações de table de origem são limpas devido à configuração logRetentionDuration
e a consulta de streaming tenta processar essas versões, por padrão, a consulta falha para evitar a perda de dados. Você pode set a opção failOnDataLoss
false
ignorar dados perdidos e continuar o processamento.
Transmitir um feed de captura de dados de alteração do Lago Delta (CDC)
O Delta Lake alterar o feed de dados registra as alterações em um tableDelta, incluindo atualizações e exclusões. Quando ativado, pode-se transmitir a partir de um feed de dados de alteração e escrever lógica que processe inserções, atualizações e eliminações para o tablesa jusante. Embora a saída dos dados do fluxo de dados de alteração difira um pouco da table Delta que descreve, isso oferece uma solução para propagar alterações incrementais a tables a jusante numa arquitetura de medalhão .
Importante
No Databricks Runtime 12.2 LTS e anteriores, não é possível fazer streaming do feed de dados de alteração para um table Delta com o mapeamento column habilitado que tenha passado por evolução de schema não aditiva, como operações como renomear ou descartar columns. Consulte Streaming com mapeamento de column e alterações schema.
Ignorar atualizações e exclusões
O Streaming Estruturado não manipula entradas que não sejam um acréscimo e lança uma exceção se ocorrerem modificações no table que está sendo usado como fonte. Existem duas estratégias principais para lidar com alterações que não podem ser propagadas automaticamente a jusante:
- Você pode excluir a saída e o ponto de verificação e reiniciar o fluxo desde o início.
- Você pode set uma destas duas opções:
-
ignoreDeletes
: ignore as transações que excluem dados em limites partition. -
skipChangeCommits
: ignore transações que excluem ou modificam registros existentes.skipChangeCommits
subsumesignoreDeletes
.
-
Nota
No Databricks Runtime 12.2 LTS e superior, skipChangeCommits
substitui a configuração ignoreChanges
anterior. No Databricks Runtime 11.3 LTS e inferior, ignoreChanges
é a única opção suportada.
A semântica para ignoreChanges
difere muito de skipChangeCommits
. Com o ignoreChanges
habilitado, os arquivos de dados reescritos no table de origem são reemitidos após uma operação de alteração de dados, como UPDATE
, MERGE INTO
, DELETE
(dentro de partições) ou OVERWRITE
. Muitas vezes, as linhas inalteradas são emitidas juntamente com novas linhas, pelo que os consumidores a jusante têm de conseguir lidar com duplicados. As eliminações não são propagadas a jusante.
ignoreChanges
subsumes ignoreDeletes
.
skipChangeCommits
ignora totalmente as operações de alteração de ficheiros. Os arquivos de dados que são reescritos no table de origem devido à operação de alteração de dados, como UPDATE
, MERGE INTO
, DELETE
e OVERWRITE
são totalmente ignorados. Para refletir as alterações na fonte original upstream tables, deve-se implementar uma lógica distinta para propagar essas alterações.
As cargas de trabalho configuradas com ignoreChanges
continuam a operar usando semântica conhecida, mas o Databricks recomenda o uso skipChangeCommits
para todas as novas cargas de trabalho. A migração de cargas de trabalho usando ignoreChanges
para skipChangeCommits
requer lógica de refatoração.
Exemplo
Por exemplo, suponha que você tenha um tableuser_events
com date
, user_email
e action
columns que é particionado por date
. Você extrai dados do user_events
table e precisa apagá-los devido ao GDPR (Regulamento Geral sobre a Proteção de Dados).
Quando se exclui nos limites partition (ou seja, o WHERE
está em um partitioncolumn), os arquivos já estão segmentados por valor, então a eliminação apenas remove esses arquivos dos metadados. Ao excluir uma partition inteira de dados, você pode usar o seguinte:
spark.readStream
.option("ignoreDeletes", "true")
.table("user_events")
Se você excluir dados em várias partições (neste exemplo, filtrando em user_email
), use a seguinte sintaxe:
spark.readStream
.option("skipChangeCommits", "true")
.table("user_events")
Se você update um user_email
com a instrução UPDATE
, o arquivo que contém o user_email
em questão será reescrito. Use skipChangeCommits
para ignorar os arquivos de dados alterados.
Especificar posição inicial
Você pode usar as opções a seguir para especificar o ponto de partida da fonte de streaming Delta Lake sem processar todo o table.
startingVersion
: A versão Delta Lake para começar. O Databricks recomenda omitir essa opção para a maioria das cargas de trabalho. Quando não set, o fluxo começa a partir da última versão disponível, incluindo um instantâneo completo do table tirado naquele momento.Caso seja especificado, o fluxo lê todas as alterações no Delta table começando pela versão especificada (inclusive). Se a versão especificada não estiver mais disponível, o fluxo não será iniciado. Você pode obter as versões de confirmação a partir do
version
column da saída do comando DESCRIBE HISTORY.Para retornar apenas as alterações mais recentes, especifique
latest
.startingTimestamp
: O carimbo de data/hora a partir do qual começar. Todas as alterações table confirmadas no ou após o carimbo de data/hora (inclusive) são lidas pelo leitor de streaming. Se o carimbo de data/hora fornecido preceder todas as confirmações table, a leitura de streaming começará com o carimbo de data/hora mais antigo disponível. Um dos seguintes:- Uma cadeia de caracteres de carimbo de data/hora. Por exemplo,
"2019-01-01T00:00:00.000Z"
. - Uma cadeia de caracteres de data. Por exemplo,
"2019-01-01"
.
- Uma cadeia de caracteres de carimbo de data/hora. Por exemplo,
Não é possível set as duas opções simultaneamente. Eles entram em vigor somente ao iniciar uma nova consulta de streaming. Se uma consulta de streaming tiver sido iniciada e o progresso tiver sido registrado em seu ponto de verificação, essas opções serão ignoradas.
Importante
Embora se possa iniciar a fonte de streaming a partir de uma versão especificada ou de um carimbo de data e hora, a schema da fonte de streaming é sempre a schema mais recente do Delta table. Você deve garantir que não haja nenhuma alteração schema incompatível ao Delta table após a versão ou carimbo de data/hora especificado. Caso contrário, a fonte de streaming pode retornar resultados incorretos ao ler os dados com um schemaincorreto.
Exemplo
Por exemplo, suponha que tenhas um tableuser_events
. Se você quiser ler as alterações desde a versão 5, use:
spark.readStream
.option("startingVersion", "5")
.table("user_events")
Se quiser ler as alterações desde 2018-10-18, utilize:
spark.readStream
.option("startingTimestamp", "2018-10-18")
.table("user_events")
Processar snapshot inicial sem que os dados sejam descartados
Nota
Esse recurso está disponível no Databricks Runtime 11.3 LTS e superior. Esta funcionalidade está em Pré-visualização Pública.
Ao utilizar um Delta table como fonte de fluxo, a consulta processa primeiro todos os dados presentes no table. Delta table nesta versão é chamado de instantâneo inicial. Por padrão, os arquivos de dados do Delta tablesão processados com base em qual arquivo foi modificado pela última vez. No entanto, a hora da última modificação não representa necessariamente a ordem de tempo do evento de registro.
Numa consulta de streaming com estado e com um watermarkdefinido, o processamento de ficheiros por tempo de modificação pode resultar em registos a serem processados na ordem incorreta. Isto pode levar a que os registos sejam considerados como eventos atrasados pelo watermark.
Você pode evitar o problema de queda de dados ativando a seguinte opção:
- withEventTimeOrder: Se o instantâneo inicial deve ser processado com a ordem de hora do evento.
Com a ordem de tempo do evento habilitada, o intervalo de tempo do evento dos dados iniciais do instantâneo é dividido em intervalos de tempo. Cada microlote processa um bucket filtrando dados dentro do intervalo de tempo. As opções de configuração maxFilesPerTrigger e maxBytesPerTrigger ainda são aplicáveis para controlar o tamanho do microlote, mas apenas de forma aproximada devido à natureza do processamento.
O gráfico abaixo mostra esse processo:
Informações notáveis sobre este recurso:
- O problema de queda de dados só acontece quando o instantâneo Delta inicial de uma consulta de streaming com monitoração de estado é processado na ordem padrão.
- Não é possível alterar
withEventTimeOrder
depois que a consulta de fluxo é iniciada enquanto o instantâneo inicial ainda está sendo processado. Para reiniciar comwithEventTimeOrder
alterado, você precisa excluir o ponto de verificação. - Se você estiver executando uma consulta de fluxo com withEventTimeOrder habilitado, não poderá fazer o downgrade para uma versão DBR que não ofereça suporte a esse recurso até que o processamento inicial do snapshot seja concluído. Se precisar fazer downgrade, aguarde a conclusão do snapshot inicial ou exclua o ponto de verificação e reinicie a consulta.
- Este recurso não é suportado nos seguintes cenários incomuns:
- A hora do evento column é um column gerado e há transformações não-projetivas entre a fonte Delta e watermark.
- Há um watermark que tem mais de uma fonte Delta na consulta de fluxo.
- Com a ordem de tempo do evento ativada, o desempenho do processamento inicial do snapshot Delta pode ser mais lento.
- Cada microlote verifica o instantâneo inicial para filtrar dados dentro do intervalo de tempo de evento correspondente. Para acelerar a ação do filtro, é aconselhável usar uma origem Delta column como hora do evento para que o salto de dados possa ser aplicado (verifique Salto de dados para Delta Lake para saber quando é aplicável). Além disso, o particionamento de table ao longo do tempo de column do evento pode acelerar ainda mais o processamento. Você pode verificar a interface do usuário do Spark para ver quantos arquivos delta são verificados para um microlote específico.
Exemplo
Suponha que você tenha um tableuser_events
com um event_time
column. Sua consulta de streaming é uma consulta de agregação. Se quiser garantir que não haja queda de dados durante o processamento inicial do snapshot, você pode usar:
spark.readStream
.option("withEventTimeOrder", "true")
.table("user_events")
.withWatermark("event_time", "10 seconds")
Nota
Você também pode habilitar isso com a configuração do Spark no cluster, que se aplicará a todas as consultas de streaming: spark.databricks.delta.withEventTimeOrder.enabled true
Delta table como pia
Você também pode gravar dados em um Delta table usando o Structured Streaming. O log de transações permite que o Delta Lake garanta o processamento exatamente uma vez, mesmo quando há outros fluxos ou consultas em lote sendo executados simultaneamente no table.
Nota
A função Delta Lake VACUUM
remove todos os arquivos não gerenciados pelo Delta Lake, mas ignora todos os diretórios que começam com _
. Você pode armazenar pontos de verificação com segurança ao lado de outros dados e metadados para um table Delta usando uma estrutura de diretórios como <table-name>/_checkpoints
.
Métricas
Você pode descobrir o número de bytes e o número de arquivos ainda a serem processados em um processo de consulta de streaming como as numBytesOutstanding
métricas e numFilesOutstanding
. As métricas adicionais incluem:
-
numNewListedFiles
: Número de arquivos Delta Lake que foram listados para calcular a lista de pendências para este lote.-
backlogEndOffset
: A versão table usada para calcular a lista de pendências.
-
Se você estiver executando o fluxo em um bloco de anotações, poderá ver essas métricas na guia Dados brutos no painel de progresso da consulta de streaming:
{
"sources" : [
{
"description" : "DeltaSource[file:/path/to/source]",
"metrics" : {
"numBytesOutstanding" : "3456",
"numFilesOutstanding" : "8"
},
}
]
}
Modo de acréscimo
Por padrão, os fluxos são executados no modo de acréscimo, que adiciona novos registros ao table.
Use o método toTable
ao transmitir para tables, como no exemplo a seguir:
Python
(events.writeStream
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")
)
Scala
events.writeStream
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")
Modo completo
Você também pode usar o Streaming Estruturado para substituir o table inteiro em cada lote. Um exemplo de caso de uso é calcular um resumo usando agregação:
Python
(spark.readStream
.table("events")
.groupBy("customerId")
.count()
.writeStream
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.toTable("events_by_customer")
)
Scala
spark.readStream
.table("events")
.groupBy("customerId")
.count()
.writeStream
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.toTable("events_by_customer")
O exemplo anterior atualiza continuamente um table que contém o número agregado de eventos por cliente.
Para aplicativos com requisitos de latência mais brandos, você pode economizar recursos de computação com gatilhos únicos. Use-os para updatetables de agregação resumida em um determinado cronograma, processando apenas novos dados que chegaram desde o último update.
Upsert de consultas de streaming usando foreachBatch
Você pode usar uma combinação de merge
e foreachBatch
para escrever upserts complexos de uma consulta de streaming em um Delta table. Veja Utilizar foreachBatch para escrever em sinks de dados arbitrários.
Esse padrão tem muitas aplicações, incluindo as seguintes:
- Escrever agregados de streaming no Modo Update: É muito mais eficiente do que o Modo Completo.
-
Gravar um fluxo de alterações de banco de dados em um tableDelta: A consulta de mesclagem de para gravar dados de alteração pode ser usada em
foreachBatch
para aplicar continuamente um fluxo de alterações a um tableDelta. -
Gravar um fluxo de dados em table Delta com deduplicação: A consulta apenas de mesclagem insertpara deduplicação pode ser usada em
foreachBatch
para gravar continuamente dados (com duplicatas) em table Delta com deduplicação automática.
Nota
- Certifique-se de que sua
merge
instrução dentroforeachBatch
é idempotente, pois as reinicializações da consulta de streaming podem aplicar a operação no mesmo lote de dados várias vezes. - Quando
merge
é usado noforeachBatch
, a taxa de dados de entrada da consulta de streaming (relatada eStreamingQueryProgress
visível no gráfico de taxa do notebook) pode ser relatada como um múltiplo da taxa real na qual os dados são gerados na fonte. Isto acontece porquemerge
lê os dados de entrada várias vezes, fazendo com que as métricas de entrada sejam multiplicadas. Se for um estrangulamento, pode colocar o DataFrame em cache antesmerge
e, em seguida, retirá-lo da cache depois demerge
.
O exemplo a seguir demonstra como você pode usar o SQL dentro foreachBatch
para realizar essa tarefa:
Scala
// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
// Set the dataframe to view name
microBatchOutputDF.createOrReplaceTempView("updates")
// Use the view name to apply MERGE
// NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
microBatchOutputDF.sparkSession.sql(s"""
MERGE INTO aggregates t
USING updates s
ON s.key = t.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
}
// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
Python
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
# Set the dataframe to view name
microBatchOutputDF.createOrReplaceTempView("updates")
# Use the view name to apply MERGE
# NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
# In Databricks Runtime 10.5 and below, you must use the following:
# microBatchOutputDF._jdf.sparkSession().sql("""
microBatchOutputDF.sparkSession.sql("""
MERGE INTO aggregates t
USING updates s
ON s.key = t.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta)
.outputMode("update")
.start()
)
Você também pode optar por usar as APIs do Delta Lake para executar upserts de streaming, como no exemplo a seguir:
Scala
import io.delta.tables.*
val deltaTable = DeltaTable.forName(spark, "table_name")
// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
deltaTable.as("t")
.merge(
microBatchOutputDF.as("s"),
"s.key = t.key")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
}
// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "table_name")
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
(deltaTable.alias("t").merge(
microBatchOutputDF.alias("s"),
"s.key = t.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta)
.outputMode("update")
.start()
)
Idempotent table escreve em foreachBatch
Nota
O Databricks recomenda configurar uma gravação de streaming separada para cada coletor que você deseja update. A utilização de foreachBatch
para escrever em vários tables serializa as escritas, o que reduz a paralelização e aumenta a latência geral.
O Delta tables suporta as seguintes opções de DataFrameWriter
para fazer gravações em vários tables dentro foreachBatch
idempotente:
-
txnAppId
: Uma cadeia de caracteres exclusiva que você pode passar em cada gravação DataFrame. Por exemplo, você pode usar a ID StreamingQuery comotxnAppId
. -
txnVersion
: Um número monotonicamente crescente que atua como versão de transação.
Delta Lake usa a combinação de txnAppId
e txnVersion
para identificar gravações duplicadas e ignorá-las.
Se uma gravação em lote for interrompida com uma falha, a nova execução do lote usará o mesmo aplicativo e ID de lote para ajudar o tempo de execução a identificar corretamente gravações duplicadas e ignorá-las. O ID do aplicativo (txnAppId
) pode ser qualquer cadeia de caracteres exclusiva gerada pelo usuário e não precisa estar relacionado ao ID do fluxo. Veja Utilizar foreachBatch para escrever em sinks de dados arbitrários.
Aviso
Se você excluir o ponto de verificação de streaming e reiniciar a consulta com um novo ponto de verificação, deverá fornecer um txnAppId
arquivo . Novos pontos de verificação começam com um ID de lote de 0
. O Delta Lake usa o ID do lote e o txnAppId
como uma chave exclusiva e ignora os lotes que já têm o values.
O exemplo de código a seguir demonstra esse padrão:
Python
app_id = ... # A unique string that is used as an application ID.
def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2
streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()
Scala
val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 1
batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 2
}