メインコンテンツまでスキップ

レイクハウスシンク

備考

Lakebase オートスケールは次のリージョンで利用できます: us-east-1us-east-2us-west-2ca-central-1sa-east-1eu-central-1eu-west-1eu-west-2ap-south-1ap-southeast-1ap-southeast-2

Lakebase オートスケールは、オートスケール コンピュート、ゼロへのスケール、分岐、即時復元を備えた Lakebase の最新バージョンです。 Lakebase プロビジョニング ユーザーの場合は、 「Lakebase プロビジョニング」を参照してください。

注記

レイクハウス Sync はベータ版です。

レイクハウスシンクとは?

レイクハウス Sync は、行レベルの変更をキャプチャし、それをSCDタイプ 2 履歴として書き込むことで、Lakebase Postgres テーブルからUnity Catalogで管理されるDeltaテーブルへの継続的かつ低レイテンシのレプリケーションを可能にします。 各変更は新しい行として追加されるため、時間の経過に伴う行の変更の完全な履歴が保持されます。この同期には、外部コンピュート、パイプライン、ジョブは必要ありません。 これは Lakebase のネイティブ機能です。

レイクハウス アプリケーションから Lakebase を介してUnity Catalogのレイクハウス テーブルにデータ フローを同期します。

レイクハウス Sync は、変更データキャプチャ ( CDC ) を使用して、Lakebase Postgres データベースからUnity Catalogに変更をストリーミングします。 Deltaテーブルは、選択したカタログとスキーマの形式lb_<table_name>_historyに従って名前が付けられます。 各変更 (挿入、更新、削除) は行として追加されるため、時間の経過とともにデータがどのように変化したかの完全な履歴が保持されます。

使用例

以下は、Lakebase トランザクション データベースからレイクハウスにデータをストリーミング変更する、レイクハウス Sync の使用例です。

ユースケース

説明

高速分析

Lakebase データの実行集計と分析。

メダリオンのソース

Lakebase をメダリオンアーキテクチャのソースとして使用します。 Deltaテーブルは、 Databricksパイプライン、 Spark宣言型パイプライン (SDP) 、またはDelta Live Tables ( DLT )を使用して処理して、ダウンストリーム テーブルを構築できます。

レイクハウスの全歴史

オプションでデータのサブセットのみを Lakebase に保持しながら、レイクハウス内のすべての変更の完全な履歴を保存します。

要件

  • Lakebase オートスケール: Postgres 17 を実行するLakebase オートスケール プロジェクト
  • ソース データベース: テーブルは Lakebase のdatabricks_postgresデータベースに存在する必要があります (ベータ版の制限)。各プロジェクトはこのデフォルトのデータベースを使用して作成されます。
  • データ型: テーブルでは、サポートされている列データ型のみを使用する必要があります。サポートされていないタイプの場合、そのテーブルの同期は失敗します。
  • Unity Catalog : 同期を構成する ID には、宛先カタログとスキーマに対する USE CATALOGUSE SCHEMA 、および CREATE TABLE 必要です。 「オブジェクトに対する権限の付与」を参照してください。
  • Lakebase プロジェクト: Postgres ロールには、同期元の Lakebase プロジェクトに対する CAN MANAGE 権限が必要です。自分の ID が Lakebase プロジェクトを所有している場合、デフォルトで CAN MANAGE 権限が付与されます。「プロジェクト権限の管理」を参照してください。

開始するには、同期するテーブルにレプリカ ID を完全に設定し (ステップ 1)、Lakebase アプリで同期を開始します (ステップ 2)。 データは、選択した Unity Catalog カタログとスキーマ内のlb_<table_name>_historyテーブルとして表示されます。開始する前にレイクハウス Sync の仕組みについて詳しく知りたい場合は、 「レイクハウス Sync の仕組み」を参照してください。

同期の設定

ステップ 1: レプリカ ID を完全に設定する

Lakebase テーブルを正常に同期するには、レプリカ ID を full に設定する必要があります。空のスキーマまたは既にテーブルが含まれているスキーマで同期を構成できます。パーティション化されたテーブルはサポートされていません。

デフォルトでは、Postgres は行が更新または削除されたときに主キーのみをログに記録します。REPLICA IDENTITY FULLを設定すると、Postgres は先行書き込みログに行の前後の完全な状態を記録します。これは、レイクハウス Sync が完全な更新履歴を構築できるようにするために必要です。

次のコマンドは、Lakebase SQL エディターまたは任意の Postgres クライアントから実行できます。この例では、Lakebase SQL エディターを使用します。開くには、 Databricksワークスペースで、アプリ スイッチャー (右上) から Lakebase Postgres を 開き、プロジェクトと同期するブランチ (たとえば、 本番運用 または main ) を選択して、サイドバーから SQLエディターを 選択し、ブランチとデータベースを選択します。 詳細については、 Lakebase SQL エディターからのクエリを参照してください。

