構造化ストリーミングの状態情報の読み取り

プレビュー

この機能はパブリックプレビュー段階です。

Databricks Runtime 14.3 LTS 以降では、DataFrame 操作または SQL テーブル値関数を使用して、構造化ストリーミングの状態データとメタデータに対してクエリを実行できます。 これらの関数を使用して、構造化ストリーミングのステートフル クエリの状態情報を監視でき、モニタリングとデバッグに役立ちます。

状態データまたはメタデータを照会するには、ストリーミング クエリのチェックポイント パスへの読み取りアクセス権が必要です。 この記事で説明する関数は、状態データとメタデータへの読み取り専用アクセスを提供します。 バッチ読み取りセマンティクスは、状態情報のクエリにのみ使用できます。

注:

Delta Live Tables パイプライン、ストリーミング テーブル、または具体化されたビューの状態情報を照会することはできません。

Read Structured Streaming 状態ストア

サポートされている任意の Databricks Runtime で実行される構造化ストリーミング クエリの状態ストア情報を読み取ることができます。 次の構文を使用します。

df = (spark.read
  .format("statestore")
  .load("/checkpoint/path"))
SELECT * FROM read_statestore('/checkpoint/path')

次のオプション構成がサポートされています。

オプション

タイプ

デフォルト値

説明

batchId

ロング

最新のバッチ ID

読み取り元のターゲット バッチを表します。 このオプションを指定すると、クエリの以前の状態の状態情報を照会できます。 バッチはコミットする必要がありますが、まだクリーンアップされていません。

operatorId

ロング

0

読み取り元のターゲット演算子を表します。 このオプションは、クエリで複数のステートフル演算子が使用されている場合に使用します。

storeName

文字列

「デフォルト」

読み取り元のターゲット状態ストア名を表します。 このオプションは、ステートフルなオペレーターが複数の状態ストアインスタンスを使用する場合に使用されます。 ストリームとストリームの結合には、 storeName または joinSide のいずれかを指定する必要がありますが、両方を指定することはできません。

joinSide

文字列 ("left" または "right")

読み取り元のターゲット側を表します。 このオプションは、ユーザーがストリームストリーム結合から状態を読み取る場合に使用します。

返されるデータには、次のスキーマがあります。

タイプ

説明

key

構造体 (状態キーから派生した追加の型)

状態チェックポイント内のステートフルなオペレーター レコードのキー。

value

構造体 (状態値から派生した追加の型)

状態チェックポイント内のステートフルなオペレーター・レコードの値。

partition_id

整数タイプ

ステートフル・オペレーター・レコードを含む状態チェックポイントのパーティション。

構造化ストリーミングの状態メタデータの読み取り

重要

Databricks Runtime 14.2 以降でストリーミング クエリを実行して、状態メタデータを記録する必要があります。 状態メタデータ ファイルでは、下位互換性が損なわれることはありません。 Databricks Runtime 14.1 以下でストリーミング クエリを実行することを選択した場合、既存の状態メタデータ ファイルは無視され、新しい状態メタデータ ファイルは書き込まれません。

Databricks Runtime 14.2 以降で実行される構造化ストリーミング クエリの状態メタデータ情報を読み取ることができます。 次の構文を使用します。

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))
SELECT * FROM read_state_metadata('/checkpoint/path')

返されるデータには、次のスキーマがあります。

タイプ

説明

operatorId

整数タイプ

ステートフル ストリーミング演算子の整数 ID。

operatorName

整数タイプ

ステートフル ストリーミング演算子の名前。

stateStoreName

文字列

演算子の状態ストアの名前。

numPartitions

整数タイプ

状態ストアのパーティションの数。

minBatchId

ロング

状態のクエリに使用できる最小バッチ ID。

maxBatchId

ロング

状態のクエリに使用できる最大バッチ ID。

注:

minBatchIdmaxBatchId によって提供されるバッチ ID 値は、チェックポイントが書き込まれた時点の状態を反映しています。古いバッチはマイクロバッチの実行によって自動的にクリーンアップされるため、ここで指定した値が引き続き使用可能であるとは限りません。