メインコンテンツまでスキップ

APPLY CHANGES APIs: DLT でチェンジデータキャプチャを簡素化

DLTは、CDC APPLY CHANGESと によりチェンジデータキャプチャ()を簡素化します。APPLY CHANGES FROM SNAPSHOTAPIs使用するインタフェースは、変更データのソースによって異なります。

  • APPLY CHANGES を使用して、チェンジデータフィード (CDF) からの変更を処理します。
  • APPLY CHANGES FROM SNAPSHOT (パブリック プレビュー) を使用して、データベース スナップショットの変更を処理します。

以前は、 MERGE INTO ステートメントは、Databricks で CDC レコードを処理するために一般的に使用されていました。 ただし、 MERGE INTO では、レコードの順序が正しくないために正しくない結果が生成されたり、レコードを並べ替えるために複雑なロジックが必要になったりする可能性があります。

APPLY CHANGES API は、DLT SQL および Python インターフェースでサポートされています。APPLY CHANGES FROM SNAPSHOT API は DLT Python インターフェースでサポートされています。

APPLY CHANGESAPPLY CHANGES FROM SNAPSHOT はどちらも、SCD タイプ 1 およびタイプ 2 を使用したテーブルの更新をサポートしています。

  • レコードを直接更新するには、SCD タイプ 1 を使用します。 更新されたレコードの履歴は保持されません。
  • SCD type 2を使用して、すべての更新または指定された列の更新に対してレコードの履歴を保持します。

構文およびその他のリファレンスについては、以下を参照してください:

注記

この記事では、ソース データの変更に基づいて DLT パイプラインのテーブルを更新する方法について説明します。テーブルの行レベルの変更情報を記録してクエリする方法については、「Delta でのDelta Lake チェンジデータフィードの使用Databricks 」を参照してください。

必要条件

CDC APIsを使用するには、サーバレス DLT パイプライン、DLT Pro、または Advanced エディションを使用するようにパイプラインを構成する必要があります。

CDC は APPLY CHANGES API を使用してどのように実装されますか?

DLTの APPLY CHANGES APIは、順不同のレコードを自動的に処理することで、CDCレコードの正しい処理を保証し、順不同のレコードを処理するための複雑なロジックを開発する必要がなくなります。レコードを順序付けるソース・データ内の列を指定する必要があります。これは、DLT がソース・データの適切な順序付けを単調に増加させる表現として解釈します。DLT は、順不同で到着したデータを自動的に処理します。SCD タイプ 2 の変更の場合、DLT は適切な順序付け値をターゲットテーブルの __START_AT 列と __END_AT 列に伝搬します。各シーケンス値でキーごとに 1 つの異なる更新を行う必要があり、NULL シーケンス値はサポートされていません。

APPLY CHANGESを使用して CDC 処理を実行するには、まずストリーミングテーブルを作成し、次に SQL の APPLY CHANGES INTO ステートメントまたは Python の apply_changes() 関数を使用して、変更フィードのソース、キー、および順序を指定します。ターゲット ストリーミングテーブルを作成するには、 SQL の CREATE OR REFRESH STREAMING TABLE ステートメントまたは Pythonの create_streaming_table() 関数を使用します。 SCD Type 1 および Type 2 の処理例を参照してください。

構文の詳細については、DLT SQL リファレンス または Python リファレンスを参照してください。

CDC は APPLY CHANGES FROM SNAPSHOT API を使用してどのように実装されますか?

備考

プレビュー

APPLY CHANGES FROM SNAPSHOT API はパブリック プレビュー段階です。

APPLY CHANGES FROM SNAPSHOT は、一連の順番のスナップショットを比較することでソース データの変更を効率的に判断し、スナップショット内のレコードの CDC 処理に必要な処理を実行する宣言型 API です。 APPLY CHANGES FROM SNAPSHOT は DLT Python インターフェースでのみサポートされています。

