Condividi tramite


Inserire dati come tipo variant semistrutturato

Importante

Questa funzionalità è disponibile in anteprima pubblica.

In Databricks Runtime 15.3 e versioni successive è possibile usare il VARIANT tipo per inserire dati semistrutturati. Questo articolo descrive il comportamento e fornisce modelli di esempio per l'inserimento di dati dall'archiviazione di oggetti cloud tramite il caricatore automatico e COPY INTO, lo streaming di record da Kafka e i comandi SQL per la creazione di nuove tabelle con dati variant o l'inserimento di nuovi record usando il tipo variant.

Vedere Eseguire query sui dati varianti.

Creare una tabella con una colonna variante

VARIANT è un tipo SQL standard in Databricks Runtime 15.3 e versioni successive e supportato dalle tabelle supportate da Delta Lake. Per impostazione predefinita, le tabelle gestite in Azure Databricks usano Delta Lake, quindi è possibile creare una tabella vuota con una singola VARIANT colonna usando la sintassi seguente:

CREATE TABLE table_name (variant_column VARIANT)

In alternativa, è possibile usare la PARSE_JSON funzione in una stringa JSON per usare un'istruzione CTAS per creare una tabella con una colonna variante. Nell'esempio seguente viene creata una tabella con due colonne:

  • Colonna id estratta dalla stringa JSON come STRING tipo.
  • La variant_column colonna contiene l'intera stringa JSON codificata come VARIANT tipo.
CREATE TABLE table_name AS
  SELECT json_string:id AS id,
    PARSE_JSON(json_string) variant_column
  FROM source_data

Nota

VARIANT non è possibile usare colonne per il clustering di chiavi, partizioni o chiavi con ordine Z. I dati archiviati con VARIANT tipo non possono essere usati per confronti e ordinamento.

Databricks consiglia di estrarre e archiviare i campi come colonne non varianti che si prevede di usare per accelerare le query e ottimizzare il layout di archiviazione.

Inserire dati usando parse_json

Se la tabella di destinazione contiene già una colonna codificata come VARIANT, è possibile usare parse_json per inserire record stringa JSON come VARIANT, come nell'esempio seguente:

SQL

INSERT INTO table_name (variant_column)
  SELECT PARSE_JSON(json_string)
  FROM source_data

Python

from pyspark.sql.functions import col, parse_json

(spark.read
  .table("source_data")
  .select(parse_json(col("json_string")))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

Inserire dati dall'archiviazione di oggetti cloud come variante

In Databricks Runtime 15.3 e versioni successive è possibile usare il caricatore automatico per caricare tutti i dati da origini JSON come singola VARIANT colonna in una tabella di destinazione. Poiché VARIANT è flessibile per la modifica dello schema e del tipo e mantiene la distinzione tra maiuscole e minuscole e NULL i valori presenti nell'origine dati, questo modello è affidabile per la maggior parte degli scenari di inserimento con le avvertenze seguenti:

  • I record JSON in formato non valido non possono essere codificati usando VARIANT il tipo .
  • VARIANT il tipo può contenere solo record di dimensioni fino a 16 MB.

Nota

Variant considera record eccessivamente di grandi dimensioni simili ai record danneggiati. Nella modalità di elaborazione predefinita PERMISSIVE , i record eccessivamente grandi vengono acquisiti nella _malformed_data colonna insieme a record JSON in formato non valido.

Poiché tutti i dati dell'origine JSON vengono registrati come una singola VARIANT colonna, non viene eseguita alcuna evoluzione dello schema durante l'inserimento e rescuedDataColumn non è supportata. Nell'esempio seguente si presuppone che la tabella di destinazione esista già con una singola VARIANT colonna.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("singleVariantColumn", "variant_column")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

È anche possibile specificare VARIANT quando si definisce uno schema o si schemaHintspassa . I dati nel campo di origine a cui si fa riferimento devono contenere una stringa JSON valida. Gli esempi seguenti illustrano questa sintassi:

# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("name STRING, address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("payload VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaHints", "address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

Usare COPY INTO con variant

Databricks consiglia di usare il caricatore automatico quando COPY INTO disponibile.

COPY INTO supporta l'inserimento dell'intero contenuto di un'origine dati JSON come singola colonna. L'esempio seguente crea una nuova tabella con una singola VARIANT colonna e quindi usa COPY INTO per inserire record da un'origine file JSON.

CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('singleVariantColumn' = 'name')

È anche possibile definire qualsiasi campo in una tabella di destinazione come VARIANT. Quando si esegue COPY INTO, i campi corrispondenti nell'origine dati vengono inseriti ed eseguiti il cast al VARIANT tipo, come negli esempi seguenti:

-- Extracts the `address` field from the JSON record and casts to variant
CREATE TABLE table_name (address VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON

-- Extracts `name` and `address` from the JSON record and casts `address` to variant
CREATE TABLE table_name (name STRING, address VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON

Trasmettere dati Kafka come variant

Molti flussi Kafka codificano i payload usando JSON. L'inserimento di flussi Kafka tramite VARIANT rende questi carichi di lavoro affidabili per le modifiche dello schema.

Nell'esempio seguente viene illustrata la lettura di un'origine di streaming Kafka, il key cast di come STRING e value come VARIANTe la scrittura in una tabella di destinazione.

from pyspark.sql.functions import col, parse_json

(spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()
  .select(
    col("key").cast("string"),
    parse_json(col("value").cast("string"))
  ).writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)