Partilhar via


Tutorial: Lago Delta

Este tutorial apresenta operações comuns do Delta Lake no Azure Databricks, incluindo as seguintes:

Você pode executar o exemplo de código Python, Scala e SQL neste artigo de dentro de um bloco de anotações anexado a um recurso de computação do Azure Databricks, como um cluster. Você também pode executar o código SQL neste artigo de dentro de uma consulta associada a um armazém SQL no Databricks SQL.

Preparar os dados de origem

Este tutorial depende de um conjunto de dados chamado People 10 M. Ele contém 10 milhões de registros fictícios que contêm fatos sobre as pessoas, como nome e sobrenome, data de nascimento e salário. Este tutorial pressupõe que esse conjunto de dados esteja em um volume do Catálogo Unity associado ao seu espaço de trabalho de destino do Azure Databricks.

Para obter o conjunto de dados People 10 M para este tutorial, faça o seguinte:

  1. Vá para a página Pessoas 10 M no Kaggle.
  2. Clique em Download para baixar um arquivo nomeado archive.zip para sua máquina local.
  3. Extraia o arquivo nomeado export.csv do archive.zip arquivo. O export.csv arquivo contém os dados para este tutorial.

Para carregar o export.csv arquivo no volume, faça o seguinte:

  1. Na barra lateral, clique em Catálogo.
  2. No Explorador de Catálogos, procure e abra o volume onde pretende carregar o export.csv ficheiro.
  3. Clique em Carregar para este volume.
  4. Arraste e solte, ou procure e selecione, o export.csv arquivo em sua máquina local.
  5. Clique em Carregar.

Nos exemplos de código a seguir, substitua /Volumes/main/default/my-volume/export.csv pelo caminho para o export.csv arquivo no volume de destino.

Criar uma tabela

Todas as tabelas criadas no Azure Databricks usam o Delta Lake por padrão. O Databricks recomenda o uso de tabelas gerenciadas do Unity Catalog.

No exemplo de código anterior e nos exemplos de código a seguir, substitua o nome main.default.people_10m da tabela pelo seu catálogo de três partes, esquema e nome da tabela de destino no Unity Catalog.

Nota

Delta Lake é o padrão para todos os comandos de leitura, gravação e criação de tabela do Azure Databricks.

Python

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", TimestampType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")

# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")

Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")

// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")

SQL

CREATE OR REPLACE TABLE main.default.people_10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
);

COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );

As operações anteriores criam uma nova tabela gerenciada. Para obter informações sobre as opções disponíveis ao criar uma tabela Delta, consulte CREATE TABLE.

No Databricks Runtime 13.3 LTS e superior, você pode usar CREATE TABLE LIKE para criar uma nova tabela Delta vazia que duplica o esquema e as propriedades da tabela para uma tabela Delta de origem. Isso pode ser especialmente útil ao promover tabelas de um ambiente de desenvolvimento para a produção, como mostrado no exemplo de código a seguir:

CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m

Para criar uma tabela vazia, você também pode usar a DeltaTableBuilder API no Delta Lake para Python e Scala. Em comparação com APIs DataFrameWriter equivalentes, essas APIs facilitam a especificação de informações adicionais, como comentários de coluna, propriedades de tabela e colunas geradas.

Importante

Esta funcionalidade está em Pré-visualização Pública.

Python

