メインコンテンツまでスキップ

AUTO CDC API: LakeFlow 宣言型 パイプラインでチェンジデータキャプチャをシンプルに

LakeFlow宣言型パイプラインは、AUTO CDCAUTO CDC FROM SNAPSHOTAPI を使用してチェンジデータキャプチャ (CDC) を簡素化します。

注記

AUTO CDC API は以前は APPLY CHANGESと呼ばれ、構文も同じでした。

使用するインタフェースは、変更データのソースによって異なります。

  • AUTO CDC を使用して、チェンジデータフィード (CDF) からの変更を処理します。
  • AUTO CDC FROM SNAPSHOT (パブリック プレビュー、Python でのみ使用可能) を使用して、データベース スナップショットの変更を処理します。

以前は、 MERGE INTO ステートメントは、Databricks で CDC レコードを処理するために一般的に使用されていました。 ただし、 MERGE INTO では、レコードの順序が正しくないために正しくない結果が生成されたり、レコードを並べ替えるために複雑なロジックが必要になったりする可能性があります。

この AUTO CDC API は、 LakeFlow 宣言型 パイプラインの SQL インターフェイスと Python インターフェイスでサポートされています。 この AUTO CDC FROM SNAPSHOT API は、 LakeFlow 宣言型 パイプライン Python インターフェイスでサポートされています。

AUTO CDCAUTO CDC FROM SNAPSHOT はどちらも、SCD タイプ 1 およびタイプ 2 を使用したテーブルの更新をサポートしています。

  • レコードを直接更新するには、SCD タイプ 1 を使用します。 更新されたレコードの履歴は保持されません。
  • SCD type 2を使用して、すべての更新または指定された列の更新に対してレコードの履歴を保持します。

構文およびその他のリファレンスについては、Lakeflow宣言型パイプラインSQLのAUTO CDCLakeflow宣言型パイプラインPython の AUTO CDC、およびLakeflow宣言型パイプラインPythonのAUTO CDC FROM SNAPSHOT を参照してください。

注記

この記事では、ソース データの変更に基づいて LakeFlow 宣言型パイプラインのテーブルを更新する方法について説明します。 Deltaテーブルの行レベルの変更情報を記録してクエリする方法については、「DatabricksでのDelta Lake チェンジデータフィードの使用 」を参照してください。

必要条件

CDC APIを使用するには、 サーバレスLakeflow 宣言型 パイプライン または LakeFlow 宣言型 パイプライン Pro または Advanced エディションを使用するようにパイプラインを構成する必要があります。

CDC は AUTO CDC API でどのように実装されますか?

Lakeflow宣言型パイプラインの AUTO CDC APIは、順序がずれたレコードを自動的に処理することで、CDCレコードの正しい処理を保証し、順序が間違っているレコードを処理するための複雑なロジックを開発する必要がなくなります。ソース データには、レコードを順序付けする列を指定する必要があります。これにより、宣言型パイプライン Lakeflow ソース データの適切な順序付けが単調に増加する表現として解釈されます。 Lakeflow 宣言型パイプラインは、順不同で到着したデータを自動的に処理します。 SCDタイプ 2 の変更においては、Lakeflow宣言型パイプラインは適切な順序付け値をターゲットテーブルの __START_AT 列と __END_AT 列に伝達します。 各シーケンス値でキーごとに 1 つの異なる更新を行う必要があり、NULL シーケンス値はサポートされていません。

AUTO CDCを使用して CDC 処理を実行するには、まずストリーミングテーブルを作成し、次に SQL の AUTO CDC ... INTO ステートメントまたは Python の create_auto_cdc_flow() 関数を使用して、変更フィードのソース、キー、および順序を指定します。ターゲット ストリーミングテーブルを作成するには、 SQL の CREATE OR REFRESH STREAMING TABLE ステートメントまたは Pythonの create_streaming_table() 関数を使用します。 SCD Type 1 および Type 2 の処理例を参照してください。

構文の詳細については、LakeFlow 宣言型 パイプライン SQL リファレンスまたは Python リファレンスを参照してください。

CDC は AUTO CDC FROM SNAPSHOT API を使用してどのように実装されますか?

備考

プレビュー

AUTO CDC FROM SNAPSHOT API はパブリック プレビュー段階です。

