Referência da linguagem Python Delta Live Tables
Este artigo tem detalhes para a interface de programação Python Delta Live Tables.
Para obter informações sobre a API SQL, consulte a referência da linguagem SQL Delta Live Tables.
Para obter detalhes específicos sobre a configuração do Auto Loader, consulte O que é o Auto Loader?.
Antes de começar
A seguir estão considerações importantes quando você implementa pipelines com a interface Python Delta Live Tables:
- Como o Python
table()
eview()
as funções são invocados várias vezes durante o planejamento e a execução de uma atualização de pipeline, não inclua código em uma dessas funções que possam ter efeitos colaterais (por exemplo, código que modifica dados ou envia um e-mail). Para evitar um comportamento inesperado, suas funções Python que definem conjuntos de dados devem incluir apenas o código necessário para definir a tabela ou exibição. - Para executar operações como o envio de e-mails ou a integração com um serviço de monitoramento externo, particularmente em funções que definem conjuntos de dados, use ganchos de eventos. A implementação dessas operações nas funções que definem seus conjuntos de dados causará um comportamento inesperado.
- O Python
table
eview
as funções devem retornar um DataFrame. Algumas funções que operam em DataFrames não retornam DataFrames e não devem ser usadas. Essas operações incluem funções comocollect()
,count()
,toPandas()
,save()
, esaveAsTable()
. Como as transformações DataFrame são executadas após a resolução do gráfico de fluxo de dados completo, o uso dessas operações pode ter efeitos colaterais não intencionais.
Importar o dlt
módulo Python
As funções Python do dlt
Delta Live Tables são definidas no módulo. Seus pipelines implementados com a API Python devem importar este módulo:
import dlt
Criar uma visualização materializada do Delta Live Tables ou uma tabela de streaming
Em Python, o Delta Live Tables determina se um conjunto de dados deve ser atualizado como uma exibição materializada ou uma tabela de streaming com base na consulta definidora. O @table
decorador pode ser usado para definir vistas materializadas e mesas de streaming.
Para definir uma exibição materializada em Python, aplique @table
a uma consulta que executa uma leitura estática em relação a uma fonte de dados. Para definir uma tabela de streaming, aplique @table
a uma consulta que executa uma leitura de streaming em relação a uma fonte de dados ou use a função create_streaming_table(). Ambos os tipos de conjunto de dados têm a mesma especificação de sintaxe da seguinte maneira:
Nota
Para usar o argumento para habilitar o cluster_by
clustering líquido, seu pipeline deve ser configurado para usar o canal de visualização.
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Criar uma visualização Delta Live Tables
Para definir uma vista em Python, aplique o @view
decorador. Como o @table
decorador, você pode usar visualizações no Delta Live Tables para conjuntos de dados estáticos ou de streaming. A sintaxe a seguir está para definir modos de exibição com Python:
import dlt
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Exemplo: Definir tabelas e modos de exibição
Para definir uma tabela ou visualização em Python, aplique o @dlt.view
ou @dlt.table
decorador a uma função. Você pode usar o nome da função ou o name
parâmetro para atribuir o nome da tabela ou da exibição. O exemplo a seguir define dois conjuntos de dados diferentes: um modo de exibição chamado taxi_raw
que usa um arquivo JSON como a fonte de entrada e uma tabela chamada filtered_data
que usa a taxi_raw
exibição como entrada:
import dlt
@dlt.view
def taxi_raw():
return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")
# Use the function name as the table name
@dlt.table
def filtered_data():
return spark.read.table("LIVE.taxi_raw").where(...)
# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return spark.read.table("LIVE.taxi_raw").where(...)
Exemplo: acessar um conjunto de dados definido no mesmo pipeline
Nota
Embora as dlt.read()
funções e dlt.read_stream()
ainda estejam disponíveis e sejam totalmente suportadas pela interface Python do Delta Live Tables, o Databricks recomenda sempre usar as spark.read.table()
funções e spark.readStream.table()
devido ao seguinte:
- As
spark
funções suportam a leitura de conjuntos de dados internos e externos, incluindo conjuntos de dados em armazenamento externo ou definidos em outros pipelines. Asdlt
funções suportam apenas a leitura de conjuntos de dados internos. - As
spark
funções suportam a especificação de opções, comoskipChangeCommits
, para ler operações. A especificação de opções não é suportadadlt
pelas funções.
Para acessar um conjunto de dados definido no mesmo pipeline, use as spark.read.table()
funções ou spark.readStream.table()
, precedendo a LIVE
palavra-chave para o nome do conjunto de dados:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredA():
return spark.read.table("LIVE.customers_raw").where(...)
Exemplo: Ler a partir de uma tabela registada num metastore
Para ler dados de uma tabela registrada no metastore do Hive, no argumento da função, omita a LIVE
palavra-chave e, opcionalmente, qualifique o nome da tabela com o nome do banco de dados:
@dlt.table
def customers():
return spark.read.table("sales.customers").where(...)
Para obter um exemplo de leitura de uma tabela do Catálogo Unity, consulte Ingerir dados em um pipeline do Catálogo Unity.
Exemplo: Acessar um conjunto de dados usando spark.sql
Você também pode retornar um conjunto de dados usando uma spark.sql
expressão em uma função de consulta. Para ler a partir de um conjunto de dados interno, anexe LIVE.
ao nome do conjunto de dados:
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")
Criar uma tabela para usar como destino de operações de streaming
Use a create_streaming_table()
função para criar uma tabela de destino para a saída de registros por operações de streaming, incluindo registros de saída apply_changes(), apply_changes_from_snapshot()() e @append_flow .
Nota
As create_target_table()
funções e create_streaming_live_table()
foram preteridas. O Databricks recomenda atualizar o código existente para usar a create_streaming_table()
função.
Nota
Para usar o argumento para habilitar o cluster_by
clustering líquido, seu pipeline deve ser configurado para usar o canal de visualização.
create_streaming_table(
name = "<table-name>",
comment = "<comment>"
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
path="<storage-location-path>",
schema="schema-definition",
expect_all = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
row_filter = "row-filter-clause"
)
Argumentos |
---|
name Tipo: str O nome da tabela. Este parâmetro é obrigatório. |
comment Tipo: str Uma descrição opcional para a tabela. |
spark_conf Tipo: dict Uma lista opcional de configurações do Spark para a execução desta consulta. |
table_properties Tipo: dict Uma lista opcional de propriedades da tabela para a tabela. |
partition_cols Tipo: array Uma lista opcional de uma ou mais colunas a serem usadas para particionar a tabela. |
cluster_by Tipo: array Opcionalmente, habilite o clustering líquido na tabela e defina as colunas a serem usadas como chaves de clustering. Veja Utilizar clustering líquido para tabelas Delta. |
path Tipo: str Um local de armazenamento opcional para dados de tabela. Se não estiver definido, o padrão do sistema será o local de armazenamento do pipeline. |
schema Tipo: str ou StructType Uma definição de esquema opcional para a tabela. Os esquemas podem ser definidos como uma cadeia de caracteres DDL SQL ou com um Python StructType . |
expect_all expect_all_or_drop expect_all_or_fail Tipo: dict Restrições opcionais de qualidade de dados para a tabela. Veja várias expectativas. |
row_filter (Pré-visualização Pública)Tipo: str Uma cláusula de filtro de linha opcional para a tabela. Consulte Publicar tabelas com filtros de linha e máscaras de coluna. |
Controlar como as tabelas são materializadas
As tabelas também oferecem um controlo adicional da sua materialização:
- Especifique como as tabelas são particionadas usando
partition_cols
o . Você pode usar o particionamento para acelerar as consultas. - Você pode definir as propriedades da tabela ao definir um modo de exibição ou tabela. Consulte Propriedades da tabela Delta Live Tables.
- Defina um local de armazenamento para os dados da tabela usando a
path
configuração. Por padrão, os dados da tabela são armazenados no local de armazenamento do pipeline sepath
não estiverem definidos. - Você pode usar colunas geradas em sua definição de esquema. Consulte Exemplo: Especifique um esquema e colunas de partição.
Nota
Para tabelas com menos de 1 TB de tamanho, o Databricks recomenda permitir que o Delta Live Tables controle a organização dos dados. Você não deve especificar colunas de partição, a menos que espere que sua tabela cresça além de um terabyte.
Exemplo: Especificar um esquema e colunas de partição
Opcionalmente, você pode especificar um esquema de tabela usando uma cadeia de caracteres DDL Python StructType
ou SQL. Quando especificado com uma cadeia de caracteres DDL, a definição pode incluir colunas geradas.
O exemplo a seguir cria uma tabela chamada sales
com um esquema especificado usando um Python StructType
:
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
O exemplo a seguir especifica o esquema para uma tabela usando uma cadeia de caracteres DDL, define uma coluna gerada e define uma coluna de partição:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
Por padrão, Delta Live Tables infere o esquema da table
definição se você não especificar um esquema.
Configurar uma tabela de streaming para ignorar alterações em uma tabela de streaming de origem
Nota
- O
skipChangeCommits
sinalizador funciona apenas comspark.readStream
o uso daoption()
função. Não é possível usar esse sinalizador em umadlt.read_stream()
função. - Não é possível usar o
skipChangeCommits
sinalizador quando a tabela de streaming de origem é definida como o destino de uma função apply_changes( ).
Por padrão, as tabelas de streaming exigem fontes somente de acréscimo. Quando uma tabela de streaming usa outra tabela de streaming como fonte e a tabela de streaming de origem requer atualizações ou excluições, por exemplo, o processamento do "direito a ser esquecido" do GDPR, o skipChangeCommits
sinalizador pode ser definido ao ler a tabela de streaming de origem para ignorar essas alterações. Para obter mais informações sobre esse sinalizador, consulte Ignorar atualizações e exclusões.
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")
Exemplo: Definir restrições de tabela
Importante
As restrições de tabela estão em Visualização pública.
Ao especificar um esquema, você pode definir chaves primárias e estrangeiras. As restrições são informativas e não são aplicadas. Consulte a cláusula CONSTRAINT na referência da linguagem SQL.
O exemplo a seguir define uma tabela com uma restrição de chave primária e estrangeira:
@dlt.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
"""
def sales():
return ("...")
Exemplo: Definir um filtro de linha e uma máscara de coluna
Importante
Os filtros de linha e as máscaras de coluna estão na Pré-visualização Pública.
Para criar uma exibição materializada ou uma tabela de streaming com um filtro de linha e uma máscara de coluna, use a cláusula ROW FILTER e a cláusula MASK . O exemplo a seguir demonstra como definir um modo de exibição materializado e uma tabela Streaming com um filtro de linha e uma máscara de coluna:
@dlt.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
return ("...")
Para obter mais informações sobre filtros de linha e máscaras de coluna, consulte Publicar tabelas com filtros de linha e máscaras de coluna.
Propriedades Python Delta Live Tables
As tabelas a seguir descrevem as opções e propriedades que você pode especificar ao definir tabelas e exibições com Delta Live Tables:
Nota
Para usar o argumento para habilitar o cluster_by
clustering líquido, seu pipeline deve ser configurado para usar o canal de visualização.
@table ou @view |
---|
name Tipo: str Um nome opcional para a tabela ou exibição. Se não estiver definido, o nome da função será usado como o nome da tabela ou do modo de exibição. |
comment Tipo: str Uma descrição opcional para a tabela. |
spark_conf Tipo: dict Uma lista opcional de configurações do Spark para a execução desta consulta. |
table_properties Tipo: dict Uma lista opcional de propriedades da tabela para a tabela. |
path Tipo: str Um local de armazenamento opcional para dados de tabela. Se não estiver definido, o padrão do sistema será o local de armazenamento do pipeline. |
partition_cols Tipo: a collection of str Uma coleção opcional, por exemplo, uma list de uma ou mais colunas a serem usadas para particionar a tabela. |
cluster_by Tipo: array Opcionalmente, habilite o clustering líquido na tabela e defina as colunas a serem usadas como chaves de clustering. Veja Utilizar clustering líquido para tabelas Delta. |
schema Tipo: str ou StructType Uma definição de esquema opcional para a tabela. Os esquemas podem ser definidos como uma cadeia de caracteres DDL SQL ou com um Python StructType . |
temporary Tipo: bool Crie uma tabela, mas não publique metadados para a tabela. A temporary palavra-chave instrui Delta Live Tables a criar uma tabela que está disponível para o pipeline, mas não deve ser acessada fora do pipeline. Para reduzir o tempo de processamento, uma tabela temporária persiste durante o tempo de vida do pipeline que a cria, e não apenas uma única atualização.O padrão é 'False'. |
row_filter (Pré-visualização Pública)Tipo: str Uma cláusula de filtro de linha opcional para a tabela. Consulte Publicar tabelas com filtros de linha e máscaras de coluna. |
Definição de tabela ou vista |
---|
def <function-name>() Uma função Python que define o conjunto de dados. Se o name parâmetro não estiver definido, será <function-name> usado como o nome do conjunto de dados de destino. |
query Uma instrução Spark SQL que retorna um Spark Dataset ou Koalas DataFrame. Use dlt.read() ou spark.read.table() para executar uma leitura completa de um conjunto de dados definido no mesmo pipeline. Para ler um conjunto de dados externo, use a spark.read.table() função. Não é possível usar dlt.read() para ler conjuntos de dados externos. Como spark.read.table() pode ser usado para ler conjuntos de dados internos, conjuntos de dados definidos fora do pipeline atual e permite especificar opções para ler dados, o dlt.read() Databricks recomenda usá-lo em vez da função.Quando você usa a função para ler de spark.read.table() um conjunto de dados definido no mesmo pipeline, anexe a LIVE palavra-chave ao nome do conjunto de dados no argumento da função. Por exemplo, para ler a partir de um conjunto de dados chamado customers :spark.read.table("LIVE.customers") Você também pode usar a função para ler uma spark.read.table() tabela registrada no metastore omitindo a LIVE palavra-chave e, opcionalmente, qualificando o nome da tabela com o nome do banco de dados:spark.read.table("sales.customers") Use dlt.read_stream() ou spark.readStream.table() para executar uma leitura de streaming de um conjunto de dados definido no mesmo pipeline. Para executar uma leitura de streaming de um conjunto de dados externo, use o botãospark.readStream.table() função. Como spark.readStream.table() pode ser usado para ler conjuntos de dados internos, conjuntos de dados definidos fora do pipeline atual e permite especificar opções para ler dados, o dlt.read_stream() Databricks recomenda usá-lo em vez da função.Para definir uma consulta em uma função Delta Live Tables table usando sintaxe SQL, use a spark.sql função. Consulte Exemplo: acessar um conjunto de dados usando spark.sql. Para definir uma consulta em uma função Delta Live Tables table usando Python, use a sintaxe PySpark . |
Expectativas |
---|
@expect("description", "constraint") Declarar uma restrição de qualidade de dados identificada por description . Se uma linha violar a expectativa, inclua a linha no conjunto de dados de destino. |
@expect_or_drop("description", "constraint") Declarar uma restrição de qualidade de dados identificada por description . Se uma linha violar a expectativa, solte a linha do conjunto de dados de destino. |
@expect_or_fail("description", "constraint") Declarar uma restrição de qualidade de dados identificada por description . Se uma linha violar a expectativa, interrompa imediatamente a execução. |
@expect_all(expectations) Declare uma ou mais restrições de qualidade de dados. expectations é um dicionário Python, onde a chave é a descrição da expectativa e o valor é a restrição de expectativa. Se uma linha violar qualquer uma das expectativas, inclua a linha no conjunto de dados de destino. |
@expect_all_or_drop(expectations) Declare uma ou mais restrições de qualidade de dados. expectations é um dicionário Python, onde a chave é a descrição da expectativa e o valor é a restrição de expectativa. Se uma linha violar qualquer uma das expectativas, solte a linha do conjunto de dados de destino. |
@expect_all_or_fail(expectations) Declare uma ou mais restrições de qualidade de dados. expectations é um dicionário Python, onde a chave é a descrição da expectativa e o valor é a restrição de expectativa. Se uma linha violar qualquer uma das expectativas, interrompa imediatamente a execução. |
Alterar a captura de dados de um feed de alterações com Python no Delta Live Tables
Use a apply_changes()
função na API Python para usar a funcionalidade de captura de dados de alteração (CDC) Delta Live Tables para processar dados de origem de um feed de dados de alteração (CDF).
Importante
Você deve declarar uma tabela de streaming de destino para aplicar as alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Ao especificar o apply_changes()
esquema da tabela de destino, você deve incluir as __START_AT
colunas e __END_AT
com o mesmo tipo de dados que os sequence_by
campos.
Para criar a tabela de destino necessária, você pode usar a função create_streaming_table() na interface Python do Delta Live Tables.
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Nota
Para APPLY CHANGES
processamento, o comportamento padrão para INSERT
eventos e UPDATE
é atualizar eventos CDC da origem: atualizar quaisquer linhas na tabela de destino que correspondam à(s) chave(s) especificada(s) ou inserir uma nova linha quando um registro correspondente não existir na tabela de destino. A manipulação de DELETE
eventos pode ser especificada com a APPLY AS DELETE WHEN
condição.
Para saber mais sobre o processamento CDC com um feed de alterações, consulte The APPLY CHANGES APIs: Simplify change data capture with Delta Live Tables. Para obter um exemplo de uso da apply_changes()
função, consulte Exemplo: processamento de SCD tipo 1 e SCD tipo 2 com dados de origem CDF.
Importante
Você deve declarar uma tabela de streaming de destino para aplicar as alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Ao especificar o esquema da tabela de apply_changes
destino, você deve incluir as __START_AT
colunas e __END_AT
com o mesmo tipo de dados do sequence_by
campo.
Consulte As APIs APPLY CHANGES: Simplifique a captura de dados de alteração com o Delta Live Tables.
Argumentos |
---|
target Tipo: str O nome da tabela a ser atualizada. Você pode usar a função create_streaming_table() para criar a tabela de destino antes de executar a apply_changes() função.Este parâmetro é obrigatório. |
source Tipo: str A fonte de dados que contém os registros do CDC. Este parâmetro é obrigatório. |
keys Tipo: list A coluna ou combinação de colunas que identifica exclusivamente uma linha nos dados de origem. Isso é usado para identificar quais eventos CDC se aplicam a registros específicos na tabela de destino. Você pode especificar: - Uma lista de strings: ["userId", "orderId"] - Uma lista de funções do Spark SQL col() : [col("userId"), col("orderId"] Os argumentos para col() funções não podem incluir qualificadores. Por exemplo, você pode usar col(userId) , mas não pode usar col(source.userId) .Este parâmetro é obrigatório. |
sequence_by Tipo: str ou col() O nome da coluna que especifica a ordem lógica dos eventos CDC nos dados de origem. O Delta Live Tables usa esse sequenciamento para manipular eventos de alteração que chegam fora de ordem. Você pode especificar: - Uma corda: "sequenceNum" - Uma função Spark SQL col() : col("sequenceNum") Os argumentos para col() funções não podem incluir qualificadores. Por exemplo, você pode usar col(userId) , mas não pode usar col(source.userId) .A coluna especificada deve ser um tipo de dados classificável. Este parâmetro é obrigatório. |
ignore_null_updates Tipo: bool Permitir a ingestão de atualizações contendo um subconjunto das colunas de destino. Quando um evento CDC corresponde a uma linha existente e ignore_null_updates é True , as colunas com um null mantêm seus valores existentes no destino. Isso também se aplica a colunas aninhadas com um valor de null . Quando ignore_null_updates é False , os valores existentes são substituídos por null valores.Este parâmetro é opcional. A predefinição é False . |
apply_as_deletes Tipo: str ou expr() Especifica quando um evento CDC deve ser tratado como um DELETE upsert em vez de um upsert. Para lidar com dados fora de ordem, a linha excluída é temporariamente mantida como uma marca de exclusão na tabela Delta subjacente e uma exibição é criada no metastore que filtra essas lápides. O intervalo de retenção pode ser configurado com opipelines.cdc.tombstoneGCThresholdInSeconds propriedade table.Você pode especificar: - Uma corda: "Operation = 'DELETE'" - Uma função Spark SQL expr() : expr("Operation = 'DELETE'") Este parâmetro é opcional. |
apply_as_truncates Tipo: str ou expr() Especifica quando um evento CDC deve ser tratado como uma tabela TRUNCATE completa. Como essa cláusula aciona um truncado completo da tabela de destino, ela deve ser usada apenas para casos de uso específicos que exijam essa funcionalidade.O apply_as_truncates parâmetro é suportado apenas para SCD tipo 1. SCD tipo 2 não suporta operações truncadas.Você pode especificar: - Uma corda: "Operation = 'TRUNCATE'" - Uma função Spark SQL expr() : expr("Operation = 'TRUNCATE'") Este parâmetro é opcional. |
column_list except_column_list Tipo: list Um subconjunto de colunas a serem incluídas na tabela de destino. Use column_list para especificar a lista completa de colunas a serem incluídas. Use except_column_list para especificar as colunas a serem excluídas. Você pode declarar qualquer valor como uma lista de cadeias de caracteres ou como funções do Spark SQL col() :- column_list = ["userId", "name", "city"] .- column_list = [col("userId"), col("name"), col("city")] - except_column_list = ["operation", "sequenceNum"] - except_column_list = [col("operation"), col("sequenceNum") Os argumentos para col() funções não podem incluir qualificadores. Por exemplo, você pode usar col(userId) , mas não pode usar col(source.userId) .Este parâmetro é opcional. O padrão é incluir todas as colunas na tabela de destino quando nenhum column_list argumento ou except_column_list é passado para a função. |
stored_as_scd_type Tipo: str ou int Se deseja armazenar registros como SCD tipo 1 ou SCD tipo 2. Definido como 1 para SCD tipo 1 ou 2 para SCD tipo 2.Esta cláusula é opcional. O padrão é SCD tipo 1. |
track_history_column_list track_history_except_column_list Tipo: list Um subconjunto de colunas de saída a serem rastreadas para o histórico na tabela de destino. Use track_history_column_list para especificar a lista completa de colunas a serem rastreadas. Utilizartrack_history_except_column_list para especificar as colunas a serem excluídas do acompanhamento. Você pode declarar qualquer valor como uma lista de cadeias de caracteres ou como funções do Spark SQL col() :- track_history_column_list = ["userId", "name", "city"] .- track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") Os argumentos para col() funções não podem incluir qualificadores. Por exemplo, você pode usar col(userId) , mas não pode usar col(source.userId) .Este parâmetro é opcional. O padrão é incluir todas as colunas na tabela de destino quando não track_history_column_list outrack_history_except_column_list é passado para a função. |
Alterar a captura de dados de instantâneos de banco de dados com Python em Delta Live Tables
Importante
A APPLY CHANGES FROM SNAPSHOT
API está em Visualização Pública.
Use a apply_changes_from_snapshot()
função na API Python para usar a funcionalidade de captura de dados de alteração (CDC) do Delta Live Tables para processar dados de origem de instantâneos de banco de dados.
Importante
Você deve declarar uma tabela de streaming de destino para aplicar as alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Ao especificar o apply_changes_from_snapshot()
esquema da tabela de destino, você também deve incluir as __START_AT
colunas e __END_AT
com o mesmo tipo de dados que o sequence_by
campo.
Para criar a tabela de destino necessária, você pode usar a função create_streaming_table() na interface Python do Delta Live Tables.
apply_changes_from_snapshot(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
) -> None
Nota
Para APPLY CHANGES FROM SNAPSHOT
processamento, o comportamento padrão é inserir uma nova linha quando um registro correspondente com a(s) mesma(s) chave(s) não existe no destino. Se existir um registro correspondente, ele será atualizado somente se algum dos valores na linha tiver sido alterado. As linhas com chaves presentes no destino, mas não mais presentes na origem, são excluídas.
Para saber mais sobre o processamento CDC com snapshots, consulte The APPLY CHANGES APIs: Simplify change data capture with Delta Live Tables. Para obter exemplos de uso da apply_changes_from_snapshot()
função, consulte os exemplos de ingestão periódica de instantâneo e ingestão de instantâneo histórico.
Argumentos |
---|
target Tipo: str O nome da tabela a ser atualizada. Você pode usar a função create_streaming_table() para criar a tabela de destino antes de executar a apply_changes() função.Este parâmetro é obrigatório. |
source Tipo: str ou lambda function O nome de uma tabela ou exibição para snapshot periodicamente ou uma função lambda Python que retorna o snapshot DataFrame a ser processado e a versão snapshot. Consulte Implementar o argumento de origem. Este parâmetro é obrigatório. |
keys Tipo: list A coluna ou combinação de colunas que identifica exclusivamente uma linha nos dados de origem. Isso é usado para identificar quais eventos CDC se aplicam a registros específicos na tabela de destino. Você pode especificar: - Uma lista de strings: ["userId", "orderId"] - Uma lista de funções do Spark SQL col() : [col("userId"), col("orderId"] Os argumentos para col() funções não podem incluir qualificadores. Por exemplo, você pode usar col(userId) , mas não pode usar col(source.userId) .Este parâmetro é obrigatório. |
stored_as_scd_type Tipo: str ou int Se deseja armazenar registros como SCD tipo 1 ou SCD tipo 2. Definido como 1 para SCD tipo 1 ou 2 para SCD tipo 2.Esta cláusula é opcional. O padrão é SCD tipo 1. |
track_history_column_list track_history_except_column_list Tipo: list Um subconjunto de colunas de saída a serem rastreadas para o histórico na tabela de destino. Use track_history_column_list para especificar a lista completa de colunas a serem rastreadas. Utilizartrack_history_except_column_list para especificar as colunas a serem excluídas do acompanhamento. Você pode declarar qualquer valor como uma lista de cadeias de caracteres ou como funções do Spark SQL col() :- track_history_column_list = ["userId", "name", "city"] .- track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") Os argumentos para col() funções não podem incluir qualificadores. Por exemplo, você pode usar col(userId) , mas não pode usar col(source.userId) .Este parâmetro é opcional. O padrão é incluir todas as colunas na tabela de destino quando não track_history_column_list outrack_history_except_column_list é passado para a função. |
Implementar o source
argumento
A apply_changes_from_snapshot()
função inclui o source
argumento. Para processar snapshots históricos, espera-se que o source
argumento seja uma função lambda Python que retorna dois valores para a apply_changes_from_snapshot()
função: um Python DataFrame contendo os dados de snapshot a serem processados e uma versão de snapshot.
A assinatura da função lambda é a seguinte:
lambda Any => Optional[(DataFrame, Any)]
- O argumento para a função lambda é a versão de snapshot processada mais recentemente.
- O valor de retorno da função lambda é
None
ou uma tupla de dois valores: O primeiro valor da tupla é um DataFrame contendo o instantâneo a ser processado. O segundo valor da tupla é a versão do instantâneo que representa a ordem lógica do instantâneo.
Um exemplo que implementa e chama a função lambda:
def next_snapshot_and_version(latest_snapshot_version):
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
apply_changes_from_snapshot(
# ...
source = next_snapshot_and_version,
# ...
)
O tempo de execução do Delta Live Tables executa as seguintes etapas sempre que o pipeline que contém a apply_changes_from_snapshot()
função é acionado:
- Executa a
next_snapshot_and_version
função para carregar o próximo snapshot DataFrame e a versão de snapshot correspondente. - Se nenhum DataFrame retornar, a execução será encerrada e a atualização do pipeline será marcada como concluída.
- Deteta as alterações no novo instantâneo e as aplica incrementalmente à tabela de destino.
- Retorna à etapa #1 para carregar o próximo instantâneo e sua versão.
Limitações
A interface Python do Delta Live Tables tem a seguinte limitação:
A pivot()
função não é suportada. A pivot
operação no Spark requer o carregamento ansioso de dados de entrada para calcular o esquema de saída. Esse recurso não é suportado no Delta Live Tables.