APPLY CHANGES API : Delta Live Tablesでチェンジデータのキャプチャを簡素化

Delta Live TablesはAPPLY CHANGESAPPLY CHANGES FROM SNAPSHOTAPIを用いて、チェンジデータキャプチャ (CDC )をシンプルにします。使用するインターフェースは、変更データのソースによって異なります。

  • APPLY CHANGESを使用して、チェンジデータフィード (CDF) からの変更を処理します。

  • データベース スナップショットの変更を処理するには、 APPLY CHANGES FROM SNAPSHOT (パブリック プレビュー) を使用します。

以前は、 MERGE INTOステートメントは Databricks 上の CDC レコードの処理によく使用されていました。 ただし、 MERGE INTO では、レコードの順序が正しくないために正しくない結果が生成されたり、レコードを並べ替えるために複雑なロジックが必要になったりする可能性があります。

APPLY CHANGES API は、Delta Live Tables SQL および Python インターフェースでサポートされています。 APPLY CHANGES FROM SNAPSHOT API は Delta Live Tables Python インターフェースでサポートされています。

APPLY CHANGESAPPLY CHANGES FROM SNAPSHOTはどちらも、SCD タイプ 1 とタイプ 2 を使用したテーブルの更新をサポートしています。

  • レコードを直接更新するには、SCD タイプ 1 を使用します。 更新されたレコードの履歴は保持されません。

  • SCD type 2を使用して、すべての更新または指定された列の更新に対してレコードの履歴を保持します。

構文およびその他のリファレンスについては、以下を参照してください:

この記事では、ソースデータの変更に基づいてDelta Live Tablesパイプラインのテーブルを更新する方法について説明します。Deltaテーブルの行レベルの変更情報を記録およびクエリーする方法については、「DatabricksでDelta Lake変更データフィードを使用する」を参照してください。

要件

CDC APIsを使用するには、サーバレス DLT パイプライン、Delta Live Tables Pro エディション、または AdvancedSAP エディションを使用するようにパイプラインを構成する必要があります。

APPLY CHANGES API で CDC はどのように実装されますか?

Delta Live Tables のAPPLY CHANGES API は、順序が正しくないレコードを自動的に処理することで、CDC レコードが正しく処理されることを保証し、順序が正しくないレコードを処理するための複雑なロジックを開発する必要がなくなります。 レコードを順序付けるソース データ内の列を指定する必要があります。Delta Live Tables は、これをソース データの適切な順序の単調増加表現として解釈します。 Delta Live Tables は、順序どおりに到着しないデータを自動的に処理します。 SCD タイプ 2 の変更の場合、Delta Live Tables は適切なシーケンス値をターゲット テーブルの__START_AT__END_AT列に伝播します。 各シーケンス値でキーごとに 1 つの異なる更新を行う必要があり、NULL シーケンス値はサポートされていません。

APPLY CHANGESを使用して CDC 処理を実行するには、まずストリーミング テーブルを作成し、次に SQL のAPPLY CHANGES INTOステートメントまたは Python のapply_changes()関数を使用して、変更フィードのソース、キー、およびシーケンスを指定します。 ターゲット ストリーミング テーブルを作成するには、SQL のCREATE OR REFRESH STREAMING TABLEステートメントまたは Python のcreate_streaming_table()関数を使用します。 SCD タイプ 1 およびタイプ 2 の処理例を参照してください。

構文の詳細については、Delta Live Tables SQL リファレンスまたはPython リファレンスを参照してください。

APPLY CHANGES FROM SNAPSHOT API では CDC はどのように実装されますか?

プレビュー

APPLY CHANGES FROM SNAPSHOT API はパブリック プレビュー段階です。

APPLY CHANGES FROM SNAPSHOT 一連の順序どおりのスナップショットを比較することでソース データの変更を効率的に判断し、スナップショット内のレコードの CDC 処理に必要な処理を実行する宣言型 API です。 APPLY CHANGES FROM SNAPSHOTは Delta Live Tables Python インターフェースでのみサポートされます。

