メインコンテンツまでスキップ

自動cdcフローを作成する

create_auto_cdc_flow()関数は、 Lakeflow Spark宣言型パイプライン チェンジデータキャプチャ ( CDC ) 機能を使用して、チェンジデータフィード (CDF) からのソース データを処理するフローを作成します。

注記

この関数は以前の関数apply_changes()を置き換えます。2 つの関数は同じシグネチャを持ちます。Databricks では、新しい名前を使用するように更新することをお勧めします。

重要

変更を適用するターゲット ストリーミング テーブルを宣言する必要があります。 オプションで、ターゲット テーブルのスキーマを指定できます。create_auto_cdc_flow()ターゲット テーブルのスキーマを指定するときは、 sequence_byフィールドと同じデータ型の__START_AT列と__END_AT列を含める必要があります。

必要なターゲット テーブルを作成するには、パイプライン Python インターフェースのcreate_streaming_table()関数を使用できます。

構文

Python
from pyspark import pipelines as dp

dp.create_auto_cdc_flow(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
system_sequence_by = None,
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = "1",
track_history_column_list = None,
track_history_except_column_list = None,
name = None,
once = False
)

create_auto_cdc_flow処理の場合、 INSERTおよびUPDATEイベントのデフォルトの動作は、ソースから CDC イベント をアップサートする ことです。つまり、指定されたキーに一致するターゲット テーブルのすべての行を更新するか、一致するレコードがターゲット テーブルに存在しない場合は新しい行を挿入します。DELETEイベントの処理は、 apply_as_deletesパラメーターで指定できます。

変更フィードを使用したCDC処理の詳細については、 「AUTO CDC APIs : パイプラインを使用した変更データ キャプチャの簡素化」を参照してください。 create_auto_cdc_flow()関数の使用例については、 AUTO CDC の例を参照してください。

パラメーター

パラメーター

Type

説明

target

str

必須。更新するテーブルの名前。create_auto_cdc_flow()関数を実行する前に、 create_streaming_table()関数を使用してターゲット テーブルを作成できます。

source

str

必須。CDCレコードを含むデータソース。

keys

list

必須。ソースデータ内の行を一意に識別する列または列の組み合わせ。これは、どのCDCイベントがターゲットテーブル内の特定のレコードに適用されるかを識別するために使用されます。 次のいずれかを指定できます。

  • 文字列のリスト: ["userId", "orderId"]
  • Spark SQL col()関数のリスト: [col("userId"), col("orderId")]col()関数の引数には修飾子を含めることはできません。たとえば、 col(userId)は使用できますが、 col(source.userId)は使用できません。

sequence_by

strcol()または struct()

必須。ソース データ内の CDC イベントの論理順序を指定する列名。Lakeflow Spark宣言型パイプラインは、このシーケンスを使用して、順序どおりに到着しない変更イベントを処理します。 指定された列は並べ替え可能なデータ型である必要があります。次のいずれかを指定できます。

  • 文字列: "sequenceNum"
  • Spark SQL col()関数: col("sequenceNum")col()関数の引数には修飾子を含めることはできません。たとえば、 col(userId)は使用できますが、 col(source.userId)は使用できません。
  • struct()は、複数の列を組み合わせて同順位を解消するものです: struct("timestamp_col", "id_col")。これは、最初の構造体フィールドで順序付けを行い、同順位の場合は2番目のフィールドで、といった形で順序付けを行います。

system_sequence_by

str または col()

各CDCイベントがシステムに認識されたシステム時刻を指定する列です。stored_as_scd_type="bitemporal"と共に使用され、ビジネス時間 (sequence_by) とシステム時間の両方で変更を追跡します。指定された列はソート可能なデータ型である必要があります。バイテンポラルAUTO CDCはベータ版です。「バイテンポラルAUTO CDCの仕組み」を参照してください。

このパラメーターはオプションであり、バイテンポラルテーブルにのみ適用されます。

ignore_null_updates

bool

着信する CDC アップデートでの null の値の処理方法を制御します。ignore_null_updatesTrue の場合、着信更新の null 列は無視されます。ターゲット行の既存の値は保持されます。これはnullの値を持つネストされた列にも適用されます。ignore_null_updatesFalseの場合、受信更新のnull列はターゲット内の既存の値を上書きします。デフォルトはFalseです。

