メインコンテンツまでスキップ

カスタムステートフルアプリケーションの構築

備考

プレビュー

この機能は、Databricks Runtime 16.2 以降で パブリック プレビュー 段階です。

カスタムステートフル演算子を使用してストリーミングアプリケーションを構築し、任意のステートフルロジックを使用する低レイテンシでほぼリアルタイムのソリューションを実装できます。 カスタムステートフルオペレーターは、従来の構造化ストリーミング処理では利用できなかった新しい運用ユースケースとパターンを解き放ちます。

注記

Databricks では、集計、重複除去、ストリーミング結合などのサポートされているステートフル操作に、組み込みの構造化ストリーミング機能を使用することをお勧めします。 「ステートフル ストリーミングとは」を参照してください。

Databricks では、任意の状態変換に " transformWithState over legacy 演算子" を使用することをお勧めします。 従来の flatMapGroupsWithState 演算子と mapGroupsWithState 演算子のドキュメントについては、「 従来の任意のステートフル演算子」を参照してください。

必要条件

transformWithState演算子と関連する APIs およびクラスには、次の要件があります。

  • Databricks Runtime 16.2 以降で使用できます。
  • コンピュートは、専用アクセスモードまたは非分離アクセスモードを使用する必要があります。
  • RocksDB状態ストア プロバイダーを使用する必要があります。Databricks では、コンピュート構成の一部として RocksDB を有効にすることをお勧めします。
  • transformWithStateInPandas Databricks Runtime 16.3 以降で標準アクセス モードをサポートします。
注記

現在のセッションで RocksDB 状態ストア プロバイダーを有効にするには、次のコマンドを実行します。

Python
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

transformWithState とは

transformWithState演算子は、カスタムステートフルプロセッサを構造化ストリーミングクエリに適用します。transformWithStateを使用するには、カスタムステートフルプロセッサを実装する必要があります。構造化ストリーミングには、APIs Python、Scala 、または を使用してステートフルプロセッサを構築するためのJava が含まれています。

transformWithState を使用して、構造化ストリーミングで段階的に処理されるレコードのグループ化キーにカスタムロジックを適用します。次に、高レベルの設計について説明します。

  • 1 つ以上の状態変数を定義します。
  • 状態情報はグループ化キーごとに保持され、ユーザー定義のロジックに従って各状態変数からアクセスできます。
  • 処理されたマイクロバッチごとに、キーのすべてのレコードがイテレータとして使用できます。
  • 組み込みハンドルを使用して、タイマーとユーザー定義の条件に基づいてレコードが出力されるタイミングと方法を制御します。
  • 状態値は、個々の有効期限 (TTL) 定義をサポートしているため、状態の有効期限と状態サイズを柔軟に管理できます。
important

PySpark は、 transformWithStateの代わりに演算子 transformWithStateInPandas を使用します。Databricksドキュメントでは、transformWithState を使用して、 Python と Scala の両方の実装の機能を記述します。

および関連 ScalaPythonの とtransformWithStateAPIs の実装は、言語の仕様によって異なりますが、同じ機能を提供します。お好みのプログラミング言語については、言語固有の例と API ドキュメントを参照してください。

組み込み processing handles

カスタム ステートフル アプリケーションのコア ロジックを実装するには、組み込み ハンドル を使用して ハンドラー を実装します。

  • ハンドルは、状態値とタイマーと対話し、受信レコードを処理し、レコードを出力するメソッドを提供します。
  • ハンドラーは、カスタムイベント駆動型ロジックを定義します。

各状態の種類のハンドルは、基になるデータ構造に基づいて実装されますが、それぞれにレコードを取得、入力、更新、および削除する機能が含まれています。

ハンドラーは、入力レコードまたはタイマーで観察されるイベントに基づいて、次のセマンティクスを使用して実装されます。

  • handleInputRowsメソッドを使用してハンドラを定義し、データの処理方法、状態の更新方法、およびグループ化キーで処理されたレコードのマイクロバッチごとにレコードが出力される方法を制御します。「入力行の処理」を参照してください。
  • handleExpiredTimerメソッドを使用してハンドラを定義し、時間ベースのしきい値を使用して、グループ化キーに対して追加のレコードが処理されるかどうかのロジックを実行します。「プログラム時間指定イベント」を参照してください。

