メインコンテンツまでスキップ

apply_changes

apply_changes() 関数は、DLT チェンジデータキャプチャ (CDC) 機能を使用して、チェンジデータフィード (CDF) からのソース データを処理します。

important

変更を適用するターゲット ストリーミングテーブルを宣言する必要があります。 オプションで、ターゲットテーブルのスキーマを指定できます。apply_changes()ターゲットテーブルのスキーマを指定するときは、sequence_byフィールドと同じデータ型の__START_AT列と__END_AT列を含める必要があります。

必要なターゲットテーブルを作成するには、DLT Pythonインターフェースの create_streaming_table() 関数を使用できます。

構文

Python
import dlt

dlt.apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)

apply_changes処理の場合、INSERTイベントとUPDATEイベントのデフォルトの動作は、ソースから CDC イベント を更新する ことです。指定したキーに一致するターゲットテーブルの行を更新するか、一致するレコードがターゲットテーブルに存在しない場合は新しい行を挿入します。DELETEイベントの処理は、apply_as_deletes パラメーターで指定できます。

変更フィードを使用したCDC処理の詳細については、「変更の適用APIs: DLTによるチェンジデータキャプチャの簡略化」を参照してください。apply_changes() 関数の使用例については、例: CDF ソース・データを使用した SCD タイプ 1 および SCD タイプ 2 の処理を参照してください。

パラメーター

パラメーター

タイプ

説明

target

str

必須。更新するテーブルの名前。create_streaming_table() 関数を使用して、apply_changes() 関数を実行する前にターゲット テーブルを作成できます。

source

str

必須。 CDCレコードを含むデータソース。

keys

list

必須。 ソースデータ内の行を一意に識別する列または列の組み合わせ。これは、どのCDCイベントがターゲットテーブル内の特定のレコードに適用されるかを識別するために使用されます。 次のいずれかを指定できます:

  • 文字列のリスト: ["userId", "orderId"]
  • Spark SQL col() 関数のリスト: [col("userId"), col("orderId"].col()関数の引数に修飾子を含めることはできません。たとえば、 col(userId)は使用できますが、 col(source.userId)は使用できません。

sequence_by

strcol() 、または struct()

必須。ソース・データ内の CDC イベントの論理的な順序を指定する列名。DLT は、このシーケンスを使用して、順不同で到着した変更イベントを処理します。指定する列は、ソート可能なデータ型である必要があります。次のいずれかを指定できます。

  • 文字列: "sequenceNum"
  • Spark SQL col() 関数: col("sequenceNum").col()関数の引数に修飾子を含めることはできません。たとえば、 col(userId)は使用できますが、 col(source.userId)は使用できません。
  • 複数の列を組み合わせて同順位を破る struct() : struct("timestamp_col", "id_col")、最初に最初の構造体フィールド、次に同点の場合は 2 番目のフィールドの順に並べられます。

ignore_null_updates

bool

ターゲットカラムのサブセットを含む更新の取り込みを許可します。 CDC イベントが既存の行と一致し、 ignore_null_updatesTrueの場合、 null を持つ列はターゲット内の既存の値を保持します。これは、値が nullのネストされた列にも適用されます。ignore_null_updatesFalseの場合、既存の値はnull値で上書きされます。

デフォルトはFalseです。

apply_as_deletes

str または expr()

CDCイベントをupsertではなくDELETEとして扱う必要がある場合を指定します。次のいずれかを指定できます:

  • 文字列: "Operation = 'DELETE'"
  • Spark SQL expr()関数は次のとおりです: expr("Operation = 'DELETE'")

順不同のデータを処理するために、削除された行は基になる Delta テーブルに廃棄石として一時的に保持され、これらの廃棄石を除外するビューがメタストアに作成されます。保持間隔は、 pipelines.cdc.tombstoneGCThresholdInSeconds テーブル プロパティを使用して構成できます。

apply_as_truncates

str または expr()

CDCイベントを完全なテーブルTRUNCATEとして扱う必要がある場合を指定します。次のいずれかを指定できます:

  • 文字列: "Operation = 'TRUNCATE'"
  • Spark SQL expr()関数は次のとおりです: expr("Operation = 'TRUNCATE'")

この句はターゲットテーブルの完全な切り捨てをトリガーするため、この機能を必要とする特定のユースケースにのみ使用してください。apply_as_truncates パラメーターは、SCD タイプ 1 でのみサポートされます。SCDタイプ 2 では、切り捨て操作はサポートされていません。

column_list または except_column_list

list

ターゲットテーブルに含める列のサブセット。column_listを使用して、含める列の完全なリストを指定します。except_column_listを使用して、除外する列を指定します。いずれかの値を文字列のリストとして宣言することも、Spark SQL col()関数として宣言することもできます:

  • column_list = ["userId", "name", "city"]
  • column_list = [col("userId"), col("name"), col("city")]
  • except_column_list = ["operation", "sequenceNum"]
  • except_column_list = [col("operation"), col("sequenceNum")

col()関数の引数に修飾子を含めることはできません。たとえば、 col(userId)は使用できますが、 col(source.userId)は使用できません。デフォルトでは、 column_list または except_column_list 引数が関数に渡されない場合、ターゲットテーブルのすべての列が含まれます。

stored_as_scd_type

str または int

レコードを SCD タイプ 1 として保管するか、SCD タイプ 2 として保管するか。SCD タイプ 1 の場合は 1 、SCD タイプ 2 の場合は 2 に設定します。デフォルトは SCD タイプ 1 です。

track_history_column_list または track_history_except_column_list

list

ターゲット・テーブル内のヒストリーについて追跡する出力列のサブセット。track_history_column_list を使用して、追跡する列の完全なリストを指定します。track_history_except_column_list を使用して、トラッキングから除外する列を指定します。どちらの値も、文字列のリストとして、または Spark SQL col() 関数として宣言できます。

  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

col()関数の引数に修飾子を含めることはできません。たとえば、 col(userId)は使用できますが、 col(source.userId)は使用できません。デフォルトでは、 track_history_column_list または track_history_except_column_list 引数が関数に渡されない場合、ターゲットテーブルのすべての列が含まれます。