Unity Catalog テーブルからデータベース インスタンスにデータを同期する
プレビュー
この機能は、us-east-1
、 us-west-2
、 eu-west-1
、 ap-southeast-1
、 ap-southeast-2
、 eu-central-1
、 us-east-2
、 ap-south-1
のリージョンでパブリック プレビューとして提供されています。
このページでは、同期されたテーブルを作成および管理する方法について説明します。同期テーブルは、Unity Catalog テーブルから Lakebase データベース インスタンスにデータを自動的に同期する Unity Catalog 読み取り専用 Postgres テーブルです。Unity Catalog テーブルを Postgres に同期すると、待機時間の短い読み取りクエリが有効になり、他の Postgres テーブルとのクエリ時結合がサポートされます。
同期は宣言型パイプライン LakeFlow によって処理されます。 マネージドパイプラインは、ソーステーブルからの変更を使用してPostgresテーブルを継続的に更新します。作成後、同期されたテーブルは Postgres ツールを使用して直接クエリできます。
同期されたテーブルの主な特徴は次のとおりです。
- Postgresで読み取り専用でデータ完全性を維持する ソース
- マネージド Lakeflow 宣言型パイプラインを使用して自動的に同期されます
- 標準のPostgreSQLインターフェースを介してクエリ可能
- ガバナンスとライフサイクル管理のために Unity Catalog を使用して管理されます
始める前に
- いずれかのカタログにUnity Catalogテーブルがあること。
- データベース・インスタンスに対する
CAN USE
権限があること。
同期されたテーブルを作成する
- UI
- Python SDK
- CLI
- curl
Unity Catalog テーブルを Postgres に同期するには、次の操作を行います。
-
ワークスペースのサイドバーで カタログ をクリックします。
-
同期テーブルを作成する Unity Catalog テーブルを見つけて選択します。
-
作成 > 同期テーブル をクリックします。
-
カタログとスキーマを選択し、新しい同期テーブルのテーブル名を入力します。
- 同期されたテーブルは、いくつかの追加構成を使用して、 標準カタログに作成することもできます。標準カタログとスキーマを選択し、新しく作成した同期テーブルのテーブル名を入力します。
-
データベースインスタンスを選択し、同期テーブルを作成するPostgresデータベースの名前を入力します。Postgres データベース フィールドは、デフォルトで現在選択されているターゲット カタログになります。この名前で Postgres データベースが存在しない場合、Databricks によって新しいデータベースが作成されます。
-
[主キー ] を選択します。主キーは、読み取り、更新、および削除のために行に効率的にアクセスできるようにするため、 必要 です。
-
ソーステーブルで 2 つの行の主キーが同じ場合は、 時系列キー を選択して重複排除を構成します。 時系列キー を指定すると、同期されたテーブルには、各主キーの最新の時系列キー値を持つ行のみが含まれます。
-
同期モードを [スナップショット] 、[ トリガー] 、および [ 継続] から選択します。詳細については、各同期モードの詳細については、「 同期モードの説明」を参照してください。
-
この同期テーブルを新規パイプラインまたは既存のパイプラインから作成するかどうかを選択します。
- 新しいパイプラインを作成し、マネージドカタログを使用する場合は、ステージングテーブルの保存場所を選択します。標準カタログを使用している場合、ステージング テーブルはカタログに自動的に保存されます。
- 既存のパイプラインを使用している場合は、新しい同期モードがパイプラインモードと一致していることを確認します。
-
(オプション) サーバレス予算ポリシー を選択します。 サーバレス予算ポリシーを作成するには、 サーバレス予算ポリシーでの属性の使用を参照してください。 これにより、請求の使用量を特定の使用ポリシーに帰属させることができます。
- 同期されたテーブルの場合、請求対象エンティティは基になる LakeFlow Declarative パイプライン パイプラインです。 予算ポリシーを変更するには、基になるパイプライン・オブジェクトを変更します。サーバレス パイプラインの設定を参照してください。
-
同期済みテーブルのステータス が オンライン になったら、データベースインスタンスにログインし、新しく作成したテーブルをクエリします。SQL エディタ、外部ツール、またはノートブックを使用してテーブルをクエリします。
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.database import SyncedDatabaseTable, SyncedTableSpec, NewPipelineSpec, SyncedTableSchedulingPolicy
# Initialize the Workspace client
w = WorkspaceClient()
# Create a synced table in a database catalog
synced_table = w.database.create_synced_database_table(
SyncedDatabaseTable(
name="database_catalog.schema.synced_table", # Full three-part name
spec=SyncedTableSpec(
source_table_full_name="source_catalog.source_schema.source_table",
primary_key_columns=["id"], # Primary key columns
scheduling_policy=SyncedTableSchedulingPolicy.TRIGGERED, # SNAPSHOT, TRIGGERED, or CONTINUOUS
# Optional: timeseries_key="timestamp" # For deduplication
new_pipeline_spec=NewPipelineSpec(
storage_catalog="storage_catalog",
storage_schema="storage_schema"
)
),
)
)
print(f"Created synced table: {synced_table.name}")
# Create a synced table in a standard UC catalog
synced_table = w.database.create_synced_database_table(
SyncedDatabaseTable(
name="standard_catalog.schema.synced_table", # Full three-part name
database_instance_name="my-database-instance", # Required for standard catalogs
logical_database_name="postgres_database", # Required for standard catalogs
spec=SyncedTableSpec(
source_table_full_name="source_catalog.source_schema.source_table",
primary_key_columns=["id"],
scheduling_policy=SyncedTableSchedulingPolicy.CONTINUOUS,
create_database_objects_if_missing=True, # Create database/schema if needed
new_pipeline_spec=NewPipelineSpec(
storage_catalog="storage_catalog",
storage_schema="storage_schema"
)
),
)
)
print(f"Created synced table: {synced_table.name}")
# Check the status of a synced table
synced_table_name = "database_catalog.schema.synced_table"
status = w.database.get_synced_database_table(name=synced_table_name)
print(f"Synced table status: {status.data_synchronization_status.detailed_state}")
print(f"Status message: {status.data_synchronization_status.message}")
# Create a synced table in a database catalog
databricks database create-synced-database-table \
--json '{
"spec": {
"name": "database_catalog.schema.synced_table",
"source_table_full_name": "source_catalog.source_schema.source_table",
"primary_key_columns": ["id"],
"scheduling_policy": "TRIGGERED"
},
"new_pipeline_spec": {
"storage_catalog": "storage_catalog",
"storage_schema": "storage_schema"
}
}'
# Create a synced table in a standard UC catalog
# new_pipeline_spec, storage_catalog, and storage_schema are optional
databricks database create-synced-database-table \
--database-instance-name "my-database-instance" \
--logical-database-name "databricks_postgres" \
--json '{
"name": "standard_catalog.schema.synced_table",
"spec": {
"source_table_full_name": "source_catalog.source_schema.source_table",
"primary_key_columns": ["id"],
"scheduling_policy": "CONTINUOUS",
"create_database_objects_if_missing": true
}
}'
# Check the status of a synced table
databricks database get-synced-database-table "database_catalog.schema.synced_table"
データベースカタログに同期テーブルを作成します。
export CATALOG_NAME=<Database catalog>
export SRC_TBL="source_catalog.source_schema.source_table"
export DEST_TBL="$CATALOG_NAME.some_schema.synced_table"
export PKS='["id"]'
export ST_CATALOG = "storage_catalog"
export ST_SCHEMA = "storage_schema"
curl -X POST https://$WORKSPACE/api/2.0/database/synced_tables \
-H "Content-Type: text/json" \
-H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
--data-binary @- << EOF
{
"name": "$DEST_TBL",
"spec": {
"source_table_full_name": "$SRC_TBL",
"primary_key_columns": $PKS,
"scheduling_policy": "TRIGGERED",
},
"new_pipeline_spec": {
"storage_catalog": "$ST_CATALOG",
"storage_schema": "$ST_SCHEMA",
}
}
EOF
標準の Unity Catalog カタログに同期されたテーブルを作成します。
export CATALOG_NAME=<Standard catalog>
export DATABASE_INSTANCE=<database instance>
export POSTGRES_DATABASE=$CATALOG_NAME
export SRC_TBL="source_catalog.source_schema.source_table"
export DEST_TBL="$CATALOG_NAME.some_schema.sync_table"
export PKS='["id"]'
export ST_CATALOG = "storage_catalog"
export ST_SCHEMA = "storage_schema"
curl -X POST https://$WORKSPACE/api/2.0/database/synced_tables \
-H "Content-Type: text/json" \
-H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
--data-binary @- << EOF
{
"name": "$DEST_TBL",
"database_instance_name": "$DATABASE_INSTANCE",
"logical_database_name": "$POSTGRES_DATABASE",
"spec": {
"source_table_full_name": "$SRC_TBL",
"primary_key_columns": $PKS,
"scheduling_policy": "TRIGGERED",
},
"new_pipeline_spec": {
"storage_catalog": "$ST_CATALOG",
"storage_schema": "$ST_SCHEMA",
}
}
EOF
同期されたテーブルのステータスを確認します。
export SYNCEDTABLE='pg_db.silver.sbtest1_online'
curl --request GET \
"https://e2-dogfood.staging.cloud.databricks.com/api/2.0/database/synced_tables/$SYNCEDTABLE" \
--header "Authorization: Bearer dapi..."
同期モードの説明
同期テーブルは、Postgresのソースから同期テーブルにデータを同期する方法を決定する、次のいずれかの同期モードで作成できます。
同期モード | 説明 | パフォーマンス |
---|---|---|
スナップショット | パイプラインは 1 回実行され、ソース テーブルのスナップショットを取得し、同期されたテーブルにコピーします。後続のパイプライン実行では、ソース データ全体が宛先にコピーされ、アトミックに置き換えられます。パイプラインは、手動で、API を介して、またはスケジュールに従ってトリガーできます。 | このモードは、データを最初から再作成するため、トリガーまたは連続同期モードよりも 10 倍効率的です。ソース テーブルの 10% 以上を変更する場合は、このモードの使用を検討してください。 |
トリガー | パイプラインは 1 回実行され、ソース テーブルのスナップショットを取得し、同期されたテーブルにコピーします。スナップショット同期モードとは異なり、同期されたテーブルが更新されると、最後のパイプライン実行以降の変更のみが取得され、同期されたテーブルに適用されます。増分更新は、手動で、API を介して、またはスケジュールに従ってトリガーできます。 | このモードは、オンデマンドで実行し、前回以降の変更のみを適用するため、ラグとコストの間の適切な妥協点です 実行。 遅延を最小限に抑えるには、ソーステーブルを更新した直後にこのパイプラインを実行します。これを 5 分ごとよりも頻繁に実行すると、パイプラインを毎回開始および停止するコストがかかるため、継続モードよりもコストが高くなる可能性があります。 |
連続 | パイプラインは 1 回実行され、ソース テーブルのスナップショットを取得し、同期されたテーブルにコピーすると、パイプラインは継続的に実行されます。ソース表に対する後続の変更は、同期された表にリアルタイムで徐々に適用されます。手動で更新する必要はありません。 | このモードは、継続的に実行されるため、遅延は最も低くなりますが、コストは高くなります。 |
トリガー同期 モードまたは 連続 同期モードをサポートするには、ソーステーブルでチェンジデータフィードを有効にする必要があります。特定のソース (ビューなど) はチェンジデータフィードをサポートしていないため、スナップショットモードでのみ同期できます。
サポートされている操作
Databricks では、偶発的な上書きやデータの不整合を防ぐために、同期されたテーブルに対して Postgres で次の操作のみを実行することをお勧めします。
- 読み取り専用クエリ
- インデックスの作成
- テーブルを削除する (同期されたテーブルを Unity Catalog から削除した後に領域を解放するため)
このテーブルを他の方法で変更することはできますが、同期パイプラインに干渉します。
同期されたテーブルを削除する
同期されたテーブルを削除するには、Unity Catalog からテーブルを削除し、データベース インスタンスにテーブルを削除する必要があります。同期されたテーブルを Unity Catalog から削除すると、テーブルの登録が解除され、データの更新が停止されます。ただし、テーブルは基盤となるPostgresデータベースに残ります。データベース・インスタンスの領域を解放するには、インスタンスに接続し、 DROP TABLE
コマンドを使用します。
- UI
- Python SDK
- CLI
- curl
-
ワークスペースのサイドバーで カタログ をクリックします。
-
削除する同期テーブルを見つけて選択します。
-
> 削除 をクリックします。
-
psql
、SQL エディター、またはノートブックからインスタンスに接続します。 -
PostgreSQLを使用してテーブルを削除します。
SQLDROP TABLE synced_table_database.synced_table_schema.synced_table
from databricks.sdk import WorkspaceClient
# Initialize the Workspace client
w = WorkspaceClient()
# Delete a synced table from UC
synced_table_name = "catalog.schema.synced_table"
w.database.delete_synced_database_table(name=synced_table_name)
print(f"Deleted synced table from UC: {synced_table_name}")
# To free up space in your database instance, you need to connect to the
# instance and drop the table using PostgreSQL:
#
# DROP TABLE synced_table_database.synced_table_schema.synced_table;
# Delete a synced table from UC
databricks database delete-synced-database-table "catalog.schema.synced_table"
# To free up space in your database instance, you need to connect to the
# instance and drop the table using PostgreSQL:
#
# DROP TABLE synced_table_database.synced_table_schema.synced_table;
# Delete a synced table from UC
curl -X DELETE --header "Authorization: Bearer ${DATABRICKS_TOKEN}" \
https://$WORKSPACE/api/2.0/database/synced_tables/$SYNCED_TABLE_NAME
# To free up space in your database instance, you need to connect to the
# instance and drop the table using PostgreSQL:
#
# DROP TABLE synced_table_database.synced_table_schema.synced_table;
所有権と権限
新しいPostgresデータベース、スキーマ、またはテーブルを作成する場合、Postgresの所有権は次のように設定されます。
- Databricks ログインが Postgres にロールとして存在する場合、所有権はデータベース、スキーマ、またはテーブルを作成するユーザーに割り当てられます。Postgres で Databricks ID ロールを追加するには、「 Postgres ロールの管理」を参照してください。
- それ以外の場合、所有権はPostgresの親オブジェクトの所有者(通常は
databricks_superuser
)に割り当てられます。
同期されたテーブル アクセスを管理する
同期されたテーブルが作成されると、 databricks_superuser
は同期されたテーブルをPostgresから READ
できます。databricks_superuser
には、pg_read_all_data
権限とpg_write_all_data
権限があります。
-
databricks_superuser
は、次の権限を他のユーザーに付与することもできます。SQLGRANT USAGE ON SCHEMA synced_table_schema TO user;
SQLGRANT SELECT ON synced_table_name TO user;
-
databricks_superuser
は、次の権限を取り消すことができます。SQLREVOKE USAGE ON SCHEMA synced_table_schema FROM user;
SQLREVOKE {SELECT | INSERT | UPDATE | DELETE} ON synced_table_name FROM user;
同期されたテーブル操作を管理する
databricks_superuser
は、同期されたテーブルに対して特定の操作を実行する権限を持つユーザーを管理できます。同期されたテーブルでサポートされている操作は次のとおりです。
CREATE INDEX
ALTER INDEX
DROP INDEX
DROP TABLE
同期されたテーブルに対して、他のすべての DDL 操作は拒否されます。
これらの権限を追加のユーザーに付与するには、 databricks_superuser
はまず databricks_auth
に拡張機能を作成する必要があります。
CREATE EXTENSION IF NOT EXISTS databricks_auth;
その後、 databricks_superuser
は同期されたテーブルを管理するユーザーを追加できます。
SELECT databricks_synced_table_add_manager('"synced_table_schema"."synced_table"'::regclass, '[user]');
databricks_superuser
は、同期されたテーブルの管理からユーザーを削除できます。
SELECT databricks_synced_table_remove_manager('[table]', '[user]');
databricks_superuser
は、すべてのマネージャーを表示できます。
SELECT * FROM databricks_synced_table_managers;
データ型マッピング
この型マッピング テーブルは、ソース Unity Catalog テーブル内の各 データ型 を Postgres の宛先同期テーブルにマップする方法を定義します。
ソース列タイプ | Postgres 列型 |
---|---|
BIGINT | |
バイト | |
ブール値 | |
DATE | |
数値 | |
倍精度 | |
real | |
Integer | |
間隔 | |
smallint | |
TEXT | |
タイムゾーン付きタイムスタンプ | |
タイム・ゾーンなしのタイム・スタンプ | |
smallint | |
サポートされていない | |
サポートされていない | |
JSONBの | |
JSONBの | |
STRUCT < [フィールド名 : フィールドタイプ [NOT NULL][COMMENT str][, ...]] > | JSONBの |
サポートされていない | |
サポートされていない |
- 2025 年 5 月に ARRAY、MAP、STRUCT 型のマッピングが変更されました。それより前に作成された同期テーブルは、これらの型を引き続き JSON にマップします。
- TIMESTAMP のマッピングは 2025 年 8 月に変更されました。それより前に作成された同期テーブルは、引き続き TIMESTAMP WITHOUT TIME ZONE にマップされます。
無効な文字を処理する
null バイト (0x00) などの特定の文字は、Delta テーブルの STRING
、 ARRAY
、 MAP
、または STRUCT
列では許可されますが、Postgres TEXT
列または JSONB
列ではサポートされていません。その結果、このようなデータを Delta から Postgres に同期すると、エラーを伴う挿入失敗が発生する可能性があります。
org.postgresql.util.PSQLException: ERROR: invalid byte sequence for encoding "UTF8": 0x00
org.postgresql.util.PSQLException: ERROR: unsupported Unicode escape sequence DETAIL: \u0000 cannot be converted to text.
- 最初のエラーは、Postgres
TEXT
に直接マップされる最上位の文字列列にnullバイトが表示されたときに発生します。 - 2 番目のエラーは、Databricks が
JSONB
としてシリアル化する複合型 (STRUCT
、ARRAY
、またはMAP
) 内に入れ子になった文字列に null バイトが表示される場合に発生します。シリアル化中、すべての文字列はPostgresTEXT
にキャストされ、\u0000
は許可されません。
解決方法:
この問題は、次のいずれかの方法で対処できます。
-
文字列フィールドのサニタイズ
Postgres に同期する前に、複合型内の文字列を含むすべての文字列フィールドからサポートされていない文字を削除または置換します。
最上位の
STRING
列からヌルバイトを削除するには、REPLACE
関数を使用します。SQLSELECT REPLACE(column_name, CAST(CHAR(0) AS STRING), '') AS cleaned_column FROM your_table;
-
バイナリに変換 (
STRING
列のみ)未加工のバイト内容を保持する必要がある場合は、影響を受ける
STRING
列をBINARY
に変換します。
制限事項と考慮事項
サポートされているソーステーブル
同期されたテーブルの同期モードに応じて、さまざまなタイプのソース テーブルがサポートされます。
-
スナップショット モードの場合、ソース テーブルは
SELECT *
をサポートしている必要があります。例としては、 Delta テーブル、 Iceberg テーブル、ビュー、マテリアライズドビュー、その他の同様のタイプがあります。 -
トリガー同期モードまたは連続同期モードの場合、ソーステーブルでチェンジデータフィード も 有効になっている必要があります。
名前付けと識別子の制限
- 使用できる文字: 同期されたテーブルのPostgresデータベース名、スキーマ名、およびテーブル名には、英数字とアンダースコアのみを含めることができます(
[A-Za-z0-9_]+
)。ハイフン (-
) およびその他の特殊文字はサポートされていません。 - 列とテーブルの識別子: ソース Unity Catalog テーブルの列名またはテーブル名に大文字や特殊文字を使用しないでください。保持されている場合は、Postgresで参照するときにこれらの識別子を引用符で囲む必要があります。
パフォーマンスと同期
- 同期速度: 同期されたテーブルへのデータの同期は、追加の処理により、ネイティブ PostgreSQL クライアントを使用してデータベース インスタンスに直接データを書き込むよりも遅くなる場合があります。連続同期モードは、 Unity Catalog マネージドテーブルから同期テーブルに最小 15 秒間隔でデータを更新します。
- 接続の使用状況: 各テーブル同期では、データベースインスタンスへの最大 16 個の接続を使用できますが、これはインスタンスの接続制限にカウントされます。
- APIべき等性: 同期されたテーブルAPIsはべき等であり、タイムリーな操作を確保するためにエラーが発生した場合に再試行する必要がある場合があります。
- スキーマの変更:
Triggered
モードまたはContinuous
モードの同期テーブルの場合、Unity Catalog テーブルからの追加のスキーマ変更 (新しい列の追加など) のみが同期テーブルに反映されます。 - 重複するキー: ソーステーブルで 2 つの行の主キーが同じ場合、時系列キーを使用して重複排除を設定しない限り、同期パイプラインは失敗します。ただし、時系列キーを使用すると、パフォーマンスが低下します。
- 更新率: 同期パイプラインは、キャパシティーユニット (CU) あたり約 1,200 行/秒の連続書き込みと、CU あたり最大 15,000 行/秒の一括書き込みをサポートします。
容量と制限
-
テーブルの制限:
- ソーステーブルごとに同期されたテーブルは 20 個に制限されています。
- 各テーブル同期では、最大 16 個のデータベース接続を使用できます。
-
サイズ制限と完全な更新:
- 同期されたテーブルを完全に更新した場合、新しいテーブルが同期されるまで、Postgres の古いバージョンは削除されません。どちらのバージョンも、更新中に一時的に論理データベースのサイズ制限にカウントされます。
- 個々の同期テーブルには制限はありませんが、インスタンス内のすべてのテーブルの論理データサイズの合計制限は 2 TB です。ただし、テーブル全体の再作成ではなく更新が必要な場合は、1 TB を超えないように Databricks ことをお勧めします。
- Unity Catalog テーブルの非圧縮の行形式サイズがデータベース インスタンスのサイズ制限 (2 TB) を超えると、同期は失敗します。インスタンスにさらに書き込む前に、同期されたテーブルをPostgresで削除する必要があります。
カタログ統合
- カタログの複製: 別のデータベース カタログとしても登録されている Postgres データベースを対象とする標準カタログに同期テーブルを作成すると、同期されたテーブルが Unity Catalog の標準カタログとデータベース カタログの両方に表示されます。