Unity Catalog テーブルからデータベース インスタンスへのデータの同期
プレビュー
この機能は、us-east-1
、us-west-2
、eu-west-1
、ap-southeast-1
、ap-southeast-2
、eu-central-1
、us-east-2
、ap-south-1
の各地域でパブリック プレビュー段階です。
このページでは、同期テーブルを作成および管理する方法について説明します。同期されたテーブルは、Unity Catalog テーブルのデータを Lakebase データベース インスタンスに自動的に同期する、Unity Catalog 読み取り専用の Postgres テーブルです。Unity Catalog テーブルを Postgres に同期すると、待機時間の短い読み取りクエリが可能になり、他の Postgres テーブルとのクエリ時の結合がサポートされます。
同期は、ソース テーブルからの変更で Postgres テーブルを継続的に更新するマネージド Lakeflow 宣言型パイプラインによって処理されます。 作成後、同期されたテーブルは、Postgres ツールを使用して直接クエリを実行できます。
同期テーブルの主な特性は次のとおりです。
- Postgres で読み取り専用で、ソースでデータの完全性を維持します。
- マネージド Lakeflow 宣言型パイプラインを使用して自動的に同期されます
- 標準のPostgreSQLインターフェースを介してクエリ可能
- Unity Catalog によるガバナンスとライフサイクル管理による管理
始める前に
- いずれかのカタログにUnity Catalogテーブルがあること。
- データベース・インスタンスに対する
CAN USE
権限があること。
同期テーブルの作成
- UI
- Python SDK
- CLI
- curl
Unity Catalog テーブルを Postgres に同期するには、次の操作を行います。
-
ワークスペースのサイドバーで カタログ をクリックします。
-
同期テーブルを作成するUnity Catalogテーブルを見つけて選択します。
-
作成 > 同期テーブル をクリックします。
-
カタログとスキーマを選択し、新しい同期テーブルのテーブル名を入力します。
- 同期テーブルは、いくつかの追加設定を使用して 、標準カタログでも作成できます。標準カタログとスキーマを選択し、新しく作成した同期テーブルのテーブル名を入力します。
-
データベースインスタンスを選択し、同期テーブルを作成するPostgresデータベースの名前を入力します。Postgres データベース フィールドは、デフォルトで現在選択されているターゲット カタログになります。この名前で Postgres データベースが存在しない場合、Databricks は新しいデータベースを作成します。
-
[プライマリキー ] を選択します。プライマリ・キーは、読み取り、更新、および削除のためにローに効率的にアクセスできるようにする ために必要です 。
-
ソーステーブルで 2 つの行のプライマリキーが同じ場合は、 時系列キー を選択して重複排除を設定します。 時系列キー を指定すると、同期されたテーブルには、各プライマリ・キーの最新の時系列キー値を持つ行のみが含まれます。
-
同期モードを [スナップショット ]、[ トリガー]、[ 連続] から選択します。各同期モードの詳細については、「 同期モードの説明」を参照してください。
-
この同期テーブルを新規または既存のパイプラインから作成するかどうかを選択します。
- 新しいパイプラインを作成し、マネージドカタログを使用する場合は、ステージングテーブルの保存場所を選択します。標準カタログを使用している場合、ステージング テーブルはカタログに自動的に保存されます。
- 既存のパイプラインを使用している場合は、新しい同期モードがパイプライン モードと一致していることを確認します。
-
(オプション) サーバレス予算ポリシー を選択します。 サーバレス予算ポリシーを作成するには、 サーバレス予算ポリシーでの属性の使用を参照してください。 これにより、請求の使用量を特定の使用ポリシーに帰属させることができます。
- 同期されたテーブルの場合、請求対象エンティティは基になる LakeFlow Declarative パイプライン パイプラインです。 予算ポリシーを変更するには、基になるパイプライン・オブジェクトを変更します。サーバレス パイプラインの設定を参照してください。
-
同期済みテーブルのステータス が オンライン になったら、データベースインスタンスにログインし、新しく作成したテーブルをクエリします。SQL エディタ、外部ツール、またはノートブックを使用してテーブルをクエリします。
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.database import SyncedDatabaseTable, SyncedTableSpec, NewPipelineSpec, SyncedTableSchedulingPolicy
# Initialize the Workspace client
w = WorkspaceClient()
# Create a synced table in a database catalog
synced_table = w.database.create_synced_database_table(
SyncedDatabaseTable(
name="database_catalog.schema.synced_table", # Full three-part name
spec=SyncedTableSpec(
source_table_full_name="source_catalog.source_schema.source_table",
primary_key_columns=["id"], # Primary key columns
scheduling_policy=SyncedTableSchedulingPolicy.TRIGGERED, # SNAPSHOT, TRIGGERED, or CONTINUOUS
# Optional: timeseries_key="timestamp" # For deduplication
new_pipeline_spec=NewPipelineSpec(
storage_catalog="storage_catalog",
storage_schema="storage_schema"
)
),
)
)
print(f"Created synced table: {synced_table.name}")
# Create a synced table in a standard UC catalog
synced_table = w.database.create_synced_database_table(
SyncedDatabaseTable(
name="standard_catalog.schema.synced_table", # Full three-part name
database_instance_name="my-database-instance", # Required for standard catalogs
logical_database_name="postgres_database", # Required for standard catalogs
spec=SyncedTableSpec(
source_table_full_name="source_catalog.source_schema.source_table",
primary_key_columns=["id"],
scheduling_policy=SyncedTableSchedulingPolicy.CONTINUOUS,
create_database_objects_if_missing=True, # Create database/schema if needed
new_pipeline_spec=NewPipelineSpec(
storage_catalog="storage_catalog",
storage_schema="storage_schema"
)
),
)
)
print(f"Created synced table: {synced_table.name}")
# Check the status of a synced table
synced_table_name = "database_catalog.schema.synced_table"
status = w.database.get_synced_database_table(name=synced_table_name)
print(f"Synced table status: {status.data_synchronization_status.detailed_state}")
print(f"Status message: {status.data_synchronization_status.message}")
# Create a synced table in a database catalog
databricks database create-synced-database-table \
--json '{
"spec": {
"name": "database_catalog.schema.synced_table",
"source_table_full_name": "source_catalog.source_schema.source_table",
"primary_key_columns": ["id"],
"scheduling_policy": "TRIGGERED"
},
"new_pipeline_spec": {
"storage_catalog": "storage_catalog",
"storage_schema": "storage_schema"
}
}'
# Create a synced table in a standard UC catalog
# new_pipeline_spec, storage_catalog, and storage_schema are optional
databricks database create-synced-database-table \
--database-instance-name "my-database-instance" \
--logical-database-name "databricks_postgres" \
--json '{
"name": "standard_catalog.schema.synced_table",
"spec": {
"source_table_full_name": "source_catalog.source_schema.source_table",
"primary_key_columns": ["id"],
"scheduling_policy": "CONTINUOUS",
"create_database_objects_if_missing": true
}
}'
# Check the status of a synced table
databricks database get-synced-database-table "database_catalog.schema.synced_table"
データベース・カタログに同期テーブルを作成します。
export CATALOG_NAME=<Database catalog>
export SRC_TBL="source_catalog.source_schema.source_table"
export DEST_TBL="$CATALOG_NAME.some_schema.synced_table"
export PKS='["id"]'
export ST_CATALOG = "storage_catalog"
export ST_SCHEMA = "storage_schema"
curl -X POST https://$WORKSPACE/api/2.0/database/synced_tables \
-H "Content-Type: text/json" \
-H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
--data-binary @- << EOF
{
"name": "$DEST_TBL",
"spec": {
"source_table_full_name": "$SRC_TBL",
"primary_key_columns": $PKS,
"scheduling_policy": "TRIGGERED",
},
"new_pipeline_spec": {
"storage_catalog": "$ST_CATALOG",
"storage_schema": "$ST_SCHEMA",
}
}
EOF
同期テーブルを標準の Unity Catalog カタログに作成します。
export CATALOG_NAME=<Standard catalog>
export DATABASE_INSTANCE=<database instance>
export POSTGRES_DATABASE=$CATALOG_NAME
export SRC_TBL="source_catalog.source_schema.source_table"
export DEST_TBL="$CATALOG_NAME.some_schema.sync_table"
export PKS='["id"]'
export ST_CATALOG = "storage_catalog"
export ST_SCHEMA = "storage_schema"
curl -X POST https://$WORKSPACE/api/2.0/database/synced_tables \
-H "Content-Type: text/json" \
-H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
--data-binary @- << EOF
{
"name": "$DEST_TBL",
"database_instance_name": "$DATABASE_INSTANCE",
"logical_database_name": "$POSTGRES_DATABASE",
"spec": {
"source_table_full_name": "$SRC_TBL",
"primary_key_columns": $PKS,
"scheduling_policy": "TRIGGERED",
},
"new_pipeline_spec": {
"storage_catalog": "$ST_CATALOG",
"storage_schema": "$ST_SCHEMA",
}
}
EOF
同期されたテーブルのステータスを確認します。
export SYNCEDTABLE='pg_db.silver.sbtest1_online'
curl --request GET \
"https://e2-dogfood.staging.cloud.databricks.com/api/2.0/database/synced_tables/$SYNCEDTABLE" \
--header "Authorization: Bearer dapi..."
同期モードの説明
同期テーブルは、次のいずれかの同期モードで作成でき、これにより、ソースから Postgres の同期テーブルにデータを同期する方法が決まります。
同期モード | 説明 | パフォーマンス |
---|---|---|
スナップショット | パイプラインは 1 回実行され、ソース テーブルのスナップショットを取得し、同期されたテーブルにコピーします。後続のパイプライン実行では、ソース データ全体が宛先にコピーされ、アトミックに置き換えられます。パイプラインは、API を使用して手動でトリガーすることも、スケジュールに従ってトリガーすることもできます。 | このモードは、データを最初から再作成するため、トリガー同期モードや連続同期モードよりも 10 倍効率的です。ソース テーブルの 10% 以上を変更する場合は、このモードの使用を検討してください。 |
トリガー | パイプラインは 1 回実行され、ソース テーブルのスナップショットを取得し、同期されたテーブルにコピーします。スナップショット同期モードとは異なり、同期されたテーブルが更新されると、最後のパイプライン実行以降の変更のみが取得され、同期されたテーブルに適用されます。増分更新は、手動でトリガーすることも、API を使用してトリガーすることも、スケジュールに従ってトリガーすることもできます。 | このモードは、オンデマンドで実行され、最後の実行以降の変更のみを適用するため、ラグとコストの適切な妥協点です。 遅延を最小限に抑えるには、ソース テーブルを更新した直後にこのパイプラインを実行します。これを 5 分ごとよりも頻繁に実行すると、パイプラインを毎回開始および停止するコストがかかるため、連続モードよりもコストがかかる可能性があります。 |
連続 | パイプラインは 1 回実行され、ソース テーブルのスナップショットを取得し、同期されたテーブルにコピーすると、パイプラインは継続的に実行されます。ソース テーブルに対する後続の変更は、同期されたテーブルにリアルタイムで増分的に適用されます。手動で更新する必要はありません。 | このモードは、継続的に実行されるため、遅延は最も少なくなりますが、コストが高くなります。 |
トリガー 同期モードまたは 連続 同期モードをサポートするには、ソース テーブルでチェンジデータフィードが有効になっている必要があります。一部のソース (ビューなど) はチェンジデータフィードをサポートしていないため、スナップショットモードでのみ同期できます。
サポートされている操作
同期されたテーブルに対して Postgres 側でサポートされる操作のセットは限られています。
- 読み取り専用クエリ
- インデックスの作成
- テーブルの削除 (同期されたテーブルを Unity Catalogから削除した後でスペースを解放するため)
このテーブルは他の方法で変更できますが、同期パイプラインに干渉します。
同期されたテーブルの削除
同期されたテーブルを削除するには、Unity Catalog からテーブルを削除してから、データベース インスタンス内のテーブルを削除する必要があります。同期されたテーブルを Unity Catalog から削除すると、テーブルの登録が解除され、データの更新が停止します。ただし、テーブルは基になる Postgres データベースに残ります。データベース・インスタンスのスペースを解放するには、インスタンスに接続し、 DROP TABLE
コマンドを使用します。
- UI
- Python SDK
- CLI
- curl
-
ワークスペースのサイドバーで カタログ をクリックします。
-
削除する同期テーブルを見つけて選択します。
-
> 削除 をクリックします。
-
インスタンスには、
psql
、SQL エディター、またはノートブックから接続します。 -
PostgreSQL を使用してテーブルを削除します。
SQLDROP TABLE synced_table_database.synced_table_schema.synced_table
from databricks.sdk import WorkspaceClient
# Initialize the Workspace client
w = WorkspaceClient()
# Delete a synced table from UC
synced_table_name = "catalog.schema.synced_table"
w.database.delete_synced_database_table(name=synced_table_name)
print(f"Deleted synced table from UC: {synced_table_name}")
# To free up space in your database instance, you need to connect to the
# instance and drop the table using PostgreSQL:
#
# DROP TABLE synced_table_database.synced_table_schema.synced_table;
# Delete a synced table from UC
databricks database delete-synced-database-table "catalog.schema.synced_table"
# To free up space in your database instance, you need to connect to the
# instance and drop the table using PostgreSQL:
#
# DROP TABLE synced_table_database.synced_table_schema.synced_table;
# Delete a synced table from UC
curl -X DELETE --header "Authorization: Bearer ${DATABRICKS_TOKEN}" \
https://$WORKSPACE/api/2.0/database/synced_tables/$SYNCED_TABLE_NAME
# To free up space in your database instance, you need to connect to the
# instance and drop the table using PostgreSQL:
#
# DROP TABLE synced_table_database.synced_table_schema.synced_table;
所有権と権限
新しい Postgres データベース、スキーマ、またはテーブルを作成する場合、Postgres の所有権は次のように設定されます。
- Databricks ログインが Postgres にロールとして存在する場合、所有権はデータベース、スキーマ、またはテーブルを作成するユーザーに割り当てられます。Postgres で Databricks ID ロールを追加するには、「 Postgres ロールの管理」を参照してください。
- それ以外の場合、所有権はPostgresの親オブジェクトの所有者(通常は
databricks_superuser
)に割り当てられます。
同期テーブルへのアクセスを管理する
同期テーブルが作成された後、 databricks_superuser
は Postgres から同期テーブルを READ
できます。databricks_superuser
には、pg_read_all_data
権限とpg_write_all_data
権限があります。
-
databricks_superuser
は、次の権限を他のユーザーに付与することもできます。SQLGRANT USAGE ON SCHEMA synced_table_schema TO user;
SQLGRANT SELECT ON synced_table_name TO user;
-
databricks_superuser
は、次の特権を取り消すことができます。SQLREVOKE USAGE ON SCHEMA synced_table_schema FROM user;
SQLREVOKE {SELECT | INSERT | UPDATE | DELETE} ON synced_table_name FROM user;
同期されたテーブルの操作を管理する
databricks_superuser
は、同期されたテーブルで特定の操作を実行する権限を持つユーザーを管理できます。同期されたテーブルでサポートされている操作は次のとおりです。
CREATE INDEX
ALTER INDEX
DROP INDEX
DROP TABLE
同期されたテーブルに対する他のすべての DDL 操作は拒否されます。
これらの権限を追加のユーザーに付与するには、 databricks_superuser
まず databricks_auth
で拡張機能を作成する必要があります。
CREATE EXTENSION IF NOT EXISTS databricks_auth;
その後、 databricks_superuser
同期テーブルを管理するユーザーを追加できます。
SELECT databricks_synced_table_add_manager('"synced_table_schema"."synced_table"'::regclass, '[user]');
databricks_superuser
は、同期されたテーブルの管理からユーザーを削除できます。
SELECT databricks_synced_table_remove_manager('[table]', '[user]');
databricks_superuser
は、すべてのマネージャーを表示できます。
SELECT * FROM databricks_synced_table_managers;
データ型マッピング
この型マッピング テーブルは、ソース Unity Catalog テーブル内の各 データ型 を Postgres の宛先同期テーブルにマップする方法を定義します。
ソース列タイプ | Postgres 列型 |
---|---|
BIGINT | |
バイト | |
ブール値 | |
DATE | |
数値 | |
倍精度 | |
real | |
Integer | |
間隔 | |
smallint | |
TEXT | |
タイムゾーン付きタイムスタンプ | |
タイム・ゾーンなしのタイム・スタンプ | |
smallint | |
サポートされていない | |
サポートされていない | |
JSONBの | |
JSONBの | |
STRUCT < [フィールド名 : フィールドタイプ [NOT NULL][COMMENT str][, ...]] > | JSONBの |
サポートされていない | |
サポートされていない |
- 2025 年 5 月に ARRAY、MAP、STRUCT 型のマッピングが変更されました。それより前に作成された同期テーブルは、これらの型を引き続き JSON にマップします。
- TIMESTAMP のマッピングは 2025 年 8 月に変更されました。それより前に作成された同期テーブルは、引き続き TIMESTAMP WITHOUT TIME ZONE にマップされます。
無効な文字の処理
ヌル バイト (0x00) などの特定の文字は、Delta テーブルの STRING
、 ARRAY
、 MAP
、または STRUCT
列で使用できますが、Postgres の TEXT
列や JSONB
列ではサポートされていません。その結果、このようなデータを Delta から Postgres に同期すると、エラーを伴う挿入エラーが発生する可能性があります。
org.postgresql.util.PSQLException: ERROR: invalid byte sequence for encoding "UTF8": 0x00
org.postgresql.util.PSQLException: ERROR: unsupported Unicode escape sequence DETAIL: \u0000 cannot be converted to text.
- 最初のエラーは、ヌルバイトが最上位の文字列列に表示され、Postgres
TEXT
に直接マップされる場合に発生します。 - 2 番目のエラーは、Databricks が
JSONB
としてシリアル化する複合型 (STRUCT
、ARRAY
、またはMAP
) 内にネストされた文字列に null バイトが出現する場合に発生します。シリアル化中、すべての文字列は PostgresTEXT
にキャストされ、\u0000
は許可されません。
解決方法:
この問題は、次のいずれかの方法で対処できます。
-
文字列フィールドをサニタイズ
Postgres に同期する前に、複合型内の文字列を含むすべての文字列フィールドからサポートされていない文字を削除または置換します。
最上位の
STRING
列から null バイトを削除するには、REPLACE
関数を使用します。SQLSELECT REPLACE(column_name, CAST(CHAR(0) AS STRING), '') AS cleaned_column FROM your_table;
-
バイナリに変換 (
STRING
列のみ)未加工のバイト内容を保持する必要がある場合は、影響を受ける
STRING
列をBINARY
に変換します。
制限事項と考慮事項
サポートされているソーステーブル
同期されたテーブルの同期モードに応じて、さまざまなタイプのソース テーブルがサポートされます。
-
スナップショット モードの場合、ソース テーブルは
SELECT *
をサポートしている必要があります。例としては、 Delta テーブル、 Iceberg テーブル、ビュー、マテリアライズドビュー、その他の同様のタイプがあります。 -
トリガー同期モードまたは連続同期モードの場合、ソーステーブルでチェンジデータフィード も 有効になっている必要があります。
名前付けと識別子の制限
- 使用できる文字: 同期されたテーブルの Postgres データベース名、スキーマ名、およびテーブル名には、英数字とアンダースコア (
[A-Za-z0-9_]+
) のみを含めることができます。ハイフン (-
) およびその他の特殊文字はサポートされていません。 - 列とテーブルの識別子: ソース Unity Catalog テーブルの列名またはテーブル名に大文字や特殊文字を使用しないでください。 保持する場合は、Postgres で参照するときにこれらの識別子を引用符で囲む必要があります。
パフォーマンスと同期
- 同期速度: 同期されたテーブルへのデータの同期は、追加の処理により、ネイティブの PostgreSQL クライアントを使用してデータベース インスタンスに直接データを書き込むよりも遅くなる可能性があります。連続同期モードは、 Unity Catalog マネージドテーブルから同期テーブルにデータを最小 15 秒間隔で更新します。
- 接続の使用状況: 各テーブルの同期では、データベースインスタンスへの最大 16 個の接続を使用できます。これは、インスタンスの接続制限にカウントされます。
- APIべき等性: 同期されたテーブルAPIsはべき等であり、エラーが発生した場合は、タイムリーな操作を確保するために再試行が必要になる場合があります。
- スキーマの変更:
Triggered
モードまたはContinuous
モードの同期テーブルの場合、Unity Catalog テーブルからのスキーマの追加変更 (新しい列の追加など) のみが同期テーブルに反映されます。 - 重複するキー: ソース テーブルで 2 つの行の主キーが同じ場合、Timeseries キーを使用して重複除去を構成しない限り、同期パイプラインは失敗します。ただし、時系列キーを使用すると、パフォーマンスが低下します。
- 更新レート: 同期パイプラインは、キャパシティーユニット (CU) あたり約 1,200 行/秒の連続書き込みと、CU あたり最大 15,000 行/秒の一括書き込みをサポートします。
容量と制限
-
テーブルの制限:
- 同期テーブルは、ソース テーブルごとに 20 個に制限されています。
- 各テーブルの同期では、最大 16 のデータベース接続を使用できます。
-
サイズ制限と完全更新:
- 同期されたテーブルをフル更新した場合、新しいテーブルが同期されるまで、Postgres の古いバージョンは削除されません。 どちらのバージョンも、更新中に一時的に論理データベース・サイズ制限にカウントされます。
- 同期される各テーブルは最大 2 TB です。完全なテーブル再作成の代わりに更新が必要な場合、制限は 1 TB です。
- Unity Catalog テーブルの非圧縮の行形式サイズがデータベース インスタンス サイズの制限 (2 TB) を超えると、同期は失敗します。インスタンスにさらに書き込む前に、同期されたテーブルをPostgresで削除する必要があります。
カタログ統合
- カタログの複製: 別のデータベース カタログとしても登録されている Postgres データベースを対象とする標準カタログに同期テーブルを作成すると、同期されたテーブルは Unity Catalog の標準カタログとデータベース カタログの両方に表示されます。