Livelli di isolamento e conflitti di scrittura in Azure Databricks
Il livello di isolamento di un table definisce il grado in cui una transazione deve essere isolata dalle modifiche apportate da operazioni simultanee. I conflitti di scrittura in Azure Databricks dipendono dal livello di isolamento.
Delta Lake offre garanzie di transazione ACID tra letture e scritture. Ciò significa che:
- Più writer in più cluster possono modificare contemporaneamente un tablepartition. Gli scrittori vedono un'istantanea coerente delle table e le operazioni di scrittura avvengono in ordine seriale.
- I lettori continuano a vedere un'istantanea coerente del table con cui è iniziata l'attività di Azure Databricks, anche quando un table viene modificato durante l'esecuzione di un'attività.
Vedere Che cosa sono le garanzie ACID in Azure Databricks?.
Nota
Azure Databricks usa Delta Lake per tutti i tables per impostazione predefinita. Questo articolo descrive il comportamento per Delta Lake in Azure Databricks.
Importante
Le modifiche ai metadati causano l'esito negativo di tutte le operazioni di scrittura simultanee. Queste operazioni includono modifiche al protocollo table, alle proprietà table o ai dati schema.
Le letture di streaming hanno esito negativo quando rilevano un commit che modifica i metadati di table. Se si vuole che lo streaming continui, è necessario riavviarlo. Per i metodi consigliati, vedere Considerazioni sulla produzione per Structured Streaming.
Di seguito sono riportati esempi di query che modificano i metadati:
-- Set a table property.
ALTER TABLE table-name SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')
-- Enable a feature using a table property and update the table protocol.
ALTER TABLE table_name SET TBLPROPERTIES ('delta.enableDeletionVectors' = true);
-- Drop a table feature.
ALTER TABLE table_name DROP FEATURE deletionVectors;
-- Upgrade to UniForm.
REORG TABLE table_name APPLY (UPGRADE UNIFORM(ICEBERG_COMPAT_VERSION=2));
-- Update the table schema.
ALTER TABLE table_name ADD COLUMNS (col_name STRING);
Conflitti di scrittura con concorrenza a livello di riga
La concorrenza a livello di riga riduce i conflitti tra le operazioni di scrittura simultanee rilevando le modifiche a livello di riga e risolvendo automaticamente i conflitti che si verificano quando le scritture simultanee update o eliminano righe diverse nello stesso file di dati.
La concorrenza a livello di riga è disponibile a livello generale in Databricks Runtime 14.2 e versioni successive. La concorrenza a livello di riga è supportata per impostazione predefinita per le condizioni seguenti:
- Tables con vettori di eliminazione abilitati e senza partizionamento.
- Tables con clustering liquido, a meno che non siano stati disabilitati i vettori di eliminazione.
Tables con partizioni non supportano la concorrenza a livello di riga, ma possono comunque prevenire conflitti tra OPTIMIZE
e tutte le altre operazioni di scrittura quando sono attivati i vettori di eliminazione. Vedere Limitazioni per la concorrenza a livello di riga.
Per altre versioni di Databricks Runtime, vedere Comportamento di anteprima della concorrenza a livello di riga (legacy).
MERGE INTO
il supporto per la concorrenza a livello di riga richiede Photon in Databricks Runtime 14.2. In Databricks Runtime 14.3 LTS e versioni successive, Photon non è obbligatorio.
Il seguente table descrive quali coppie di operazioni di scrittura possono entrare in conflitto in ciascun livello di isolamento , con la concorrenza a livello di riga abilitata.
Nota
Tables con identità columns non supportano transazioni simultanee. Vedere Usare columns identity in Delta Lake.
INSERT (1) | UPDATE, ELIMINA, MERGE INTO | OPTIMIZE | |
---|---|---|---|
INSERT | Impossibile conflitto | ||
UPDATE, DELETE, MERGE INTO | Impossibile conflitto in WriteSerializable. Può essere in conflitto in Serializable durante la modifica della stessa riga. Vedere Limitazioni per la concorrenza a livello di riga. | Può essere in conflitto durante la modifica della stessa riga. Vedere Limitazioni per la concorrenza a livello di riga. | |
OPTIMIZE | Impossibile conflitto | Può essere in conflitto quando ZORDER BY viene usato. Non può essere in conflitto in caso contrario. |
Può essere in conflitto quando ZORDER BY viene usato. Non può essere in conflitto in caso contrario. |
Importante
(1) Tutte le operazioni di INSERT
nella tables sopra descrivono operazioni di aggiunta che non leggono nessun dato dalla stessa table prima del commit. Le operazioni INSERT
che contengono sottoquery e leggono lo stesso table garantiscono la stessa concorrenza di MERGE
.
REORG
le operazioni hanno una semantica di isolamento identica a OPTIMIZE
quando si riscrive i file di dati per riflettere le modifiche registrate nei vettori di eliminazione. Quando si usa REORG
per applicare un aggiornamento, i protocolli table cambiano, entrando in conflitto con tutte le operazioni in corso.
Conflitti di scrittura senza concorrenza a livello di riga
Nell'table vengono descritte quali coppie di operazioni di scrittura possono entrare in conflitto in ciascun livello di isolamento .
Tables non supportano la concorrenza a livello di riga se hanno partizioni definite o non dispongono di vettori di eliminazione abilitati. Databricks Runtime 14.2 o versione successiva è necessario per la concorrenza a livello di riga.
Nota
Tables con identità columns non supportano transazioni simultanee. Vedi Usare l'identità columns in Delta Lake.
INSERT (1) | UPDATE, ELIMINA, MERGE INTO | OPTIMIZE | |
---|---|---|---|
INSERT | Impossibile conflitto | ||
UPDATE, ELIMINA, MERGE INTO | Impossibile conflitto in WriteSerializable. Può entrare in conflitto in Serializable. Vedere evitare conflitti con le partizioni. | Può conflitto in Serializable e WriteSerializable. Vedere evitare conflitti con le partizioni. | |
OPTIMIZE | Impossibile conflitto | Impossibile entrare in conflitto con in tables con vettori di eliminazione abilitati, a meno che non venga usato ZORDER BY . In caso contrario, può essere in conflitto. |
Impossibile creare conflitto in tables con vettori di eliminazione abilitati, a meno che venga usato ZORDER BY . In caso contrario, può essere in conflitto. |
Importante
(1) Tutte le operazioni di INSERT
nella tables precedente descrivono le operazioni di accodamento che non leggono dati dalla stessa table prima del commit. Le operazioni INSERT
che contengono sottoquery che leggono lo stesso table supportano la stessa concorrenza di MERGE
.
REORG
le operazioni hanno una semantica di isolamento identica a OPTIMIZE
quando si riscrive i file di dati per riflettere le modifiche registrate nei vettori di eliminazione. Quando si usa REORG
per applicare un aggiornamento, i protocolli table cambiano, entrando in conflitto con tutte le operazioni in corso.
Limitazioni per la concorrenza a livello di riga
Alcune limitazioni si applicano per la concorrenza a livello di riga. Per le operazioni seguenti, la risoluzione dei conflitti segue la normale concorrenza per i conflitti di scrittura in Azure Databricks. Vedere Conflitti di scrittura senza concorrenza a livello di riga.
- Comandi con clausole condizionali complesse, tra cui:
- Condizioni su tipi di dati complessi, ad esempio struct, matrici o mappe.
- Condizioni che usano espressioni non deterministiche e sottoquery.
- Condizioni che contengono sottoquery correlate.
- In Databricks Runtime 14.2, i comandi
MERGE
devono usare un predicato esplicito sul table di destinazione per filtrare le righe che corrispondono alla fonte table. Per la risoluzione di merge, il filtro analizza solo le righe che potrebbero essere in conflitto in base alle condizioni di filtro nelle operazioni simultanee.
Nota
Il rilevamento dei conflitti a livello di riga può aumentare il tempo di esecuzione totale. Nel caso di molte transazioni simultanee, il writer assegna priorità alla latenza sulla risoluzione dei conflitti e sui conflitti.
Si applicano anche tutte le limitazioni per i vettori di eliminazione. Vedere Limitazioni.
Quando Delta Lake esegue un commit senza leggere il table?
Le operazioni Delta Lake INSERT
o le operazioni di accodamento non leggono lo stato table prima di eseguire il commit se vengono soddisfatte le seguenti condizioni:
- La logica viene espressa usando la
INSERT
logica SQL o la modalità di accodamento. - La logica non contiene sottoquery o condizionali che fanno riferimento al table designato per l'operazione di scrittura.
Come in altri commit, Delta Lake convalida e risolve le versioni table al commit usando i metadati nel log delle transazioni, ma non viene effettivamente letta alcuna versione del table.
Nota
Molti modelli comuni usano operazioni MERGE
per insert dati in base alle condizioni di table. Sebbene sia possibile riscrivere questa logica usando istruzioni INSERT
, se un'espressione condizionale fa riferimento a un column nella destinazione table, le istruzioni hanno le stesse limitazioni di concorrenza di MERGE
.
Scrivere livelli di isolamento serializzabili e serializzabili
Il livello di isolamento di un table definisce il grado in cui una transazione deve essere isolata dalle modifiche apportate da transazioni simultanee. Delta Lake in Azure Databricks supporta due livelli di isolamento: Serializable e WriteSerializable.
Serializzabile: livello di isolamento più sicuro. Garantisce che le operazioni di scrittura di cui è stato eseguito il commit e che tutte le letture siano serializzabili. Le operazioni sono consentite purché esista una sequenza seriale di esecuzione una alla volta che genera lo stesso risultato visualizzato nella table. Per le operazioni di scrittura, la sequenza seriale corrisponde esattamente a quella visualizzata nella cronologia della table.
WriteSerializable (impostazione predefinita): livello di isolamento più debole rispetto a Serializable. Garantisce solo che le operazioni di scrittura (ovvero non le letture) siano serializzabili. Tuttavia, questo è ancora più forte rispetto all'isolamento dello snapshot . WriteSerializable è il livello di isolamento predefinito perché offre un ottimo equilibrio tra coerenza e disponibilità dei dati per le operazioni più comuni.
In questa modalità, il contenuto del Delta table può essere diverso da quello che ci si aspetta dalla sequenza di operazioni come visto nella cronologia table. Questo avviene perché questa modalità consente a determinate coppie di scritture simultanee (ad esempio, operazioni X e Y) di procedere in modo che il risultato sia come se Y fosse stato eseguito prima di X (ovvero serializzabile tra di essi) anche se la cronologia mostrerebbe che Y è stato eseguito dopo X. Per impedire questo riordinamento, set il livello di isolamento table serializzabile per causare l'esito negativo di queste transazioni.
Le operazioni di lettura usano sempre l'isolamento dello snapshot. Il livello di isolamento della scrittura determina se è possibile che un lettore visualizzi uno snapshot di un table, che secondo la cronologia "non è mai esistito".
Per il livello serializzabile, un lettore vede sempre solo tables conformi alla cronologia. Per il livello WriteSerializable, un lettore potrebbe visualizzare un table che non esiste nel Delta log.
Si consideri ad esempio txn1, un'eliminazione a esecuzione prolungata e txn2, che inserisce i dati eliminati da txn1. txn2 e txn1 completi e vengono registrati in tale ordine nella cronologia. In base alla cronologia, i dati inseriti in txn2 non devono esistere nella table. Per il livello serializzabile, un lettore non visualizzerà mai i dati inseriti da txn2. Tuttavia, per il livello WriteSerializable, un lettore potrebbe a un certo punto vedere i dati inseriti da txn2.
Per altre informazioni sui tipi di operazioni che possono entrare in conflitto tra loro in ogni livello di isolamento e sui possibili errori, vedere Evitare conflitti tramite partizionamento e condizioni di comando non contigue.
Set il livello di isolamento
È set il livello di isolamento usando il comando ALTER TABLE
.
ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.isolationLevel' = <level-name>)
where
<level-name>
è Serializable
o WriteSerializable
.
Ad esempio, per modificare il livello di isolamento dal valore predefinito WriteSerializable
a Serializable
, eseguire:
ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')
Evitare conflitti usando il partizionamento e le condizioni di comando non contigue
In tutti i casi contrassegnati come "possono essere in conflitto", se le due operazioni saranno in conflitto dipende dal fatto che funzionino sullo stesso set di file. È possibile rendere disgiunti i due set di file partizionando il table con lo stesso columns utilizzato nelle stesse condizioni delle operazioni. Ad esempio, i due comandi UPDATE table WHERE date > '2010-01-01' ...
e DELETE table WHERE date < '2010-01-01'
verranno in conflitto se il table non è partizionato per data, perché entrambi possono tentare di modificare lo stesso set di file. Il partizionamento del table per date
eviterà il conflitto. Di conseguenza, il partizionamento di un table in base alle condizioni comunemente usate nel comando può ridurre significativamente i conflitti. Tuttavia, il partizionamento di un table da un column con cardinalità elevata può causare altri problemi di prestazioni a causa del numero elevato di sottodirectory.
Eccezioni di conflitto
Quando si verifica un conflitto fra transazioni, si osserverà una delle eccezioni seguenti:
ConcurrentAppendException
Questa eccezione si verifica quando un'operazione simultanea aggiunge file nella stessa partition (o in qualsiasi punto di un tablenon partizionato) che la tua operazione legge. Le aggiunte di file possono essere causate da INSERT
operazioni , DELETE
, UPDATE
o MERGE
.
Con il livello di isolamento predefinito di WriteSerializable
, i file aggiunti da operazioni diINSERT
non vedenti (ovvero le operazioni che accodano i dati senza leggere dati) non sono in conflitto con alcuna operazione, anche se toccano lo stesso partition (o in qualsiasi punto di un tablenon partizionato). Se il livello di isolamento è da set a Serializable
, gli accodamenti ciechi potrebbero entrare in conflitto.
Questa eccezione viene spesso generata durante le operazioni simultanee DELETE
, UPDATE
, o MERGE
. Anche se le operazioni simultanee possono fisicamente aggiornare directory diverse partition, una di esse può leggere lo stesso partition che l'altra sta aggiornando contemporaneamente, causando così un conflitto. È possibile evitarlo rendendo esplicita la separazione nella condizione dell'operazione. Si consideri l'esempio seguente.
// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
source.as("s"),
"s.user_id = t.user_id AND s.date = t.date AND s.country = t.country")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
Si supponga di eseguire il codice precedente simultaneamente per date o Paesi diversi. Poiché ogni processo lavora su un partition indipendente sul delta di destinazione table, non si prevede alcun conflitto. Tuttavia, la condizione non è sufficientemente esplicita e può esaminare l'intero table, entrando in conflitto con operazioni simultanee che aggiornano altre partizioni. È invece possibile riscrivere l'istruzione per aggiungere una data e un Paese specifici alla condizione di merge, come illustrato nell'esempio seguente.
// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
source.as("s"),
"s.user_id = t.user_id AND s.date = t.date AND s.country = t.country AND t.date = '" + <date> + "' AND t.country = '" + <country> + "'")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
Questa operazione ora è sicura per l'esecuzione simultanea in date e Paesi diversi.
ConcurrentDeleteReadException
Questa eccezione si verifica quando un'operazione simultanea ha eliminato un file letto dall'operazione. Le cause comuni sono un'operazione DELETE
, UPDATE
o MERGE
che riscrive i file.
ConcurrentDeleteDeleteException
Questa eccezione si verifica quando un'operazione simultanea ha eliminato un file eliminato anche dall'operazione. Ciò potrebbe essere causato da due operazioni di compattazione simultanee che riscrivono gli stessi file.
MetadataChangedException
Questa eccezione si verifica quando una transazione simultanea aggiorna i metadati di un delta table. Le cause comuni sono operazioni ALTER TABLE
o scritture al table Delta che update il schema del table.
ConcurrentTransactionException
Se una query di streaming che utilizza la stessa posizione del checkpoint viene avviata più volte in parallelo e tenta di scrivere contemporaneamente sul Delta table. Non è mai necessario disporre di due query di streaming che usano lo stesso percorso del checkpoint ed eseguire contemporaneamente.
ProtocolChangedException
Questa eccezione può verificarsi nei casi seguenti:
- Quando il Delta table viene aggiornato a una nuova versione del protocollo. Per un esito positivo delle operazioni future, potrebbe essere necessario aggiornare il runtime di Databricks.
- Quando più autori creano o sostituiscono un table contemporaneamente.
- Quando più writer scrivono contemporaneamente in un percorso vuoto.
Per altri dettagli, vedere Come azure Databricks gestisce la compatibilità delle funzionalità Delta Lake?
Comportamento di anteprima della concorrenza a livello di riga (legacy)
Questa sezione descrive i comportamenti di anteprima per la concorrenza a livello di riga in Databricks Runtime 14.1 e versioni successive. La concorrenza a livello di riga richiede sempre vettori di eliminazione.
In Databricks Runtime 13.3 LTS e versioni successive, tables con clustering liquido abilitato abilita automaticamente la concorrenza a livello di riga.
In Databricks Runtime 14.0 e 14.1, è possibile abilitare la concorrenza a livello di riga per tables con vettori di cancellazione impostando la seguente configurazione per il cluster o la SparkSession.
spark.databricks.delta.rowLevelConcurrencyPreview = true
In Databricks Runtime 14.1 e versioni successive, il calcolo non Photon supporta solo la concorrenza a livello di riga per DELETE
le operazioni.