メインコンテンツまでスキップ

ステートフルなアプリケーションの例

この記事には、カスタム ステートフル アプリケーションのコード例が含まれています。 Databricks では、集計や結合などの一般的な操作に組み込みのステートフル メソッドを使用することをお勧めします。

この記事のパターンでは、Databricks Runtime 16.2 以降のパブリック プレビューで使用できる transformWithState 演算子と関連クラスを使用します。「カスタム ステートフル アプリケーションの構築」を参照してください。

注記

Python は transformWithStateInPandas 演算子を使用して同じ機能を提供します。 以下の例は、Python と Scala のコードを示しています。

必要条件

transformWithState演算子と関連する APIs およびクラスには、次の要件があります。

  • Databricks Runtime 16.2 以降で使用できます。
  • コンピュートは、専用アクセスモードまたは非分離アクセスモードを使用する必要があります。
  • RocksDB状態ストア プロバイダーを使用する必要があります。Databricks では、コンピュート構成の一部として RocksDB を有効にすることをお勧めします。
  • transformWithStateInPandas Databricks Runtime 16.3 以降で標準アクセス モードをサポートします。
注記

現在のセッションで RocksDB 状態ストア プロバイダーを有効にするには、次のコマンドを実行します。

Python
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

緩やかに変化する寸法(SCD)タイプ1

次のコードは、 transformWithStateを使用して SCD タイプ 1 を実装する例です。 SCD タイプ 1 は、特定のフィールドの最新の値のみを追跡します。

注記

ストリーミング テーブルとストリーミング APPLY CHANGES INTO を使用して、Delta Lake でサポートされるテーブルを使用して SCD タイプ 1 またはタイプ 2 を実装できます。 この例では SCD 状態ストアにタイプ 1 を実装し、リアルタイムに近いアプリケーションの待機時間を短縮します。

Python
# Import necessary libraries
import pandas as pd
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, LongType, StringType
from typing import Iterator

# Set the state store provider to RocksDB
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

# Define the output schema for the streaming query
output_schema = StructType([
StructField("user", StringType(), True),
StructField("time", LongType(), True),
StructField("location", StringType(), True)
])

# Define a custom StatefulProcessor for slowly changing dimension type 1 (SCD1) operations
class SCDType1StatefulProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
# Define the schema for the state value
value_state_schema = StructType([
StructField("user", StringType(), True),
StructField("time", LongType(), True),
StructField("location", StringType(), True)
])
# Initialize the state to store the latest location for each user
self.latest_location = handle.getValueState("latestLocation", value_state_schema)

def handleInputRows(self, key, rows, timer_values) -> Iterator[pd.DataFrame]:
# Find the row with the maximum time value
max_row = None
max_time = float('-inf')
for pdf in rows:
for _, pd_row in pdf.iterrows():
time_value = pd_row["time"]
if time_value > max_time:
max_time = time_value
max_row = tuple(pd_row)

# Check if state exists and update if necessary
exists = self.latest_location.exists()
if not exists or max_row[1] > self.latest_location.get()[1]:
# Update the state with the new max row
self.latest_location.update(max_row)
# Yield the updated row
yield pd.DataFrame(
{"user": (max_row[0],), "time": (max_row[1],), "location": (max_row[2],)}
)
# Yield an empty DataFrame if no update is needed
yield pd.DataFrame()

def close(self) -> None:
# No cleanup needed
pass

# Apply the stateful transformation to the input DataFrame
(df.groupBy("user")
.transformWithStateInPandas(
statefulProcessor=SCDType1StatefulProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="None",
)
.writeStream... # Continue with stream writing configuration
)

緩やかに変化する寸法 (SCD) タイプ2

次のノートブックには、Python または Scala で transformWithState を使用して SCD タイプ 2 を実装する例が含まれています。

SCD タイプ2 Python

Open notebook in new tab

SCD タイプ2 Scala

Open notebook in new tab

ダウンタイム検出器

transformWithState タイマーを実装して、特定のキーのレコードがマイクロバッチで処理されない場合でも、経過時間に基づいてアクションを実行できるようにします。

次の例では、ダウンタイム検出機能のパターンを実装します。 特定のキーに新しい値が表示されるたびに、 lastSeen 状態値が更新され、既存のタイマーがクリアされ、将来のタイマーがリセットされます。

タイマーの期限が切れると、アプリケーションはキーについて最後に観測されたイベントからの経過時間を出力します。 その後、新しいタイマーを設定して、10 秒後に更新を出力します。

Python
import datetime
import time

class DownTimeDetectorStatefulProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
# Define schema for the state value (timestamp)
state_schema = StructType([StructField("value", TimestampType(), True)])
self.handle = handle
# Initialize state to store the last seen timestamp for each key
self.last_seen = handle.getValueState("last_seen", state_schema)

def handleExpiredTimer(self, key, timerValues, expiredTimerInfo) -> Iterator[pd.DataFrame]:
latest_from_existing = self.last_seen.get()
# Calculate downtime duration
downtime_duration = timerValues.getCurrentProcessingTimeInMs() - int(time.time() * 1000)
# Register a new timer for 10 seconds in the future
self.handle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + 10000)
# Yield a DataFrame with the key and downtime duration
yield pd.DataFrame(
{
"id": key,
"timeValues": str(downtime_duration),
}
)

def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
# Find the row with the maximum timestamp
max_row = max((tuple(pdf.iloc[0]) for pdf in rows), key=lambda row: row[1])

# Get the latest timestamp from existing state or use epoch start if not exists
if self.last_seen.exists():
latest_from_existing = self.last_seen.get()
else:
latest_from_existing = datetime.fromtimestamp(0)

# If new data is more recent than existing state
if latest_from_existing < max_row[1]:
# Delete all existing timers
for timer in self.handle.listTimers():
self.handle.deleteTimer(timer)
# Update the last seen timestamp
self.last_seen.update((max_row[1],))

# Register a new timer for 5 seconds in the future
self.handle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + 5000)

# Get current processing time in milliseconds
timestamp_in_millis = str(timerValues.getCurrentProcessingTimeInMs())

# Yield a DataFrame with the key and current timestamp
yield pd.DataFrame({"id": key, "timeValues": timestamp_in_millis})

def close(self) -> None:
# No cleanup needed
pass

既存の状態情報を移行する

次の例は、初期状態を受け入れるステートフル アプリケーションを実装する方法を示しています。 初期状態処理は任意のステートフル アプリケーションに追加できますが、初期状態はアプリケーションを最初に初期化するときにのみ設定できます。

この例では、 statestore リーダーを使用して、チェックポイント パスから既存の状態情報を読み込みます。 このパターンの使用例として、従来のステートフルアプリケーションから transformWithStateへの移行があります。

Python
# Import necessary libraries
import pandas as pd
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, LongType, StringType, IntegerType
from typing import Iterator

# Set RocksDB as the state store provider for better performance
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

"""
Input schema is as below

input_schema = StructType(
[StructField("id", StringType(), True)],
[StructField("value", StringType(), True)]
)
"""

# Define the output schema for the streaming query
output_schema = StructType([
StructField("id", StringType(), True),
StructField("accumulated", StringType(), True)
])

class AccumulatedCounterStatefulProcessorWithInitialState(StatefulProcessor):

def init(self, handle: StatefulProcessorHandle) -> None:
# Define schema for the state value (integer)
state_schema = StructType([StructField("value", IntegerType(), True)])
# Initialize state to store the accumulated counter for each id
self.counter_state = handle.getValueState("counter_state", state_schema)
self.handle = handle

def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
# Check if state exists for the current key
exists = self.counter_state.exists()
if exists:
value_row = self.counter_state.get()
existing_value = value_row[0]
else:
existing_value = 0

accumulated_value = existing_value

# Process input rows and accumulate values
for pdf in rows:
value = pdf["value"].astype(int).sum()
accumulated_value += value

# Update the state with the new accumulated value
self.counter_state.update((accumulated_value,))

# Yield a DataFrame with the key and accumulated value
yield pd.DataFrame({"id": key, "accumulated": str(accumulated_value)})

def handleInitialState(self, key, initialState, timerValues) -> None:
# Initialize the state with the provided initial value
init_val = initialState.at[0, "initVal"]
self.counter_state.update((init_val,))

def close(self) -> None:
# No cleanup needed
pass

# Load initial state from a checkpoint directory
initial_state = spark.read.format("statestore")
.option("path", "$checkpointsDir")
.load()

# Apply the stateful transformation to the input DataFrame
df.groupBy("id")
.transformWithStateInPandas(
statefulProcessor=AccumulatedCounterStatefulProcessorWithInitialState(),
outputStructType=output_schema,
outputMode="Update",
timeMode="None",
initialState=initial_state,
)
.writeStream... # Continue with stream writing configuration

初期化のために Delta テーブルを状態ストアに移行する

次のノートブックには、Delta transformWithStatePythonまたは の を使用して テーブルから状態ストアの値を初期化する例が含まれています。Scala

Delta Python から状態を初期化する

