メインコンテンツまでスキップ

AUTO CDC APIs : パイプラインを使用して変更データ キャプチャを簡素化

LakeFlow Spark宣言型パイプライン (SDP) は、 AUTO CDCおよびAUTO CDC FROM SNAPSHOT APIsを使用してチェンジデータ キャプチャ ( CDC ) を簡素化します。

注記

AUTO CDC APIs APPLY CHANGES APIsに代わるもので、同じ構文を持ちます。 APPLY CHANGES APIs引き続き使用できますが、 Databricksでは代わりにAUTO CDC APIsを使用することをお勧めします。

使用するインタフェースは、変更データのソースによって異なります。

  • データ変更フィード (CDF) からの変更を処理するには、 AUTO CDCを使用します。
  • データベース スナップショットの変更を処理するには、 AUTO CDC FROM SNAPSHOT (パブリック プレビュー、Python でのみ使用可能) を使用します。

以前は、 MERGE INTOステートメントは Databricks 上の CDC レコードの処理によく使用されていました。ただし、 MERGE INTOでは、レコードの順序が正しくないために誤った結果が生成されたり、レコードの順序を変更するために複雑なロジックが必要になったりする可能性があります。

AUTO CDC API は、パイプライン SQL および Python インターフェースでサポートされています。AUTO CDC FROM SNAPSHOT API は Python インターフェースでサポートされています。AUTO CDC APIs 、 Apache Spark 宣言型パイプラインではサポートされていません。

AUTO CDCAUTO CDC FROM SNAPSHOTはどちらも、SCD タイプ 1 とタイプ 2 を使用したテーブルの更新をサポートしています。

  • レコードを直接更新するには、SCD タイプ 1 を使用します。更新されたレコードの履歴は保持されません。
  • SCD type 2を使用して、すべての更新または指定された列の更新に対してレコードの履歴を保持します。

構文およびその他のリファレンスについては、 「AUTO CDC for パイプライン ( SQL )」「AUTO CDC for パイプライン ( Python )」「AUTO CDC FROM スナップショット for パイプライン ( Python )」を参照してください。

注記

この記事では、ソース データの変更に基づいてパイプライン内のテーブルを更新する方法について説明します。Deltaテーブルの行レベルの変更情報を記録およびクエリする方法については、 DatabricksでDelta Lake変更データフィードを使用する」を参照してください。

要件

CDC APIs使用するには、サーバレス SDPまたは SDP ProまたはAdvancedエディションを使用するようにパイプラインを構成する必要があります。

AUTO CDC API では CDC はどのように実装されますか?

AUTO CDC API は、順序外レコードを自動的に処理することで、CDC レコードが正しく処理されることを保証し、順序外レコードを処理するための複雑なロジックを開発する必要がなくなります。ソース データ内でレコードを順序付ける列を指定する必要があります。API APIsこれを、ソース データの適切な順序の単調に増加する表現として解釈します。 パイプラインは、順序どおりに到着しないデータを自動的に処理します。SCD タイプ 2 の変更の場合、パイプラインは適切なシーケンス値をターゲット テーブルの__START_AT列と__END_AT列に伝播します。各シーケンス値ごとにキーごとに 1 つの個別の更新が存在する必要があり、NULL シーケンス値はサポートされていません。

AUTO CDCでCDC処理を実行するには、まずストリーミング テーブルを作成し、次にSQLのAUTO CDC ... INTOステートメントまたはPythonのcreate_auto_cdc_flow()関数を使用して、変更フィードのソース、キー、シーケンスを指定します。 ターゲットのストリーミング テーブルを作成するには、 SQLのCREATE OR REFRESH STREAMING TABLEステートメントまたはPythonのcreate_streaming_table()関数を使用します。 SCD タイプ 1 およびタイプ 2 の処理例を参照してください。

構文の詳細については、パイプラインのSQL リファレンスまたはPython リファレンスを参照してください。

AUTO CDC FROM SNAPSHOT API では CDC はどのように実装されますか?

備考

プレビュー

AUTO CDC FROM SNAPSHOT API はパブリック プレビュー段階です。

