訂閱Google Pub/Sub
Azure Databricks 提供內建連接器來訂閱 Databricks Runtime 13.3 LTS 和更新版本中的 Google Pub/Sub。 此連接器提供訂閱者記錄的一次性處理語意。
注意
Pub/Sub 可能會發佈重複的記錄,而記錄可能會依序抵達訂閱者。 您應該撰寫 Azure Databricks 程式代碼來處理重複和順序錯亂的記錄。
語法範例
下列程式代碼範例示範如何設定從 Pub/Sub 讀取的結構化串流的基本語法:
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// we will create a Pubsub subscription if none exists with this id
.option("subscriptionId", "mysub") // required
.option("topicId", "mytopic") // required
.option("projectId", "myproject") // required
.options(authOptions)
.load()
如需更多組態選項,請參閱 設定發佈/子串流讀取的選項。
設定對 Pub/Sub 的存取
Databricks 建議在提供授權選項時使用秘密。 授權連線需要下列選項:
clientEmail
clientId
privateKey
privateKeyId
下表描述所設定認證所需的角色:
角色 | 必要或選用 | 其使用方式 |
---|---|---|
roles/pubsub.viewer 或 roles/viewer |
必要 | 檢查訂用帳戶是否存在並取得訂用帳戶 |
roles/pubsub.subscriber |
必要 | 從訂用帳戶擷取數據 |
roles/pubsub.editor 或 roles/editor |
選擇性 | 如果訂用帳戶不存在,也允許在串流終止時使用 deleteSubscriptionOnStreamStop 來刪除訂用帳戶,則啟用建立訂閱 |
發行/子架構
數據流的架構符合從 Pub/Sub 擷取的記錄,如下表所述:
欄位 | 類型 |
---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
設定發佈/子串流讀取的選項
下表描述 Pub/Sub 支援的選項。 所有選項都會設定為使用 .option("<optionName>", "<optionValue>")
語法進行結構化串流讀取的一部分。
注意
某些 Pub/Sub 組態選項會使用擷取的概念,而不是微批次。 這會反映內部實作詳細數據,而且選項的運作方式類似於其他結構化串流連接器中的主編,不同之處在於會擷取和處理記錄。
選項 | 預設值 | 說明 |
---|---|---|
numFetchPartitions |
將 設定為數據流初始化時存在的執行程式數目的一半。 | 從訂用帳戶擷取記錄的平行 Spark 工作數目。 |
deleteSubscriptionOnStreamStop |
false |
如果 true 為 ,當串流作業結束時,會刪除傳遞至數據流的訂用帳戶。 |
maxBytesPerTrigger |
none | 每個觸發的微批次期間,要處理的批次大小軟性限制。 |
maxRecordsPerFetch |
1000 | 處理記錄之前要擷取每個工作的記錄數目。 |
maxFetchPeriod |
10 秒 | 處理記錄之前,每個工作要擷取的時間持續時間。 Databricks 建議使用預設值。 |
Pub/Sub 的累加批處理語意
您可以使用 Trigger.AvailableNow
從 Pub/Sub 來源取用增量批次的可用記錄。
Azure Databricks 會在您使用 設定開始讀取 Trigger.AvailableNow
時記錄時間戳。 批次處理的記錄包含所有先前擷取的數據,以及時間戳小於記錄數據流開始時間戳的任何新發行記錄。
請參閱設定累加批次處理。
監視串流計量
結構化串流進度計量會報告擷取和準備處理的記錄數目、擷取和準備處理的記錄大小,以及串流開始後所看到重複專案的數目。 以下是這些計量的範例:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
限制
Pub/Sub 不支持推測性執行 (spark.speculation
)。