Open notebook in new tab

Delta Scalaから状態を初期化する

Open notebook in new tab

セッション追跡

次のノートブックには、Python または Scala の transformWithState を使用したセッション追跡の例が含まれています。

セッション追跡 Python

Open notebook in new tab

セッショントラッキング Scala

Open notebook in new tab

カスタムストリーム-ストリーム結合 transformWithState

次のコードは、 transformWithState. 次の理由により、組み込みの結合演算子の代わりにこの方法を使用できます。

  • ストリーム-ストリーム結合をサポートしていない更新出力モードを使用する必要があります。 これは、低レイテンシのアプリケーションに特に役立ちます。
  • 遅れて到着した行 (ウォーターマークの有効期限が切れた後) の結合を引き続き実行する必要があります。
  • 多対多のストリーム-ストリーム結合を実行する必要があります。

この例では、ユーザーが状態の有効期限ロジックを完全に制御できるため、ウォーターマークの後でも順不同のイベントを処理するための動的な保持期間の延長が可能になります。

Python
# Import necessary libraries
import pandas as pd
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from typing import Iterator

# Define output schema for the joined data
output_schema = StructType([
StructField("user_id", StringType(), True),
StructField("event_type", StringType(), True),
StructField("timestamp", TimestampType(), True),
StructField("profile_name", StringType(), True),
StructField("email", StringType(), True),
StructField("preferred_category", StringType(), True)
])

class CustomStreamJoinProcessor(StatefulProcessor):
# Initialize stateful storage for user profiles, preferences, and event tracking.
def init(self, handle: StatefulProcessorHandle) -> None:

# Define schemas for different types of state data
profile_schema = StructType([
StructField("name", StringType(), True),
StructField("email", StringType(), True),
StructField("updated_at", TimestampType(), True)
])
preferences_schema = StructType([
StructField("preferred_category", StringType(), True),
StructField("updated_at", TimestampType(), True)
])
activity_schema = StructType([
StructField("event_type", StringType(), True),
StructField("timestamp", TimestampType(), True)
])

# Initialize state storage for user profiles, preferences, and activity
self.profile_state = handle.getMapState("user_profiles", "string", profile_schema)
self.preferences_state = handle.getMapState("user_preferences", "string", preferences_schema)
self.activity_state = handle.getMapState("user_activity", "string", activity_schema)

# Process incoming events and update state
def handleInputRows(self, key, rows: Iterator[pd.DataFrame], timer_values) -> Iterator[pd.DataFrame]:
df = pd.concat(rows, ignore_index=True)
output_rows = []

for _, row in df.iterrows():
user_id = row["user_id"]

if "event_type" in row: # User activity event
self.activity_state.update_value(user_id, row.to_dict())
# Set a timer to process this event after a 10-second delay
self.getHandle().registerTimer(timer_values.get_current_processing_time_in_ms() + (10 * 1000))

elif "name" in row: # Profile update
self.profile_state.update_value(user_id, row.to_dict())

elif "preferred_category" in row: # Preference update
self.preferences_state.update_value(user_id, row.to_dict())

# No immediate output; processing will happen when timer expires
return iter([])

# Perform lookup after delay, handling out-of-order and late-arriving events.
def handleExpiredTimer(self, key, timer_values, expired_timer_info) -> Iterator[pd.DataFrame]:

# Retrieve stored state for the user
user_activity = self.activity_state.get_value(key)
user_profile = self.profile_state.get_value(key)
user_preferences = self.preferences_state.get_value(key)

if user_activity:
# Combine data from different states into a single output row
output_row = {
"user_id": key,
"event_type": user_activity["event_type"],
"timestamp": user_activity["timestamp"],
"profile_name": user_profile.get("name") if user_profile else None,
"email": user_profile.get("email") if user_profile else None,
"preferred_category": user_preferences.get("preferred_category") if user_preferences else None
}
return iter([pd.DataFrame([output_row])])

return iter([])

def close(self) -> None:
# No cleanup needed
pass

# Apply transformWithState to the input DataFrame
(df.groupBy("user_id")
.transformWithStateInPandas(
statefulProcessor=CustomStreamJoinProcessor(),
outputStructType=output_schema,
outputMode="Append",
timeMode="ProcessingTime"
)
.writeStream... # Continue with stream writing configuration
)

Top-K 計算

次の例では、優先キューを持つ ListState を使用して、各グループ キーのストリーム内の上位 K 要素をほぼリアルタイムで維持および更新します。

トップKの Python

Open notebook in new tab

トップKの Scala

Open notebook in new tab