AUTO CDC FROM SNAPSHOT 一連の順序どおりのスナップショットを比較することでソース データの変更を効率的に判断し、スナップショット内のレコードの CDC 処理に必要な処理を実行する宣言型 API です。AUTO CDC FROM SNAPSHOTは Python パイプライン インターフェースでのみサポートされます。

AUTO CDC FROM SNAPSHOT 複数のソース タイプからのスナップショットの取り込みをサポートします。

  • 定期的なスナップショットの取り込みを使用して、既存のテーブルまたはビューからスナップショットを取り込みます。 AUTO CDC FROM SNAPSHOTには、既存のデータベース オブジェクトからスナップショットを定期的に取り込むことをサポートするシンプルで合理化されたインターフェースがあります。パイプラインの更新ごとに新しいスナップショットが取り込まれ、取り込み時刻がスナップショットのバージョンとして使用されます。パイプラインが連続モードで実行されると、 AUTO CDC FROM SNAPSHOT処理を含むフローのトリガー間隔設定によって決定される期間で、パイプラインの更新ごとに複数のスナップショットが取り込まれます。
  • 履歴スナップショットの取り込みを使用して、Oracle データベース、 MySQLデータベース、またはデータウェアハウスから生成されたスナップショットなどのデータベース スナップショットを含むファイルを処理します。

AUTO CDC FROM SNAPSHOTを使用して任意のソース タイプからCDC処理を実行するには、まずストリーミング テーブルを作成し、次にPythonのcreate_auto_cdc_from_snapshot_flow()関数を使用して、処理の実装に必要なスナップショット、キー、その他の引数を指定します。 定期的なスナップショットの取り込み履歴スナップショットの取り込みの例を参照してください。

API に渡されるスナップショットは、バージョンの昇順になっている必要があります。SDP が順序どおりでないスナップショットを検出すると、エラーがスローされます。

構文の詳細については、パイプラインのPython リファレンスを参照してください。

シーケンスには複数の列を使用する

複数の列で順序付けることができ (たとえば、同点の場合はタイムスタンプと ID)、STRUCT を使用してそれらを組み合わせることができます。つまり、最初に STRUCT の最初のフィールドで順序付けし、同点の場合は 2 番目のフィールドを考慮します。

SQL の例:

SQL
SEQUENCE BY STRUCT(timestamp_col, id_col)

Python の例:

Python
sequence_by = struct("timestamp_col", "id_col")

制限事項

シーケンスに使用する列は、並べ替え可能なデータ型である必要があります。

例: CDFソースデータを使用したSCDタイプ1およびSCDタイプ2の処理

次のセクションでは、変更データフィードからのソース イベントに基づいてターゲット テーブルを更新するSCDタイプ 1 およびタイプ 2 クエリの例を示します。

  1. 新しいユーザー レコードを作成します。
  2. ユーザー レコードを削除します。
  3. ユーザーレコードを更新します。SCD タイプ 1 の例では、最後のUPDATE操作が遅れて到着し、ターゲット テーブルから削除され、順序どおりでないイベントの処理が示されています。

次の例では、パイプラインの構成と更新に関する知識があることを前提としています。「チュートリアル: チェンジデータキャプチャを使用してETLパイプラインを構築する」を参照してください。

これらの例を実行するには、まずサンプル データセットを作成する必要があります。 テスト データの生成を参照してください。

以下はこれらの例の入力記録です:

userId

name

city

operation

sequenceNum

124

Raul

Oaxaca

INSERT

1

123

Isabel

Monterrey

INSERT

1

125

Mercedes

Tijuana

INSERT

2

126

Lily

Cancun

INSERT

2

123

null

null

DELETE

6

125

Mercedes

Guadalajara

UPDATE

6

125

Mercedes

Mexicali

UPDATE

5

123

Isabel

Chihuahua

UPDATE

5

例題データの最後の行のコメントを外すと、レコードを切り捨てる場所を指定する次のレコードが挿入されます:

userId

name

city

operation

sequenceNum

null

null

null

TRUNCATE

3

注記

以下のすべての例には、 DELETETRUNCATE両方の操作を指定するオプションが含まれていますが、それぞれはオプションです。

SCDタイプ1の更新を処理する