AUTO CDC FROM SNAPSHOT は、一連の順番のスナップショットを比較することでソース データの変更を効率的に判断し、スナップショット内のレコードの CDC 処理に必要な処理を実行する宣言型 API です。AUTO CDC FROM SNAPSHOT は、 LakeFlow 宣言型パイプライン Python インターフェイスでのみサポートされています。

AUTO CDC FROM SNAPSHOT 複数のソースタイプからのスナップショットの取り込みをサポートします。

  • 定期的なスナップショット インジェストを使用して、既存のテーブルまたはビューからスナップショットをインジェストします。 AUTO CDC FROM SNAPSHOT には、既存のデータベース オブジェクトからのスナップショットの定期的な取り込みをサポートするためのシンプルで合理化されたインターフェイスがあります。パイプラインが更新されるたびに新しいスナップショットが取り込まれ、取り込み時間がスナップショット バージョンとして使用されます。パイプラインを連続モードで実行すると、AUTO CDC FROM SNAPSHOT処理を含むフローのトリガー間隔設定によって決定された期間に、パイプラインが更新されるたびに複数のスナップショットが取り込まれます。
  • 履歴スナップショットの取り込みを使用して、Oracle データベースや MySQL データベース、データウェアハウスから生成されたスナップショットなど、データベース スナップショットを含むファイルを処理します。

AUTO CDC FROM SNAPSHOTを使用して任意のソースタイプから CDC 処理を実行するには、まずストリーミングテーブルを作成し、次に Python の create_auto_cdc_from_snapshot_flow() 関数を使用して、処理の実装に必要なスナップショット、キー、およびその他の引数を指定します。定期的なスナップショットの取り込み履歴スナップショットの取り込みの例を参照してください。

API に渡されるスナップショットは、バージョンごとに昇順である必要があります。LakeFlow宣言型パイプライン が順不同のスナップショットを検出すると、エラーがスローされます。

構文の詳細については、LakeFlow 宣言型 パイプライン Python リファレンスを参照してください。

シーケンシングに複数のカラムを使用

複数の列 (たとえば、タイムスタンプと同順位を破るための ID) で順序付けしたり、STRUCT を使用してそれらを組み合わせたりすることができます: STRUCT の最初のフィールドを最初に順序付け、同順位の場合は 2 番目のフィールドを考慮します。

SQLの例:

SQL
SEQUENCE BY STRUCT(timestamp_col, id_col)

Pythonの例:

Python
sequence_by = struct("timestamp_col", "id_col")

制限

シーケンス処理に使用する列は、ソート可能なデータ型である必要があります。

例: CDF ソース・データを使用した SCD タイプ 1 および SCD タイプ 2 の処理

次のセクションでは LakeFlow チェンジデータフィードからのソース イベントに基づいてターゲット テーブルを更新する 宣言型 パイプライン SCD タイプ 1 およびタイプ 2 クエリの例を示します。

  1. 新しいユーザー・レコードを作成します。
  2. ユーザー・レコードを削除します。
  3. ユーザーレコードを更新します。 SCD タイプ 1 の例では、最後の UPDATE 操作が遅れて到着し、ターゲットテーブルから削除され、順不同のイベントの処理を示しています。

次の例は、宣言型パイプラインの構成と更新に精通していることを前提としています LakeFlow 。 チュートリアル: LakeFlow 宣言型 パイプラインのチェンジデータキャプチャを用いたETL パイプラインの構築を参照してください。

これらの例を実行するには、まずサンプル データセットを作成する必要があります。 テスト データの生成を参照してください。

以下はこれらの例の入力記録です:

userId

name

city

operation

sequenceNum

124

Raul

Oaxaca

INSERT

1

123

Isabel

Monterrey

INSERT

1

125

Mercedes

Tijuana

INSERT

2

126

Lily

Cancun

INSERT

2

123

null

null

DELETE

6

125

Mercedes

Guadalajara

UPDATE

6

125

Mercedes

Mexicali

UPDATE

5

123

Isabel

Chihuahua

UPDATE

5

例題データの最後の行のコメントを外すと、レコードを切り捨てる場所を指定する次のレコードが挿入されます:

userId

name

city

operation

sequenceNum

null

null

null

