Pular para o conteúdo principal

Exemplos de aplicativos com estado

Este artigo contém exemplos de código para aplicativos personalizados com estado. Databricks recomenda o uso de métodos integrados com estado para operações comuns, como agregações e junções.

Os padrões deste artigo usam o operador transformWithState e as classes associadas disponíveis no Public Preview em Databricks Runtime 16.2 e acima. Consulte Criar um aplicativo personalizado com estado.

nota

O Python usa o operador transformWithStateInPandas para oferecer a mesma funcionalidade. Os exemplos abaixo fornecem código em Python e Scala.

Requisitos

O operador transformWithState e as APIs e classes relacionadas têm os seguintes requisitos:

  • Disponível em Databricks Runtime 16.2 e acima.
  • deve usar o modo de acesso dedicado ou sem isolamento.
  • O senhor deve usar o provedor RocksDB armazenamento do estado. Databricks recomenda ativar o RocksDB como parte da configuração do compute.
  • transformWithStateInPandas suporta o modo de acesso padrão em Databricks Runtime 16.3 e acima.
nota

Para ativar o provedor RocksDB armazenamento do estado para a sessão atual, execute o seguinte:

Python
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

dimensões que mudam lentamente (SCD) (SCD) tipo 1

O código a seguir é um exemplo de implementação do SCD tipo 1 usando transformWithState. O SCD tipo 1 rastreia apenas o valor mais recente de um determinado campo.

nota

O senhor pode usar tabelas de transmissão e APPLY CHANGES INTO para implementar SCD tipo 1 ou tipo 2 usando Delta Lake-backed tables. Este exemplo implementa o SCD tipo 1 no armazenamento do estado, que oferece menor latência para aplicativos reais de tempo próximo.

Python
# Import necessary libraries
import pandas as pd
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, LongType, StringType
from typing import Iterator

# Set the state store provider to RocksDB
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

# Define the output schema for the streaming query
output_schema = StructType([
StructField("user", StringType(), True),
StructField("time", LongType(), True),
StructField("location", StringType(), True)
])

# Define a custom StatefulProcessor for slowly changing dimension type 1 (SCD1) operations
class SCDType1StatefulProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
# Define the schema for the state value
value_state_schema = StructType([
StructField("user", StringType(), True),
StructField("time", LongType(), True),
StructField("location", StringType(), True)
])
# Initialize the state to store the latest location for each user
self.latest_location = handle.getValueState("latestLocation", value_state_schema)

def handleInputRows(self, key, rows, timer_values) -> Iterator[pd.DataFrame]:
# Find the row with the maximum time value
max_row = None
max_time = float('-inf')
for pdf in rows:
for _, pd_row in pdf.iterrows():
time_value = pd_row["time"]
if time_value > max_time:
max_time = time_value
max_row = tuple(pd_row)

# Check if state exists and update if necessary
exists = self.latest_location.exists()
if not exists or max_row[1] > self.latest_location.get()[1]:
# Update the state with the new max row
self.latest_location.update(max_row)
# Yield the updated row
yield pd.DataFrame(
{"user": (max_row[0],), "time": (max_row[1],), "location": (max_row[2],)}
)
# Yield an empty DataFrame if no update is needed
yield pd.DataFrame()

def close(self) -> None:
# No cleanup needed
pass

# Apply the stateful transformation to the input DataFrame
(df.groupBy("user")
.transformWithStateInPandas(
statefulProcessor=SCDType1StatefulProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="None",
)
.writeStream... # Continue with stream writing configuration
)

dimensões que mudam lentamente (SCD) (SCD) tipo 2

O Notebook a seguir contém um exemplo de implementação do SCD tipo 2 usando transformWithState em Python ou Scala.

SCD Tipo 2 Python

Open notebook in new tab

SCD Tipo 2 Scala

Open notebook in new tab

Detector de tempo de inatividade

transformWithState implementa temporizadores para permitir que o usuário tome medidas com base no tempo decorrido, mesmo que nenhum registro de um determinado key seja processado em um microbatch.

O exemplo a seguir implementa um padrão para um detector de tempo de inatividade. Cada vez que um novo valor é visto para um determinado key, ele atualiza o valor do estado lastSeen, limpa todos os temporizadores existentes e reinicia um temporizador para o futuro.

Quando um cronômetro expira, o aplicativo emite o tempo decorrido desde o último evento observado para o key. Em seguida, ele define um novo cronômetro para emitir uma atualização 10 segundos depois.

Python
import datetime
import time

class DownTimeDetectorStatefulProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
# Define schema for the state value (timestamp)
state_schema = StructType([StructField("value", TimestampType(), True)])
self.handle = handle
# Initialize state to store the last seen timestamp for each key
self.last_seen = handle.getValueState("last_seen", state_schema)

def handleExpiredTimer(self, key, timerValues, expiredTimerInfo) -> Iterator[pd.DataFrame]:
latest_from_existing = self.last_seen.get()
# Calculate downtime duration
downtime_duration = timerValues.getCurrentProcessingTimeInMs() - int(time.time() * 1000)
# Register a new timer for 10 seconds in the future
self.handle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + 10000)
# Yield a DataFrame with the key and downtime duration
yield pd.DataFrame(
{
"id": key,
"timeValues": str(downtime_duration),
}
)

def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
# Find the row with the maximum timestamp
max_row = max((tuple(pdf.iloc[0]) for pdf in rows), key=lambda row: row[1])

# Get the latest timestamp from existing state or use epoch start if not exists
if self.last_seen.exists():
latest_from_existing = self.last_seen.get()
else:
latest_from_existing = datetime.fromtimestamp(0)

# If new data is more recent than existing state
if latest_from_existing < max_row[1]:
# Delete all existing timers
for timer in self.handle.listTimers():
self.handle.deleteTimer(timer)
# Update the last seen timestamp
self.last_seen.update((max_row[1],))

# Register a new timer for 5 seconds in the future
self.handle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + 5000)

# Get current processing time in milliseconds
timestamp_in_millis = str(timerValues.getCurrentProcessingTimeInMs())

# Yield a DataFrame with the key and current timestamp
yield pd.DataFrame({"id": key, "timeValues": timestamp_in_millis})

def close(self) -> None:
# No cleanup needed
pass

Migrar informações existentes sobre o estado

O exemplo a seguir demonstra como implementar um aplicativo com estado que aceita um estado inicial. Você pode adicionar o tratamento do estado inicial a qualquer aplicativo com estado, mas o estado inicial só pode ser definido ao inicializar o aplicativo pela primeira vez.

Este exemplo usa o leitor statestore para carregar informações de estado existentes de um caminho de ponto de verificação. Um exemplo de caso de uso desse padrão é a migração de aplicativos legados com estado para transformWithState.

Python
# Import necessary libraries
import pandas as pd
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, LongType, StringType, IntegerType
from typing import Iterator

# Set RocksDB as the state store provider for better performance
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

"""
Input schema is as below

input_schema = StructType(
[StructField("id", StringType(), True)],
[StructField("value", StringType(), True)]
)
"""

# Define the output schema for the streaming query
output_schema = StructType([
StructField("id", StringType(), True),
StructField("accumulated", StringType(), True)
])

class AccumulatedCounterStatefulProcessorWithInitialState(StatefulProcessor):

def init(self, handle: StatefulProcessorHandle) -> None:
# Define schema for the state value (integer)
state_schema = StructType([StructField("value", IntegerType(), True)])
# Initialize state to store the accumulated counter for each id
self.counter_state = handle.getValueState("counter_state", state_schema)
self.handle = handle

def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
# Check if state exists for the current key
exists = self.counter_state.exists()
if exists:
value_row = self.counter_state.get()
existing_value = value_row[0]
else:
existing_value = 0

accumulated_value = existing_value

# Process input rows and accumulate values
for pdf in rows:
value = pdf["value"].astype(int).sum()
accumulated_value += value

# Update the state with the new accumulated value
self.counter_state.update((accumulated_value,))

# Yield a DataFrame with the key and accumulated value
yield pd.DataFrame({"id": key, "accumulated": str(accumulated_value)})

def handleInitialState(self, key, initialState, timerValues) -> None:
# Initialize the state with the provided initial value
init_val = initialState.at[0, "initVal"]
self.counter_state.update((init_val,))

def close(self) -> None:
# No cleanup needed
pass

# Load initial state from a checkpoint directory
initial_state = spark.read.format("statestore")
.option("path", "$checkpointsDir")
.load()

# Apply the stateful transformation to the input DataFrame
df.groupBy("id")
.transformWithStateInPandas(
statefulProcessor=AccumulatedCounterStatefulProcessorWithInitialState(),
outputStructType=output_schema,
outputMode="Update",
timeMode="None",
initialState=initial_state,
)
.writeStream... # Continue with stream writing configuration

Migrar a tabela Delta para o armazenamento do estado para inicialização

O Notebook a seguir contém um exemplo de inicialização de valores de armazenamento do estado de uma tabela Delta usando transformWithState em Python ou Scala.

Inicializar o estado a partir do Delta Python

Open notebook in new tab

Inicializar o estado a partir do Delta Scala

