メインコンテンツまでスキップ

外部RDBMSテーブルをレプリケートする: AUTO CDC

このページでは、LakeFlow宣言型パイプラインのAUTO CDC APIを使用して、外部リレーショナルデータベース管理システム (RDBMS) からDatabricksにテーブルをレプリケートする方法を説明します。学習内容:

  • ソースを設定するための一般的なパターン。
  • once フローを使用して、既存のデータの 1 回限りのフル コピーを実行する方法。
  • changeフローを使用して新しい変更を継続的に取り込む方法。

このパターンは、ゆっくりと変化するディメンション (SCD) テーブルを構築したり、ターゲット テーブルを外部レコード システムと同期させたりする場合に最適です。

始める前に

このガイドでは、ソースから次のデータセットにアクセスできることを前提としています。

  • クラウドストレージ内のソーステーブルの完全なスナップショット。このデータセットは、初期読み込みに使用されます。
  • 同じクラウドストレージの場所に入力される継続的な変更フィード (Debezium、Kafka、またはログベースの CDC など)。このフィードは、進行中の AUTO CDC プロセスの入力です。

ソースビューを設定する

まず、2 つのソースビューを定義して、クラウドストレージパスorders_snapshot_pathからrdbms_ordersターゲットテーブルを設定します。どちらも、クラウドストレージ内の生データに対するストリーミングビューとして構築されています。ビューを使用すると、 AUTO CDC プロセスで使用する前にデータを書き込む必要がないため、効率が向上します。

  • 最初のソースビューは完全なスナップショット(full_orders_snapshot) です。
  • 2 つ目は、継続的な変更フィード (rdbms_orders_change_feed) です。

このガイドの例では、クラウドストレージをソースとして使用していますが、ストリーミングテーブルでサポートされている任意のソースを使用できます。

full_orders_snapshot()

このステップでは、注文データの初期完全なスナップショットを読み取る LakeFlow 宣言型パイプラインビューを作成します。

次の Python の例:

  • Auto Loaderでspark.readStreamを使用 (format("cloudFiles"))
  • 右によって定義されたディレクトリからJSONファイルを読み込む: orders_snapshot_path
  • パスにすでに存在するヒストリカルデータが処理されるように、 includeExistingFilestrue に設定します
  • スキーマを自動的に推測するために inferColumnTypestrue に設定します。
  • すべての列を .select("\*")
Python
@dlt.view()
def full_orders_snapshot():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_snapshot_path)
.select("*")
)

rdbms_orders_change_feed()

このステップでは、増分変更データ (たとえば、CDC ログや変更テーブルから) を読み取る 2 番目のLakeFlow宣言型パイプラインのビューを作成します。orders_cdc_pathから読み取り、CDC スタイルの JSON ファイルが定期的にこのパスにドロップされることを前提としています。

Python
@dlt.view()
def rdbms_orders_change_feed():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_cdc_path)

初回ハイドレーション(1回のみのフロー)

ソースが設定されたので、 AUTO CDC ロジックによって両方のソースがターゲットストリーミングテーブルにマージされます。まず、ONCE=TRUE で 1 回限りのAUTO CDCフローを使用して、RDBMS テーブルの完全な内容をストリーミングテーブルにコピーします。これにより、ヒストリカルデータを使用してターゲットテーブルが準備され、今後の更新で再生されません。

Python
import dlt

# Step 1: Create the target streaming table

dlt.create_streaming_table("rdbms_orders")

# Step 2: Once Flow — Load initial snapshot of full RDBMS table

dlt.create_auto_cdc_flow(
flow_name = "initial_load_orders",
once = True, # one-time load
target = "rdbms_orders",
source = "full_orders_snapshot", # e.g., ingested from JDBC into bronze
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)

onceフローは 1 回だけ実行されます。パイプラインの作成後に full_orders_snapshot に追加された新しいファイルは無視されます。

important

ストリーミングテーブル rdbms_orders で完全な更新を実行すると、 once フローが再実行されます。 クラウドストレージ内の初期スナップショットデータが削除されている場合、データが失われます。

継続的な変更フィード (変更フロー)

最初のスナップショットの読み込み後、別の AUTO CDC フローを使用して、RDBMS の CDC フィードから変更を継続的に取り込みます。これにより、 rdbms_orders テーブルが挿入、更新、削除で最新の状態に保たれます。

Python
import dlt

# Step 3: Change Flow — Ingest ongoing CDC stream from source system

dlt.create_auto_cdc_flow(
flow_name = "orders_incremental_cdc",
target = "rdbms_orders",
source = "rdbms_orders_change_feed", # e.g., ingested from Kafka or Debezium
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)

考慮 事項

バックフィルのべき等性

onceフローは、ターゲット表が完全に更新されたときにのみ再実行されます。

複数のフロー

複数の変更フローを使用して、修正、到着が遅れたデータ、または代替フィードをマージできますが、すべてスキーマとキーを共有する必要があります。

フルリフレッシュ

rdbms_ordersストリーミングテーブルで完全に更新すると、onceフローが再実行されます。これにより、最初のクラウドストレージの場所で最初のスナップショットデータがプルーニングされた場合、データが失われる可能性があります。

フロー実行順序

フローの実行順序は関係ありません。最終結果は同じです。

その他のリソース