メインコンテンツまでスキップ

RocksDBで 状態ストアを構成するDatabricks

RocksDB ベースの状態管理を有効にするには、 SparkSession をストリーミングクエリを開始する前に呼び出します。

Scala
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

DLT パイプラインで RocksDB を有効にできます。「ステートフルな処理のためのパイプライン設定の最適化」を参照してください。

変更履歴のチェックポイントを有効にする

Databricks Runtime 13.3 LTS 以降では、変更ログのチェックポイント処理を有効にして、構造化ストリーミング ワークロードのチェックポイントの期間とエンド ツー エンドの待機時間を短縮できます。 Databricks では、すべての構造化ストリーミング ステートフル クエリに対して changelog のチェックポイント設定を有効にすることをお勧めします。

従来 RocksDB 状態ストア スナップショットを使用し、チェックポイント処理中にデータ ファイルをアップロードします。 このコストを回避するために、changelog のチェックポイントは、最後のチェックポイント以降に変更されたレコードのみを耐久性のあるストレージに書き込みます。

Changelog のチェックポイントはデフォルトで無効になっています。 次の構文を使用して、SparkSession レベルで changelog のチェックポイント処理を有効にできます。

Scala
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

既存のストリームで changelog のチェックポイント設定を有効にし、チェックポイントに格納された状態情報を維持できます。

important

changelog のチェックポイント処理が有効になっているクエリは、Databricks Runtime 13.3 LTS 以降でのみ実行できます。 変更ログのチェックポイント処理を無効にして、従来のチェックポイント処理動作に戻すことができますが、これらのクエリは Databricks Runtime 13.3 LTS 以降で引き続き実行する必要があります。 これらの変更を有効にするには、ジョブを再始動する必要があります。

RocksDB 状態ストア メトリクス

各状態演算子は、 RocksDB インスタンスで実行された状態管理操作に関連するメトリクスを収集して、状態ストアを観察し、デバッグ ジョブの速度低下に役立つ可能性があります。

特定の状態ストア インスタンスのメトリクスには、パーティション ID とストア名でラベルが付けられ、それらが分離されたままであることを保証します。 他のすべてのメトリクスは、状態演算子が実行されているすべてのタスクのジョブ内の状態演算子ごとに集計 (合計) されます。

これらのメトリクスは、StreamingQueryProgressstateOperators フィールド内のcustomMetricsマップの一部です。以下は、JSON 形式での StreamingQueryProgress の例です ( StreamingQueryProgress.json()を使用して取得)。

JSON
{
"id": "6774075e-8869-454b-ad51-513be86cfd43",
"runId": "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId": 7,
"stateOperators": [
{
"numRowsTotal": 20000000,
"numRowsUpdated": 20000000,
"memoryUsedBytes": 31005397,
"numRowsDroppedByWatermark": 0,
"customMetrics": {
"SnapshotLastUploaded.partition_0_default": 7,
"SnapshotLastUploaded.partition_1_default": 7,
"SnapshotLastUploaded.partition_2_default": 6,
"SnapshotLastUploaded.partition_3_default": 6,
"SnapshotLastUploaded.partition_4_default": -1,
"rocksdbBytesCopied": 141037747,
"rocksdbCommitCheckpointLatency": 2,
"rocksdbCommitCompactLatency": 22061,
"rocksdbCommitFileSyncLatencyMs": 1710,
"rocksdbCommitFlushLatency": 19032,
"rocksdbCommitPauseLatency": 0,
"rocksdbCommitWriteBatchLatency": 56155,
"rocksdbFilesCopied": 2,
"rocksdbFilesReused": 0,
"rocksdbGetCount": 40000000,
"rocksdbGetLatency": 21834,
"rocksdbPutCount": 1,
"rocksdbPutLatency": 56155599000,
"rocksdbReadBlockCacheHitCount": 1988,
"rocksdbReadBlockCacheMissCount": 40341617,
"rocksdbSstFileSize": 141037747,
"rocksdbTotalBytesReadByCompaction": 336853375,
"rocksdbTotalBytesReadByGet": 680000000,
"rocksdbTotalBytesReadThroughIterator": 0,
"rocksdbTotalBytesWrittenByCompaction": 141037747,
"rocksdbTotalBytesWrittenByPut": 740000012,
"rocksdbTotalCompactionLatencyMs": 21949695000,
"rocksdbWriterStallLatencyMs": 0,
"rocksdbZipFileBytesUncompressed": 7038
}
}
],
"sources": [{}],
"sink": {}
}