単一のテーブルでレプリカ ID を完全に設定するには、次を実行します。

SQL
ALTER TABLE <table_name> REPLICA IDENTITY FULL;

<table_name>テーブル名に置き換えます。

  • 既存のテーブル: 同期を開始する前に、スキーマ内のすべての既存のテーブルに対してこれを実行します。
  • 新しいテーブル: 同期が構成された後に作成されたテーブルの場合は、データを挿入する前にこれを実行します。このプロパティが設定される前に挿入された行は同期されません。

レプリカIDが設定されているテーブルを確認する

どのテーブルにレプリカ ID が設定されているのか (そしてどれがfullのか) を確認するには、Lakebase で次のコマンドを実行します。

SQL
SELECT n.nspname AS table_schema,
c.relname AS table_name,
CASE c.relreplident
WHEN 'd' THEN 'default'
WHEN 'n' THEN 'nothing'
WHEN 'f' THEN 'full'
WHEN 'i' THEN 'index'
END AS replica_identity
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r'
AND n.nspname = 'public'
ORDER BY n.nspname, c.relname;

異なる場合は、 n.nspname = 'public'スキーマ名に変更します。replica_identity = fullの行のみが同期の準備ができています。

ステップ 2: レイクハウス同期を開始する

レイクハウス Sync はスキーマ レベルで構成されます。 一度設定すると、そのスキーマ内の現在のテーブルと将来のテーブルはすべて Unity Catalog に同期されます。

  1. Databricks ワークスペースで、アプリ スイッチャー (右上) から Lakebase Postgres を開きます。

  2. Lakebase プロジェクトと同期するブランチ (たとえば、 本番運用 または main ) を選択します。

  3. ブランチの概要 を開き、 レイクハウスの同期 タブをクリックします。

  4. [同期を開始] をクリックします。

  5. 構成ダイアログで:

    • データベース: デフォルトはdatabricks_postgresです。
    • スキーマ: 同期するソース Postgres スキーマを選択します。
    • カタログへ: 宛先の Unity Catalog カタログを選択します。
    • スキーマ: 宛先のUnity Catalogスキーマを選択します。
  6. データの同期を開始するには、 「同期の開始」 をクリックします。

同期の開始とスキーマ構成を示すレイクハウスの同期タブを含むブランチの概要

テーブルは、選択した Unity Catalog カタログとスキーマにlb_<table_name>_historyとして表示されます。レイクハウスで、サイドバーの [カタログ] を開き、目的のカタログとスキーマに移動して、スキーマの [概要] にある [ テーブル] タブを開いてDeltaテーブルを表示します。 Lakebase の レイクハウス sync タブでは、ステータスを確認し、何が同期しているかを検査できます。

レイクハウスの同期タブに表示される内容

同期が有効になっている場合、タブの上部に 「ステータス: 同期中」 と表示され、変更がキャプチャされて Delta テーブルに同期されていることが示されます。

サブタブにはマッピングとテーブルごとの進行状況が表示されます

2 つのサブタブにマッピングとテーブルごとの進行状況が表示されます。

  • スキーマ: 各ソース スキーマとその宛先カタログおよびUnity Catalog内のスキーマを、そのスキーマの ステータス ( 同期中 など) とともに一覧表示します。
  • テーブル: 各ソース テーブル、 Unity Catalog内のその宛先lb_<table_name>_historyテーブル、 ステータス ( 同期中 または スナップショット中 )、 コミット済み LSN (同期がDeltaにどの程度書き込まれたか。テーブルがまだ初期状態の間は-と表示されます)、および 最終同期 (テーブルが最後に同期された日時) を一覧表示します。

Lakebase SQL エディター(または任意の Postgres クライアント) からSELECT * FROM wal2delta.tables;実行して、同期ステータスを検査することもできます。結果には、テーブルごとにtable_oidstatus (たとえば、 STREAMINGまたはSNAPSHOTTING )、 committed_lsn 、およびlast_write_timeが含まれます。

備考

wal2deltaとは何ですか? レイクハウス Sync は、Lakebase コンピュート内で実行される wal2delta Postgres 拡張機能を利用しています。 論理デコードを使用して、先行書き込みログ (WAL) の変更をキャプチャし、Unity Catalog の Delta テーブルに書き込みます。

同期を無効にする

