Lakebase によるリバース ETL
ベータ版
Lakebase Postgres (オートスケール Beta) は、 Lakebase の次のバージョンであり、評価のみに利用できます。 本番運用ワークロードの場合は、 Lakebase Public Previewを使用します。 どのバージョンが適しているかを判断するには、バージョンの選択を参照してください。
Lakebase のリバース ETL は、Unity Catalog テーブルを Postgres に同期し、アプリケーションがキュレーションされたレイクハウスデータを直接使用できるようにします。レイクハウスは分析と強化のために最適化されており、Lakebase は高速なクエリとトランザクションの一貫性を必要とする運用ワークロード向けに設計されています。

リバース ETL とは何ですか?
リバースETL使用すると、アナリティクス グレードのデータをUnity Catalogから Lakebase Postgres に移動でき、低レイテンシー クエリ (10 ミリ秒未満) と完全なACID必要とするアプリケーションでデータを利用できるようになります。 キュレーションされたデータをリアルタイム アプリケーションで使用できるようにすることで、分析ストレージと運用システム間のギャップを埋めます。
仕組み
Databricks 同期テーブルは、 Lakebase にUnity Catalogデータの管理されたコピーを作成します。 同期されたテーブルを作成すると、次のものが得られます。
- 新しいUnity Catalogテーブル (読み取り専用、同期パイプラインによって管理)
- Lakebase の Postgres テーブル (アプリケーションからクエリ可能)

たとえば、ゴールド テーブル、エンジニアリングされた特徴、または ML 出力をanalytics.gold.user_profilesから新しい同期テーブルanalytics.gold.user_profiles_syncedに同期できます。Postgres では、Unity Catalog スキーマ名は Postgres スキーマ名になるため、 "gold"."user_profiles_synced"のように表示されます。
SELECT * FROM "gold"."user_profiles_synced" WHERE "user_id" = 12345;
アプリケーションは標準の Postgres ドライバーに接続し、同期されたデータと自身の動作状態を照会します。
同期パイプラインは、マネージドLakeFlow宣言型パイプラインを使用して、ソース テーブルからの変更でUnity Catalog同期テーブルと Postgres テーブルの両方を継続的に更新します。 各同期では、Lakebase データベースへの最大 16 個の接続を使用でき、容量単位 (CU) あたり毎秒約 1,200 行の連続書き込みと、CU あたり毎秒最大 15,000 行の一括書き込みをサポートします。
Lakebase Postgres は、トランザクション保証付きで最大 10,000 のライナー接続をサポートするため、アプリケーションはエンリッチデータを読み取りながら、同じデータベース内で挿入、更新、削除を処理できます。
同期モード
アプリケーションのニーズに応じて適切な同期モードを選択します。
| モード | 説明 | どのようなタスクにベストなのか | パフォーマンス | 
|---|---|---|---|
| スナップショット | すべてのデータの1回限りのコピー | 初期設定または履歴分析 | ソースデータの10%以上を変更する場合、10倍の効率化 | 
| トリガー | オンデマンドまたは一定間隔で実行されるスケジュール更新 | ダッシュボード(1時間ごと/毎日更新) | コストとラグのバランスが良い。5分間隔で実行すると高価になります | 
| 連続 | 数秒の遅延でリアルタイムストリーミング | ライブ アプリケーション (専用のコンピュートによりコストが高くなります) | 遅延は最低、コストは最高。最小15秒間隔 | 
トリガー モードと連続モードでは、ソース テーブルで変更データフィード (CDF) を有効にする必要があります。 CDF が有効になっていない場合、実行する正確なALTER TABLEコマンドを含む警告が UI に表示されます。変更データフィードの詳細については、 「 DatabricksでDelta Lake変更データフィードを使用する」を参照してください。
使用例
Lakebase を使用したリバース ETL は、一般的な運用シナリオをサポートします。
- Databricks アプリに同期された最新のユーザー プロファイルを必要とするパーソナライゼーション エンジン
- モデル予測や特徴量を提供するアプリケーション レイクハウスのコンピュート
- KPIをリアルタイムで表示する顧客向けダッシュボード
- 即時対応のためにリスクスコアを必要とする不正検出サービス
- レイクハウスからキュレーションされたデータを使用して顧客レコードを充実させるサポートツール
同期されたテーブルを作成する(UI)
同期されたテーブルは、Databricks UI または SDK を使用してプログラムで作成できます。UI ワークフローの概要は次のとおりです。
前提条件
必要なもの:
- Lakebase が有効になっている Databricks ワークスペース。
- Lakebase データベース プロジェクト ( 「データベース プロジェクトの作成」を参照)。
- 厳選されたデータを含むUnity Catalogテーブル。
- 同期されたテーブルを作成する権限。
容量計画とデータ型の互換性については、 「データ型と互換性」および「容量計画」を参照してください。
ステップ 1: ソーステーブルを選択します
ワークスペース サイドバーの [カタログ] に移動し、同期するUnity Catalogテーブルを選択します。

