Descrição geral do Delta Lake da Fundação Linux
Este artigo foi adaptado para maior clareza por parte do seu homólogo original aqui. Este artigo ajuda-o a explorar rapidamente as principais funcionalidades do Delta Lake. O artigo fornece fragmentos de código que mostram como ler e escrever em tabelas do Delta Lake a partir de consultas interativas, em lote e de transmissão em fluxo. Os fragmentos de código também estão disponíveis num conjunto de blocos de notas PySpark aqui, Scala aqui e C# aqui
Eis o que vamos abordar:
- Criar uma tabela
- Ler dados
- Atualizar dados da tabela
- Substituir dados da tabela
- Atualização condicional sem substituição
- Ler versões mais antigas de dados com o Time Travel
- Escrever um fluxo de dados numa tabela
- Ler um fluxo de alterações a partir de uma tabela
- Suporte do SQL
Configuração
Certifique-se de que modifica o seguinte conforme adequado para o seu ambiente.
import random
session_id = random.randint(0,1000000)
delta_table_path = "/delta/delta-table-{0}".format(session_id)
delta_table_path
var sessionId = (new Random()).Next(10000000);
var deltaTablePath = $"/delta/delta-table-{sessionId}";
deltaTablePath
val sessionId = scala.util.Random.nextInt(1000000)
val deltaTablePath = s"/delta/delta-table-$sessionId";
Resulta em:
"/delta/delta-table-335323"
Criar uma tabela
Para criar uma tabela do Delta Lake, escreva um DataFrame num DataFrame no formato delta. Pode alterar o formato de Parquet, CSV, JSON, etc., para delta.
O código que se segue mostra-lhe como criar uma nova tabela do Delta Lake com o esquema inferido do DataFrame.
data = spark.range(0,5)
data.show()
data.write.format("delta").save(delta_table_path)
var data = spark.Range(0,5);
data.Show();
data.Write().Format("delta").Save(deltaTablePath);
val data = spark.range(0, 5)
data.show
data.write.format("delta").save(deltaTablePath)
Resulta em:
ID |
---|
0 |
1 |
2 |
3 |
4 |
Ler dados
Pode ler dados na sua tabela do Delta Lake ao especificar o caminho para os ficheiros e o formato delta.
df = spark.read.format("delta").load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Load(deltaTablePath);
df.Show()
val df = spark.read.format("delta").load(deltaTablePath)
df.show()
Resulta em:
ID |
---|
1 |
3 |
4 |
0 |
2 |
A ordem dos resultados é diferente de acima, uma vez que não foi especificada explicitamente nenhuma ordem antes de exportar os resultados.
Atualizar dados da tabela
O Delta Lake suporta várias operações para modificar tabelas com APIs de DataFrame padrão. Estas operações são um dos melhoramentos que o formato delta adiciona. O exemplo seguinte executa uma tarefa de lote para substituir os dados na tabela.
data = spark.range(5,10)
data.write.format("delta").mode("overwrite").save(delta_table_path)
df.show()
var data = spark.Range(5,10);
data.Write().Format("delta").Mode("overwrite").Save(deltaTablePath);
df.Show();
val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save(deltaTablePath)
df.show()
Resulta em:
ID |
---|
7 |
8 |
5 |
9 |
6 |
Aqui, pode ver que todos os cinco registos foram atualizados para conter novos valores.
Guardar como tabelas de catálogo
O Delta Lake pode escrever em tabelas de catálogo geridas ou externas.
data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql("CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{0}'".format(delta_table_path))
spark.sql("SHOW TABLES").show()
data.Write().Format("delta").SaveAsTable("ManagedDeltaTable");
spark.Sql($"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{deltaTablePath}'");
spark.Sql("SHOW TABLES").Show();
data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql(s"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '$deltaTablePath'")
spark.sql("SHOW TABLES").show
Resulta em:
base de dados | tableName | isTemporary |
---|---|---|
predefinição | externaldeltatable | false |
predefinição | geridodeltatable | false |
Com este código, criou uma nova tabela no catálogo a partir de um dataframe existente, referido como uma tabela gerida. Em seguida, definiu uma nova tabela externa no catálogo que utiliza uma localização existente, referida como uma tabela externa. No resultado, pode ver que ambas as tabelas, independentemente da forma como foram criadas, são listadas no catálogo.
Agora, pode ver as propriedades expandidas de ambas as tabelas
spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ManagedDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=false)
Resulta em:
col_name | data_type | comentário |
---|---|---|
ID | bigint | nulo |
Informações Detalhadas da Tabela | ||
Base de Dados | predefinição | |
Tabela | geridodeltatable | |
Proprietário | trusted-service-user | |
Hora de Criação | Sáb 25 00:35:34 UTC 2020 | |
Último Acesso | Qui Jan 01 00:00:00 UTC 1970 | |
Criada Por | Spark 2.4.4.2.6.99.201-11401300 | |
Tipo | GERIDO | |
Fornecedor | delta | |
Propriedades da Tabela | [transient_lastDdlTime=1587774934] | |
Estatísticas | 2407 bytes | |
Localização | abfss://data@<data lake.dfs.core.windows.net/synapse/workspaces/>< workspace name>/warehouse/manageddeltatable | |
Biblioteca serde | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | |
InputFormat | org.apache.hadoop.mapred.SequenceFileInputFormat | |
OutputFormat | org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat | |
Propriedades de Armazenamento | [serialization.format=1] |
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ExternalDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=false)
Resultados em:
col_name | data_type | comentário |
---|---|---|
ID | bigint | nulo |
Informações Detalhadas da Tabela | ||
Base de Dados | predefinição | |
Tabela | externaldeltatable | |
Proprietário | trusted-service-user | |
Hora de Criação | Sáb 25 00:35:38 UTC 2020 | |
Último Acesso | Thu Jan 01 00:00:00 UTC 1970 | |
Criada Por | Spark 2.4.4.2.6.99.201-11401300 | |
Tipo | EXTERNO | |
Fornecedor | DELTA | |
Propriedades da Tabela | [transient_lastDdlTime=1587774938] | |
Localização | abfss://data@<data lake.dfs.core.windows.net/delta/delta-table-587152> | |
Biblioteca serde | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | |
InputFormat | org.apache.hadoop.mapred.SequenceFileInputFormat | |
OutputFormat | org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat | |
Propriedades de Armazenamento | [serialization.format=1] |
Atualização condicional sem substituição
O Delta Lake fornece APIs programáticas para atualização condicional, eliminação e intercalação (este comando é normalmente referido como um upsert) em tabelas.
from delta.tables import *
from pyspark.sql.functions import *
delta_table = DeltaTable.forPath(spark, delta_table_path)
delta_table.update(
condition = expr("id % 2 == 0"),
set = { "id": expr("id + 100") })
delta_table.toDF().show()
using Microsoft.Spark.Extensions.Delta;
using Microsoft.Spark.Extensions.Delta.Tables;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;
var deltaTable = DeltaTable.ForPath(deltaTablePath);
deltaTable.Update(
condition: Expr("id % 2 == 0"),
set: new Dictionary<string, Column>(){{ "id", Expr("id + 100") }});
deltaTable.ToDF().Show();
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forPath(deltaTablePath)
// Update every even value by adding 100 to it
deltaTable.update(
condition = expr("id % 2 == 0"),
set = Map("id" -> expr("id + 100")))
deltaTable.toDF.show
Resultados em:
ID |
---|
106 |
108 |
5 |
7 |
9 |
Aqui acabou de adicionar 100 a cada ID par.
delta_table.delete("id % 2 == 0")
delta_table.toDF().show()
deltaTable.Delete(condition: Expr("id % 2 == 0"));
deltaTable.ToDF().Show();
deltaTable.delete(condition = expr("id % 2 == 0"))
deltaTable.toDF.show
Resultados em:
ID |
---|
5 |
7 |
9 |
Repare que todas as linhas pares foram eliminadas.
new_data = spark.range(0,20).alias("newData")
delta_table.alias("oldData")\
.merge(new_data.alias("newData"), "oldData.id = newData.id")\
.whenMatchedUpdate(set = { "id": lit("-1")})\
.whenNotMatchedInsert(values = { "id": col("newData.id") })\
.execute()
delta_table.toDF().show(100)
var newData = spark.Range(20).As("newData");
deltaTable
.As("oldData")
.Merge(newData, "oldData.id = newData.id")
.WhenMatched()
.Update(new Dictionary<string, Column>() {{"id", Lit("-1")}})
.WhenNotMatched()
.Insert(new Dictionary<string, Column>() {{"id", Col("newData.id")}})
.Execute();
deltaTable.ToDF().Show(100);
val newData = spark.range(0, 20).toDF
deltaTable.as("oldData").
merge(
newData.as("newData"),
"oldData.id = newData.id").
whenMatched.
update(Map("id" -> lit(-1))).
whenNotMatched.
insert(Map("id" -> col("newData.id"))).
execute()
deltaTable.toDF.show()
Resultados em:
ID |
---|
18 |
15 |
19 |
2 |
1 |
6 |
8 |
3 |
-1 |
10 |
13 |
0 |
16 |
4 |
-1 |
12 |
11 |
14 |
-1 |
17 |
Aqui tem uma combinação dos dados existentes. Os dados existentes foram atribuídos ao valor -1 no caminho de código update(WhenMatched). Os novos dados que foram criados na parte superior do fragmento e que foram adicionados através do caminho de código de inserção (WhenNotMatched), também foram adicionados.
Histórico
O Delta Lake's tem a capacidade de permitir olhar para a história de uma tabela. Ou seja, as alterações efetuadas à Tabela Delta subjacente. A célula abaixo mostra como é simples inspecionar o histórico.
delta_table.history().show(20, 1000, False)
deltaTable.History().Show(20, 1000, false);
deltaTable.history.show(false)
Resultados em:
versão | carimbo de data/hora | userId | userName | operation | operationParameters | tarefa | bloco de notas | clusterId | readVersion | isolationLevel | isBlindAppend |
---|---|---|---|---|---|---|---|---|---|---|---|
4 | 2020-04-25 00:36:27 | nulo | nulo | INTERCALAR | [predicado -> (oldData.ID = newData.ID )] |
nulo | nulo | nulo | 3 | nulo | false |
3 | 2020-04-25 00:36:08 | nulo | nulo | DELETE | [predicado -> ["((ID % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] |
nulo | nulo | nulo | 2 | nulo | false |
2 | 2020-04-25 00:35:51 | nulo | nulo | UPDATE | [predicado -> ((ID#744L % cast(2 as bigint)) = cast(0 as bigint))] | nulo | nulo | nulo | 1 | nulo | false |
1 | 2020-04-25 00:35:05 | nulo | nulo | ESCREVER | [modo -> Substituir, partitionBy -> []] | nulo | nulo | nulo | 0 | nulo | false |
0 | 2020-04-25 00:34:34 | nulo | nulo | ESCREVER | [modo -> ErrorIfExists, partitionBy -> []] | nulo | nulo | nulo | nulo | nulo | true |
Aqui, pode ver todas as modificações efetuadas nos fragmentos de código acima.
Ler versões mais antigas de dados com o Time Travel
É possível consultar instantâneos anteriores da sua tabela do Delta Lake através de uma funcionalidade denominada Viagem no Tempo. Se quiser aceder aos dados que substituiu, pode consultar um instantâneo da tabela antes de substituir o primeiro conjunto de dados com a opção versionAsOf.
Depois de executar a célula abaixo, deverá ver o primeiro conjunto de dados de antes de a substituir. O Time Travel é uma funcionalidade avançada que tira partido do poder do registo de transações do Delta Lake para aceder a dados que já não estão na tabela. Remover a opção da versão 0 (ou especificar a versão 1) permitir-lhe-ia ver novamente os dados mais recentes. Para obter mais informações, veja Consultar um instantâneo mais antigo de uma tabela.
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Option("versionAsOf", 0).Load(deltaTablePath);
df.Show();
val df = spark.read.format("delta").option("versionAsOf", 0).load(deltaTablePath)
df.show()
Resulta em:
ID |
---|
0 |
1 |
4 |
3 |
2 |
Aqui, pode ver que voltou à versão mais antiga dos dados.
Escrever um fluxo de dados numa tabela
Também pode escrever numa tabela do Delta Lake com a Transmissão em Fluxo Estruturada do Spark. O registo de transações do Delta Lake garante exatamente uma vez o processamento, mesmo quando existem outros fluxos ou consultas em lote em execução em simultâneo na tabela. Por predefinição, os fluxos são executados no modo de acréscimo, o que adiciona novos registos à tabela.
Para obter mais informações sobre a integração do Delta Lake com a Transmissão em Fluxo Estruturada, veja Leituras e Escritas de Transmissão em Fluxo de Tabelas.
Nas células abaixo, eis o que estamos a fazer:
- Célula 30 Mostrar os dados recém-anexados
- Célula 31 Inspecionar histórico
- Célula 32 Parar a tarefa de transmissão em fluxo estruturada
- Célula 33 Inspecionar histórico <- Verá que os acréscimos pararam
Primeiro, vai configurar uma tarefa simples de Transmissão em Fluxo do Spark para gerar uma sequência e fazer com que a tarefa escreva na sua Tabela Delta.
streaming_df = spark.readStream.format("rate").load()
stream = streaming_df\
.selectExpr("value as id")\
.writeStream\
.format("delta")\
.option("checkpointLocation", "/tmp/checkpoint-{0}".format(session_id))\
.start(delta_table_path)
var streamingDf = spark.ReadStream().Format("rate").Load();
var stream = streamingDf.SelectExpr("value as id").WriteStream().Format("delta").Option("checkpointLocation", $"/tmp/checkpoint-{sessionId}").Start(deltaTablePath);
val streamingDf = spark.readStream.format("rate").load()
val stream = streamingDf.select($"value" as "id").writeStream.format("delta").option("checkpointLocation", s"/tmp/checkpoint-$sessionId").start(deltaTablePath)
Ler um fluxo de alterações a partir de uma tabela
Enquanto o fluxo está a escrever na tabela do Delta Lake, também pode ler a partir dessa tabela como uma origem de transmissão em fluxo. Por exemplo, pode iniciar outra consulta de transmissão em fluxo que imprima todas as alterações efetuadas à tabela do Delta Lake.
delta_table.toDF().sort(col("id").desc()).show(100)
deltaTable.ToDF().Sort(Col("id").Desc()).Show(100);
deltaTable.toDF.sort($"id".desc).show
Resulta em:
ID |
---|
19 |
18 |
17 |
16 |
15 |
14 |
13 |
12 |
11 |
10 |
8 |
6 |
4 |
3 |
2 |
1 |
0 |
-1 |
-1 |
-1 |
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(20, 1000, False)
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(20, 1000, false);
deltaTable.history.show
Resulta em:
versão | carimbo de data/hora | operation | operationParameters | readVersion |
---|---|---|---|---|
5 | 2020-04-25 00:37:09 | ATUALIZAÇÃO DE TRANSMISSÃO EM FLUXO | [outputMode -> Acrescentar, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] | 4 |
4 | 2020-04-25 00:36:27 | INTERCALAR | [predicado -> (oldData.id = newData.id )] |
3 |
3 | 2020-04-25 00:36:08 | DELETE | [predicado -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] |
2 |
2 | 2020-04-25 00:35:51 | UPDATE | [predicado -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] | 1 |
1 | 2020-04-25 00:35:05 | ESCREVER | [modo -> Substituir, partitionBy -> []] | 0 |
0 | 2020-04-25 00:34:34 | ESCREVER | [modo -> ErrorIfExists, partitionBy -> []] | nulo |
Aqui, está a remover algumas das colunas menos interessantes para simplificar a experiência de visualização da vista do histórico.
stream.stop()
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(100, 1000, False)
stream.Stop();
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(100, 1000, false);
stream.stop
deltaTable.history.show
Resulta em:
versão | carimbo de data/hora | operation | operationParameters | readVersion |
---|---|---|---|---|
5 | 2020-04-25 00:37:09 | ATUALIZAÇÃO DE TRANSMISSÃO EM FLUXO | [outputMode -> Acrescentar, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] | 4 |
4 | 2020-04-25 00:36:27 | INTERCALAR | [predicado -> (oldData.id = newData.id )] |
3 |
3 | 2020-04-25 00:36:08 | DELETE | [predicado -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] |
2 |
2 | 2020-04-25 00:35:51 | UPDATE | [predicado -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] | 1 |
1 | 2020-04-25 00:35:05 | ESCREVER | [modo -> Substituir, partitionBy -> []] | 0 |
0 | 2020-04-25 00:34:34 | ESCREVER | [modo -> ErrorIfExists, partitionBy -> []] | nulo |
Converter Parquet em Delta
Pode fazer uma conversão no local do formato Parquet para Delta.
Aqui, irá testar se a tabela existente está ou não no formato delta.
parquet_path = "/parquet/parquet-table-{0}".format(session_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetPath = $"/parquet/parquet-table-{sessionId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath)
val parquetPath = s"/parquet/parquet-table-$sessionId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)
Resulta em:
Falso
Agora, vai converter os dados em formato delta e verificar se funcionaram.
DeltaTable.convertToDelta(spark, "parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
DeltaTable.ConvertToDelta(spark, $"parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath)
DeltaTable.convertToDelta(spark, s"parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)
Resulta em:
Verdadeiro
Suporte do SQL
A Delta suporta comandos utilitários de tabela através do SQL. Pode utilizar o SQL para:
- Obter o histórico de uma DeltaTable
- Aspirar uma DeltaTable
- Converter um ficheiro Parquet em Delta
spark.sql("DESCRIBE HISTORY delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"DESCRIBE HISTORY delta.`{deltaTablePath}`").Show();
spark.sql(s"DESCRIBE HISTORY delta.`$deltaTablePath`").show()
Resultados em:
versão | carimbo de data/hora | userId | userName | operation | operationParameters | tarefa | bloco de notas | clusterId | readVersion | isolationLevel | isBlindAppend |
---|---|---|---|---|---|---|---|---|---|---|---|
5 | 2020-04-25 00:37:09 | nulo | nulo | ATUALIZAÇÃO DE TRANSMISSÃO EM FLUXO | [outputMode -> Ap... | nulo | nulo | nulo | 4 | nulo | true |
4 | 2020-04-25 00:36:27 | nulo | nulo | INTERCALAR | [predicado -> (ol... | nulo | nulo | nulo | 3 | nulo | false |
3 | 2020-04-25 00:36:08 | nulo | nulo | DELETE | [predicado -> ["(... | nulo | nulo | nulo | 2 | nulo | false |
2 | 2020-04-25 00:35:51 | nulo | nulo | UPDATE | [predicado -> ((i... | nulo | nulo | nulo | 1 | nulo | false |
1 | 2020-04-25 00:35:05 | nulo | nulo | ESCREVER | [modo -> Substituir... | nulo | nulo | nulo | 0 | nulo | false |
0 | 2020-04-25 00:34:34 | nulo | nulo | ESCREVER | [modo -> ErrorIfE... | nulo | nulo | nulo | nulo | nulo | true |
spark.sql("VACUUM delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"VACUUM delta.`{deltaTablePath}`").Show();
spark.sql(s"VACUUM delta.`$deltaTablePath`").show()
Resultados em:
caminho |
---|
abfss://data@arca... |
Agora, vai verificar se uma tabela não é uma tabela de formato delta. Em seguida, irá converter a tabela em formato delta com o SQL do Spark e confirmar que foi convertida corretamente.
parquet_id = random.randint(0,1000)
parquet_path = "/parquet/parquet-table-{0}-{1}".format(session_id, parquet_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
spark.sql("CONVERT TO DELTA parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetId = (new Random()).Next(10000000);
var parquetPath = $"/parquet/parquet-table-{sessionId}-{parquetId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath);
spark.Sql($"CONVERT TO DELTA parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath);
val parquetId = scala.util.Random.nextInt(1000)
val parquetPath = s"/parquet/parquet-table-$sessionId-$parquetId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)
spark.sql(s"CONVERT TO DELTA parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)
Resultados em:
Verdadeiro
Para obter a documentação completa, consulte a Página de Documentação do Delta Lake
Para obter mais informações, veja Delta Lake Project.