メインコンテンツまでスキップ

AUTO CDC APIs : パイプラインを使用して変更データ キャプチャを簡素化

LakeFlow Spark宣言型パイプラインは、 AUTO CDCおよびAUTO CDC FROM SNAPSHOT APIsを使用して変更データ キャプチャ ( CDC ) を簡素化します。 これらのAPIs 、 CDCフィードまたはデータベース スナップショットからslowly changing dimensions ( SCD ) タイプ 1 およびタイプ 2 を計算する複雑さを自動化します。 これらの概念の詳細については、 「チェンジデータ キャプチャ」と「スナップショット」を参照してください。

注記

AUTO CDC APIs APPLY CHANGES APIsに代わるもので、同じ構文を持ちます。 APPLY CHANGES APIs引き続き使用できますが、 Databricks代わりにAUTO CDC APIs使用することをお勧めします。

使用する API は、変更データのソースによって異なります。

  • AUTO CDC : ソース データベースで CDC フィードが有効になっている場合に使用します。AUTO CDCは変更データフィード (CDF) からの変更を処理します。 パイプライン SQL と Python インターフェースの両方でサポートされています。
  • AUTO CDC FROM SNAPSHOT : ソース データベースで CDC が有効になっておらず、スナップショットのみが使用可能な場合に使用します。この API はスナップショットを比較して変更を判断し、処理します。Python インターフェースでのみサポートされます。

どちらのAPIs SCDタイプ 1 とタイプ 2 を使用したテーブルの更新をサポートしています。

  • SCD タイプ 1 を使用してレコードを直接更新します。更新されたレコードの履歴は保持されません。
  • SCD type 2を使用して、すべての更新または指定された列の更新に対してレコードの履歴を保持します。

AUTO CDC APIs 、 Apache Spark 宣言型パイプラインではサポートされていません。

構文およびその他のリファレンスについては、 「AUTO CDC INTO (パイプライン)」「create_auto_cdc_flow」 、および「create_auto_cdc_from_snapshot_flow 」を参照してください。

注記

このページでは、ソース データの変更に基づいてパイプライン内のテーブルを更新する方法について説明します。Deltaテーブルの行レベルの変更情報を記録およびクエリする方法については、 DatabricksでDelta Lake変更データフィードを使用する」を参照してください。

要件

CDC APIs使用するには、サーバレス SDPまたは SDP ProまたはAdvancedエディションを使用するようにパイプラインを構成する必要があります。

AUTO CDCの仕組み

AUTO CDCでCDC処理を実行するには、ストリーミング テーブルを作成し、 SQLのAUTO CDC ... INTOステートメントまたはPythonのcreate_auto_cdc_flow()関数を使用して、変更フィードのソース、キー、シーケンスを指定します。 シーケンスとSCDロジックの仕組みの説明については、 「チェンジデータ キャプチャ」と「スナップショット」を参照してください。 AUTO CDC の例を参照してください。

変更フィードを含むソースからの初期ハイドレーションの場合は、 onceフローでAUTO CDC使用し、変更フィードの処理を続行します。「AUTO CDC を使用して外部 RDBMS テーブルを複製する」を参照してください。

構文の詳細については、 「AUTO CDC INTO (パイプライン)」または「create_auto_cdc_flow」を参照してください。

スナップショットからの自動CDC仕組み

AUTO CDC FROM SNAPSHOT 順序どおりのスナップショットを比較してソース データの変更を判別します。Python パイプライン インターフェースでのみサポートされます。スナップショットは、Delta テーブル、クラウド ストレージ ファイル、または JDBC から直接読み取ることができます。

AUTO CDC FROM SNAPSHOTでCDC処理を実行するには、ストリーミング テーブルを作成し、 create_auto_cdc_from_snapshot_flow()関数を使用してスナップショット、キー、その他の引数を指定します。 2 つの取り込みパターンの詳細と、それぞれの使用タイミングについては、 「スナップショット処理パターン」を参照してください。AUTO CDC FROM スナップショットの例を参照してください。

構文の詳細については、 create_auto_cdc_from_snapshot_flow を参照してください。

シーケンスには複数の列を使用する

複数の列で順序付けるには(たとえば、同点を区切るためのタイムスタンプと ID)、 STRUCTを使用してそれらを結合します。API はまず最初のフィールドで順序付けし、同点の場合は 2 番目のフィールドを考慮します。

SQL
SEQUENCE BY STRUCT(timestamp_col, id_col)

AUTO CDCの例