メトリクスの詳細な説明は次のとおりです。

メトリクス名

説明

rocksdbCommitWriteBatchLatency

インメモリ構造 (WriteBatch) のステージングされた書き込みをネイティブ RocksDB に適用するのにかかった時間 (ミリ秒単位) 。

rocksdbコミットフラッシュレイテンシ

RocksDB のメモリ内変更をローカル ディスクにフラッシュするのにかかった時間 (ミリ単位) です。

rocksdbCommitコンパクトレイテンシ

チェックポイントのコミット中に圧縮 (オプション) にかかった時間 (ミリ秒単位)。

rocksdbコミット一時停止レイテンシ

チェックポイントコミットの一部としてバックグラウンドワーカースレッド(圧縮など)を停止するのにかかった時間(ミリ秒単位)。

RocksDBはチェックポイント・レイテンシーをコミットします

ネイティブRocksDBのスナップショットを撮り、それをローカルディレクトリに書き込むのにかかった時間(ミリ単位)。

rocksdbCommitFileSyncLatencyMs さん

ネイティブの RocksDB スナップショット関連ファイルを外部ストレージ (チェックポイントの場所) に同期するのにかかった時間 (ミリ秒単位)。

rocksdbのGetLatency

基になるネイティブ RocksDB::Get 呼び出しごとにかかった平均時間 (ナノ秒単位)。

rocksdbPutCount さん

基になるネイティブ RocksDB::Put 呼び出しごとにかかった平均時間 (ナノ秒単位)。

rocksdbのGetCount

ネイティブ RocksDB::Get 呼び出しの数 (WriteBatch からの Gets は含まれません - ステージング書き込みに使用されるメモリ内バッチ)。

rocksdbPutCount さん

ネイティブ RocksDB::Put 呼び出しの数 (WriteBatch への Puts (ステージング書き込みに使用されるメモリ内バッチ) は含まれません)。

GETによるRocksDBの合計読み取りバイト数

ネイティブ RocksDB::Get 呼び出しによって読み取られた非圧縮バイトの数。

RocksDBのPUTによって書き込まれた合計バイト数

ネイティブ RocksDB::Put 呼び出しによって書き込まれた非圧縮バイトの数。

rocksdbReadBlockCacheHitCount (英語)

ネイティブの RocksDB ブロックキャッシュを使用して、ローカルディスクからのデータの読み取りを回避した回数。

rocksdbReadBlockCacheMissCount さん

ネイティブの RocksDB ブロック キャッシュが欠落し、ローカル ディスクからのデータの読み取りが必要になった回数。

rocksdbTotalBytesReadByCompaction (英語)

ネイティブの RocksDB コンパクション・プロセスによってローカル・ディスクから読み取られたバイト数。

rocksdbTotalBytesWrittenByCompaction (英語)

ネイティブの RocksDB コンパクション・プロセスによってローカル・ディスクに書き込まれたバイト数。

rocksdbTotalCompactionLatencyMs (英語)

RocksDBのコンパクションにかかった時間(ミリ秒単位)(バックグラウンドとコミット中に開始されたオプションのコンパクションの両方)。

RocksDBライターストールレイテンシム

ライターがバックグラウンドでの圧縮またはメモリテーブルのディスクへのフラッシュのために停止した時間 (ミリ秒単位)。

rocksdbTotalBytesReadThroughIterator (英語)

一部のステートフル操作 ( flatMapGroupsWithState でのタイムアウト処理やウィンドウ集計でのウォーターマークなど) では、イテレータを使用して DB 内のデータ全体を読み取る必要があります。 イテレータを使用して読み取られた非圧縮データの合計サイズ。