Freigeben über


Konfigurieren des RocksDB-Statusspeichers auf Azure Databricks

Sie können die RocksDB-basierte Zustandsverwaltung aktivieren, indem Sie die folgende Konfiguration in der SparkSession-Instanz festlegen, bevor Sie die Streamingabfrage starten.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

Sie können RocksDB für Delta Live Tables-Pipelines aktivieren. Weitere Informationen finden Sie unter Aktivieren des RocksDB-Zustandsspeichers für Delta Live Tables.

Aktivieren der Änderungsprotokollprüfpunkte

In Databricks Runtime 13.3 LTS und höher können Sie Prüfpunkte im Änderungsprotokoll aktivieren, um die Prüfpunktdauer und End-to-End-Latenz für strukturierte Streamingworkloads zu verringern. Databricks empfiehlt, Änderungsprotokollprüfpunkte für alle zustandsbehafteten Abfragen von strukturiertem Streaming zu aktivieren.

Normalerweise werden Momentaufnahmen des RocksDB-Zustandsspeichers während des Prüfpunkts erstellt und Datendateien hochgeladen. Um diese Kosten zu vermeiden, schreiben Änderungsprotokollprüfpunkte nur Datensätze, die sich seit dem letzten Prüfpunkt geändert haben, in dauerhaften Speicher."

Änderungsprotokollprüfpunkte sind standardmäßig deaktiviert. Sie können Änderungsprotokollprüfpunkte in der SparkSession-Ebene mit der folgenden Syntax aktivieren:

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

Sie können die Änderungsprotokollprüfpunkte für einen vorhandenen Stream aktivieren und die im Prüfpunkt gespeicherten Zustandsinformationen verwalten.

Wichtig

Abfragen mit aktivierten Prüfpunkten im Änderungsprotokoll, können nur unter Databricks Runtime 13.3 LTS und höher ausgeführt werden. Sie können Prüfpunkte im Änderungsprotokoll deaktivieren, um das Legacyverhalten von Prüfpunkten wiederherzustellen. Sie müssen diese Abfragen jedoch trotzdem in Databricks Runtime 13.3 LTS oder höher ausführen. Sie müssen den Auftrag neu starten, damit diese Änderungen vorgenommen werden.

Metriken des RocksDB-Zustandsspeichers

Jeder Zustandsoperator erfasst Metriken im Zusammenhang mit den Zustandsverwaltungsvorgängen, die auf seiner RocksDB-Instanz ausgeführt werden, um den Zustandsspeicher zu beobachten und möglicherweise bei der Behebung einer langsamen Auftragsausführung zu helfen. Diese Metriken werden pro Zustandsoperator im Auftrag für alle Aufgaben aggregiert (Summe), bei denen der Zustandsoperator ausgeführt wird. Diese Metriken sind Teil der customMetrics-Zuordnung in den stateOperators-Feldern in StreamingQueryProgress. Nachfolgend sehen Sie ein Beispiel für StreamingQueryProgress im JSON-Format (mit StreamingQueryProgress.json() abgerufen).

{
  "id" : "6774075e-8869-454b-ad51-513be86cfd43",
  "runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId" : 7,
  "stateOperators" : [ {
    "numRowsTotal" : 20000000,
    "numRowsUpdated" : 20000000,
    "memoryUsedBytes" : 31005397,
    "numRowsDroppedByWatermark" : 0,
    "customMetrics" : {
      "rocksdbBytesCopied" : 141037747,
      "rocksdbCommitCheckpointLatency" : 2,
      "rocksdbCommitCompactLatency" : 22061,
      "rocksdbCommitFileSyncLatencyMs" : 1710,
      "rocksdbCommitFlushLatency" : 19032,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 56155,
      "rocksdbFilesCopied" : 2,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 40000000,
      "rocksdbGetLatency" : 21834,
      "rocksdbPutCount" : 1,
      "rocksdbPutLatency" : 56155599000,
      "rocksdbReadBlockCacheHitCount" : 1988,
      "rocksdbReadBlockCacheMissCount" : 40341617,
      "rocksdbSstFileSize" : 141037747,
      "rocksdbTotalBytesReadByCompaction" : 336853375,
      "rocksdbTotalBytesReadByGet" : 680000000,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 141037747,
      "rocksdbTotalBytesWrittenByPut" : 740000012,
      "rocksdbTotalCompactionLatencyMs" : 21949695000,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 7038
    }
  } ],
  "sources" : [ {
  } ],
  "sink" : {
  }
}