次の例は、チェンジデータフィード ソースを使用したSCDタイプ 1 およびタイプ 2 の処理を示しています。 サンプル データは、新しいユーザー レコードを作成し、ユーザー レコードを削除し、ユーザー レコードを更新します。SCD タイプ 1 の例では、最後のUPDATE操作が遅れて到着し、ターゲット テーブルから削除され、順序どおりでないイベント処理が示されています。

これらの例で使用される入力レコードは次のとおりです。このデータは、サンプル データの作成セクションでクエリを実行することによって作成されます。

userId

name

city

operation

sequenceNum

124

Raul

Oaxaca

INSERT

1

123

Isabel

Monterrey

INSERT

1

125

Mercedes

Tijuana

INSERT

2

126

Lily

Cancun

INSERT

2

123

null

null

DELETE

6

125

Mercedes

Guadalajara

UPDATE

6

125

Mercedes

Mexicali

UPDATE

5

123

Isabel

Chihuahua

UPDATE

5

サンプル データ生成クエリの最後の行のコメントを解除すると、 sequenceNum=3でテーブルを切り捨てる (テーブルをクリアする) ことを指定する次のレコードが挿入されます。

userId

name

city

operation

sequenceNum

null

null

null

TRUNCATE

3

注記

以下のすべての例には、 DELETETRUNCATE両方の操作を指定するオプションが含まれていますが、それぞれはオプションです。

サンプルデータを作成する

次のステートメントを実行してサンプル データセットを作成します。このコードは、パイプライン定義の一部として実行されることを意図したものではありません。変換フォルダーではなく、パイプラインの探索フォルダーから実行します。

SQL
CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;

CREATE TABLE main.cdc_tutorial.users_cdf
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The batch at sequenceNum 6 is the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);

SCDタイプ1の更新を処理する

SCD タイプ 1 は、各レコードの最新バージョンのみを保持します。次の例では、上で作成した変更データフィードから読み取り、変更をストリーミング テーブル ターゲットに適用します。 このコードを実行するには、 LakeFlow Spark宣言型パイプラインを開発します

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")

dp.create_streaming_table("users_current")

dp.create_auto_cdc_flow(
target = "users_current",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)

SCDタイプ1の例を実行すると、ターゲットテーブルには次のレコードが含まれます:

userId

name

city

124

Raul

Oaxaca

125

Mercedes

Guadalajara

126

Lily

Cancun

ユーザー 123 (Isabel) は削除されたため、表示されません。ユーザー 125 (メルセデス) には、SCD タイプ 1 が以前の値を上書きするため、最新の都市 (グアダラハラ) のみが表示されます。sequenceNum=5の以前のUPDATEは、 sequenceNum=6の後続の更新が到着したため削除されました。

TRUNCATEレコードのコメントを解除して例を実行すると、テーブルはsequenceNum=3でクリアされます。これは、レコード124126テーブル内に存在せず、最終的なターゲット テーブルには次のレコードのみが含まれていることを意味します。

userId

name

city

125

Mercedes

Guadalajara

SCDタイプ2の更新を処理する

SCD タイプ 2 は、レコードの各バージョンに対して新しい行を作成し、各バージョンがアクティブだった時期を__START_AT列と__END_AT列で示すことで、変更の完全な履歴を保存します。

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")

dp.create_streaming_table("users_history")

dp.create_auto_cdc_flow(
target = "users_history",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)

SCDタイプ2の例を実行した後、ターゲットテーブルには以下のレコードが含まれます:

userId

name

city

__START_AT

__END_AT

123

Isabel

Monterrey

1

5

123

Isabel

Chihuahua

5

6

124

Raul

Oaxaca

1

null

125

Mercedes

Tijuana

2

5

125

Mercedes

Mexicali

5

6

125

Mercedes

Guadalajara

6

null

126

Lily

Cancun

2

null

テーブルには完全な履歴が保存されます。ユーザー 123 には 2 つのバージョンがあります (削除時にシーケンス 6 で終了)。ユーザー 125 には、都市の変更を示す 3 つのバージョンがあります。__END_AT = nullのレコードは現在アクティブです。

SCD タイプ 2 で列のサブセットを追跡する

デフォルトでは、SCD タイプ 2 は列の値が変更されるたびに新しいバージョンを作成します。追跡する列のサブセットを指定して、他の列を変更した場合に新しい履歴レコードを生成するのではなく、現在のバージョンをその場で更新することができます。

次の例では、 city列を履歴追跡から除外します。

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")

dp.create_streaming_table("users_history")

dp.create_auto_cdc_flow(
target = "users_history",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)

