メインコンテンツまでスキップ

ステートフル クエリの非同期状態チェックポイント処理

注記

Databricks Runtime 10.4 LTS 以降で使用できます。

非同期状態チェックポイント処理では、ストリーミング クエリの exactly-once 保証が維持されますが、状態の更新でボトルネックになっている一部の構造化ストリーミング ステートフル ワークロードの全体的な待機時間を短縮できます。 これは、状態チェックポイント処理が完了するのを待たずに、前のマイクロバッチの計算が完了するとすぐに、次のマイクロバッチの処理を開始することによって実現されます。 次の表は、同期チェックポイントと非同期チェックポイントのトレードオフを比較したものです。

特性

同期チェックポイント

非同期チェックポイント

レイテンシー

各マイクロバッチのレイテンシーが高くなります。

マイクロバッチが重複する可能性があるため、レイテンシーが短縮されます。

再起動

最後のバッチのみを再実行する必要があるため、迅速なリカバリ。

マイクロバッチよりも長い再起動遅延は、再実行が必要になる場合があります。

次に、非同期状態チェックポイント処理の恩恵を受ける可能性のあるストリーミング ジョブの特性を示します。

  • ジョブに 1 つ以上のステートフルな操作 (集計、 flatMapGroupsWithStatemapGroupsWithState、ストリーム-ストリーム結合など) がある

  • 状態チェックポイントのレイテンシは、バッチ実行の全体的なレイテンシの主な要因の 1 つです。 この情報は、 StreamingQueryProgress イベントで確認できます。 これらのイベントは、Spark ドライバーの log4j ログにも表示されます。 ストリーミング クエリの進行状況の例と、バッチ実行の全体的な待機時間に対する状態チェックポイントの影響を見つける方法を次に示します。

    JSON
     {
    "id" : "2e3495a2-de2c-4a6a-9a8e-f6d4c4796f19",
    "runId" : "e36e9d7e-d2b1-4a43-b0b3-e875e767e1fe",
    "...",
    "batchId" : 0,
    "durationMs" : {
    "...",
    "triggerExecution" : 547730,
    "..."
    },
    "stateOperators" : [ {
    "...",
    "commitTimeMs" : 3186626,
    "numShufflePartitions" : 64,
    "..."
    }]
    }
    • 上記のクエリ進行イベントの状態チェックポイントのレイテンシ分析

      • バッチ期間 (durationMs.triggerDuration) は約 547 秒です。
      • 状態ストア コミット レイテンシ (stateOperations[0].commitTimeMs) は約 3,186 秒です。 コミット レイテンシは、状態ストアを含むタスク間で集計されます。 この場合、そのようなタスクは64個あります(stateOperators[0].numShufflePartitions)。
      • 状態演算子を含む各タスクは、チェックポイントに平均 50 秒 (3,186/64) かかりました。 これは、バッチ期間に寄与する追加のレイテンシです。 64 個のタスクすべてが同時に実行されていると仮定すると、チェックポイント ステップはバッチ期間の約 9% (50 秒 / 547 秒) を占めます。 この割合は、最大並列タスクが 64 未満になるとさらに高くなります。

非同期状態チェックポイント処理の有効化

非同期状態チェックポイント処理には、 RocksDB ベースの状態ストア を使用する必要があります。 次の設定を行います。

Scala

spark.conf.set(
"spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled",
"true"
)

spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
)

非同期チェックポイント設定の制限事項と要件

注記

コンピュート Auto-Scaling には、構造化ストリーミング ワークロードのクラスタリング サイズをスケールダウンする制限があります。 Databricks では、ストリーミングワークロードに拡張オートスケールの DLT を使用することをお勧めします。 オートスケールを使用したDLT パイプラインのクラスタリング利用の最適化を参照してください。

  • 1 つ以上のストアで非同期チェックポイントでエラーが発生すると、クエリは失敗します。 同期チェックポイント モードでは、チェックポイントはタスクの一部として実行され、Spark はクエリが失敗する前にタスクを複数回再試行します。 このメカニズムは、非同期状態チェックポイント処理には存在しません。 Databricks では、ジョブの失敗時の自動再試行に continuous ジョブを使用することをお勧めします。 「ジョブの継続的な実行」を参照してください。
  • 非同期チェックポイント処理は、マイクロバッチ実行間で状態ストアの場所が変更されない場合に最適です。 クラスターのサイズ変更は、非同期状態チェックポイント処理と組み合わせて、クラスター サイズ変更イベントの一部としてノードが追加または削除されると、状態ストア インスタンスが再分散される可能性があるため、うまく機能しない可能性があります。
  • 非同期状態チェックポイント処理は、RocksDB 状態ストア プロバイダーの実装でのみサポートされます。 デフォルト in-memory 状態ストアの実装ではサポートされていません。