Partilhar via


Referência da linguagem Python Delta Live Tables

Este artigo tem detalhes para a interface de programação Python Delta Live Tables.

Para obter informações sobre a API SQL, consulte a referência da linguagem SQL Delta Live Tables.

Para obter detalhes específicos sobre a configuração do Auto Loader, consulte O que é o Auto Loader?.

Antes de começar

A seguir estão considerações importantes quando você implementa pipelines com a interface Python Delta Live Tables:

  • Como o Python table() e view() as funções são invocados várias vezes durante o planejamento e a execução de uma atualização de pipeline, não inclua código em uma dessas funções que possam ter efeitos colaterais (por exemplo, código que modifica dados ou envia um e-mail). Para evitar um comportamento inesperado, suas funções Python que definem conjuntos de dados devem incluir apenas o código necessário para definir a tabela ou exibição.
  • Para executar operações como o envio de e-mails ou a integração com um serviço de monitoramento externo, particularmente em funções que definem conjuntos de dados, use ganchos de eventos. A implementação dessas operações nas funções que definem seus conjuntos de dados causará um comportamento inesperado.
  • O Python table e view as funções devem retornar um DataFrame. Algumas funções que operam em DataFrames não retornam DataFrames e não devem ser usadas. Essas operações incluem funções como collect(), count(), toPandas(), save(), e saveAsTable(). Como as transformações DataFrame são executadas após a resolução do gráfico de fluxo de dados completo, o uso dessas operações pode ter efeitos colaterais não intencionais.

Importar o dlt módulo Python

As funções Python do dlt Delta Live Tables são definidas no módulo. Seus pipelines implementados com a API Python devem importar este módulo:

import dlt

Criar uma visualização materializada do Delta Live Tables ou uma tabela de streaming

Em Python, o Delta Live Tables determina se um conjunto de dados deve ser atualizado como uma exibição materializada ou uma tabela de streaming com base na consulta definidora. O @table decorador pode ser usado para definir vistas materializadas e mesas de streaming.

Para definir uma exibição materializada em Python, aplique @table a uma consulta que executa uma leitura estática em relação a uma fonte de dados. Para definir uma tabela de streaming, aplique @table a uma consulta que executa uma leitura de streaming em relação a uma fonte de dados ou use a função create_streaming_table(). Ambos os tipos de conjunto de dados têm a mesma especificação de sintaxe da seguinte maneira:

Nota

Para usar o argumento para habilitar o cluster_by clustering líquido, seu pipeline deve ser configurado para usar o canal de visualização.

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Criar uma visualização Delta Live Tables

Para definir uma vista em Python, aplique o @view decorador. Como o @table decorador, você pode usar visualizações no Delta Live Tables para conjuntos de dados estáticos ou de streaming. A sintaxe a seguir está para definir modos de exibição com Python:

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Exemplo: Definir tabelas e modos de exibição

