Unity Catalog テーブルからデータベース インスタンスへのデータの同期
プレビュー
この機能は、us-east-1
、us-west-2
、eu-west-1
、ap-southeast-1
、ap-southeast-2
、eu-central-1
、us-east-2
、ap-south-1
の各地域でパブリック プレビュー段階です。
このページでは、同期テーブルを作成および管理する方法について説明します。同期されたテーブルは、Unity Catalog テーブルのデータを Lakebase データベース インスタンスに自動的に同期する、Unity Catalog 読み取り専用の Postgres テーブルです。Unity Catalog テーブルを Postgres に同期すると、待機時間の短い読み取りクエリが可能になり、他の Postgres テーブルとのクエリ時の結合がサポートされます。
同期は、ソース テーブルからの変更で Postgres テーブルを継続的に更新するマネージド Lakeflow 宣言型パイプラインによって処理されます。 作成後、同期されたテーブルは、Postgres ツールを使用して直接クエリを実行できます。
同期テーブルの主な特性は次のとおりです。
- Postgres で読み取り専用で、ソースでデータの完全性を維持します。
- マネージド Lakeflow 宣言型パイプラインを使用して自動的に同期されます
- 標準のPostgreSQLインターフェースを介してクエリ可能
- Unity Catalog によるガバナンスとライフサイクル管理による管理
始める前に
- いずれかのカタログにUnity Catalogテーブルがあること。
- データベース・インスタンスに対する
CAN USE
権限があること。
同期テーブルの作成
- UI
- curl
Unity Catalog テーブルを Postgres に同期するには、次の操作を行います。
-
ワークスペースのサイドバーで カタログ をクリックします。
-
同期テーブルを作成するUnity Catalogテーブルを見つけて選択します。
-
作成 > 同期テーブル をクリックします。
-
カタログとスキーマを選択し、新しい同期テーブルのテーブル名を入力します。
- 同期テーブルは、いくつかの追加設定を使用して 、標準カタログでも作成できます。標準カタログとスキーマを選択し、新しく作成した同期テーブルのテーブル名を入力します。
-
データベースインスタンスを選択し、同期テーブルを作成するPostgresデータベースの名前を入力します。Postgres データベース フィールドは、デフォルトで現在選択されているターゲット カタログになります。この名前で Postgres データベースが存在しない場合、Databricks は新しいデータベースを作成します。
-
[プライマリキー ] を選択します。プライマリ・キーは、読み取り、更新、および削除のためにローに効率的にアクセスできるようにする ために必要です 。
-
必要に応じて、 時系列キー を選択します。ソーステーブルで 2 つの行のプライマリキーが同じ場合は、時系列キーを使用して重複排除を設定します。
-
同期モードを スナップショット 、 トリガー 、 連続 から選択します。すべての同期モードで、ソース テーブル全体が Postgres に読み取られ、Postgres に書き込まれます。
ポリシー
説明
スナップショット
パイプラインは 1 回実行され、ソース テーブルのスナップショットを取得し、同期されたテーブルにコピーします。ソーステーブルに対する後続の変更は、ソースの新しいスナップショットを作成し、新しいコピーを作成することで、同期されたテーブルに自動的に反映されます。同期されたテーブルの内容はアトミックに更新されます。
トリガー
パイプラインは 1 回実行され、同期されたテーブルにソース テーブルの初期スナップショット コピーが作成されます。スナップショット同期モードとは異なり、同期されたテーブルが更新されると、最後のパイプライン実行以降の変更のみが取得され、同期されたテーブルに適用されます。増分更新は、手動でトリガーすることも、スケジュールに従って自動的にトリガーすることもできます。
連続
パイプラインは継続的に実行されます。ソース テーブルに対する後続の変更は、リアルタイム ストリーミング モードで同期されたテーブルに段階的に適用されます。手動で更新する必要はありません。
トリガー 同期モードまたは 連続 同期モードをサポートするには、ソース テーブルでチェンジデータフィードが有効になっている必要があります。
-
この同期テーブルを新規または既存のパイプラインから作成するかどうかを選択します。
- 新しいパイプラインを作成し、管理カタログを使用する場合は、ステージング テーブルのストレージの場所を選択する必要があります。標準カタログを使用している場合、ステージング・テーブルは自動的にカタログに格納されます。
- 既存のパイプラインを使用している場合は、新しい同期モードがパイプライン モードと一致していることを確認します。
-
同期済みテーブルのステータス が オンライン になったら、データベースインスタンスにログインし、新しく作成したテーブルをクエリします。SQL エディタ、外部ツール、またはノートブックを使用してテーブルをクエリします。
データベース・カタログに同期テーブルを作成します。
export CATALOG_NAME=<Database catalog>
export SRC_TBL="source_catalog.source_schema.source_table"
export DEST_TBL="$CATALOG_NAME.some_schema.synced_table"
export PKS='["id"]'
export ST_CATALOG = "storage_catalog"
export ST_SCHEMA = "storage_schema"
curl -X POST https://$WORKSPACE/api/2.0/database/synced_tables \
-H "Content-Type: text/json" \
-H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
--data-binary @- << EOF
{
"name": "$DEST_TBL",
"spec": {
"source_table_full_name": "$SRC_TBL",
"primary_key_columns": $PKS,
"scheduling_policy": "TRIGGERED",
},
"new_pipeline_spec": {
"storage_catalog": "$ST_CATALOG",
"storage_schema": "$ST_SCHEMA",
}
}
EOF
同期テーブルを標準の Unity Catalog カタログに作成します。
export CATALOG_NAME=<Standard catalog>
export DATABASE_INSTANCE=<database instance>
export POSTGRES_DATABASE=$CATALOG_NAME
export SRC_TBL="source_catalog.source_schema.source_table"
export DEST_TBL="$CATALOG_NAME.some_schema.sync_table"
export PKS='["id"]'
export ST_CATALOG = "storage_catalog"
export ST_SCHEMA = "storage_schema"
curl -X POST https://$WORKSPACE/api/2.0/database/synced_tables \
-H "Content-Type: text/json" \
-H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
--data-binary @- << EOF
{
"name": "$DEST_TBL",
"database_instance_name": "$DATABASE_INSTANCE",
"logical_database_name": "$POSTGRES_DATABASE",
"spec": {
"source_table_full_name": "$SRC_TBL",
"primary_key_columns": $PKS,
"scheduling_policy": "TRIGGERED",
},
"new_pipeline_spec": {
"storage_catalog": "$ST_CATALOG",
"storage_schema": "$ST_SCHEMA",
}
}
EOF
同期されたテーブルのステータスを確認します。
export SYNCEDTABLE='pg_db.silver.sbtest1_online'
curl --request GET \
"https://e2-dogfood.staging.cloud.databricks.com/api/2.0/database/synced_tables/$SYNCEDTABLE" \
--header "Authorization: Bearer dapi..."
サポートされている操作
同期されたテーブルに対して Postgres 側でサポートされる操作のセットは限られています。
- 読み取り専用クエリ
- インデックスの作成
- テーブルの削除 (同期されたテーブルを Unity Catalogから削除した後でスペースを解放するため)
このテーブルは他の方法で変更できますが、同期パイプラインに干渉します。
同期されたテーブルの削除
同期されたテーブルを削除するには、Unity Catalog からテーブルを削除してから、データベース インスタンス内のテーブルを削除する必要があります。同期されたテーブルを Unity Catalog から削除すると、テーブルの登録が解除され、データの更新が停止します。ただし、テーブルは基になる Postgres データベースに残ります。データベース・インスタンスのスペースを解放するには、インスタンスに接続し、 DROP TABLE
コマンドを使用します。
-
ワークスペースのサイドバーで カタログ をクリックします。
-
削除する同期テーブルを見つけて選択します。
-
> 削除 をクリックします。
-
インスタンスには、
psql
、SQL エディター、またはノートブックから接続します。 -
PostgreSQL を使用してテーブルを削除します。
PsqlDROP TABLE synced_table_database.synced_table_schema.synced_table
所有権と権限
新しい Postgres データベース、スキーマ、またはテーブルを作成する場合、Postgres の所有権は次のように設定されます。
- 所有権は、データベース、スキーマ、またはテーブルを作成するユーザーに割り当てられます (Databricks ログインが Postgres にロールとして存在する場合)。Postgres で Databricks ID ロールを追加するには、「 Databricks ID の Postgres ロールを作成および管理する」を参照してください。
- それ以外の場合、所有権はPostgresの親オブジェクトの所有者(通常は
databricks_superuser
)に割り当てられます。
同期テーブルへのアクセスを管理する
同期テーブルが作成された後、 databricks_superuser
は Postgres から同期テーブルを READ
できます。databricks_superuser
には、pg_read_all_data
権限とpg_write_all_data
権限があります。
-
databricks_superuser
は、次の権限を他のユーザーに付与することもできます。PsqlGRANT USAGE ON SCHEMA synced_table_schema TO user;
PsqlGRANT SELECT ON synced_table_name TO user;
-
databricks_superuser
は、次の特権を取り消すことができます。PsqlREVOKE USAGE ON SCHEMA synced_table_schema FROM user;
PsqlREVOKE {SELECT | INSERT | UPDATE | DELETE} ON synced_table_name FROM user;
同期されたテーブルの操作を管理する
databricks_superuser
は、同期されたテーブルで特定の操作を実行する権限を持つユーザーを管理できます。同期されたテーブルでサポートされている操作は次のとおりです。
CREATE INDEX
ALTER INDEX
DROP INDEX
DROP TABLE
同期されたテーブルに対する他のすべての DDL 操作は拒否されます。
これらの権限を追加のユーザーに付与するには、 databricks_superuser
まず databricks_auth
で拡張機能を作成する必要があります。
CREATE EXTENSION IF NOT EXISTS databricks_auth;
その後、 databricks_superuser
同期テーブルを管理するユーザーを追加できます。
SELECT databricks_synced_table_add_manager('"synced_table_schema"."synced_table"'::regclass, '[user]');
databricks_superuser
は、同期されたテーブルの管理からユーザーを削除できます。
SELECT databricks_synced_table_remove_manager('[table]', '[user]');
databricks_superuser
は、すべてのマネージャーを表示できます。
SELECT * FROM databricks_synced_table_managers;
無効な文字の処理
ヌル バイト (0x00) などの特定の文字は、Delta テーブルの STRING
、 ARRAY
、 MAP
、または STRUCT
列で使用できますが、Postgres の TEXT
列や JSONB
列ではサポートされていません。その結果、このようなデータを Delta から Postgres に同期すると、エラーを伴う挿入エラーが発生する可能性があります。
org.postgresql.util.PSQLException: ERROR: invalid byte sequence for encoding "UTF8": 0x00
org.postgresql.util.PSQLException: ERROR: unsupported Unicode escape sequence DETAIL: \u0000 cannot be converted to text.
- 最初のエラーは、ヌルバイトが最上位の文字列列に表示され、Postgres
TEXT
に直接マップされる場合に発生します。 - 2 番目のエラーは、Databricks が
JSONB
としてシリアル化する複合型 (STRUCT
、ARRAY
、またはMAP
) 内にネストされた文字列に null バイトが出現する場合に発生します。シリアル化中、すべての文字列は PostgresTEXT
にキャストされ、\u0000
は許可されません。
解決方法:
この問題は、次のいずれかの方法で対処できます。
-
文字列フィールドをサニタイズ
Postgres に同期する前に、複合型内の文字列を含むすべての文字列フィールドからサポートされていない文字を削除または置換します。
最上位の
STRING
列から null バイトを削除するには、REPLACE
関数を使用します。SQLSELECT REPLACE(column_name, CAST(CHAR(0) AS STRING), '') AS cleaned_column FROM your_table;
-
バイナリに変換 (
STRING
列のみ)未加工のバイト内容を保持する必要がある場合は、影響を受ける
STRING
列をBINARY
に変換します。
制限事項と考慮事項
名前付けと識別子の制限
- 使用できる文字: 同期されたテーブルの Postgres データベース名、スキーマ名、およびテーブル名には、英数字とアンダースコア (
[A-Za-z0-9_]+
) のみを含めることができます。ハイフン (-
) およびその他の特殊文字はサポートされていません。 - 列とテーブルの識別子: ソース Unity Catalog テーブルの列名またはテーブル名に大文字や特殊文字を使用しないでください。 保持する場合は、Postgres で参照するときにこれらの識別子を引用符で囲む必要があります。
パフォーマンスと同期
- 同期速度: 同期されたテーブルへのデータの同期は、追加の処理により、ネイティブの PostgreSQL クライアントを使用してデータベース インスタンスに直接データを書き込むよりも遅くなる可能性があります。連続同期モードは、 Unity Catalog マネージドテーブルから同期テーブルにデータを最小 15 秒間隔で更新します。
- 接続の使用状況: 各テーブルの同期では、データベースインスタンスへの最大 16 個の接続を使用できます。これは、インスタンスの接続制限にカウントされます。
- APIべき等性: 同期されたテーブルAPIsはべき等であり、エラーが発生した場合は、タイムリーな操作を確保するために再試行が必要になる場合があります。
- スキーマの変更:
Triggered
モードまたはContinuous
モードの同期テーブルの場合、Unity Catalog テーブルからのスキーマの追加変更 (新しい列の追加など) のみが同期テーブルに反映されます。 - 重複するキー: ソース テーブルで 2 つの行の主キーが同じ場合、Timeseries キーを使用して重複除去を構成しない限り、同期パイプラインは失敗します。ただし、時系列キーを使用すると、パフォーマンスが低下します。
- 更新レート: 同期パイプラインは、キャパシティーユニット (CU) あたり約 1,200 行/秒の連続書き込みと、CU あたり最大 15,000 行/秒の一括書き込みをサポートします。
容量と制限
-
テーブルの制限:
- 同期テーブルは、ソース テーブルごとに 20 個に制限されています。
- 各テーブルの同期では、最大 16 のデータベース接続を使用できます。
-
サイズ制限と完全更新:
- 同期されたテーブルをフル更新した場合、新しいテーブルが同期されるまで、Postgres の古いバージョンは削除されません。 どちらのバージョンも、更新中に一時的に論理データベース・サイズ制限にカウントされます。
- 同期される各テーブルは最大 2 TB です。完全なテーブル再作成の代わりに更新が必要な場合、制限は 1 TB です。
- Unity Catalog テーブルの非圧縮の行形式サイズがデータベース インスタンス サイズの制限 (2 TB) を超えると、同期は失敗します。インスタンスにさらに書き込む前に、同期されたテーブルをPostgresで削除する必要があります。
カタログ統合
- カタログの複製: 別のデータベース カタログとしても登録されている Postgres データベースを対象とする標準カタログに同期テーブルを作成すると、同期されたテーブルは Unity Catalog の標準カタログとデータベース カタログの両方に表示されます。