city変更は追跡されないため、都市の更新では新しいバージョンが作成されるのではなく、現在の行が上書きされます。ターゲット テーブルには次のレコードが含まれています。

userId

name

city

__START_AT

__END_AT

123

Isabel

Chihuahua

1

6

124

Raul

Oaxaca

1

null

125

Mercedes

Guadalajara

2

null

126

Lily

Cancun

2

null

スナップショットからの自動CDCの例

次のセクションでは、 AUTO CDC FROM SNAPSHOTを使用してスナップショットを SCD タイプ 1 またはタイプ 2 のターゲット テーブルに処理する例を示します。このAPIを使用する場合の背景については、 「チェンジデータ キャプチャ」と「スナップショット」を参照してください。

例: パイプラインの取り込み時間を使用してスナップショットを処理する

スナップショットが定期的かつ順番に到着し、パイプライン実行タイムスタンプをバージョン管理に利用できる場合は、このアプローチを使用します。パイプラインが更新されるたびに、新しいスナップショットが取り込まれます。

Delta テーブル、クラウド ストレージ ファイル、JDBC 接続など、複数のソース タイプからスナップショットを読み取ることができます。

ステップ1: サンプルデータを作成する

スナップショット データを含むテーブルを作成します。パイプラインのexplorationsフォルダーにあるノートブックまたは Databricks SQL から次のコードを実行します。

SQL
CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;

CREATE TABLE main.cdc_tutorial.snapshot (
userId INT,
city STRING
);

INSERT INTO main.cdc_tutorial.snapshot VALUES
(1, 'Oaxaca'),
(2, 'Monterrey'),
(3, 'Tijuana');

ステップ 2: スナップショットから AUTO CDCを実行

LakeFlow Spark宣言型パイプラインを開発して、このステップのコードを実行します。

スナップショット ビューのソース タイプを選択します (サンプル作成コードは Delta テーブルを生成します)。

オプションA: Deltaテーブルから読み取る

Python
from pyspark import pipelines as dp

@dp.view(name="source")
def source():
return spark.read.table("main.cdc_tutorial.snapshot")

オプションB: クラウドストレージから読み取る

Python
from pyspark import pipelines as dp

@dp.view(name="source")
def source():
return spark.read.format("csv").option("header", True).load("<snapshot-path>")

オプション C: JDBCからの読み取り (クラシック コンピュートのみ)

Python
from pyspark import pipelines as dp

@dp.view(name="source")
def source():
return (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)

すべてのオプション、ターゲットに書き込む

次に、ターゲット テーブルとフローを追加します。

Python
dp.create_streaming_table("target")

dp.create_auto_cdc_from_snapshot_flow(
target = "target",
source = "source",
keys = ["userId"],
stored_as_scd_type = 2
)

最初のパイプラインの実行後、すべてのレコードがアクティブ行として挿入されます。

userId

city

__START_AT

__END_AT

1

Oaxaca

0

null

2

Monterrey

0

null

3

Tijuana

0

null

注記

代わりに SCD タイプ 1 を使用し、現在の状態のみを保持するには、 stored_as_scd_type=1を設定します。この場合、ターゲット テーブルには__START_AT__END_AT列が含まれません。

ステップ 3: 新しいスナップショットをシミュレートして再実行します

ソース テーブルを更新して、新しいスナップショットの到着をシミュレートします (パイプラインのexplorationsフォルダーにあるノートブックまたは SQL ファイルからこのコードを実行します)。

SQL
TRUNCATE TABLE main.cdc_tutorial.snapshot;

INSERT INTO main.cdc_tutorial.snapshot VALUES
(2, 'Carmel'),
(3, 'Los Angeles'),
(4, 'Death Valley'),
(6, 'Kings Canyon');

パイプラインを再度実行します。AUTO CDC FROM SNAPSHOT新しいスナップショットを以前のスナップショットと比較し、ユーザー 1 が削除され、ユーザー 2 と 3 が更新され、ユーザー 4 と 6 が挿入されたことを検出します。これにより変更フィードが生成され、 AUTO CDCを使用して出力テーブルが作成されます。

SCD タイプ 2 で 2 回目の実行を行うと、ターゲット テーブルに次のレコードが含まれます。

userId

city

__START_AT

__END_AT

1

Oaxaca

0

1

2

Monterrey

0

1

2

カーメル

1

null

3

Tijuana

0

1

3

ロサンゼルス

1

null

4

デスバレー

1

null

6

キングスキャニオン

1

null

ユーザー1は終了(削除)されました。ユーザー 2 と 3 には、それぞれ都市の変更を示す 2 つのバージョンがあります。ユーザー4と6が新たに挿入されました。