次の例は、SCD タイプ 1 の更新の処理を示しています。

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
return spark.readStream.table("cdc_data.users")

dp.create_streaming_table("target")

dp.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)

SCDタイプ1の例を実行すると、ターゲットテーブルには次のレコードが含まれます:

userId

name

city

124

Raul

Oaxaca

125

Mercedes

Guadalajara

126

Lily

Cancun

追加のTRUNCATEレコードを使用してSCDタイプ1の例を実行すると、sequenceNum=3でのTRUNCATE操作によりレコード124126が切り捨てられ、ターゲットテーブルには次のレコードが含まれます:

userId

name

city

125

Mercedes

Guadalajara

SCDタイプ2の更新を処理する

次の例は、SCD タイプ 2 の更新の処理を示しています。

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
return spark.readStream.table("cdc_data.users")

dp.create_streaming_table("target")

dp.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)

SCDタイプ2の例を実行した後、ターゲットテーブルには以下のレコードが含まれます:

userId

name

city

__START_AT

__END_AT

123

Isabel

Monterrey

1

5

123

Isabel

Chihuahua

5

6

124

Raul

Oaxaca

1

null

125

Mercedes

Tijuana

2

5

125

Mercedes

Mexicali

5

6

125

Mercedes

Guadalajara

6

null

126

Lily

Cancun

2

null

SCD タイプ 2 クエリでは、ターゲット テーブルの履歴を追跡する出力列のサブセットも指定できます。他の列への変更は、新しい履歴レコードを生成するのではなく、その場で更新されます。次の例は、 city列を追跡から除外する方法を示しています。

次の例は、SCDタイプ2でトラックヒストリーを使用する例です:

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
return spark.readStream.table("cdc_data.users")

dp.create_streaming_table("target")

dp.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)

追加のTRUNCATEレコードなしでこの例を実行すると、ターゲット テーブルには次のレコードが含まれます。

userId

name

city

__START_AT

__END_AT

123

Isabel

Chihuahua

1

6

124

Raul

Oaxaca

1

null

125

Mercedes

Guadalajara

2

null

126

Lily

Cancun

2

null

テストデータを生成する

以下のコードは、このチュートリアルにあるサンプルクエリで使用するサンプル データセットを生成するために提供されています。新しいスキーマを作成して新しいテーブルを作成するための適切な資格情報を持っていると仮定すると、ノートブックまたは Databricks SQL のいずれかを使用してこれらのステートメントを実行できます。次のコードは、パイプライン定義の一部として実行されることを意図したものでは ありません

SQL
CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);

例: 定期的なスナップショット処理

次の例は、 mycatalog.myschema.mytableに保存されているテーブルのスナップショットを取り込む SCD タイプ 2 処理を示しています。処理の結果はtargetという名前のテーブルに書き込まれます。

mycatalog.myschema.mytable タイムスタンプ2024-01-01 00:00:00の記録

Key

Value

1

a1

2

a2

mycatalog.myschema.mytable タイムスタンプ2024-01-01 12:00:00の記録

Key

Value

2

b2

3

a3

Python
from pyspark import pipelines as dp

@dp.view(name="source")
def source():
return spark.read.table("mycatalog.myschema.mytable")

dp.create_streaming_table("target")

dp.create_auto_cdc_from_snapshot_flow(
target="target",
source="source",
keys=["key"],
stored_as_scd_type=2
)

スナップショットを処理すると、ターゲット テーブルに次のレコードが含まれます。

Key

Value

__START_AT

__END_AT

1

a1

2024年1月1日 00:00:00

2024年1月1日 12:00:00

2

a2

2024年1月1日 00:00:00

2024年1月1日 12:00:00

2

b2

2024年1月1日 12:00:00

null

3

a3

2024年1月1日 12:00:00

null

例: 履歴スナップショット処理

次の例は、クラウド ストレージ システムに保存されている 2 つのスナップショットからのソース イベントに基づいてターゲット テーブルを更新する SCD タイプ 2 処理を示しています。

timestampのスナップショット、保存場所 /<PATH>/filename1.csv

Key

TrackingColumn

NonTrackingColumn

1

a1

b1

2

a2

b2

4

a4