TRUNCATE

3

注記

次のすべての例には、 DELETE 操作と TRUNCATE 操作の両方を指定するオプションが含まれていますが、それぞれ省略可能です。

SCD タイプ 1 更新の処理

次の例は、SCD タイプ 1 更新の処理を示しています。

Python
import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)

SCDタイプ1の例を実行すると、ターゲットテーブルには次のレコードが含まれます:

userId

name

city

124

Raul

Oaxaca

125

Mercedes

Guadalajara

126

Lily

Cancun

追加のTRUNCATEレコードを使用してSCDタイプ1の例を実行すると、sequenceNum=3でのTRUNCATE操作によりレコード124126が切り捨てられ、ターゲットテーブルには次のレコードが含まれます:

userId

name

city

125

Mercedes

Guadalajara

SCD タイプ 2 更新の処理

次の例は、SCD タイプ 2 更新の処理を示しています。

Python
import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)

SCDタイプ2の例を実行した後、ターゲットテーブルには以下のレコードが含まれます:

userId

name

city

__START_AT

__END_AT

123

Isabel

Monterrey

1

5

123

Isabel

Chihuahua

5

6

124

Raul

Oaxaca

1

null

125

Mercedes

Tijuana

2

5

125

Mercedes

Mexicali

5

6

125

Mercedes

Guadalajara

6

null

126

Lily

Cancun

2

null

SCD タイプ 2 クエリでは、ターゲット・テーブル内のヒストリーについて追跡する出力カラムのサブセットを指定することもできます。 他の列への変更は、新しい履歴レコードを生成するのではなく、その場で更新されます。 次の例は、 city 列をトラッキングから除外する方法を示しています。

次の例は、SCDタイプ2でトラックヒストリーを使用する例です:

Python
import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)

この例を追加の TRUNCATE レコードなしで実行すると、ターゲットテーブルには次のレコードが含まれます。

userId

name

city

__START_AT

__END_AT

123

Isabel

Chihuahua

1

6

124

Raul

Oaxaca

1

null

125

Mercedes

Guadalajara

2

null

126

Lily

Cancun

2

null

テストデータの生成

次のコードは、このチュートリアルで示すクエリ例で使用するデータセットの例を生成するために提供されています。新しいスキーマを作成し、新しいテーブルを作成するための適切な資格情報があると仮定すると、これらのステートメントはノートブックまたは Databricks SQL で実行できます。次のコードは、LakeFlow宣言型パイプラインの一部として実行するための ものではありません

SQL
CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);

例: 定期的なスナップショット処理

次の例は、 mycatalog.myschema.mytableに格納されたテーブルのスナップショットを取り込む SCD タイプ 2 処理を示しています。 処理の結果は、 targetという名前のテーブルに書き込まれます。

mycatalog.myschema.mytable タイムスタンプ 2024-01-01 00:00:00 のレコード

Key

Value

1

a1

2

a2

mycatalog.myschema.mytable タイムスタンプ 2024-01-01 12:00:00 のレコード

Key

Value

2

b2

3

a3

Python
import dlt

@dlt.view(name="source")
def source():
return spark.read.table("mycatalog.myschema.mytable")

dlt.create_streaming_table("target")

dlt.create_auto_cdc_from_snapshot_flow(
target="target",
source="source",
keys=["key"],
stored_as_scd_type=2
)

スナップショットの処理後、ターゲットテーブルには次のレコードが含まれます。

Key

Value

__START_AT

__END_AT

1

a1

2024-01-01 00:00:00

2024-01-01 12:00:00

2

a2

2024-01-01 00:00:00

2024-01-01 12:00:00

2

b2

2024-01-01 12:00:00

null

3

a3

2024-01-01 12:00:00

null

例: 履歴スナップショット処理

次の例は、クラウドストレージシステムに保存されている 2 つのスナップショットのソースイベントに基づいてターゲットテーブルを更新する SCD タイプ 2 処理を示しています。

timestampのスナップショット、に格納 /<PATH>/filename1.csv

Key

TrackingColumn

NonTrackingColumn

1

a1

b1

2

a2

b2

4

a4

b4

timestamp + 5のスナップショット、に格納 /<PATH>/filename2.csv

Key

TrackingColumn

NonTrackingColumn