Para definir uma tabela ou visualização em Python, aplique o @dlt.view ou @dlt.table decorador a uma função. Você pode usar o nome da função ou o name parâmetro para atribuir o nome da tabela ou da exibição. O exemplo a seguir define dois conjuntos de dados diferentes: um modo de exibição chamado taxi_raw que usa um arquivo JSON como a fonte de entrada e uma tabela chamada filtered_data que usa a taxi_raw exibição como entrada:

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return spark.read.table("LIVE.taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return spark.read.table("LIVE.taxi_raw").where(...)

Exemplo: acessar um conjunto de dados definido no mesmo pipeline

Nota

Embora as dlt.read() funções e dlt.read_stream() ainda estejam disponíveis e sejam totalmente suportadas pela interface Python do Delta Live Tables, o Databricks recomenda sempre usar as spark.read.table() funções e spark.readStream.table() devido ao seguinte:

  • As spark funções suportam a leitura de conjuntos de dados internos e externos, incluindo conjuntos de dados em armazenamento externo ou definidos em outros pipelines. As dlt funções suportam apenas a leitura de conjuntos de dados internos.
  • As spark funções suportam a especificação de opções, como skipChangeCommits, para ler operações. A especificação de opções não é suportada dlt pelas funções.

Para acessar um conjunto de dados definido no mesmo pipeline, use as spark.read.table() funções ou spark.readStream.table() , precedendo a LIVE palavra-chave para o nome do conjunto de dados:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return spark.read.table("LIVE.customers_raw").where(...)

Exemplo: Ler a partir de uma tabela registada num metastore

Para ler dados de uma tabela registrada no metastore do Hive, no argumento da função, omita a LIVE palavra-chave e, opcionalmente, qualifique o nome da tabela com o nome do banco de dados:

@dlt.table
def customers():
  return spark.read.table("sales.customers").where(...)

Para obter um exemplo de leitura de uma tabela do Catálogo Unity, consulte Ingerir dados em um pipeline do Catálogo Unity.

Exemplo: Acessar um conjunto de dados usando spark.sql

Você também pode retornar um conjunto de dados usando uma spark.sql expressão em uma função de consulta. Para ler a partir de um conjunto de dados interno, anexe LIVE. ao nome do conjunto de dados:

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")

Criar uma tabela para usar como destino de operações de streaming

Use a create_streaming_table() função para criar uma tabela de destino para a saída de registros por operações de streaming, incluindo registros de saída apply_changes(), apply_changes_from_snapshot()() e @append_flow .

Nota

As create_target_table() funções e create_streaming_live_table() foram preteridas. O Databricks recomenda atualizar o código existente para usar a create_streaming_table() função.

Nota

Para usar o argumento para habilitar o cluster_by clustering líquido, seu pipeline deve ser configurado para usar o canal de visualização.

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>"
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
  row_filter = "row-filter-clause"
)
Argumentos
name

Tipo: str

O nome da tabela.

Este parâmetro é obrigatório.
comment

Tipo: str

Uma descrição opcional para a tabela.
spark_conf

Tipo: dict

Uma lista opcional de configurações do Spark para a execução desta consulta.
table_properties

Tipo: dict

Uma lista opcional de propriedades da tabela para a tabela.
partition_cols

Tipo: array

Uma lista opcional de uma ou mais colunas a serem usadas para particionar a tabela.
cluster_by

Tipo: array

Opcionalmente, habilite o clustering líquido na tabela e defina as colunas a serem usadas como chaves de clustering.

Veja Utilizar clustering líquido para tabelas Delta.
path

Tipo: str

Um local de armazenamento opcional para dados de tabela. Se não estiver definido, o padrão do sistema será o local de armazenamento do pipeline.
schema

Tipo: str ou StructType

Uma definição de esquema opcional para a tabela. Os esquemas podem ser definidos como uma cadeia de caracteres DDL SQL ou com um Python
StructType.
expect_all
expect_all_or_drop
expect_all_or_fail

Tipo: dict

Restrições opcionais de qualidade de dados para a tabela. Veja várias expectativas.
row_filter (Pré-visualização Pública)

Tipo: str

Uma cláusula de filtro de linha opcional para a tabela. Consulte Publicar tabelas com filtros de linha e máscaras de coluna.

Controlar como as tabelas são materializadas

As tabelas também oferecem um controlo adicional da sua materialização:

  • Especifique como as tabelas são particionadas usando partition_colso . Você pode usar o particionamento para acelerar as consultas.
  • Você pode definir as propriedades da tabela ao definir um modo de exibição ou tabela. Consulte Propriedades da tabela Delta Live Tables.
  • Defina um local de armazenamento para os dados da tabela usando a path configuração. Por padrão, os dados da tabela são armazenados no local de armazenamento do pipeline se path não estiverem definidos.
  • Você pode usar colunas geradas em sua definição de esquema. Consulte Exemplo: Especifique um esquema e colunas de partição.

Nota

Para tabelas com menos de 1 TB de tamanho, o Databricks recomenda permitir que o Delta Live Tables controle a organização dos dados. Você não deve especificar colunas de partição, a menos que espere que sua tabela cresça além de um terabyte.

