read_pulsar
串流數據表值函式
適用於: Databricks SQL Databricks Runtime 14.1 和更新版本
重要
這項功能處於公開預覽狀態。
傳回資料表,其中包含從 Pulsar 讀取的記錄。
此數據表值函式僅支援串流處理,而不支援批次查詢。
語法
read_pulsar ( { option_key => option_value } [, ...] )
引數
此函式需要 選項索引鍵的具名參數調用 。
和是必要選項serviceUrl
topic
。
這裡簡短說明自變數。 如需擴充描述,請參閱 結構化串流 Pulsar 檔。
選項 | 類型 | 預設 | 描述 |
---|---|---|---|
serviceUrl | 字串 | 必要 | Pulsar 服務的 URI。 |
主題 | 字串 | 必要 | 要從中讀取的主題。 |
predefinedSubscription | 字串 | 無 | 連接器用來追蹤 Spark 應用程式進度的預先定義訂用帳戶名稱。 |
subscriptionPrefix | 字串 | 無 | 連接器用來產生隨機訂閱以追蹤 Spark 應用程式進度的前置詞。 |
pollTimeoutMs | LONG | 120000 | 以毫秒為單位從 Pulsar 讀取訊息的逾時。 |
failOnDataLoss | BOOLEAN | true | 控制資料遺失時是否失敗(例如,刪除主題,或因為保留原則而刪除訊息)。 |
startingOffsets | 字串 | 最新 | 查詢啟動時的起點,最早、最新或指定特定位移的 JSON 字串。 如果最新,讀取器會在開始執行之後讀取最新的記錄。 如果最早,讀取器會從最早的位移讀取。 使用者也可以指定指定特定位移的 JSON 字串。 |
startingTime | 字串 | 無 | 指定時,Pulsar 來源會讀取從指定 startingTime 位置開始的訊息。 |
下列自變數用於驗證脈衝星用戶端:
選項 | 類型 | 預設 | 描述 |
---|---|---|---|
pulsarClientAuthPluginClassName | 字串 | 無 | 驗證外掛程式的名稱。 |
pulsarClientAuthParams | 字串 | 無 | 驗證外掛程式的參數。 |
pulsarClientUseKeyStoreTls | 字串 | 無 | 是否要使用 KeyStore 進行 Tls 驗證。 |
pulsarClientTlsTrustStoreType | 字串 | 無 | Tls 驗證的 TrustStore 檔案類型。 |
pulsarClientTlsTrustStorePath | 字串 | 無 | Tls 驗證的 TrustStore 檔案路徑。 |
pulsarClientTlsTrustStorePassword | 字串 | 無 | Tls 驗證的 TrustStore 密碼。 |
這些自變數用於設定和驗證脈衝星許可控制,只有在啟用許可控制時才需要 Pulsar 管理員設定(設定 maxBytesPerTrigger 時)
選項 | 類型 | 預設 | 描述 |
---|---|---|---|
maxBytesPerTrigger | BIGINT | 無 | 我們想要處理每個 microbatch 的最大位元元組數目的軟限制。 如果指定此專案,也必須指定 admin.url。 |
adminUrl | 字串 | 無 | Pulsar 服務HttpUrl 組態。 只有在指定 maxBytesPerTrigger 時才需要。 |
pulsarAdminAuthPlugin | 字串 | 無 | 驗證外掛程式的名稱。 |
pulsarAdminAuthParams | 字串 | 無 | 驗證外掛程式的參數。 |
pulsarClientUseKeyStoreTls | 字串 | 無 | 是否要使用 KeyStore 進行 Tls 驗證。 |
pulsarAdminTlsTrustStoreType | 字串 | 無 | Tls 驗證的 TrustStore 檔案類型。 |
pulsarAdminTlsTrustStorePath | 字串 | 無 | Tls 驗證的 TrustStore 檔案路徑。 |
pulsarAdminTlsTrustStorePassword | 字串 | 無 | Tls 驗證的 TrustStore 密碼。 |
傳回
具有下列架構的脈衝星記錄數據表。
__key STRING NOT NULL
:P ulsar 訊息密鑰。value BINARY NOT NULL
:P ulsar 訊息值。注意:對於 Avro 或 JSON 架構的主題,而不是將內容載入二進位值欄位,內容將會展開以保留 Pulsar 主題的功能變數名稱和欄位類型。
__topic STRING NOT NULL
:P ulsar 主題名稱。__messageId BINARY NOT NULL
:P ulsar 訊息標識碼。__publishTime TIMESTAMP NOT NULL
:P ulsar 訊息發佈時間。__eventTime TIMESTAMP NOT NULL
:P ulsar 訊息事件時間。__messageProperties MAP<STRING, STRING>
:P ulsar 訊息屬性。
範例
-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic');
-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic',
pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
);
The data can now to be queried from the testing.streaming_table for further analysis.