使用 Apache Kafka 和 Azure Databricks 進行串流處理
本文說明如何在 Azure Databricks 上執行結構化串流工作負載時,使用 Apache Kafka 作為來源或接收器。
如需更多 Kafka,請參閱 Kafka 文件。
從 Kafka 讀取資料
以下是從 Kafka 串流讀取的範例:
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
Azure Databricks 也支援 Kafka 資料來源的批次讀取語意,如下列範例所示:
df = (spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)
對於累加式批次載入,Databricks 建議將 Kafka 與 Trigger.AvailableNow
搭配使用。 請參閱設定累加批次處理。
在 Databricks Runtime 13.3 LTS 和更新版本中,Azure Databricks 提供 SQL 函式來讀取 Kafka 資料。 只有 Delta Live Tables 或 Databricks SQL 中的串流 tables 才支援使用 SQL 串流處理。 請參閱 read_kafka table-valued 函式。
設定 Kafka 結構化串流讀取器
Azure Databricks 提供 kafka
關鍵詞作為數據格式,以將 connections 設定為 Kafka 0.10+ 。
以下是 Kafka 的最常見組態:
有多種方式指定要訂閱的主題。 您應該只提供下列其中一個 parameters:
選項 | 值 | Description |
---|---|---|
訂閱 | 用逗號分隔的主題列表 list。 | 可供訂閱的主題 list。 |
subscribePattern | Java regex 字串。 | 用於訂閱主題的模式。 |
assign | JSON 字串 {"topicA":[0,1],"topic":[2,4]} 。 |
要取用的特定 topicPartitions。 |
其他值得注意的組態:
選項 | 值 | 預設值 | Description |
---|---|---|---|
kafka.bootstrap.servers | host:port 的逗號分隔 list。 | empty | [必要] Kafka bootstrap.servers 組態。 如果您發現 Kafka 沒有資料,請先檢查代理位址 list。 如果訊息代理程式位址 list 不正確,則可能不會有任何錯誤。 這是因為 Kafka 用戶端假設訊息代理程式最終將變得可用,並且在發生網路錯誤的情況下會永遠重試。 |
failOnDataLoss |
true 或 false 。 |
true |
[選用] 在資料可能遺失的情況下是否讓查詢失敗。 在許多情況下 (例如主題已刪除、主題在處理前已截斷等等),查詢可能永遠也無法從 Kafka 讀取資料。 我們會嘗試保守地估計資料是否可能遺失。 有時這可能會導致誤報。 將 Set 設為 false ,如果它無法如預期運作,或是您希望在資料遺失時查詢能夠繼續處理。 |
minPartitions | 整數 >= 0,0 = 停用。 | 0 (已停用) | [選用] 要從 Kafka 讀取的分割數目下限。 您可以使用 minPartitions 選項將 Spark 設定為使用任意最少數目的分割從 Kafka 讀取。 通常,Spark 在 Kafka topicPartitions 與從 Kafka 取用的 Spark 分割之間存在 1-1 對應。 如果您 set 大於 Kafka topicPartitions 的值 minPartitions 選項,Spark 會將大型 Kafka 分割區分割到較小的片段。 這個選項在尖峰負載、資料傾斜,或串流處理進度落後時,可以設為 set,以提高處理速率。 這需要在每次觸發時初始化 Kafka 使用者。如果您在連線至 Kafka 時使用 SSL,這可能會影響效能。 |
kafka.group.id | Kafka 取用者群組識別碼。 | 非 set | [選用] 從 Kafka 讀取時要使用的群組識別碼。 請謹慎使用此選項。 根據預設,每個查詢都會產生用於讀取資料的唯一群組識別碼。 這可確保每個查詢都具有自己的取用者群組,不會受到任何其他取用者的干擾,因此可以讀取其訂閱的主題的所有分割。 在某些情況下 (例如,Kafka 群組型授權),您可能希望使用特定的授權群組識別碼來讀取資料。 您可以選擇性地 set 群組識別碼。 但是,執行此作業時要特別謹慎,因為它可能會導致非預期行為。 - 同時執行具有相同群組識別碼的查詢 (包括批次和串流) 可能會相互干擾,導致每個查詢僅讀取部分資料。 - 快速連續啟動/重新啟動查詢時,也可能會發生這種情況。 若要將這類問題降到最低,set Kafka 使用者設定 session.timeout.ms 為較小值。 |
startingOffsets | 最早、最新 | 最新 | [選擇性]查詢啟動時的起點,可能是「最早」,也就是從最早位移,或是 json 字串,指定每個 TopicPartition 的起始 offset。 在 json 中,-2 作為 offset 可用來參考最早的記錄,而 -1 則代表最新的記錄。 注意:對於批次查詢,不允許使用最新 (無論是隱式還是在 json 中使用 -1)。 對於串流查詢,只有當啟動新的查詢時才會套用此操作,且恢復時將一律從查詢中止的where繼續。 查詢期間新探索的分割將最早啟動。 |
如需其他選用組態,請參閱結構化串流 Kafka 整合指南。
Schema 的 Kafka 記錄
Kafka 記錄編號 schema 為:
Column | 類型 |
---|---|
key | binary |
value | binary |
主題 | 字串 |
partition | int |
offset | long |
timestamp | long |
timestampType | int |
一律使用 key
將 value
和 ByteArrayDeserializer
還原序列化為位元組陣列。 使用 DataFrame 作業(例如 cast("string")
)明確地將索引鍵和 values反序列化。
將資料寫入 Kafka
以下是串流寫入 Kafka 的範例:
(df
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)
Azure Databricks 也支援 Kafka 資料接收器的批次寫入語意,如下列範例所示:
(df
.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)
設定 Kafka 結構化串流寫入器
重要
Databricks Runtime 13.3 LTS 和更新版本包含一個較新版本的 kafka-clients
程式庫,該程式庫預設啟用等冪寫入。 如果 Kafka 接收器使用 2.8.0 或更低版本,並且已設定 ACL 但未啟用 IDEMPOTENT_WRITE
,則寫入將會失敗並出現錯誤訊息 org.apache.kafka.common.KafkaException:
Cannot execute transactional method because we are in an error state
。
透過升級至 Kafka 2.8.0 或更新版本或者在設定結構化串流寫入器時設定 .option(“kafka.enable.idempotence”, “false”)
,來解決此錯誤。
提供給 DataStreamWriter 的 schema 會與 Kafka 匯入端互動。 您可以使用下列欄位:
Column 名稱 | 必要或選用 | 類型 |
---|---|---|
key |
選用 |
STRING 或 BINARY |
value |
必要 |
STRING 或 BINARY |
headers |
選用 | ARRAY |
topic |
選擇性(如果 topic 是 set 作為編寫器選項,則忽略) |
STRING |
partition |
選用 | INT |
以下是在寫入 Kafka 時常見的選項:set
選項 | 值 | 預設值 | Description |
---|---|---|---|
kafka.boostrap.servers |
以逗號分隔的 list<host:port> |
none | [必要] Kafka bootstrap.servers 組態。 |
topic |
STRING |
非 set | [選用] 設定要寫入的所有資料列的主題。 此選項會覆蓋資料中任何現有的主題 column。 |
includeHeaders |
BOOLEAN |
false |
[選用] 是否要在資料列中包含 Kafka 標頭。 |
如需其他選用組態,請參閱結構化串流 Kafka 整合指南。
擷取 Kafka 計量
您可以 get 串流查詢在具有 avgOffsetsBehindLatest
、maxOffsetsBehindLatest
和 minOffsetsBehindLatest
計量之所有已訂閱主題中最新的可用可用 offset 數目的平均值、最小值和最大值。 請參閱以互動方式讀取計量。
注意
在 Databricks Runtime 9.1 和更新版本中可用。
Get 查詢進程尚未從訂閱的主題取用的估計位元組總數,方法是檢查 estimatedTotalBytesBehindLatest
的值。 此估計值基於在過去 300 秒內處理的批次。 您可以透過將選項 bytesEstimateWindowLength
設定為其他值,來變更估計值所基於的時間範圍。 例如,若要將它 set 為 10 分鐘:
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)
如果在筆記本中執行串流,您可以在串流查詢進度儀表板中的 [原始資料] 索引標籤下查看這些計量:
{
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic]]",
"metrics" : {
"avgOffsetsBehindLatest" : "4.0",
"maxOffsetsBehindLatest" : "4",
"minOffsetsBehindLatest" : "4",
"estimatedTotalBytesBehindLatest" : "80.0"
},
} ]
}
使用 SSL 將 Azure Databricks 連線至 Kafka
若要啟用用於 Kafka 的 SSL connections,請遵循 Confluent 文件中的指示,SSL 加密與驗證。 您可以提供此處所述的組態 (以 kafka.
為首碼) 作為選項。 例如,您可以在屬性 kafka.ssl.truststore.location
中指定信任存放區位置。
Databricks 建議您:
- 將您的憑證儲存在雲端物件儲存體中。 您可以將對憑證的存取權限制為僅可以存取 Kafka 的叢集。 請參閱 使用 Unity 進行的數據控管 Catalog。
- 將憑證密碼儲存為祕密範圍中的祕密。
下列範例使用物件儲存位置和 Databricks 祕密來啟用 SSL 連線:
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
將 HDInsight 上的 Kafka 連線至 Azure Databricks
建立 HDInsight Kafka 叢集。
如需相關指示,請參閱透過 Azure 虛擬網路連線至 HDInsight 上的 Kafka。
設定 Kafka 訊息代理程式來公告正確的位址。
請遵循設定 Kafka 進行 IP 公告中的指示。 如果您在 Azure 虛擬機上自行管理 Kafka,請確定代理程式
advertised.listeners
組態 set 主機的內部 IP。建立 Azure Databricks 叢集。
將 Kafka 叢集對等互連至 Azure Databricks 叢集。
請遵循對等互連虛擬網路中的指示。
使用 Microsoft Entra ID 和 Azure 事件中樞進行服務主體驗證
Azure Databricks 支援使用事件中樞服務對 Spark 作業進行驗證。 此驗證是透過具有 Microsoft Entra ID 的 OAuth 來完成。
Azure Databricks 支援在下列計算環境中使用用戶端識別碼和祕密進行 Microsoft Entra ID 驗證:
- 設定了單一使用者存取模式的計算上的 Databricks Runtime 12.2 LTS 和更新版本。
- 設定了共用存取模式的計算上的 Databricks Runtime 14.3 LTS 和更新版本。
- 未使用 Unity Catalog設定的 Delta Live Tables 管線。
Azure Databricks 不支援在任何計算環境或使用 Unity Catalog設定的 Delta Live Tables 管線中,使用憑證進行 Microsoft Entra ID 驗證。
此驗證不適用於共享叢集或 Unity Catalog Delta Live Tables。
設定結構化串流 Kafka 連接器
若要使用 Microsoft Entra ID 執行驗證,您需要下列 values:
租用戶識別碼。 您可以在 [Microsoft Entra ID] 服務索引標籤中找到此項。
clientID (也稱為應用程式識別碼)。
用戶端密碼。 在您具有此密碼後,應將其作為祕密新增至 Databricks 工作區。 若要新增此祕密,請參閱祕密管理。
EventHubs 主題。 您可以在特定的 事件中樞命名空間 頁面的 [實體] 區段下的 [事件中樞] 區段中找到 list 個主題。 若要使用多個主題,您可以在事件中樞層級 set IAM 角色。
EventHubs 伺服器。 您可以在特定 [事件中樞命名空間] 的概觀頁面上找到此項:
此外,若要使用 Entra ID,我們需要告知 Kafka 使用 OAuth SASL 機制 (SASL 是一般通訊協定,而 OAuth 是 SASL「機制」的一種類型):
-
kafka.security.protocol
應為SASL_SSL
-
kafka.sasl.mechanism
應為OAUTHBEARER
-
kafka.sasl.login.callback.handler.class
應是 Java 類別的完整名稱,值為kafkashaded
表示遮蔽 Kafka 類別的登入回撥處理常式。 如需確切的類別,請參閱下列範例。
範例
接下來,讓我們看看執行中的範例:
Python
# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")
event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------
sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'
kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,
# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}
df = spark.readStream.format("kafka").options(**kafka_options)
display(df)
Scala
// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")
val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------
val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""
val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,
// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)
val scalaDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
display(scalaDF)
處理潛在錯誤
不支援串流選項。
如果您嘗試在以 Unity 設定的 Delta Live 管線 Tables 中使用此驗證機制,您可能會收到下列錯誤 Catalog:
若要解決此錯誤,請使用支援的計算組態。 請參閱使用 Microsoft Entra ID 和 Azure 事件中樞進行服務主體驗證。
無法建立新的
KafkaAdminClient
。此為內部錯誤,在下列任何驗證選項不正確時 Kafka 都會擲回此錯誤:
- 用戶端識別碼 (也稱為應用程式識別碼)
- 租用戶識別碼
- EventHubs 伺服器
若要解決錯誤,請確認這些選項的 values 正確無誤。
此外,如果您修改範例中預設提供的組態選項 (系統要求您不修改),例如
kafka.security.protocol
,您可能會看到此錯誤。未傳回任何記錄
如果嘗試顯示或處理 DataFrame 但未取得結果,您將在 UI 中看到下列內容。
此訊息表示驗證成功,但 EventHubs 未傳回任何資料。 一些可能的 (儘管並不詳盡) 原因包括:
- 指定的 EventHubs 主題錯誤。
-
startingOffsets
的預設 Kafka 組態選項是latest
,而且目前未透過主題收到任何資料。 您可以 setstartingOffsetstoearliest
,從 Kafka 的最早位移開始讀取數據。