ステップ 2: 変更データフィードを有効にする (必要な場合)
トリガー 同期モードまたは 連続 同期モードを使用する場合は、ソース テーブルで変更データフィードを有効にする必要があります。 テーブルですでに CDF が有効になっているかどうかを確認するか、SQL エディターまたはノートブックで次のコマンドを実行します。
ALTER TABLE your_catalog.your_schema.your_table
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
your_catalog.your_schema.your_table実際のテーブル名に置き換えます。
ステップ 3: 同期テーブルを作成する
テーブルの詳細ビューから、 [作成] > [同期されたテーブル] をクリックします。

ステップ 4: 設定する
同期テーブルの作成 ダイアログで次の操作を行います。
- テーブル名 : 同期されたテーブルの名前を入力します (ソース テーブルと同じカタログとスキーマに作成されます)。これにより、Unity Catalog 同期テーブルとクエリ可能な Postgres テーブルの両方が作成されます。
- データベースの種類 : Lakebase サーバーレス (ベータ) を選択します。
- 同期モード : ニーズに応じて、 スナップショット 、 トリガー 、または 連続 を選択します (上記の同期モードを参照)。
- プロジェクト、ブランチ、データベースの選択を構成します。
- 主キー が正しいことを確認します (通常は自動検出されます)。
Triggered または Continuous モードを選択し、変更データフィードをまだ有効にしていない場合は、実行するための正確なコマンドを含む警告が表示されます。 データ型の互換性に関する質問については、 「データ型と互換性」を参照してください。
同期されたテーブルを作成するには、 [作成] をクリックします。
ステップ 5: モニター
作成後、 カタログ 内の同期されたテーブルを監視します。 [概要] タブには、同期ステータス、構成、パイプライン ステータス、および最後の同期タイムスタンプが表示されます。手動で更新するには 、今すぐ同期 を使用してください。
データ型と互換性
同期されたテーブルを作成するときに、Unity Catalog データ型は Postgres 型にマッピングされます。複合型 (ARRAY、MAP、STRUCT) は、Postgres では JSONB として保存されます。
| ソース列タイプ | Postgresの列型 | 
|---|---|
| BIGINT | BIGINT | 
| バイナリ | バイト | 
| ブール値 | ブール値 | 
| DATE | DATE | 
| 10進数(p,s) | 数値 | 
| DOUBLE | 倍精度 | 
| Float | real | 
| INT | Integer | 
| 間隔 | 間隔 | 
| smallint | smallint | 
| STRING | TEXT | 
| TIMESTAMP | タイムゾーン付きタイムスタンプ | 
| タイムスタンプ_NTZ | タイムゾーンなしのタイムスタンプ | 
| tinyint | smallint | 
| ARRAY<elementType> | JSONB | 
| MAP\ | JSONB | 
| STRUCT<フィールド名:フィールドタイプ[, ...]> | JSONB | 
GEOGRAPHY、GEOMETRY、VARIANT、および OBJECT タイプはサポートされていません。
無効な文字を処理する
ヌル バイト (0x00) などの特定の文字は、 Unity Catalog文字列、ARRAY、MAP、または STRUCT 列では許可されますが、Postgres のTEXTまたは JSONB 列ではサポートされません。 これにより、次のようなエラーが発生し、同期が失敗する可能性があります。
ERROR: invalid byte sequence for encoding "UTF8": 0x00
ERROR: unsupported Unicode escape sequence DETAIL: \u0000 cannot be converted to text
ソリューション:
- 
文字列フィールドをサニタイズ : 同期する前にサポートされていない文字を削除します。文字列列の null バイトの場合: SQLSELECT REPLACE(column_name, CAST(CHAR(0) AS STRING), '') AS cleaned_column FROM your_table
- 
BINARY に変換 : 生のバイトを保持する必要がある文字列列の場合は、BINARY 型に変換します。 
プログラムによる作成
自動化ワークフローの場合、Databricks SDK、CLI、または REST API を使用して、同期されたテーブルをプログラムで作成できます。
- Python SDK
- CLI
- REST API
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.database import (
    SyncedDatabaseTable,
    SyncedTableSpec,
    NewPipelineSpec,
    SyncedTableSchedulingPolicy
)
# Initialize the Workspace client
w = WorkspaceClient()
# Create a synced table
synced_table = w.database.create_synced_database_table(
    SyncedDatabaseTable(
        name="lakebase_catalog.schema.synced_table",  # Full three-part name
        spec=SyncedTableSpec(
            source_table_full_name="analytics.gold.user_profiles",
            primary_key_columns=["user_id"],  # Primary key columns
            scheduling_policy=SyncedTableSchedulingPolicy.TRIGGERED,  # SNAPSHOT, TRIGGERED, or CONTINUOUS
            new_pipeline_spec=NewPipelineSpec(
                storage_catalog="lakebase_catalog",
                storage_schema="staging"
            )
        ),
    )
)
print(f"Created synced table: {synced_table.name}")
# Check the status of a synced table
status = w.database.get_synced_database_table(name=synced_table.name)
print(f"Synced table status: {status.data_synchronization_status.detailed_state}")
print(f"Status message: {status.data_synchronization_status.message}")
# Create a synced table
databricks database create-synced-database-table \
  --json '{
    "name": "lakebase_catalog.schema.synced_table",
    "spec": {
      "source_table_full_name": "analytics.gold.user_profiles",
      "primary_key_columns": ["user_id"],
      "scheduling_policy": "TRIGGERED",
      "new_pipeline_spec": {
        "storage_catalog": "lakebase_catalog",
        "storage_schema": "staging"
      }
    }
  }'
