メインコンテンツまでスキップ

AUTO CDC INTO (Lakeflow 宣言型パイプライン)

AUTO CDC ... INTOステートメントを使用して、Lakeflow宣言型パイプライン チェンジデータキャプチャ(CDC) 機能を使用するフローを作成します。このステートメントは、CDC ソースから変更を読み取り、ストリーミングターゲットに適用します。

構文

CREATE OR REFRESH STREAMING TABLE table_name;

CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

ターゲットのデータ品質制約は、他の Lakeflow 宣言型パイプラインクエリと同じ CONSTRAINT 句を使用して定義します。パイプラインの期待値を使用してデータ品質を管理するを参照してください。

INSERTUPDATE イベントのデフォルトの動作は、ソースからCDCイベントを アップサート することです。指定されたキーに一致するターゲットテーブルの行を更新するか、ターゲットテーブルに一致するレコードが存在しない場合に新しい行を挿入します。DELETEイベントの処理はAPPLY AS DELETE WHEN条件で指定できます。

important

変更を適用するターゲット ストリーミング テーブルを宣言する必要があります。 オプションで、ターゲット テーブルのスキーマを指定できます。SCD タイプ 2 テーブルの場合、ターゲット テーブルのスキーマを指定するときに、 sequence_byフィールドと同じデータ型の__START_AT列と__END_AT列も含める必要があります。

AUTO CDC API: Lakeflow 宣言型パイプラインでチェンジデータキャプチャをシンプルにを参照してください。

問題

  • flow_name

    作成するフローの名前。

  • source

    データのソース。ソースは ストリーミング ソースである必要があります。ストリーム キーワードを使用して、ストリーミング セマンティクスを使用してソースから読み取ります。 読み取り中に既存のレコードの変更または削除が検出されると、エラーがスローされます。静的ソースまたは追加専用のソースから読み取るのが最も安全です。変更コミットを含むデータを取り込むには、Python とSkipChangeCommitsオプションを使用してエラーを処理できます。

    ストリーミング データの詳細については、 「パイプラインを使用したデータの変換」を参照してください。

  • KEYS

    ソース データ内の行を一意に識別する列または列の組み合わせ。これらの列の値は、ターゲット テーブル内の特定のレコードに適用される CDC イベントを識別するために使用されます。

    列の組み合わせを定義するには、列をコンマで区切ったリストを使用します。

    この条項は必須です。

  • IGNORE NULL UPDATES

    ターゲットカラムのサブセットを含む更新を取り込むことができます。 CDC イベントが既存の行と一致し、IGNORE NULL UPDATES が指定されている場合、 null値を持つ列はターゲット内の既存の値を保持します。これは、 null値を持つネストされた列にも適用されます。

    この句はオプションです。

    デフォルトでは、既存の列がnull値で上書きされます。

  • APPLY AS DELETE WHEN

    CDCイベントをアップサートではなくDELETEとして扱う必要がある場合を指定します。

    SCD タイプ 2 ソースの場合、順不同のデータを処理するために、削除された行は基になる Delta テーブルに廃棄石として一時的に保持され、これらの廃棄石を除外するビューがメタストアに作成されます。保持間隔は、 pipelines.cdc.tombstoneGCThresholdInSeconds テーブル プロパティで構成できます。

    この句はオプションです。

  • APPLY AS TRUNCATE WHEN

    CDCイベントを完全なテーブルTRUNCATEとして扱う必要がある場合を指定します。この句はターゲットテーブルの完全な切り捨てをトリガーするため、この機能が必要な特定のユースケースにのみ使用してください。

    APPLY AS TRUNCATE WHEN句は SCD タイプ 1 でのみサポートされます。SCD タイプ 2 は切り捨て操作をサポートしていません。

    この句はオプションです。

  • SEQUENCE BY

    ソース・データ内の CDC イベントの論理的な順序を指定する列名。Lakeflow 宣言型パイプラインは、このシーケンスを使用して、順不同で到着した変更イベントを処理します。

    順序付けに複数の列が必要な場合は、 STRUCT式を使用します。最初に最初の構造体フィールドで順序付けされ、次に同点の場合は 2 番目のフィールドで順序付けされます。

    指定された列は並べ替え可能なデータ型である必要があります。

    この条項は必須です。

  • COLUMNS

    ターゲット テーブルに含める列のサブセットを指定します。次のいずれかを実行できます。

    • 含める列の完全なリストを指定します: COLUMNS (userId, name, city)
    • 除外する列のリストを指定します: COLUMNS * EXCEPT (operation, sequenceNum)

    この句はオプションです。

    COLUMNS句が指定されていない場合、デフォルトではターゲット テーブルのすべての列が含まれます。

  • STORED AS

    レコードをSCDタイプ1として保存するか、SCDタイプ2として保存するか。

    この句はオプションです。

    デフォルトはSCDタイプ1です。

  • TRACK HISTORY ON

    指定された列に変更があった場合に履歴レコードを生成する出力列のサブセットを指定します。次のいずれかを実行できます。

    • 追跡する列の完全なリストを指定します: COLUMNS (userId, name, city)
    • 追跡から除外する列のリストを指定します。 COLUMNS * EXCEPT (operation, sequenceNum)

    この条項はオプションです。デフォルトでは、変更があった場合にすべての出力列の履歴を追跡します。これはTRACK HISTORY ON *と同じです。

SQL
-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW flow
AS AUTO CDC INTO
target
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city);