Tutorial: Executar o seu primeiro pipeline Delta Live Tables
Este tutorial leva você pelas etapas para configurar seu primeiro pipeline Delta Live Tables, escrever código ETL básico e executar uma atualização de pipeline.
Todas as etapas neste tutorial são projetadas para espaços de trabalho com o Unity Catalog habilitado. Você também pode configurar pipelines Delta Live Tables para trabalhar com o metastore herdado do Hive. Consulte Usar pipelines Delta Live Tables com metastore herdado do Hive.
Nota
Este tutorial tem instruções para desenvolver e validar novo código de pipeline usando notebooks Databricks. Você também pode configurar pipelines usando código-fonte em arquivos Python ou SQL.
Você pode configurar um pipeline para executar seu código se já tiver o código-fonte escrito usando a sintaxe Delta Live Tables. Consulte Configurar um pipeline Delta Live Tables.
Você pode usar a sintaxe SQL totalmente declarativa no Databricks SQL para registrar e definir agendas de atualização para exibições materializadas e tabelas de streaming como objetos gerenciados pelo Unity Catalog. Consulte Usar exibições materializadas no Databricks SQL e Carregar dados usando tabelas de streaming no Databricks SQL.
Exemplo: Ingerir e processar dados de nomes de bebés de Nova Iorque
O exemplo neste artigo usa um conjunto de dados disponível publicamente que contém registros de nomes de bebês do Estado de Nova York. Este exemplo demonstra como usar um pipeline do Delta Live Tables para:
- Leia dados CSV brutos de um volume em uma tabela.
- Leia os registros da tabela de ingestão e use as expectativas do Delta Live Tables para criar uma nova tabela que contenha dados limpos.
- Use os registros limpos como entrada para consultas Delta Live Tables que criam conjuntos de dados derivados.
Este código demonstra um exemplo simplificado da arquitetura medalhão. Veja O que é a arquitetura do medalhão lakehouse?.
Implementações deste exemplo são fornecidas para Python e SQL. Siga as etapas para criar um novo pipeline e bloco de anotações e, em seguida, copie e cole o código fornecido.
Exemplos de blocos de anotações com código completo também são fornecidos.
Requisitos
Para iniciar um pipeline, você deve ter permissão de criação de cluster ou acesso a uma política de cluster que defina um cluster Delta Live Tables. O runtime de Tabelas Dinâmicas Delta cria um cluster antes de executar o pipeline e falha se não tiver a permissão correta.
Todos os usuários podem disparar atualizações usando pipelines sem servidor por padrão. O Serverless deve estar habilitado no nível da conta e pode não estar disponível na região do espaço de trabalho. Consulte Ativar computação sem servidor.
Os exemplos neste tutorial usam o Unity Catalog. O Databricks recomenda a criação de um novo esquema para executar este tutorial, pois vários objetos de banco de dados são criados no esquema de destino.
- Para criar um novo esquema em um catálogo, você deve ter
ALL PRIVILEGES
privilégios ouUSE CATALOG
eCREATE SCHEMA
. - Se não for possível criar um novo esquema, execute este tutorial em relação a um esquema existente. Você deve ter os seguintes privilégios:
-
USE CATALOG
para o catálogo pai. -
ALL PRIVILEGES
ouUSE SCHEMA
,CREATE MATERIALIZED VIEW
eCREATE TABLE
privilégios no esquema de destino.
-
- Este tutorial usa um volume para armazenar dados de exemplo. A Databricks recomenda a criação de um novo volume para este tutorial. Se você criar um novo esquema para este tutorial, poderá criar um novo volume nesse esquema.
- Para criar um novo volume em um esquema existente, você deve ter os seguintes privilégios:
-
USE CATALOG
para o catálogo pai. -
ALL PRIVILEGES
ouUSE SCHEMA
eCREATE VOLUME
privilégios no esquema de destino.
-
- Opcionalmente, você pode usar um volume existente. Você deve ter os seguintes privilégios:
-
USE CATALOG
para o catálogo pai. -
USE SCHEMA
para o esquema pai. -
ALL PRIVILEGES
ouREAD VOLUME
eWRITE VOLUME
no volume-alvo.
-
- Para criar um novo volume em um esquema existente, você deve ter os seguintes privilégios:
Para definir essas permissões, entre em contato com o administrador do Databricks. Para obter mais informações sobre os privilégios do Catálogo Unity, consulte Privilégios do Catálogo Unity e objetos protegíveis.
- Para criar um novo esquema em um catálogo, você deve ter
Passo 0: Transferir dados
Este exemplo carrega dados de um volume do Catálogo Unity. O código a seguir baixa um arquivo CSV e o armazena no volume especificado. Abra um novo bloco de anotações e execute o seguinte código para baixar esses dados para o volume especificado:
my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"
dbutils.fs.cp(download_url, volume_path + filename)
Substitua <catalog-name>
, <schema-name>
e <volume-name>
pelos nomes de catálogo, esquema e volume de um volume do Catálogo Unity. O código fornecido tenta criar o esquema e o volume especificados se esses objetos não existirem. Você deve ter os privilégios apropriados para criar e gravar em objetos no Unity Catalog. Consulte Requisitos.
Nota
Certifique-se de que este bloco de notas foi executado com êxito antes de continuar com o tutorial. Não configure este bloco de anotações como parte do seu pipeline.
Etapa 1: Criar um pipeline
O Delta Live Tables cria pipelines resolvendo dependências definidas em blocos de anotações ou arquivos (chamados de código-fonte) usando a sintaxe Delta Live Tables. Cada arquivo de código-fonte pode conter apenas um idioma, mas você pode adicionar vários blocos de anotações ou arquivos específicos do idioma no pipeline.
Importante
Não configure nenhum ativo no campo Código-fonte . Deixar esse campo preto cria e configura um bloco de anotações para a criação do código-fonte.
As instruções neste tutorial usam computação sem servidor e Unity Catalog. Use as configurações padrão para todas as opções de configuração não especificadas nestas instruções.
Nota
Se serverless não estiver habilitado ou suportado em seu espaço de trabalho, você poderá concluir o tutorial conforme escrito usando as configurações de computação padrão. Você deve selecionar manualmente Unity Catalog em Opções de armazenamento na seção Destino da interface do usuário Criar pipeline .
Para configurar um novo pipeline, faça o seguinte:
- Na barra lateral, clique em Delta Live Tables.
- Clique Criar pipeline.
- No Nome do pipeline, digite um nome de pipeline exclusivo.
- Marque a caixa de seleção sem servidor.
- No Destino
, para configurar um local do Catálogo Unity onde as tabelas são publicadas, selecione um Catálogo e um Esquema . - Em Avançado, clique em Adicionar configuração e, em seguida, defina os parâmetros do pipeline para o catálogo, esquema e volume para os quais descarregou os dados, utilizando os seguintes nomes de parâmetro:
my_catalog
my_schema
my_volume
- Clique em Criar.
A interface do usuário dos pipelines aparece para o novo pipeline. Um bloco de anotações de código-fonte é criado e configurado automaticamente para o pipeline.
O bloco de anotações é criado em um novo diretório no diretório do usuário. O nome do novo diretório e arquivo correspondem ao nome do seu pipeline. Por exemplo, /Users/your.username@databricks.com/my_pipeline/my_pipeline
.
Um link para acessar este bloco de anotações está no campo Código-fonte no painel Detalhes do pipeline. Clique no link para abrir o bloco de anotações antes de prosseguir para a próxima etapa.
Etapa 2: Declarar visualizações materializadas e tabelas de streaming em um bloco de anotações com Python ou SQL
Você pode usar notebooks Datbricks para desenvolver e validar interativamente o código-fonte para pipelines Delta Live Tables. Você deve anexar seu bloco de anotações ao pipeline para usar essa funcionalidade. Para anexar seu bloco de anotações recém-criado ao pipeline que você acabou de criar:
- Clique em Conectar no canto superior direito para abrir o menu de configuração de computação.
- Passe o cursor sobre o nome do pipeline criado na Etapa 1.
- Clique em Ligar.
A interface do usuário muda para incluir os botões Validar e Iniciar no canto superior direito. Para saber mais sobre o suporte de notebook para desenvolvimento de código de pipeline, consulte Desenvolver e depurar pipelines Delta Live Tables em blocos de anotações.
Importante
- Os pipelines Delta Live Tables avaliam todas as células de um bloco de anotações durante o planejamento. Ao contrário dos blocos de anotações que são executados em computação multiuso ou agendados como trabalhos, os pipelines não garantem que as células sejam executadas na ordem especificada.
- Os blocos de notas só podem conter uma única linguagem de programação. Não misture Python e código SQL em blocos de anotações de código-fonte de pipeline.
Para obter detalhes sobre como desenvolver código com Python ou SQL, consulte Desenvolver código de pipeline com Python ou Desenvolver código de pipeline com SQL.
Exemplo de código de pipeline
Para implementar o exemplo neste tutorial, copie e cole o código a seguir em uma célula do bloco de anotações configurada como código-fonte para seu pipeline.
O código fornecido faz o seguinte:
- Importa módulos necessários (somente Python).
- Referencia parâmetros definidos durante a configuração do pipeline.
- Define uma tabela de streaming chamada
baby_names_raw
que ingere a partir de um volume. - Define uma exibição materializada chamada
baby_names_prepared
que valida os dados ingeridos. - Define uma exibição materializada nomeada
top_baby_names_2021
que tem uma exibição altamente refinada dos dados.
Python
# Import modules
import dlt
from pyspark.sql.functions import *
# Assign pipeline parameters to variables
my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")
# Define the path to source data
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
# Define a streaming table to ingest data from a volume
@dlt.table(
comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("inferSchema", True)
.option("header", True)
.load(volume_path)
)
df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
return df_renamed_column
# Define a materialized view that validates data and renames a column
@dlt.table(
comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
return (
spark.read.table("LIVE.baby_names_raw")
.withColumnRenamed("Year", "Year_Of_Birth")
.select("Year_Of_Birth", "First_Name", "Count")
)
# Define a materialized view that has a filtered, aggregated, and sorted view of the data
@dlt.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("LIVE.baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
.limit(10)
)
SQL
-- Define a streaming table to ingest data from a volume
CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
'/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
format => 'csv',
header => true,
mode => 'FAILFAST'));
-- Define a materialized view that validates data and renames a column
CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
Year AS Year_Of_Birth,
First_Name,
Count
FROM LIVE.baby_names_raw;
-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM LIVE.baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;
Etapa 3: Iniciar uma atualização de pipeline
Para iniciar uma atualização de pipeline, clique no botão Iniciar no canto superior direito da interface do usuário do bloco de anotações.
Exemplos de blocos de notas
Os blocos de anotações a seguir contêm os mesmos exemplos de código fornecidos neste artigo. Esses blocos de anotações têm os mesmos requisitos das etapas deste artigo. Consulte Requisitos.
Para importar um bloco de anotações, conclua as seguintes etapas:
- Abra a interface do usuário do bloco de anotações.
- Clique em + Novo>Bloco de Notas.
- Um bloco de anotações vazio é aberto.
- Clique em File (Ficheiro)>Import (Importar). A caixa de diálogo Importar é exibida.
- Selecione a opção URL para Importar de.
- Cole o URL do bloco de notas.
- Clique em Importar.
Este tutorial requer que você execute um bloco de anotações de configuração de dados antes de configurar e executar seu pipeline Delta Live Tables. Importe o bloco de anotações a seguir, anexe-o a um recurso de computação, preencha a variável necessária para my_catalog
, my_schema
e my_volume
, e clique em Executar tudo.
Tutorial de download de dados para pipelines
Os blocos de anotações a seguir fornecem exemplos em Python ou SQL. Quando você importa um bloco de anotações, ele é salvo no diretório inicial do usuário.
Depois de importar um dos blocos de anotações abaixo, conclua as etapas para criar um pipeline, mas use o seletor de arquivos de código-fonte para selecionar o bloco de anotações baixado. Depois de criar o pipeline com um bloco de anotações configurado como código-fonte, clique em Iniciar na interface do usuário do pipeline para disparar uma atualização.