# Check the status of a synced table
databricks database get-synced-database-table "lakebase_catalog.schema.synced_table"
export WORKSPACE_URL="https://your-workspace.cloud.databricks.com"
export DATABRICKS_TOKEN="your-token"
# Create a synced table
curl -X POST "$WORKSPACE_URL/api/2.0/database/synced_tables" \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $DATABRICKS_TOKEN" \
  --data '{
    "name": "lakebase_catalog.schema.synced_table",
    "spec": {
      "source_table_full_name": "analytics.gold.user_profiles",
      "primary_key_columns": ["user_id"],
      "scheduling_policy": "TRIGGERED",
      "new_pipeline_spec": {
        "storage_catalog": "lakebase_catalog",
        "storage_schema": "staging"
      }
    }
  }'
# Check the status
curl -X GET "$WORKSPACE_URL/api/2.0/database/synced_tables/lakebase_catalog.schema.synced_table" \
  -H "Authorization: Bearer $DATABRICKS_TOKEN"
キャパシティプランニング
リバース ETL 実装を計画するときは、次のリソース要件を考慮してください。
- 接続使用量 : 同期された各テーブルは、Lakebase データベースへの接続を最大 16 個使用します。これはインスタンスの接続制限にカウントされます。
- サイズ制限 : 同期されたすべてのテーブル全体の論理データ サイズ制限は 2 TB です。個々のテーブルには制限はありませんが、Databricks では更新が必要なテーブルについては 1 TB を超えないようにすることを推奨しています。
- 命名要件 : データベース、スキーマ、およびテーブル名には、英数字とアンダースコア ( [A-Za-z0-9_]+) のみを含めることができます。
- スキーマ進化 : トリガー モードと連続モードでは、追加的なスキーマ変更 (列の追加など) のみがサポートされています。
同期されたテーブルを削除する
同期されたテーブルを削除するには、Unity Catalog と Postgres の両方から削除する必要があります。
- 
Unity Catalogから削除 : カタログ で同期したテーブルを見つけて、 メニューをクリックし、 [削除] を選択します。 これにより、データの更新は停止しますが、テーブルは Postgres に残ります。 
- 
Postgres から削除 : Lakebase データベースに接続し、テーブルを削除してスペースを解放します。 SQLDROP TABLE your_database.your_schema.your_table;
SQL エディターまたは外部ツールを使用して Postgres に接続できます。
もっと詳しく知る
| タスク | 説明 | 
|---|---|
| Lakebaseデータベースプロジェクトを設定する | |
| Lakebaseの接続オプションについて | |
| Lakebase データを Unity Catalog で表示して、統合ガバナンスとクロスソースクエリを実現します。 | |
| ガバナンスと権限を理解する | 
その他のオプション
Databricks 以外のシステムにデータを同期する場合は、Census や Hightouch などのPartner Connect リバース ETL ソリューションを参照してください。