次の表は、これらのハンドラーがサポートする機能動作の比較を示しています。

挙動

handleInputRows

handleExpiredTimer

状態値の取得、配置、更新、またはクリア

あり

あり

タイマーを作成または削除する

あり

あり

レコードを出力する

あり

あり

現在のマイクロバッチのレコードを反復処理する

あり

いいえ

経過時間に基づくトリガーロジック

いいえ

あり

必要に応じて、 handleInputRowshandleExpiredTimer を組み合わせて複雑なロジックを実装できます。

たとえば、 handleInputRows を使用して各マイクロバッチの状態値を更新し、将来 10 秒のタイマーを設定するアプリケーションを実装できます。 追加のレコードが処理されない場合は、 handleExpiredTimer を使用して、状態ストアの現在の値を出力できます。 グループ化キーで新しいレコードが処理される場合は、既存のタイマーをクリアして新しいタイマーを設定できます。

カスタム状態の種類

1 つのステートフル演算子で複数のステートオブジェクトを実装できます。 各状態オブジェクトに付けた名前は状態ストアに保持され、状態ストア リーダーでアクセスできます。 状態オブジェクトが StructTypeを使用する場合は、スキーマを渡すときに構造体の各フィールドに名前を指定します。 これらの名前は、状態ストアを読み取るときにも表示されます。 「構造化ストリーミングの状態情報の読み取り」を参照してください。

組み込みクラスと組み込み演算子によって提供される機能は、柔軟性と拡張性を提供することを目的としており、実装の選択肢は、アプリケーションの実行に必要な完全なロジックによって通知される必要があります。 たとえば、フィールド user_idsession_id でグループ化されたValueStateや、user_id でグループ化されたMapState (session_id が のキー) を使用して、ほぼ同じロジックを実装できますMapState。この場合、ロジックが複数のsession_id間で条件を評価する必要がある場合、MapStateが推奨される実装になる可能性があります。

次のセクションでは、 transformWithStateでサポートされる状態タイプについて説明します。

値の状態

グループ化キーごとに、値が関連付けられています。

値の状態には、構造体やタプルなどの複合型を含めることができます。 ValueStateを更新するときは、値全体を置き換えるロジックを実装します。値の TTL は、値が更新されるとリセットされますが、保存されているValueStateを更新せずにValueStateに一致するソース キーが処理された場合はリセットされません。

リストステート

グループ化キーごとに、関連付けられたリストがあります。

リストの状態は値のコレクションであり、各値には複合型を含めることができます。 リスト内の各値には、独自の TTL があります。 リストにアイテムを追加するには、個々のアイテムを追加するか、アイテムのリストを追加するか、リスト全体を putで上書きします。 put 操作のみが TTL リセットの更新と見なされます。

マップステート

グループ化キーごとに、マップが関連付けられています。 マップは、Python の辞書と同等の Apache Spark 機能です。

important

グループ化キーは、構造化ストリーミング クエリの GROUP BY 句で指定されたフィールドを記述します。 マップ状態には、グループ化キーの任意の数のキーと値のペアが含まれます。

たとえば、 user_id でグループ化し、各 session_idのマップを定義する場合、グループ化キーは user_id で、マップ内のキーは session_idです。

マップ状態は、それぞれが複合型を含むことができる値にマップする個別のキーのコレクションです。 マップ内の各キーと値のペアには、独自の TTL があります。 特定のキーの値を更新したり、キーとその値を削除したりできます。 キーを使用して個々の値を返したり、すべてのキーをリストしたり、すべての値をリストしたり、マップ内のキーと値のペアの完全なセットを操作するためのイテレータを返したりできます。

カスタム状態変数の初期化

StatefulProcessorを初期化すると、各ステートオブジェクトにローカル変数が作成され、カスタムロジックでステートオブジェクトと対話できるようになります。状態変数は、StatefulProcessor クラスの組み込み init メソッドをオーバーライドすることで定義および初期化されます。

任意の量の状態オブジェクトを定義するには、 getValueStategetListState、および getMapState メソッドを使用して、 StatefulProcessorを初期化します。

各状態オブジェクトには、次のものが必要です。

  • 一意の名前
  • 指定されたスキーマ
    • Python では、スキーマは明示的に指定されます。
    • Scala では、状態スキーマを指定するための Encoder を渡します。

