Partilhar via


Desenvolver código de pipeline com SQL

Delta Live Tables introduz várias novas palavras-chave SQL e funções para definir visualizações materializadas e tabelas de streaming em pipelines. O suporte SQL para o desenvolvimento de pipelines baseia-se nos conceitos básicos do Spark SQL e adiciona suporte para a funcionalidade de Streaming Estruturado.

Os usuários familiarizados com o PySpark DataFrames podem preferir desenvolver código de pipeline com Python. O Python suporta testes e operações mais abrangentes que são difíceis de implementar com SQL, como operações de metaprogramação. Consulte Desenvolver código de pipeline com Python.

Para obter uma referência completa da sintaxe SQL do Delta Live Tables, consulte Referência da linguagem SQL do Delta Live Tables.

Noções básicas de SQL para desenvolvimento de pipeline

O código SQL que cria conjuntos de dados Delta Live Tables usa a sintaxe para definir exibições materializadas e tabelas de streaming em relação aos resultados da CREATE OR REFRESH consulta.

A STREAM palavra-chave indica se a fonte de dados referenciada em uma SELECT cláusula deve ser lida com semântica de streaming.

O código-fonte do Delta Live Tables difere criticamente dos scripts SQL: o Delta Live Tables avalia todas as definições de conjunto de dados em todos os arquivos de código-fonte configurados em um pipeline e cria um gráfico de fluxo de dados antes que qualquer consulta seja executada. A ordem das consultas que aparecem em um bloco de anotações ou script não define a ordem de execução.

Criar uma vista materializada com SQL

O exemplo de código a seguir demonstra a sintaxe básica para criar uma exibição materializada com SQL:

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

Criar uma tabela de streaming com SQL

O exemplo de código a seguir demonstra a sintaxe básica para criar uma tabela de streaming com SQL:

Nota

Nem todas as fontes de dados suportam leituras de streaming, e algumas fontes de dados devem sempre ser processadas com semântica de streaming.

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

Carregar dados do armazenamento de objetos

O Delta Live Tables dá suporte ao carregamento de dados de todos os formatos suportados pelo Azure Databricks. Consulte Opções de formato de dados.

Nota

Estes exemplos usam dados disponíveis sob o /databricks-datasets montado automaticamente em seu espaço de trabalho. O Databricks recomenda o uso de caminhos de volume ou URIs de nuvem para fazer referência a dados armazenados no armazenamento de objetos em nuvem. Consulte O que são volumes do Catálogo Unity?.

O Databricks recomenda o uso do Auto Loader e de tabelas de streaming ao configurar cargas de trabalho de ingestão incremental em relação aos dados armazenados no armazenamento de objetos na nuvem. Consulte O que é Auto Loader?.

SQL usa a função para invocar a read_files funcionalidade Auto Loader. Você também deve usar a STREAM palavra-chave para configurar uma leitura de streaming com read_files.

O exemplo a seguir cria uma tabela de streaming a partir de arquivos JSON usando o Auto Loader:

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

A read_files função também suporta semântica em lote para criar visualizações materializadas. O exemplo a seguir usa semântica em lote para ler um diretório JSON e criar uma exibição materializada:

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");

Valide dados com expectativas

Você pode usar as expectativas para definir e impor restrições de qualidade de dados. Consulte Gerenciar a qualidade dos dados com o Delta Live Tables.

O código a seguir define uma expectativa chamada valid_data que descarta registros que são nulos durante a ingestão de dados:

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Consultar exibições materializadas e tabelas de streaming definidas em seu pipeline

Use o LIVE esquema para consultar outras exibições materializadas e tabelas de streaming definidas em seu pipeline.

O exemplo a seguir define quatro conjuntos de dados:

  • Uma tabela de streaming chamada orders que carrega dados JSON.
  • Uma vista materializada denominada customers que carrega dados CSV.
  • Um modo de exibição materializado chamado customer_orders que une registros dos conjuntos de dados e customers , converte o carimbo de orders data/hora do pedido em uma data e seleciona os customer_idcampos , order_number, statee order_date .
  • Uma visão materializada nomeada daily_orders_by_state que agrega a contagem diária de pedidos para cada estado.
CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM LIVE.orders o
INNER JOIN LIVE.customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM LIVE.customer_orders
GROUP BY state, order_date;