APPLY CHANGES FROM SNAPSHOT 複数のソースタイプからのスナップショットの取り込みをサポートします。

  • 定期的なスナップショット インジェストを使用して、既存のテーブルまたはビューからスナップショットをインジェストします。 APPLY CHANGES FROM SNAPSHOT には、既存のデータベース オブジェクトからのスナップショットの定期的な取り込みをサポートするためのシンプルで合理化されたインターフェイスがあります。 パイプラインが更新されるたびに新しいスナップショットが取り込まれ、取り込み時間がスナップショット バージョンとして使用されます。 パイプラインを連続モードで実行すると、APPLY CHANGES FROM スナップショット処理を含むフローの トリガー間隔 設定によって決定された期間に、各パイプライン更新で複数のスナップショットが取り込まれます。
  • 履歴スナップショットの取り込みを使用して、Oracle データベースや MySQL データベース、データウェアハウスから生成されたスナップショットなど、データベース スナップショットを含むファイルを処理します。

APPLY CHANGES FROM SNAPSHOTを使用して任意のソースタイプから CDC 処理を実行するには、まずストリーミングテーブルを作成し、次に Python の apply_changes_from_snapshot() 関数を使用して、処理の実装に必要なスナップショット、キー、およびその他の引数を指定します。定期的なスナップショットの取り込み履歴スナップショットの取り込みの例を参照してください。

API に渡されるスナップショットは、バージョンごとに昇順である必要があります。 DLT が順不同のスナップショットを検出すると、エラーがスローされます。

構文の詳細については、DLT Python リファレンスを参照してください。

制限

シーケンス処理に使用する列は、ソート可能なデータ型である必要があります。

例: CDF ソース・データを使用した SCD タイプ 1 および SCD タイプ 2 の処理

次のセクションでは、DLT SCD タイプ 1 およびタイプ 2 クエリの例を示します。これらのクエリは、チェンジデータフィードからのソース イベントに基づいてターゲット テーブルを更新します。

  1. 新しいユーザー・レコードを作成します。
  2. ユーザー・レコードを削除します。
  3. ユーザーレコードを更新します。 SCD タイプ 1 の例では、最後の UPDATE 操作が遅れて到着し、ターゲットテーブルから削除され、順不同のイベントの処理を示しています。

次の例は、DLT パイプラインの構成と更新に精通していることを前提としています。「 チュートリアル: 初めての DLT パイプラインを実行する」を参照してください。

これらの例を実行するには、まずサンプル データセットを作成する必要があります。 「テスト データの生成」を参照してください。

以下はこれらの例の入力記録です:

ユーザーID

name

オペレーション

sequenceNum

124

Raul

Oaxaca

INSERT

1

123

Isabel

Monterrey

INSERT

1

125

Mercedes

Tijuana

INSERT

2

126

Lily

Cancun

INSERT

2

123

null

null

DELETE

6

125

Mercedes

Guadalajara

UPDATE

6

125

Mercedes

Mexicali

UPDATE

5

123

Isabel

Chihuahua

UPDATE

5

例題データの最後の行のコメントを外すと、レコードを切り捨てる場所を指定する次のレコードが挿入されます:

ユーザーID

name

オペレーション

sequenceNum

null

null

null

TRUNCATE

3

注記

次のすべての例には、 DELETE 操作と TRUNCATE 操作の両方を指定するオプションが含まれていますが、それぞれ省略可能です。

SCD タイプ 1 更新の処理

次の例は、SCD タイプ 1 更新の処理を示しています。

Python
import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)

SCDタイプ1の例を実行すると、ターゲットテーブルには次のレコードが含まれます:

ユーザーID

name

124

Raul

Oaxaca

125

Mercedes

Guadalajara

126

Lily

Cancun

追加のTRUNCATEレコードを使用してSCDタイプ1の例を実行すると、sequenceNum=3でのTRUNCATE操作によりレコード124126が切り捨てられ、ターゲットテーブルには次のレコードが含まれます:

