Condividi tramite


funzione a valori tableread_pulsar streaming

Si applica a:segno di spunta sì Databricks SQL segno di spunta sì Databricks Runtime 14.1 e versioni successive

Importante

Questa funzionalità è disponibile in anteprima pubblica.

Restituisce un table con le registrazioni lette da Pulsar.

Questa funzione a valore tablesupporta solo lo streaming e non le query batch.

Sintassi

read_pulsar ( { option_key => option_value } [, ...] )

Argomenti

Questa funzione richiede la chiamata di parametri denominati per le chiavi di opzione.

Le opzioni serviceUrl e topic sono obbligatorie.

Le descrizioni degli argomenti sono brevi qui. Per le descrizioni estese, vedere la documentazione di Pulsar di streaming strutturato.

Opzione Type Default Descrizione
serviceUrl STRING Obbligatorio URI del servizio Pulsar.
argomento STRING Obbligatorio Argomento da cui leggere.
predefinedSubscription STRING None Nome di sottoscrizione predefinito usato dal connettore per tenere traccia dello stato dell'applicazione Spark.
subscriptionPrefix STRING None Prefisso usato dal connettore per generate una sottoscrizione aleatoria per tenere traccia dell'avanzamento dell'applicazione Spark.
pollTimeoutMs LONG 120000 Timeout per la lettura dei messaggi da Pulsar in millisecondi.
failOnDataLoss BOOLEAN true Controlla se non eseguire una query quando i dati vengono persi( ad esempio, gli argomenti vengono eliminati o i messaggi vengono eliminati a causa dei criteri di conservazione).
startingOffsets STRING più recente Punto di inizio dell'avvio di una query, prima, più recente o stringa JSON che specifica un offsetspecifico. Se più recente, il lettore legge i record più recenti dopo l'avvio dell'esecuzione. Se più antico, il lettore legge dal più antico offset. L'utente può anche specificare una stringa JSON che specifica un offsetspecifico.
startingTime STRING None Se specificato, l'origine Pulsar leggerà i messaggi a partire dalla posizione del valore startingTime specificato.

Per l'autenticazione del client pulsar vengono usati gli argomenti seguenti:

Opzione Type Default Descrizione
pulsarClientAuthPluginClassName STRING None Nome del plug-in di autenticazione.
pulsarClientAuthParams STRING None Parameters per il plug-in di autenticazione.
pulsarClientUseKeyStoreTls STRING None Indica se usare KeyStore per l'autenticazione tls.
pulsarClientTlsTrustStoreType STRING None Tipo di file TrustStore per l'autenticazione tls.
pulsarClientTlsTrustStorePath STRING None Percorso del file TrustStore per l'autenticazione tls.
pulsarClientTlsTrustStorePassword STRING None Password trustStore per l'autenticazione tls.

Questi argomenti vengono utilizzati per la configurazione e l'autenticazione del controllo di ammissione di pulsar; la configurazione dell'amministratore pulsar è necessaria solo quando il controllo di ammissione è abilitato (quando maxBytesPerTrigger è set).

Opzione Type Default Descrizione
maxBytesPerTrigger bigint None Un limite soft limit del numero massimo di byte che vogliamo elaborare per microbatch. Se questa opzione è specificata, è necessario specificare anche admin.url.
adminUrl STRING None Configurazione del servizio PulsarHttpUrl. È necessario solo quando viene specificato maxBytesPerTrigger.
pulsarAdminAuthPlugin STRING None Nome del plug-in di autenticazione.
pulsarAdminAuthParams STRING None Parameters per il plug-in di autenticazione.
pulsarClientUseKeyStoreTls STRING None Indica se usare KeyStore per l'autenticazione tls.
pulsarAdminTlsTrustStoreType STRING None Tipo di file TrustStore per l'autenticazione tls.
pulsarAdminTlsTrustStorePath STRING None Percorso del file TrustStore per l'autenticazione tls.
pulsarAdminTlsTrustStorePassword STRING None Password trustStore per l'autenticazione tls.

Valori restituiti

Un table di record pulsar con il seguente schema.

  • __key STRING NOT NULL: chiave del messaggio Pulsar.

  • value BINARY NOT NULL: valore del messaggio Pulsar.

    Nota: per gli argomenti con Avro o JSON schema, invece di caricare il contenuto in un campo valore binario, il contenuto verrà espanso per mantenere i nomi dei campi e i tipi di campo dell'argomento Pulsar.

  • __topic STRING NOT NULL: nome dell'argomento Pulsar.

  • __messageId BINARY NOT NULL: ID messaggio Pulsar.

  • __publishTime TIMESTAMP NOT NULL: tempo di pubblicazione del messaggio Pulsar.

  • __eventTime TIMESTAMP NOT NULL: ora dell'evento del messaggio Pulsar.

  • __messageProperties MAP<STRING, STRING>: proprietà del messaggio Pulsar.

Esempi

-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
      serviceUrl => 'pulsar://broker.example.com:6650',
      startingOffsets => 'earliest',
      topic => 'my-topic');

-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
        serviceUrl => 'pulsar://broker.example.com:6650',
        startingOffsets => 'earliest',
        topic => 'my-topic',
        pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
        pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
        );

The data can now to be queried from the testing.streaming_table for further analysis.