外部RDBMSテーブルをレプリケートする: AUTO CDC
このページでは、LakeFlow宣言型パイプラインのAUTO CDC
APIを使用して、外部リレーショナルデータベース管理システム (RDBMS) からDatabricksにテーブルをレプリケートする方法を説明します。学習内容:
- ソースを設定するための一般的なパターン。
once
フローを使用して、既存のデータの 1 回限りの完全コピーを実行する方法。change
フローを使用して新しい変更を継続的に取り込む方法。
このパターンは、緩やかに変化するディメンション ( SCD ) テーブルを構築したり、ターゲット テーブルを外部のレコード システムと同期させたりするのに最適です。
始める前に
このガイドでは、ソースから次のデータセットにアクセスできることを前提としています。
- クラウド ストレージ内のソース テーブルの完全なスナップショット。このデータセットは初期ロードに使用されます。
- 同じクラウド ストレージの場所に入力される継続的な変更フィード (たとえば、Debezium、Kafka、またはログベースの CDC を使用)。このフィードは進行中の
AUTO CDC
プロセスへの入力です。
ソースビューを設定する
まず、クラウド ストレージ パスorders_snapshot_path
からrdbms_orders
ターゲット テーブルにデータを入力する 2 つのソース ビューを定義します。どちらも、クラウド ストレージ内の生データに対するストリーミング ビューとして構築されます。ビューを使用すると、 AUTO CDC
プロセスで使用する前にデータを書き込む必要がないため、効率が向上します。
- 最初のソース ビューは完全なスナップショットです (
full_orders_snapshot
) - 2 番目は継続的な変更フィード (
rdbms_orders_change_feed
) です。
このガイドの例ではクラウド ストレージをソースとして使用していますが、ストリーミング テーブルでサポートされている任意のソースを使用できます。
full_orders_snapshot()
このステップにより、注文データの最初の完全なスナップショットを読み取るLakeFlow宣言型パイプライン ビューが作成されます。
- Python
- SQL
次の Python の例:
- Auto Loader (
format("cloudFiles")
)でspark.readStream
を使用します - 右によって定義されたディレクトリからJSONファイルを読み込む:
orders_snapshot_path
- パス内にすでに存在する履歴データが確実に処理されるように、
includeExistingFiles
をtrue
に設定します。 - スキーマを自動的に推測するには、
inferColumnTypes
をtrue
に設定します - 次の列をすべて返します
.select("\*")
@dp.view()
def full_orders_snapshot():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_snapshot_path)
.select("*")
)
次の SQL の例では、オプションを文字列のキーと値のペアのマップとして渡します。orders_snapshot_path
SQL変数として使用できる必要があります (たとえば、パイプライン問題を使用して定義されるか、手動で補間されます)。
CREATE OR REFRESH VIEW full_orders_snapshot
AS SELECT *
FROM STREAM read_files("${orders_snapshot_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));
rdbms_orders_change_feed()
このステップでは、増分変更データ (たとえば、CDC ログや変更テーブルから) を読み取る 2 番目のLakeFlow宣言型パイプラインのビューを作成します。orders_cdc_path
から読み取り、CDC スタイルの JSON ファイルが定期的にこのパスにドロップされることを前提としています。
- Python
- SQL
@dp.view()
def rdbms_orders_change_feed():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_cdc_path)
次の SQL の例では、 ${orders_cdc_path}
は変数であり、パイプライン設定で値を設定するか、コードで変数を明示的に設定することによって補間できます。
CREATE OR REFRESH VIEW rdbms_orders_change_feed
AS SELECT *
FROM STREAM read_files("${orders_cdc_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));
初回ハイドレーション(1回のみのフロー)
ソースが設定されたので、 AUTO CDC
ロジックは両方のソースをターゲット ストリーミング テーブルにマージします。 まず、 ONCE=TRUE
で 1 回限りのAUTO CDC
フローを使用して、 RDBMSテーブルの完全な内容をストリーミング テーブルにコピーします。 これにより、今後の更新で履歴データを再生することなく、ターゲット テーブルに履歴データが準備されます。
- Python
- SQL
from pyspark import pipelines as dp
# Step 1: Create the target streaming table
dp.create_streaming_table("rdbms_orders")
# Step 2: Once Flow — Load initial snapshot of full RDBMS table
dp.create_auto_cdc_flow(
flow_name = "initial_load_orders",
once = True, # one-time load
target = "rdbms_orders",
source = "full_orders_snapshot", # e.g., ingested from JDBC into bronze
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)
-- Step 1: Create the target streaming table
CREATE OR REFRESH STREAMING TABLE rdbms_orders;
-- Step 2: Once Flow for initial snapshot
CREATE FLOW rdbms_orders_hydrate
AS AUTO CDC ONCE INTO rdbms_orders
FROM stream(full_orders_snapshot)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;
once
フローは 1 回だけ実行されます。パイプラインの作成後にfull_orders_snapshot
に追加された新しいファイルは無視されます。
rdbms_orders
ストリーミング テーブルで完全な更新を実行し、 once
フローを再実行します。 クラウド ストレージ内の初期スナップショット データが削除されている場合、データが失われます。
連続変更フィード(変更フロー)
最初のスナップショットのロード後、別のAUTO CDC
フローを使用して、RDBMS の CDC フィードからの変更を継続的に取り込みます。これにより、 rdbms_orders
テーブルは挿入、更新、削除によって最新の状態に保たれます。
- Python
- SQL
from pyspark import pipelines as dp
# Step 3: Change Flow — Ingest ongoing CDC stream from source system
dp.create_auto_cdc_flow(
flow_name = "orders_incremental_cdc",
target = "rdbms_orders",
source = "rdbms_orders_change_feed", # e.g., ingested from Kafka or Debezium
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)
-- Step 3: Continuous CDC ingestion
CREATE FLOW rdbms_orders_continuous
AS AUTO CDC INTO rdbms_orders
FROM stream(rdbms_orders_change_feed)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;
考慮事項
バックフィルの冪等性 |
|
---|---|
複数のフロー | 複数の変更フローを使用して、修正、遅れて到着したデータ、または代替フィードなどをマージできますが、すべてでスキーマとキーを共有する必要があります。 |
フルリフレッシュ |
|
フロー実行順序 | フローの実行順序は重要ではありません。最終結果は同じです。 |
その他のリソース
- Lakeflowコネクトにおけるフルマネージド SQL Server コネクタ