Condividi tramite


Usare il clustering liquido per Delta tables

Il clustering liquido di Delta Lake sostituisce il partizionamento table e ZORDER per semplificare le decisioni relative al layout dei dati e delle prestazioni delle query optimize. Il clustering liquido offre flessibilità per ridefinire le chiavi di clustering senza riscrivere dati esistenti, consentendo al layout dei dati di evolversi insieme alle esigenze analitiche nel tempo.

Importante

Databricks consiglia di usare Databricks Runtime 15.2 e versioni successive per tutte le tables con clustering liquido abilitato. Il supporto dell'anteprima pubblica con limitazioni è disponibile in Databricks Runtime 13.3 LTS e versioni successive.

Nota

Tables con il raggruppamento liquido abilitato supporta la concorrenza a livello di riga in Databricks Runtime 13.3 LTS e versioni successive. La concorrenza a livello di riga è disponibile a livello generale in Databricks Runtime 14.2 e versioni successive per tutte le tables con vettori di eliminazione abilitati. Vedere Livelli di isolamento e conflitti di scrittura in Azure Databricks.

Che cos'è il clustering liquido usato per?

Databricks consiglia il clustering liquido per tutte le nuove Delta tables. Di seguito sono riportati alcuni esempi di scenari che traggono vantaggio dal clustering:

  • Tables spesso filtrato per alta cardinalità columns.
  • Tables con un'asimmetria significativa nella distribuzione dei dati.
  • Tables che crescono rapidamente e richiedono manutenzione e sforzi di ottimizzazione.
  • Tables con requisiti di scrittura simultanei.
  • Tables con modelli di accesso che cambiano nel tempo.
  • Tables where una chiave di partition tipica potrebbe lasciare il table con troppe o troppo poche partizioni.

Abilitare il clustering liquido

È possibile abilitare il clustering liquido su un table esistente o durante la creazione di table. Il clustering non è compatibile con il partizionamento o con ZORDERe richiede di utilizzare Azure Databricks per gestire tutte le operazioni di layout e ottimizzazione dei dati nel tuo table. Dopo l'abilitazione del clustering liquido, eseguire OPTIMIZE i processi come di consueto per i dati del cluster in modo incrementale. Vedere Come attivare il clustering.

Per abilitare il clustering liquido, aggiungere la frase CLUSTER BY a un'istruzione di creazione table, come negli esempi seguenti:

Nota

In Databricks Runtime 14.2 e versioni successive è possibile usare le API DataFrame e l'API DeltaTable in Python o Scala per abilitare il clustering liquido.

SQL

-- Create an empty table
CREATE TABLE table1(col0 int, col1 string) CLUSTER BY (col0);

-- Using a CTAS statement
CREATE EXTERNAL TABLE table2 CLUSTER BY (col0)  -- specify clustering after table name, not in subquery
LOCATION 'table_location'
AS SELECT * FROM table1;

-- Using a LIKE statement to copy configurations
CREATE TABLE table3 LIKE table1;

Python

# Create an empty table
(DeltaTable.create()
  .tableName("table1")
  .addColumn("col0", dataType = "INT")
  .addColumn("col1", dataType = "STRING")
  .clusterBy("col0")
  .execute())

# Using a CTAS statement
df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")

# CTAS using DataFrameWriterV2
df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()

Scala

// Create an empty table
DeltaTable.create()
  .tableName("table1")
  .addColumn("col0", dataType = "INT")
  .addColumn("col1", dataType = "STRING")
  .clusterBy("col0")
  .execute()

// Using a CTAS statement
val df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")

// CTAS using DataFrameWriterV2
val df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()

In Databricks Runtime 16.0 e versioni successive, è possibile creare tables con il clustering liquido abilitato usando scritture di Structured Streaming, come negli esempi seguenti:

Python

(spark.readStream.table("source_table")
  .writeStream
  .clusterBy("column_name")
  .option("checkpointLocation", checkpointPath)
  .toTable("target_table")
)

Scala

spark.readStream.table("source_table")
  .writeStream
  .clusterBy("column_name")
  .option("checkpointLocation", checkpointPath)
  .toTable("target_table")

Avviso

Tables creati con clustering liquido abilitato hanno numerose funzionalità Delta table abilitate al momento della creazione e usano la versione scrittore Delta 7 e versione lettore 3. È possibile eseguire l'override dell'abilitazione di alcune di queste funzionalità. Vedere Override default feature enablement (facoltativo).

Non è possibile effettuare il downgrade delle versioni del protocollo Table, e le versioni tables con clustering abilitato non sono leggibili dai client Delta Lake che non supportano tutte le funzionalità del protocollo di lettura Delta table abilitate. Si veda In che modo Azure Databricks gestisce la compatibilità delle funzionalità di Delta Lake?.

È possibile abilitare il clustering liquido su un Delta table non partizionato esistente utilizzando la sintassi seguente:

