メインコンテンツまでスキップ

ステートフルストリーミングとは?

このページでは、ステートフル操作、最適化の推奨事項、複数のステートフル演算子の連鎖、状態の再調整など、ステートフル構造化ストリーミング クエリについて説明します。

ステートフル 構造化ストリーミング クエリでは、中間状態情報への増分更新が必要ですが、 ステートレス 構造化ストリーミング クエリでは、どの行がソースからシンクまで処理されたかに関する情報のみが追跡されます。 ステートレス クエリで使用できる最適化機能については、 「ステートレス ストリーミング クエリの最適化」を参照してください。

ステートフル操作

ステートフル操作には、ストリーミング集約、 distinctdropDuplicates 、ストリーム ストリーム結合、カスタム ステートフル アプリケーションが含まれます。

ステートフルな構造化ストリーミング クエリに必要な中間状態情報の構成を誤ると、予期しない待機時間や本番運用の問題が発生する可能性があります。

Databricks Runtime 13.3 LTS 以降では、RocksDB を使用して変更ログのチェックポイントを有効にし、構造化ストリーミング ワークロードのチェックポイントの期間とエンドツーエンドの待機時間を短縮できます。Databricks では、すべての構造化ストリーミング ステートフル クエリに対して変更ログ チェックポイントを有効にすることをお勧めします。「変更ログのチェックポイントを有効にする」を参照してください。

ステートフル構造化ストリーミングクエリを最適化する

Databricks では、ステートフル構造化ストリーミング クエリに対して以下を推奨しています。

  • コンピュート最適化インスタンスをワーカーとして使用します。
  • シャッフル パーティションの数を、クラスターのコア数の 1 倍から 2 倍に設定します。
  • SparkSession でspark.sql.streaming.noDataMicroBatches.enabled構成をfalseに設定します。これにより、ストリーミング マイクロ バッチ エンジンがデータを含まないマイクロ バッチを処理できなくなります。 この構成をfalseに設定すると、ウォーターマークまたは処理時間タイムアウトを使用するステートフル操作で、新しいデータがすぐに到着するのではなく到着するまでデータが出力されない可能性もあります。

Databricks では、ステートフル ストリームの状態を管理するために、変更ログ チェックポイント処理と共に RocksDB を使用することをお勧めします。 RocksDBでの 状態ストアの設定Databricks を参照してください。

注記

クエリの再起動間で状態管理スキームを変更することはできません。クエリがデフォルトの管理で開始されている場合は、状態ストアを変更するために、新しいチェックポイントの場所を使用して最初からクエリを再開する必要があります。

構造化ストリーミングで複数のステートフルオペレータを操作する

Databricks Runtime 13.3 LTS 以降では、Databricks は構造化ストリーミング ワークロードのステートフル オペレーターの高度なサポートを提供します。複数のステートフル演算子を連結することができます。つまり、ウィンドウ集計などの操作の出力を、結合などの別のステートフル操作に渡すことができます。

Databricks Runtime 16.2 以降では、複数のステートフル演算子を含むワークロードでtransformWithStateを使用できます。「カスタム ステートフル アプリケーションの構築」を参照してください。

次の例は、使用できるいくつかのパターンを示しています。

重要

複数のステートフル オペレーターを使用する場合、次の制限があります。

  • 従来のカスタム ステートフル演算子 ( FlatMapGroupWithStateおよびapplyInPandasWithState ) はサポートされていません。
  • 追加出力モードのみがサポートされています。

チェーンされたタイム ウィンドウ集約

Python
words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
window(window_time(windowedCounts.window), "1 hour"),
windowedCounts.word
).count()

2 つの異なるストリームでのタイム ウィンドウ集計とそれに続くストリーム-ストリーム ウィンドウ結合

Python
clicksWindow = clicksWithWatermark.groupBy(
clicksWithWatermark.clickAdId,
window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
impressionsWithWatermark.impressionAdId,
window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

ストリーム-ストリーム 時間間隔の結合とそれに続く時間枠の集計

Python
joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
joined.clickAdId,
window(joined.clickTime, "1 hour")
).count()

構造化ストリーミングの状態リバランス

状態のリバランスは、 LakeFlow Spark宣言型パイプラインのすべてのストリーミング ワークロードに対して確実に有効になります。 Databricks Runtime 11.3 LTS 以降では、Spark クラスター構成で次の構成オプションを設定して、状態の再調整を有効にすることができます。

ini
spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

状態の再調整は、クラスターのサイズ変更イベントが発生するステートフル構造化ストリーミング パイプラインにメリットをもたらします。クラスターのサイズを変更しても、ステートレス ストリーミング操作にはメリットがありません。

注記

コンピュートの自動スケーリングには、構造化ストリーミング ワークロードのクラスター サイズをスケールダウンする際の制限があります。 Databricksストリーミング ワークロード用に強化されたオートスケールを備えたLakeflow Spark宣言型パイプラインを使用することをお勧めします。 「オートスケールを使用したLakeflow Spark宣言型パイプラインのクラスター使用率の最適化」を参照してください。

クラスター サイズ変更イベントは、状態の再調整をトリガーします。 マイクロバッチは、状態がクラウドストレージから新しいエグゼキューターにロードされるため、リバランスイベント中のレイテンシーが高くなる可能性があります。