ステートフルな構造化ストリーミングの最適化クエリー

ステートフルな構造化ストリーミング クエリーの中間状態情報を管理すると、予期しない待機時間や運用上の問題を防ぐのに役立ちます。

Databricks お勧めします:

  • コンピュート最適化インスタンスをワーカーとして使用します。

  • シャッフル パーティションの数を、クラスター内のコア数の 1 倍から 2 倍に設定します。

  • spark.sql.streaming.noDataMicroBatches.enabled 構成を SparkSession で false に設定します。これにより、ストリーミングマイクロバッチエンジンがデータを含まないマイクロバッチを処理できなくなります。 また、この構成を false に設定すると、ウォーターマークまたは処理時間のタイムアウトを利用して、すぐにではなく新しいデータが到着するまでデータ出力を取得しないステートフル操作が発生する可能性があることにも注意してください。

Databricks では、変更ログ チェックポイントで RocksDB を使用して、ステートフル ストリームの状態を管理することをお勧めします。 Databricksでの RocksDB 状態ストアの構成を参照してください。

クエリ再起動間で状態管理スキームを変更することはできません。 つまり、クエリーがデフォルトの管理で開始されている場合、新しいチェックポイント位置でクエリーを最初から開始しない限り、クエリーを変更することはできません。

構造化ストリーミング での複数のステートフル演算子の操作

Databricks Runtime 13.1 以降では、Databricks は構造化ストリーミング ワークロードのステートフル オペレーターに対して高度なサポートを提供します。複数のステートフル演算子を連結できるようになったため、ウィンドウ集計などの操作の出力を、結合などの別のステートフル操作にフィードできるようになりました。

次の例は、使用できるいくつかのパターンを示しています。

重要

複数のステートフル演算子を使用する場合は、次の制限があります。

  • FlatMapGroupWithState はサポートされていません。

  • 追加出力モードのみがサポートされています。

チェーンされたタイム ウィンドウ集約

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()
import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

2つの異なるストリームでの時間枠集約とそれに続くストリーム-ストリームウィンドウ結合

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")
val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

ストリームとストリームの時間間隔の結合とそれに続くタイム ウィンドウの集計

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()
val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

構造化ストリーミング の状態の再調整

状態の再調整は、 Delta Live Tablesのすべてのストリーミングワークロードに対してデフォルトで有効になります。 Databricks Runtime 11.1 以降では、Spark クラスター構成で次の構成オプションを設定して、状態の再調整を有効にすることができます。

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

状態の再調整は、クラスターのサイズ変更イベントが発生するステートフルな構造化ストリーミングパイプラインの利点です。 ステートレス ストリーミング操作は、クラスター サイズの変更に関係なく、メリットはありません。

コンピュートの自動スケーリングには、構造化ストリーミング ワークロードのクラスター サイズをスケールダウンする制限があります。 Databricks 、ストリーミング ワークロードに拡張オートスケールを備えたDelta Live Tablesを使用することをお勧めします。 「拡張オートスケールDelta Live Tables Pipeline のクラスター使用率を最適化する」を参照してください。

クラスターのサイズ変更イベントにより、状態の再調整がトリガーされます。 再調整イベント中、micro-バッチは、状態がクラウドストレージから新しいエグゼキューターに読み込まれるため、レイテンシーが高くなる可能性があります。

mapGroupsWithState の初期状態を指定する

構造化ストリーミングのステートフル処理のユーザー定義の初期状態は、 flatMapGroupsWithStateまたは mapGroupsWithStateを使用して指定できます。 これにより、有効なチェックポイントなしでステートフル ストリームを開始するときにデータを再処理することを回避できます。

def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U])

flatMapGroupsWithState 演算子に初期状態を指定するユースケースの例:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  Iterator((key, count.toString))
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

mapGroupsWithState 演算子に初期状態を指定するユースケースの例:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  (key, count.toString)
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

mapGroupsWithState 更新機能 をテストする

TestGroupState API を使用すると、 Dataset.groupByKey(...).mapGroupsWithState(...)Dataset.groupByKey(...).flatMapGroupsWithState(...)に使用される状態更新関数をテストできます。

状態更新関数は、 GroupState型のオブジェクトを使用して、前の状態を入力として受け取ります。 Apache Spark GroupState のリファレンス ドキュメントを参照してください。 例えば:

import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional

test("flatMapGroupsWithState's state update function") {
  var prevState = TestGroupState.create[UserStatus](
    optionalState = Optional.empty[UserStatus],
    timeoutConf = GroupStateTimeout.EventTimeTimeout,
    batchProcessingTimeMs = 1L,
    eventTimeWatermarkMs = Optional.of(1L),
    hasTimedOut = false)

  val userId: String = ...
  val actions: Iterator[UserAction] = ...

  assert(!prevState.hasUpdated)

  updateState(userId, actions, prevState)

  assert(prevState.hasUpdated)
}