ALTER TABLE <table_name>
CLUSTER BY (<clustering_columns>)

Importante

Il comportamento predefinito non applica il clustering ai dati scritti in precedenza. Per forzare il clustering per tutti i record, è necessario usare OPTIMIZE FULL. Consultare Force reclustering per tutti i record.

Eseguire l'override dell'abilitazione delle funzionalità predefinite (facoltativo)

È possibile modificare il comportamento predefinito che abilita le funzionalità Delta table durante l'abilitazione del clustering liquido. Ciò impedisce l'aggiornamento dei protocolli lettore e writer associati a tali funzionalità di table. Per completare i passaggi seguenti, è necessario disporre di un table esistente:

  1. Utilizzare ALTER TABLE per set la proprietà table che disabilita una o più funzionalità. Ad esempio, per disabilitare i vettori di eliminazione, eseguire le operazioni seguenti:

    ALTER TABLE table_name SET TBLPROPERTIES ('delta.enableDeletionVectors' = false);
    
  2. Abilitare il clustering liquido nel table eseguendo le operazioni seguenti:

    ALTER TABLE <table_name>
    CLUSTER BY (<clustering_columns>)
    

Il seguente table fornisce informazioni sulle funzionalità Delta su cui è possibile eseguire l'override e su come l'abilitazione influisca sulla compatibilità con le versioni di Databricks Runtime.

Funzionalità Delta Compatibilità del runtime Proprietà per eseguire l'override dell'abilitazione Impatto della disabilitazione sul clustering liquido
Vettori di eliminazione Le letture e le scritture richiedono Databricks Runtime 12.2 lTS e versioni successive. 'delta.enableDeletionVectors' = false La concorrenza a livello di riga è disabilitata, rendendo più probabili conflitti tra transazioni e operazioni di clustering. Vedere Conflitti di scrittura con concorrenza a livello di riga.

DELETEI comandi , MERGEe UPDATE potrebbero essere eseguiti più lentamente.
Rilevamento riga Le scritture richiedono Databricks Runtime 13.3 LTS e versioni successive. Può essere letto da qualsiasi versione di Databricks Runtime. 'delta.enableRowTracking' = false La concorrenza a livello di riga è disabilitata, rendendo più probabili conflitti tra transazioni e operazioni di clustering. Vedere Conflitti di scrittura con concorrenza a livello di riga.
Checkpoint V2 Le letture e le scritture richiedono Databricks Runtime 13.3 LTS e versioni successive. 'delta.checkpointPolicy' = 'classic' Nessun impatto sul comportamento del clustering liquido.

Scegliere le chiavi di clustering

Databricks consiglia di scegliere le chiavi di clustering in base ai filtri di query di uso comune. Le chiavi di clustering possono essere definite in qualsiasi ordine. Se due columns sono correlate, è necessario aggiungerne solo una come chiave di clustering.

È possibile specificare fino a 4 columns come chiavi di clustering. È possibile specificare columns solo con le statistiche raccolte per le chiavi di clustering. Per impostazione predefinita, vengono raccolte statistiche per i primi 32 columns in un Delta table. Vedere Specificare le statistiche delta columns.

Il clustering supporta i tipi di dati seguenti per le chiavi di clustering:

  • Data
  • Timestamp:
  • TimestampNTZ (richiede Databricks Runtime 14.3 LTS o versione successiva)
  • String
  • Intero
  • Lungo
  • Short
  • Float
  • Double
  • Decimale
  • Byte

Se stai convertendo un tableesistente, considera i seguenti consigli:

Tecnica di ottimizzazione dei dati corrente Raccomandazione per le chiavi di clustering
Partizionamento in stile Hive Usare partitioncolumns come chiavi di clustering.
Indicizzazione dell'ordine Z Usare il ZORDER BYcolumns come chiavi di clustering.
Partizionamento in stile Hive e ordine Z Usare sia partitioncolumns che ZORDER BYcolumns come chiavi di clustering.
Generato columns per ridurre la cardinalità (ad esempio, data per un timestamp) Usare il column originale come chiave di clustering e non creare un columngenerato.

Scrivere dati in un table cluster

È necessario usare un client Delta writer che supporta tutte le funzionalità del protocollo di scrittura Delta table utilizzate dal clustering liquido. In Azure Databricks è necessario usare Databricks Runtime 13.3 LTS e versioni successive.

Le operazioni eseguite dal cluster in scrittura includono quanto segue:

  • INSERT INTO Operazioni
  • CTASistruzioni e RTAS
  • COPY INTO dal formato Parquet
  • spark.write.mode("append")

Le scritture di Structured Streaming non attivano mai il clustering in scrittura. Si applicano limitazioni aggiuntive. Vedere Limitazioni.

Il clustering in fase di scrittura viene attivato solo quando i dati nella transazione soddisfano una soglia di dimensioni. Queste soglie variano in base al numero di columns di clustering e sono inferiori per Unity Catalogtables gestito rispetto ad altri tablesDelta.

