メインコンテンツまでスキップ

スナップショットフローから自動 CDC を作成する

備考

プレビュー

この機能はパブリック プレビュー段階です。

create_auto_cdc_from_snapshot_flow 関数は、Lakeflow 宣言型パイプライン チェンジデータキャプチャ (CDC) 機能を使用してデータベース スナップショットのソース データを処理するフローを作成します。CDC は AUTO CDC FROM SNAPSHOT API でどのように実装されますか?を参照してください。

注記

この関数は以前の関数apply_changes_from_snapshot()を置き換えます。2 つの関数は同じシグネチャを持ちます。Databricks では、新しい名前を使用するように更新することをお勧めします。

important

この操作にはターゲット ストリーミング テーブルが必要です。

必要なターゲット テーブルを作成するには、 create_streaming_table()関数を使用できます。

構文

Python
from pyspark import pipelines as dp

dp.create_auto_cdc_from_snapshot_flow(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
注記

AUTO CDC FROM SNAPSHOT処理の場合、デフォルトの動作では、同じキーを持つ一致するレコードがターゲットに存在しない場合に新しい行を挿入します。一致するレコードが存在する場合は、行のいずれかの値が変更された場合にのみ更新されます。ターゲットには存在するがソースには存在しないキーを持つ行は削除されます。

スナップショットを使用した CDC 処理の詳細については、「 AUTO CDC API: Lakeflow 宣言型パイプラインによるチェンジデータキャプチャをシンプルに」を参照してください。 create_auto_cdc_from_snapshot_flow() 関数の使用例については、定期的なスナップショットの取り込みスナップショットの履歴取り込みの例を参照してください。

問題

パラメーター

Type

説明

target

str

必須。更新するテーブルの名前。create_auto_cdc_from_snapshot_flow()関数を実行する前に、 create_streaming_table()関数を使用してターゲット テーブルを作成できます。

source

str または lambda function

必須。定期的にスナップショットを作成するテーブルまたはビューの名前、または処理するスナップショット データフレーム とスナップショット バージョンを返す Python ラムダ関数。 source引数の実装を参照してください。

keys

list

必須。ソースデータ内の行を一意に識別する列または列の組み合わせ。これは、どのCDCイベントがターゲットテーブル内の特定のレコードに適用されるかを識別するために使用されます。 次のいずれかを指定できます。

  • 文字列のリスト: ["userId", "orderId"]

  • Spark SQL col()関数のリスト: [col("userId"), col("orderId"]

col()関数の引数に修飾子を含めることはできません。例えば、col(userId)は使えますが、col(source.userId)は使えません。

stored_as_scd_type

str または int

レコードを SCD タイプ 1 として保存するか、SCD タイプ 2 として保存するかを指定します。SCD タイプ 1 の場合は1 、SCD タイプ 2 の場合は2に設定します。デフォルトは SCD タイプ 1 です。

track_history_column_list または track_history_except_column_list

list

ターゲット テーブル内の履歴を追跡する出力列のサブセット。追跡する列の完全なリストを指定するには、 track_history_column_listを使用します。追跡から除外する列を指定するには、 track_history_except_column_listを使用します。どちらの値も、文字列のリストまたは Spark SQL col()関数として宣言できます。

  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

col()関数の引数には修飾子を含めることはできません。たとえば、 col(userId)は使用できますが、 col(source.userId)は使用できません。デフォルトでは、関数にtrack_history_column_listまたはtrack_history_except_column_list引数が渡されない場合、ターゲット テーブルのすべての列が含まれます。

source引数を実装する

create_auto_cdc_from_snapshot_flow() 関数には source 引数が含まれています。履歴スナップショットを処理する場合、 引数は、処理するスナップショットデータを含む とスナップショットバージョンという sourcePython2 つの値を 関数に返す ラムダ関数である必要があります。create_auto_cdc_from_snapshot_flow()Pythonデータフレーム

以下はラムダ関数のシグネチャです。

Python
lambda Any => Optional[(DataFrame, Any)]
  • ラムダ関数への引数は、最後に処理されたスナップショット バージョンです。
  • ラムダ関数の戻り値が None または 2 つの値のタプルである: タプルの最初の値は、処理するスナップショットを含む データフレーム です。 タプルの 2 番目の値は、スナップショットの論理順序を表すスナップショット バージョンです。

ラムダ関数を実装して呼び出す例:

Python
def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None

create_auto_cdc_from_snapshot_flow(
# ...
source = next_snapshot_and_version,
# ...
)

Lakeflow 宣言型パイプライン ランタイムは、create_auto_cdc_from_snapshot_flow() 関数を含むパイプラインがトリガーされるたびに、次の手順を実行します。

  1. next_snapshot_and_version 関数を実行して、次のスナップショット データフレーム と対応するスナップショット バージョンを読み込みます。
  2. データフレーム が返されない場合、実行は終了し、パイプラインの更新は完了としてマークされます。
  3. 新しいスナップショットの変更を検出し、それをターゲット テーブルに段階的に適用します。
  4. ステップ #1 に戻り、次のスナップショットとそのバージョンをロードします。