APPLY CHANGES APIs: DLT でチェンジデータキャプチャを簡素化
DLTは、CDC APPLY CHANGES
と によりチェンジデータキャプチャ()を簡素化します。APPLY CHANGES FROM SNAPSHOT
APIs使用するインタフェースは、変更データのソースによって異なります。
APPLY CHANGES
を使用して、チェンジデータフィード (CDF) からの変更を処理します。APPLY CHANGES FROM SNAPSHOT
(パブリック プレビュー) を使用して、データベース スナップショットの変更を処理します。
以前は、 MERGE INTO
ステートメントは、Databricks で CDC レコードを処理するために一般的に使用されていました。 ただし、 MERGE INTO
では、レコードの順序が正しくないために正しくない結果が生成されたり、レコードを並べ替えるために複雑なロジックが必要になったりする可能性があります。
APPLY CHANGES
API は、DLT SQL および Python インターフェースでサポートされています。APPLY CHANGES FROM SNAPSHOT
API は DLT Python インターフェースでサポートされています。
APPLY CHANGES
と APPLY CHANGES FROM SNAPSHOT
はどちらも、SCD タイプ 1 およびタイプ 2 を使用したテーブルの更新をサポートしています。
- レコードを直接更新するには、SCD タイプ 1 を使用します。 更新されたレコードの履歴は保持されません。
- SCD type 2を使用して、すべての更新または指定された列の更新に対してレコードの履歴を保持します。
構文およびその他のリファレンスについては、以下を参照してください:
この記事では、ソース データの変更に基づいて DLT パイプラインのテーブルを更新する方法について説明します。テーブルの行レベルの変更情報を記録してクエリする方法については、「Delta でのDelta Lake チェンジデータフィードの使用Databricks 」を参照してください。
必要条件
CDC APIsを使用するには、サーバレス DLT パイプライン、DLT Pro
、または Advanced
エディションを使用するようにパイプラインを構成する必要があります。
CDC は APPLY CHANGES
API を使用してどのように実装されますか?
DLTの APPLY CHANGES
APIは、順不同のレコードを自動的に処理することで、CDCレコードの正しい処理を保証し、順不同のレコードを処理するための複雑なロジックを開発する必要がなくなります。レコードを順序付けるソース・データ内の列を指定する必要があります。これは、DLT がソース・データの適切な順序付けを単調に増加させる表現として解釈します。DLT は、順不同で到着したデータを自動的に処理します。SCD タイプ 2 の変更の場合、DLT は適切な順序付け値をターゲットテーブルの __START_AT
列と __END_AT
列に伝搬します。各シーケンス値でキーごとに 1 つの異なる更新を行う必要があり、NULL シーケンス値はサポートされていません。
APPLY CHANGES
を使用して CDC 処理を実行するには、まずストリーミングテーブルを作成し、次に SQL の APPLY CHANGES INTO
ステートメントまたは Python の apply_changes()
関数を使用して、変更フィードのソース、キー、および順序を指定します。ターゲット ストリーミングテーブルを作成するには、 SQL の CREATE OR REFRESH STREAMING TABLE
ステートメントまたは Pythonの create_streaming_table()
関数を使用します。 SCD Type 1 および Type 2 の処理例を参照してください。
構文の詳細については、DLT SQL リファレンス または Python リファレンスを参照してください。
CDC は APPLY CHANGES FROM SNAPSHOT
API を使用してどのように実装されますか?
プレビュー
APPLY CHANGES FROM SNAPSHOT
API はパブリック プレビュー段階です。
APPLY CHANGES FROM SNAPSHOT
は、一連の順番のスナップショットを比較することでソース データの変更を効率的に判断し、スナップショット内のレコードの CDC 処理に必要な処理を実行する宣言型 API です。 APPLY CHANGES FROM SNAPSHOT
は DLT Python インターフェースでのみサポートされています。
APPLY CHANGES FROM SNAPSHOT
複数のソースタイプからのスナップショットの取り込みをサポートします。
- 定期的なスナップショット インジェストを使用して、既存のテーブルまたはビューからスナップショットをインジェストします。
APPLY CHANGES FROM SNAPSHOT
には、既存のデータベース オブジェクトからのスナップショットの定期的な取り込みをサポートするためのシンプルで合理化されたインターフェイスがあります。 パイプラインが更新されるたびに新しいスナップショットが取り込まれ、取り込み時間がスナップショット バージョンとして使用されます。 パイプラインを連続モードで実行すると、APPLY CHANGES FROM スナップショット処理を含むフローの トリガー間隔 設定によって決定された期間に、各パイプライン更新で複数のスナップショットが取り込まれます。 - 履歴スナップショットの取り込みを使用して、Oracle データベースや MySQL データベース、データウェアハウスから生成されたスナップショットなど、データベース スナップショットを含むファイルを処理します。
APPLY CHANGES FROM SNAPSHOT
を使用して任意のソースタイプから CDC 処理を実行するには、まずストリーミングテーブルを作成し、次に Python の apply_changes_from_snapshot()
関数を使用して、処理の実装に必要なスナップショット、キー、およびその他の引数を指定します。定期的なスナップショットの取り込みと履歴スナップショットの取り込みの例を参照してください。
API に渡されるスナップショットは、バージョンごとに昇順である必要があります。 DLT が順不同のスナップショットを検出すると、エラーがスローされます。
構文の詳細については、DLT Python リファレンスを参照してください。
制限
シーケンス処理に使用する列は、ソート可能なデータ型である必要があります。
例: CDF ソース・データを使用した SCD タイプ 1 および SCD タイプ 2 の処理
次のセクションでは、DLT SCD タイプ 1 およびタイプ 2 クエリの例を示します。これらのクエリは、チェンジデータフィードからのソース イベントに基づいてターゲット テーブルを更新します。
- 新しいユーザー・レコードを作成します。
- ユーザー・レコードを削除します。
- ユーザーレコードを更新します。 SCD タイプ 1 の例では、最後の
UPDATE
操作が遅れて到着し、ターゲットテーブルから削除され、順不同のイベントの処理を示しています。
次の例は、DLT パイプラインの構成と更新に精通していることを前提としています。「 チュートリアル: 初めての DLT パイプラインを実行する」を参照してください。
これらの例を実行するには、まずサンプル データセットを作成する必要があります。 「テスト データの生成」を参照してください。
以下はこれらの例の入力記録です:
ユーザーID | name | 市 | オペレーション | sequenceNum |
---|---|---|---|---|
124 | Raul | Oaxaca | INSERT | 1 |
123 | Isabel | Monterrey | INSERT | 1 |
125 | Mercedes | Tijuana | INSERT | 2 |
126 | Lily | Cancun | INSERT | 2 |
123 | null | null | DELETE | 6 |
125 | Mercedes | Guadalajara | UPDATE | 6 |
125 | Mercedes | Mexicali | UPDATE | 5 |
123 | Isabel | Chihuahua | UPDATE | 5 |
例題データの最後の行のコメントを外すと、レコードを切り捨てる場所を指定する次のレコードが挿入されます:
ユーザーID | name | 市 | オペレーション | sequenceNum |
---|---|---|---|---|
null | null | null | TRUNCATE | 3 |
次のすべての例には、 DELETE
操作と TRUNCATE
操作の両方を指定するオプションが含まれていますが、それぞれ省略可能です。
SCD タイプ 1 更新の処理
次の例は、SCD タイプ 1 更新の処理を示しています。
- Python
- SQL
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
SCDタイプ1の例を実行すると、ターゲットテーブルには次のレコードが含まれます:
ユーザーID | name | 市 |
---|---|---|
124 | Raul | Oaxaca |
125 | Mercedes | Guadalajara |
126 | Lily | Cancun |
追加のTRUNCATE
レコードを使用してSCDタイプ1の例を実行すると、sequenceNum=3
でのTRUNCATE
操作によりレコード124
と126
が切り捨てられ、ターゲットテーブルには次のレコードが含まれます:
ユーザーID | name | 市 |
---|---|---|
125 | Mercedes | Guadalajara |
SCD タイプ 2 更新の処理
次の例は、SCD タイプ 2 更新の処理を示しています。
- Python
- SQL
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
SCDタイプ2の例を実行した後、ターゲットテーブルには以下のレコードが含まれます:
ユーザーID | name | 市 | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Monterrey | 1 | 5 |
123 | Isabel | Chihuahua | 5 | 6 |
124 | Raul | Oaxaca | 1 | null |
125 | Mercedes | Tijuana | 2 | 5 |
125 | Mercedes | Mexicali | 5 | 6 |
125 | Mercedes | Guadalajara | 6 | null |
126 | Lily | Cancun | 2 | null |
SCD タイプ 2 クエリでは、ターゲット・テーブル内のヒストリーについて追跡する出力カラムのサブセットを指定することもできます。 他の列への変更は、新しい履歴レコードを生成するのではなく、その場で更新されます。 次の例は、 city
列をトラッキングから除外する方法を示しています。
次の例は、SCDタイプ2でトラックヒストリーを使用する例です:
- Python
- SQL
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
この例を追加の TRUNCATE
レコードなしで実行すると、ターゲットテーブルには次のレコードが含まれます。
ユーザーID | name | 市 | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Chihuahua | 1 | 6 |
124 | Raul | Oaxaca | 1 | null |
125 | Mercedes | Guadalajara | 2 | null |
126 | Lily | Cancun | 2 | null |
テストデータの生成
次のコードは、このチュートリアルで示すクエリ例で使用するデータセットの例を生成するために提供されています。 新しいスキーマを作成し、新しいテーブルを作成するための適切な資格情報があると仮定すると、これらのステートメントはノートブックまたは Databricks SQL で実行できます。 次のコードは、DLT パイプラインの一部として実行するための ものではありません 。
CREATE SCHEMA IF NOT EXISTS cdc_data;
CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
例: 定期的なスナップショット処理
次の例は、 mycatalog.myschema.mytable
に格納されたテーブルのスナップショットを取り込む SCD タイプ 2 処理を示しています。 処理の結果は、 target
という名前のテーブルに書き込まれます。
mycatalog.myschema.mytable
タイムスタンプ 2024-01-01 00:00:00 のレコード
キー | 値 |
---|---|
1 | a1 |
2 | a2 |
mycatalog.myschema.mytable
タイムスタンプ 2024-01-01 12:00:00 のレコード
キー | 値 |
---|---|
2 | b2 |
3 | a3 |
import dlt
@dlt.view(name="source")
def source():
return spark.read.table("mycatalog.myschema.mytable")
dlt.create_streaming_table("target")
dlt.apply_changes_from_snapshot(
target="target",
source="source",
keys=["key"],
stored_as_scd_type=2
)
スナップショットの処理後、ターゲットテーブルには次のレコードが含まれます。
キー | 値 | __START_AT | __END_AT |
---|---|---|---|
1 | a1 | 2024-01-01 00:00:00 | 2024-01-01 12:00:00 |
2 | a2 | 2024-01-01 00:00:00 | 2024-01-01 12:00:00 |
2 | b2 | 2024-01-01 12:00:00 | null |
3 | a3 | 2024-01-01 12:00:00 | null |
例: 履歴スナップショット処理
次の例は、クラウドストレージシステムに保存されている 2 つのスナップショットのソースイベントに基づいてターゲットテーブルを更新する SCD タイプ 2 処理を示しています。
timestamp
のスナップショット、に格納 /<PATH>/filename1.csv
キー | トラッキングカラム | ノントラッキングカラム |
---|---|---|
1 | a1 | B1 の |
2 | a2 | b2 |
4 | a4 | b4 |
timestamp + 5
のスナップショット、に格納 /<PATH>/filename2.csv
キー | トラッキングカラム | ノントラッキングカラム |
---|---|---|
2 | a2_new | b2 |
3 | a3 | B3の |
4 | a4 | b4_new |
次のコード例は、これらのスナップショットを使用した SCD タイプ 2 更新の処理を示しています。
import dlt
def exist(file_name):
# Storage system-dependent function that returns true if file_name exists, false otherwise
# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
latest_snapshot_version = latest_snapshot_version or 0
next_version = latest_snapshot_version + 1
file_name = "dir_path/filename_" + next_version + ".csv"
if (exist(file_name)):
return (spark.read.load(file_name), next_version)
else:
# No snapshot available
return None
dlt.create_streaming_live_table("target")
dlt.apply_changes_from_snapshot(
target = "target",
source = next_snapshot_and_version,
keys = ["Key"],
stored_as_scd_type = 2,
track_history_column_list = ["TrackingCol"]
)
スナップショットの処理後、ターゲットテーブルには次のレコードが含まれます。
キー | トラッキングカラム | ノントラッキングカラム | __START_AT | __END_AT |
---|---|---|---|---|
1 | a1 | B1 の | 1 | 2 |
2 | a2 | b2 | 1 | 2 |
2 | a2_new | b2 | 2 | null |
3 | a3 | B3の | 2 | null |
4 | a4 | b4_new | 1 | null |
ターゲットストリーミングテーブル内のデータを追加、変更、または削除する
パイプラインがテーブルを Unity Catalogにパブリッシュする場合は、 データ操作言語 (DML) ステートメント (insert、update、delete、merge ステートメントなど) を使用して、APPLY CHANGES INTO
ステートメントによって作成されたターゲット ストリーミングテーブルを変更できます。
- ストリーミングテーブルのテーブルスキーマを変更する DML ステートメントはサポートされていません。DML ステートメントによってテーブルスキーマの進化が発生しないようにしてください。
- ストリーミングテーブルを更新する DML ステートメントは、共有Unity Catalog クラスターまたはSQL Databricks Runtime13.3 以上を使用する ウェアハウスでのみ実行できます。LTS
- ストリーミングには追加専用のデータソースが必要なため、変更を伴うソースストリーミングテーブルからのストリーミングが処理に必要な場合 (DML ステートメントなど)、ソースストリーミングテーブルを読み取るときに skipChangeCommits フラグ を設定します。
skipChangeCommits
を設定すると、ソース テーブルのレコードを削除または変更するトランザクションは無視されます。処理にストリーミングテーブルが必要ない場合は、マテリアライズドビュー (追加専用の制限がない) をターゲットテーブルとして使用できます。
DLT は指定された SEQUENCE BY
列を使用し、ターゲットテーブルの __START_AT
列と __END_AT
列 (SCD タイプ 2 の場合) に適切な順序付け値を伝搬するため、レコードの適切な順序を維持するために、DML ステートメントがこれらの列に有効な値を使用するようにする必要があります。CDC は APPLY CHANGES
API でどのように実装されますか?を参照してください。
ストリーミングテーブルでの DML ステートメントの使用の詳細については 、「ストリーミングテーブルのデータを追加、変更、または削除する」を参照してください。
次の例では、開始シーケンスを 5 としたアクティブレコードを挿入しています。
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);
APPLY CHANGES
ターゲットテーブルからのチェンジデータフィードの読み取り
Databricks Runtime 15.2 以降では、他の Delta テーブルからチェンジデータフィードを読み取るのと同じ方法で、APPLY CHANGES
クエリまたは APPLY CHANGES FROM SNAPSHOT
クエリのターゲットであるストリーミングテーブルからチェンジデータフィードを読み取ることができます。ターゲットストリーミングテーブルからチェンジデータフィードを読み取るには、次のものが必要です。
- ターゲット ストリーミングテーブルは Unity Catalogにパブリッシュする必要があります。 「DLT パイプラインで Unity Catalog を使用する」を参照してください。
- ターゲットストリーミングテーブルからチェンジデータフィードを読み取るには、 Databricks Runtime 15.2以降を使用する必要があります。 チェンジデータフィードを別のDLTパイプラインで読み取るには、 Databricks Runtime 15.2以降を使用するようにパイプラインを構成する必要があります。
DLT パイプラインで作成されたターゲット ストリーミングテーブルからチェンジデータフィードを読み取る方法は、他の Delta テーブルからチェンジデータフィードを読み取るのと同じ方法です。 Deltaチェンジデータフィード機能の使用(Python やSQL の例など)について詳しくは、でのDelta Lake チェンジデータフィードの使用Databricks を参照してください。
チェンジデータフィードレコードには、変更イベントのタイプを識別する メタデータ が含まれています。 テーブル内のレコードが更新されると、関連付けられた変更レコードのメタデータには、通常、update_preimage
イベントと update_postimage
イベントに設定された_change_type
値が含まれます。
ただし、プライマリ・キー値の変更を含む更新がターゲット・ストリーミングテーブルに対して行われた場合、 _change_type
値は異なります。 変更にプライマリ・キーの更新が含まれる場合、 _change_type
メタデータ・フィールドは insert
イベントと delete
イベントに設定されます。 プライマリ・キーの変更は、 UPDATE
ステートメントまたは MERGE
ステートメントを使用してキー・フィールドの 1 つが手動で更新された場合、または SCD タイプ 2 テーブルの場合、 __start_at
・フィールドが以前の開始シーケンス値を反映するように変更された場合に発生する可能性があります。
APPLY CHANGES
クエリは、SCD タイプ 1 と SCD タイプ 2 の処理で異なるプライマリ・キー値を判別します。
- SCD タイプ 1 処理と DLT Python インターフェースの場合、主キーは
apply_changes()
関数のkeys
パラメーターの値です。DLT SQL インターフェースの場合、プライマリ・キーはAPPLY CHANGES INTO
ステートメントのKEYS
節で定義されたカラムです。 - SCDタイプ 2 の場合、プライマリ・キーは
keys
パラメーターまたはKEYS
句にcoalesce(__START_AT, __END_AT)
操作からの戻り値を加えたものです。ここで、__START_AT
と__END_AT
はターゲット ストリーミングテーブルの対応するカラムです。
DLT CDC クエリによって処理されたレコードに関するデータを取得する
次のメトリクスは、 APPLY CHANGES
クエリによってのみキャプチャされ、 APPLY CHANGES FROM SNAPSHOT
クエリではキャプチャされません。
次のメトリクスは、 APPLY CHANGES
クエリによってキャプチャされます。
num_upserted_rows
: 更新中にデータセットにアップサートされた出力行の数。num_deleted_rows
: 更新中にデータセットから削除された既存の出力行の数。
CDC 以外のフローの出力である num_output_rows
メトリクスは、 apply changes
クエリではキャプチャされません。
DLT CDC 処理にはどのようなデータ オブジェクトが使用されますか?
- これらのデータ構造は、
APPLY CHANGES
処理にのみ適用され、APPLY CHANGES FROM SNAPSHOT
処理には適用されません。 - これらのデータ構造は、ターゲット表が Hive metastoreにパブリッシュされる場合にのみ適用されます。 パイプラインが Unity Catalog に発行される場合、ユーザーは内部バッキング テーブルにアクセスできません。
Hive metastore でターゲットテーブルを宣言すると、2 つのデータ構造が作成されます。
- ターゲットテーブルに割り当てられた名前を使用するビュー。
- DLT が CDC 処理を管理するために使用する内部バッキング・テーブル。このテーブルの名前は、ターゲット テーブル名の先頭に
__apply_changes_storage_
を付加して付けられます。
たとえば、 dlt_cdc_target
という名前のターゲット テーブルを宣言すると、メタストアに dlt_cdc_target
という名前のビューと __apply_changes_storage_dlt_cdc_target
という名前のテーブルが表示されます。ビューを作成すると、DLT は順不同のデータを処理するために必要な追加情報 (廃棄石やバージョンなど) を除外できます。処理されたデータを表示するには、ターゲット ビューをクエリします。__apply_changes_storage_
テーブルのスキーマは、将来の機能や拡張機能をサポートするために変更される可能性があるため、本番運用で使用するためにテーブルをクエリしないでください。テーブルにデータを手動で追加すると、バージョン列が欠落しているため、レコードは他の変更よりも前に来ると見なされます。