Usare la cronologia delle tabelle Delta Lake
Ogni operazione che modifica una tabella Delta Lake crea una nuova versione della tabella. È possibile usare le informazioni sulla cronologia per controllare le operazioni, eseguire il rollback di una tabella o eseguire query su una tabella in un momento specifico usando il tempo di spostamento.
Nota
Databricks non consiglia di usare la cronologia delle tabelle Delta Lake come soluzione di backup a lungo termine per l'archiviazione dei dati. Databricks consiglia di usare solo gli ultimi 7 giorni per le operazioni di spostamento cronologico, a meno che non siano state impostate entrambe le configurazioni di conservazione dei dati e dei log su un valore maggiore.
Recuperare la cronologia delle tabelle Delta
È possibile recuperare informazioni, incluse le operazioni, l'utente e il timestamp per ogni scrittura in una tabella Delta eseguendo il history
comando . Le operazioni vengono restituite in ordine cronologico inverso.
La conservazione della cronologia tabelle è determinata dall'impostazione delta.logRetentionDuration
della tabella , ovvero 30 giorni per impostazione predefinita.
Nota
Lo spostamento cronologico e la cronologia delle tabelle sono controllati da soglie di conservazione diverse. Vedere Che cos'è lo spostamento cronologico di Delta Lake?.
DESCRIBE HISTORY table_name -- get the full history of the table
DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only
Per informazioni dettagliate sulla sintassi di Spark SQL, vedere DESCRIBE HISTORY.For Spark SQL syntax details, see DESCRIBE HISTORY.
Per informazioni dettagliate sulla sintassi scala/Java/Python, vedere la documentazione dell'API Delta Lake.
Esplora cataloghi offre una visualizzazione visiva di queste informazioni dettagliate sulla tabella e sulla cronologia per le tabelle Delta. Oltre allo schema della tabella e ai dati di esempio, è possibile fare clic sulla scheda Cronologia per visualizzare la cronologia della tabella visualizzata con DESCRIBE HISTORY
.
Schema cronologia
L'output dell'operazione history
include le colonne seguenti.
Column | Type | Description |
---|---|---|
versione | long | Versione della tabella generata dall'operazione. |
timestamp | timestamp | Quando è stato eseguito il commit di questa versione. |
userId | string | ID dell'utente che ha eseguito l'operazione. |
userName | string | Nome dell'utente che ha eseguito l'operazione. |
operation (operazione) | string | Nome dell'operazione. |
operationParameters | mappa | Parametri dell'operazione , ad esempio predicati. |
processo | struct | Dettagli del processo che ha eseguito l'operazione. |
notebook | struct | Dettagli del notebook da cui è stata eseguita l'operazione. |
clusterId | string | ID del cluster in cui è stata eseguita l'operazione. |
readVersion | long | Versione della tabella letta per eseguire l'operazione di scrittura. |
isolationLevel | string | Livello di isolamento usato per questa operazione. |
isBlindAppend | boolean | Indica se questa operazione ha accodato dati. |
operationMetrics | mappa | Metriche dell'operazione (ad esempio, numero di righe e file modificati). |
userMetadata | string | Metadati di commit definiti dall'utente se è stato specificato |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
Nota
- Alcune delle altre colonne non sono disponibili se si scrive in una tabella Delta usando i metodi seguenti:
- Le colonne aggiunte in futuro verranno sempre aggiunte dopo l'ultima colonna.
Chiavi delle metriche delle operazioni
L'operazione history
restituisce una raccolta di metriche operative nella mappa delle operationMetrics
colonne.
Le tabelle seguenti elencano le definizioni delle chiavi della mappa in base all'operazione.
Operazione | Nome metrica | Descrizione |
---|---|---|
WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO | ||
numFiles | Numero di file scritti. | |
numOutputBytes | Dimensioni in byte del contenuto scritto. | |
numOutputRows | Numero di righe scritte. | |
STREAMING UPDATE | ||
numAddedFiles | Numero di file aggiunti. | |
numRemovedFiles | Numero di file rimossi. | |
numOutputRows | Numero di righe scritte. | |
numOutputBytes | Dimensione della scrittura in byte. | |
DELETE | ||
numAddedFiles | Numero di file aggiunti. Non specificato quando vengono eliminate le partizioni della tabella. | |
numRemovedFiles | Numero di file rimossi. | |
numDeletedRows | Numero di righe rimosse. Non specificato quando vengono eliminate le partizioni della tabella. | |
numCopiedRows | Numero di righe copiate nel processo di eliminazione dei file. | |
executionTimeMs | Tempo impiegato per eseguire l'intera operazione. | |
scanTimeMs | Tempo impiegato per analizzare i file per individuare le corrispondenze. | |
rewriteTimeMs | Tempo impiegato per riscrivere i file corrispondenti. | |
TRUNCATE | ||
numRemovedFiles | Numero di file rimossi. | |
executionTimeMs | Tempo impiegato per eseguire l'intera operazione. | |
MERGE | ||
numSourceRows | Numero di righe nel dataframe di origine. | |
numTargetRowsInserted | Numero di righe inserite nella tabella di destinazione. | |
numTargetRowsUpdated | Numero di righe aggiornate nella tabella di destinazione. | |
numTargetRowsDeleted | Numero di righe eliminate nella tabella di destinazione. | |
numTargetRowsCopied | Numero di righe di destinazione copiate. | |
numOutputRows | Numero totale di righe scritte. | |
numTargetFilesAdded | Numero di file aggiunti al sink(destinazione). | |
numTargetFilesRemoved | Numero di file rimossi dal sink(destinazione). | |
executionTimeMs | Tempo impiegato per eseguire l'intera operazione. | |
scanTimeMs | Tempo impiegato per analizzare i file per individuare le corrispondenze. | |
rewriteTimeMs | Tempo impiegato per riscrivere i file corrispondenti. | |
UPDATE | ||
numAddedFiles | Numero di file aggiunti. | |
numRemovedFiles | Numero di file rimossi. | |
numUpdatedRows | Numero di righe aggiornate. | |
numCopiedRows | Numero di righe appena copiate nel processo di aggiornamento dei file. | |
executionTimeMs | Tempo impiegato per eseguire l'intera operazione. | |
scanTimeMs | Tempo impiegato per analizzare i file per individuare le corrispondenze. | |
rewriteTimeMs | Tempo impiegato per riscrivere i file corrispondenti. | |
FSCK | numRemovedFiles | Numero di file rimossi. |
CONVERT | numConvertedFiles | Numero di file Parquet convertiti. |
OPTIMIZE | ||
numAddedFiles | Numero di file aggiunti. | |
numRemovedFiles | Numero di file ottimizzati. | |
numAddedBytes | Numero di byte aggiunti dopo l'ottimizzazione della tabella. | |
numRemovedBytes | Numero di byte rimossi. | |
minFileSize | Dimensioni del file più piccolo dopo l'ottimizzazione della tabella. | |
p25FileSize | Dimensioni del 25° file percentile dopo l'ottimizzazione della tabella. | |
p50FileSize | Dimensioni del file mediano dopo l'ottimizzazione della tabella. | |
p75FileSize | Dimensioni del 75° file percentile dopo l'ottimizzazione della tabella. | |
maxFileSize | Dimensioni del file più grande dopo l'ottimizzazione della tabella. | |
CLONE | ||
sourceTableSize | Dimensioni in byte della tabella di origine nella versione clonata. | |
sourceNumOfFiles | Numero di file nella tabella di origine nella versione clonata. | |
numRemovedFiles | Numero di file rimossi dalla tabella di destinazione se è stata sostituita una tabella Delta precedente. | |
removedFilesSize | Dimensioni totali in byte dei file rimossi dalla tabella di destinazione se è stata sostituita una tabella Delta precedente. | |
numCopiedFiles | Numero di file copiati nel nuovo percorso. 0 per cloni superficiali. | |
copiedFilesSize | Dimensioni totali in byte dei file copiati nel nuovo percorso. 0 per cloni superficiali. | |
RESTORE | ||
tableSizeAfterRestore | Dimensioni della tabella in byte dopo il ripristino. | |
numOfFilesAfterRestore | Numero di file nella tabella dopo il ripristino. | |
numRemovedFiles | Numero di file rimossi dall'operazione di ripristino. | |
numRestoredFiles | Numero di file aggiunti in seguito al ripristino. | |
removedFilesSize | Dimensioni in byte dei file rimossi dal ripristino. | |
restoredFilesSize | Dimensioni in byte di file aggiunti dal ripristino. | |
VACUUM | ||
numDeletedFiles | Numero di file eliminati. | |
numVacuumedDirectories | Numero di directory a vuoto. | |
numFilesToDelete | Numero di file da eliminare. |
Che cos'è il viaggio nel tempo di Delta Lake?
Delta Lake Time Travel supporta l'esecuzione di query sulle versioni precedenti della tabella in base al timestamp o alla versione della tabella (come registrato nel log delle transazioni). È possibile usare il tempo di viaggio per le applicazioni, ad esempio le seguenti:
- Ricreare analisi, report o output (ad esempio, l'output di un modello di Machine Learning). Questo può essere utile per il debug o il controllo, in particolare nei settori regolamentati.
- Scrivere query temporali complesse.
- Correggere gli errori nei dati.
- Fornire l'isolamento dello snapshot per un set di query per tabelle che cambiano rapidamente.
Importante
Le versioni delle tabelle accessibili con il tempo di spostamento sono determinate da una combinazione della soglia di conservazione per i file di log delle transazioni e la frequenza e la conservazione specificata per VACUUM
le operazioni. Se si esegue VACUUM
ogni giorno con i valori predefiniti, sono disponibili 7 giorni di dati per il viaggio nel tempo.
Sintassi dello spostamento cronologico di Delta Lake
Per eseguire una query su una tabella Delta, aggiungere una clausola dopo la specifica del nome della tabella.
timestamp_expression
può essere uno qualsiasi di:'2018-10-18T22:15:12.013Z'
ovvero una stringa di cui è possibile eseguire il cast a un timestampcast('2018-10-18 13:36:32 CEST' as timestamp)
'2018-10-18'
, ovvero una stringa di datacurrent_timestamp() - interval 12 hours
date_sub(current_date(), 1)
- Qualsiasi altra espressione che è o può essere eseguita il cast a un timestamp
version
è un valore long che può essere ottenuto dall'output diDESCRIBE HISTORY table_spec
.
Né timestamp_expression
né version
possono essere sottoquery.
Vengono accettate solo stringhe di data o timestamp. Ad esempio, "2019-01-01"
e "2019-01-01T00:00:00.000Z"
. Vedere il codice seguente per una sintassi di esempio:
SQL
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
Python
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")
È anche possibile usare la @
sintassi per specificare il timestamp o la versione come parte del nome della tabella. Il timestamp deve essere in yyyyMMddHHmmssSSS
formato . È possibile specificare una versione dopo @
anteponendo una v
alla versione. Vedere il codice seguente per una sintassi di esempio:
SQL
SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123
Python
spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")
Che cosa sono i checkpoint del log delle transazioni?
Delta Lake registra le versioni della tabella come file JSON all'interno della directory, archiviate insieme ai dati della _delta_log
tabella. Per ottimizzare l'esecuzione di query sui checkpoint, Delta Lake aggrega le versioni delle tabelle ai file di checkpoint Parquet, impedendo la necessità di leggere tutte le versioni JSON della cronologia delle tabelle. Azure Databricks ottimizza la frequenza di checkpoint per le dimensioni dei dati e il carico di lavoro. Gli utenti non devono interagire direttamente con i checkpoint. La frequenza del checkpoint è soggetta a modifiche senza preavviso.
Configurare la conservazione dei dati per le query di spostamento del tempo
Per eseguire query su una versione precedente della tabella, è necessario conservare sia il log che i file di dati per tale versione.
I file di dati vengono eliminati quando VACUUM
vengono eseguiti su una tabella. Delta Lake gestisce automaticamente la rimozione dei file di log dopo il checkpoint delle versioni delle tabelle.
Poiché la maggior parte delle tabelle Delta è stata VACUUM
eseguita regolarmente, le query temporizzato devono rispettare la soglia di conservazione per , ovvero 7 giorni per VACUUM
impostazione predefinita.
Per aumentare la soglia di conservazione dei dati per le tabelle Delta, è necessario configurare le proprietà della tabella seguenti:
delta.logRetentionDuration = "interval <interval>"
: controlla per quanto tempo viene mantenuta la cronologia di una tabella. Il valore predefinito èinterval 30 days
.delta.deletedFileRetentionDuration = "interval <interval>"
: determina l'utilizzo della sogliaVACUUM
per rimuovere i file di dati a cui non si fa più riferimento nella versione della tabella corrente. Il valore predefinito èinterval 7 days
.
È possibile specificare le proprietà Delta durante la creazione della tabella o impostarle con un'istruzione ALTER TABLE
. Vedere Informazioni di riferimento sulle proprietà della tabella Delta.
Nota
È necessario impostare entrambe queste proprietà per garantire che la cronologia delle tabelle venga mantenuta per una durata più lunga per le tabelle con operazioni frequenti VACUUM
. Ad esempio, per accedere a 30 giorni di dati cronologici, impostare delta.deletedFileRetentionDuration = "interval 30 days"
(che corrisponde all'impostazione predefinita per delta.logRetentionDuration
).
L'aumento della soglia di conservazione dei dati può causare l'aumento dei costi di archiviazione, man mano che vengono mantenuti più file di dati.
Ripristinare uno stato precedente di una tabella Delta
È possibile ripristinare uno stato precedente di una tabella Delta usando il RESTORE
comando . Una tabella Delta gestisce internamente le versioni storiche della tabella che consentono di ripristinarla a uno stato precedente.
Una versione corrispondente allo stato precedente o a un timestamp di quando è stato creato lo stato precedente sono supportate come opzioni dal comando RESTORE
.
Importante
- È possibile ripristinare una tabella già ripristinata.
- È possibile ripristinare una tabella clonata .
- È necessario disporre
MODIFY
dell'autorizzazione per la tabella da ripristinare. - Non è possibile ripristinare una tabella in una versione precedente in cui i file di dati sono stati eliminati manualmente o da
vacuum
. Il ripristino in questa versione parzialmente è comunque possibile sespark.sql.files.ignoreMissingFiles
è impostato sutrue
. - Il formato timestamp per il ripristino in uno stato precedente è
yyyy-MM-dd HH:mm:ss
. È supportata anche solo una stringa date(yyyy-MM-dd
).
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;
Per informazioni dettagliate sulla sintassi, vedere RESTORE.
Importante
Il ripristino viene considerato un'operazione di modifica dei dati. Le voci di log delta Lake aggiunte dal RESTORE
comando contengono dataChange impostato su true. Se è presente un'applicazione downstream, ad esempio un processo di streaming strutturato che elabora gli aggiornamenti a una tabella Delta Lake, le voci del log delle modifiche dei dati aggiunte dall'operazione di ripristino vengono considerate come nuovi aggiornamenti dei dati e l'elaborazione può comportare dati duplicati.
Ad esempio:
Versione tabella | Operazione | Aggiornamenti del log differenziale | Record negli aggiornamenti del log delle modifiche dei dati |
---|---|---|---|
0 | INSERT … | AddFile(/path/to/file-1, dataChange = true) | (name = Victor, age = 29, (name = George, age = 55) |
1 | INSERT … | AddFile(/path/to/file-2, dataChange = true) | (nome = George, età = 39) |
2 | OPTIMIZE | AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) | (Nessun record come ottimizzazione della compattazione non modifica i dati nella tabella) |
3 | RESTORE(version=1) | RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) | (name = Victor, age = 29), (name = George, age = 55), (name = George, age = 39) |
Nell'esempio precedente, il RESTORE
comando genera aggiornamenti già visualizzati durante la lettura della tabella Delta versione 0 e 1. Se una query di streaming legge questa tabella, questi file verranno considerati come dati appena aggiunti e verranno elaborati di nuovo.
Ripristinare le metriche
RESTORE
segnala le metriche seguenti come singolo dataframe di riga al termine dell'operazione:
table_size_after_restore
: dimensioni della tabella dopo il ripristino.num_of_files_after_restore
: numero di file nella tabella dopo il ripristino.num_removed_files
: numero di file rimossi (eliminati logicamente) dalla tabella.num_restored_files
: numero di file ripristinati a causa del rollback.removed_files_size
: dimensione totale in byte dei file rimossi dalla tabella.restored_files_size
: dimensioni totali in byte dei file ripristinati.
Esempi di utilizzo dello spostamento cronologico di Delta Lake
Correggere le eliminazioni accidentali in una tabella per l'utente
111
:INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111
Correzione di aggiornamenti accidentali non corretti in una tabella:
MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *
Eseguire una query sul numero di nuovi clienti aggiunti nell'ultima settimana.
SELECT count(distinct userId) FROM my_table - ( SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
Ricerca per categorie trovare la versione dell'ultimo commit nella sessione Spark?
Per ottenere il numero di versione dell'ultimo commit scritto dall'oggetto corrente SparkSession
in tutti i thread e in tutte le tabelle, eseguire una query sulla configurazione spark.databricks.delta.lastCommitVersionInSession
SQL .
SQL
SET spark.databricks.delta.lastCommitVersionInSession
Python
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Scala
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Se non sono stati eseguiti commit da , l'esecuzione SparkSession
di query sulla chiave restituisce un valore vuoto.
Nota
Se si condivide lo stesso SparkSession
tra più thread, è simile alla condivisione di una variabile tra più thread. È possibile che si verifichino race condition man mano che il valore di configurazione viene aggiornato contemporaneamente.