b4

timestamp + 5のスナップショット、保存場所 /<PATH>/filename2.csv

Key

TrackingColumn

NonTrackingColumn

2

a2_new

b2

3

a3

b3

4

a4

b4_new

次のコード例は、これらのスナップショットを使用して SCD タイプ 2 の更新を処理する方法を示しています。

Python
from pyspark import pipelines as dp

def exist(file_name):
# Storage system-dependent function that returns true if file_name exists, false otherwise

# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
latest_snapshot_version = latest_snapshot_version or 0
next_version = latest_snapshot_version + 1
file_name = "dir_path/filename_" + next_version + ".csv"
if (exist(file_name)):
return (spark.read.load(file_name), next_version)
else:
# No snapshot available
return None

dp.create_streaming_live_table("target")

dp.create_auto_cdc_from_snapshot_flow(
target = "target",
source = next_snapshot_and_version,
keys = ["Key"],
stored_as_scd_type = 2,
track_history_column_list = ["TrackingCol"]
)

スナップショットを処理すると、ターゲット テーブルに次のレコードが含まれます。

Key

TrackingColumn

NonTrackingColumn

__START_AT

__END_AT

1

a1

b1

1

2

2

a2

b2

1

2

2

a2_new

b2

2

null

3

a3

b3

2

null

4

a4

b4_new

1

null

ターゲットのストリーミングテーブルのデータを追加、変更、または削除します。

パイプラインがテーブルをUnity Catalogに公開する場合、insert、update、delete、merge ステートメントを含むデータ操作言語(DML) ステートメントを使用して、 AUTO CDC ... INTOステートメントによって作成されたターゲット ストリーミング テーブルを変更できます。

注記
  • ストリーミングテーブルのテーブルスキーマを変更する DML ステートメントはサポートされていません。DML ステートメントによってテーブルスキーマの進化が発生しないようにしてください。
  • ストリーミングテーブルを更新する DML ステートメントは、Databricks Runtime 13.3 LTS 以上を使用する共有Unity Catalog クラスターまたはSQL ウェアハウスでのみ実行できます。
  • ストリーミングには追加専用のデータ ソースが必要なため、処理で (DML ステートメントなどによる) 変更を伴うソース ストリーミング テーブルからのストリーミングが必要な場合は、ソース ストリーミング テーブルの読み取り時にSkipChangeCommits フラグを設定します。 skipChangeCommitsが設定されている場合、ソース テーブルのレコードを削除または変更するトランザクションは無視されます。処理にストリーミング テーブルが必要ない場合は、ターゲット テーブルとしてマテリアライズドビュー (追加のみの制限がない) を使用できます。

LakeFlow Spark宣言型パイプラインは指定されたSEQUENCE BY列を使用し、適切な順序値をターゲット テーブルの__START_AT列と__END_AT列に伝播するため ( SCDタイプ 2 の場合)、レコードの適切な順序を維持するために、DML ステートメントでこれらの列に有効な値が使用されていることを確認する必要があります。 「AUTO CDC API を使用して CDC をどのように実装するか」を参照してください。

ストリーミングテーブルでの DML ステートメントの使用の詳細については、 ストリーミングテーブルのデータを追加、変更、または削除するを参照してください。

次の例では、開始シーケンスを 5 としたアクティブレコードを挿入しています。

SQL
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

AUTO CDCターゲットテーブルから変更データフィードを読み取ります

Databricks Runtime 15.2 以降では、他の Delta テーブルから変更データフィードを読み取るのと同じ方法で、 AUTO CDCまたはAUTO CDC FROM SNAPSHOTターゲットであるDeltaテーブルから変更データフィードを読み取ることができます。 ターゲットのストリーミング テーブルから変更データフィードを読み取るには、次のものが必要です。

  • ターゲットのストリーミング テーブルはUnity Catalogに公開されている必要があります。 「パイプラインで Unity Catalog を使用する」を参照してください。
  • ターゲット ストリーミング テーブルから変更データフィードを読み取るには、 Databricks Runtime 15.2 以降を使用する必要があります。 別のパイプラインで変更データフィードを読み取るには、 Databricks Runtime 15.2 以降を使用するようにパイプラインを構成する必要があります。

