メインコンテンツまでスキップ

構造化ストリーミングの状態情報の読み取り

DataFrame オペレーションまたは SQL テーブル値関数を使用して、構造化ストリーミングの状態データとメタデータをクエリできます。これらの関数を使用して、構造化ストリーミング ステートフル クエリの状態情報を監視し、モニタリングやデバッグに役立ちます。

状態データまたはメタデータをクエリするには、ストリーミング クエリのチェックポイント パスへの読み取りアクセス権が必要です。 この記事で説明する関数は、状態データとメタデータへの読み取り専用アクセスを提供します。 バッチ読み取りセマンティクスは、状態情報のクエリにのみ使用できます。

注記

DLT パイプライン、ストリーミングテーブル、またはマテリアライズドビューの状態情報をクエリすることはできません。標準アクセスモードで設定されたサーバレス コンピュートまたはコンピュートを使用して状態情報を照会することはできません。

必要条件

  • 次のいずれかのコンピュート構成を使用します。

    • Databricks Runtime 16.3 以降、標準アクセス モードで構成されたコンピュート。
    • Databricks Runtime 14.3 LTS 以上 (コンピュート) は、専用または分離なしのアクセス モードで構成されています。
  • ストリーミング クエリで使用されるチェックポイント パスへの読み取りアクセス。

Read 構造化ストリーミング 状態ストア

サポートされている任意の Databricks Runtime で実行される構造化ストリーミング クエリの状態ストア情報を読み取ることができます。 次の構文を使用します。

Python
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))

状態リーダー API パラメーター

状態リーダー API は、次のオプション構成をサポートしています。

オプション

タイプ

デフォルト値

説明

batchId

Long

最新のバッチ ID

読み取り元のターゲット バッチを表します。 クエリの以前の状態の状態情報をクエリするには、このオプションを指定します。 バッチはコミットされている必要がありますが、まだクリーンアップされていません。

operatorId

Long

0

読み込むターゲット演算子を表します。 このオプションは、クエリが複数のステートフル演算子を使用している場合に使用されます。

storeName

文字列

「デフォルト」

読み取り元のターゲット状態ストア名を表します。 このオプションは、ステートフル オペレーターが複数の状態ストア インスタンスを使用する場合に使用されます。 ストリーム-スチーム結合には、 storeName または joinSide のいずれかを指定する必要がありますが、両方を指定することはできません。

joinSide

文字列 ("left" または "right")

読み取り元のターゲット側を表します。このオプションは、ユーザーがストリーム/ストリーム結合から状態を読み取る場合に使用されます。

stateVarName

文字列

なし

このクエリの一部として読み取る状態変数の名前。状態変数名は、transformWithState 演算子によって使用されるStatefulProcessorinit 関数内の各変数に付けられた一意の名前です。このオプションは、 transformWithState 演算子を使用する場合に必須オプションです。このオプションは transformWithState 演算子にのみ適用され、他の演算子では無視されます。Databricks Runtime 16.2 以降で使用できます。

readRegisteredTimers

ブール値

False

true に設定すると、transformWithState オペレーター内で使用される登録済みタイマーが読み取られます。このオプションは transformWithState 演算子にのみ適用され、他の演算子では無視されます。Databricks Runtime 16.2 以降で使用できます。

flattenCollectionTypes

ブール値

True

trueの場合、map 状態変数と list 状態変数に対して返されるレコードをフラット化します。falseの場合、レコードは Spark SQL Array または Mapを使用して返されます。このオプションは transformWithState 演算子にのみ適用され、他の演算子では無視されます。Databricks Runtime 16.2 以降で使用できます。

返されるデータのスキーマは次のとおりです。

タイプ

説明

key

Struct (さらに、状態キーから派生した型)

ステートフル・オペレーター・レコードのキー。状態チェックポイントのキー。

value

構造体 (状態値から派生したさらに型)

状態チェックポイントのステートフル オペレーター レコードの値。

partition_id

整数タイプ

ステートフル・オペレーター・レコードを含むステート・チェックポイントのパーティション。

read_statestoreテーブル値関数を参照してください。

構造化ストリーミングの状態メタデータの読み取り

important

Databricks Runtime 14.2 以降でストリーミング クエリを実行して、状態メタデータを記録する必要があります。 状態メタデータ ファイルは、下位互換性を損なうことはありません。 Databricks Runtime 14.1 以下でストリーミング クエリを実行することを選択した場合、既存の状態メタデータ ファイルは無視され、新しい状態メタデータ ファイルは書き込まれません。

Databricks Runtime 14.2 以降で実行される構造化ストリーミング クエリの状態メタデータ情報を読み取ることができます。 次の構文を使用します。

Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))

返されるデータのスキーマは次のとおりです。

タイプ

説明

operatorId

整数タイプ

ステートフル ストリーミング演算子の整数 ID。

operatorName

整数タイプ

ステートフル ストリーミング オペレーターの名前。

stateStoreName

文字列

オペレータの状態ストアの名前。

numPartitions

整数タイプ

状態ストアのパーティションの数。

minBatchId

Long

状態のクエリに使用できる最小バッチ ID。

maxBatchId

Long

状態のクエリに使用できる最大バッチ ID。

注記

minBatchIdmaxBatchId によって提供されるバッチ ID 値は、チェックポイントが書き込まれた時点の状態を反映しています。古いバッチはマイクロバッチ実行で自動的にクリーンアップされるため、ここで提供される値がまだ使用可能であるとは限りません。

read_state_metadataテーブル値関数を参照してください。

例: ストリーム-ストリーム結合の一方の側をクエリする

次の構文を使用して、ストリーム/ストリーム結合の左側をクエリします。

Python
left_df = (spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path"))

例: 複数のステートフル演算子を持つストリームの状態ストアをクエリする

この例では、状態メタデータ リーダーを使用して、複数のステートフル演算子を持つストリーミング クエリのメタデータの詳細を収集し、メタデータ結果を状態リーダーのオプションとして使用します。

状態メタデータ リーダーは、次の構文例のように、チェックポイント パスを唯一のオプションとして受け取ります。

Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))

次の表は、状態ストア metadata の出力例を示しています。

演算子ID

演算子名

stateStoreName (ステートストア名)

numパーティション

minBatchId (英語)

maxBatchId (マックスバッチイド)

0

stateStore保存

default

200

0

13

1

重複排除ウィthinウォーターマーク

default

200

0

13

dedupeWithinWatermark 演算子の結果を取得するには、次の例のように、operatorId オプションを使用して状態リーダーにクエリを実行します。

Python
left_df = (spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path"))