また、必要に応じて、有効期限 (TTL) の期間をミリ秒単位で指定することもできます。 マップ状態を実装する場合は、マップ・キーと値に対して個別のスキーマ定義を提供する必要があります。

注記

状態情報のクエリ、更新、および出力の方法に関するロジックは、個別に処理されます。 「状態変数の使用」を参照してください。

ステートフルなアプリケーションの例

次に、サポートされている各型の状態変数の例を含む、 transformWithStateを使用してカスタム ステートフル プロセッサを定義して使用するための基本的な構文を示します。 その他の例については、「 ステートフル アプリケーションの例」を参照してください。

注記

Python は、状態値とのすべての対話にタプルを使用します。 つまり、Python コードは putupdate などの操作を使用する場合はタプルを使用して値を渡し、 getを使用する場合はタプルを処理することを期待する必要があります。

たとえば、値の状態のスキーマが 1 つの整数のみの場合は、次のようなコードを実装します。

Python
current_value_tuple = value_state.get() # Returns the value state as a tuple
current_value = current_value_tuple[0] # Extracts the first item in the tuple
new_value = current_value + 1 # Calculate a new value
value_state.update((new_value,)) # Pass the new value formatted as a tuple

これは、 ListState 内のアイテムや MapState内の値にも当てはまります。

Python
import pandas as pd
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator

spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

output_schema = StructType(
[
StructField("id", StringType(), True),
StructField("countAsString", StringType(), True),
]
)

class SimpleCounterProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
value_state_schema = StructType([StructField("count", IntegerType(), True)])
list_state_schema = StructType([StructField("count", IntegerType(), True)])
self.value_state = handle.getValueState(stateName="valueState", schema=value_state_schema)
self.list_state = handle.getListState(stateName="listState", schema=list_state_schema)
# Schema can also be defined using strings and SQL DDL syntax
self.map_state = handle.getMapState(stateName="mapState", userKeySchema="name string", valueSchema="count int")

def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
count = 0
for pdf in rows:
list_state_rows = [(120,), (20,)] # A list of tuples
self.list_state.put(list_state_rows)
self.list_state.appendValue((111,))
self.list_state.appendList(list_state_rows)
pdf_count = pdf.count()
count += pdf_count.get("value")
self.value_state.update((count,)) # Count is passed as a tuple
iter = self.list_state.get()
list_state_value = next(iter1)[0]
value = count
user_key = ("user_key",)
if self.map_state.exists():
if self.map_state.containsKey(user_key):
value += self.map_state.getValue(user_key)[0]
self.map_state.updateValue(user_key, (value,)) # Value is a tuple
yield pd.DataFrame({"id": key, "countAsString": str(count)})

q = (df.groupBy("key")
.transformWithStateInPandas(
statefulProcessor=SimpleCounterProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="None",
)
.writeStream...
)

ステートフルプロセッサハンドル

PySpark には、ユーザー定義の Python コードが状態情報と対話する方法を制御する関数へのアクセスを提供する StatefulProcessorHandle クラスが含まれています。 StatefulProcessorを初期化するときは、常にStatefulProcessorHandleをインポートしてhandle変数に渡す必要があります。

handle変数は、Python クラスのローカル変数を状態変数に関連付けます。

注記

Scala は getHandle メソッドを使用します。

初期状態の指定

必要に応じて、最初のマイクロバッチで使用する初期状態を指定できます。 これは、既存のワークフローを新しいカスタムアプリケーションに移行する場合、ステートフルな演算子をアップグレードしてスキーマやロジックを変更する場合、または自動的に修復できず手動による介入が必要な障害を修復する場合に便利です。

注記

状態ストア リーダーを使用して、既存のチェックポイントから状態情報を照会します。 「構造化ストリーミングの状態情報の読み取り」を参照してください。

既存の Delta テーブルをステートフル アプリケーションに変換する場合は、 spark.read.table("table_name") を使用してテーブルを読み取り、結果の DataFrame を渡します。 オプションで、新しいステートフル・アプリケーションに準拠するようにフィールドを選択または変更できます。

初期状態は、入力行と同じグループ化キースキーマを持つ DataFrame を使用して提供します。

注記

