ステートフルストリーミングとは?
ステートフル な構造化ストリーミング クエリでは、中間状態情報の増分更新が必要ですが、 ステートレス な構造化ストリーミング クエリでは、ソースからシンクに処理された行に関する情報のみを追跡します。
ステートフル操作には、ストリーミング アグリゲーション、ストリーミング dropDuplicates
、ストリーム ストリーム結合、カスタム ステートフル アプリケーションが含まれます。
ステートフルな構造化ストリーミング クエリに必要な中間状態情報の構成を誤ると、予期しない待機時間や本番運用の問題が発生する可能性があります。
Databricks Runtime 13.3 LTS 以降では、RocksDB を使用して変更ログのチェックポイント処理を有効にし、構造化ストリーミング ワークロードのチェックポイントの期間とエンドツーエンドの待機時間を短縮できます。 Databricks では、すべての構造化ストリーミング ステートフル クエリに対して changelog のチェックポイント設定を有効にすることをお勧めします。 「変更ログのチェックポイント設定を有効にする」を参照してください。
ステートフルな構造化ストリーミング クエリの最適化
ステートフルな構造化ストリーミング クエリの中間状態情報を管理すると、予期しない待機時間や本番運用の問題を防ぐのに役立ちます。
Databricks では、次のことを推奨しています。
- コンピュート最適化インスタンスをワーカーとして使用します。
- シャッフル パーティションの数を、クラスターのコア数の 1 倍から 2 倍に設定します。
- SparkSession で
spark.sql.streaming.noDataMicroBatches.enabled
構成をfalse
に設定します。 これにより、ストリーミング マイクロバッチ エンジンは、データを含まないマイクロバッチを処理できなくなります。 また、この構成をfalse
に設定すると、ウォーターマークを使用したステートフル操作や、新しいデータが到着するまでデータ出力を取得しない処理時間タイムアウトが発生する可能性があることにも注意してください。
Databricks では、ステートフル ストリームの状態を管理するために、変更ログ チェックポイント処理と共に RocksDB を使用することをお勧めします。 RocksDBでの 状態ストアの設定Databricks を参照してください。
状態管理スキームは、クエリの再起動間で変更できません。 クエリがデフォルト管理で開始された場合、状態ストアを変更するには、新しいチェックポイントの場所を使用してクエリを最初から再開する必要があります。
構造化ストリーミングでの複数のステートフル演算子の操作
Databricks Runtime 13.3 LTS 以降では、Databricks は構造化ストリーミング ワークロードのステートフル演算子の高度なサポートを提供します。 複数のステートフルな演算子を連結できるようになったため、ウィンドウ集計などの操作の出力を、結合などの別のステートフルな操作にフィードできるようになりました。
Databricks Runtime 16.2 以降では、複数のステートフル演算子を持つワークロードで transformWithState
を使用できます。 「カスタム ステートフル アプリケーションの構築」を参照してください。
次の例は、使用できるいくつかのパターンを示しています。
複数のステートフル オペレーターを使用する場合、次の制限があります。
- 従来のカスタム ステートフル演算子 (
FlatMapGroupWithState
とapplyInPandasWithState
はサポートされていません。 - 追加出力モードのみがサポートされています。
チェーンされたタイム ウィンドウ集約
- Python
- Scala
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
window(window_time(windowedCounts.window), "1 hour"),
windowedCounts.word
).count()
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
window($"window", "1 hour"),
$"word"
).count()
2 つの異なるストリームでのタイム ウィンドウ集計とそれに続くストリーム-ストリーム ウィンドウ結合
- Python
- Scala
clicksWindow = clicksWithWatermark.groupBy(
clicksWithWatermark.clickAdId,
window(clicksWithWatermark.clickTime, "1 hour")
).count()
impressionsWindow = impressionsWithWatermark.groupBy(
impressionsWithWatermark.impressionAdId,
window(impressionsWithWatermark.impressionTime, "1 hour")
).count()
clicksWindow.join(impressionsWindow, "window", "inner")
val clicksWindow = clicksWithWatermark
.groupBy(window("clickTime", "1 hour"))
.count()
val impressionsWindow = impressionsWithWatermark
.groupBy(window("impressionTime", "1 hour"))
.count()
clicksWindow.join(impressionsWindow, "window", "inner")
ストリーム-ストリーム 時間間隔の結合とそれに続く時間枠の集計
- Python
- Scala
joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined.groupBy(
joined.clickAdId,
window(joined.clickTime, "1 hour")
).count()
val joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined
.groupBy($"clickAdId", window($"clickTime", "1 hour"))
.count()
構造化ストリーミングの状態リバランス
状態の再調整は、DLT のすべてのストリーミング ワークロードに対して Default によって有効になります。 Databricks Runtime 11.3 LTS 以降では、Spark クラスタリング構成で次の構成オプションを設定して、状態の再調整を有効にできます。
spark.sql.streaming.statefulOperator.stateRebalancing.enabled true
状態の再調整は、クラスター サイズ変更イベントを受けるステートフルな構造化ストリーミング パイプラインにメリットをもたらします。 ステートレス ストリーミング操作は、クラスター サイズの変更に関係なく、メリットはありません。
コンピュート Auto-Scaling には、構造化ストリーミング ワークロードのクラスタリング サイズをスケールダウンする制限があります。 Databricks では、ストリーミング ワークロードに拡張オートスケールの DLT を使用することをお勧めします。 拡張オートスケールによる DLT パイプラインのクラスタリング利用の最適化を参照してください。
クラスター サイズ変更イベントは、状態の再調整をトリガーします。 マイクロバッチは、状態がクラウドストレージから新しいエグゼキューターにロードされるため、リバランスイベント中のレイテンシーが高くなる可能性があります。