共用方式為


從 Apache Pulsar 串流

重要

這項功能處於公開預覽狀態

在 Databricks Runtime 14.1 和更新版本中,您可以使用結構化串流從 Azure Databricks 上的 Apache Pulsar 串流數據。

結構化串流針對從 Pulsar 來源讀取的數據,提供一次完全相同的處理語意。

語法範例

以下是使用結構化串流從 Pulsar 讀取的基本範例:

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()

您必須一律提供 service.url 和下列其中一個選項來指定主題:

  • topic
  • topics
  • topicsPattern

如需選項的完整清單,請參閱 設定 Pulsar 串流讀取的選項。

向 Pulsar 進行驗證

Azure Databricks 支援對 Pulsar 的信任存放區和密鑰存放區驗證。 Databricks 建議在儲存設定詳細數據時使用秘密。

您可以在串流設定期間設定下列選項:

  • pulsar.client.authPluginClassName
  • pulsar.client.authParams
  • pulsar.client.useKeyStoreTls
  • pulsar.client.tlsTrustStoreType
  • pulsar.client.tlsTrustStorePath
  • pulsar.client.tlsTrustStorePassword

如果資料流使用 PulsarAdmin,也請設定下列專案:

  • pulsar.admin.authPluginClassName
  • pulsar.admin.authParams

下列範例示範如何設定驗證選項:

val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()

Pulsar 架構

從 Pulsar 讀取的記錄架構取決於主題如何編碼其架構。

  • 針對 Avro 或 JSON 架構的主題,功能變數名稱和欄位類型會保留在產生的 Spark DataFrame 中。
  • 對於沒有架構或具有 Pulsar 中簡單數據類型的主題,承載會載入至數據 value 行。
  • 如果讀取器設定為讀取具有不同架構的多個主題,請將 設定 allowDifferentTopicSchemas 為將原始內容載入數據 value 行。

Pulsar 記錄具有下列元數據欄位:

資料行 類型
__key binary
__topic string
__messageId binary
__publishTime timestamp
__eventTime timestamp
__messageProperties map<String, String>

設定 Pulsar 串流讀取的選項

所有選項都會設定為使用 .option("<optionName>", "<optionValue>") 語法進行結構化串流讀取的一部分。 您也可以使用選項來設定驗證。 請參閱 驗證 Pulsar

下表描述 Pulsar 的必要設定。 您只能指定其中一個選項 topictopicstopicsPattern

選項 預設值 說明
service.url none Pulsar 服務的 Pulsar serviceUrl 組態。
topic none 要取用之主題的主題名稱字串。
topics none 要取用之主題的逗號分隔清單。
topicsPattern none 要與主題相符的Java regex 字串。

下表描述 Pulsar 支援的其他選項:

選項 預設值 說明
predefinedSubscription none 連接器用來追蹤 Spark 應用程式進度的預先定義訂用帳戶名稱。
subscriptionPrefix none 連接器用來產生隨機訂閱以追蹤 Spark 應用程式進度的前置詞。
pollTimeoutMs 120000 以毫秒為單位從 Pulsar 讀取訊息的逾時。
waitingForNonExistedTopic false 連接器是否應該等到建立所需的主題。
failOnDataLoss true 控制資料遺失時是否失敗(例如,刪除主題,或因為保留原則而刪除訊息)。
allowDifferentTopicSchemas false 如果讀取了具有不同架構的多個主題,請使用此參數來關閉以架構為基礎的自動主題值還原串行化。 當這個 為 true時,只會傳回原始值。
startingOffsets latest 如果 latest為 ,讀取器會在開始執行之後讀取最新的記錄。 如果 earliest為 ,讀取器會從最早的位移讀取。 使用者也可以指定指定特定位移的 JSON 字串。
maxBytesPerTrigger none 我們想要處理每個 microbatch 的最大位元元組數目的軟限制。 如果指定這個, admin.url 也必須指定。
admin.url none Pulsar serviceHttpUrl 組態。 只有在指定時 maxBytesPerTrigger 才需要。

您也可以使用下列模式來指定任何 Pulsar 用戶端、系統管理員和讀取器設定:

模式 連接選項的連結
pulsar.client.* Pulsar 用戶端設定
pulsar.admin.* Pulsar 系統管理員設定
pulsar.reader.* Pulsar 讀取器設定

建構起始位移 JSON

您可以手動建構訊息標識碼,以指定特定的位移,並將此值當做 JSON 傳遞至 startingOffsets 選項。 下列程式代碼範例示範此語法:

import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()