同期を無効にすると、同期していたすべての Lakebase スキーマのレプリケーションが停止します。

  1. Databricks ワークスペースで、アプリ スイッチャー (右上) から Lakebase Postgres を開きます。
  2. Lakebase プロジェクトと、同期を構成したブランチ (例: 本番運用 または main ) を選択します。
  3. ブランチの概要 を開き、 レイクハウスの同期 タブをクリックします。
  4. 同期を無効にするを クリックします。確認ダイアログで、変更によって Delta テーブルへの同期が停止されるという警告を確認し、もう一度 [無効にする] をクリックして確認します。

同期を無効にしてもコンピュートは再起動されません。

警告

後で同期を再度有効にした場合、システムは完全な再スナップショットを実行しません。同期が無効になっている間に発生した変更は、宛先 Delta テーブルから永久に失われます。

レイクハウス Sync の仕組み

同期では、Lakebase 内のデータの現在の状態をミラーリングするために宛先の Unity Catalog テーブルを上書きするのではなく、変更イベントごとに新しい行を追加します。これにより、時間の経過とともにデータがどのように変化したかを示す完全かつ不変のログが提供されます。

  • 宛先の命名: DeltaテーブルはUnity Catalogに作成され、命名パターンlb_<table_name>_historyが使用されます。 これらは UC 管理の Delta テーブルです。
  • スキーマ レベルの同期: Lakebase スキーマの同期を構成すると、そのスキーマ内の現在のテーブルと将来のテーブルがすべて同期されます。空のテーブルは同期されません。同期に表示されるためには、Lakebase テーブルに少なくとも 1 つの行が必要です。
  • 削除されたテーブル: Lakebase でテーブルを削除した場合、Unity Catalog 内の宛先 Delta テーブルは保持されます (削除されません)。

同期ステータスは、ブランチ概要の レイクハウス sync タブで監視するか、Lakebase でSELECT * FROM wal2delta.tables;実行することで監視できます。

宛先Deltaテーブルスキーマ

同期により、データ列に加えて、次のシステム列が Unity Catalog 内の各宛先 Delta テーブルに追加されます。

Type

説明

_change_type

TEXT

操作タイプ: insertdeleteupdate_preimage 、またはupdate_postimage

_timestamp

TIMESTAMP

Postgres でのトランザクションコミット時間 (タイムゾーンなし)。

_lsn

BIGINT

Postgres ログ シーケンス番号。

_xid

Integer

Postgres トランザクション ID。

一般的な変化のパターン

これらのパターンは、Unity Catalog の宛先 Delta テーブルに表示されます。

  • 初期ロード: 既存の Lakebase テーブルで初めて同期が実行されると、既存の各行に_change_type = insertが書き込まれます。
  • 更新: 更新により 2 つの行が生成されます。1 つは_change_type = update_preimage (古い行)、もう 1 つは_change_type = update_postimage (新しい行) です。
  • 削除: 削除により、 _change_type = deleteの行が 1 つ生成されます。

制限事項とトラブルシューティング

テーブルのステータス (どのテーブルがスナップショットしているか、スキップされているか、ストリーミングしているか) は 、レイクハウスの sync タブで、または Lakebase で実行することで確認できます。

SQL
SELECT * FROM wal2delta.tables;

テーブルが同期されない一般的な理由:

  • REPLICA IDENTITY FULL が設定されていません: 各テーブルに対してALTER TABLE <table_name> REPLICA IDENTITY FULL;を実行したことを確認してください。
  • パーティション テーブル: Lakebase パーティション テーブルはサポートされていません。パーティション化されたテーブルを含むスキーマを同期すると、それらのテーブルの同期が失敗します。
  • 名前の競合: Deltaテーブルの名前は、ソース Postgres スキーマ プレフィックスなしでlb_<table_name>_historyになっています。 2 つの異なる Postgres スキーマ (たとえば、 sales.usersmarketing.users ) を同じ Unity Catalog スキーマに同期すると、最初のテーブルは同期されますが、2 番目のテーブルは名前の競合により失敗します。テーブル名を共有する Postgres スキーマを異なる Unity Catalog スキーマにマップします。
  • サポートされていないデータ型: テーブルにサポートされていない型がある場合、その Delta テーブルの作成は失敗し、テーブルは同期されません。データ型のマッピングを参照してください。

データ型のマッピング

同期は、ほとんどの標準的な PostgreSQL プリミティブ型をサポートします。サポートされていない型の場合、そのテーブルでの Delta テーブルの作成は失敗します。

PostgreSQL型

Databricks Delta型

ブール値

ブール値

INT、SMALLINT、BIGINT

INT、SMALLINT、BIGINT

TEXT 、VARCHAR、CHAR

STRING

JSONB

STRING

JSON 文字列として保存されます。

enum

STRING

