Sincronizar dados das tabelas do Unity Catalog com uma instância de banco de dados
Visualização
Esse recurso está em Public Preview nas seguintes regiões: us-east-1
, us-west-2
, eu-west-1
, ap-southeast-1
, ap-southeast-2
, eu-central-1
, us-east-2
, ap-south-1
.
Esta página descreve como criar e gerenciar uma tabela sincronizada. Uma tabela sincronizada é uma tabela Postgres somente de leitura do Unity Catalog que sincroniza automaticamente os dados de uma tabela do Unity Catalog com a instância do banco de dados do Lakebase. A sincronização de uma tabela Unity Catalog no Postgres permite consultas de leitura de baixa latência e oferece suporte à união em tempo de consulta com outras tabelas do Postgres.
A sincronização é tratada por um gerenciador LakeFlow declarativo pipeline que atualiza continuamente a tabela do Postgres com as alterações da tabela de origem. Após a criação, as tabelas sincronizadas podem ser consultadas diretamente usando as ferramentas do Postgres.
As características key das tabelas sincronizadas são as seguintes:
- Somente leitura no Postgres para manter a integridade dos dados com a fonte
- Sincronizado automaticamente usando o gerenciar LakeFlow Pipeline declarativo
- Pode ser consultado por meio de interfaces padrão do PostgreSQL
- gerenciar por meio do site Unity Catalog para governança e gerenciamento do ciclo de vida
Antes de começar
- O senhor tem uma tabela do Unity Catalog em qualquer catálogo.
- Você tem permissões
CAN USE
na instância do banco de dados.
Crie uma tabela sincronizada
- UI
- Python SDK
- CLI
- curl
Para sincronizar uma tabela do Unity Catalog com o Postgres, faça o seguinte:
-
Clique em Catalog na barra lateral workspace.
-
Encontre e selecione a tabela do Unity Catalog na qual o senhor deseja criar uma tabela sincronizada.
-
Clique em Criar tabela sincronizada >.
-
Selecione seu catálogo, esquema e insira um nome de tabela para a nova tabela sincronizada.
- Tabelas sincronizadas também podem ser criadas em catálogos padrão, com algumas configurações adicionais. Selecione seu catálogo padrão, um esquema e insira um nome de tabela para a tabela sincronizada recém-criada.
-
Selecione uma instância de banco de dados e insira o nome do banco de dados Postgres no qual criar a tabela sincronizada. O campo do banco de dados Postgres será default para o catálogo de destino selecionado no momento. Se não houver um banco de dados Postgres com esse nome, o Databricks criará um novo.
-
Selecione uma chave primária . É necessário um site primário,key pois ele permite o acesso eficiente às linhas para leituras, atualizações e exclusões.
-
Se duas linhas tiverem o mesmo primário key na tabela de origem, selecione uma chave Timeseries para configurar a deduplicação. Quando uma chave Timeseries é especificada, as tabelas sincronizadas contêm apenas as linhas com o valor mais recente da timeseries key para cada primário key.
-
Selecione o modo de sincronização entre Snapshot , Acionado e Contínuo . Para obter mais informações sobre cada modo de sincronização, consulte Explicação dos modos de sincronização.
-
Escolha se o senhor deseja criar essa tabela sincronizada a partir de um pipeline novo ou existente.
- Se estiver criando um novo pipeline e usando um catálogo gerenciar, escolha o local de armazenamento para a tabela de preparação. Se estiver usando um catálogo padrão, a tabela intermediária será automaticamente armazenada no catálogo.
- Se estiver usando um pipeline existente, verifique se o novo modo de sincronização corresponde ao modo do pipeline.
-
(Opcional) Selecione uma política de orçamento sem servidor . Para criar uma política de orçamento serverless, consulte Uso de atributos com políticas de orçamento serverless. Isso permite atribuir o uso do faturamento a políticas de uso específicas.
- Para tabelas sincronizadas, a entidade faturável é o pipeline declarativo subjacente LakeFlow pipeline. Para modificar a política de orçamento, modifique o objeto de pipeline subjacente. Consulte Configurar um serverless pipeline .
-
Depois que o status da tabela sincronizada for Online , acesse log in na instância do banco de dados e consulte a tabela recém-criada. Consulte sua tabela usando o editorSQL, ferramentas externas ou o Notebook.
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.database import SyncedDatabaseTable, SyncedTableSpec, NewPipelineSpec, SyncedTableSchedulingPolicy
# Initialize the Workspace client
w = WorkspaceClient()
# Create a synced table in a database catalog
synced_table = w.database.create_synced_database_table(
SyncedDatabaseTable(
name="database_catalog.schema.synced_table", # Full three-part name
spec=SyncedTableSpec(
source_table_full_name="source_catalog.source_schema.source_table",
primary_key_columns=["id"], # Primary key columns
scheduling_policy=SyncedTableSchedulingPolicy.TRIGGERED, # SNAPSHOT, TRIGGERED, or CONTINUOUS
# Optional: timeseries_key="timestamp" # For deduplication
new_pipeline_spec=NewPipelineSpec(
storage_catalog="storage_catalog",
storage_schema="storage_schema"
)
),
)
)
print(f"Created synced table: {synced_table.name}")
# Create a synced table in a standard UC catalog
synced_table = w.database.create_synced_database_table(
SyncedDatabaseTable(
name="standard_catalog.schema.synced_table", # Full three-part name
database_instance_name="my-database-instance", # Required for standard catalogs
logical_database_name="postgres_database", # Required for standard catalogs
spec=SyncedTableSpec(
source_table_full_name="source_catalog.source_schema.source_table",
primary_key_columns=["id"],
scheduling_policy=SyncedTableSchedulingPolicy.CONTINUOUS,
create_database_objects_if_missing=True, # Create database/schema if needed
new_pipeline_spec=NewPipelineSpec(
storage_catalog="storage_catalog",
storage_schema="storage_schema"
)
),
)
)
print(f"Created synced table: {synced_table.name}")
# Check the status of a synced table
synced_table_name = "database_catalog.schema.synced_table"
status = w.database.get_synced_database_table(name=synced_table_name)
print(f"Synced table status: {status.data_synchronization_status.detailed_state}")
print(f"Status message: {status.data_synchronization_status.message}")
# Create a synced table in a database catalog
databricks database create-synced-database-table \
--json '{
"spec": {
"name": "database_catalog.schema.synced_table",
"source_table_full_name": "source_catalog.source_schema.source_table",
"primary_key_columns": ["id"],
"scheduling_policy": "TRIGGERED"
},
"new_pipeline_spec": {
"storage_catalog": "storage_catalog",
"storage_schema": "storage_schema"
}
}'
# Create a synced table in a standard UC catalog
# new_pipeline_spec, storage_catalog, and storage_schema are optional
databricks database create-synced-database-table \
--database-instance-name "my-database-instance" \
--logical-database-name "databricks_postgres" \
--json '{
"name": "standard_catalog.schema.synced_table",
"spec": {
"source_table_full_name": "source_catalog.source_schema.source_table",
"primary_key_columns": ["id"],
"scheduling_policy": "CONTINUOUS",
"create_database_objects_if_missing": true
}
}'
# Check the status of a synced table
databricks database get-synced-database-table "database_catalog.schema.synced_table"
Crie uma tabela sincronizada em um catálogo de banco de dados.
export CATALOG_NAME=<Database catalog>
export SRC_TBL="source_catalog.source_schema.source_table"
export DEST_TBL="$CATALOG_NAME.some_schema.synced_table"
export PKS='["id"]'
export ST_CATALOG = "storage_catalog"
export ST_SCHEMA = "storage_schema"
curl -X POST https://$WORKSPACE/api/2.0/database/synced_tables \
-H "Content-Type: text/json" \
-H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
--data-binary @- << EOF
{
"name": "$DEST_TBL",
"spec": {
"source_table_full_name": "$SRC_TBL",
"primary_key_columns": $PKS,
"scheduling_policy": "TRIGGERED",
},
"new_pipeline_spec": {
"storage_catalog": "$ST_CATALOG",
"storage_schema": "$ST_SCHEMA",
}
}
EOF
Crie uma tabela sincronizada em um catálogo padrão do Unity Catalog.
export CATALOG_NAME=<Standard catalog>
export DATABASE_INSTANCE=<database instance>
export POSTGRES_DATABASE=$CATALOG_NAME
export SRC_TBL="source_catalog.source_schema.source_table"
export DEST_TBL="$CATALOG_NAME.some_schema.sync_table"
export PKS='["id"]'
export ST_CATALOG = "storage_catalog"
export ST_SCHEMA = "storage_schema"
curl -X POST https://$WORKSPACE/api/2.0/database/synced_tables \
-H "Content-Type: text/json" \
-H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
--data-binary @- << EOF
{
"name": "$DEST_TBL",
"database_instance_name": "$DATABASE_INSTANCE",
"logical_database_name": "$POSTGRES_DATABASE",
"spec": {
"source_table_full_name": "$SRC_TBL",
"primary_key_columns": $PKS,
"scheduling_policy": "TRIGGERED",
},
"new_pipeline_spec": {
"storage_catalog": "$ST_CATALOG",
"storage_schema": "$ST_SCHEMA",
}
}
EOF
Verifique o status de uma tabela sincronizada.
export SYNCEDTABLE='pg_db.silver.sbtest1_online'
curl --request GET \
"https://e2-dogfood.staging.cloud.databricks.com/api/2.0/database/synced_tables/$SYNCEDTABLE" \
--header "Authorization: Bearer dapi..."
Modos de sincronização explicados
Uma tabela sincronizada pode ser criada com um dos seguintes modos de sincronização, que determinam como os dados são sincronizados da fonte para a tabela sincronizada no Postgres:
Modo de sincronização | Descrição | Desempenho |
---|---|---|
Snapshot | A pipeline execução uma vez para tirar um Snapshot da tabela de origem e copiá-lo para a tabela sincronizada. pipeline e execução subsequentes copiam todos os dados de origem para o destino e substituem-nos no local de forma atômica. O sensor de pressão de combustível ( pipeline ) pode ser acionado manualmente, através de um sensor de pressão de combustível ( API ) ou em um programa. | Esse modo é 10 vezes mais eficiente do que os modos de sincronização acionada ou contínua, pois recria dados do zero. Se você estiver modificando mais de 10% da tabela de origem, considere usar esse modo. |
Acionado | A pipeline execução uma vez para tirar um Snapshot da tabela de origem e copiá-lo para a tabela sincronizada. Diferentemente do modo de sincronização Snapshot, quando a tabela sincronizada é atualizada, somente as alterações desde a última execução do pipeline são recuperadas e aplicadas à tabela sincronizada. A atualização incremental ( refresh ) pode ser acionada manualmente, através de uma solicitação de atualização ( API ) ou em um programa. | Este modo é um bom compromisso entre atraso e custo, pois é executado sob demanda e aplica apenas as alterações desde a última execução. Para minimizar o atraso, execute este comando pipeline imediatamente após atualizar a tabela de origem. Se executar isso com mais frequência do que a cada 5 minutos, poderá ser mais dispendioso do que o modo Contínuo devido ao custo de iniciar e parar o pipeline a cada vez. |
Contínuo | A execução do “ pipeline ” é realizada uma vez para obter um instantâneo da tabela de origem e copiá-lo para a tabela sincronizada, e a execução do “ pipeline ” é realizada continuamente. Alterações subsequentes na tabela de origem são aplicadas de forma incremental à tabela sincronizada em tempo real. Não é necessário realizar o e refresh e manual. | Esse modo tem o menor atraso, mas o custo mais alto, porque está em execução contínua. |
Para oferecer suporte ao modo de sincronização acionada ou contínua , a tabela de origem deve ter a opção Alterar feed de dados ativada. Certas fontes (como visualização) não suportam feed de dados alterados, portanto, só podem ser sincronizadas no modo “ Snapshot ”.
Operações suportadas
Somente um conjunto limitado de operações é suportado no lado do Postgres para tabelas sincronizadas:
- Consultas somente para leitura
- Criação de índices
- Eliminar a tabela (para liberar espaço depois de remover a tabela sincronizada do Unity Catalog)
Embora o senhor possa modificar essa tabela de outras maneiras, ela interfere no pipeline de sincronização.
Excluir uma tabela sincronizada
Para excluir uma tabela sincronizada, o senhor deve excluí-la do Unity Catalog e, em seguida, soltar a tabela na instância do banco de dados. A exclusão da tabela sincronizada do site Unity Catalog cancela o registro da tabela e interrompe qualquer atualização de dados. No entanto, a tabela permanece no banco de dados Postgres subjacente. Para liberar espaço na instância do banco de dados, conecte-se à instância e use o comando DROP TABLE
.
- UI
- Python SDK
- CLI
- curl
-
Clique em Catalog na barra lateral workspace.
-
Encontre e selecione a tabela sincronizada que você deseja excluir.
-
Clique em
> Excluir .
-
Conecte-se à instância com
psql
, o editor SQL ou a partir de um Notebook. -
Eliminar a tabela usando o PostgreSQL.
SQLDROP TABLE synced_table_database.synced_table_schema.synced_table
from databricks.sdk import WorkspaceClient
# Initialize the Workspace client
w = WorkspaceClient()
# Delete a synced table from UC
synced_table_name = "catalog.schema.synced_table"
w.database.delete_synced_database_table(name=synced_table_name)
print(f"Deleted synced table from UC: {synced_table_name}")
# To free up space in your database instance, you need to connect to the
# instance and drop the table using PostgreSQL:
#
# DROP TABLE synced_table_database.synced_table_schema.synced_table;
# Delete a synced table from UC
databricks database delete-synced-database-table "catalog.schema.synced_table"
# To free up space in your database instance, you need to connect to the
# instance and drop the table using PostgreSQL:
#
# DROP TABLE synced_table_database.synced_table_schema.synced_table;
# Delete a synced table from UC
curl -X DELETE --header "Authorization: Bearer ${DATABRICKS_TOKEN}" \
https://$WORKSPACE/api/2.0/database/synced_tables/$SYNCED_TABLE_NAME
# To free up space in your database instance, you need to connect to the
# instance and drop the table using PostgreSQL:
#
# DROP TABLE synced_table_database.synced_table_schema.synced_table;
Propriedade e permissões
Se você criar um novo banco de dados, esquema ou tabela Postgres, a propriedade do Postgres será definida da seguinte forma:
- A propriedade é atribuída ao usuário que cria o banco de dados, o esquema ou a tabela, se o login do Databricks existir como uma função no Postgres. Para adicionar uma função de identidade Databricks no Postgres, consulte Gerenciar funções do Postgres.
- Caso contrário, a propriedade é atribuída ao proprietário do objeto pai no Postgres (normalmente o
databricks_superuser
).
gerenciar o acesso à tabela sincronizada
Depois que uma tabela sincronizada é criada, o databricks_superuser
pode READ
uma tabela sincronizada do Postgres. O databricks_superuser
tem privilégios pg_read_all_data
e pg_write_all_data
:
-
O
databricks_superuser
também pode conceder esses privilégios a outros usuários:SQLGRANT USAGE ON SCHEMA synced_table_schema TO user;
SQLGRANT SELECT ON synced_table_name TO user;
-
O
databricks_superuser
pode revogar esses privilégios:SQLREVOKE USAGE ON SCHEMA synced_table_schema FROM user;
SQLREVOKE {SELECT | INSERT | UPDATE | DELETE} ON synced_table_name FROM user;
gerenciar operações de tabela sincronizada
O site databricks_superuser
pode gerenciar quais usuários estão autorizados a realizar operações específicas em uma tabela sincronizada. As operações suportadas para tabelas sincronizadas são as seguintes:
CREATE INDEX
ALTER INDEX
DROP INDEX
DROP TABLE
Todas as outras operações DDL são negadas para tabelas sincronizadas.
Para conceder esses privilégios a usuários adicionais, o databricks_superuser
deve primeiro criar uma extensão em databricks_auth
:
CREATE EXTENSION IF NOT EXISTS databricks_auth;
Em seguida, o databricks_superuser
pode adicionar um usuário para gerenciar uma tabela sincronizada:
SELECT databricks_synced_table_add_manager('"synced_table_schema"."synced_table"'::regclass, '[user]');
O databricks_superuser
pode remover um usuário do gerenciamento de uma tabela sincronizada:
SELECT databricks_synced_table_remove_manager('[table]', '[user]');
O site databricks_superuser
pode view todos os gerentes:
SELECT * FROM databricks_synced_table_managers;
Mapeamento do tipo de dados
Essa tabela de mapeamento de tipos define como cada tipo de dados na tabela de origem do Unity Catalog é mapeado para a tabela de sincronização de destino no Postgres:
Tipo de coluna de origem | Tipo de coluna Postgres |
---|---|
BigInt | |
POR CHÁ | |
Booleana | |
Data | |
NUMÉRICO | |
PRECISÃO DUPLA | |
REAL | |
Integer | |
INTERVALO | |
PEQUENININHO | |
TEXT | |
CARIMBO DE DATA/HORA COM FUSO HORÁRIO | |
TIMESTAMP SEM FUSO HORÁRIO | |
PEQUENININHO | |
Não suportado | |
Não suportado | |
JSONB | |
JSONB | |
STRUCT\ < [Nome do campo: Tipo de campo [NÃO NULO] [COMMENT str] [,...]] > | JSONB |
Não suportado | |
Não suportado |
- O mapeamento dos tipos ARRAY, MAP e STRUCT foi alterado em maio de 2025. As tabelas de sincronização criadas antes disso continuam a mapear esses tipos para JSON.
- O mapeamento do TIMESTAMP foi alterado em agosto de 2025. As tabelas de sincronização criadas antes disso continuam a mapeá-las para TIMESTAMP SEM FUSO HORÁRIO.
Manipular caracteres inválidos
Certos caracteres, como o byte nulo (0x00), são permitidos nas colunas STRING
, ARRAY
, MAP
ou STRUCT
nas tabelas Delta, mas não são compatíveis com as colunas TEXT
ou JSONB
do Postgres. Como resultado, a sincronização desses dados do Delta para o Postgres pode levar a falhas de inserção com erros:
org.postgresql.util.PSQLException: ERROR: invalid byte sequence for encoding "UTF8": 0x00
org.postgresql.util.PSQLException: ERROR: unsupported Unicode escape sequence DETAIL: \u0000 cannot be converted to text.
- O primeiro erro ocorre quando um byte nulo aparece em uma coluna de strings de nível superior, que mapeia diretamente para o Postgres
TEXT
. - O segundo erro ocorre quando um byte nulo aparece em uma cadeia de caracteres aninhada dentro de um tipo complexo (
STRUCT
,ARRAY
, ouMAP
), que Databricks serializa comoJSONB
. Durante a serialização, todas as strings são convertidas para PostgresTEXT
, onde\u0000
não é permitido.
Como resolver:
Você pode resolver esse problema de uma das seguintes maneiras:
-
Sanitizar campos de strings
Remova ou substitua caracteres não suportados de todos os campos de strings, inclusive aqueles dentro de tipos complexos, antes de sincronizar com o Postgres.
Para remover bytes nulos de uma coluna
STRING
de nível superior, use a funçãoREPLACE
:SQLSELECT REPLACE(column_name, CAST(CHAR(0) AS STRING), '') AS cleaned_column FROM your_table;
-
Converter em binário (somente para colunas
STRING
)Se for necessário preservar o conteúdo bruto de bytes, converta as colunas
STRING
afetadas emBINARY
.
Limitações e considerações
Tabelas de origem suportadas
Dependendo do modo de sincronização de uma tabela sincronizada, diferentes tipos de tabelas de origem são suportados:
-
Para o modo Snapshot, a tabela de origem deve ser compatível com
SELECT *
. Os exemplos incluem Delta tables, Iceberg tables, view, materialized view e outros tipos semelhantes. -
Para os modos de sincronização acionada ou contínua, a tabela de origem também deve ter o feed de dados de alteração ativado.
Limitações de nomenclatura e identificador
- Caracteres permitidos: Os nomes de bancos de dados, esquemas e tabelas do Postgres para tabelas sincronizadas só podem conter caracteres alfanuméricos e sublinhado (
[A-Za-z0-9_]+
). Não há suporte para hífens (-
) e outros caracteres especiais. - Identificadores de colunas e tabelas: Evite usar letras maiúsculas ou caracteres especiais em nomes de colunas ou tabelas na tabela de origem do Unity Catalog. Se mantidos, você precisará citar esses identificadores ao referenciá-los no Postgres.
desempenho e sincronização
- Velocidade de sincronização: A sincronização de dados em tabelas sincronizadas pode ser mais lenta do que gravar dados diretamente na instância do banco de dados com um cliente PostgreSQL nativo devido ao processamento adicional. O modo de sincronização contínua atualiza os dados da tabela gerenciar Unity Catalog para a tabela sincronizada em um intervalo mínimo de 15 segundos.
- Uso da conexão: Cada sincronização de tabela pode usar até 16 conexões com a instância do banco de dados, que contam para o limite de conexão da instância.
- Idempotência da API: As APIs de tabela sincronizada são idempotentes e podem precisar ser repetidas se ocorrerem erros para garantir operações oportunas.
- Alterações de esquema: Para tabelas sincronizadas no modo
Triggered
ouContinuous
, somente alterações de esquema aditivas (por exemplo, adição de uma nova coluna) das tabelas do Unity Catalog são refletidas na tabela sincronizada. - Chave duplicada: Se duas linhas tiverem a mesma chave primária key na tabela de origem, a sincronização pipeline falhará, a menos que o senhor configure a deduplicação usando uma chave Timeseries. No entanto, o uso de uma chave Timeseries tem uma penalidade de desempenho.
- Taxa de atualização: O pipeline de sincronização suporta gravações contínuas de aproximadamente 1.200 linhas por segundo por Unidade de Capacidade (CU) e gravações em massa de até 15.000 linhas por segundo por CU.
Capacidade e limites
-
Limites da tabela:
- Limite de 20 tabelas sincronizadas por tabela de origem.
- Cada sincronização de tabela pode usar até 16 conexões de banco de dados.
-
Limites de tamanho e refresh completo:
- Se o senhor preencherrefresh uma tabela sincronizada, a versão antiga no Postgres não será excluída até que a nova tabela seja sincronizada. Ambas as versões contam temporariamente para o limite de tamanho do banco de dados lógico durante o refresh.
- Cada tabela sincronizada pode ter até 2 TB. Se o senhor precisar de atualização em vez de recriação completa da tabela, o limite será de 1 TB.
- Se o tamanho da tabela do Unity Catalog não compactado e em formato de linha exceder o limite de tamanho da instância do banco de dados (2 TB), a sincronização falhará. Você deve descartar a tabela sincronizada no Postgres antes de escrever mais na instância.
Integração de catálogos
- Duplicação de catálogo: A criação de uma tabela sincronizada em um catálogo padrão direcionado a um banco de dados Postgres que também esteja registrado como um catálogo de banco de dados separado fará com que a tabela sincronizada apareça no Unity Catalog nos catálogos padrão e de banco de dados.