apply_changes_from_snapshot
プレビュー
この機能は パブリック プレビュー段階です。
apply_changes_from_snapshot
関数は、DLT チェンジデータキャプチャ (CDC) 機能を使用して、データベース スナップショットのソース データを処理します。CDC は APPLY CHANGES FROM SNAPSHOT
API でどのように実装されますか?を参照してください。
この操作には、ターゲット ストリーミングテーブルが必要です。 オプションで、ターゲットテーブルの列とそのタイプを指定できます。apply_changes_from_snapshot()
ターゲット表の列とそのタイプを指定する場合は、sequence_by
フィールドと同じデータ・タイプの__START_AT
列と__END_AT
列も含める必要があります。
必要なターゲットテーブルを作成するには、 create_streaming_table() 関数を使用できます。
構文
import dlt
dlt.apply_changes_from_snapshot(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
APPLY CHANGES FROM SNAPSHOT
処理の場合、デフォルトの動作では、同じキーを持つ一致するレコードがターゲットに存在しない場合に新しいローが挿入されます。一致するレコードが存在する場合は、行のいずれかの値が変更された場合にのみ更新されます。ターゲットに存在し、ソースに存在しなくなったキーを持つ行は削除されます。
スナップショットを使用したCDC処理の詳細については、「 APPLY CHANGES APIs: DLTによるチェンジデータキャプチャの簡素化」を参照してください。apply_changes_from_snapshot()
関数の使用例については、定期的なスナップショットの取り込みとスナップショットの履歴取り込みの例を参照してください。
パラメーター
パラメーター | タイプ | 説明 |
---|---|---|
|
| 必須。更新するテーブルの名前。create_streaming_table() 関数を使用して、 |
|
| 必須。定期的にスナップショットを作成するテーブルまたはビューの名前、または処理するスナップショット データフレーム とスナップショット バージョンを返す Python ラムダ関数。 |
|
| 必須。 ソースデータ内の行を一意に識別する列または列の組み合わせ。これは、どのCDCイベントがターゲットテーブル内の特定のレコードに適用されるかを識別するために使用されます。 次のいずれかを指定できます:
|
|
| レコードを SCD タイプ 1 として保管するか、SCD タイプ 2 として保管するか。SCD タイプ 1 の場合は |
|
| ターゲット・テーブル内のヒストリーについて追跡する出力列のサブセット。
|
source
引数を実装する
apply_changes_from_snapshot()
関数には source
引数が含まれています。履歴スナップショットを処理する場合、 引数は、処理するスナップショットデータを含む とスナップショットバージョンという source
Python2 つの値を 関数に返す ラムダ関数である必要があります。apply_changes_from_snapshot()
Pythonデータフレーム
以下は、ラムダ関数のシグネチャです。
lambda Any => Optional[(DataFrame, Any)]
- lambda 関数の引数は、最後に処理されたスナップショット バージョンです。
- ラムダ関数の戻り値が
None
または 2 つの値のタプルである: タプルの最初の値は、処理するスナップショットを含む データフレーム です。 タプルの 2 番目の値は、スナップショットの論理順序を表すスナップショット バージョンです。
ラムダ関数を実装して呼び出す例を次に示します。
def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
apply_changes_from_snapshot(
# ...
source = next_snapshot_and_version,
# ...
)
DLT ランタイムは、 apply_changes_from_snapshot()
関数を含むパイプラインがトリガーされるたびに、次の手順を実行します。
next_snapshot_and_version
関数を実行して、次のスナップショット データフレーム と対応するスナップショット バージョンを読み込みます。- データフレーム が返されない場合、実行は終了し、パイプラインの更新は完了としてマークされます。
- 新しいスナップショットの変更を検出し、それらをターゲット・テーブルに段階的に適用します。
- ステップ #1 に戻り、次のスナップショットとそのバージョンをロードします。