SCD タイプ 1 で 2 回目の実行を行うと、ターゲット テーブルには現在の状態のみが表示されます。

userId

city

2

カーメル

3

ロサンゼルス

4

デスバレー

6

キングスキャニオン

例: バージョン関数を使用してスナップショットを処理する

スナップショットの順序を明示的に制御する必要がある場合は、このアプローチを使用します。たとえば、複数のスナップショットが同時に到着した場合や、スナップショットが順序どおりに到着しない場合に、このアプローチを使用します。次に処理するスナップショットとそのバージョン番号を指定する関数を記述します。API はスナップショットをバージョンの昇順で処理します。

  • 複数のスナップショットが保存されている場合、それらはすべて順番に処理されます。
  • スナップショットが順序どおりに到着しない場合(たとえば、 snapshot_3 snapshot_4後に到着した場合)、そのスナップショットはスキップされます。
  • 新しいスナップショットがない場合、関数はNoneを返し、処理は行われません。

ステップ 1: スナップショット ファイルを準備する

スナップショット データを含む CSV ファイルを作成し、ボリュームまたはクラウド ストレージの場所に追加します。ファイルに時系列順に名前を付けます (例: snapshot_1.csvsnapshot_2.csv )。

各ファイルには、 userIdcity列が含まれている必要があります。例えば:

snapshot_1.csv :

userId

city

1

Oaxaca

2

Monterrey

3

Tijuana

snapshot_2.csv :

userId

city

2

カーメル

3

ロサンゼルス

4

デスバレー

ステップ 2: バージョン機能を使用して AUTO CDC FROM スナップショットを実行する

新しいノートブックを作成し、次のパイプライン コードを貼り付けます。次に、 LakeFlow Spark宣言型パイプラインを開発します

Python
from pyspark import pipelines as dp
from typing import Optional, Tuple
from pyspark.sql import DataFrame

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Optional[Tuple[DataFrame, int]]:
snapshot_dir = "/Volumes/main/cdc_tutorial/snapshots/" # or the location you created the sample data

files = dbutils.fs.ls(snapshot_dir)
snapshot_files = [f.name for f in files if f.name.startswith("snapshot_") and f.name.endswith(".csv")]

snapshot_versions = []
for filename in snapshot_files:
try:
version = int(filename.replace("snapshot_", "").replace(".csv", ""))
snapshot_versions.append(version)
except ValueError:
continue

snapshot_versions.sort()

if latest_snapshot_version is None:
if snapshot_versions:
next_version = snapshot_versions[0]
else:
return None
else:
next_versions = [v for v in snapshot_versions if v > latest_snapshot_version]
if next_versions:
next_version = next_versions[0]
else:
return None

snapshot_path = f"{snapshot_dir}snapshot_{next_version}.csv"
df = spark.read.format("csv").option("header", True).load(snapshot_path)
return (df, next_version)


dp.create_streaming_table("main.cdc_tutorial.target_versioned")

dp.create_auto_cdc_from_snapshot_flow(
target = "main.cdc_tutorial.target_versioned",
source = next_snapshot_and_version,
keys = ["userId"],
stored_as_scd_type = 2
)
注記

代わりに SCD タイプ 1 を使用するには、 stored_as_scd_type=1を設定します。

snapshot_1.csvを処理した後、ターゲット テーブルには次のレコードが含まれます。

userId

city

__START_AT

__END_AT

1

Oaxaca

1

null

2

Monterrey

1

null

3

Tijuana

1

null

snapshot_2.csvを処理した後、ターゲット テーブルには次のレコードが含まれます。

userId

city

__START_AT

__END_AT

1

Oaxaca

1

2

2

Monterrey

1

2

2

カーメル

2

null

3

Tijuana

1

2

3

ロサンゼルス

2

null

4

デスバレー

2

null

注記

SCD タイプ 1 の場合、テーブルは最新のスナップショットとまったく同じになることに注意してください。違いは、ダウンストリーム クエリでは変更フィードを使用して変更されたレコードのみを処理できることです。

ステップ 3: 新しいスナップショットを追加する

変更されたデータ (都市の値の変更、新しい行、削除された行など) を含む新しい CSV ファイルを保存場所に追加します。次に、パイプラインを再度実行して、新しいスナップショットを処理します。

制限事項

  • シーケンス列は並べ替え可能なデータ型である必要があります。NULLシーケンス値はサポートされていません。
  • AUTO CDC FROM SNAPSHOT Python パイプライン インターフェースでのみサポートされており、SQL インターフェースはサポートされていません。

その他のリソース