ソースイベントに変更された列のみが含まれる場合はTrueに設定し、変更されていない列はnullで上書きされないようにします。

デフォルトはFalseです。

apply_as_deletes

str または expr()

CDCイベントをupsertではなくDELETEとして扱う必要がある場合を指定します。次のいずれかを指定できます。

  • 文字列: "Operation = 'DELETE'"
  • Spark SQL expr()関数は次のとおりです: expr("Operation = 'DELETE'")

順序が正しくないデータを処理するために、削除された行は基になる Delta テーブルにトゥームストーンとして一時的に保持され、これらのトゥームストーンをフィルター処理するビューがメタストアに作成されます。保持間隔はデフォルトで 2 日に設定されており、 pipelines.cdc.tombstoneGCThresholdInSecondsテーブル プロパティを使用して構成できます。

Auto Loader CDCパイプラインのソースとして使用する場合、 Auto Loaderファイルの処理順序を保証しません。 詳細については、 「順不同データの処理」を参照してください。pipelines.cdc.tombstoneGCThresholdInSecondsは、イベントの到着からパイプラインの実行までの最大予想遅延時間を超える値を設定してください。これにより、削除の痕跡が、遅れて到着した削除イベントや順不同の削除イベントを正しく処理するのに十分な期間保持されることが保証されます。

apply_as_truncates

str または expr()

CDCイベントを完全なテーブルTRUNCATEとして扱う必要がある場合を指定します。次のいずれかを指定できます。

  • 文字列: "Operation = 'TRUNCATE'"
  • Spark SQL expr()関数は次のとおりです: expr("Operation = 'TRUNCATE'")

この句はターゲット テーブルの完全な切り捨てをトリガーするため、この機能を必要とする特定のユース ケースにのみ使用する必要があります。apply_as_truncates引数はSCDタイプ 1 でのみサポートされます。SCD タイプ 2 SCD切り捨て操作をサポートしません。

column_list または except_column_list

list

ターゲットテーブルに含める列のサブセット。column_listを使用して、含める列の完全なリストを指定します。except_column_listを使用して、除外する列を指定します。いずれかの値を文字列のリストとして宣言することも、Spark SQL col()関数として宣言することもできます:

  • column_list = ["userId", "name", "city"]
  • column_list = [col("userId"), col("name"), col("city")]
  • except_column_list = ["operation", "sequenceNum"]
  • except_column_list = [col("operation"), col("sequenceNum")

col()関数の引数には修飾子を含めることはできません。たとえば、 col(userId)は使用できますが、 col(source.userId)は使用できません。デフォルトでは、関数にcolumn_listまたはexcept_column_list引数が渡されない場合、ターゲット テーブルのすべての列が含まれます。

stored_as_scd_type

str または int

レコードをSCDタイプ1、SCDタイプ2、またはバイテンポラルとして保存するかどうか。SCD type 1の場合は1、SCD type 2の場合は2、ビジネス時間とシステム時間の両方で変更を追跡する場合は"bitemporal"に設定します。Bitemporalにはsystem_sequence_byが必要で、ベータ版です。「Bitemporal AUTO CDC の仕組み」を参照してください。デフォルトはSCDタイプ1です。

track_history_column_list または track_history_except_column_list

list

ターゲット テーブル内の履歴を追跡する出力列のサブセット。追跡する列の完全なリストを指定するには、 track_history_column_listを使用します。追跡から除外する列を指定するには、 track_history_except_column_listを使用します。どちらの値も、文字列のリストまたは Spark SQL col()関数として宣言できます。

  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

col()関数の引数には修飾子を含めることはできません。たとえば、 col(userId)は使用できますが、 col(source.userId)は使用できません。デフォルトでは、関数にtrack_history_column_listまたはtrack_history_except_column_list引数が渡されない場合、ターゲット テーブルのすべての列が含まれます。

name

str

フロー名。指定されていない場合は、デフォルトでtargetと同じ値になります。

once

bool

必要に応じて、フローをバックフィルなどの 1 回限りのフローとして定義します。once=Trueを使用すると、フローは次の 2 つの方法で変化します。

  • 戻り値。streaming-query 。この場合、ストリーミング DataFrame ではなく、バッチ DataFrame である必要があります。
  • デフォルトでは、フローは 1 回実行されます。パイプラインが完全リフレッシュで更新されると、 ONCEフローが再度実行され、データが再作成されます。
このページの見出し