Open notebook in new tab

Sessão de acompanhamento

O Notebook a seguir contém um exemplo de acompanhamento de sessão usando transformWithState em Python ou Scala.

Sessão de acompanhamento Python

Open notebook in new tab

Sessão de acompanhamento Scala

Open notebook in new tab

Transmissão-transmissão personalizada join usando transformWithState

O código a seguir demonstra uma transmissão-transmissão personalizada join em várias transmissões usando transformWithState. O senhor pode usar essa abordagem em vez de um operador integrado join pelos seguintes motivos:

  • O senhor precisa usar o modo de saída de atualização que não suporta a união de transmissão-transmissão. Isso é especialmente útil para aplicativos de baixa latência.
  • O senhor precisa continuar a executar a união para as linhas que chegam mais tarde (após a expiração da marca d'água).
  • O senhor precisa realizar uma união de transmissão-transmissão de muitos para muitos.

Esse exemplo dá ao usuário controle total sobre a lógica de expiração do estado, permitindo a extensão dinâmica do período de retenção para lidar com eventos fora de ordem mesmo após a marca d'água.

Python
# Import necessary libraries
import pandas as pd
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from typing import Iterator

# Define output schema for the joined data
output_schema = StructType([
StructField("user_id", StringType(), True),
StructField("event_type", StringType(), True),
StructField("timestamp", TimestampType(), True),
StructField("profile_name", StringType(), True),
StructField("email", StringType(), True),
StructField("preferred_category", StringType(), True)
])

class CustomStreamJoinProcessor(StatefulProcessor):
# Initialize stateful storage for user profiles, preferences, and event tracking.
def init(self, handle: StatefulProcessorHandle) -> None:

# Define schemas for different types of state data
profile_schema = StructType([
StructField("name", StringType(), True),
StructField("email", StringType(), True),
StructField("updated_at", TimestampType(), True)
])
preferences_schema = StructType([
StructField("preferred_category", StringType(), True),
StructField("updated_at", TimestampType(), True)
])
activity_schema = StructType([
StructField("event_type", StringType(), True),
StructField("timestamp", TimestampType(), True)
])

# Initialize state storage for user profiles, preferences, and activity
self.profile_state = handle.getMapState("user_profiles", "string", profile_schema)
self.preferences_state = handle.getMapState("user_preferences", "string", preferences_schema)
self.activity_state = handle.getMapState("user_activity", "string", activity_schema)

# Process incoming events and update state
def handleInputRows(self, key, rows: Iterator[pd.DataFrame], timer_values) -> Iterator[pd.DataFrame]:
df = pd.concat(rows, ignore_index=True)
output_rows = []

for _, row in df.iterrows():
user_id = row["user_id"]

if "event_type" in row: # User activity event
self.activity_state.update_value(user_id, row.to_dict())
# Set a timer to process this event after a 10-second delay
self.getHandle().registerTimer(timer_values.get_current_processing_time_in_ms() + (10 * 1000))

elif "name" in row: # Profile update
self.profile_state.update_value(user_id, row.to_dict())

elif "preferred_category" in row: # Preference update
self.preferences_state.update_value(user_id, row.to_dict())

# No immediate output; processing will happen when timer expires
return iter([])

# Perform lookup after delay, handling out-of-order and late-arriving events.
def handleExpiredTimer(self, key, timer_values, expired_timer_info) -> Iterator[pd.DataFrame]:

# Retrieve stored state for the user
user_activity = self.activity_state.get_value(key)
user_profile = self.profile_state.get_value(key)
user_preferences = self.preferences_state.get_value(key)

if user_activity:
# Combine data from different states into a single output row
output_row = {
"user_id": key,
"event_type": user_activity["event_type"],
"timestamp": user_activity["timestamp"],
"profile_name": user_profile.get("name") if user_profile else None,
"email": user_profile.get("email") if user_profile else None,
"preferred_category": user_preferences.get("preferred_category") if user_preferences else None
}
return iter([pd.DataFrame([output_row])])

return iter([])

def close(self) -> None:
# No cleanup needed
pass

# Apply transformWithState to the input DataFrame
(df.groupBy("user_id")
.transformWithStateInPandas(
statefulProcessor=CustomStreamJoinProcessor(),
outputStructType=output_schema,
outputMode="Append",
timeMode="ProcessingTime"
)
.writeStream... # Continue with stream writing configuration
)

Computação Top-K

O exemplo a seguir usa um ListState com uma fila de prioridade para manter e atualizar os K elementos principais em uma transmissão para cada grupo key em tempo real próximo.

Top-K Python

Open notebook in new tab

Top-K Scala

Open notebook in new tab