RocksDBで 状態ストアを構成するDatabricks
RocksDB ベースの状態管理を有効にするには、 SparkSession をストリーミングクエリを開始する前に呼び出します。
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
DLT パイプラインで RocksDB を有効にできます。「ステートフルな処理のためのパイプライン設定の最適化」を参照してください。
変更履歴のチェックポイントを有効にする
Databricks Runtime 13.3 LTS 以降では、変更ログのチェックポイント処理を有効にして、構造化ストリーミング ワークロードのチェックポイントの期間とエンド ツー エンドの待機時間を短縮できます。 Databricks では、すべての構造化ストリーミング ステートフル クエリに対して changelog のチェックポイント設定を有効にすることをお勧めします。
従来 RocksDB 状態ストア スナップショットを使用し、チェックポイント処理中にデータ ファイルをアップロードします。 このコストを回避するために、changelog のチェックポイントは、最後のチェックポイント以降に変更されたレコードのみを耐久性のあるストレージに書き込みます。
Changelog のチェックポイントはデフォルトで無効になっています。 次の構文を使用して、SparkSession レベルで changelog のチェックポイント処理を有効にできます。
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
既存のストリームで changelog のチェックポイント設定を有効にし、チェックポイントに格納された状態情報を維持できます。
changelog のチェックポイント処理が有効になっているクエリは、Databricks Runtime 13.3 LTS 以降でのみ実行できます。 変更ログのチェックポイント処理を無効にして、従来のチェックポイント処理動作に戻すことができますが、これらのクエリは Databricks Runtime 13.3 LTS 以降で引き続き実行する必要があります。 これらの変更を有効にするには、ジョブを再始動する必要があります。
RocksDB 状態ストア メトリクス
各状態演算子は、 RocksDB インスタンスで実行された状態管理操作に関連するメトリクスを収集して、状態ストアを観察し、デバッグ ジョブの速度低下に役立つ可能性があります。
特定の状態ストア インスタンスのメトリクスには、パーティション ID とストア名でラベルが付けられ、それらが分離されたままであることを保証します。 他のすべてのメトリクスは、状態演算子が実行されているすべてのタスクのジョブ内の状態演算子ごとに集計 (合計) されます。
これらのメトリクスは、StreamingQueryProgress
の stateOperators
フィールド内のcustomMetrics
マップの一部です。以下は、JSON 形式での StreamingQueryProgress
の例です ( StreamingQueryProgress.json()
を使用して取得)。
{
"id": "6774075e-8869-454b-ad51-513be86cfd43",
"runId": "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId": 7,
"stateOperators": [
{
"numRowsTotal": 20000000,
"numRowsUpdated": 20000000,
"memoryUsedBytes": 31005397,
"numRowsDroppedByWatermark": 0,
"customMetrics": {
"SnapshotLastUploaded.partition_0_default": 7,
"SnapshotLastUploaded.partition_1_default": 7,
"SnapshotLastUploaded.partition_2_default": 6,
"SnapshotLastUploaded.partition_3_default": 6,
"SnapshotLastUploaded.partition_4_default": -1,
"rocksdbBytesCopied": 141037747,
"rocksdbCommitCheckpointLatency": 2,
"rocksdbCommitCompactLatency": 22061,
"rocksdbCommitFileSyncLatencyMs": 1710,
"rocksdbCommitFlushLatency": 19032,
"rocksdbCommitPauseLatency": 0,
"rocksdbCommitWriteBatchLatency": 56155,
"rocksdbFilesCopied": 2,
"rocksdbFilesReused": 0,
"rocksdbGetCount": 40000000,
"rocksdbGetLatency": 21834,
"rocksdbPutCount": 1,
"rocksdbPutLatency": 56155599000,
"rocksdbReadBlockCacheHitCount": 1988,
"rocksdbReadBlockCacheMissCount": 40341617,
"rocksdbSstFileSize": 141037747,
"rocksdbTotalBytesReadByCompaction": 336853375,
"rocksdbTotalBytesReadByGet": 680000000,
"rocksdbTotalBytesReadThroughIterator": 0,
"rocksdbTotalBytesWrittenByCompaction": 141037747,
"rocksdbTotalBytesWrittenByPut": 740000012,
"rocksdbTotalCompactionLatencyMs": 21949695000,
"rocksdbWriterStallLatencyMs": 0,
"rocksdbZipFileBytesUncompressed": 7038
}
}
],
"sources": [{}],
"sink": {}
}
メトリクスの詳細な説明は次のとおりです。
メトリクス名 | 説明 |
---|---|
rocksdbCommitWriteBatchLatency | インメモリ構造 (WriteBatch) のステージングされた書き込みをネイティブ RocksDB に適用するのにかかった時間 (ミリ秒単位) 。 |
rocksdbコミットフラッシュレイテンシ | RocksDB のメモリ内変更をローカル ディスクにフラッシュするのにかかった時間 (ミリ単位) です。 |
rocksdbCommitコンパクトレイテンシ | チェックポイントのコミット中に圧縮 (オプション) にかかった時間 (ミリ秒単位)。 |
rocksdbコミット一時停止レイテンシ | チェックポイントコミットの一部としてバックグラウンドワーカースレッド(圧縮など)を停止するのにかかった時間(ミリ秒単位)。 |
RocksDBはチェックポイント・レイテンシーをコミットします | ネイティブRocksDBのスナップショットを撮り、それをローカルディレクトリに書き込むのにかかった時間(ミリ単位)。 |
rocksdbCommitFileSyncLatencyMs さん | ネイティブの RocksDB スナップショット関連ファイルを外部ストレージ (チェックポイントの場所) に同期するのにかかった時間 (ミリ秒単位)。 |
rocksdbのGetLatency | 基になるネイティブ |
rocksdbPutCount さん | 基になるネイティブ |
rocksdbのGetCount | ネイティブ |
rocksdbPutCount さん | ネイティブ |
GETによるRocksDBの合計読み取りバイト数 | ネイティブ |
RocksDBのPUTによって書き込まれた合計バイト数 | ネイティブ |
rocksdbReadBlockCacheHitCount (英語) | ネイティブの RocksDB ブロックキャッシュを使用して、ローカルディスクからのデータの読み取りを回避した回数。 |
rocksdbReadBlockCacheMissCount さん | ネイティブの RocksDB ブロック キャッシュが欠落し、ローカル ディスクからのデータの読み取りが必要になった回数。 |
rocksdbTotalBytesReadByCompaction (英語) | ネイティブの RocksDB コンパクション・プロセスによってローカル・ディスクから読み取られたバイト数。 |
rocksdbTotalBytesWrittenByCompaction (英語) | ネイティブの RocksDB コンパクション・プロセスによってローカル・ディスクに書き込まれたバイト数。 |
rocksdbTotalCompactionLatencyMs (英語) | RocksDBのコンパクションにかかった時間(ミリ秒単位)(バックグラウンドとコミット中に開始されたオプションのコンパクションの両方)。 |
RocksDBライターストールレイテンシム | ライターがバックグラウンドでの圧縮またはメモリテーブルのディスクへのフラッシュのために停止した時間 (ミリ秒単位)。 |
rocksdbTotalBytesReadThroughIterator (英語) | 一部のステートフル操作 ( |