ユーザーID

name

125

Mercedes

Guadalajara

SCD タイプ 2 更新の処理

次の例は、SCD タイプ 2 更新の処理を示しています。

Python
import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)

SCDタイプ2の例を実行した後、ターゲットテーブルには以下のレコードが含まれます:

ユーザーID

name

__START_AT

__END_AT

123

Isabel

Monterrey

1

5

123

Isabel

Chihuahua

5

6

124

Raul

Oaxaca

1

null

125

Mercedes

Tijuana

2

5

125

Mercedes

Mexicali

5

6

125

Mercedes

Guadalajara

6

null

126

Lily

Cancun

2

null

SCD タイプ 2 クエリでは、ターゲット・テーブル内のヒストリーについて追跡する出力カラムのサブセットを指定することもできます。 他の列への変更は、新しい履歴レコードを生成するのではなく、その場で更新されます。 次の例は、 city 列をトラッキングから除外する方法を示しています。

次の例は、SCDタイプ2でトラックヒストリーを使用する例です:

Python
import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)

この例を追加の TRUNCATE レコードなしで実行すると、ターゲットテーブルには次のレコードが含まれます。

ユーザーID

name

__START_AT

__END_AT

123

Isabel

Chihuahua

1

6

124

Raul

Oaxaca

1

null

125

Mercedes

Guadalajara

2

null

126

Lily

Cancun

2

null

テストデータの生成

次のコードは、このチュートリアルで示すクエリ例で使用するデータセットの例を生成するために提供されています。 新しいスキーマを作成し、新しいテーブルを作成するための適切な資格情報があると仮定すると、これらのステートメントはノートブックまたは Databricks SQL で実行できます。 次のコードは、DLT パイプラインの一部として実行するための ものではありません

SQL
CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);

例: 定期的なスナップショット処理

次の例は、 mycatalog.myschema.mytableに格納されたテーブルのスナップショットを取り込む SCD タイプ 2 処理を示しています。 処理の結果は、 targetという名前のテーブルに書き込まれます。

mycatalog.myschema.mytable タイムスタンプ 2024-01-01 00:00:00 のレコード

キー

1

a1

2

a2

mycatalog.myschema.mytable タイムスタンプ 2024-01-01 12:00:00 のレコード

キー

2

b2

3

a3

Python
import dlt

@dlt.view(name="source")
def source():
return spark.read.table("mycatalog.myschema.mytable")

dlt.create_streaming_table("target")

dlt.apply_changes_from_snapshot(
target="target",
source="source",
keys=["key"],
stored_as_scd_type=2
)

スナップショットの処理後、ターゲットテーブルには次のレコードが含まれます。

キー

__START_AT

__END_AT

1

a1

2024-01-01 00:00:00

2024-01-01 12:00:00

2

a2

2024-01-01 00:00:00

2024-01-01 12:00:00

2

b2

2024-01-01 12:00:00

null

3

a3

2024-01-01 12:00:00

null

例: 履歴スナップショット処理

次の例は、クラウドストレージシステムに保存されている 2 つのスナップショットのソースイベントに基づいてターゲットテーブルを更新する SCD タイプ 2 処理を示しています。

timestampのスナップショット、に格納 /<PATH>/filename1.csv

キー

トラッキングカラム

ノントラッキングカラム

1

a1

B1 の

2

a2

b2

4

a4

b4

timestamp + 5のスナップショット、に格納 /<PATH>/filename2.csv

キー

トラッキングカラム

ノントラッキングカラム

2

a2_new

b2

3

a3

B3の

4

a4

b4_new

次のコード例は、これらのスナップショットを使用した SCD タイプ 2 更新の処理を示しています。

Python
import dlt

def exist(file_name):
# Storage system-dependent function that returns true if file_name exists, false otherwise

# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
latest_snapshot_version = latest_snapshot_version or 0
next_version = latest_snapshot_version + 1
file_name = "dir_path/filename_" + next_version + ".csv"
if (exist(file_name)):
return (spark.read.load(file_name), next_version)
else:
# No snapshot available
return None

dlt.create_streaming_live_table("target")

dlt.apply_changes_from_snapshot(
target = "target",
source = next_snapshot_and_version,
keys = ["Key"],
stored_as_scd_type = 2,
track_history_column_list = ["TrackingCol"]
)

スナップショットの処理後、ターゲットテーブルには次のレコードが含まれます。

キー

トラッキングカラム

ノントラッキングカラム

__START_AT

__END_AT

1

a1

B1 の

1

2

2

a2

b2

1

2

2

a2_new

b2

2

null

3

a3

B3の

2

null

4

a4

b4_new

1

null

ターゲットストリーミングテーブル内のデータを追加、変更、または削除する

パイプラインがテーブルを Unity Catalogにパブリッシュする場合は、 データ操作言語 (DML) ステートメント (insert、update、delete、merge ステートメントなど) を使用して、APPLY CHANGES INTO ステートメントによって作成されたターゲット ストリーミングテーブルを変更できます。

注記
  • ストリーミングテーブルのテーブルスキーマを変更する DML ステートメントはサポートされていません。DML ステートメントによってテーブルスキーマの進化が発生しないようにしてください。
  • ストリーミングテーブルを更新する DML ステートメントは、共有Unity Catalog クラスターまたはSQL Databricks Runtime13.3 以上を使用する ウェアハウスでのみ実行できます。LTS
  • ストリーミングには追加専用のデータソースが必要なため、変更を伴うソースストリーミングテーブルからのストリーミングが処理に必要な場合 (DML ステートメントなど)、ソースストリーミングテーブルを読み取るときに skipChangeCommits フラグ を設定します。 skipChangeCommits を設定すると、ソース テーブルのレコードを削除または変更するトランザクションは無視されます。処理にストリーミングテーブルが必要ない場合は、マテリアライズドビュー (追加専用の制限がない) をターゲットテーブルとして使用できます。

DLT は指定された SEQUENCE BY 列を使用し、ターゲットテーブルの __START_AT 列と __END_AT 列 (SCD タイプ 2 の場合) に適切な順序付け値を伝搬するため、レコードの適切な順序を維持するために、DML ステートメントがこれらの列に有効な値を使用するようにする必要があります。CDC は APPLY CHANGES API でどのように実装されますか?を参照してください。

ストリーミングテーブルでの DML ステートメントの使用の詳細については 、「ストリーミングテーブルのデータを追加、変更、または削除する」を参照してください。

次の例では、開始シーケンスを 5 としたアクティブレコードを挿入しています。

SQL
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

APPLY CHANGESターゲットテーブルからのチェンジデータフィードの読み取り

Databricks Runtime 15.2 以降では、他の Delta テーブルからチェンジデータフィードを読み取るのと同じ方法で、APPLY CHANGES クエリまたは APPLY CHANGES FROM SNAPSHOT クエリのターゲットであるストリーミングテーブルからチェンジデータフィードを読み取ることができます。ターゲットストリーミングテーブルからチェンジデータフィードを読み取るには、次のものが必要です。

  • ターゲット ストリーミングテーブルは Unity Catalogにパブリッシュする必要があります。 「DLT パイプラインで Unity Catalog を使用する」を参照してください。
  • ターゲットストリーミングテーブルからチェンジデータフィードを読み取るには、 Databricks Runtime 15.2以降を使用する必要があります。 チェンジデータフィードを別のDLTパイプラインで読み取るには、 Databricks Runtime 15.2以降を使用するようにパイプラインを構成する必要があります。

DLT パイプラインで作成されたターゲット ストリーミングテーブルからチェンジデータフィードを読み取る方法は、他の Delta テーブルからチェンジデータフィードを読み取るのと同じ方法です。 Deltaチェンジデータフィード機能の使用(Python やSQL の例など)について詳しくは、でのDelta Lake チェンジデータフィードの使用Databricks を参照してください。