Python は、StatefulProcessorを定義する際に handleInitialState を使用して初期状態を指定します。Scala は、 StatefulProcessorWithInitialStateの個別のクラスを使用します。

状態変数を使用する

サポートされている状態オブジェクトは、状態の取得、既存の状態情報の更新、または現在の状態のクリアを行うためのメソッドを提供します。 サポートされている各状態の種類には、実装されたデータ構造に対応するメソッドの一意の実装があります。

観測される各グループ化キーには、専用の状態情報があります。

  • レコードは、実装するロジックに基づいて、指定した出力スキーマを使用して出力されます。 「レコードの出力」を参照してください。
  • 状態ストア内の値には、 statestore リーダーを使用してアクセスできます。 このリーダーにはバッチ機能があり、待機時間の短いワークロードを対象としていません。 「構造化ストリーミングの状態情報の読み取り」を参照してください。
  • handleInputRowsを使用して指定されたロジックは、キーのレコードがマイクロバッチに存在する場合にのみ起動します。「入力行の処理」を参照してください。
  • handleExpiredTimer を使用して、レコードの起動を監視することに依存しない時間ベースのロジックを実装します。「プログラム時間指定イベント」を参照してください。
注記

状態オブジェクトは、次の意味を持つグループ化キーによって分離されます。

  • 状態の値は、異なるグループ化キーに関連付けられたレコードによって影響を受けることはありません。
  • グループ化キー間での値の比較や状態の更新に依存するロジックは実装 できません

グループ化キー内の値を比較できます。 MapStateを使用して、カスタムロジックで使用できる 2 番目のキーでロジックを実装します。たとえば、 user_id でグループ化し、IPアドレスを使用して MapState をキー設定すると、同時ユーザーセッションを追跡するロジックを実装できます。

状態の操作に関する高度な考慮事項

状態変数への書き込みは、RocksDB への書き込みをトリガーします。 パフォーマンスを最適化するために、Databricks では、特定のキーの反復子のすべての値を処理し、可能な限り 1 回の書き込みで更新をコミットすることをお勧めします。

注記

状態の更新はフォールト トレラントです。 マイクロバッチの処理が完了する前にタスクがクラッシュした場合、再試行では最後に成功したマイクロバッチの値が使用されます。

State 値には組み込みのデフォルトはありません。 ロジックで既存の状態情報を読み取る必要がある場合は、ロジックの実装時に exists メソッドを使用します。

注記

MapState 変数には、個々のキーをチェックしたり、すべてのキーを一覧表示して NULL 状態のロジックを実装するための追加機能があります。

レコードを出力する

ユーザー定義ロジックは、 transformWithState がレコードを出力する方法を制御します。 レコードは、グループ化キーごとに出力されます。

カスタム・ステートフル・アプリケーションは、レコードの出力方法を決定する際に状態情報の使用方法について何も想定せず、特定の条件に対して返されるレコードの数は、なし、1、または多数になります。

レコードを出力するロジックは、 handleInputRows または handleExpiredTimerを使用して実装します。 入力行の処理およびプログラム時間指定イベントを参照してください。

注記

複数の状態値を実装し、レコードを出力するための複数の条件を定義できますが、出力されるすべてのレコードで同じスキーマを使用する必要があります。

In Python, you define your output schema using the outputStructType keyword while calling transformWithStateInPandas.

You emit records using a pandas DataFrame object and yield.

You can optionally yield an empty DataFrame. When combined with update output mode, emitting an empty DataFrame updates the values for the grouping key to be null.

入力行の処理

handleInputRowsメソッドを使用して、ストリーミングクエリで観察されたレコードが状態値とどのように相互作用し、更新するかのロジックを定義します。handleInputRowsメソッドで定義するハンドラは、構造化ストリーミングクエリを通じてレコードが処理されるたびに実行されます。

transformWithStateで実装されたほとんどのステートフル アプリケーションでは、コア ロジックは handleInputRowsを使用して定義されます。

処理されたマイクロバッチ更新ごとに、特定のグループ化キーのマイクロバッチ内のすべてのレコードが反復子を使用して使用可能になります。 ユーザー定義ロジックは、現在のマイクロバッチのすべてのレコードとステートストア内の値と対話できます。

プログラム時間指定イベント