APPLY CHANGES FROM SNAPSHOT 複数のソース タイプからのスナップショットの取り込みをサポートします。

  • 定期的なスナップショット インジェストを使用して、既存のテーブルまたはビューからスナップショットをインジェストします。 APPLY CHANGES FROM SNAPSHOT には、既存のデータベース オブジェクトからのスナップショットの定期的な取り込みをサポートするためのシンプルで合理化されたインターフェイスがあります。 パイプラインが更新されるたびに新しいスナップショットが取り込まれ、取り込み時間がスナップショット バージョンとして使用されます。 パイプラインを連続モードで実行すると、APPLY CHANGES FROM スナップショット処理を含むフローの トリガー間隔 設定によって決定された期間に、各パイプライン更新で複数のスナップショットが取り込まれます。

  • 履歴 ID 取り込みを使用して、Oracle またはMySQLデータベースやデータウェアハウスから生成された ID などのデータベース ID を含むファイルを処理します。

APPLY CHANGES FROM SNAPSHOTを使用して任意のソース タイプから CDC 処理を実行するには、まずストリーミング テーブルを作成し、次に Python のapply_changes_from_snapshot()関数を使用して、処理の実装に必要なスナップショット、キー、およびその他の引数を指定します。 定期的なスナップショットの取り込み履歴スナップショットの取り込みの例を参照してください。

API に渡されるスナップショットは、バージョンの昇順になっている必要があります。 Delta Live Tables が順序が正しくないスナップショットを検出すると、エラーがスローされます。

構文の詳細については、Delta Live Tables Python リファレンスを参照してください。

制限

シーケンス処理に使用する列は、ソート可能なデータ型である必要があります。

例: CDF ソース データを使用した SCD タイプ 1 および SCD タイプ 2 の処理

次のセクションでは、次のような チェンジデータ フィードからのソース イベントに基づいてターゲット テーブルを更新するDelta Live Tables SCDタイプ 1 およびタイプ 2 クエリの例を示します。

  1. 新しいユーザー・レコードを作成します。

  2. ユーザー・レコードを削除します。

  3. ユーザーレコードを更新します。 SCD タイプ 1 の例では、最後のUPDATE操作が遅れて到着し、ターゲット テーブルから削除され、順序どおりでないイベントの処理が示されます。

次の例は、Delta Live Tables パイプラインの構成と更新に精通していることを前提としています。「チュートリアル:最初の Delta Live Tables パイプラインを実行する」を参照してください。

これらの例を実行するには、まずサンプル データセットを作成する必要があります。 「テスト データの生成」を参照してください。

以下はこれらの例の入力記録です:

ユーザーID

名前

オペレーション

sequenceNum

124

Raul

Oaxaca

INSERT

1

123

Isabel

Monterrey

INSERT

1

125

Mercedes

Tijuana

INSERT

2

126

Lily

Cancun

INSERT

2

123

null

null

DELETE

6

125

Mercedes

Guadalajara

UPDATE

6

125

Mercedes

Mexicali

UPDATE

5

123

Isabel

Chihuahua

UPDATE

5

例題データの最後の行のコメントを外すと、レコードを切り捨てる場所を指定する次のレコードが挿入されます:

ユーザーID

名前

オペレーション

sequenceNum

null

null

null

TRUNCATE

3

次のすべての例には、 DELETE 操作と TRUNCATE 操作の両方を指定するオプションが含まれていますが、それぞれ省略可能です。

SCDタイプ1更新の処理

次の例は、SCD タイプ 1 の更新の処理を示しています。

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

SCDタイプ1の例を実行すると、ターゲットテーブルには次のレコードが含まれます:

ユーザーID

名前

124

Raul

Oaxaca

125

Mercedes

Guadalajara

126

Lily

Cancun

追加のTRUNCATEレコードを使用してSCDタイプ1の例を実行すると、sequenceNum=3でのTRUNCATE操作によりレコード124126が切り捨てられ、ターゲットテーブルには次のレコードが含まれます:

