メインコンテンツまでスキップ

高度なAUTO CDCトピック

このページでは、DML 操作、変更データフィードの読み取り、メトリクスのモニタリング処理など、 AUTO CDCおよびAUTO CDC FROM SNAPSHOTターゲット テーブルを操作するための高度なトピックについて説明します。 AUTO CDC APIsの概要については、 「AUTO CDC APIs : パイプラインを使用した変更データ キャプチャの簡素化」を参照してください。

ターゲットのストリーミングテーブルのデータを追加、変更、または削除します。

パイプラインがテーブルをUnity Catalogに公開する場合、insert、update、delete、merge ステートメントを含むデータ操作言語(DML) ステートメントを使用して、 AUTO CDC ... INTOステートメントによって作成されたターゲット ストリーミング テーブルを変更できます。

注記
  • ストリーミングテーブルのテーブルスキーマを変更する DML ステートメントはサポートされていません。DML ステートメントによってテーブルスキーマの進化が発生しないようにしてください。
  • ストリーミングテーブルを更新する DML ステートメントは、Databricks Runtime 13.3 LTS 以上を使用する共有Unity Catalog クラスターまたはSQL ウェアハウスでのみ実行できます。
  • ストリーミングには追加専用のデータ ソースが必要なため、処理で (DML ステートメントなどによる) 変更を伴うソース ストリーミング テーブルからのストリーミングが必要な場合は、ソース ストリーミング テーブルの読み取り時にSkipChangeCommits フラグを設定します。 skipChangeCommitsが設定されている場合、ソース テーブルのレコードを削除または変更するトランザクションは無視されます。処理にストリーミング テーブルが必要ない場合は、ターゲット テーブルとしてマテリアライズドビュー (追加のみの制限がない) を使用できます。

LakeFlow Spark宣言型パイプラインは指定されたSEQUENCE BY列を使用し、適切な順序値をターゲット テーブルの__START_AT列と__END_AT列に伝播するため ( SCDタイプ 2 の場合)、レコードの適切な順序を維持するために、DML ステートメントでこれらの列に有効な値が使用されていることを確認する必要があります。 AUTO CDC の仕組みをご覧ください。

ストリーミングテーブルでの DML ステートメントの使用の詳細については、 ストリーミングテーブルのデータを追加、変更、または削除するを参照してください。

次の例では、開始シーケンスを 5 としたアクティブレコードを挿入しています。

SQL
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);
ヒント

SCD タイプ 2 ターゲット テーブル内の__START_AT列と__END_AT列の名前を変更する必要がある場合 (たとえば、ダウンストリーム スキーマの要件に一致させるため)、ターゲット テーブルにビューを作成します。

SQL
CREATE VIEW my_employees_view AS
SELECT
*,
__START_AT AS valid_from,
__END_AT AS valid_to
FROM my_scd2_target_table;

AUTO CDCターゲットテーブルから変更データフィードを読み取ります

Databricks Runtime 15.2 以降では、他の Delta テーブルから変更データフィードを読み取るのと同じ方法で、 AUTO CDCまたはAUTO CDC FROM SNAPSHOTクエリのターゲットであるDeltaテーブルから変更データフィードを読み取ることができます。 ターゲットのストリーミング テーブルから変更データフィードを読み取るには、次のものが必要です。

  • ターゲット ストリーミングテーブルは Unity Catalogにパブリッシュする必要があります。 「パイプラインで Unity Catalog を使用する」を参照してください。
  • ターゲット ストリーミング テーブルから変更データフィードを読み取るには、 Databricks Runtime 15.2 以降を使用する必要があります。 別のパイプラインで変更データフィードを読み取るには、 Databricks Runtime 15.2 以降を使用するようにパイプラインを構成する必要があります。

他の Delta テーブルから変更データフィードを読み取るのと同じ方法で、 Lakeflow Spark宣言型パイプラインで作成されたDeltaストリーミング テーブルから変更データフィードを読み取ります。 Deltaデータフィード機能の使用方法 ( PythonやSQLの例など) の詳細については、 DatabricksでDelta Lakeデータフィードを使用する」を参照してください。

注記

変更データフィード レコードには、変更イベントのタイプを識別するメタデータが含まれています。 テーブル内のレコードが更新されると、関連付けられた変更レコードのメタデータには通常、 update_preimageおよびupdate_postimageイベントに設定された_change_type値が含まれます。

ただし、主キー値の変更を含む更新がターゲット ストリーミング テーブルに行われた場合、 _change_type値は異なります。 変更に主キーの更新が含まれる場合、 _change_typeメタデータ フィールドはinsertおよびdeleteイベントに設定されます。主キーの変更は、 UPDATEまたはMERGEステートメントを使用してキー フィールドの 1 つに手動で更新が行われたとき、または SCD タイプ 2 テーブルの場合は、 __start_atフィールドが以前の開始シーケンス値を反映するように変更されたときに発生する可能性があります。

AUTO CDCクエリは、SCD タイプ 1 と SCD タイプ 2 の処理で異なる主キー値を決定します。

SCD Type

Primary key

SCD type 1, and the pipelines Python interface

The primary key is the value of the keys parameter in the create_auto_cdc_flow() function. For the SQL interface the primary key is the columns defined by the KEYS clause in the AUTO CDC ... INTO statement.

SCD type 2

The primary key is the keys parameter or KEYS clause plus the return value from the coalesce(__START_AT, __END_AT) operation, where __START_AT and __END_AT are the corresponding columns from the target streaming table.

パイプライン内の CDC クエリによって処理されたレコードに関するデータを取得する

注記

次のメトリクスは、 AUTO CDCクエリによってのみキャプチャされ、 AUTO CDC FROM SNAPSHOTクエリによってはキャプチャされません。

次のメトリクスは、 AUTO CDCクエリによってキャプチャされます。

  • num_upserted_rows : 更新中にデータセットにアップサートされた出力行の数。
  • num_deleted_rows : 更新中にデータセットから削除された既存の出力行の数。

非 CDC フローの出力であるnum_output_rowsメトリクスは、 AUTO CDCクエリではキャプチャされません。

パイプラインの CDC 処理に使用されるデータ オブジェクトは何ですか?

Hive metastore でターゲットテーブルを宣言すると、2 つのデータ構造が作成されます。

  • ターゲットテーブルに割り当てられた名前を使用するビュー。
  • CDC 処理を管理するためにパイプラインによって使用される内部バッキング テーブル。このテーブルの名前は、ターゲット テーブル名の前に__apply_changes_storage_を付加して付けられます。

たとえば、 dp_cdc_targetという名前のターゲット テーブルを宣言すると、メタストアにdp_cdc_targetという名前のビューと__apply_changes_storage_dp_cdc_targetという名前のテーブルが表示されます。処理されたデータにアクセスするには、ビューをクエリします。バッキングテーブルを直接変更しないでください。

注記

これらのデータ構造はAUTO CDC処理にのみ適用され、 AUTO CDC FROM SNAPSHOT処理には適用されません。これらは、 Unity Catalogではなく、 Hive metastoreにのみ適用されます。