列挙ラベルとして保存されます。

数値 / 小数点

DECIMALタイプ

ソースの精度/スケールを使用します。定義されていない場合はデフォルトでDECIMAL(38, 18)になります。

DATE

DATE

TIMESTAMP

タイムスタンプ_NTZ

タイムスタンプ

TIMESTAMP

浮動小数点数、倍精度浮動小数点数

浮動小数点数、倍精度浮動小数点数

サポートされていないタイプ:

  • 地理/ジオメトリ (PostGIS): PostGIS 拡張機能のタイプ (例: geometrygeography )。
  • ベクター (pgvector): pgvector 拡張機能のvector型。
  • 複合/構造体型: CREATE TYPE ... AS (field_name type, ...)で定義されたカスタム型。これらは、名前付きフィールド (構造体と呼ばれることもある) を持つ行のような型です。複合型の列は同期できません。
  • マップ: hstore などのマップのようなキー値型 ( hstore拡張機能から)。Postgres には組み込みのマップ タイプはありません。 hstoreは、列にキーと値のペアを格納する通常の方法です。

同期をブロックする可能性のある列を見つける

同期をブロックする可能性のある列を見つけるには、Lakebase で次のクエリを実行します。上記のサポートされているセットに 含まれない 型を持つすべての列をリストします (列挙型を含む)。このような列が 1 つ以上あるテーブルは同期されません。

SQL
SELECT c.table_schema, c.table_name, c.column_name, c.udt_name AS data_type
FROM information_schema.columns c
JOIN pg_catalog.pg_type t ON t.typname = c.udt_name
WHERE c.table_schema = 'public'
AND c.table_name IN (SELECT tablename FROM pg_tables WHERE schemaname = c.table_schema)
AND NOT (
c.udt_name IN ('bool', 'int2', 'int4', 'int8', 'text', 'varchar', 'bpchar', 'jsonb', 'numeric', 'date', 'timestamp', 'timestamptz', 'real', 'float4', 'float8')
OR t.typcategory = 'E'
)
ORDER BY c.table_schema, c.table_name, c.ordinal_position;

スキーマ変更の管理

サポートされています: Postgres 内のテーブルの名前を変更すると (たとえば、 ALTER TABLE users RENAME TO customers )、同期が続行されます。宛先 Delta テーブル名は変更されません。残りはlb_users_historyです。

注意: 列の追加、列の削除、または列のデータ型の変更は、宛先に自動的には適用されません。このような変更により、そのテーブルの同期で新しいイベントの受信が停止されます。

回避策: 同期を中断せずにスキーマの変更を安全に適用するには、次のステップを使用して新しいテーブルを作成し、それを所定の位置に入れ替えます。 これにより、同期がトリガーされ、新しいテーブルが認識され、新しい履歴が開始されます。

  1. 更新されたスキーマを使用して新しいテーブルを作成します。

    SQL
    CREATE TABLE users_v2 (
    id INT PRIMARY KEY,
    name TEXT,
    new_column TEXT -- The new schema change
    );
  2. レプリカ ID を完全に設定します。

    SQL
    ALTER TABLE users_v2 REPLICA IDENTITY FULL;
  3. 古いテーブルからデータをバックフィルします。

    SQL
    INSERT INTO users_v2 SELECT *, NULL FROM users;
  4. テーブル名を交換します。

    SQL
    BEGIN;
    ALTER TABLE users RENAME TO users_backup;
    ALTER TABLE users_v2 RENAME TO users;
    COMMIT;
注記

このアプローチでは、前のテーブルの履歴が失われます。新しいテーブルは同期履歴を最初から開始します。

重複排除して現在の状態のミラーを作成する

完全な履歴ではなく、Lakebase テーブルの現在の状態 (ミラー) が必要な分析の場合、ウィンドウ関数を使用してDatabricks SQL (たとえば、 レイクハウスSQLエディター やSQLウェアハウスに接続されたノートブック) で重複排除できます。

SQL
SELECT
*
FROM
(
SELECT
*,
ROW_NUMBER() OVER (PARTITION BY id ORDER BY _lsn DESC) AS rn
FROM
`<catalog>.<schema>.lb_<table_name>_history`
WHERE
_change_type IN ('insert', 'update_postimage', 'delete')
)
WHERE
rn = 1
AND _change_type != 'delete';

idテーブルの主キー列に置き換えます。クエリはそのキーでパーティション分割し、 _lsnで順序付けして最新のイベントを取得し、最新のイベントが削除であった行を除外しながら、キーごとに最新の行のみを保持します。

次のステップ

目的に応じて、 lb_<table_name>_history Delta テーブルを他の Databricks 機能と組み合わせて使用します。