他の Delta テーブルから変更データフィードを読み取るのと同じ方法で、 LakeFlow Spark宣言型パイプラインで作成されたDeltaストリーミング テーブルから変更データフィードを読み取ります。 Deltaデータフィード機能の使用方法 ( PythonやSQLの例など) の詳細については、 DatabricksでDelta Lakeデータフィードを使用する」を参照してください。

注記

変更データフィード レコードには、変更イベントのタイプを識別するメタデータが含まれています。 テーブル内のレコードが更新されると、関連付けられた変更レコードのメタデータには通常、 update_preimageおよびupdate_postimageイベントに設定された_change_type値が含まれます。

ただし、主キー値の変更を含む更新がターゲット ストリーミング テーブルに行われた場合、 _change_type値は異なります。 変更に主キーの更新が含まれる場合、 _change_typeメタデータ フィールドはinsertおよびdeleteイベントに設定されます。主キーの変更は、 UPDATEまたはMERGEステートメントを使用してキー フィールドの 1 つに手動で更新が行われたとき、または SCD タイプ 2 テーブルの場合は、 __start_atフィールドが以前の開始シーケンス値を反映するように変更されたときに発生する可能性があります。

AUTO CDCクエリは、SCD タイプ 1 と SCD タイプ 2 の処理で異なる主キー値を決定します。

  • SCDタイプ 1 処理とパイプラインPythonインターフェイスの場合、主キーはcreate_auto_cdc_flow()関数のkeysの値です。 SQL インターフェースの場合、主キーはAUTO CDC ... INTOステートメントのKEYS句で定義された列です。
  • SCDタイプ 2 の場合、主キーは、 keys問題またはKEYS句に、 coalesce(__START_AT, __END_AT)操作からの戻り値を加えたものです。ここで、 __START_AT__END_AT 、ターゲット ストリーミング テーブルの対応する列です。

パイプライン内の CDC クエリによって処理されたレコードに関するデータを取得する

注記

次のメトリクスは、 AUTO CDCクエリによってのみキャプチャされ、 AUTO CDC FROM SNAPSHOTクエリによってはキャプチャされません。

次のメトリクスは、 AUTO CDCクエリによってキャプチャされます。

  • num_upserted_rows : 更新中にデータセットにアップサートされた出力行の数。
  • num_deleted_rows : 更新中にデータセットから削除された既存の出力行の数。

非 CDC フローの出力であるnum_output_rowsメトリクスは、 AUTO CDCクエリではキャプチャされません。

パイプラインの CDC 処理に使用されるデータ オブジェクトは何ですか?

注記
  • これらのデータ構造はAUTO CDC処理にのみ適用され、 AUTO CDC FROM SNAPSHOT処理には適用されません。
  • これらのデータ構造は、ターゲット テーブルがHive metastoreに公開される場合にのみ適用されます。 パイプラインが Unity Catalog に公開される場合、ユーザーは内部のバッキング テーブルにアクセスできません。

Hive metastore でターゲットテーブルを宣言すると、2 つのデータ構造が作成されます。

  • ターゲットテーブルに割り当てられた名前を使用するビュー。
  • CDC 処理を管理するためにパイプラインによって使用される内部バッキング テーブル。このテーブルの名前は、ターゲット テーブル名の前に__apply_changes_storage_を付加して付けられます。

たとえば、 dp_cdc_targetという名前のターゲット テーブルを宣言すると、メタストアにdp_cdc_targetという名前のビューと__apply_changes_storage_dp_cdc_targetという名前のテーブルが表示されます。ビューを作成すると、 LakeFlow Spark宣言型パイプラインは、順序が崩れたデータを処理するために必要な余分な情報 (墓石やバージョンなど) をフィルターで除外できるようになります。 処理されたデータを表示するには、ターゲット ビューをクエリします。__apply_changes_storage_テーブルのスキーマは将来の機能や拡張機能をサポートするために変更される可能性があるため、本番運用で使用するためにテーブルをクエリしないでください。 テーブルにデータを手動で追加する場合、バージョン列がないため、レコードは他の変更よりも前に来るものと見なされます。

その他のリソース