構造化ストリーミングの状態情報の読み取り
Databricks Runtime 14.3 LTS 以降では、コンピュートで専用または分離アクセスモードなしで構成されている場合、データフレーム操作またはSQLテーブル値関数を使用して、構造化ストリーミング状態データとメタデータをクエリできます。これらの関数を使用して、構造化ストリーミング ステートフル クエリの状態情報を監視でき、モニタリングやデバッグに役立ちます。
状態データまたはメタデータをクエリするには、ストリーミング クエリのチェックポイント パスへの読み取りアクセス権が必要です。 この記事で説明する関数は、状態データとメタデータへの読み取り専用アクセスを提供します。 バッチ読み取りセマンティクスは、状態情報のクエリにのみ使用できます。
DLT パイプライン、ストリーミングテーブル、またはマテリアライズドビューの状態情報をクエリすることはできません。標準アクセスモードで設定されたサーバレス コンピュートまたはコンピュートを使用して状態情報を照会することはできません。
Read 構造化ストリーミング 状態ストア
サポートされている任意の Databricks Runtime で実行される構造化ストリーミング クエリの状態ストア情報を読み取ることができます。 次の構文を使用します。
- Python
- SQL
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))
SELECT * FROM read_statestore('/checkpoint/path')
次のオプション設定がサポートされています。
オプション | タイプ | デフォルト値 | 説明 |
---|---|---|---|
| Long | 最新のバッチ ID | 読み取り元のターゲット バッチを表します。 クエリの以前の状態の状態情報をクエリするには、このオプションを指定します。 バッチはコミットされている必要がありますが、まだクリーンアップされていません。 |
| Long | 0 | 読み込むターゲット演算子を表します。 このオプションは、クエリが複数のステートフル演算子を使用している場合に使用されます。 |
| 文字列 | 「デフォルト」 | 読み取り元のターゲット状態ストア名を表します。 このオプションは、ステートフル オペレーターが複数の状態ストア インスタンスを使用する場合に使用されます。 ストリーム-スチーム結合には、 |
| 文字列 ("left" または "right") | 読み取り元のターゲット側を表します。 このオプションは、ユーザーがストリームストリーム結合から状態を読み取る場合に使用されます。 |
返されるデータのスキーマは次のとおりです。
列 | タイプ | 説明 |
---|---|---|
| Struct (さらに、状態キーから派生した型) | ステートフル・オペレーター・レコードのキー。状態チェックポイントのキー。 |
| 構造体 (状態値から派生したさらに型) | 状態チェックポイントのステートフル オペレーター レコードの値。 |
| 整数タイプ | ステートフル・オペレーター・レコードを含むステート・チェックポイントのパーティション。 |
構造化ストリーミングの状態メタデータの読み取り
Databricks Runtime 14.2 以降でストリーミング クエリを実行して、状態メタデータを記録する必要があります。 状態メタデータ ファイルは、下位互換性を損なうことはありません。 Databricks Runtime 14.1 以下でストリーミング クエリを実行することを選択した場合、既存の状態メタデータ ファイルは無視され、新しい状態メタデータ ファイルは書き込まれません。
Databricks Runtime 14.2 以降で実行される構造化ストリーミング クエリの状態メタデータ情報を読み取ることができます。 次の構文を使用します。
- Python
- SQL
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
SELECT * FROM read_state_metadata('/checkpoint/path')
返されるデータのスキーマは次のとおりです。
列 | タイプ | 説明 |
---|---|---|
| 整数タイプ | ステートフル ストリーミング演算子の整数 ID。 |
| 整数タイプ | ステートフル ストリーミング オペレーターの名前。 |
| 文字列 | オペレータの状態ストアの名前。 |
| 整数タイプ | 状態ストアのパーティションの数。 |
| Long | 状態のクエリに使用できる最小バッチ ID。 |
| Long | 状態のクエリに使用できる最大バッチ ID。 |
minBatchId
と maxBatchId
によって提供されるバッチ ID 値は、チェックポイントが書き込まれた時点の状態を反映しています。古いバッチはマイクロバッチ実行で自動的にクリーンアップされるため、ここで提供される値がまだ使用可能であるとは限りません。