Tutorial: Lago Delta
Este tutorial apresenta operações comuns do Delta Lake no Azure Databricks, incluindo as seguintes:
- Criar uma tabela.
- Upsert numa tabela.
- Ler a partir de uma tabela.
- Apresentar o histórico de tabelas.
- Consultar uma versão anterior de uma tabela.
- Otimizar uma tabela.
- Adicionar um índice de ordenação Z.
- Limpar ficheiros não referenciados.
Você pode executar o exemplo de código Python, Scala e SQL neste artigo de dentro de um bloco de anotações anexado a um recurso de computação do Azure Databricks, como um cluster. Você também pode executar o código SQL neste artigo de dentro de uma consulta associada a um armazém SQL no Databricks SQL.
Preparar os dados de origem
Este tutorial depende de um conjunto de dados chamado People 10 M. Ele contém 10 milhões de registros fictícios que contêm fatos sobre as pessoas, como nome e sobrenome, data de nascimento e salário. Este tutorial pressupõe que esse conjunto de dados esteja em um volume do Catálogo Unity associado ao seu espaço de trabalho de destino do Azure Databricks.
Para obter o conjunto de dados People 10 M para este tutorial, faça o seguinte:
- Vá para a página Pessoas 10 M no Kaggle.
- Clique em Download para baixar um arquivo nomeado
archive.zip
para sua máquina local. - Extraia o arquivo nomeado
export.csv
doarchive.zip
arquivo. Oexport.csv
arquivo contém os dados para este tutorial.
Para carregar o export.csv
arquivo no volume, faça o seguinte:
- Na barra lateral, clique em Catálogo.
- No Explorador de Catálogos, procure e abra o volume onde pretende carregar o
export.csv
ficheiro. - Clique em Carregar para este volume.
- Arraste e solte, ou procure e selecione, o
export.csv
arquivo em sua máquina local. - Clique em Carregar.
Nos exemplos de código a seguir, substitua /Volumes/main/default/my-volume/export.csv
pelo caminho para o export.csv
arquivo no volume de destino.
Criar uma tabela
Todas as tabelas criadas no Azure Databricks usam o Delta Lake por padrão. O Databricks recomenda o uso de tabelas gerenciadas do Unity Catalog.
No exemplo de código anterior e nos exemplos de código a seguir, substitua o nome main.default.people_10m
da tabela pelo seu catálogo de três partes, esquema e nome da tabela de destino no Unity Catalog.
Nota
Delta Lake é o padrão para todos os comandos de leitura, gravação e criação de tabela do Azure Databricks.
Python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", TimestampType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")
# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")
// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")
SQL
CREATE OR REPLACE TABLE main.default.people_10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
);
COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );
As operações anteriores criam uma nova tabela gerenciada. Para obter informações sobre as opções disponíveis ao criar uma tabela Delta, consulte CREATE TABLE.
No Databricks Runtime 13.3 LTS e superior, você pode usar CREATE TABLE LIKE para criar uma nova tabela Delta vazia que duplica o esquema e as propriedades da tabela para uma tabela Delta de origem. Isso pode ser especialmente útil ao promover tabelas de um ambiente de desenvolvimento para a produção, como mostrado no exemplo de código a seguir:
CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m
Para criar uma tabela vazia, você também pode usar a DeltaTableBuilder
API no Delta Lake para Python e Scala. Em comparação com APIs DataFrameWriter equivalentes, essas APIs facilitam a especificação de informações adicionais, como comentários de coluna, propriedades de tabela e colunas geradas.
Importante
Esta funcionalidade está em Pré-visualização Pública.
Python
DeltaTable.createIfNotExists(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
Scala
DeltaTable.createOrReplace(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
Upsert para uma mesa
Para mesclar um conjunto de atualizações e inserções em uma tabela Delta existente, use o DeltaTable.merge
método para Python e Scala e a instrução MERGE INTO para SQL. Por exemplo, o exemplo a seguir pega dados da tabela de origem e os mescla na tabela Delta de destino. Quando há uma linha correspondente em ambas as tabelas, o Delta Lake atualiza a coluna de dados usando a expressão fornecida. Quando não há uma linha correspondente, o Delta Lake adiciona uma nova linha. Esta operação é conhecida como upsert.
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", DateType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
data = [
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]
people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")
# ...
from delta.tables import DeltaTable
deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')
(deltaTable.alias("people_10m")
.merge(
people_10m_updates.alias("people_10m_updates"),
"people_10m.id = people_10m_updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
Scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val data = Seq(
Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)
val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")
// ...
import io.delta.tables.DeltaTable
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.as("people_10m")
.merge(
people_10m_updates.as("people_10m_updates"),
"people_10m.id = people_10m_updates.id"
)
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
SQL
CREATE OR REPLACE TEMP VIEW people_10m_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
No SQL, se você especificar *
, isso atualiza ou insere todas as colunas na tabela de destino, supondo que a tabela de origem tenha as mesmas colunas que a tabela de destino. Se a tabela de destino não tiver as mesmas colunas, a consulta lançará um erro de análise.
Você deve especificar um valor para cada coluna na tabela ao executar uma operação de inserção (por exemplo, quando não há nenhuma linha correspondente no conjunto de dados existente). No entanto, não é necessário atualizar todos os valores.
Para ver os resultados, consulte a tabela.
Python
df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)
Scala
val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)
SQL
SELECT * FROM main.default.people_10m WHERE id >= 9999998
Ler uma tabela
Você acessa dados em tabelas Delta pelo nome da tabela ou pelo caminho da tabela, conforme mostrado nos exemplos a seguir:
Python
people_df = spark.read.table("main.default.people_10m")
display(people_df)
Scala
val people_df = spark.read.table("main.default.people_10m")
display(people_df)
SQL
SELECT * FROM main.default.people_10m;
Escrever numa tabela
O Delta Lake usa sintaxe padrão para gravar dados em tabelas.
Para adicionar atomicamente novos dados a uma tabela Delta existente, use o modo de acréscimo conforme mostrado nos exemplos a seguir:
Python
df.write.mode("append").saveAsTable("main.default.people_10m")
Scala
df.write.mode("append").saveAsTable("main.default.people_10m")
SQL
INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people
Para substituir todos os dados em uma tabela, use o modo de substituição como nos exemplos a seguir:
Python
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
Scala
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
SQL
INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people
Atualizar uma tabela
Você pode atualizar dados que correspondam a um predicado em uma tabela Delta. Por exemplo, na tabela de exemplo people_10m
, para alterar uma abreviatura na gender
coluna de M
ou F
para Male
ou Female
, você pode executar o seguinte:
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
)
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
SQL
UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';
Excluir de uma tabela
Você pode remover dados que correspondam a um predicado de uma tabela Delta. Por exemplo, na tabela de exemplo people_10m
, para excluir todas as linhas correspondentes a pessoas com um valor na birthDate
coluna de antes 1955
, você pode executar o seguinte:
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
SQL
DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'
Importante
A exclusão remove os dados da versão mais recente da tabela Delta, mas não os remove do armazenamento físico até que as versões antigas sejam explicitamente aspiradas. Consulte vácuo para obter detalhes.
Exibir histórico da tabela
Para exibir o histórico de uma tabela, use o DeltaTable.history
método para Python e Scala e a instrução DESCRIBE HISTORY em SQL, que fornece informações de proveniência, incluindo a versão da tabela, operação, usuário e assim por diante, para cada gravação em uma tabela.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
SQL
DESCRIBE HISTORY main.default.people_10m
Consultar uma versão anterior da tabela (viagem no tempo)
A viagem no tempo do Lago Delta permite que você consulte um instantâneo mais antigo de uma tabela Delta.
Para consultar uma versão mais antiga de uma tabela, especifique a versão ou o carimbo de data/hora da tabela. Por exemplo, para consultar a versão 0 ou o carimbo de data/hora 2024-05-15T22:43:15.000+00:00Z
do histórico anterior, use o seguinte:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
Para carimbos de data/hora, somente cadeias de caracteres de carimbo de data ou hora são aceitas, por exemplo, "2024-05-15T22:43:15.000+00:00"
ou "2024-05-15 22:43:15"
.
As opções de DataFrameReader permitem que você crie um DataFrame a partir de uma tabela Delta que é corrigida para uma versão específica ou carimbo de data/hora da tabela, por exemplo:
Python
df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")
display(df)
Scala
val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")
display(df)
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'
Para obter detalhes, consulte Trabalhar com o histórico da tabela Delta Lake.
Otimizar uma tabela
Depois de executar várias alterações em uma tabela, você pode ter muitos arquivos pequenos. Para melhorar a velocidade das consultas de leitura, você pode usar a operação otimizar para recolher arquivos pequenos em arquivos maiores:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
SQL
OPTIMIZE main.default.people_10m
Ordem Z por colunas
Para melhorar ainda mais o desempenho de leitura, você pode colocar informações relacionadas no mesmo conjunto de arquivos por z-ordering. Os algoritmos de pulo de dados do Delta Lake usam essa colocação para reduzir drasticamente a quantidade de dados que precisam ser lidos. Para dados de ordem z, especifique as colunas a serem ordenadas na ordem z por operação. Por exemplo, para colocar por gender
, execute:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
SQL
OPTIMIZE main.default.people_10m
ZORDER BY (gender)
Para obter o conjunto completo de opções disponíveis ao executar a operação de otimização, consulte Otimizar layout de arquivo de dados.
Limpe instantâneos com VACUUM
O Delta Lake fornece isolamento de instantâneo para leituras, o que significa que é seguro executar uma operação de otimização mesmo enquanto outros usuários ou trabalhos estão consultando a tabela. Eventualmente, no entanto, você deve limpar instantâneos antigos. Você pode fazer isso executando a operação de vácuo:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
SQL
VACUUM main.default.people_10m
Para obter detalhes sobre como usar a operação de vácuo de forma eficaz, consulte Remover arquivos de dados não utilizados com vácuo.