funzione a valori tableread_pulsar
streaming
Si applica a: Databricks SQL Databricks Runtime 14.1 e versioni successive
Importante
Questa funzionalità è disponibile in anteprima pubblica.
Restituisce un table con le registrazioni lette da Pulsar.
Questa funzione a valore tablesupporta solo lo streaming e non le query batch.
Sintassi
read_pulsar ( { option_key => option_value } [, ...] )
Argomenti
Questa funzione richiede la chiamata di parametri denominati per le chiavi di opzione.
Le opzioni serviceUrl
e topic
sono obbligatorie.
Le descrizioni degli argomenti sono brevi qui. Per le descrizioni estese, vedere la documentazione di Pulsar di streaming strutturato.
Opzione | Type | Default | Descrizione |
---|---|---|---|
serviceUrl | STRING | Obbligatorio | URI del servizio Pulsar. |
argomento | STRING | Obbligatorio | Argomento da cui leggere. |
predefinedSubscription | STRING | None | Nome di sottoscrizione predefinito usato dal connettore per tenere traccia dello stato dell'applicazione Spark. |
subscriptionPrefix | STRING | None | Prefisso usato dal connettore per generate una sottoscrizione aleatoria per tenere traccia dell'avanzamento dell'applicazione Spark. |
pollTimeoutMs | LONG | 120000 | Timeout per la lettura dei messaggi da Pulsar in millisecondi. |
failOnDataLoss | BOOLEAN | true | Controlla se non eseguire una query quando i dati vengono persi( ad esempio, gli argomenti vengono eliminati o i messaggi vengono eliminati a causa dei criteri di conservazione). |
startingOffsets | STRING | più recente | Punto di inizio dell'avvio di una query, prima, più recente o stringa JSON che specifica un offsetspecifico. Se più recente, il lettore legge i record più recenti dopo l'avvio dell'esecuzione. Se più antico, il lettore legge dal più antico offset. L'utente può anche specificare una stringa JSON che specifica un offsetspecifico. |
startingTime | STRING | None | Se specificato, l'origine Pulsar leggerà i messaggi a partire dalla posizione del valore startingTime specificato. |
Per l'autenticazione del client pulsar vengono usati gli argomenti seguenti:
Opzione | Type | Default | Descrizione |
---|---|---|---|
pulsarClientAuthPluginClassName | STRING | None | Nome del plug-in di autenticazione. |
pulsarClientAuthParams | STRING | None | Parameters per il plug-in di autenticazione. |
pulsarClientUseKeyStoreTls | STRING | None | Indica se usare KeyStore per l'autenticazione tls. |
pulsarClientTlsTrustStoreType | STRING | None | Tipo di file TrustStore per l'autenticazione tls. |
pulsarClientTlsTrustStorePath | STRING | None | Percorso del file TrustStore per l'autenticazione tls. |
pulsarClientTlsTrustStorePassword | STRING | None | Password trustStore per l'autenticazione tls. |
Questi argomenti vengono utilizzati per la configurazione e l'autenticazione del controllo di ammissione di pulsar; la configurazione dell'amministratore pulsar è necessaria solo quando il controllo di ammissione è abilitato (quando maxBytesPerTrigger è set).
Opzione | Type | Default | Descrizione |
---|---|---|---|
maxBytesPerTrigger | bigint | None | Un limite soft limit del numero massimo di byte che vogliamo elaborare per microbatch. Se questa opzione è specificata, è necessario specificare anche admin.url. |
adminUrl | STRING | None | Configurazione del servizio PulsarHttpUrl. È necessario solo quando viene specificato maxBytesPerTrigger. |
pulsarAdminAuthPlugin | STRING | None | Nome del plug-in di autenticazione. |
pulsarAdminAuthParams | STRING | None | Parameters per il plug-in di autenticazione. |
pulsarClientUseKeyStoreTls | STRING | None | Indica se usare KeyStore per l'autenticazione tls. |
pulsarAdminTlsTrustStoreType | STRING | None | Tipo di file TrustStore per l'autenticazione tls. |
pulsarAdminTlsTrustStorePath | STRING | None | Percorso del file TrustStore per l'autenticazione tls. |
pulsarAdminTlsTrustStorePassword | STRING | None | Password trustStore per l'autenticazione tls. |
Valori restituiti
Un table di record pulsar con il seguente schema.
__key STRING NOT NULL
: chiave del messaggio Pulsar.value BINARY NOT NULL
: valore del messaggio Pulsar.Nota: per gli argomenti con Avro o JSON schema, invece di caricare il contenuto in un campo valore binario, il contenuto verrà espanso per mantenere i nomi dei campi e i tipi di campo dell'argomento Pulsar.
__topic STRING NOT NULL
: nome dell'argomento Pulsar.__messageId BINARY NOT NULL
: ID messaggio Pulsar.__publishTime TIMESTAMP NOT NULL
: tempo di pubblicazione del messaggio Pulsar.__eventTime TIMESTAMP NOT NULL
: ora dell'evento del messaggio Pulsar.__messageProperties MAP<STRING, STRING>
: proprietà del messaggio Pulsar.
Esempi
-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic');
-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic',
pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
);
The data can now to be queried from the testing.streaming_table for further analysis.