メインコンテンツまでスキップ

構造化ストリーミングの状態情報の読み取り

DataFrame オペレーションまたは SQL テーブル値関数を使用して、構造化ストリーミングの状態データとメタデータをクエリできます。これらの関数を使用して、構造化ストリーミング ステートフル クエリの状態情報を監視し、モニタリングやデバッグに役立ちます。

状態データまたはメタデータをクエリするには、ストリーミング クエリのチェックポイント パスへの読み取りアクセス権が必要です。 この記事で説明する関数は、状態データとメタデータへの読み取り専用アクセスを提供します。 バッチ読み取りセマンティクスは、状態情報のクエリにのみ使用できます。

注記

Lakeflow Spark宣言型パイプライン、ストリーミング テーブル、またはマテリアライズドビューの状態情報をクエリすることはできません。 サーバレスコンピュートまたは標準アクセスモードで設定されたコンピュートを使用して状態情報を問い合わせることはできません。

必要条件

  • 次のいずれかのコンピュート構成を使用します。

    • Databricks Runtime 16.3 以降、標準アクセス モードで構成されたコンピュート。
    • Databricks Runtime 14.3 LTS 以上 (コンピュート) は、専用または分離なしのアクセス モードで構成されています。
  • ストリーミング クエリで使用されるチェックポイント パスへの読み取りアクセス。

構造化ストリーミング 状態ストアの読み込み

サポートされている任意の Databricks Runtime で実行される構造化ストリーミング クエリの状態ストア情報を読み取ることができます。 次の構文を使用します。

Python
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))

状態リーダーAPIのオプションとスキーマ

statestoreフォーマットオプションの完全なリストについては、 「状態ストア」を参照してください。

出力データは以下のスキーマを持ちます。

タイプ

説明

key

Struct (さらに、状態キーから派生した型)

ステートフル・オペレーター・レコードのキー。状態チェックポイントのキー。

value

構造体 (状態値から派生したさらに型)

状態チェックポイントのステートフル オペレーター レコードの値。

partition_id

整数タイプ

ステートフル・オペレーター・レコードを含むステート・チェックポイントのパーティション。

Databricks Runtime 16.4 LTS以降では、 readChangeFeedオプションをtrueに設定すると、出力データは次のスキーマになります。

タイプ

説明

batch_id

Long

状態変更が属するバッチ ID。

change_type

文字列

バッチによって適用された変更の種類: updateは挿入と更新、 deleteは削除。

key

Struct (さらに、状態キーから派生した型)

ステートフル・オペレーター・レコードのキー。状態チェックポイントのキー。

value

構造体 (状態値から派生したさらに型)

ステートチェックポイントにおけるステートフルオペレーターレコードの値。change_typedeleteであるレコードの場合、 null

partition_id

整数タイプ

ステートフル・オペレーター・レコードを含むステート・チェックポイントのパーティション。

read_statestoreテーブル値関数を参照してください。

構造化ストリーミングの状態変化を読み取る

Databricks Runtime 16.4 LTS以降で利用可能です。単一のマイクロバッチで完全な状態を表示するのではなく、マイクロバッチ間で状態がどのように変化するかを読み取るには、 readChangeFeedtrueに設定し、 changeStartBatchIdを指定します。必要に応じて、 changeEndBatchIdを指定してください。オプションの完全なリストについては、状態ストアを参照してください。

例えば、バッチ2から最新のコミット済みバッチまでの状態変更を読み取るには、次のようにします。

Python
df = (spark.read
.format("statestore")
.option("readChangeFeed", True)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>")
)

出力スキーマには、追加のbatch_id列とchange_type列が含まれます。完全なスキーマについては、 「状態リーダーAPIのオプションとスキーマ」を参照してください。

構造化ストリーミングの状態メタデータの読み取り

Databricks Runtime 14.3 LTS以降で利用可能です。構造化ストリーミングクエリの状態メタデータ情報を読み取ることができます。

Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))

返されるデータのスキーマは次のとおりです。

タイプ

説明

operatorId

整数タイプ

ステートフル ストリーミング演算子の整数 ID。

operatorName

文字列

ステートフル ストリーミング オペレーターの名前。

stateStoreName

文字列

オペレータの状態ストアの名前。

numPartitions

整数タイプ

状態ストアのパーティションの数。

minBatchId

Long

状態のクエリに使用できる最小バッチ ID。

maxBatchId

Long

状態のクエリに使用できる最大バッチ ID。

注記

minBatchIdmaxBatchId によって提供されるバッチ ID 値は、チェックポイントが書き込まれた時点の状態を反映しています。古いバッチはマイクロバッチ実行で自動的にクリーンアップされるため、ここで提供される値がまだ使用可能であるとは限りません。

read_state_metadataテーブル値関数を参照してください。

例: ストリーム-ストリーム結合の一方の側をクエリする

次の構文を使用して、ストリーム/ストリーム結合の左側をクエリします。

Python
left_df = (spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path"))

例: 複数のステートフル演算子を持つストリームの状態ストアをクエリする

この例では、状態メタデータ リーダーを使用して、複数のステートフル演算子を持つストリーミング クエリのメタデータの詳細を収集し、メタデータ結果を状態リーダーのオプションとして使用します。

状態メタデータ リーダーは、次の構文例のように、チェックポイント パスを唯一のオプションとして受け取ります。

Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))

次の表は、状態ストア のメタデータ の出力例を示しています。

演算子ID

演算子名

stateStoreName

numPartitions

minBatchId

maxBatchId

0

stateStoreSave

default

200

0

13

1

dedupeWithinWatermark

default

200

0

13

dedupeWithinWatermark 演算子の結果を取得するには、次の例のように、operatorId オプションを使用して状態リーダーにクエリを実行します。

Python
left_df = (spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path"))