Exemplo: Especificar um esquema e colunas de partição

Opcionalmente, você pode especificar um esquema de tabela usando uma cadeia de caracteres DDL Python StructType ou SQL. Quando especificado com uma cadeia de caracteres DDL, a definição pode incluir colunas geradas.

O exemplo a seguir cria uma tabela chamada sales com um esquema especificado usando um Python StructType:

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

O exemplo a seguir especifica o esquema para uma tabela usando uma cadeia de caracteres DDL, define uma coluna gerada e define uma coluna de partição:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

Por padrão, Delta Live Tables infere o esquema da table definição se você não especificar um esquema.

Configurar uma tabela de streaming para ignorar alterações em uma tabela de streaming de origem

Nota

  • O skipChangeCommits sinalizador funciona apenas com spark.readStream o uso da option() função. Não é possível usar esse sinalizador em uma dlt.read_stream() função.
  • Não é possível usar o skipChangeCommits sinalizador quando a tabela de streaming de origem é definida como o destino de uma função apply_changes( ).

Por padrão, as tabelas de streaming exigem fontes somente de acréscimo. Quando uma tabela de streaming usa outra tabela de streaming como fonte e a tabela de streaming de origem requer atualizações ou excluições, por exemplo, o processamento do "direito a ser esquecido" do GDPR, o skipChangeCommits sinalizador pode ser definido ao ler a tabela de streaming de origem para ignorar essas alterações. Para obter mais informações sobre esse sinalizador, consulte Ignorar atualizações e exclusões.

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")

Exemplo: Definir restrições de tabela

Importante

As restrições de tabela estão em Visualização pública.

Ao especificar um esquema, você pode definir chaves primárias e estrangeiras. As restrições são informativas e não são aplicadas. Consulte a cláusula CONSTRAINT na referência da linguagem SQL.

O exemplo a seguir define uma tabela com uma restrição de chave primária e estrangeira:

