変更の適用先 (DLT)
APPLY CHANGES INTO
ステートメントを使用して、DLT チェンジデータキャプチャ (CDC) 機能を使用します。このステートメントは、CDC ソースから変更を読み取り、ストリーミングターゲットに適用するフローを作成します。
- CDCの詳細については、「チェンジデータキャプチャ (CDC) とは」を参照してください。
- CDCでの「APPLY CHANGES」の使用の詳細については、「APPLY CHANGES APIs: DLTによるチェンジデータキャプチャの簡素化」を参照してください。
構文
CREATE OR REFRESH STREAMING TABLE table_name;
APPLY CHANGES INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
APPLY CHANGES
ターゲットのデータ品質制約を定義するには、非APPLY CHANGES
クエリと同じ CONSTRAINT
句を使用します。パイプラインのエクスペクテーションを使用してデータ品質を管理するを参照してください。
INSERT
とUPDATE
イベントのデフォルトの動作は、ソースからCDCイベントを アップサート することです。指定されたキーに一致するターゲットテーブルの行を更新するか、ターゲットテーブルに一致するレコードが存在しない場合に新しい行を挿入します。DELETE
イベントの処理はAPPLY AS DELETE WHEN
条件で指定できます。
変更を適用するターゲット ストリーミングテーブルを宣言する必要があります。 オプションで、ターゲットテーブルのスキーマを指定できます。SCD タイプ 2 テーブルの場合、ターゲットテーブルのスキーマを指定するときに、sequence_by
フィールドと同じデータ型の __START_AT
列と __END_AT
列も含める必要があります。
「 APPLY CHANGES API: DLTによるチェンジデータキャプチャの簡素化」を参照してください。
パラメーター
-
source
データのソース。ソースは ストリーミング ソースである必要があります。ストリーム キーワードを使用して、ストリーミング セマンティクスを使用してソースから読み取ります。 読み取りで既存のレコードに対する変更または削除が検出されると、エラーがスローされます。静的なソースまたは追加専用のソースから読み取るのが最も安全です。変更コミットのあるデータを取り込むには、Python と
SkipChangeCommits
オプションを使用してエラーを処理できます。ストリーミング データの詳細については、「 パイプラインを使用したデータの変換」を参照してください。
-
KEYS
ソース データ内の行を一意に識別する列または列の組み合わせ。これらの列の値は、ターゲット テーブル内の特定のレコードに適用される CDC イベントを識別するために使用されます。
列の組み合わせを定義するには、カンマ区切りの列のリストを使用します。
この条項は必須です。
-
IGNORE NULL UPDATES
ターゲットカラムのサブセットを含む更新の取り込みを許可します。 CDC イベントが既存の行と一致し、IGNORE NULL UPDATES が指定されている場合、
null
値を持つ列はターゲット内の既存の値を保持します。これは、null
値を持つネストされた列にも適用されます。この句はオプションです。
デフォルトでは、既存の列を
null
値で上書きします。 -
APPLY AS DELETE WHEN
CDCイベントをupsertではなく
DELETE
として扱う必要がある場合を指定します。SCD タイプ 2 ソースの場合、順不同のデータを処理するために、削除された行は基になる Delta テーブルに廃棄石として一時的に保持され、これらの廃棄石を除外するビューがメタストアに作成されます。保持間隔は、
pipelines.cdc.tombstoneGCThresholdInSeconds
table プロパティで構成できます。この句はオプションです。
-
APPLY AS TRUNCATE WHEN
CDCイベントを完全なテーブル
TRUNCATE
として扱う必要がある場合を指定します。この句はターゲットテーブルの完全な切り捨てをトリガーするため、この機能が必要な特定のユースケースにのみ使用してください。APPLY AS TRUNCATE WHEN
句は、SCD タイプ 1 でのみサポートされます。SCD タイプ 2 は、切り捨て操作をサポートしていません。この句はオプションです。
-
SEQUENCE BY
ソース・データ内の CDC イベントの論理的な順序を指定する列名。DLT は、このシーケンスを使用して、順不同で到着した変更イベントを処理します。
シーケンス処理に複数の列が必要な場合は、
STRUCT
式を使用します: 最初に最初の構造体フィールドによって順序付けされ、次に同点の場合は 2 番目のフィールドによって順序付けられます。指定する列は、ソート可能なデータ型である必要があります。
この句は必須です。
-
COLUMNS
ターゲット・テーブルに含める列のサブセットを指定します。次のいずれかを実行できます。
- 含める列の完全なリストを指定します:
COLUMNS (userId, name, city)
。 - 除外する列のリストを指定します。
COLUMNS * EXCEPT (operation, sequenceNum)
この句はオプションです。
デフォルトでは、
COLUMNS
句が指定されていない場合、ターゲット・テーブルのすべてのカラムが含まれます。 - 含める列の完全なリストを指定します:
-
STORED AS
レコードをSCDタイプ1として保存するか、SCDタイプ2として保存するか。
この句はオプションです。
デフォルトはSCDタイプ1です。
-
TRACK HISTORY ON
出力列のサブセットを指定して、指定した列に変更があった場合に履歴レコードを生成します。次のいずれかを実行できます。
- 追跡する列の完全なリストを指定します:
COLUMNS (userId, name, city)
。 - トラッキングから除外する列のリストを指定します。
COLUMNS * EXCEPT (operation, sequenceNum)
この句はオプションです。デフォルトでは、変更があった場合にすべての出力カラムの履歴を追跡します。これは
TRACK HISTORY ON *
に相当します。 - 追跡する列の完全なリストを指定します:
例
-- Create a streaming table, then use apply changes into to populate it:
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO target
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city)