ユーザーID

名前

125

Mercedes

Guadalajara

SCDタイプ2更新の処理

次の例は、SCD タイプ 2 の更新の処理を示しています。

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

SCDタイプ2の例を実行した後、ターゲットテーブルには以下のレコードが含まれます:

ユーザーID

名前

__START_AT

__END_AT

123

Isabel

Monterrey

1

5

123

Isabel

Chihuahua

5

6

124

Raul

Oaxaca

1

null

125

Mercedes

Tijuana

2

5

125

Mercedes

Mexicali

5

6

125

Mercedes

Guadalajara

6

null

126

Lily

Cancun

2

null

SCD タイプ 2 クエリーでは、ターゲット表のヒストリーを追跡する出力列のサブセットを指定することもできます。 他の列への変更は、新しい履歴レコードを生成するのではなく、その場で更新されます。 次の例は、追跡から city 列を除外する方法を示しています。

次の例は、SCDタイプ2でトラックヒストリーを使用する例です:

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

追加の TRUNCATE レコードを指定せずにこの例を実行すると、ターゲット表には以下のレコードが含まれます。

ユーザーID

名前

__START_AT

__END_AT

123

Isabel

Chihuahua

1

6

124

Raul

Oaxaca

1

null

125

Mercedes

Guadalajara

2

null

126

Lily

Cancun

2

null

テストデータの作成

以下のコードは、このチュートリアルにあるサンプルクエリで使用するためのサンプルデータセットを生成するために提供されています。 新しいスキーマを作成して新しいテーブルを作成するための適切な資格情報を持っていると仮定すると、ノートブックまたは Databricks SQL のいずれかを使用してこれらのステートメントを実行できます。 次のコードは、Delta Live Tables パイプラインの一部として実行されることを意図したものではありません

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

例: 定期的なスナップショット処理

次の例は、 mycatalog.myschema.mytableに保存されているテーブルのスナップショットを取り込む SCD タイプ 2 処理を示しています。 処理の結果は、 targetという名前のテーブルに書き込まれます。

mycatalog.myschema.mytable タイムスタンプ 2024-01-01 00:00:00 のレコード

キー

1

a1

2

a2

mycatalog.myschema.mytable タイムスタンプ 2024-01-01 12:00:00 のレコード

キー

2

b2

3

a3

import dlt

@dlt.view(name="source")
def source():
 return spark.read.table("mycatalog.myschema.mytable")

dlt.create_streaming_table("target")

dlt.apply_changes_from_snapshot(
 target="target",
 source="source",
 keys=["key"],
 stored_as_scd_type=2
)

スナップショットを処理すると、ターゲット テーブルに次のレコードが含まれます。

キー

__START_AT

__END_AT

1

a1

2024-01-01 00:00:00

2024-01-01 12:00:00

2

a2

2024-01-01 00:00:00

2024-01-01 12:00:00

2

b2

2024-01-01 12:00:00

null

3

a3

2024-01-01 12:00:00

null

例: 履歴スナップショット処理

次の例は、クラウド ストレージ システムに保存されている 2 つのスナップショットからのソース イベントに基づいてターゲット テーブルを更新する SCD タイプ 2 処理を示しています。

timestampのスナップショット、保存場所 /<PATH>/filename1.csv

キー

トラッキングカラム

ノントラッキングカラム

1

a1

B1 の

2

a2

b2

4

a4

b4

timestamp + 5のスナップショット、保存場所 /<PATH>/filename2.csv

キー

トラッキングカラム

ノントラッキングカラム

2

a2_new

b2

3

a3

B3の

4

a4

b4_new

次のコード例は、これらのスナップショットを使用して SCD タイプ 2 の更新を処理する方法を示しています。

import dlt

def exist(file_name):
  # Storage system-dependent function that returns true if file_name exists, false otherwise

# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
  latest_snapshot_version = latest_snapshot_version or 0
  next_version = latest_snapshot_version + 1
  file_name = "dir_path/filename_" + next_version + ".csv"
  if (exist(file_name)):
    return (spark.read.load(file_name), next_version)
   else:
     # No snapshot available
     return None

dlt.create_streaming_live_table("target")

dlt.apply_changes_from_snapshot(
  target = "target",
  source = next_snapshot_and_version,
  keys = ["Key"],
  stored_as_scd_type = 2,
  track_history_column_list = ["TrackingCol"]
)

スナップショットを処理すると、ターゲット テーブルに次のレコードが含まれます。

キー

トラッキングカラム

ノントラッキングカラム

__START_AT

__END_AT

1

a1

B1 の

1

2

2

a2

b2

1

2

2

a2_new

b2

2

null

3

a3

B3の

2

null

4

a4

b4_new

1

null

ターゲットストリーミングテーブルのデータの追加、変更、または削除

パイプラインでテーブルが Unity Catalog に発行される場合は、データ 操作言語 (DML) ステートメント (挿入、更新、削除、マージ ステートメントなど) を使用して、 APPLY CHANGES INTO ステートメントによって作成されたターゲット ストリーミング テーブルを変更できます。

  • ストリーミングテーブルのテーブルスキーマを変更する DML ステートメントはサポートされていません。DML ステートメントによってテーブルスキーマの進化が発生しないようにしてください。

  • ストリーミング テーブルを更新する DML ステートメントは、Unity Catalog Databricks Runtime13.3LTS 以降を使用している共有 クラスターまたは SQL Server でのみ実行できます。

  • ストリーミングには追加専用のデータソースが必要なため、処理で (DML ステートメントなどによる) 変更を含むソース ストリーミング テーブルからのストリーミングが必要な場合は、ソース ストリーミング テーブルを読み取るときに skipChangeCommits フラグ を設定します。 skipChangeCommits を設定すると、ソース テーブルのレコードを削除または変更するトランザクションは無視されます。処理にストリーミング テーブルが必要ない場合は、マテリアライズド ビュー (追加のみの制限がない) をターゲット テーブルとして使用できます。

Delta Live Tables は指定されたSEQUENCE BY列を使用し、適切な順序値をターゲット テーブルの__START_AT__END_AT列に伝播するため (SCD タイプ 2 の場合)、レコードの適切な順序を維持するために、DML ステートメントがこれらの列に有効な値を使用するようにする必要があります。 「APPLY CHANGES API を使用して CDC を実装する方法」を参照してください。

ストリーミング テーブルでの DML ステートメントの使用の詳細については、「 ストリーミング テーブルのデータを追加、変更、または削除する」を参照してください。

次の例では、開始シーケンスを 5 としたアクティブレコードを挿入しています。

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

APPLY CHANGESターゲットテーブルからのチェンジデータフィードの読み取り

Databricks Runtime 15.2 以降では、他のDeltaテーブルからチェンジデータフィードを読み取るのと同じ方法で、APPLY CHANGESクエリまたはAPPLY CHANGES FROM SNAPSHOTクエリのターゲットであるストリーミングテーブルからチェンジデータフィードを読み取ることができます。ターゲットストリーミングテーブルからチェンジデータフィードを読み取るには、次のものが必要です。

  • ターゲット ストリーミング テーブルは、Unity Catalog に発行する必要があります。 「Delta Live Tables パイプラインで Unity Catalog を使用する」を参照してください。

  • ターゲットストリーミングテーブルからチェンジデータフィードを読み取るには、 Databricks Runtime 15.2 以降を使用する必要があります。 チェンジデータフィードを別の Delta Live Tables パイプラインで読み取るには、 Databricks Runtime 15.2 以降を使用するようにパイプラインを構成する必要があります。

Delta Live Tables パイプラインで作成されたターゲット ストリーミング テーブルからチェンジデータフィードを読み取る方法は、他の Delta テーブルからチェンジデータフィードを読み取るのと同じ方法です。Deltaチェンジデータフィード機能の使用について、Python やSQL の例など、詳しくは、 でのDelta Lake チェンジデータフィードの使用Databricks を参照してください。