@dlt.table(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """
def sales():
   return ("...")

Exemplo: Definir um filtro de linha e uma máscara de coluna

Importante

Os filtros de linha e as máscaras de coluna estão na Pré-visualização Pública.

Para criar uma exibição materializada ou uma tabela de streaming com um filtro de linha e uma máscara de coluna, use a cláusula ROW FILTER e a cláusula MASK . O exemplo a seguir demonstra como definir um modo de exibição materializado e uma tabela Streaming com um filtro de linha e uma máscara de coluna:

@dlt.table(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
   return ("...")

Para obter mais informações sobre filtros de linha e máscaras de coluna, consulte Publicar tabelas com filtros de linha e máscaras de coluna.

Propriedades Python Delta Live Tables

As tabelas a seguir descrevem as opções e propriedades que você pode especificar ao definir tabelas e exibições com Delta Live Tables:

Nota

Para usar o argumento para habilitar o cluster_by clustering líquido, seu pipeline deve ser configurado para usar o canal de visualização.

@table ou @view
name

Tipo: str

Um nome opcional para a tabela ou exibição. Se não estiver definido, o nome da função será usado como o nome da tabela ou do modo de exibição.
comment

Tipo: str

Uma descrição opcional para a tabela.
spark_conf

Tipo: dict

Uma lista opcional de configurações do Spark para a execução desta consulta.
table_properties

Tipo: dict

Uma lista opcional de propriedades da tabela para a tabela.
path

Tipo: str

Um local de armazenamento opcional para dados de tabela. Se não estiver definido, o padrão do sistema será o local de armazenamento do pipeline.
partition_cols

Tipo: a collection of str

Uma coleção opcional, por exemplo, uma list de uma ou mais colunas a serem usadas para particionar a tabela.
cluster_by

Tipo: array

Opcionalmente, habilite o clustering líquido na tabela e defina as colunas a serem usadas como chaves de clustering.

Veja Utilizar clustering líquido para tabelas Delta.
schema

Tipo: str ou StructType

Uma definição de esquema opcional para a tabela. Os esquemas podem ser definidos como uma cadeia de caracteres DDL SQL ou com um Python StructType.
temporary

Tipo: bool

Crie uma tabela, mas não publique metadados para a tabela. A temporary palavra-chave instrui Delta Live Tables a criar uma tabela que está disponível para o pipeline, mas não deve ser acessada fora do pipeline. Para reduzir o tempo de processamento, uma tabela temporária persiste durante o tempo de vida do pipeline que a cria, e não apenas uma única atualização.

O padrão é 'False'.
row_filter (Pré-visualização Pública)

Tipo: str

Uma cláusula de filtro de linha opcional para a tabela. Consulte Publicar tabelas com filtros de linha e máscaras de coluna.
Definição de tabela ou vista
def <function-name>()

Uma função Python que define o conjunto de dados. Se o name parâmetro não estiver definido, será <function-name> usado como o nome do conjunto de dados de destino.
query

Uma instrução Spark SQL que retorna um Spark Dataset ou Koalas DataFrame.

Use dlt.read() ou spark.read.table() para executar uma leitura completa de um conjunto de dados definido no mesmo pipeline. Para ler um conjunto de dados externo, use a spark.read.table() função. Não é possível usar dlt.read() para ler conjuntos de dados externos. Como spark.read.table() pode ser usado para ler conjuntos de dados internos, conjuntos de dados definidos fora do pipeline atual e permite especificar opções para ler dados, o dlt.read() Databricks recomenda usá-lo em vez da função.

Quando você usa a função para ler de spark.read.table() um conjunto de dados definido no mesmo pipeline, anexe a LIVE palavra-chave ao nome do conjunto de dados no argumento da função. Por exemplo, para ler a partir de um conjunto de dados chamado customers:

spark.read.table("LIVE.customers")

Você também pode usar a função para ler uma spark.read.table() tabela registrada no metastore omitindo a LIVE palavra-chave e, opcionalmente, qualificando o nome da tabela com o nome do banco de dados:

spark.read.table("sales.customers")

Use dlt.read_stream() ou spark.readStream.table() para executar uma leitura de streaming de um conjunto de dados definido no mesmo pipeline. Para executar uma leitura de streaming de um conjunto de dados externo, use o botão
spark.readStream.table() função. Como spark.readStream.table() pode ser usado para ler conjuntos de dados internos, conjuntos de dados definidos fora do pipeline atual e permite especificar opções para ler dados, o dlt.read_stream() Databricks recomenda usá-lo em vez da função.

Para definir uma consulta em uma função Delta Live Tables table usando sintaxe SQL, use a spark.sql função. Consulte Exemplo: acessar um conjunto de dados usando spark.sql. Para definir uma consulta em uma função Delta Live Tables table usando Python, use a sintaxe PySpark .
Expectativas
@expect("description", "constraint")

Declarar uma restrição de qualidade de dados identificada por
description. Se uma linha violar a expectativa, inclua a linha no conjunto de dados de destino.
@expect_or_drop("description", "constraint")

Declarar uma restrição de qualidade de dados identificada por
description. Se uma linha violar a expectativa, solte a linha do conjunto de dados de destino.
@expect_or_fail("description", "constraint")

Declarar uma restrição de qualidade de dados identificada por
description. Se uma linha violar a expectativa, interrompa imediatamente a execução.
@expect_all(expectations)

Declare uma ou mais restrições de qualidade de dados.
expectations é um dicionário Python, onde a chave é a descrição da expectativa e o valor é a restrição de expectativa. Se uma linha violar qualquer uma das expectativas, inclua a linha no conjunto de dados de destino.
@expect_all_or_drop(expectations)

Declare uma ou mais restrições de qualidade de dados.
expectations é um dicionário Python, onde a chave é a descrição da expectativa e o valor é a restrição de expectativa. Se uma linha violar qualquer uma das expectativas, solte a linha do conjunto de dados de destino.
@expect_all_or_fail(expectations)

Declare uma ou mais restrições de qualidade de dados.
expectations é um dicionário Python, onde a chave é a descrição da expectativa e o valor é a restrição de expectativa. Se uma linha violar qualquer uma das expectativas, interrompa imediatamente a execução.

Alterar a captura de dados de um feed de alterações com Python no Delta Live Tables

Use a apply_changes() função na API Python para usar a funcionalidade de captura de dados de alteração (CDC) Delta Live Tables para processar dados de origem de um feed de dados de alteração (CDF).

Importante

Você deve declarar uma tabela de streaming de destino para aplicar as alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Ao especificar o apply_changes() esquema da tabela de destino, você deve incluir as __START_AT colunas e __END_AT com o mesmo tipo de dados que os sequence_by campos.

Para criar a tabela de destino necessária, você pode usar a função create_streaming_table() na interface Python do Delta Live Tables.

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Nota

Para APPLY CHANGES processamento, o comportamento padrão para INSERT eventos e UPDATE é atualizar eventos CDC da origem: atualizar quaisquer linhas na tabela de destino que correspondam à(s) chave(s) especificada(s) ou inserir uma nova linha quando um registro correspondente não existir na tabela de destino. A manipulação de DELETE eventos pode ser especificada com a APPLY AS DELETE WHEN condição.

Para saber mais sobre o processamento CDC com um feed de alterações, consulte The APPLY CHANGES APIs: Simplify change data capture with Delta Live Tables. Para obter um exemplo de uso da apply_changes() função, consulte Exemplo: processamento de SCD tipo 1 e SCD tipo 2 com dados de origem CDF.

Importante

Você deve declarar uma tabela de streaming de destino para aplicar as alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Ao especificar o esquema da tabela de apply_changes destino, você deve incluir as __START_AT colunas e __END_AT com o mesmo tipo de dados do sequence_by campo.

Consulte As APIs APPLY CHANGES: Simplifique a captura de dados de alteração com o Delta Live Tables.

Argumentos
target

Tipo: str

O nome da tabela a ser atualizada. Você pode usar a função create_streaming_table() para criar a tabela de destino antes de executar a apply_changes() função.

Este parâmetro é obrigatório.
source

Tipo: str

A fonte de dados que contém os registros do CDC.

Este parâmetro é obrigatório.
keys

Tipo: list

A coluna ou combinação de colunas que identifica exclusivamente uma linha nos dados de origem. Isso é usado para identificar quais eventos CDC se aplicam a registros específicos na tabela de destino.

Você pode especificar:

- Uma lista de strings: ["userId", "orderId"]
- Uma lista de funções do Spark SQL col() : [col("userId"), col("orderId"]

Os argumentos para col() funções não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).

Este parâmetro é obrigatório.
sequence_by

Tipo: str ou col()

O nome da coluna que especifica a ordem lógica dos eventos CDC nos dados de origem. O Delta Live Tables usa esse sequenciamento para manipular eventos de alteração que chegam fora de ordem.

Você pode especificar:

- Uma corda: "sequenceNum"
- Uma função Spark SQL col() : col("sequenceNum")

Os argumentos para col() funções não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).

A coluna especificada deve ser um tipo de dados classificável.

Este parâmetro é obrigatório.
ignore_null_updates

Tipo: bool

Permitir a ingestão de atualizações contendo um subconjunto das colunas de destino. Quando um evento CDC corresponde a uma linha existente e ignore_null_updates é True, as colunas com um null mantêm seus valores existentes no destino. Isso também se aplica a colunas aninhadas com um valor de null. Quando ignore_null_updates é False, os valores existentes são substituídos por null valores.

Este parâmetro é opcional.

A predefinição é False.
apply_as_deletes

Tipo: str ou expr()

Especifica quando um evento CDC deve ser tratado como um DELETE upsert em vez de um upsert. Para lidar com dados fora de ordem, a linha excluída é temporariamente mantida como uma marca de exclusão na tabela Delta subjacente e uma exibição é criada no metastore que filtra essas lápides. O intervalo de retenção pode ser configurado com o
pipelines.cdc.tombstoneGCThresholdInSeconds propriedade table.

Você pode especificar:

- Uma corda: "Operation = 'DELETE'"
- Uma função Spark SQL expr() : expr("Operation = 'DELETE'")

Este parâmetro é opcional.
apply_as_truncates

Tipo: str ou expr()

Especifica quando um evento CDC deve ser tratado como uma tabela TRUNCATEcompleta. Como essa cláusula aciona um truncado completo da tabela de destino, ela deve ser usada apenas para casos de uso específicos que exijam essa funcionalidade.

O apply_as_truncates parâmetro é suportado apenas para SCD tipo 1. SCD tipo 2 não suporta operações truncadas.

Você pode especificar:

- Uma corda: "Operation = 'TRUNCATE'"
- Uma função Spark SQL expr() : expr("Operation = 'TRUNCATE'")

Este parâmetro é opcional.
column_list

except_column_list

Tipo: list

Um subconjunto de colunas a serem incluídas na tabela de destino. Use column_list para especificar a lista completa de colunas a serem incluídas. Use except_column_list para especificar as colunas a serem excluídas. Você pode declarar qualquer valor como uma lista de cadeias de caracteres ou como funções do Spark SQL col() :

- column_list = ["userId", "name", "city"].
- column_list = [col("userId"), col("name"), col("city")]
- except_column_list = ["operation", "sequenceNum"]
- except_column_list = [col("operation"), col("sequenceNum")

Os argumentos para col() funções não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).

Este parâmetro é opcional.

O padrão é incluir todas as colunas na tabela de destino quando nenhum column_list argumento ou except_column_list é passado para a função.
stored_as_scd_type

Tipo: str ou int

Se deseja armazenar registros como SCD tipo 1 ou SCD tipo 2.

Definido como 1 para SCD tipo 1 ou 2 para SCD tipo 2.

Esta cláusula é opcional.

O padrão é SCD tipo 1.
track_history_column_list

track_history_except_column_list

Tipo: list

Um subconjunto de colunas de saída a serem rastreadas para o histórico na tabela de destino. Use track_history_column_list para especificar a lista completa de colunas a serem rastreadas. Utilizar
track_history_except_column_list para especificar as colunas a serem excluídas do acompanhamento. Você pode declarar qualquer valor como uma lista de cadeias de caracteres ou como funções do Spark SQL col() :
- track_history_column_list = ["userId", "name", "city"].
- track_history_column_list = [col("userId"), col("name"), col("city")]
- track_history_except_column_list = ["operation", "sequenceNum"]
- track_history_except_column_list = [col("operation"), col("sequenceNum")

Os argumentos para col() funções não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).

Este parâmetro é opcional.

O padrão é incluir todas as colunas na tabela de destino quando não track_history_column_list ou
track_history_except_column_list é passado para a função.

Alterar a captura de dados de instantâneos de banco de dados com Python em Delta Live Tables

Importante

A APPLY CHANGES FROM SNAPSHOT API está em Visualização Pública.

Use a apply_changes_from_snapshot() função na API Python para usar a funcionalidade de captura de dados de alteração (CDC) do Delta Live Tables para processar dados de origem de instantâneos de banco de dados.

Importante

Você deve declarar uma tabela de streaming de destino para aplicar as alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Ao especificar o apply_changes_from_snapshot() esquema da tabela de destino, você também deve incluir as __START_AT colunas e __END_AT com o mesmo tipo de dados que o sequence_by campo.

Para criar a tabela de destino necessária, você pode usar a função create_streaming_table() na interface Python do Delta Live Tables.

apply_changes_from_snapshot(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
) -> None

Nota

Para APPLY CHANGES FROM SNAPSHOT processamento, o comportamento padrão é inserir uma nova linha quando um registro correspondente com a(s) mesma(s) chave(s) não existe no destino. Se existir um registro correspondente, ele será atualizado somente se algum dos valores na linha tiver sido alterado. As linhas com chaves presentes no destino, mas não mais presentes na origem, são excluídas.

Para saber mais sobre o processamento CDC com snapshots, consulte The APPLY CHANGES APIs: Simplify change data capture with Delta Live Tables. Para obter exemplos de uso da apply_changes_from_snapshot() função, consulte os exemplos de ingestão periódica de instantâneo e ingestão de instantâneo histórico.

Argumentos
target

Tipo: str

O nome da tabela a ser atualizada. Você pode usar a função create_streaming_table() para criar a tabela de destino antes de executar a apply_changes() função.

Este parâmetro é obrigatório.
source

Tipo: str ou lambda function

O nome de uma tabela ou exibição para snapshot periodicamente ou uma função lambda Python que retorna o snapshot DataFrame a ser processado e a versão snapshot. Consulte Implementar o argumento de origem.

Este parâmetro é obrigatório.
keys

Tipo: list

A coluna ou combinação de colunas que identifica exclusivamente uma linha nos dados de origem. Isso é usado para identificar quais eventos CDC se aplicam a registros específicos na tabela de destino.

Você pode especificar:

- Uma lista de strings: ["userId", "orderId"]
- Uma lista de funções do Spark SQL col() : [col("userId"), col("orderId"]

Os argumentos para col() funções não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).

Este parâmetro é obrigatório.
stored_as_scd_type

Tipo: str ou int

Se deseja armazenar registros como SCD tipo 1 ou SCD tipo 2.

Definido como 1 para SCD tipo 1 ou 2 para SCD tipo 2.

Esta cláusula é opcional.

O padrão é SCD tipo 1.
track_history_column_list

track_history_except_column_list

Tipo: list

Um subconjunto de colunas de saída a serem rastreadas para o histórico na tabela de destino. Use track_history_column_list para especificar a lista completa de colunas a serem rastreadas. Utilizar
track_history_except_column_list para especificar as colunas a serem excluídas do acompanhamento. Você pode declarar qualquer valor como uma lista de cadeias de caracteres ou como funções do Spark SQL col() :
- track_history_column_list = ["userId", "name", "city"].
- track_history_column_list = [col("userId"), col("name"), col("city")]
- track_history_except_column_list = ["operation", "sequenceNum"]
- track_history_except_column_list = [col("operation"), col("sequenceNum")

Os argumentos para col() funções não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).

Este parâmetro é opcional.

O padrão é incluir todas as colunas na tabela de destino quando não track_history_column_list ou
track_history_except_column_list é passado para a função.

Implementar o source argumento

A apply_changes_from_snapshot() função inclui o source argumento. Para processar snapshots históricos, espera-se que o source argumento seja uma função lambda Python que retorna dois valores para a apply_changes_from_snapshot() função: um Python DataFrame contendo os dados de snapshot a serem processados e uma versão de snapshot.

A assinatura da função lambda é a seguinte:

lambda Any => Optional[(DataFrame, Any)]
  • O argumento para a função lambda é a versão de snapshot processada mais recentemente.
  • O valor de retorno da função lambda é None ou uma tupla de dois valores: O primeiro valor da tupla é um DataFrame contendo o instantâneo a ser processado. O segundo valor da tupla é a versão do instantâneo que representa a ordem lógica do instantâneo.

Um exemplo que implementa e chama a função lambda:

def next_snapshot_and_version(latest_snapshot_version):
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

apply_changes_from_snapshot(
  # ...
  source = next_snapshot_and_version,
  # ...
)

O tempo de execução do Delta Live Tables executa as seguintes etapas sempre que o pipeline que contém a apply_changes_from_snapshot() função é acionado:

  1. Executa a next_snapshot_and_version função para carregar o próximo snapshot DataFrame e a versão de snapshot correspondente.
  2. Se nenhum DataFrame retornar, a execução será encerrada e a atualização do pipeline será marcada como concluída.
  3. Deteta as alterações no novo instantâneo e as aplica incrementalmente à tabela de destino.
  4. Retorna à etapa #1 para carregar o próximo instantâneo e sua versão.

Limitações

A interface Python do Delta Live Tables tem a seguinte limitação:

A pivot() função não é suportada. A pivot operação no Spark requer o carregamento ansioso de dados de entrada para calcular o esquema de saída. Esse recurso não é suportado no Delta Live Tables.