従来の任意のステートフル演算子
注記
Databricks では、 transformWithState
を使用してカスタム ステートフル アプリケーションを構築することをお勧めします。 「カスタム ステートフル アプリケーションの構築」を参照してください。
この記事では、 mapGroupsWithState
と flatMapGroupsWithState
をサポートする機能に関する情報を示します。 これらの演算子の詳細については、 リンクを参照してください。
の初期状態を指定 mapGroupsWithState
構造化ストリーミング ステートフル処理のユーザー定義の初期状態は、 flatMapGroupsWithState
または mapGroupsWithState
を使用して指定できます。 これにより、有効なチェックポイントなしでステートフルストリームを開始するときにデータを再処理するのを回避できます。
Scala
def mapGroupsWithState[S: Encoder, U: Encoder](
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]
def flatMapGroupsWithState[S: Encoder, U: Encoder](
outputMode: OutputMode,
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => Iterator[U])
初期状態を flatMapGroupsWithState
演算子に指定する使用例:
Scala
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
Iterator((key, count.toString))
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
初期状態を mapGroupsWithState
演算子に指定する使用例:
Scala
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
(key, count.toString)
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
mapGroupsWithState
更新機能をテストする
TestGroupState
API を使用すると、Dataset.groupByKey(...).mapGroupsWithState(...)
と Dataset.groupByKey(...).flatMapGroupsWithState(...)
に使用される状態更新機能をテストできます。
状態更新関数は、 GroupState
型のオブジェクトを使用して、前の状態を入力として受け取ります。 Apache Spark GroupState のリファレンス ドキュメントを参照してください。 例えば:
Scala
import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional
test("flatMapGroupsWithState's state update function") {
var prevState = TestGroupState.create[UserStatus](
optionalState = Optional.empty[UserStatus],
timeoutConf = GroupStateTimeout.EventTimeTimeout,
batchProcessingTimeMs = 1L,
eventTimeWatermarkMs = Optional.of(1L),
hasTimedOut = false)
val userId: String = ...
val actions: Iterator[UserAction] = ...
assert(!prevState.hasUpdated)
updateState(userId, actions, prevState)
assert(prevState.hasUpdated)
}