Legacy arbitrary stateful operators
Databricks recommends using transformWithState to build custom stateful applications. See Build a custom stateful application.
This article has information for features that support mapGroupsWithState, and flatMapGroupsWithState. For more details on these operators, see link.
Specify initial state for mapGroupsWithState
You can specify a user-defined initial state for Structured Streaming stateful processing using flatMapGroupsWithStateor mapGroupsWithState. This allows you to avoid reprocessing data when starting a stateful stream without a valid checkpoint.
def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]
def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U])
Example use case that specifies an initial state to the flatMapGroupsWithState operator:
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  Iterator((key, count.toString))
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
  .groupByKey(x => x)
  .flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
Example use case that specifies an initial state to the mapGroupsWithState operator:
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  (key, count.toString)
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
  .groupByKey(x => x)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
Test the mapGroupsWithState update function
The TestGroupState API enables you to test the state update function used for Dataset.groupByKey(...).mapGroupsWithState(...) and Dataset.groupByKey(...).flatMapGroupsWithState(...).
The state update function takes the previous state as input using an object of type GroupState. See the Apache Spark GroupState reference documentation. For example:
import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional
test("flatMapGroupsWithState's state update function") {
  var prevState = TestGroupState.create[UserStatus](
    optionalState = Optional.empty[UserStatus],
    timeoutConf = GroupStateTimeout.EventTimeTimeout,
    batchProcessingTimeMs = 1L,
    eventTimeWatermarkMs = Optional.of(1L),
    hasTimedOut = false)
  val userId: String = ...
  val actions: Iterator[UserAction] = ...
  assert(!prevState.hasUpdated)
  updateState(userId, actions, prevState)
  assert(prevState.hasUpdated)
}