DeltaTable.createIfNotExists(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Scala

DeltaTable.createOrReplace(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Upsert para uma mesa

Para mesclar um conjunto de atualizações e inserções em uma tabela Delta existente, use o DeltaTable.merge método para Python e Scala e a instrução MERGE INTO para SQL. Por exemplo, o exemplo a seguir pega dados da tabela de origem e os mescla na tabela Delta de destino. Quando há uma linha correspondente em ambas as tabelas, o Delta Lake atualiza a coluna de dados usando a expressão fornecida. Quando não há uma linha correspondente, o Delta Lake adiciona uma nova linha. Esta operação é conhecida como upsert.

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", DateType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

data = [
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]

people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")

# ...

from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')

(deltaTable.alias("people_10m")
  .merge(
    people_10m_updates.alias("people_10m_updates"),
    "people_10m.id = people_10m_updates.id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

Scala

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val data = Seq(
  Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
  Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
  Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
  Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
  Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
  Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)

val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")

// ...

import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

deltaTable.as("people_10m")
  .merge(
    people_10m_updates.as("people_10m_updates"),
    "people_10m.id = people_10m_updates.id"
  )
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()

SQL

CREATE OR REPLACE TEMP VIEW people_10m_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

No SQL, se você especificar *, isso atualiza ou insere todas as colunas na tabela de destino, supondo que a tabela de origem tenha as mesmas colunas que a tabela de destino. Se a tabela de destino não tiver as mesmas colunas, a consulta lançará um erro de análise.

Você deve especificar um valor para cada coluna na tabela ao executar uma operação de inserção (por exemplo, quando não há nenhuma linha correspondente no conjunto de dados existente). No entanto, não é necessário atualizar todos os valores.

Para ver os resultados, consulte a tabela.

Python

df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)

Scala

val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)

SQL

SELECT * FROM main.default.people_10m WHERE id >= 9999998

Ler uma tabela

Você acessa dados em tabelas Delta pelo nome da tabela ou pelo caminho da tabela, conforme mostrado nos exemplos a seguir:

Python

people_df = spark.read.table("main.default.people_10m")
display(people_df)

Scala

val people_df = spark.read.table("main.default.people_10m")
display(people_df)

SQL

SELECT * FROM main.default.people_10m;

Escrever numa tabela

O Delta Lake usa sintaxe padrão para gravar dados em tabelas.

Para adicionar atomicamente novos dados a uma tabela Delta existente, use o modo de acréscimo conforme mostrado nos exemplos a seguir:

Python

df.write.mode("append").saveAsTable("main.default.people_10m")

Scala

df.write.mode("append").saveAsTable("main.default.people_10m")

SQL

INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people

Para substituir todos os dados em uma tabela, use o modo de substituição como nos exemplos a seguir:

Python

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

Scala

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

SQL

INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people

Atualizar uma tabela

Você pode atualizar dados que correspondam a um predicado em uma tabela Delta. Por exemplo, na tabela de exemplo people_10m , para alterar uma abreviatura na gender coluna de M ou F para Male ou Female, você pode executar o seguinte:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")
)

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

SQL

UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';

Excluir de uma tabela

Você pode remover dados que correspondam a um predicado de uma tabela Delta. Por exemplo, na tabela de exemplo people_10m , para excluir todas as linhas correspondentes a pessoas com um valor na birthDate coluna de antes 1955, você pode executar o seguinte:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

SQL

DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'

Importante

A exclusão remove os dados da versão mais recente da tabela Delta, mas não os remove do armazenamento físico até que as versões antigas sejam explicitamente aspiradas. Consulte vácuo para obter detalhes.

Exibir histórico da tabela

Para exibir o histórico de uma tabela, use o DeltaTable.history método para Python e Scala e a instrução DESCRIBE HISTORY em SQL, que fornece informações de proveniência, incluindo a versão da tabela, operação, usuário e assim por diante, para cada gravação em uma tabela.

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

SQL

DESCRIBE HISTORY main.default.people_10m

Consultar uma versão anterior da tabela (viagem no tempo)

A viagem no tempo do Lago Delta permite que você consulte um instantâneo mais antigo de uma tabela Delta.

Para consultar uma versão mais antiga de uma tabela, especifique a versão ou o carimbo de data/hora da tabela. Por exemplo, para consultar a versão 0 ou o carimbo de data/hora 2024-05-15T22:43:15.000+00:00Z do histórico anterior, use o seguinte:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

Para carimbos de data/hora, somente cadeias de caracteres de carimbo de data ou hora são aceitas, por exemplo, "2024-05-15T22:43:15.000+00:00" ou "2024-05-15 22:43:15".

As opções de DataFrameReader permitem que você crie um DataFrame a partir de uma tabela Delta que é corrigida para uma versão específica ou carimbo de data/hora da tabela, por exemplo:

Python

df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")

display(df)

Scala

val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")

display(df)

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'

Para obter detalhes, consulte Trabalhar com o histórico da tabela Delta Lake.

Otimizar uma tabela

Depois de executar várias alterações em uma tabela, você pode ter muitos arquivos pequenos. Para melhorar a velocidade das consultas de leitura, você pode usar a operação otimizar para recolher arquivos pequenos em arquivos maiores:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()

SQL

OPTIMIZE main.default.people_10m

Ordem Z por colunas

Para melhorar ainda mais o desempenho de leitura, você pode colocar informações relacionadas no mesmo conjunto de arquivos por z-ordering. Os algoritmos de pulo de dados do Delta Lake usam essa colocação para reduzir drasticamente a quantidade de dados que precisam ser lidos. Para dados de ordem z, especifique as colunas a serem ordenadas na ordem z por operação. Por exemplo, para colocar por gender, execute:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

SQL

OPTIMIZE main.default.people_10m
ZORDER BY (gender)

Para obter o conjunto completo de opções disponíveis ao executar a operação de otimização, consulte Otimizar layout de arquivo de dados.

Limpe instantâneos com VACUUM

O Delta Lake fornece isolamento de instantâneo para leituras, o que significa que é seguro executar uma operação de otimização mesmo enquanto outros usuários ou trabalhos estão consultando a tabela. Eventualmente, no entanto, você deve limpar instantâneos antigos. Você pode fazer isso executando a operação de vácuo:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

SQL

VACUUM main.default.people_10m

Para obter detalhes sobre como usar a operação de vácuo de forma eficaz, consulte Remover arquivos de dados não utilizados com vácuo.