チェンジデータフィードレコードには、変更イベントのタイプを識別する メタデータ が含まれています。 テーブル内のレコードが更新されると、関連付けられた変更レコードのメタデータには、通常、update_preimage イベントと update_postimage イベントに設定された_change_type値が含まれます。

ただし、プライマリ・キー値の変更を含むターゲット・ストリーミング・テーブルの更新が行われた場合、 _change_type 値は異なります。 変更にプライマリ・キーの更新が含まれる場合、 _change_type メタデータ・フィールドは insert イベントと delete イベントに設定されます。 プライマリ・キーの変更は、 UPDATE ステートメントまたは MERGE ステートメントを使用してキー・フィールドの 1 つが手動で更新された場合、または SCD タイプ 2 テーブルの場合、 __start_at ・フィールドが以前の開始シーケンス値を反映するように変更された場合に発生する可能性があります。

APPLY CHANGES クエリは、SCD タイプ 1 と SCD タイプ 2 の処理で異なるプライマリ・キー値を判別します。

  • SCD タイプ 1 処理と Delta Live Tables Python インターフェースの場合、主キーは apply_changes() 関数の keys パラメーターの値です。Delta Live Tables SQL インターフェイスの場合、主キーは APPLY CHANGES INTO ステートメントの KEYS 句で定義された列です。

  • SCDタイプ 2 の場合、プライマリ・キーは keys パラメーターまたは KEYS 句に coalesce(__START_AT, __END_AT) 操作からの戻り値を加えたものです。ここで、__START_AT__END_AT はターゲット ストリーミング テーブルの対応するカラムです。

Delta Live Tables CDC クエリーによって処理されたレコードに関するデータを取得する

次のメトリクスは、 APPLY CHANGES クエリによってのみキャプチャされ、 APPLY CHANGES FROM SNAPSHOT クエリではキャプチャされません。

以下のメトリクスは、 APPLY CHANGES クエリーによってキャプチャされます。

  • num_upserted_rows: 更新中にデータセットにアップサートされた出力行の数。

  • num_deleted_rows: 更新中にデータセットから削除された既存の出力行の数。

CDC 以外のフローの出力である num_output_rows メトリクスは、 apply changes クエリではキャプチャされません。

Delta Live TablesのCDC処理に使用されるデータオブジェクトは何ですか?

  • これらのデータ構造は、 APPLY CHANGES 処理にのみ適用され、 APPLY CHANGES FROM SNAPSHOT 処理には適用されません。

  • これらのデータ構造は、ターゲット表が Hive metastoreにパブリッシュされる場合にのみ適用されます。 パイプラインが Unity Catalog に発行される場合、ユーザーは内部バッキング テーブルにアクセスできません。

Hive metastore でターゲットテーブルを宣言すると、2 つのデータ構造が作成されます。

  • ターゲットテーブルに割り当てられた名前を使用するビュー。

  • Delta Live Tables が CDC 処理を管理するために使用する内部バッキングテーブル。このテーブルには、ターゲットテーブル名の前に __apply_changes_storage_ を追加した名前が付けられます。

たとえば、dlt_cdc_targetという名前のターゲットテーブルを宣言すると、メタストアにdlt_cdc_targetという名前のビューと__apply_changes_storage_dlt_cdc_targetという名前のテーブルが表示されます。ビューを作成すると、Delta Live Tablesで、順不同になっているデータの処理に必要な追加情報(トゥームストーンやバージョンなど)をフィルターで除外できるようになります。処理されたデータを表示するには、ターゲットビューに対してクエリを実行します。__apply_changes_storage_テーブルのスキーマは、将来の機能や拡張機能をサポートするために変更される可能性があるため、運用環境での使用ではこのテーブルをクエリしないでください。テーブルにデータを手動で追加する場合、バージョン列が欠落しているため、レコードは他の変更の前にあるものとみなされます。