外部RDBMSテーブルをレプリケートする: AUTO CDC
このページでは、LakeFlow宣言型パイプラインのAUTO CDC
APIを使用して、外部リレーショナルデータベース管理システム (RDBMS) からDatabricksにテーブルをレプリケートする方法を説明します。学習内容:
- ソースを設定するための一般的なパターン。
once
フローを使用して、既存のデータの 1 回限りのフル コピーを実行する方法。change
フローを使用して新しい変更を継続的に取り込む方法。
このパターンは、ゆっくりと変化するディメンション (SCD) テーブルを構築したり、ターゲット テーブルを外部レコード システムと同期させたりする場合に最適です。
始める前に
このガイドでは、ソースから次のデータセットにアクセスできることを前提としています。
- クラウドストレージ内のソーステーブルの完全なスナップショット。このデータセットは、初期読み込みに使用されます。
- 同じクラウドストレージの場所に入力される継続的な変更フィード (Debezium、Kafka、またはログベースの CDC など)。このフィードは、進行中の
AUTO CDC
プロセスの入力です。
ソースビューを設定する
まず、2 つのソースビューを定義して、クラウドストレージパスorders_snapshot_path
からrdbms_orders
ターゲットテーブルを設定します。どちらも、クラウドストレージ内の生データに対するストリーミングビューとして構築されています。ビューを使用すると、 AUTO CDC
プロセスで使用する前にデータを書き込む必要がないため、効率が向上します。
- 最初のソースビューは完全なスナップショット(
full_orders_snapshot
) です。 - 2 つ目は、継続的な変更フィード (
rdbms_orders_change_feed
) です。
このガイドの例では、クラウドストレージをソースとして使用していますが、ストリーミングテーブルでサポートされている任意のソースを使用できます。
full_orders_snapshot()
このステップでは、注文データの初期完全なスナップショットを読み取る LakeFlow 宣言型パイプラインビューを作成します。
- Python
- SQL
次の Python の例:
- Auto Loaderで
spark.readStream
を使用 (format("cloudFiles")
) - 右によって定義されたディレクトリからJSONファイルを読み込む:
orders_snapshot_path
- パスにすでに存在するヒストリカルデータが処理されるように、
includeExistingFiles
をtrue
に設定します - スキーマを自動的に推測するために
inferColumnTypes
をtrue
に設定します。 - すべての列を
.select("\*")
@dlt.view()
def full_orders_snapshot():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_snapshot_path)
.select("*")
)
次のSQL例では、文字列キーと値のペアのマップとしてオプションを渡します。orders_snapshot_path
は SQL 変数として使用できる必要があります (たとえば、パイプライン パラメーターを使用して定義したり、手動で補間したりします)。
CREATE OR REFRESH VIEW full_orders_snapshot
AS SELECT *
FROM STREAM read_files("${orders_snapshot_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));
rdbms_orders_change_feed()
このステップでは、増分変更データ (たとえば、CDC ログや変更テーブルから) を読み取る 2 番目のLakeFlow宣言型パイプラインのビューを作成します。orders_cdc_path
から読み取り、CDC スタイルの JSON ファイルが定期的にこのパスにドロップされることを前提としています。
- Python
- SQL
@dlt.view()
def rdbms_orders_change_feed():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_cdc_path)
次の SQL の例では、 ${orders_cdc_path}
は変数であり、パイプライン設定で値を設定するか、コードで変数を明示的に設定することで補間できます。
CREATE OR REFRESH VIEW rdbms_orders_change_feed
AS SELECT *
FROM STREAM read_files("${orders_cdc_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));
初回ハイドレーション(1回のみのフロー)
ソースが設定されたので、 AUTO CDC
ロジックによって両方のソースがターゲットストリーミングテーブルにマージされます。まず、ONCE=TRUE
で 1 回限りのAUTO CDC
フローを使用して、RDBMS テーブルの完全な内容をストリーミングテーブルにコピーします。これにより、ヒストリカルデータを使用してターゲットテーブルが準備され、今後の更新で再生されません。
- Python
- SQL
import dlt
# Step 1: Create the target streaming table
dlt.create_streaming_table("rdbms_orders")
# Step 2: Once Flow — Load initial snapshot of full RDBMS table
dlt.create_auto_cdc_flow(
flow_name = "initial_load_orders",
once = True, # one-time load
target = "rdbms_orders",
source = "full_orders_snapshot", # e.g., ingested from JDBC into bronze
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)
-- Step 1: Create the target streaming table
CREATE OR REFRESH STREAMING TABLE rdbms_orders;
-- Step 2: Once Flow for initial snapshot
CREATE FLOW rdbms_orders_hydrate
AS AUTO CDC ONCE INTO rdbms_orders
FROM stream(full_orders_snapshot)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;
once
フローは 1 回だけ実行されます。パイプラインの作成後に full_orders_snapshot
に追加された新しいファイルは無視されます。
ストリーミングテーブル rdbms_orders
で完全な更新を実行すると、 once
フローが再実行されます。 クラウドストレージ内の初期スナップショットデータが削除されている場合、データが失われます。
継続的な変更フィード (変更フロー)
最初のスナップショットの読み込み後、別の AUTO CDC
フローを使用して、RDBMS の CDC フィードから変更を継続的に取り込みます。これにより、 rdbms_orders
テーブルが挿入、更新、削除で最新の状態に保たれます。
- Python
- SQL
import dlt
# Step 3: Change Flow — Ingest ongoing CDC stream from source system
dlt.create_auto_cdc_flow(
flow_name = "orders_incremental_cdc",
target = "rdbms_orders",
source = "rdbms_orders_change_feed", # e.g., ingested from Kafka or Debezium
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)
-- Step 3: Continuous CDC ingestion
CREATE FLOW rdbms_orders_continuous
AS AUTO CDC INTO rdbms_orders
FROM stream(rdbms_orders_change_feed)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;
考慮 事項
バックフィルのべき等性 |
|
---|---|
複数のフロー | 複数の変更フローを使用して、修正、到着が遅れたデータ、または代替フィードをマージできますが、すべてスキーマとキーを共有する必要があります。 |
フルリフレッシュ |
|
フロー実行順序 | フローの実行順序は関係ありません。最終結果は同じです。 |
その他のリソース
- Lakeflowコネクトにおけるフルマネージド SQL Server コネクタ