Stateful application examples
This article contains code examples for custom stateful applications. Databricks recommends using built-in stateful methods for common operations such as aggregations and joins.
The patterns in this article use the transformWithState operator and the associated classes available in Public Preview in Databricks Runtime 16.2 and above. See Build a custom stateful application.
Python uses the transformWithStateInPandas operator to provide the same functionality. The examples below provide code in both Python and Scala.
Requirements
The transformWithState operator and the related APIs and classes have the following requirements:
- Available in Databricks Runtime 16.2 and above.
- Compute must use dedicated or no isolation access mode.
- You must use the RocksDB state store provider. Databricks recommends enabling RocksDB as part of the compute configuration.
transformWithStateInPandas supports standard access mode in Databricks Runtime 16.3 and above.
To enable the RocksDB state store provider for the current session, run the following:
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
Slowly changing dimension (SCD) type 1
The following code is an example of implementing SCD type 1 using transformWithState. SCD type 1 only tracks the most recent value for a given field.
You can use streaming tables and APPLY CHANGES INTO to implement SCD type 1 or type 2 with Delta Lake-backed tables. This example implements SCD type 1 in the state store, which provides lower latency for near real-time applications.
- Python
- Scala
# Import necessary libraries
import pandas as pd
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, LongType, StringType
from typing import Iterator

# Set the state store provider to RocksDB
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

# Define the output schema for the streaming query
output_schema = StructType([
    StructField("user", StringType(), True),
    StructField("time", LongType(), True),
    StructField("location", StringType(), True)
])

# Define a custom StatefulProcessor for slowly changing dimension type 1 (SCD1) operations
class SCDType1StatefulProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        # Define the schema for the state value
        value_state_schema = StructType([
            StructField("user", StringType(), True),
            StructField("time", LongType(), True),
            StructField("location", StringType(), True)
        ])
        # Initialize the state to store the latest location for each user
        self.latest_location = handle.getValueState("latestLocation", value_state_schema)

    def handleInputRows(self, key, rows, timer_values) -> Iterator[pd.DataFrame]:
        # Find the row with the maximum time value
        max_row = None
        max_time = float('-inf')
        for pdf in rows:
            for _, pd_row in pdf.iterrows():
                time_value = pd_row["time"]
                if time_value > max_time:
                    max_time = time_value
                    max_row = tuple(pd_row)

        # Check if state exists and update if necessary
        exists = self.latest_location.exists()
        if not exists or max_row[1] > self.latest_location.get()[1]:
            # Update the state with the new max row
            self.latest_location.update(max_row)
            # Yield the updated row
            yield pd.DataFrame(
                {"user": (max_row[0],), "time": (max_row[1],), "location": (max_row[2],)}
            )
        else:
            # Yield an empty DataFrame if no update is needed
            yield pd.DataFrame()

    def close(self) -> None:
        # No cleanup needed
        pass

# Apply the stateful transformation to the input DataFrame
(df.groupBy("user")
    .transformWithStateInPandas(
        statefulProcessor=SCDType1StatefulProcessor(),
        outputStructType=output_schema,
        outputMode="Update",
        timeMode="None",
    )
    .writeStream... # Continue with stream writing configuration
)
// Define a case class to represent user location data
case class UserLocation(
    user: String,
    time: Long,
    location: String)

// Define a stateful processor for slowly changing dimension type 1 (SCD1) operations
class SCDType1StatefulProcessor extends StatefulProcessor[String, UserLocation, UserLocation] {
  import org.apache.spark.sql.Encoders

  // Transient value state to store the latest location for each user
  @transient private var _latestLocation: ValueState[UserLocation] = _

  // Initialize the state store
  override def init(
      outputMode: OutputMode,
      timeMode: TimeMode): Unit = {
    // Create a value state named "locationState" using UserLocation encoder
    // TTLConfig.NONE means the state has no expiration
    _latestLocation = getHandle.getValueState[UserLocation]("locationState",
      Encoders.product[UserLocation], TTLConfig.NONE)
  }

  // Process input rows and update state
  override def handleInputRows(
      key: String,
      inputRows: Iterator[UserLocation],
      timerValues: TimerValues): Iterator[UserLocation] = {
    // Find the location with the maximum timestamp from input rows
    val maxNewLocation = inputRows.maxBy(_.time)

    // Update state and emit output if:
    // 1. No previous state exists, or
    // 2. New location has a more recent timestamp than the stored one
    if (_latestLocation.getOption().isEmpty || maxNewLocation.time > _latestLocation.get().time) {
      _latestLocation.update(maxNewLocation)
      Iterator.single(maxNewLocation) // Emit the updated location
    } else {
      Iterator.empty // No update needed, emit nothing
    }
  }
}
Slowly changing dimension (SCD) type 2
The following notebooks contain an example of implementing SCD type 2 using transformWithState in Python or Scala. A minimal sketch of the general approach follows the notebook links.
SCD type 2 Python
SCD type 2 Scala
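As a rough illustration of what the notebooks implement, the sketch below keeps the full history of values per key in a ListState, closing the previous record and opening a new one whenever the tracked value changes. This is a minimal sketch only: the input columns (user, time, location), the output columns, and the state schema are assumptions for illustration, not the notebook code.
# Minimal SCD type 2 sketch using ListState (assumed schemas and column names)
import pandas as pd
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, LongType, StringType
from typing import Iterator

# One history row per observed change; end_time is None while the row is current
history_schema = StructType([
    StructField("location", StringType(), True),
    StructField("start_time", LongType(), True),
    StructField("end_time", LongType(), True)
])

class SCDType2StatefulProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        # Full history of locations for each user
        self.history = handle.getListState("locationHistory", history_schema)

    def handleInputRows(self, key, rows, timer_values) -> Iterator[pd.DataFrame]:
        # Load the existing history once per micro-batch
        history = list(self.history.get()) if self.history.exists() else []
        for pdf in rows:
            for _, row in pdf.sort_values("time").iterrows():
                if history and history[-1][0] == row["location"]:
                    continue  # Same location as the open record: nothing to do
                if history:
                    # Close the currently open record at the change time
                    last = history[-1]
                    history[-1] = (last[0], last[1], int(row["time"]))
                # Open a new record; end_time stays None while it is current
                history.append((row["location"], int(row["time"]), None))
        self.history.put(history)
        # Emit the full history for this key (output columns are illustrative)
        yield pd.DataFrame(
            {
                "user": [key[0]] * len(history),
                "location": [h[0] for h in history],
                "start_time": [h[1] for h in history],
                "end_time": [h[2] for h in history],
            }
        )

    def close(self) -> None:
        pass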
Downtime detector
transformWithState implements timers so that you can take action based on elapsed time, even if no records for a given key are processed in a microbatch.
The following example implements a pattern for a downtime detector. Each time a new value is seen for a given key, it updates the lastSeen state value, clears any existing timers, and resets a timer for the future.
When a timer expires, the application emits the elapsed time since the last observed event for the key. It then sets a new timer to emit an update 10 seconds later.
- Python
- Scala
from datetime import datetime

import pandas as pd
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, TimestampType
from typing import Iterator

class DownTimeDetectorStatefulProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        # Define schema for the state value (timestamp)
        state_schema = StructType([StructField("value", TimestampType(), True)])
        self.handle = handle
        # Initialize state to store the last seen timestamp for each key
        self.last_seen = handle.getValueState("last_seen", state_schema)

    def handleExpiredTimer(self, key, timerValues, expiredTimerInfo) -> Iterator[pd.DataFrame]:
        latest_from_existing = self.last_seen.get()
        # Calculate downtime as the elapsed time since the last observed event
        downtime_duration = timerValues.getCurrentProcessingTimeInMs() - int(latest_from_existing[0].timestamp() * 1000)
        # Register a new timer for 10 seconds in the future
        self.handle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + 10000)
        # Yield a DataFrame with the key and downtime duration
        yield pd.DataFrame(
            {
                "id": key,
                "timeValues": str(downtime_duration),
            }
        )

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        # Find the row with the maximum timestamp
        max_row = max((tuple(pdf.iloc[0]) for pdf in rows), key=lambda row: row[1])

        # Get the latest timestamp from existing state or use epoch start if not exists
        if self.last_seen.exists():
            latest_from_existing = self.last_seen.get()[0]
        else:
            latest_from_existing = datetime.fromtimestamp(0)

        # If new data is more recent than existing state
        if latest_from_existing < max_row[1]:
            # Delete all existing timers
            for timer in self.handle.listTimers():
                self.handle.deleteTimer(timer)
            # Update the last seen timestamp
            self.last_seen.update((max_row[1],))
            # Register a new timer for 5 seconds in the future
            self.handle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + 5000)

        # Get current processing time in milliseconds
        timestamp_in_millis = str(timerValues.getCurrentProcessingTimeInMs())
        # Yield a DataFrame with the key and current timestamp
        yield pd.DataFrame({"id": key, "timeValues": timestamp_in_millis})

    def close(self) -> None:
        # No cleanup needed
        pass
import java.sql.Timestamp
import org.apache.spark.sql.Encoders

// The (String, Timestamp) schema represents an (id, time). We want to do downtime
// detection on every single unique sensor, where each sensor has a sensor ID.
class DowntimeDetector(duration: Duration) extends
    StatefulProcessor[String, (String, Timestamp), (String, Duration)] {

  @transient private var _lastSeen: ValueState[Timestamp] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    _lastSeen = getHandle.getValueState[Timestamp]("lastSeen", Encoders.TIMESTAMP, TTLConfig.NONE)
  }

  // The logic here is as follows: find the largest timestamp seen so far. Set a timer for
  // the duration later.
  override def handleInputRows(
      key: String,
      inputRows: Iterator[(String, Timestamp)],
      timerValues: TimerValues): Iterator[(String, Duration)] = {
    val latestRecordFromNewRows = inputRows.maxBy(_._2.getTime)

    // Use getOrElse to initiate state variable if it doesn't exist
    val latestTimestampFromExistingRows = _lastSeen.getOption().getOrElse(new Timestamp(0))
    val latestTimestampFromNewRows = latestRecordFromNewRows._2

    if (latestTimestampFromNewRows.after(latestTimestampFromExistingRows)) {
      // Cancel the one existing timer, since we have a new latest timestamp.
      // We call "listTimers()" just because we don't know ahead of time what
      // the timestamp of the existing timer is.
      getHandle.listTimers().foreach(timer => getHandle.deleteTimer(timer))

      _lastSeen.update(latestTimestampFromNewRows)
      // Use timerValues to schedule a timer using processing time.
      getHandle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + duration.toMillis)
    } else {
      // No new latest timestamp, so no need to update state or set a timer.
    }

    Iterator.empty
  }

  override def handleExpiredTimer(
      key: String,
      timerValues: TimerValues,
      expiredTimerInfo: ExpiredTimerInfo): Iterator[(String, Duration)] = {
    val latestTimestamp = _lastSeen.get()
    val downtimeDuration = new Duration(
      timerValues.getCurrentProcessingTimeInMs() - latestTimestamp.getTime)

    // Register another timer that will fire in 10 seconds.
    // Timers can be registered anywhere but init()
    getHandle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + 10000)

    Iterator((key, downtimeDuration))
  }
}
Migrate existing state information
The following example demonstrates how to implement a stateful application that accepts an initial state. You can add initial state handling to any stateful application, but the initial state can only be set when the application is first initialized.
This example uses the statestore reader to load existing state information from a checkpoint path. An example use case for this pattern is migrating from legacy stateful applications to transformWithState.
- Python
- Scala
# Import necessary libraries
import pandas as pd
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, LongType, StringType, IntegerType
from typing import Iterator

# Set RocksDB as the state store provider for better performance
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

"""
Input schema is as below

input_schema = StructType([
    StructField("id", StringType(), True),
    StructField("value", StringType(), True)
])
"""

# Define the output schema for the streaming query
output_schema = StructType([
    StructField("id", StringType(), True),
    StructField("accumulated", StringType(), True)
])

class AccumulatedCounterStatefulProcessorWithInitialState(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        # Define schema for the state value (integer)
        state_schema = StructType([StructField("value", IntegerType(), True)])
        # Initialize state to store the accumulated counter for each id
        self.counter_state = handle.getValueState("counter_state", state_schema)
        self.handle = handle

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        # Check if state exists for the current key
        exists = self.counter_state.exists()
        if exists:
            value_row = self.counter_state.get()
            existing_value = value_row[0]
        else:
            existing_value = 0

        accumulated_value = existing_value

        # Process input rows and accumulate values
        for pdf in rows:
            value = pdf["value"].astype(int).sum()
            accumulated_value += value

        # Update the state with the new accumulated value
        self.counter_state.update((accumulated_value,))

        # Yield a DataFrame with the key and accumulated value
        yield pd.DataFrame({"id": key, "accumulated": str(accumulated_value)})

    def handleInitialState(self, key, initialState, timerValues) -> None:
        # Initialize the state with the provided initial value
        init_val = initialState.at[0, "initVal"]
        self.counter_state.update((init_val,))

    def close(self) -> None:
        # No cleanup needed
        pass

# Load initial state from a checkpoint directory
initial_state = (
    spark.read.format("statestore")
    .option("path", "$checkpointsDir")
    .load()
)

# Apply the stateful transformation to the input DataFrame
(df.groupBy("id")
    .transformWithStateInPandas(
        statefulProcessor=AccumulatedCounterStatefulProcessorWithInitialState(),
        outputStructType=output_schema,
        outputMode="Update",
        timeMode="None",
        initialState=initial_state,
    )
    .writeStream... # Continue with stream writing configuration
)
// Import necessary libraries
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.{Dataset, Encoder, Encoders, DataFrame}
import org.apache.spark.sql.types._

// Define a stateful processor that can handle initial state
class InitialStateStatefulProcessor extends StatefulProcessorWithInitialState[String, (String, String, String), (String, String), (String, Int)] {

  // Transient value state to store the accumulated value
  @transient protected var valueState: ValueState[Int] = _

  // Initialize the state store
  override def init(
      outputMode: OutputMode,
      timeMode: TimeMode): Unit = {
    // Create a value state named "valueState" using Int encoder
    // TTLConfig.NONE means the state has no automatic expiration
    valueState = getHandle.getValueState[Int]("valueState",
      Encoders.scalaInt, TTLConfig.NONE)
  }

  // Process input rows and update state
  override def handleInputRows(
      key: String,
      inputRows: Iterator[(String, String, String)],
      timerValues: TimerValues): Iterator[(String, String)] = {
    var existingValue = 0
    // Retrieve existing value from state if it exists
    if (valueState.exists()) {
      existingValue += valueState.get()
    }

    var accumulatedValue = existingValue

    // Accumulate values from input rows
    for (row <- inputRows) {
      accumulatedValue += row._2.toInt
    }

    // Update the state with the new accumulated value
    valueState.update(accumulatedValue)

    // Return the key and accumulated value as a string
    Iterator((key, accumulatedValue.toString))
  }

  // Handle initial state when provided
  override def handleInitialState(
      key: String, initialState: (String, Int), timerValues: TimerValues): Unit = {
    // Update the state with the initial value
    valueState.update(initialState._2)
  }
}
Migrate a Delta table to the state store for initialization
The following notebooks contain an example of initializing state store values from a Delta table using transformWithState in Python or Scala. A minimal sketch of the pattern follows the notebook links.
Initialize state from Delta Python
Initialize state from Delta Scala
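As a rough outline of what the notebooks do, the sketch below reads a Delta table as a batch DataFrame and passes it as the initial state of the streaming query, reusing the AccumulatedCounterStatefulProcessorWithInitialState processor from the previous section. The table name user_initial_state, its id and initVal columns, and the groupBy on the initial state DataFrame are assumptions for illustration, not the notebook code.
# Read a Delta table holding the seed values (hypothetical table and column names)
# and key it by the same column used to group the stream.
initial_state = (
    spark.read.table("user_initial_state")  # expected columns: id, initVal
    .groupBy("id")
)

# Start the streaming query with the Delta-backed initial state
(df.groupBy("id")
    .transformWithStateInPandas(
        statefulProcessor=AccumulatedCounterStatefulProcessorWithInitialState(),
        outputStructType=output_schema,
        outputMode="Update",
        timeMode="None",
        initialState=initial_state,
    )
    .writeStream... # Continue with stream writing configuration
)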
Session tracking
The following notebooks contain an example of session tracking using transformWithState in Python or Scala. A minimal sketch of the idea follows the notebook links.
Session tracking Python
Session tracking Scala
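As a rough illustration of the pattern (not the notebook code), the sketch below counts events per user in a ValueState and uses a processing-time timer to close the session after a period of inactivity. The state name, the output columns, and the 30-minute timeout are assumptions for illustration.
# Minimal session tracking sketch (assumed state name, timeout, and output columns)
import pandas as pd
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, LongType
from typing import Iterator

SESSION_TIMEOUT_MS = 30 * 60 * 1000  # Close a session after 30 minutes of inactivity

class SessionTrackerStatefulProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        self.handle = handle
        # Number of events seen in the current session for this key
        session_schema = StructType([StructField("count", LongType(), True)])
        self.session = handle.getValueState("session", session_schema)

    def handleInputRows(self, key, rows, timer_values) -> Iterator[pd.DataFrame]:
        # Accumulate the event count for this micro-batch
        new_events = sum(len(pdf) for pdf in rows)
        count = (self.session.get()[0] if self.session.exists() else 0) + new_events
        self.session.update((count,))

        # Push the inactivity deadline out every time new events arrive
        for timer in self.handle.listTimers():
            self.handle.deleteTimer(timer)
        self.handle.registerTimer(timer_values.getCurrentProcessingTimeInMs() + SESSION_TIMEOUT_MS)
        return iter([])

    def handleExpiredTimer(self, key, timer_values, expired_timer_info) -> Iterator[pd.DataFrame]:
        # Session closed by inactivity: emit the summary and clear the state
        count = self.session.get()[0] if self.session.exists() else 0
        self.session.clear()
        yield pd.DataFrame({"user": [key[0]], "event_count": [count]})

    def close(self) -> None:
        pass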
Custom stream-stream join using transformWithState
The following code demonstrates a custom stream-stream join across multiple streams using transformWithState. You can use this approach instead of a built-in join operator for the following reasons:
- You need to use update output mode, which does not support stream-stream joins. This is especially useful for lower-latency applications.
- You need to continue performing the join for late-arriving rows (after watermark expiration).
- You need to perform a many-to-many stream-stream join.
This example gives you full control over the state expiration logic, allowing you to dynamically extend the retention period to handle out-of-order events even after the watermark.
- Python
- Scala
# Import necessary libraries
import pandas as pd
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from typing import Iterator

# Define output schema for the joined data
output_schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("profile_name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("preferred_category", StringType(), True)
])

class CustomStreamJoinProcessor(StatefulProcessor):
    # Initialize stateful storage for user profiles, preferences, and event tracking.
    def init(self, handle: StatefulProcessorHandle) -> None:
        self.handle = handle
        # Define schemas for different types of state data
        profile_schema = StructType([
            StructField("name", StringType(), True),
            StructField("email", StringType(), True),
            StructField("updated_at", TimestampType(), True)
        ])
        preferences_schema = StructType([
            StructField("preferred_category", StringType(), True),
            StructField("updated_at", TimestampType(), True)
        ])
        activity_schema = StructType([
            StructField("event_type", StringType(), True),
            StructField("timestamp", TimestampType(), True)
        ])
        # Initialize state storage for user profiles, preferences, and activity
        self.profile_state = handle.getMapState("user_profiles", "string", profile_schema)
        self.preferences_state = handle.getMapState("user_preferences", "string", preferences_schema)
        self.activity_state = handle.getMapState("user_activity", "string", activity_schema)

    # Process incoming events and update state
    def handleInputRows(self, key, rows: Iterator[pd.DataFrame], timer_values) -> Iterator[pd.DataFrame]:
        df = pd.concat(rows, ignore_index=True)

        for _, row in df.iterrows():
            user_id = row["user_id"]

            if "event_type" in row:  # User activity event
                self.activity_state.updateValue(user_id, row.to_dict())
                # Set a timer to process this event after a 10-second delay
                self.handle.registerTimer(timer_values.getCurrentProcessingTimeInMs() + (10 * 1000))
            elif "name" in row:  # Profile update
                self.profile_state.updateValue(user_id, row.to_dict())
            elif "preferred_category" in row:  # Preference update
                self.preferences_state.updateValue(user_id, row.to_dict())

        # No immediate output; processing will happen when the timer expires
        return iter([])

    # Perform lookup after delay, handling out-of-order and late-arriving events.
    def handleExpiredTimer(self, key, timer_values, expired_timer_info) -> Iterator[pd.DataFrame]:
        # Retrieve stored state for the user
        user_activity = self.activity_state.getValue(key)
        user_profile = self.profile_state.getValue(key)
        user_preferences = self.preferences_state.getValue(key)

        if user_activity:
            # Combine data from different states into a single output row
            output_row = {
                "user_id": key,
                "event_type": user_activity["event_type"],
                "timestamp": user_activity["timestamp"],
                "profile_name": user_profile.get("name") if user_profile else None,
                "email": user_profile.get("email") if user_profile else None,
                "preferred_category": user_preferences.get("preferred_category") if user_preferences else None
            }
            return iter([pd.DataFrame([output_row])])

        return iter([])

    def close(self) -> None:
        # No cleanup needed
        pass

# Apply transformWithState to the input DataFrame
(df.groupBy("user_id")
    .transformWithStateInPandas(
        statefulProcessor=CustomStreamJoinProcessor(),
        outputStructType=output_schema,
        outputMode="Append",
        timeMode="ProcessingTime"
    )
    .writeStream... # Continue with stream writing configuration
)
// Import necessary libraries
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types.TimestampType
import java.sql.Timestamp

// Define a case class for enriched user events, combining user activity with profile and preference data
case class EnrichedUserEvent(
    user_id: String,
    event_type: String,
    timestamp: Timestamp,
    profile_name: Option[String],
    email: Option[String],
    preferred_category: Option[String]
)

// Custom stateful processor for stream-stream join.
// UserEvent, UserProfile, and UserPreferences are case classes assumed to be defined elsewhere.
class CustomStreamJoinProcessor extends StatefulProcessor[String, UserEvent, EnrichedUserEvent] {

  // Transient state variables to store user profiles, preferences, and activities
  @transient private var _profileState: MapState[String, UserProfile] = _
  @transient private var _preferencesState: MapState[String, UserPreferences] = _
  @transient private var _activityState: MapState[String, UserEvent] = _

  // Initialize state stores
  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    _profileState = getHandle.getMapState[String, UserProfile]("profileState", Encoders.product[UserProfile], TTLConfig.NONE)
    _preferencesState = getHandle.getMapState[String, UserPreferences]("preferencesState", Encoders.product[UserPreferences], TTLConfig.NONE)
    _activityState = getHandle.getMapState[String, UserEvent]("activityState", Encoders.product[UserEvent], TTLConfig.NONE)
  }

  // Handle incoming user events
  override def handleInputRows(
      key: String,
      inputRows: Iterator[UserEvent],
      timerValues: TimerValues): Iterator[EnrichedUserEvent] = {
    inputRows.foreach { event =>
      if (event.event_type.nonEmpty) {
        // Update activity state and set a timer for 10 seconds in the future
        _activityState.update(key, event)
        getHandle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + 10000)
      }
    }
    Iterator.empty
  }

  // Handle expired timers to produce enriched events
  override def handleExpiredTimer(
      key: String,
      timerValues: TimerValues,
      expiredTimerInfo: ExpiredTimerInfo): Iterator[EnrichedUserEvent] = {
    // Retrieve user data from state stores
    val userEvent = _activityState.getOption(key)
    val userProfile = _profileState.getOption(key)
    val userPreferences = _preferencesState.getOption(key)

    if (userEvent.isDefined) {
      // Create and return an enriched event if user activity exists
      val enrichedEvent = EnrichedUserEvent(
        user_id = key,
        event_type = userEvent.get.event_type,
        timestamp = userEvent.get.timestamp,
        profile_name = userProfile.map(_.name),
        email = userProfile.map(_.email),
        preferred_category = userPreferences.map(_.preferred_category)
      )
      Iterator.single(enrichedEvent)
    } else {
      Iterator.empty
    }
  }
}

// Apply the custom stateful processor to the input DataFrame
val enrichedStream = df
  .groupByKey(_.user_id)
  .transformWithState(
    new CustomStreamJoinProcessor(),
    TimeMode.ProcessingTime(),
    OutputMode.Append()
  )

// Write the enriched stream to Delta Lake
enrichedStream.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/mnt/delta/checkpoints")
  .start("/mnt/delta/enriched_events")
Top-K computation
The following example uses a ListState with a priority queue to maintain and update the top K elements in a stream for each group key in near real time.
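As a minimal sketch of this idea, the processor below keeps a bounded min-heap per key, persisted in ListState, so that only the K largest values are retained across micro-batches. The input value column, the output columns, and K itself are assumptions for illustration, not the full example.
# Minimal top-K sketch: ListState plus an in-memory heap (assumed schema and K)
import heapq
import pandas as pd
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, DoubleType
from typing import Iterator

K = 3
topk_schema = StructType([StructField("value", DoubleType(), True)])

class TopKStatefulProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        # Stores up to K values per key as single-column rows
        self.topk = handle.getListState("topK", topk_schema)

    def handleInputRows(self, key, rows, timer_values) -> Iterator[pd.DataFrame]:
        # Seed the heap with the current state, then push new values
        heap = [r[0] for r in self.topk.get()] if self.topk.exists() else []
        heapq.heapify(heap)
        for pdf in rows:
            for v in pdf["value"]:
                heapq.heappush(heap, float(v))
                if len(heap) > K:
                    heapq.heappop(heap)  # Drop the smallest, keeping the K largest
        # Persist the bounded heap back to the state store
        self.topk.put([(v,) for v in heap])
        # Emit the current top K for this key, largest first
        yield pd.DataFrame({"id": [key[0]] * len(heap), "value": sorted(heap, reverse=True)})

    def close(self) -> None:
        pass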