Numero di cluster columns Dimensioni della soglia per Unity Catalogtables gestito Dimensione soglia per altri Delta tables
1 64 MB 256 MB
2 256 MB 1 GB
3 512 MB 2 GB
4 1 GB 4 GB

Poiché non tutte le operazioni applicano clustering liquido, Databricks consiglia l'esecuzione OPTIMIZE frequente per garantire che tutti i dati siano raggruppati in modo efficiente.

Come attivare il clustering

L'ottimizzazione predittiva esegue automaticamente i comandi OPTIMIZE per abilitare tables. Consulta Ottimizzazione predittiva per Unity Catalog gestita tables.

Per attivare il clustering, è necessario usare Databricks Runtime 13.3 LTS o versione successiva. Usare il comando OPTIMIZE sul tuo table, come nell'esempio seguente:

OPTIMIZE table_name;

Il clustering liquido è incrementale, ovvero i dati vengono riscritti solo in base alle esigenze per contenere i dati che devono essere raggruppati. I file di dati con chiavi di clustering che non corrispondono ai dati da raggruppare non vengono riscritti.

Per ottenere prestazioni ottimali, Databricks consiglia di pianificare processi regolari OPTIMIZE nei dati del cluster. Quando tables sperimenta molti aggiornamenti o inserimenti, Databricks consiglia di pianificare un'attività OPTIMIZE ogni una o due ore. Poiché il clustering liquido è incrementale, la maggior parte dei processi OPTIMIZE per il clustering di tables viene eseguita rapidamente.

Force reclustering per tutti i record

In Databricks Runtime 16.0 e versioni successive è possibile forzare il clustering di tutti i record in un table con la sintassi seguente:

OPTIMIZE table_name FULL;

Importante

Eseguendo OPTIMIZE FULL, tutti i dati esistenti vengono riorganizzati secondo necessità. Per le tables di grandi dimensioni che non sono state precedentemente raggruppate nelle chiavi specificate, questa operazione potrebbe richiedere ore.

Eseguire il comando OPTIMIZE FULL quando si abilita il clustering per la prima volta o si modificano le chiavi di clustering. Se in precedenza è stata eseguita OPTIMIZE FULL e non sono state apportate modifiche alle chiavi di clustering, OPTIMIZE FULL viene eseguito come OPTIMIZE. Usare sempre OPTIMIZE FULL per garantire che il layout dei dati rifletta le chiavi di clustering correnti.

Leggere dati da un cluster table

È possibile leggere i dati in un cluster table raggruppato usando qualsiasi client Delta Lake che supporta la lettura dei vettori di eliminazione. Per ottenere risultati ottimali delle query, includere le chiavi di clustering nei filtri di query, come nell'esempio seguente:

SELECT * FROM table_name WHERE cluster_key_column_name = "some_value";

Modificare le chiavi di clustering

È possibile modificare le chiavi di clustering per un table in qualsiasi momento eseguendo un comando ALTER TABLE, come nell'esempio seguente:

ALTER TABLE table_name CLUSTER BY (new_column1, new_column2);

Quando si modificano le chiavi di clustering, le operazioni successive OPTIMIZE e di scrittura usano il nuovo approccio di clustering, ma i dati esistenti non vengono riscritti.

È anche possibile disattivare il clustering impostando le chiavi su NONE, come nell'esempio seguente:

ALTER TABLE table_name CLUSTER BY NONE;

L'impostazione delle chiavi del cluster su NONE non riscrive i dati già in cluster, ma impedisce alle operazioni future OPTIMIZE di usare le chiavi di clustering.

Vedi come table è raggruppato

È possibile usare i comandi DESCRIBE per visualizzare le chiavi di clustering per un table, come nei seguenti esempi:

DESCRIBE TABLE table_name;

DESCRIBE DETAIL table_name;

compatibilità di con tables con clustering liquido

Tables creati con clustering liquido in Databricks Runtime 14.1 e versioni successive usano checkpoint v2 per impostazione predefinita. È possibile leggere e scrivere tables con checkpoint v2 in Databricks Runtime 13.3 LTS e versioni successive.

È possibile disabilitare i checkpoint v2 e eseguire il downgrade dei protocolli table per leggere tables utilizzando il clustering liquido in Databricks Runtime 12.2 LTS e versioni successive. Vedi Drop Delta table funzionalità.

Limiti

Esistono le limitazioni seguenti:

  • In Databricks Runtime 15.1 e versioni successive il clustering in scrittura non supporta le query di origine che includono filtri, join o aggregazioni.
  • I carichi di lavoro di flusso strutturato non supportano il clustering in scrittura.
  • In Databricks Runtime 15.4 LTS e versioni precedenti non è possibile creare un table con il clustering liquido abilitato utilizzando una scrittura in streaming strutturato. È possibile utilizzare Structured Streaming per scrivere i dati in un table esistente con il clustering liquido abilitato.