從 Apache Pulsar 串流
重要
這項功能處於公開預覽狀態。
在 Databricks Runtime 14.1 和更新版本中,您可以使用結構化串流從 Azure Databricks 上的 Apache Pulsar 串流數據。
結構化串流針對從 Pulsar 來源讀取的數據,提供一次完全相同的處理語意。
語法範例
以下是使用結構化串流從 Pulsar 讀取的基本範例:
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
您必須一律提供 service.url
和下列其中一個選項來指定主題:
topic
topics
topicsPattern
如需選項的完整清單,請參閱 設定 Pulsar 串流讀取的選項。
向 Pulsar 進行驗證
Azure Databricks 支援對 Pulsar 的信任存放區和密鑰存放區驗證。 Databricks 建議在儲存設定詳細數據時使用秘密。
您可以在串流設定期間設定下列選項:
pulsar.client.authPluginClassName
pulsar.client.authParams
pulsar.client.useKeyStoreTls
pulsar.client.tlsTrustStoreType
pulsar.client.tlsTrustStorePath
pulsar.client.tlsTrustStorePassword
如果資料流使用 PulsarAdmin
,也請設定下列專案:
pulsar.admin.authPluginClassName
pulsar.admin.authParams
下列範例示範如何設定驗證選項:
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")
// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", startingOffsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", clientAuthParams)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trustStorePath)
.option("pulsar.client.tlsTrustStorePassword", clientPw)
.load()
Pulsar 架構
從 Pulsar 讀取的記錄架構取決於主題如何編碼其架構。
- 針對 Avro 或 JSON 架構的主題,功能變數名稱和欄位類型會保留在產生的 Spark DataFrame 中。
- 對於沒有架構或具有 Pulsar 中簡單數據類型的主題,承載會載入至數據
value
行。 - 如果讀取器設定為讀取具有不同架構的多個主題,請將 設定
allowDifferentTopicSchemas
為將原始內容載入數據value
行。
Pulsar 記錄具有下列元數據欄位:
資料行 | 類型 |
---|---|
__key |
binary |
__topic |
string |
__messageId |
binary |
__publishTime |
timestamp |
__eventTime |
timestamp |
__messageProperties |
map<String, String> |
設定 Pulsar 串流讀取的選項
所有選項都會設定為使用 .option("<optionName>", "<optionValue>")
語法進行結構化串流讀取的一部分。 您也可以使用選項來設定驗證。 請參閱 驗證 Pulsar。
下表描述 Pulsar 的必要設定。 您只能指定其中一個選項 topic
, topics
或 topicsPattern
。
選項 | 預設值 | 說明 |
---|---|---|
service.url |
none | Pulsar 服務的 Pulsar serviceUrl 組態。 |
topic |
none | 要取用之主題的主題名稱字串。 |
topics |
none | 要取用之主題的逗號分隔清單。 |
topicsPattern |
none | 要與主題相符的Java regex 字串。 |
下表描述 Pulsar 支援的其他選項:
選項 | 預設值 | 說明 |
---|---|---|
predefinedSubscription |
none | 連接器用來追蹤 Spark 應用程式進度的預先定義訂用帳戶名稱。 |
subscriptionPrefix |
none | 連接器用來產生隨機訂閱以追蹤 Spark 應用程式進度的前置詞。 |
pollTimeoutMs |
120000 | 以毫秒為單位從 Pulsar 讀取訊息的逾時。 |
waitingForNonExistedTopic |
false |
連接器是否應該等到建立所需的主題。 |
failOnDataLoss |
true |
控制資料遺失時是否失敗(例如,刪除主題,或因為保留原則而刪除訊息)。 |
allowDifferentTopicSchemas |
false |
如果讀取了具有不同架構的多個主題,請使用此參數來關閉以架構為基礎的自動主題值還原串行化。 當這個 為 true 時,只會傳回原始值。 |
startingOffsets |
latest |
如果 latest 為 ,讀取器會在開始執行之後讀取最新的記錄。 如果 earliest 為 ,讀取器會從最早的位移讀取。 使用者也可以指定指定特定位移的 JSON 字串。 |
maxBytesPerTrigger |
none | 我們想要處理每個 microbatch 的最大位元元組數目的軟限制。 如果指定這個, admin.url 也必須指定。 |
admin.url |
none | Pulsar serviceHttpUrl 組態。 只有在指定時 maxBytesPerTrigger 才需要。 |
您也可以使用下列模式來指定任何 Pulsar 用戶端、系統管理員和讀取器設定:
模式 | 連接選項的連結 |
---|---|
pulsar.client.* |
Pulsar 用戶端設定 |
pulsar.admin.* |
Pulsar 系統管理員設定 |
pulsar.reader.* |
Pulsar 讀取器設定 |
建構起始位移 JSON
您可以手動建構訊息標識碼,以指定特定的位移,並將此值當做 JSON 傳遞至 startingOffsets
選項。 下列程式代碼範例示範此語法:
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl
val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topic", topic)
.option("startingOffsets", startOffsets)
.load()