構造化ストリーミングの状態情報の読み取り
プレビュー
この機能はパブリックプレビュー段階です。
Databricks Runtime 14.3 LTS 以降では、DataFrame 操作または SQL テーブル値関数を使用して、構造化ストリーミングの状態データとメタデータに対してクエリを実行できます。 これらの関数を使用して、構造化ストリーミングのステートフル クエリの状態情報を監視でき、モニタリングとデバッグに役立ちます。
状態データまたはメタデータを照会するには、ストリーミング クエリのチェックポイント パスへの読み取りアクセス権が必要です。 この記事で説明する関数は、状態データとメタデータへの読み取り専用アクセスを提供します。 バッチ読み取りセマンティクスは、状態情報のクエリにのみ使用できます。
注:
Delta Live Tables パイプライン、ストリーミング テーブル、またはマテリアライズドビューの状態情報を照会することはできません。
Read Structured Streaming 状態ストア
サポートされている任意の Databricks Runtime で実行される構造化ストリーミング クエリの状態ストア情報を読み取ることができます。 次の構文を使用します。
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))
SELECT * FROM read_statestore('/checkpoint/path')
次のオプション構成がサポートされています。
オプション |
タイプ |
デフォルト値 |
説明 |
---|---|---|---|
|
ロング |
最新のバッチ ID |
読み取り元のターゲット バッチを表します。 このオプションを指定すると、クエリの以前の状態の状態情報を照会できます。 バッチはコミットする必要がありますが、まだクリーンアップされていません。 |
|
ロング |
0 |
読み取り元のターゲット演算子を表します。 このオプションは、クエリで複数のステートフル演算子が使用されている場合に使用します。 |
|
文字列 |
「デフォルト」 |
読み取り元のターゲット状態ストア名を表します。 このオプションは、ステートフルなオペレーターが複数の状態ストアインスタンスを使用する場合に使用されます。 ストリームとストリームの結合には、 |
|
文字列 ("left" または "right") |
読み取り元のターゲット側を表します。 このオプションは、ユーザーがストリームストリーム結合から状態を読み取る場合に使用します。 |
返されるデータには、次のスキーマがあります。
列 |
タイプ |
説明 |
---|---|---|
|
構造体 (状態キーから派生した追加の型) |
状態チェックポイント内のステートフルなオペレーター レコードのキー。 |
|
構造体 (状態値から派生した追加の型) |
状態チェックポイント内のステートフルなオペレーター・レコードの値。 |
|
整数タイプ |
ステートフル・オペレーター・レコードを含む状態チェックポイントのパーティション。 |
構造化ストリーミングの状態メタデータの読み取り
重要
Databricks Runtime 14.2 以降でストリーミング クエリを実行して、状態メタデータを記録する必要があります。 状態メタデータ ファイルでは、下位互換性が損なわれることはありません。 Databricks Runtime 14.1 以下でストリーミング クエリを実行することを選択した場合、既存の状態メタデータ ファイルは無視され、新しい状態メタデータ ファイルは書き込まれません。
Databricks Runtime 14.2 以降で実行される構造化ストリーミング クエリの状態メタデータ情報を読み取ることができます。 次の構文を使用します。
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
SELECT * FROM read_state_metadata('/checkpoint/path')
返されるデータには、次のスキーマがあります。
列 |
タイプ |
説明 |
---|---|---|
|
整数タイプ |
ステートフル ストリーミング演算子の整数 ID。 |
|
整数タイプ |
ステートフル ストリーミング演算子の名前。 |
|
文字列 |
演算子の状態ストアの名前。 |
|
整数タイプ |
状態ストアのパーティションの数。 |
|
ロング |
状態のクエリに使用できる最小バッチ ID。 |
|
ロング |
状態のクエリに使用できる最大バッチ ID。 |
注:
minBatchId
と maxBatchId
によって提供されるバッチ ID 値は、チェックポイントが書き込まれた時点の状態を反映しています。古いバッチはマイクロバッチの実行によって自動的にクリーンアップされるため、ここで指定した値が引き続き使用可能であるとは限りません。