メインコンテンツまでスキップ

Postgres の変更をレイクハウスに保存する

注記

Lakebase変更データフィードはパブリック プレビュー段階です。

Postgres テーブルにLakebase変更データフィード (CDF) を設定し、宛先のDeltaテーブルに行レベルの変更が現れるのを監視します。

ステップ:変更キャプチャを有効にする→ ②フィードを開始する→ ③行をたどってレイクハウスに入る→ ④行を変更し、流れを確認する

注記

これはクイックスタートガイドです。完全なドキュメントについては、 Lakebaseデータフィードを参照してください。

始める前に

  • 「Postgresデータベースを取得する」を完了したことを確認してください。playing_with_lakebaseサンプルテーブルを含むLakebaseプロジェクトが必要です。
  • あなたにCREATE TABLE権限しかないUnity Catalogカタログとスキーマ。

ステップ 1: 変更キャプチャを有効にする

CDFが機能するためには、Postgresではライトアヘッドログに完全な行データが必要です。レプリカIDを「フル」に設定すると、Postgresは変更ごとに古い行の状態と新しい行の状態の両方を記録するようになります。

Lakebase SQL Editorで、以下を実行します。

SQL
ALTER TABLE playing_with_lakebase REPLICA IDENTITY FULL;

詳細はこちら:スキーマ内のすべてのテーブルにレプリカIDを設定し、新しいテーブルに自動的に適用する

ステップ 2: フィードを開始する

Lakebase CDFはスキーマレベルで設定されます。ソーススキーマ内の現在および将来のすべてのテーブルが自動的に含まれるため、個々のテーブルを選択する必要はありません。

本番運用ブランチから、 「データフィード変更」 タブを開き、 「開始」 をクリックします。 スキーマとしてpublicを選択し、次に宛先のUnity Catalogカタログとスキーマを選択します。 最初のスナップショットがすぐに開始され、 lb_playing_with_lakebase_history宛先にDeltaテーブルとして表示されます。

ソースと宛先を選択してダイアログを開始します。

詳細:チェンジデータフィードを開始する

ステップ 3: 列をたどってレイクハウスに入ります

Lakebaseから行を選択してください。 行id=2を見てください。

SQL
SELECT * FROM playing_with_lakebase WHERE id = 2;

次に、 Delta履歴テーブルで同じ行を探します。 Databricks SQLウェアハウスまたはノートブックに切り替えて実行します。

SQL
SELECT * FROM <catalog>.<schema>.lb_playing_with_lakebase_history
WHERE id = 2;

<catalog><schema> 、 Lakebase 2で選択した目的地に置き換えてください。Lakebaseと同じnamevalueに加えて、追加の列を含む行id=2が表示されます。 最初のスナップショットでは、既存のすべての行がDeltaにinsertイベントとして書き込まれました。これは、その行が表すものです。

これらの追加列は、各行がどのような種類のイベントを表しているか ( _pg_change_type )、いつ発生したか ( _timestamp )、および Postgres の順序情報 ( _pg_lsn_pg_xid ) を説明しています。

詳細はこちら:宛先テーブルスキーマ|データ型マッピング

ステップ 4: 行を変更して、流れを確認してください

Lakebase SQLエディタに戻り、行id=2を更新します。

SQL
UPDATE playing_with_lakebase SET value = 55.5 WHERE id = 2;

変更がフィードに反映されるまで数秒待ってから、履歴テーブルを再照会してください。

SQL
SELECT id, value, _pg_change_type, _timestamp
FROM <catalog>.<schema>.lb_playing_with_lakebase_history
WHERE id = 2
ORDER BY _pg_lsn DESC;

Delta履歴テーブルには、id=2 の 3 行(update_preimage、update_postimage、insert)が表示されています。

id=2が3回出現します。元のinsert 、古い値を持つupdate_preimage 、そして新しい値を持つupdate_postimageです。行への変更はすべて新しい履歴行として記録されるため、常に完全な監査証跡が残ります。削除も同様に機能し、 _pg_change_type = 'delete'を1行追加します。

詳細はこちら:一般的な変更パターン下流パイプラインの構築

次のステップ