Configurer un magasin d’état RocksDB sur Azure Databricks
Vous pouvez activer la gestion d'état basée sur RocksDB en définissant la configuration suivante dans la SparkSession avant de démarrer la requête de streaming.
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
Vous pouvez activer RocksDB sur les pipelines Delta Live Tables. Consultez Optimiser la configuration du pipeline pour le traitement avec état.
Activer le point de contrôle du journal des modifications
Dans Databricks Runtime 13.3 LTS et versions ultérieures, vous pouvez activer le point de contrôle du journal des modifications pour réduire la durée des points de contrôle et la latence de bout en bout pour des charges de travail de flux structuré. Databricks recommande d’activer le point de contrôle du journal des modifications pour toutes les requêtes avec état Flux structuré.
Traditionnellement, le magasin d’état RocksDB capture des instantanés et charge les fichiers de données pendant les points de contrôle. Pour éviter ce coût, le point de contrôle du journal des modifications écrit uniquement les enregistrements qui ont été modifiés depuis le dernier point de contrôle dans le stockage durable. »
Le point de contrôle du journal des modifications est désactivé par défaut. Vous pouvez activer le point de contrôle du journal des modifications au niveau SparkSession à l’aide de la syntaxe suivante :
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
Vous pouvez activer le point de contrôle du journal des modifications sur un flux existant et conserver les informations d’état stockées dans le point de contrôle.
Important
Les requêtes qui ont activé le point de contrôle du journal des modifications ne peuvent être exécutées que sur Databricks Runtime 13.3 LTS et versions ultérieures. Vous pouvez désactiver les points de contrôle du journal des modifications pour revenir au comportement de point de contrôle hérité, mais vous devez continuer à exécuter ces requêtes sur Databricks Runtime 13.3 LTS ou version ultérieure. Vous devez redémarrer le travail pour que ces modifications se produisent.
Métriques du magasin d’état RocksDB
Chaque opérateur d’état collecte des métriques liées aux opérations de gestion d’état effectuées sur son instance RocksDB pour observer le magasin d’état, voire aider à déboguer la lenteur d’un travail. Ces métriques sont agrégées (sum) par un opérateur d’état dans le travail sur toutes les tâches où l’opérateur d’état est actif. Ces métriques font partie du mappage customMetrics
dans les champs stateOperators
de StreamingQueryProgress
. Voici un exemple de StreamingQueryProgress
sous forme de JSON (obtenu à l’aide de StreamingQueryProgress.json()
).
{
"id" : "6774075e-8869-454b-ad51-513be86cfd43",
"runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId" : 7,
"stateOperators" : [ {
"numRowsTotal" : 20000000,
"numRowsUpdated" : 20000000,
"memoryUsedBytes" : 31005397,
"numRowsDroppedByWatermark" : 0,
"customMetrics" : {
"rocksdbBytesCopied" : 141037747,
"rocksdbCommitCheckpointLatency" : 2,
"rocksdbCommitCompactLatency" : 22061,
"rocksdbCommitFileSyncLatencyMs" : 1710,
"rocksdbCommitFlushLatency" : 19032,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 56155,
"rocksdbFilesCopied" : 2,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 40000000,
"rocksdbGetLatency" : 21834,
"rocksdbPutCount" : 1,
"rocksdbPutLatency" : 56155599000,
"rocksdbReadBlockCacheHitCount" : 1988,
"rocksdbReadBlockCacheMissCount" : 40341617,
"rocksdbSstFileSize" : 141037747,
"rocksdbTotalBytesReadByCompaction" : 336853375,
"rocksdbTotalBytesReadByGet" : 680000000,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 141037747,
"rocksdbTotalBytesWrittenByPut" : 740000012,
"rocksdbTotalCompactionLatencyMs" : 21949695000,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 7038
}
} ],
"sources" : [ {
} ],
"sink" : {
}
}
Les descriptions détaillées des métriques sont les suivantes :
Nom de métrique | Description |
---|---|
rocksdbCommitWriteBatchLatency | Temps (en millisecondes) pris pour appliquer les écritures intermédiaires dans la structure en mémoire (WriteBatch) au RocksDB natif. |
rocksdbCommitFlushLatency | Temps (en millisecondes) pris pour vider les modifications en mémoire de RocksDB sur le disque local. |
rocksdbCommitCompactLatency | Temps (en millisecondes) pris pour le compactage (facultatif) au cours de la validation du point de contrôle. |
rocksdbCommitPauseLatency | Temps (en millisecondes) pris pour l’arrêt des threads de travail en arrière-plan (pour le compactage, etc.) dans le cadre de la validation du point de contrôle. |
rocksdbCommitCheckpointLatency | Temps (en millisecondes) pris pour prendre un instantané de RocksDB natif et l’écrire dans un répertoire local. |
rocksdbCommitFileSyncLatencyMs | Temps (en millisecondes) pris pour synchroniser les fichiers associés à l’instantané RocksDB natif sur un stockage externe (emplacement du point de contrôle). |
rocksdbGetLatency | Temps moyen (en nanosecondes) pris par l’appel RocksDB::Get natif sous-jacent. |
rocksdbPutCount | Temps moyen (en nanosecondes) pris par l’appel RocksDB::Put natif sous-jacent. |
rocksdbGetCount | Nombre d’appels natifs RocksDB::Get (n’inclut les Gets de WriteBatch - lot en mémoire utilisé pour les écritures intermédiaires). |
rocksdbPutCount | Nombre d’appels natifs RocksDB::Put (n’inclut les Puts vers WriteBatch - lot en mémoire utilisé pour les écritures intermédiaires). |
rocksdbTotalBytesReadByGet | Nombre d’octets non compressés lus via des appels RocksDB::Get natifs. |
rocksdbTotalBytesWrittenByPut | Nombre d’octets non compressés écrits via des appels RocksDB::Put natifs. |
rocksdbReadBlockCacheHitCount | Nombre de fois que le cache de bloc RocksDB natif est utilisé pour éviter une lecture des données à partir du disque local. |
rocksdbReadBlockCacheMissCount | Nombre de fois que le cache de bloc RocksDB natif a manqué et requis une lecture de données à partir du disque local. |
rocksdbTotalBytesReadByCompaction | Nombre d’octets lus à partir du disque local par le processus de compactage RocksDB natif. |
rocksdbTotalBytesWrittenByCompaction | Nombre d’octets écrits sur le disque local par le processus de compactage RocksDB natif. |
rocksdbTotalCompactionLatencyMs | Temps (en millisecondes) pris pour les compactages RocksDB (compactage en arrière-plan et compactage facultatif initié lors de la validation). |
rocksdbWriterStallLatencyMs | Temps (en millisecondes) pendant lequel l’enregistreur est resté bloqué en raison du compactage en arrière-plan ou du vidage des memtables sur le disque. |
rocksdbTotalBytesReadThroughIterator | Certaines opérations avec état (telles que le traitement du délai d’expiration dans flatMapGroupsWithState ou la limitation dans les agrégations fenêtrées) requièrent la lecture de données entières dans DB via un itérateur. Taille totale des données décompressées lues à l’aide de l’itérateur. |