2

a2_new

b2

3

a3

B3の

4

a4

b4_new

次のコード例は、これらのスナップショットを使用した SCD タイプ 2 更新の処理を示しています。

Python
import dlt

def exist(file_name):
# Storage system-dependent function that returns true if file_name exists, false otherwise

# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
latest_snapshot_version = latest_snapshot_version or 0
next_version = latest_snapshot_version + 1
file_name = "dir_path/filename_" + next_version + ".csv"
if (exist(file_name)):
return (spark.read.load(file_name), next_version)
else:
# No snapshot available
return None

dlt.create_streaming_live_table("target")

dlt.create_auto_cdc_from_snapshot_flow(
target = "target",
source = next_snapshot_and_version,
keys = ["Key"],
stored_as_scd_type = 2,
track_history_column_list = ["TrackingCol"]
)

スナップショットの処理後、ターゲットテーブルには次のレコードが含まれます。

Key

TrackingColumn

NonTrackingColumn

__START_AT

__END_AT

1

a1

b1

1

2

2

a2

b2

1

2

2

a2_new

b2

2

null

3

a3

B3の

2

null

4

a4

b4_new

1

null

ターゲットストリーミングテーブル内のデータを追加、変更、または削除する

パイプラインがテーブルを Unity Catalogにパブリッシュする場合は、 データ操作言語 (DML) ステートメント (insert、update、delete、merge ステートメントなど) を使用して、AUTO CDC ... INTO ステートメントによって作成されたターゲット ストリーミングテーブルを変更できます。

注記
  • ストリーミングテーブルのテーブルスキーマを変更する DML ステートメントはサポートされていません。DML ステートメントによってテーブルスキーマの進化が発生しないようにしてください。
  • ストリーミングテーブルを更新する DML ステートメントは、Databricks Runtime 13.3 LTS 以上を使用する共有Unity Catalog クラスターまたはSQL ウェアハウスでのみ実行できます。
  • ストリーミングには追加専用のデータソースが必要なため、変更を伴うソースストリーミングテーブルからのストリーミングが処理に必要な場合 (DML ステートメントなど)、ソースストリーミングテーブルを読み取るときに skipChangeCommits フラグ を設定します。 skipChangeCommits を設定すると、ソース テーブルのレコードを削除または変更するトランザクションは無視されます。処理にストリーミングテーブルが必要ない場合は、マテリアライズドビュー (追加専用の制限がない) をターゲットテーブルとして使用できます。

LakeFlow宣言型パイプラインは指定された SEQUENCE BY 列を使用し、ターゲットテーブルの __START_AT 列と __END_AT 列 ( SCD タイプ 2) に適切な順序付け値を伝達するため、レコードの適切な順序を維持するためには、DML ステートメントでこれらの列に有効な値を使用する必要があります。 AUTO CDC API を使用した CDC の実装方法を参照してください。

ストリーミングテーブルでの DML ステートメントの使用の詳細については、 ストリーミングテーブルのデータを追加、変更、または削除するを参照してください。

次の例では、開始シーケンスを 5 としたアクティブレコードを挿入しています。

SQL
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

AUTO CDC 対象テーブルからのチェンジデータフィードの読み込み

Databricks Runtime 15.2 以降では、他の Delta テーブルからチェンジデータフィードを読み取るのと同じ方法で、AUTO CDC クエリまたは AUTO CDC FROM SNAPSHOT クエリのターゲットであるストリーミングテーブルからチェンジデータフィードを読み取ることができます。ターゲットストリーミングテーブルからチェンジデータフィードを読み取るには、次のものが必要です。

  • ターゲット ストリーミングテーブルは Unity Catalogにパブリッシュする必要があります。 「LakeFlow宣言型 パイプラインでの Unity Catalogの使用 」を参照してください。
  • ターゲットストリーミングテーブルからチェンジデータフィードを読み取るには、 Databricks Runtime 15.2以降を使用する必要があります。 チェンジデータフィードを別のパイプラインで読み取るには、 Databricks Runtime 15.2 以降を使用するようにパイプラインを構成する必要があります。

