メインコンテンツまでスキップ

構造化ストリーミングの状態情報の読み取り

Databricks Runtime 14.3 LTS 以降では、コンピュートで専用または分離アクセスモードなしで構成されている場合、データフレーム操作またはSQLテーブル値関数を使用して、構造化ストリーミング状態データとメタデータをクエリできます。これらの関数を使用して、構造化ストリーミング ステートフル クエリの状態情報を監視でき、モニタリングやデバッグに役立ちます。

状態データまたはメタデータをクエリするには、ストリーミング クエリのチェックポイント パスへの読み取りアクセス権が必要です。 この記事で説明する関数は、状態データとメタデータへの読み取り専用アクセスを提供します。 バッチ読み取りセマンティクスは、状態情報のクエリにのみ使用できます。

注記

DLT パイプライン、ストリーミングテーブル、またはマテリアライズドビューの状態情報をクエリすることはできません。標準アクセスモードで設定されたサーバレス コンピュートまたはコンピュートを使用して状態情報を照会することはできません。

Read 構造化ストリーミング 状態ストア

サポートされている任意の Databricks Runtime で実行される構造化ストリーミング クエリの状態ストア情報を読み取ることができます。 次の構文を使用します。

Python
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))

次のオプション設定がサポートされています。

オプション

タイプ

デフォルト値

説明

batchId

Long

最新のバッチ ID

読み取り元のターゲット バッチを表します。 クエリの以前の状態の状態情報をクエリするには、このオプションを指定します。 バッチはコミットされている必要がありますが、まだクリーンアップされていません。

operatorId

Long

0

読み込むターゲット演算子を表します。 このオプションは、クエリが複数のステートフル演算子を使用している場合に使用されます。

storeName

文字列

「デフォルト」

読み取り元のターゲット状態ストア名を表します。 このオプションは、ステートフル オペレーターが複数の状態ストア インスタンスを使用する場合に使用されます。 ストリーム-スチーム結合には、 storeName または joinSide のいずれかを指定する必要がありますが、両方を指定することはできません。

joinSide

文字列 ("left" または "right")

読み取り元のターゲット側を表します。 このオプションは、ユーザーがストリームストリーム結合から状態を読み取る場合に使用されます。

返されるデータのスキーマは次のとおりです。

タイプ

説明

key

Struct (さらに、状態キーから派生した型)

ステートフル・オペレーター・レコードのキー。状態チェックポイントのキー。

value

構造体 (状態値から派生したさらに型)

状態チェックポイントのステートフル オペレーター レコードの値。

partition_id

整数タイプ

ステートフル・オペレーター・レコードを含むステート・チェックポイントのパーティション。

構造化ストリーミングの状態メタデータの読み取り

important

Databricks Runtime 14.2 以降でストリーミング クエリを実行して、状態メタデータを記録する必要があります。 状態メタデータ ファイルは、下位互換性を損なうことはありません。 Databricks Runtime 14.1 以下でストリーミング クエリを実行することを選択した場合、既存の状態メタデータ ファイルは無視され、新しい状態メタデータ ファイルは書き込まれません。

Databricks Runtime 14.2 以降で実行される構造化ストリーミング クエリの状態メタデータ情報を読み取ることができます。 次の構文を使用します。

Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))

返されるデータのスキーマは次のとおりです。

タイプ

説明

operatorId

整数タイプ

ステートフル ストリーミング演算子の整数 ID。

operatorName

整数タイプ

ステートフル ストリーミング オペレーターの名前。

stateStoreName

文字列

オペレータの状態ストアの名前。

numPartitions

整数タイプ

状態ストアのパーティションの数。

minBatchId

Long

状態のクエリに使用できる最小バッチ ID。

maxBatchId

Long

状態のクエリに使用できる最大バッチ ID。

注記

minBatchIdmaxBatchId によって提供されるバッチ ID 値は、チェックポイントが書き込まれた時点の状態を反映しています。古いバッチはマイクロバッチ実行で自動的にクリーンアップされるため、ここで提供される値がまだ使用可能であるとは限りません。