共用方式為


read_pulsar 串流數據表值函式

適用於: 核取記號為「是」 Databricks SQL 核取記號為「是」 Databricks Runtime 14.1 和更新版本

重要

這項功能處於公開預覽狀態

傳回資料表,其中包含從 Pulsar 讀取的記錄。

此數據表值函式僅支援串流處理,而不支援批次查詢。

語法

read_pulsar ( { option_key => option_value } [, ...] )

引數

此函式需要 選項索引鍵的具名參數調用

和是必要選項serviceUrltopic

這裡簡短說明自變數。 如需擴充描述,請參閱 結構化串流 Pulsar 檔。

選項 類型 預設 描述
serviceUrl 字串 必要 Pulsar 服務的 URI。
主題 字串 必要 要從中讀取的主題。
predefinedSubscription 字串 連接器用來追蹤 Spark 應用程式進度的預先定義訂用帳戶名稱。
subscriptionPrefix 字串 連接器用來產生隨機訂閱以追蹤 Spark 應用程式進度的前置詞。
pollTimeoutMs LONG 120000 以毫秒為單位從 Pulsar 讀取訊息的逾時。
failOnDataLoss BOOLEAN true 控制資料遺失時是否失敗(例如,刪除主題,或因為保留原則而刪除訊息)。
startingOffsets 字串 最新 查詢啟動時的起點,最早、最新或指定特定位移的 JSON 字串。 如果最新,讀取器會在開始執行之後讀取最新的記錄。 如果最早,讀取器會從最早的位移讀取。 使用者也可以指定指定特定位移的 JSON 字串。
startingTime 字串 指定時,Pulsar 來源會讀取從指定 startingTime 位置開始的訊息。

下列自變數用於驗證脈衝星用戶端:

選項 類型 預設 描述
pulsarClientAuthPluginClassName 字串 驗證外掛程式的名稱。
pulsarClientAuthParams 字串 驗證外掛程式的參數。
pulsarClientUseKeyStoreTls 字串 是否要使用 KeyStore 進行 Tls 驗證。
pulsarClientTlsTrustStoreType 字串 Tls 驗證的 TrustStore 檔案類型。
pulsarClientTlsTrustStorePath 字串 Tls 驗證的 TrustStore 檔案路徑。
pulsarClientTlsTrustStorePassword 字串 Tls 驗證的 TrustStore 密碼。

這些自變數用於設定和驗證脈衝星許可控制,只有在啟用許可控制時才需要 Pulsar 管理員設定(設定 maxBytesPerTrigger 時)

選項 類型 預設 描述
maxBytesPerTrigger BIGINT 我們想要處理每個 microbatch 的最大位元元組數目的軟限制。 如果指定此專案,也必須指定 admin.url。
adminUrl 字串 Pulsar 服務HttpUrl 組態。 只有在指定 maxBytesPerTrigger 時才需要。
pulsarAdminAuthPlugin 字串 驗證外掛程式的名稱。
pulsarAdminAuthParams 字串 驗證外掛程式的參數。
pulsarClientUseKeyStoreTls 字串 是否要使用 KeyStore 進行 Tls 驗證。
pulsarAdminTlsTrustStoreType 字串 Tls 驗證的 TrustStore 檔案類型。
pulsarAdminTlsTrustStorePath 字串 Tls 驗證的 TrustStore 檔案路徑。
pulsarAdminTlsTrustStorePassword 字串 Tls 驗證的 TrustStore 密碼。

傳回

具有下列架構的脈衝星記錄數據表。

  • __key STRING NOT NULL:P ulsar 訊息密鑰。

  • value BINARY NOT NULL:P ulsar 訊息值。

    注意:對於 Avro 或 JSON 架構的主題,而不是將內容載入二進位值欄位,內容將會展開以保留 Pulsar 主題的功能變數名稱和欄位類型。

  • __topic STRING NOT NULL:P ulsar 主題名稱。

  • __messageId BINARY NOT NULL:P ulsar 訊息標識碼。

  • __publishTime TIMESTAMP NOT NULL:P ulsar 訊息發佈時間。

  • __eventTime TIMESTAMP NOT NULL:P ulsar 訊息事件時間。

  • __messageProperties MAP<STRING, STRING>:P ulsar 訊息屬性。

範例

-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
      serviceUrl => 'pulsar://broker.example.com:6650',
      startingOffsets => 'earliest',
      topic => 'my-topic');

-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
        serviceUrl => 'pulsar://broker.example.com:6650',
        startingOffsets => 'earliest',
        topic => 'my-topic',
        pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
        pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
        );

The data can now to be queried from the testing.streaming_table for further analysis.