構造化ストリーミングの状態情報の読み取り
DataFrame オペレーションまたは SQL テーブル値関数を使用して、構造化ストリーミングの状態データとメタデータをクエリできます。これらの関数を使用して、構造化ストリーミング ステートフル クエリの状態情報を監視し、モニタリングやデバッグに役立ちます。
状態データまたはメタデータをクエリするには、ストリーミング クエリのチェックポイント パスへの読み取りアクセス権が必要です。 この記事で説明する関数は、状態データとメタデータへの読み取り専用アクセスを提供します。 バッチ読み取りセマンティクスは、状態情報のクエリにのみ使用できます。
Lakeflow Spark宣言型パイプライン、ストリーミング テーブル、またはマテリアライズドビューの状態情報をクエリすることはできません。 サーバレスコンピュートまたは標準アクセスモードで設定されたコンピュートを使用して状態情報を問い合わせることはできません。
必要条件
-
次のいずれかのコンピュート構成を使用します。
- Databricks Runtime 16.3 以降、標準アクセス モードで構成されたコンピュート。
- Databricks Runtime 14.3 LTS 以上 (コンピュート) は、専用または分離なしのアクセス モードで構成されています。
-
ストリーミング クエリで使用されるチェックポイント パスへの読み取りアクセス。
構造化ストリーミング 状態ストアの読み込み
サポートされている任意の Databricks Runtime で実行される構造化ストリーミング クエリの状態ストア情報を読み取ることができます。 次の構文を使用します。
- Python
- Scala
- SQL
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))
val df = spark.read
.format("statestore")
.load("/checkpoint/path")
SELECT * FROM read_statestore('/checkpoint/path')
状態リーダーAPIのオプションとスキーマ
statestoreフォーマットオプションの完全なリストについては、 「状態ストア」を参照してください。
出力データは以下のスキーマを持ちます。
列 | タイプ | 説明 |
|---|---|---|
| Struct (さらに、状態キーから派生した型) | ステートフル・オペレーター・レコードのキー。状態チェックポイントのキー。 |
| 構造体 (状態値から派生したさらに型) | 状態チェックポイントのステートフル オペレーター レコードの値。 |
| 整数タイプ | ステートフル・オペレーター・レコードを含むステート・チェックポイントのパーティション。 |
Databricks Runtime 16.4 LTS以降では、 readChangeFeedオプションをtrueに設定すると、出力データは次のスキーマになります。
列 | タイプ | 説明 |
|---|---|---|
| Long | 状態変更が属するバッチ ID。 |
| 文字列 | バッチによって適用された変更の種類: |
| Struct (さらに、状態キーから派生した型) | ステートフル・オペレーター・レコードのキー。状態チェックポイントのキー。 |
| 構造体 (状態値から派生したさらに型) | ステートチェックポイントにおけるステートフルオペレーターレコードの値。 |
| 整数タイプ | ステートフル・オペレーター・レコードを含むステート・チェックポイントのパーティション。 |
read_statestoreテーブル値関数を参照してください。
構造化ストリーミングの状態変化を読み取る
Databricks Runtime 16.4 LTS以降で利用可能です。単一のマイクロバッチで完全な状態を表示するのではなく、マイクロバッチ間で状態がどのように変化するかを読み取るには、 readChangeFeedをtrueに設定し、 changeStartBatchIdを指定します。必要に応じて、 changeEndBatchIdを指定してください。オプションの完全なリストについては、状態ストアを参照してください。
例えば、バッチ2から最新のコミット済みバッチまでの状態変更を読み取るには、次のようにします。
- Python
- Scala
- SQL
df = (spark.read
.format("statestore")
.option("readChangeFeed", True)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>")
)
val df = spark.read
.format("statestore")
.option("readChangeFeed", true)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>")
SELECT * FROM read_statestore(
'<checkpointLocation>',
readChangeFeed => true,
changeStartBatchId => 2
);
出力スキーマには、追加のbatch_id列とchange_type列が含まれます。完全なスキーマについては、 「状態リーダーAPIのオプションとスキーマ」を参照してください。
構造化ストリーミングの状態メタデータの読み取り
Databricks Runtime 14.3 LTS以降で利用可能です。構造化ストリーミングクエリの状態メタデータ情報を読み取ることができます。
- Python
- Scala
- SQL
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
val df = spark.read
.format("state-metadata")
.load("<checkpointLocation>")
SELECT * FROM read_state_metadata('/checkpoint/path')
返されるデータのスキーマは次のとおりです。
列 | タイプ | 説明 |
|---|---|---|
| 整数タイプ | ステートフル ストリーミング演算子の整数 ID。 |
| 文字列 | ステートフル ストリーミング オペレーターの名前。 |
| 文字列 | オペレータの状態ストアの名前。 |
| 整数タイプ | 状態ストアのパーティションの数。 |
| Long | 状態のクエリに使用できる最小バッチ ID。 |
| Long | 状態のクエリに使用できる最大バッチ ID。 |
minBatchId と maxBatchId によって提供されるバッチ ID 値は、チェックポイントが書き込まれた時点の状態を反映しています。古いバッチはマイクロバッチ実行で自動的にクリーンアップされるため、ここで提供される値がまだ使用可能であるとは限りません。
read_state_metadataテーブル値関数を参照してください。
例: ストリーム-ストリーム結合の一方の側をクエリする
次の構文を使用して、ストリーム/ストリーム結合の左側をクエリします。
- Python
- Scala
- SQL
left_df = (spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path"))
val leftDf = spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path")
SELECT * FROM read_statestore(
'/checkpoint/path',
joinSide => 'left'
);
例: 複数のステートフル演算子を持つストリームの状態ストアをクエリする
この例では、状態メタデータ リーダーを使用して、複数のステートフル演算子を持つストリーミング クエリのメタデータの詳細を収集し、メタデータ結果を状態リーダーのオプションとして使用します。
状態メタデータ リーダーは、次の構文例のように、チェックポイント パスを唯一のオプションとして受け取ります。
- Python
- Scala
- SQL
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
val df = spark.read
.format("state-metadata")
.load("<checkpointLocation>")
SELECT * FROM read_state_metadata('/checkpoint/path')
次の表は、状態ストア のメタデータ の出力例を示しています。
演算子ID | 演算子名 | stateStoreName | numPartitions | minBatchId | maxBatchId |
|---|---|---|---|---|---|
0 | stateStoreSave | default | 200 | 0 | 13 |
1 | dedupeWithinWatermark | default | 200 | 0 | 13 |
dedupeWithinWatermark 演算子の結果を取得するには、次の例のように、operatorId オプションを使用して状態リーダーにクエリを実行します。
- Python
- Scala
- SQL
left_df = (spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path"))
val leftDf = spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path")
SELECT * FROM read_statestore(
'/checkpoint/path',
operatorId => 1
);