カスタムステートフルアプリケーションの構築
transformWithState を使用してステートフル ストリーミング アプリケーションを構築し、低レイテンシーのソリューションとほぼリアルタイムのソリューションを実装できます。カスタムステートフル演算子を使用することで、従来の構造化ストリーミング処理では実現できなかった新しい運用ユースケースを構築するための、任意のステートフルロジックを作成することが可能です。
集計、重複排除、ストリーミング結合などのステートフル操作には、Databricks はカスタムロジックの代わりに組み込みの構造化ストリーミング演算子を使用することを推奨しています。ステートフル ストリーミングとはを参照してください。
Databricks では、任意のステート変換を行う場合は、レガシー演算子(flatMapGroupsWithState、mapGroupsWithState など)の代わりに transformWithState を使用することをお勧めします。レガシー任意のステートフル演算子を参照してください。
必要条件
transformWithState と transformWithStateInPandas 演算子には、次の要件があります。
-
Databricks Runtime 16.2 以降で使用できます。
- リアルタイムモードの場合、Databricks Runtime 17.3 LTS以降を使用してください。構造化ストリーミングのリアルタイムモードを参照してください。
- 標準アクセスモードでは、Python は Databricks Runtime 16.3 以降で使用でき、Scala は Databricks Runtime 17.3 以降で使用できます。
-
RocksDBは、Databricks Runtime 17.3以降のデフォルトの状態ストアプロバイダーです。
-
Databricks Runtime 17.2以前の場合、RocksDB状態ストアプロバイダーを構成する必要があります。Databricks は、Spark 構成で RocksDB を有効にすることを推奨しています。
Pythonspark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
-
transformWithState とは
transformWithState演算子は、カスタムステートフルプロセッサを構造化ストリーミングクエリに適用します。transformWithStateを使用するには、カスタムステートフルプロセッサを実装する必要があります。構造化ストリーミングには、API Python、Scala 、または を使用してステートフルプロセッサを構築するためのJava が含まれています。
transformWithState を使用して、グループ化キーにカスタムロジックを適用します。次の高レベル設計について説明します。
- 1 つ以上の状態変数を定義します。
- 状態情報は各グループ化キーに保持されます。ユーザー定義コードで各状態変数にアクセスできます。
- 処理される各マイクロバッチでは、キーのすべての行がイテレータとして利用できます。
StatefulProcessorHandleとタイマー、ユーザー定義の条件を使用して、行の出力方法を制御できます。- 状態の有効期限とサイズを管理するために、状態値は個別のTime-To-Live(TTL)定義をサポートします。
transformWithState が状態ストアでのスキーマ進化をサポートしているため、過去の状態情報を失うことなく本番運用アプリケーションを繰り返し更新できます。状態スキーマを更新した後、行を再処理する必要はありません。これにより、コードのデプロイとメンテナンスが簡素化されます。状態ストアにおけるスキーマ進化を参照してください。
Databricksドキュメントでは、transformWithStateを使用して、PythonおよびScalaの両方の実装について説明しています:
-
PySpark は、行ベースの
transformWithStateAPI と PandasベースのtransformWithStateInPandasオペレーターの両方をサポートしています。transformWithStateInPandasリアルタイム モードではサポートされていません。代わりにtransformWithStateを使用してください。詳細については、「リアルタイムモードでのtransformWithState」をご参照ください。
-
Scalaは行ベースの
transformWithStateAPIのみをサポートしています。
transformWithState の Scala および Python の実装は同じ機能を有していますが、構文にはいくつかの相違点があります。
定義 StatefulProcessor
ステートフルプロセッサを定義するには、StatefulProcessorクラスを拡張し、そのメソッドを実装します。
Spark は StatefulProcessorHandle を StatefulProcessor の init メソッドに渡します。ハンドルを使用して状態変数を作成し、状態ストアを操作します。
transformWithState 3つの状態タイプ:ValueState、ListState、およびMapStateに対応しています。各タイプは、グループ化キーごとに異なる基盤となるデータ構造を使用して状態を格納します。
カスタムロジックを定義するには、以下のメソッドを実装します。
handleInputRowsを実装して、アプリケーションがデータを処理し、状態を更新し、各マイクロバッチについて行を出力する方法を制御します。入力行の処理を参照してください。handleExpiredTimerを実装して、グループ化キーがマイクロバッチで新しい行を受け取るかどうかにかかわらず、時間ベースのロジックを実行します。「期限切れタイマーの処理」を参照してください。- 必要に応じて、アプリケーションが入力行を処理する前に状態を事前設定するために、
handleInitialStateを実装します。「初期状態の処理」を参照してください。
次の表は、これらのメソッドの機能的な動作を比較しています。
挙動 |
|
|
|---|---|---|
状態値の取得、配置、更新、またはクリア | あり | あり |
タイマーを作成または削除する | あり | あり |
行の出力 | あり | あり |
現在のマイクロバッチ内の行を反復処理します。 | あり | いいえ |
経過時間に基づくトリガーロジック | いいえ | あり |
handleInputRows と handleExpiredTimer を組み合わせることで、必要に応じて複雑なロジックを実装できます。
たとえば、handleInputRowsを使用して各マイクロバッチの状態値を更新し、10 秒後にタイマーを設定するアプリケーションを実装できます。追加の行が処理されない場合、handleExpiredTimer を使用して状態ストア内の現在の値を出力できます。グルーピングキーに対して新しい行が処理される場合、既存のタイマーをクリアし、新しいタイマーを設定できます。
StatefulProcessorHandle
PySpark では、StatefulProcessorHandle クラスを使用すると、コードが状態情報をどのように使用するかを制御する関数にアクセスできます。
StatefulProcessorを初期化する際は、常にStatefulProcessorHandleをインポートし、handle変数に渡す必要があります。handle 変数は、Python クラス内のローカル変数を状態変数に関連付けます。
Scala は getHandle メソッドを使用します。
カスタム状態の種類
単一のステートフル演算子で複数の状態オブジェクトを実装できます。
完全なアプリケーションロジックに基づいて、状態タイプを選択します。たとえば、ValueState を使用して、user_id と session_id でグループ化されたセッションを追跡できます。または、複数のセッションにわたって条件を評価するには、session_id をマップキーとして、user_id でグループ化された MapState を使用します。
状態オブジェクトでStructTypeを使用する場合は、スキーマの構造体内の各フィールドに一意の名前を定義する必要があります。これらの名称は、状態ストアを読み込む際に表示されます。「構造化ストリーミングの状態情報の読み取り」を参照してください。
次のセクションでは、transformWithState によってサポートされる状態タイプについて説明します:
ValueState
ValueState 各グループ化キーに値を格納します。
値の状態には、構造体やタプルなどの複雑な型を含めることができます。ValueState については、値全体を置き換えるロジックを実装する必要があります。
値が更新されると、値のステートの有効期間はリセットされます。ValueState のソースキーを処理し、保存されている ValueState を更新しない場合、Time-to-Live はリセットされません。
ListState
ListState グルーピングキーごとにリストを格納します。
リスト状態は値のコレクションであり、それぞれが複雑な型を含むことができます。リスト内の各値には独自の有効期間があります。
個別の項目を追加するか、項目のリストを追加するか、またはputを使用してリスト全体を上書きすることで、リストに項目を追加できます。Time-to-Liveをリセットするには、put操作を使用する必要があります。
MapState
MapState 各グループ化キーにマップを格納します。マップは、Python の辞書に相当する Apache Spark のものです (dict)。
マップの状態は、それぞれが値にマッピングされる異なるキーのコレクションであり、各値には複雑な型を含めることができます。マップ内の各キーと値のペアには、独自の有効期限があります。
特定のキーの値を更新できます、またはキーとその値を削除できます。キーを使用して個々の値を返す、すべてのキーをリストする、すべての値をリストする、またはマップ内のキーと値のペアの完全なセットを操作するためにイテレーターを返すことができます。
グルーピングキーは、構造化ストリーミングクエリのGROUP BY句で指定されたフィールドを記述します。マップの状態は、グループ化キーの任意の数のキーと値のペアを含めることができます。
たとえば、クエリで GROUP BY user_id を使用し、session_id ごとにマップを定義する場合、グループ化キーは user_id であり、MapState キーは session_id です:
- Python
- Scala
class SessionTracker(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
self.sessions = handle.getMapState("sessions", StringType(), LongType())
def handleInputRows(self, key, rows: Iterator[Row], timerValues) -> Iterator[Row]:
for row in rows:
session_id = row["session_id"] # session_id is the MapState key
count = self.sessions.getValue(session_id)[0] if self.sessions.containsKey(session_id) else 0
new_count = count + 1
self.sessions.updateValue(session_id, (new_count,))
yield from []
def close(self) -> None:
pass
df.groupBy("user_id").transformWithState(SessionTracker(), ...) # user_id is the grouping key
case class Event(userId: String, sessionId: String)
class SessionTracker extends StatefulProcessor[String, Event, Row] {
@transient private var sessions: MapState[String, Long] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
sessions = getHandle.getMapState[String, Long]("sessions", Encoders.STRING, Encoders.scalaLong, TTLConfig.NONE)
}
override def handleInputRows(
key: String,
rows: Iterator[Event],
timerValues: TimerValues): Iterator[Row] = {
rows.foreach { event =>
val count = if (sessions.containsKey(event.sessionId)) sessions.getValue(event.sessionId) else 0L
sessions.updateValue(event.sessionId, count + 1) // sessionId is the MapState key
}
Iterator.empty
}
}
df.as[Event]
.groupByKey(_.userId) // userId is the grouping key
.transformWithState(new SessionTracker(), TimeMode.None(), OutputMode.Update())
カスタム状態変数を作成 StatefulProcessor
StatefulProcessor を初期化すると、カスタムロジックで状態オブジェクトを操作できる、各状態オブジェクトのローカル変数が作成されます。StatefulProcessor クラスの組み込みの init メソッドをオーバーライドして、状態変数を定義および初期化します。
StatefulProcessorで、getValueState、getListState、およびgetMapStateのメソッドを使用して、任意の数の状態オブジェクトを定義できます。
各状態オブジェクトには、次のものが必要です。
- 一意の名前
- スキーマ
- Pythonでは、スキーマを指定する必要があります。
- Scalaでは、
Encoderを渡すことで状態スキーマを指定できます。
オプションで、有効期間(TTL)をミリ秒単位で指定することもできます。マップステートを実装する際は、マップキーと値に対して個別のスキーマ定義を指定する必要があります。
StatefulProcessor は、クエリ、更新、状態情報の出力に対してロジックを個別に処理します。カスタム・ロジックを使用するメソッドで状態変数を使用するを参照してください。
カスタムロジックを持つメソッドで状態変数を使用する
State オブジェクトは、状態の取得、既存の状態情報の更新、および現在の状態のクリアを行うためのメソッドを備えています。
各グルーピングキーには専用のステート情報があります。
StatefulProcessorは、お客様のカスタムロジックおよび指定された出力スキーマに基づいて行を出力します。行の発行を参照してください。- 「
statestore」リーダーを使用して、状態ストアの値にアクセスします。このリーダーはバッチワークロードを対象としており、低レイテンシーのワークロードには適していません。「構造化ストリーミングの状態情報の読み取り」を参照してください。 handleInputRowsで指定されたロジックは、キーの行がマイクロバッチ内に存在する場合にのみ実行されます。入力行の処理を参照してください。handleExpiredTimerを使用して、行の監視に依存せずに起動する時間ベースのロジックを実装します。「期限切れタイマーの処理」を参照してください。
状態オブジェクトは、次の意味を持つグループ化キーによって分離されます。
- 状態値は、異なるグループ化キーに関連付けられた行によって影響されません。
- グループ化キー間での値の比較や状態の更新に依存するロジックは実装 できません 。
グループ化キー内で値を比較できます。MapStateを使用して、カスタムロジックが使用できるセカンドキーを持つロジックを実装します。たとえば、user_id でグループ化し、MapState のキーに ip_address を使用すると、同時ユーザーセッションを追跡できます。
状態の操作に関する高度な考慮事項
状態更新は耐障害性です。マイクロバッチの処理が完了する前にタスクがクラッシュした場合、再試行では最後の正常なマイクロバッチからの値が使用されます。
パフォーマンスを最適化するために、Databricks は、指定されたキーのイテレーター内のすべての値を処理し、更新を1回の書き込みでコミットすることをお勧めします。状態変数に書き込むと、これにより、RocksDBへの書き込みがトリガーされます。
状態の値にはデフォルトがありません。ロジックで既存のステート情報を読み取る必要がある場合は、exists メソッドを使用します。
null状態のロジックを実装するには、MapState変数を使用すると、個々のキーを確認したり、すべてのキーを一覧表示したりできます。
入力行の処理
handleInputRows メソッドを使用して、アプリケーションが行を処理し、状態値を更新する方法を定義します。このメソッドは、構造化ストリーミングクエリがグループ化キーに対して行を処理するたびに実行されます。
transformWithStateで実装されたほとんどのステートフル アプリケーションでは、コア ロジックは handleInputRowsを使用して定義されます。
処理される各マイクロバッチの更新ごとに、特定のグループ化キーに対するマイクロバッチ内のすべての行は、イテレーターを使用して取得できます。ユーザー定義ロジックは、現在のマイクロバッチのすべての行およびステートストアの値とやり取りできます。
期限切れタイマーの処理
handleExpiredTimerメソッドを使用して、経過時間に基づいてカスタムロジックを実装します。
グループ化キー内では、タイマーはタイムスタンプによって一意に識別されます。
タイマーの有効期限が切れると、結果はアプリケーションに実装されたロジックによって決定されます。 一般的なパターンは次のとおりです。
- 状態変数に格納された情報を出力します。
- 保存された状態情報の削除。
- 新しいタイマーを作成します。
期限切れのタイマーは、関連付けられたキーの行がマイクロバッチで処理されなくても作動します。
時刻モードを指定します。
StatefulProcessor を transformWithState に渡す際は、timeMode パラメーターを使用して、時間モードを指定する必要があります。
次のオプションがサポートされています。
時間モード | 説明 |
|---|---|
| タイマーとTTLは両方ともサポートされており、Apache Sparkが各マイクロバッチを処理する際のウォールクロック時間に基づいて評価されます。データ内のタイムスタンプに関わらず、行が処理されるタイミングと相対的に、タイマーを固定間隔で起動させたい場合に |
| タイマーはサポートされており、イベント時間のウォーターマークに基づいて評価されます。Apache Spark が入力データ内のタイムスタンプを観測するにつれて、ウォーターマークが進行します。TTL は |
| タイマーとTTLはサポートされていません。ステートフルアプリケーションが時間ベースのロジックを必要としない場合は、 |
eventTimeColumnName
EventTimeタイムモードを使用する場合、eventTimeColumnNameパラメーターは、出力スキーマでイベントタイムスタンプが含まれる列の名前を指定します。Apache Spark は、ウォーターマークをこの列で出力ストリームに伝播させることで、ダウンストリームの適切な時間ベースの操作を可能にします。
- Python
- Scala
eventTimeColumnName transformWithState または transformWithStateInPandas の追加の引数です:
q = (
df.groupBy("key")
.transformWithState(
statefulProcessor=MyProcessor(),
outputStructType=output_schema,
outputMode="Append",
timeMode="EventTime",
eventTimeColumnName="outputTimestamp",
)
.writeStream...
)
transformWithState timeMode の代わりに eventTimeColumnName を受け入れます。このアプローチは常にEventTimeモードを使用します。
val q = spark
.readStream
.format("delta")
.load(srcDeltaTableDir)
.as[(String, String)]
.groupByKey(x => x._1)
.transformWithState(
new MyProcessor(),
"outputTimestamp",
OutputMode.Append(),
)
.writeStream...
組み込みのタイマー値
Databricks では、タスクの失敗時に信頼性の低い再試行につながる可能性があるため、カスタム ステートフル アプリケーションでシステム クロックを呼び出さないことをお勧めします。 処理時間またはウォーターマークにアクセスする必要がある場合は、 TimerValues クラスのメソッドを使用します。
| 説明 |
|---|---|
| 現在のバッチの処理時間のタイムスタンプをエポックからのミリ秒単位で返します。 |
| 現在のバッチのウォーターマークのタイムスタンプを、エポックからのミリ秒単位で返します。 |
処理時間は、マイクロバッチが Apache Spark によって処理される時間を示します。 Kafka などの多くのストリーミング ソースには、システムの処理時間も含まれています。
ストリーミング クエリのウォーターマークは、多くの場合、イベント時間またはストリーミング ソースの処理時間に対して定義されます。 ウォーターマークを適用してデータ処理のしきい値を制御するを参照してください。
ウォーターマークとウィンドウはどちらも、 transformWithStateと組み合わせて使用 できます。 カスタム ステートフル アプリケーションに同様の機能を実装するには、TTL、タイマー、 MapState または ListState 機能を活用します。
ステートタイプの有効期間(TTL)
メモリ不足エラーを防ぎ、古いステートタイプの値を削除するために、transformWithStateは各ステートタイプの値に対してオプションの有効期間(TTL)値をサポートしています。有効期限後、TTLによりステート型値は自動的に削除されます。TTL は handleExpiredTimer またはカスタム ロジックを実行しません。ステートが期限切れになったときにコードを実行するには、代わりにタイマーを使用してください。
TTLを実装しない場合、メモリ不足エラーを回避するには、状態の消去を処理する必要があります。
すべての状態タイプにおいて、状態情報が更新されると、TTLはリセットされます。TTL は各状態タイプの値に適用され、状態タイプごとに異なるルールが設定されています:
-
状態変数のスコープはグループ化キーです。
-
ValueStateオブジェクトの場合、グループ化キーごとに 1 つの値のみが格納されます。TTL はこの値に適用されます。 -
ListStateオブジェクトの場合、リストには多くの値を含めることができます。TTL は、リスト内の各値に個別に適用されます。- TTLは
ListState内の個別の値にスコープ設定されていますが、個別の値を更新する唯一の方法は、putメソッドを使用することです。このメソッドは、ListState変数の内容全体を上書きし、リスト内のすべての値のTTLをリセットします。
- TTLは
-
MapStateオブジェクトの場合、各マップキーには状態値が関連付けられています。TTL は、マップ内の各キーと値のペアに個別に適用されます。
タイマーを使用すると、状態の削除を超えたカスタムロジック (行の出力など) を定義できます。必要に応じて、タイマーを使用して、特定の状態値の状態情報をクリアしたり、値を出力したり、条件ロジックをトリガーしたりできます。「期限切れタイマーの処理」を参照してください。
ステートフルなアプリケーションの例
次の例では、例の状態変数を含むカスタムステートフルプロセッサ `SimpleCounterProcessor` を定義します。SimpleCounterProcessor は、各グルーピングキーの行数をカウントするために、ValueState、ListState、および MapState を使用します。
- Python (Pandas)
- Python (row-based)
- Scala
import pandas as pd
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator
spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
output_schema = StructType(
[
StructField("id", StringType(), True),
StructField("countAsString", StringType(), True),
]
)
class SimpleCounterProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
value_state_schema = StructType([StructField("count", IntegerType(), True)])
list_state_schema = StructType([StructField("count", IntegerType(), True)])
self.value_state = handle.getValueState(stateName="valueState", schema=value_state_schema)
self.list_state = handle.getListState(stateName="listState", schema=list_state_schema)
# Schema can also be defined using strings and SQL DDL syntax
self.map_state = handle.getMapState(stateName="mapState", userKeySchema="name string", valueSchema="count int")
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
count = 0
for pdf in rows:
list_state_rows = [(120,), (20,)] # A list of tuples
self.list_state.put(list_state_rows)
self.list_state.appendValue((111,))
self.list_state.appendList(list_state_rows)
pdf_count = pdf.count()
count += pdf_count.get("value")
self.value_state.update((count,)) # Count is passed as a tuple
iter = self.list_state.get()
list_state_value = next(iter)[0]
value = count
user_key = ("user_key",)
if self.map_state.exists():
if self.map_state.containsKey(user_key):
value += self.map_state.getValue(user_key)[0]
self.map_state.updateValue(user_key, (value,)) # Value is a tuple
yield pd.DataFrame({"id": key, "countAsString": str(count)})
q = (df.groupBy("key")
.transformWithStateInPandas(
statefulProcessor=SimpleCounterProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="None",
)
.writeStream...
)
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
output_schema = StructType(
[
StructField("id", StringType(), True),
StructField("countAsString", StringType(), True),
]
)
class SimpleCounterProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
value_state_schema = StructType([StructField("count", IntegerType(), True)])
list_state_schema = StructType([StructField("count", IntegerType(), True)])
self.value_state = handle.getValueState(stateName="valueState", schema=value_state_schema)
self.list_state = handle.getListState(stateName="listState", schema=list_state_schema)
self.map_state = handle.getMapState(stateName="mapState", userKeySchema="name string", valueSchema="count int")
def handleInputRows(self, key, rows: Iterator[Row], timerValues) -> Iterator[Row]:
count = 0
for row in rows:
list_state_rows = [(120,), (20,)] # A list of tuples
self.list_state.put(list_state_rows)
self.list_state.appendValue((111,))
self.list_state.appendList(list_state_rows)
count += 1
self.value_state.update((count,)) # Count is passed as a tuple
iter_list = self.list_state.get()
list_state_value = next(iter_list)[0]
value = count
user_key = ("user_key",)
if self.map_state.exists():
if self.map_state.containsKey(user_key):
value += self.map_state.getValue(user_key)[0]
self.map_state.updateValue(user_key, (value,)) # Value is a tuple
yield Row(id=key, countAsString=str(count))
q = (
df.groupBy("key")
.transformWithState(
statefulProcessor=SimpleCounterProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="None",
)
.writeStream...
)
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.{Dataset, Encoder, Encoders , DataFrame}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
class SimpleCounterProcessor extends StatefulProcessor[String, (String, String), (String, String)] {
@transient private var countState: ValueState[Int] = _
@transient private var listState: ListState[Int] = _
@transient private var mapState: MapState[String, Int] = _
private val longEncoder = Encoders.scalaLong
private val intEncoder = Encoders.scalaInt
private val stringEncoder = Encoders.STRING
override def init(
outputMode: OutputMode,
timeMode: TimeMode): Unit = {
countState = getHandle.getValueState[Int]("countState",
intEncoder, TTLConfig.NONE)
listState = getHandle.getListState[Int]("listState",
intEncoder, TTLConfig.NONE)
mapState = getHandle.getMapState[String, Int]("mapState",
stringEncoder, intEncoder, TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[(String, String)],
timerValues: TimerValues): Iterator[(String, String)] = {
var count = countState.getOption().getOrElse(0)
for (row <- inputRows) {
val listData = Array(120, 20)
listState.put(listData)
listState.appendValue(count)
listState.appendList(listData)
count += 1
}
val iter = listState.get()
var listStateValue = 0
if (iter.hasNext) {
listStateValue = iter.next()
}
countState.update(count)
var value = count
val userKey = "userKey"
if (mapState.exists()) {
if (mapState.containsKey(userKey)) {
value += mapState.getValue(userKey)
}
}
mapState.updateValue(userKey, value)
Iterator((key, count.toString))
}
}
val q = spark
.readStream
.format("delta")
.load("$srcDeltaTableDir")
.as[(String, String)]
.groupByKey(x => x._1)
.transformWithState(
new SimpleCounterProcessor(),
TimeMode.None(),
OutputMode.Update(),
)
.writeStream...
その他の例については、「ステートフルなアプリケーションの例」を参照してください。
Pythonでは、状態の値はタプルです。put および update にタプルを渡し、get からタプルが返されることを想定します。
たとえば、ValueStateのスキーマが単一の整数であるとします。
current_value_tuple = value_state.get() # Returns the value state as a tuple
current_value = current_value_tuple[0] # Extracts the first item in the tuple
new_value = current_value + 1 # Calculate a new value
value_state.update((new_value,)) # Pass the new value formatted as a tuple
このアプローチは、ListState内の項目、またはMapState内の値にも使用してください。
行を出力
transformWithState が各グルーピングキーに対して行をどのように出力するかを定義するには、handleInputRows または handleExpiredTimer を使用する必要があります。入力行の処理と期限切れタイマーの処理を参照してください。
カスタム ステートフル アプリケーションは、状態情報の使用方法に関して、いかなる前提も設けません。特定の条件の場合、アプリケーションは行を出力しないこと、1行を出力すること、または複数の行を出力することがあります。
複数の状態値を実装し、行を出力するための複数の条件を定義できますが、すべての行は同じスキーマを使用する必要があります。
- Python (Pandas)
- Python (row-based)
- Scala
transformWithStateInPandas を使用して、outputStructType キーワードで出力スキーマを定義します。
Pandas DataFrame オブジェクトと yield を使用して行を出力します。
オプションで、空の DataFrame を yield できます。updateの出力モードを使用し、空のDataFrameを出力すると、グループ化キーの値がnullに更新されます。
transformWithState を使用して、outputStructType キーワードで出力スキーマを定義します。
Rowオブジェクトとyieldを使用して行を出力します。
必要に応じて、空のイテレータを返却できます。update出力モードを使用し、空のイテレータを出力すると、グループ化キーの値がnullに更新されます。
Scalaでは、行はIteratorオブジェクトを使用して出力されます。スキーマは、出力される行のスキーマから自動的に導出されます。
必要に応じて、空のIteratorを返すことができます。update出力モードを使用し、空のIteratorを出力した場合、グループ化キーの値はnullに更新されます。
初期状態に対応する
オプションで、最初のマイクロバッチに初期状態を渡すことができます。
例えば、これを使用して以下を行うことができます。
- 既存のワークフローを新しいカスタムアプリケーションに移行する。
- スキーマまたはロジックを変更するには、ステートフル演算子をアップグレードします。
- 自動的に修復できず、手動での介入が必要な失敗を修復します。
状態ストア リーダーを使用して、既存のチェックポイントから状態情報を照会します。 「構造化ストリーミングの状態情報の読み取り」を参照してください。
既存の Delta テーブルをステートフル アプリケーションに変換する場合は、 spark.read.table("table_name") を使用してテーブルを読み取り、結果の データフレーム を渡します。 オプションで、新しいステートフル・アプリケーションに準拠するようにフィールドを選択または変更できます。
初期状態は、入力行と同じグループ化キースキーマを持つ データフレーム を使用して提供します。
Python は、StatefulProcessorを定義する際に handleInitialState を使用して初期状態を指定します。Scala は、 StatefulProcessorWithInitialStateの個別のクラスを使用します。
次の例では、既存のDeltaテーブルからキーごとのカウンターをシードします。
- Python (row-based)
- Scala
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator
class CounterWithInitialState(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
state_schema = StructType([StructField("count", IntegerType(), True)])
self.count_state = handle.getValueState("countState", state_schema)
def handleInitialState(self, key, initialState: Row, timerValues) -> None:
self.count_state.update((initialState["count"],))
def handleInputRows(self, key, rows: Iterator[Row], timerValues) -> Iterator[Row]:
count = self.count_state.get()[0] if self.count_state.exists() else 0
for _ in rows:
count += 1
self.count_state.update((count,))
yield Row(id=key[0], count=count)
def close(self) -> None:
pass
output_schema = StructType([
StructField("id", StringType(), True),
StructField("count", IntegerType(), True),
])
# Load existing counts as initial state — must use the same grouping key as the input
initial_state = spark.read.table("existing_counts").groupBy("id")
q = (
df.groupBy("id")
.transformWithState(
statefulProcessor=CounterWithInitialState(),
outputStructType=output_schema,
outputMode="Update",
timeMode="None",
initialState=initial_state,
)
.writeStream...
)
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.Encoders
class CounterWithInitialState
extends StatefulProcessorWithInitialState[String, (String, String), (String, String), (String, Int)] {
@transient private var countState: ValueState[Int] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
countState = getHandle.getValueState[Int]("countState", Encoders.scalaInt, TTLConfig.NONE)
}
override def handleInitialState(
key: String, initialState: (String, Int), timerValues: TimerValues): Unit = {
countState.update(initialState._2)
}
override def handleInputRows(
key: String,
rows: Iterator[(String, String)],
timerValues: TimerValues): Iterator[(String, String)] = {
val count = if (countState.exists()) countState.get() else 0
val newCount = count + rows.size
countState.update(newCount)
Iterator((key, newCount.toString))
}
}
// Load existing counts as initial state — must use the same grouping key as the input
val initialState = spark.read.table("existing_counts")
.as[(String, Int)]
.groupByKey(_._1)
val q = spark
.readStream
.format("delta")
.load(srcDeltaTableDir)
.as[(String, String)]
.groupByKey(_._1)
.transformWithState(
new CounterWithInitialState(),
TimeMode.None(),
OutputMode.Update(),
initialState,
)
.writeStream...
LakeFlow Spark宣言型パイプラインでtransformWithState使用します
LakeFlow Spark宣言型パイプライン内でtransformWithState演算子を使用して、 Pythonを使用してストリーミング パイプラインに任意のステートフル ロジックを実装します。
これを行うには、次のステップを完成させます。
- 任意のステートフルトランスフォーメーションのために、出力スキーマとステートフルプロセッサロジックを定義します。例については、 「ステートフルアプリケーションの例」を参照してください。
- DataFrameで
transformWithStateオペレーターを呼び出すLakeFlow Spark宣言型パイプラインフローを作成します。「チュートリアル: Lakeflow Pipelines Editor を使用して最初のパイプラインを作成する」を参照してください。 - パイプラインを実行し、ターゲット テーブルまたはシンクで結果を検証します。
transformWithStateを使用してセンサーのハートビートを監視する例については、 「例: transformWithStateを使用してセンサーのハートビートを監視する」を参照してください。