Criar um pipeline de dados de ponta a ponta no Databricks
Este artigo mostra como criar e implantar um pipeline de processamento de dados de ponta a ponta, incluindo como ingerir dados brutos, transformar os dados e executar análises nos dados processados.
Observação
Embora este artigo demonstre como criar um pipeline de dados completo usando notebooks do Databricks e um trabalho do Azure Databricks para orquestrar um fluxo de trabalho, o Databricks recomenda o uso do Delta Live Tables, uma interface declarativa para criar pipelines de processamento de dados confiáveis, sustentáveis e testáveis.
O que é um pipeline de dados?
Um pipeline de dados implementa as etapas necessárias para mover dados de sistemas de origem, transformar esses dados com base nos requisitos e armazenar os dados em um sistema de destino. Um pipeline de dados inclui todos os processos necessários para transformar dados brutos em dados preparados que os usuários podem consumir. Por exemplo, um pipeline de dados pode preparar dados para que analistas de dados e cientistas de dados possam extrair valor dos dados por meio de análise e relatório.
Um fluxo de trabalho ETL (extração, transformação e carregamento) é um exemplo comum de um pipeline de dados. No processamento de ETL, os dados são ingeridos de sistemas de origem e gravados em uma área de preparo, transformados com base nos requisitos (garantindo a qualidade dos dados, eliminando a duplicação de registros e assim por diante) e gravados em um sistema de destino, como um data warehouse ou data lake.
Etapas do pipeline de dados
Para ajudá-lo a começar a criar pipelines de dados no Azure Databricks, o exemplo incluído neste artigo explica como criar um fluxo de trabalho de processamento de dados:
- Use os recursos do Azure Databricks para explorar um conjunto de dados brutos.
- Crie um notebook do Databricks para ingerir dados brutos de origem e gravar os dados brutos em uma tabela de destino.
- Crie um notebook do Databricks para transformar os dados brutos de origem e gravar os dados transformados em uma tabela de destino.
- Crie um notebook do Databricks para consultar os dados transformados.
- Automatize o pipeline de dados com um trabalho do Azure Databricks.
Requisitos
- Você está conectado ao Azure Databricks e está no workspace Ciência de dados e Engenharia.
- Você tem permissão para criar um cluster ou acesso a um cluster.
- (Opcional) Para publicar tabelas no Catálogo do Unity, você deve criar um catálogo e um esquema no Catálogo do Unity.
Exemplo: Conjunto de dados de milhões de músicas
O conjunto de dados usado neste exemplo é um subconjunto do Conjunto de dados de milhões de músicas, uma coleção de recursos e metadados para faixas de música contemporânea. Esse conjunto de dados está disponível nos conjuntos de dados de amostra incluídos no workspace do Azure Databricks.
Etapa 1: criar um cluster
Para executar o processamento e a análise de dados neste exemplo, crie um cluster para fornecer os recursos de computação necessários para executar comandos.
Observação
Como este exemplo usa um conjunto de dados de exemplo armazenado no DBFS e recomenda manter tabelas no Catálogo do Unity, você cria um cluster configurado com o modo de acesso de usuário único. Um único modo de acesso de usuário fornece acesso completo ao DBFS e, ao mesmo tempo, habilita o acesso ao Catálogo do Unity. Consulte Melhores práticas para o DBFS e o Catálogo do Unity.
- Na barra lateral, clique em Computação.
- Na página Computação, clique em Criar Cluster.
- Na página Novo Cluster, insira um nome exclusivo para o cluster.
- Em Modo de acesso, selecione Usuário único.
- Em Acesso de usuário único ou entidade de serviço, selecione seu nome de usuário.
- Deixe os valores restantes no estado padrão e clique em Criar Cluster.
Para saber mais sobre clusters do Databricks, confira Computação.
Etapa 2: explorar os dados de origem
Para saber como usar a interface do Azure Databricks para explorar os dados brutos de origem, confira Explorar os dados de origem de um pipeline de dados. Se você quiser ir diretamente para ingerir e preparar os dados, continue para a Etapa 3: Ingerir os dados brutos.
Etapa 3: Ingerir os dados brutos
Nesta etapa, você carrega os dados brutos em uma tabela para disponibilizá-los para processamento adicional. Para gerenciar os ativos de dados na plataforma do Databricks, como tabelas, o Databricks recomenda o Catálogo do Unity. No entanto, se você não tiver permissões para criar o catálogo e o esquema necessários para publicar tabelas no Catálogo do Unity, ainda poderá concluir as etapas a seguir publicando tabelas no metastore do Hive.
Para ingerir dados, o Databricks recomenda usar o Carregador Automático. O Carregador Automático detecta e processa automaticamente arquivos novos confirme eles chegam no armazenamento de objeto da nuvem.
Você pode configurar o Carregador Automático para detectar automaticamente o esquema de dados carregados, permitindo inicializar tabelas sem declarar explicitamente o esquema de dados e evoluir o esquema de tabela à medida que novas colunas são introduzidas. Isso elimina a necessidade de controlar e aplicar manualmente as alterações de esquema ao longo do tempo. O Databricks recomenda a inferência de esquema ao usar o Carregador Automático. No entanto, como visto na etapa de exploração de dados, os dados de músicas não contêm informações de cabeçalho. Como o cabeçalho não é armazenado com os dados, você precisará definir explicitamente o esquema, conforme mostrado no próximo exemplo.
Na barra lateral, clique em Novo e selecione Notebook no menu. A caixa de diálogo Criar Notebook será exibida.
Insira um nome para o notebook, por exemplo
Ingest songs data
. Por padrão:- Python é o idioma selecionado.
- O notebook é anexado ao último cluster usado. Nesse caso, o cluster que você criou na Etapa 1: Criar um cluster.
Insira o seguinte na primeira célula do notebook:
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField # Define variables used in the code below file_path = "/databricks-datasets/songs/data-001/" table_name = "<table-name>" checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data" schema = StructType( [ StructField("artist_id", StringType(), True), StructField("artist_lat", DoubleType(), True), StructField("artist_long", DoubleType(), True), StructField("artist_location", StringType(), True), StructField("artist_name", StringType(), True), StructField("duration", DoubleType(), True), StructField("end_of_fade_in", DoubleType(), True), StructField("key", IntegerType(), True), StructField("key_confidence", DoubleType(), True), StructField("loudness", DoubleType(), True), StructField("release", StringType(), True), StructField("song_hotnes", DoubleType(), True), StructField("song_id", StringType(), True), StructField("start_of_fade_out", DoubleType(), True), StructField("tempo", DoubleType(), True), StructField("time_signature", DoubleType(), True), StructField("time_signature_confidence", DoubleType(), True), StructField("title", StringType(), True), StructField("year", IntegerType(), True), StructField("partial_sequence", IntegerType(), True) ] ) (spark.readStream .format("cloudFiles") .schema(schema) .option("cloudFiles.format", "csv") .option("sep","\t") .load(file_path) .writeStream .option("checkpointLocation", checkpoint_path) .trigger(availableNow=True) .toTable(table_name) )
Se você estiver usando o Catálogo do Unity, substitua
<table-name>
por um catálogo, esquema e nome de tabela para conter os registros ingeridos (por exemplo,data_pipelines.songs_data.raw_song_data
). Caso contrário, substitua<table-name>
pelo nome da tabela Delta para conter os registros ingeridos, por exemplo,raw_song_data
.Substitua
<checkpoint-path>
por um caminho para um diretório no DBFS para manter arquivos de ponto de verificação, por exemplo,/tmp/pipeline_get_started/_checkpoint/song_data
.Clique em e selecione Executar célula. Este exemplo define o esquema de dados usando as informações do
README
, ingere os dados de músicas de todos os arquivos contidos emfile_path
e grava os dados na tabela especificada portable_name
.
Etapa 4: Preparar os dados brutos
Para preparar os dados brutos para análise, as etapas a seguir transformam os dados brutos de músicas filtrando colunas desnecessárias e adicionando um novo campo que contém um carimbo de data/hora para a criação do novo registro.
Na barra lateral, clique em Novo e selecione Notebook no menu. A caixa de diálogo Criar Notebook será exibida.
Insira um nome para o notebook. Por exemplo,
Prepare songs data
. Altere o idioma padrão para SQL.Insira o seguinte na primeira célula do notebook:
CREATE OR REPLACE TABLE <table-name> ( artist_id STRING, artist_name STRING, duration DOUBLE, release STRING, tempo DOUBLE, time_signature DOUBLE, title STRING, year DOUBLE, processed_time TIMESTAMP ); INSERT INTO <table-name> SELECT artist_id, artist_name, duration, release, tempo, time_signature, title, year, current_timestamp() FROM <raw-songs-table-name>
Se você estiver usando o Catálogo do Unity, substitua
<table-name>
por um catálogo, esquema e nome de tabela para conter os registros filtrados e transformados (por exemplo,data_pipelines.songs_data.prepared_song_data
). Caso contrário, substitua<table-name>
pelo nome de uma tabela para conter os registros filtrados e transformados (por exemplo,prepared_song_data
).Substitua
<raw-songs-table-name>
pelo nome da tabela que contém os registros de músicas brutos ingeridos na etapa anterior.Clique em e selecione Executar célula.
Etapa 5: consultar os dados transformados
Nesta etapa, você estende o pipeline de processamento adicionando consultas para analisar os dados de músicas. Essas consultas usam os registros preparados criados na etapa anterior.
Na barra lateral, clique em Novo e selecione Notebook no menu. A caixa de diálogo Criar Notebook será exibida.
Insira um nome para o notebook. Por exemplo,
Analyze songs data
. Altere o idioma padrão para SQL.Insira o seguinte na primeira célula do notebook:
-- Which artists released the most songs each year? SELECT artist_name, count(artist_name) AS num_songs, year FROM <prepared-songs-table-name> WHERE year > 0 GROUP BY artist_name, year ORDER BY num_songs DESC, year DESC
Substitua
<prepared-songs-table-name>
pelo nome da tabela que contém os dados preparados. Por exemplo,data_pipelines.songs_data.prepared_song_data
.Clique em no menu de ações da célula, selecione Adicionar célula abaixo e insira o seguinte na nova célula:
-- Find songs for your DJ list SELECT artist_name, title, tempo FROM <prepared-songs-table-name> WHERE time_signature = 4 AND tempo between 100 and 140;
Substitua
<prepared-songs-table-name>
pelo nome da tabela preparada criada na etapa anterior. Por exemplo,data_pipelines.songs_data.prepared_song_data
.Para executar as consultas e exibir a saída, clique em Executar tudo.
Etapa 6: criar um trabalho do Azure Databricks para executar o pipeline
Você pode criar um fluxo de trabalho para automatizar a execução das etapas de ingestão, processamento e análise de dados usando um trabalho do Azure Databricks.
- Em seu workspace Ciência de dados e Engenharia, siga um desses procedimentos:
- Clique em Fluxos de trabalho na barra lateral e clique em .
- Na barra lateral, clique em Novo e selecione Trabalho.
- Na caixa de diálogo da tarefa na guia Tarefas, substitua Adicionar um nome para seu trabalho... com seu nome de trabalho. Por exemplo, "Fluxo de trabalho de músicas".
- Em Nome da tarefa, insira um nome para a primeira tarefa, por exemplo,
Ingest_songs_data
. - Em Tipo, selecione o tipo de tarefa Notebook.
- Em Origem, selecione Workspace.
- Use o navegador de arquivos para encontrar o notebook de ingestão de dados, clique no nome do notebook e clique em Confirmar.
- Em Cluster, selecione Shared_job_cluster ou o cluster que você criou na etapa
Create a cluster
. - Clique em Criar.
- Clique no abaixo da tarefa recém-criadas e selecione Notebook.
- Em Nome da tarefa, insira um nome para a tarefa, por exemplo,
Prepare_songs_data
. - Em Tipo, selecione o tipo de tarefa Notebook.
- Em Origem, selecione Workspace.
- Use o navegador de arquivos para encontrar o notebook de preparação de dados, clique no nome do notebook e clique em Confirmar.
- Em Cluster, selecione Shared_job_cluster ou o cluster que você criou na etapa
Create a cluster
. - Clique em Criar.
- Clique no abaixo da tarefa recém-criadas e selecione Notebook.
- Em Nome da tarefa, insira um nome para a tarefa, por exemplo,
Analyze_songs_data
. - Em Tipo, selecione o tipo de tarefa Notebook.
- Em Origem, selecione Workspace.
- Use o navegador de arquivos para encontrar o notebook de análise de dados, clique no nome do notebook e clique em Confirmar.
- Em Cluster, selecione Shared_job_cluster ou o cluster que você criou na etapa
Create a cluster
. - Clique em Criar.
- Para executar o fluxo de trabalho, clique no . Para exibir detalhes da execução, clique no link na coluna Hora de início da execução no modo de exibição execuções do trabalho. Clique em cada tarefa para exibir detalhes da execução da tarefa.
- Para exibir os resultados quando o fluxo de trabalho for concluído, clique na tarefa final de análise de dados. A página Saída aparece e exibe os resultados da consulta.
Etapa 7: agendar o trabalho do pipeline de dados
Observação
Para demonstrar o uso de um trabalho do Azure Databricks para orquestrar um fluxo de trabalho agendado, esse exemplo de introdução separa as etapas de ingestão, preparação e análise em notebooks separados e cada notebook é usado para criar uma tarefa no trabalho. Se todo o processamento estiver contido em um único notebook, você poderá agendar facilmente o notebook diretamente da interface do usuário do notebook do Azure Databricks. Consulte Criar e gerenciar trabalhos agendados do notebook.
Um requisito comum é executar um pipeline de dados por agendamento. Para definir um agendamento para o trabalho que executa o pipeline:
- Clique em Fluxos de trabalho na barra lateral.
- Na coluna Nome, clique no nome do trabalho. O painel lateral exibe os Detalhes do trabalho.
- Clique em Adicionar gatilho no painel Detalhes do trabalho e selecione Agendado no Tipo de gatilho.
- Especifique o período, a hora de início e o fuso horário. Opcionalmente, marque a caixa de seleção Mostrar sintaxe Cron para exibir e editar o agendamento na Sintaxe Quartz Cron.
- Clique em Save (Salvar).
Saiba mais
- Para saber mais sobre notebooks do Databricks, confira Introdução aos notebooks do Databricks.
- Para saber mais sobre trabalhos do Azure Databricks, confira O que são os trabalhos do Azure Databricks?.
- Para saber mais sobre o Delta Lake no Azure Databricks, veja O que é o Delta Lake?.
- Para saber mais sobre pipelines de processamento de dados com Delta Live Tables, consulte O que é o Delta Live Tables?.