LakeFlow宣言型 パイプラインで作成されたターゲット ストリーミングテーブルからチェンジデータフィードを読み取るには、他の Delta テーブルからチェンジデータフィードを読み取るのと同じ方法で読み取ります。 Deltaチェンジデータフィード機能の使用(Python やSQL の例など)について詳しくは、DatabricksでのDelta Lake チェンジデータフィードの使用 を参照してください。

注記

チェンジデータフィードレコードには、変更イベントのタイプを識別する メタデータ が含まれています。 テーブル内のレコードが更新されると、関連付けられた変更レコードのメタデータには、通常、update_preimage イベントと update_postimage イベントに設定された_change_type値が含まれます。

ただし、プライマリ・キー値の変更を含む更新がターゲット・ストリーミングテーブルに対して行われた場合、 _change_type 値は異なります。 変更にプライマリ・キーの更新が含まれる場合、 _change_type メタデータ・フィールドは insert イベントと delete イベントに設定されます。 プライマリ・キーの変更は、 UPDATE ステートメントまたは MERGE ステートメントを使用してキー・フィールドの 1 つが手動で更新された場合、または SCD タイプ 2 テーブルの場合、 __start_at ・フィールドが以前の開始シーケンス値を反映するように変更された場合に発生する可能性があります。

AUTO CDC クエリは、SCD タイプ 1 と SCD タイプ 2 の処理で異なるプライマリ・キー値を判別します。

  • SCDタイプ 1 処理と LakeFlow 宣言型 パイプライン Python インターフェースの場合、主キーは create_auto_cdc_flow() 関数の keys パラメーターの値です。LakeFlow 宣言型パイプライン SQL インターフェイスの場合、主キーは AUTO CDC ... INTO ステートメントの KEYS 句で定義された列です。
  • SCDタイプ 2 の場合、プライマリ・キーは keys パラメーターまたは KEYS 句に coalesce(__START_AT, __END_AT) 操作からの戻り値を加えたものです。ここで、__START_AT__END_AT はターゲット ストリーミングテーブルの対応するカラムです。

LakeFlow 宣言型 パイプライン の CDC クエリによって処理されたレコードに関するデータを取得する

注記

次のメトリクスは、 AUTO CDC クエリによってのみキャプチャされ、 AUTO CDC FROM SNAPSHOT クエリではキャプチャされません。

次のメトリクスは、 AUTO CDC クエリによってキャプチャされます。

  • num_upserted_rows : 更新中にデータセットにアップサートされた出力行の数。
  • num_deleted_rows : 更新中にデータセットから削除された既存の出力行の数。

CDC 以外のフローの出力である num_output_rows メトリクスは、 AUTO CDC クエリではキャプチャされません。

LakeFlow宣言型パイプラインのCDC処理にはどのようなデータオブジェクトが使用されますか?

注記
  • これらのデータ構造は、 AUTO CDC 処理にのみ適用され、 AUTO CDC FROM SNAPSHOT 処理には適用されません。
  • これらのデータ構造は、ターゲット表が Hive metastoreにパブリッシュされる場合にのみ適用されます。 パイプラインが Unity Catalog に発行される場合、ユーザーは内部バッキング テーブルにアクセスできません。

Hive metastore でターゲットテーブルを宣言すると、2 つのデータ構造が作成されます。

  • ターゲットテーブルに割り当てられた名前を使用するビュー。
  • 宣言型パイプラインが 処理を管理するために使用する内部バッキング テーブルLakeFlowCDC 。このテーブルの名前は、ターゲット テーブル名の先頭に __apply_changes_storage_ を付加して付けられます。

たとえば、 dlt_cdc_targetという名前のターゲット テーブルを宣言すると、メタストアに dlt_cdc_target という名前のビューと __apply_changes_storage_dlt_cdc_target という名前のテーブルが表示されます。ビューを作成すると LakeFlow 宣言型パイプラインで、順不同のデータを処理するために必要な追加情報 (廃棄標識やバージョンなど) を除外できます。 処理されたデータを表示するには、ターゲット ビューをクエリします。__apply_changes_storage_テーブルのスキーマは、将来の機能や拡張機能をサポートするために変更される可能性があるため、本番運用で使用するためにテーブルをクエリしないでください。テーブルにデータを手動で追加すると、バージョン列が欠落しているため、レコードは他の変更よりも前に来ると見なされます。

追加のリソース