什麼是可設定狀態的串流?
具可設定狀態的結構化串流查詢需要中繼狀態資訊的累加式更新,而無狀態結構化串流查詢只會追蹤關於從源頭到接收器已處理的資料行資訊。
可設定狀態的作業包括串流彙總、串流 dropDuplicates
、串流至串流連接、 mapGroupsWithState
和 flatMapGroupsWithState
。
具可設定狀態的結構化串流查詢所需的中間狀態資訊,若未正確設定,可能會導致非預期的延遲和生產問題。
在 Databricks Runtime 13.3 LTS 和更新版本中,您可以啟用變更記錄檢查點,以降低結構化串流工作負載的檢查點持續時間和端對端延遲。 Databricks 建議為所有結構化串流具狀態的查詢,啟用變更記錄檢查點。 請參閱 <啟用變更記錄檢查點>。
Optimize 有狀態的結構化串流查詢
管理具可設定狀態的結構化串流查詢的中間狀態資訊,有助於防止非預期的延遲和生產問題。
Databricks 建議:
- 使用計算最佳化的執行個體作為背景工作角色。
- Set 將洗牌分割區的數量設置為叢集核心數目的 1 到 2 倍。
-
Set 要在 SparkSession 中
false
spark.sql.streaming.noDataMicroBatches.enabled
組態。 這可防止串流微批次引擎處理不包含資料的微批次。 另請注意,將此組態設定為false
可能會導致具狀態的操作利用浮水印或處理時間逾時,直到有新的數據到達才會 get 數據輸出,而不是立即輸出。
Databricks 建議使用 RocksDB 搭配變更記錄檢查點來管理可設定狀態的串流的狀態。 請參閱 《在 Azure Databricks 設定 RocksDB 狀態存放區》。
注意
查詢重新啟動之間無法更改狀態管理配置。 也就是說,如果查詢已使用預設管理開始,若未從頭開始使用新的檢查點位置啟動查詢,就無法變更它。
在結構化串流中使用多個可設定狀態的運算符
在 Databricks Runtime 13.3 LTS 和更新版本中,Azure Databricks 提供結構化串流工作負載中可設定狀態的運算符的進階支援。 您現在可以將多個具狀態運算子鏈結在一起,這表示您可以將視窗化匯總等作業的輸出饋送至另一個具狀態作業,例如 join。
下列範例示範您可以使用的數種模式。
重要
使用多個可設定狀態的運算符時,存在下列限制:
- 不支援
FlatMapGroupWithState
。 - 僅支援附加輸出模式。
鏈結時間 window 聚合
Python
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
window(window_time(windowedCounts.window), "1 hour"),
windowedCounts.word
).count()
Scala
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
window($"window", "1 hour"),
$"word"
).count()
於兩個不同數據流中進行時間 window 的匯總,接著進行數據流 window和join 的串流串接。
Python
clicksWindow = clicksWithWatermark.groupBy(
clicksWithWatermark.clickAdId,
window(clicksWithWatermark.clickTime, "1 hour")
).count()
impressionsWindow = impressionsWithWatermark.groupBy(
impressionsWithWatermark.impressionAdId,
window(impressionsWithWatermark.impressionTime, "1 hour")
).count()
clicksWindow.join(impressionsWindow, "window", "inner")
Scala
val clicksWindow = clicksWithWatermark
.groupBy(window("clickTime", "1 hour"))
.count()
val impressionsWindow = impressionsWithWatermark
.groupBy(window("impressionTime", "1 hour"))
.count()
clicksWindow.join(impressionsWindow, "window", "inner")
串流時間間隔 join 後,緊接著進行時間 window 的匯總
Python
joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined.groupBy(
joined.clickAdId,
window(joined.clickTime, "1 hour")
).count()
Scala
val joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined
.groupBy($"clickAdId", window($"clickTime", "1 hour"))
.count()
結構化串流的狀態重新平衡
在 Delta Live Tables中,所有串流工作負載預設會啟用狀態平衡調整。 在 Databricks Runtime 11.3 LTS 和更新版本中,您可以在 Spark 叢集設定中 set 下列組態選項,以啟用狀態重新平衡:
spark.sql.streaming.statefulOperator.stateRebalancing.enabled true
狀態重新平衡有利於進行叢集調整大小事件的可設定狀態的結構化串流管線。 不論叢集大小變更為何,無狀態串流運算都不會受益。
注意
在縮小結構化串流工作負載的叢集大小時,計算自動調整有其限制。 Databricks 建議使用 Delta Live Tables,搭配增強的自動調整功能來處理串流工作負載。 請參閱增強型自動調整的Delta Live
叢集調整大小事件會導致狀態重新平衡觸發。 在重新平衡事件期間,微批次可能會有較高的延遲,因為狀態會從雲端儲存載入到新的執行程式。
為 mapGroupsWithState
指定初始狀態
您可以使用 flatMapGroupsWithState
或 mapGroupsWithState
,為結構化串流可設定狀態的處理指定使用者定義的初始狀態。 這可讓您避免在沒有有效檢查點的情況下啟動可設定狀態的串流時重新處理資料。
def mapGroupsWithState[S: Encoder, U: Encoder](
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]
def flatMapGroupsWithState[S: Encoder, U: Encoder](
outputMode: OutputMode,
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => Iterator[U])
指定 flatMapGroupsWithState
運算符初始狀態的範例使用案例:
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
Iterator((key, count.toString))
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
指定 mapGroupsWithState
運算符初始狀態的範例使用案例:
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
(key, count.toString)
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
測試 mapGroupsWithState
update 函式
TestGroupState
API 可讓您測試用於 Dataset.groupByKey(...).mapGroupsWithState(...)
和 Dataset.groupByKey(...).flatMapGroupsWithState(...)
的狀態 update 函式。
狀態 update 函式會使用類型為 GroupState
的物件,將先前的狀態當做輸入。 請參閱 Apache Spark GroupState 參考文件。 例如:
import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional
test("flatMapGroupsWithState's state update function") {
var prevState = TestGroupState.create[UserStatus](
optionalState = Optional.empty[UserStatus],
timeoutConf = GroupStateTimeout.EventTimeTimeout,
batchProcessingTimeMs = 1L,
eventTimeWatermarkMs = Optional.of(1L),
hasTimedOut = false)
val userId: String = ...
val actions: Iterator[UserAction] = ...
assert(!prevState.hasUpdated)
updateState(userId, actions, prevState)
assert(prevState.hasUpdated)
}