注記

チェンジデータフィードレコードには、変更イベントのタイプを識別する メタデータ が含まれています。 テーブル内のレコードが更新されると、関連付けられた変更レコードのメタデータには、通常、update_preimage イベントと update_postimage イベントに設定された_change_type値が含まれます。

ただし、プライマリ・キー値の変更を含む更新がターゲット・ストリーミングテーブルに対して行われた場合、 _change_type 値は異なります。 変更にプライマリ・キーの更新が含まれる場合、 _change_type メタデータ・フィールドは insert イベントと delete イベントに設定されます。 プライマリ・キーの変更は、 UPDATE ステートメントまたは MERGE ステートメントを使用してキー・フィールドの 1 つが手動で更新された場合、または SCD タイプ 2 テーブルの場合、 __start_at ・フィールドが以前の開始シーケンス値を反映するように変更された場合に発生する可能性があります。

APPLY CHANGES クエリは、SCD タイプ 1 と SCD タイプ 2 の処理で異なるプライマリ・キー値を判別します。

  • SCD タイプ 1 処理と DLT Python インターフェースの場合、主キーは apply_changes() 関数の keys パラメーターの値です。DLT SQL インターフェースの場合、プライマリ・キーは APPLY CHANGES INTO ステートメントの KEYS 節で定義されたカラムです。
  • SCDタイプ 2 の場合、プライマリ・キーは keys パラメーターまたは KEYS 句に coalesce(__START_AT, __END_AT) 操作からの戻り値を加えたものです。ここで、__START_AT__END_AT はターゲット ストリーミングテーブルの対応するカラムです。

DLT CDC クエリによって処理されたレコードに関するデータを取得する

注記

次のメトリクスは、 APPLY CHANGES クエリによってのみキャプチャされ、 APPLY CHANGES FROM SNAPSHOT クエリではキャプチャされません。

次のメトリクスは、 APPLY CHANGES クエリによってキャプチャされます。

  • num_upserted_rows : 更新中にデータセットにアップサートされた出力行の数。
  • num_deleted_rows : 更新中にデータセットから削除された既存の出力行の数。

CDC 以外のフローの出力である num_output_rows メトリクスは、 apply changes クエリではキャプチャされません。

DLT CDC 処理にはどのようなデータ オブジェクトが使用されますか?

注記
  • これらのデータ構造は、 APPLY CHANGES 処理にのみ適用され、 APPLY CHANGES FROM SNAPSHOT 処理には適用されません。
  • これらのデータ構造は、ターゲット表が Hive metastoreにパブリッシュされる場合にのみ適用されます。 パイプラインが Unity Catalog に発行される場合、ユーザーは内部バッキング テーブルにアクセスできません。

Hive metastore でターゲットテーブルを宣言すると、2 つのデータ構造が作成されます。

  • ターゲットテーブルに割り当てられた名前を使用するビュー。
  • DLT が CDC 処理を管理するために使用する内部バッキング・テーブル。このテーブルの名前は、ターゲット テーブル名の先頭に __apply_changes_storage_ を付加して付けられます。

たとえば、 dlt_cdc_targetという名前のターゲット テーブルを宣言すると、メタストアに dlt_cdc_target という名前のビューと __apply_changes_storage_dlt_cdc_target という名前のテーブルが表示されます。ビューを作成すると、DLT は順不同のデータを処理するために必要な追加情報 (廃棄石やバージョンなど) を除外できます。処理されたデータを表示するには、ターゲット ビューをクエリします。__apply_changes_storage_テーブルのスキーマは、将来の機能や拡張機能をサポートするために変更される可能性があるため、本番運用で使用するためにテーブルをクエリしないでください。テーブルにデータを手動で追加すると、バージョン列が欠落しているため、レコードは他の変更よりも前に来ると見なされます。