タイマーを使用して、指定した条件からの経過時間に基づいてカスタムロジックを実装できます。

タイマーを操作するには、 handleExpiredTimer メソッドを実装します。

グループ化キー内では、タイマーはタイムスタンプによって一意に識別されます。

タイマーの有効期限が切れると、結果はアプリケーションに実装されたロジックによって決定されます。 一般的なパターンは次のとおりです。

  • 状態変数に格納された情報を出力します。
  • 保存された状態情報の削除。
  • 新しいタイマーを作成します。

期限切れのタイマーは、関連付けられたキーのレコードがマイクロバッチで処理されない場合でも起動します。

時間モデルの指定

StatefulProcessortransformWithStateに渡す場合は、時間モデルを指定する必要があります。次のオプションがサポートされています。

  • ProcessingTime
  • EventTime
  • NoTime または TimeMode.None()

NoTimeを指定すると、プロセッサでタイマーがサポートされません。

組み込みのタイマー値

Databricks では、タスクの失敗時に信頼性の低い再試行につながる可能性があるため、カスタム ステートフル アプリケーションでシステム クロックを呼び出さないことをお勧めします。 処理時間またはウォーターマークにアクセスする必要がある場合は、 TimerValues クラスのメソッドを使用します。

TimerValues

説明

getCurrentProcessingTimeInMs

現在のバッチの処理時間のタイムスタンプをエポックからのミリ秒単位で返します。

getCurrentWatermarkInMs

現在のバッチのウォーターマークのタイムスタンプを、エポックからのミリ秒単位で返します。

注記

処理時間は、マイクロバッチが Apache Spark によって処理される時間を示します。 Kafka などの多くのストリーミング ソースには、システムの処理時間も含まれています。

ストリーミング クエリのウォーターマークは、多くの場合、イベント時間またはストリーミング ソースの処理時間に対して定義されます。 ウォーターマークを適用してデータ処理のしきい値を制御するを参照してください。

透かしとウィンドウはどちらも、 transformWithStateと組み合わせて使用 できます。 カスタム ステートフル アプリケーションに同様の機能を実装するには、TTL、タイマー、 MapState または ListState 機能を活用します。

状態の有効期限 (TTL) とは何ですか?

transformWithState で使用される state 値は、オプションの有効期限 (TTL) 仕様をサポートします。TTL の有効期限が切れると、値は状態ストアから削除されます。 TTL は状態ストア内の値とのみ対話するため、状態情報を削除するロジックを実装できますが、TTL が状態値を削除するときにロジックを直接トリガーすることはできません。

important

TTL を実装しない場合は、他のロジックを使用して状態の削除を処理し、状態の無限の増加を回避する必要があります。

TTL は状態値ごとに適用され、状態の種類ごとに異なるルールが適用されます。

  • 状態変数のスコープはグループ化キーです。
  • ValueStateオブジェクトの場合、グループ化キーごとに 1 つの値のみが格納されます。TTL はこの値に適用されます。
  • ListStateオブジェクトの場合、リストには多くの値を含めることができます。TTL は、リスト内の各値に個別に適用されます。
  • MapStateオブジェクトの場合、各マップキーには状態値が関連付けられています。TTL は、マップ内の各キーと値のペアに個別に適用されます。

すべての状態タイプで、状態情報が更新されると TTL がリセットされます。

注記

TTL は ListState内の個々の値にスコープされますが、リスト内の値を更新する唯一の方法は、 put メソッドを使用して ListState 変数の内容全体を上書きすることです。

タイマーとTTLの違いは何ですか?

タイマーと状態変数の有効期限 (TTL) との間には重複する部分がありますが、タイマーは TTL よりも幅広い機能セットを提供します。

TTL は、ユーザーが指定した期間に更新されていない状態情報を削除します。 これにより、ユーザーはチェックされていない状態の増加を防ぎ、古い状態エントリを削除できます。 マップとリストは各値に対して TTL を実装するため、TTL を設定することで、最近更新された状態値のみを考慮する関数を実装できます。

タイマーを使用すると、レコードの発行など、状態の削除以外のカスタムロジックを定義できます。 必要に応じて、タイマーを使用して特定の状態値の状態情報をクリアし、タイマーに基づいて値を出力したり、他の条件付きロジックをトリガーしたりする柔軟性を追加できます。