メインコンテンツまでスキップ

外部RDBMSテーブルをレプリケートする: AUTO CDC

このページでは、LakeFlow宣言型パイプラインのAUTO CDC APIを使用して、外部リレーショナルデータベース管理システム (RDBMS) からDatabricksにテーブルをレプリケートする方法を説明します。学習内容:

  • ソースを設定するための一般的なパターン。
  • onceフローを使用して、既存のデータの 1 回限りの完全コピーを実行する方法。
  • changeフローを使用して新しい変更を継続的に取り込む方法。

このパターンは、緩やかに変化するディメンション ( SCD ) テーブルを構築したり、ターゲット テーブルを外部のレコード システムと同期させたりするのに最適です。

始める前に

このガイドでは、ソースから次のデータセットにアクセスできることを前提としています。

  • クラウド ストレージ内のソース テーブルの完全なスナップショット。このデータセットは初期ロードに使用されます。
  • 同じクラウド ストレージの場所に入力される継続的な変更フィード (たとえば、Debezium、Kafka、またはログベースの CDC を使用)。このフィードは進行中のAUTO CDCプロセスへの入力です。

ソースビューを設定する

まず、クラウド ストレージ パスorders_snapshot_pathからrdbms_ordersターゲット テーブルにデータを入力する 2 つのソース ビューを定義します。どちらも、クラウド ストレージ内の生データに対するストリーミング ビューとして構築されます。ビューを使用すると、 AUTO CDCプロセスで使用する前にデータを書き込む必要がないため、効率が向上します。

  • 最初のソース ビューは完全なスナップショットです ( full_orders_snapshot )
  • 2 番目は継続的な変更フィード ( rdbms_orders_change_feed ) です。

このガイドの例ではクラウド ストレージをソースとして使用していますが、ストリーミング テーブルでサポートされている任意のソースを使用できます。

full_orders_snapshot()

このステップにより、注文データの最初の完全なスナップショットを読み取るLakeFlow宣言型パイプライン ビューが作成されます。

次の Python の例:

  • Auto Loader ( format("cloudFiles") )でspark.readStreamを使用します
  • 右によって定義されたディレクトリからJSONファイルを読み込む: orders_snapshot_path
  • パス内にすでに存在する履歴データが確実に処理されるように、 includeExistingFilestrueに設定します。
  • スキーマを自動的に推測するには、 inferColumnTypestrueに設定します
  • 次の列をすべて返します .select("\*")
Python
@dp.view()
def full_orders_snapshot():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_snapshot_path)
.select("*")
)

rdbms_orders_change_feed()

このステップでは、増分変更データ (たとえば、CDC ログや変更テーブルから) を読み取る 2 番目のLakeFlow宣言型パイプラインのビューを作成します。orders_cdc_pathから読み取り、CDC スタイルの JSON ファイルが定期的にこのパスにドロップされることを前提としています。

Python
@dp.view()
def rdbms_orders_change_feed():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_cdc_path)

初回ハイドレーション(1回のみのフロー)

ソースが設定されたので、 AUTO CDCロジックは両方のソースをターゲット ストリーミング テーブルにマージします。 まず、 ONCE=TRUEで 1 回限りのAUTO CDCフローを使用して、 RDBMSテーブルの完全な内容をストリーミング テーブルにコピーします。 これにより、今後の更新で履歴データを再生することなく、ターゲット テーブルに履歴データが準備されます。

Python
from pyspark import pipelines as dp

# Step 1: Create the target streaming table

dp.create_streaming_table("rdbms_orders")

# Step 2: Once Flow — Load initial snapshot of full RDBMS table

dp.create_auto_cdc_flow(
flow_name = "initial_load_orders",
once = True, # one-time load
target = "rdbms_orders",
source = "full_orders_snapshot", # e.g., ingested from JDBC into bronze
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)

onceフローは 1 回だけ実行されます。パイプラインの作成後にfull_orders_snapshotに追加された新しいファイルは無視されます。

important

rdbms_ordersストリーミング テーブルで完全な更新を実行し、 onceフローを再実行します。 クラウド ストレージ内の初期スナップショット データが削除されている場合、データが失われます。

連続変更フィード(変更フロー)

最初のスナップショットのロード後、別のAUTO CDCフローを使用して、RDBMS の CDC フィードからの変更を継続的に取り込みます。これにより、 rdbms_ordersテーブルは挿入、更新、削除によって最新の状態に保たれます。

Python
from pyspark import pipelines as dp

# Step 3: Change Flow — Ingest ongoing CDC stream from source system

dp.create_auto_cdc_flow(
flow_name = "orders_incremental_cdc",
target = "rdbms_orders",
source = "rdbms_orders_change_feed", # e.g., ingested from Kafka or Debezium
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)

考慮事項

バックフィルの冪等性

onceフローは、ターゲット テーブルが完全に更新された場合にのみ再実行されます。

複数のフロー

複数の変更フローを使用して、修正、遅れて到着したデータ、または代替フィードなどをマージできますが、すべてでスキーマとキーを共有する必要があります。

フルリフレッシュ

rdbms_ordersストリーミング テーブルを完全に更新し、 onceフローを再実行します。 初期のクラウド ストレージの場所で初期のスナップショット データが削除されている場合、これによってデータが失われる可能性があります。

フロー実行順序

フローの実行順序は重要ではありません。最終結果は同じです。

その他のリソース