Pular para o conteúdo principal

Operadores de estado arbitrários legados

nota

A Databricks recomenda o uso de transformWithState para criar aplicativos personalizados com estado. Consulte Criar um aplicativo personalizado com estado.

Este artigo contém informações para recursos que suportam mapGroupsWithState, e flatMapGroupsWithState. Para obter mais detalhes sobre esses operadores, consulte o link.

Especifique o estado inicial para mapGroupsWithState

É possível especificar um estado inicial definido pelo usuário para o processamento stateful da transmissão estruturada usando flatMapGroupsWithStateou mapGroupsWithState. Isso permite que o senhor evite o reprocessamento de dados ao iniciar uma transmissão stateful sem um ponto de verificação válido.

Scala
def mapGroupsWithState[S: Encoder, U: Encoder](
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

def flatMapGroupsWithState[S: Encoder, U: Encoder](
outputMode: OutputMode,
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => Iterator[U])

Exemplo de caso de uso que especifica um estado inicial para o operador flatMapGroupsWithState:

Scala
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
Iterator((key, count.toString))
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
.groupByKey(x => x)
.flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Exemplo de caso de uso que especifica um estado inicial para o operador mapGroupsWithState:

Scala
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
(key, count.toString)
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
.groupByKey(x => x)
.mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Teste a função de atualização mapGroupsWithState

A API TestGroupState permite que o senhor teste a função de atualização de estado usada para Dataset.groupByKey(...).mapGroupsWithState(...) e Dataset.groupByKey(...).flatMapGroupsWithState(...).

A função de atualização de estado usa o estado anterior como entrada usando um objeto do tipo GroupState. Consulte a documentação de referência do Apache Spark GroupState. Por exemplo:

Scala
import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional

test("flatMapGroupsWithState's state update function") {
var prevState = TestGroupState.create[UserStatus](
optionalState = Optional.empty[UserStatus],
timeoutConf = GroupStateTimeout.EventTimeTimeout,
batchProcessingTimeMs = 1L,
eventTimeWatermarkMs = Optional.of(1L),
hasTimedOut = false)

val userId: String = ...
val actions: Iterator[UserAction] = ...

assert(!prevState.hasUpdated)

updateState(userId, actions, prevState)

assert(prevState.hasUpdated)
}