Ausführliche Beschreibungen der Metriken:

Metrikname BESCHREIBUNG
rocksdbCommitWriteBatchLatency Zeit (in Millisekunden) zum Anwenden der Stagingschreibvorgänge in der In-Memory-Struktur (WriteBatch) auf die native RocksDB-Instanz.
rocksdbCommitFlushLatency Zeit (in Millisekunden) für das Leeren der In-Memory-Änderungen in RocksDB auf den lokalen Datenträger.
rocksdbCommitCompactLatency Zeit (in Millisekunden) für die Komprimierung (optional) während des Prüfpunktcommits.
rocksdbCommitPauseLatency Zeit (in Millisekunden) zum Beenden der Arbeitsthreads im Hintergrund (für Komprimierung usw.) als Teil des Prüfpunkt-Commits.
rocksdbCommitCheckpointLatency Zeit (in Millisekunden) für das Erstellen einer Momentaufnahme der nativen RocksDB-Instanz und das Schreiben in ein lokales Verzeichnis.
rocksdbCommitFileSyncLatencyMs Zeit (in Millisekunden) für die Synchronisierung der Dateien für die Momentaufnahme der nativen RocksDB-Instanz mit einem externen Speicher (Prüfpunktspeicherort).
rocksdbGetLatency Durchschnittliche Zeit (in Nanosekunden) pro zugrunde liegendem nativen RocksDB::Get-Aufruf.
rocksdbPutCount Durchschnittliche Zeit (in Nanosekunden) pro zugrunde liegendem nativen RocksDB::Put-Aufruf.
rocksdbGetCount Anzahl nativer RocksDB::Get-Aufrufe (ohne Gets aus WriteBatch – In-Memory-Batch für Stagingschreibvorgänge verwendet).
rocksdbPutCount Anzahl nativer RocksDB::Put-Aufrufe (ohne Puts in WriteBatch – In-Memory-Batch für Stagingschreibvorgänge verwendet).
rocksdbTotalBytesReadByGet Anzahl unkomprimierter Bytes, die durch native RocksDB::Get-Aufrufe gelesen wurden.
rocksdbTotalBytesWrittenByPut Anzahl unkomprimierter Bytes, die durch native RocksDB::Put-Aufrufe geschrieben wurden.
rocksdbReadBlockCacheHitCount Anzahl, wie oft der native RocksDB-Blockcache verwendet wird, um das Lesen von Daten vom lokalen Datenträger zu vermeiden.
rocksdbReadBlockCacheMissCount Anzahl, wie oft der native RocksDB-Blockcache nicht funktioniert hat und Daten vom lokalen Datenträger gelesen werden mussten.
rocksdbTotalBytesReadByCompaction Anzahl der Bytes, die durch den nativen RocksDB-Komprimierungsprozess vom lokalen Datenträger gelesen wurden.
rocksdbTotalBytesWrittenByCompaction Anzahl der Bytes, die durch den nativen RocksDB-Komprimierungsprozess auf den lokalen Datenträger geschrieben wurden.
rocksdbTotalCompactionLatencyMs Zeit (in Millisekunden) für RocksDB-Komprimierungen (sowohl im Hintergrund als auch optionale Komprimierung, die während des Commits initiiert wurde).
rocksdbWriterStallLatencyMs Zeit (in Millisekunden), über die der Writer aufgrund einer Komprimierung im Hintergrund oder zum Leeren der Memtables auf den Datenträger angehalten hat.
rocksdbTotalBytesReadThroughIterator Einige zustandsbehaftete Vorgänge (z. B. Timeoutverarbeitung in flatMapGroupsWithState oder Wasserzeichen für Aggregationen im Fenstermodus) erfordern das Lesen der gesamten Daten in der Datenbank über Iterator. Die Gesamtgröße der mit dem Iterator gelesenen unkomprimierten Daten.