Pular para o conteúdo principal

Sincronização da casa do lago

info

O dimensionamento automático do Lakebase está disponível nas seguintes regiões: us-east-1, us-east-2, us-west-2, ca-central-1, sa-east-1, eu-central-1, eu-west-1, eu-west-2, ap-south-1, ap-southeast-1, ap-southeast-2.

O Lakebase autoscale é a versão mais recente do Lakebase, com recursos como autoscale compute, escala-to-zero, branching e instant restore. Se você é usuário de provisionamento Lakebase, consulte Provisionamento Lakebase.

nota

O Lakehouse Sync está em versão Beta.

O que é o Lakehouse Sync?

O lakehouse Sync permite a replicação contínua e de baixa latência de suas tabelas Lakebase Postgres no Unity Catalog , gerenciando tabelas Delta ao capturar alterações em nível de linha e gravá-las como SCD Tipo 2. Cada alteração é adicionada como uma nova linha, permitindo manter um histórico completo de como as linhas mudaram ao longo do tempo. Essa sincronização não requer nenhum compute externo, pipeline ou tarefa. É um recurso nativo do Lakebase.

Sincronizar o fluxo de dados de aplicativos através do Lakebase para tabelas do Lakehouse no Unity Catalog

lakehouse Sync usa captura de dados de alterações (CDC) (CDC) para transmitir alterações de seu banco de dados Lakebase Postgres para o Unity Catalog. As tabelas Delta são nomeadas seguindo o formato lb_<table_name>_history no catálogo e esquema escolhidos. Cada alteração (inserção, atualização, exclusão) é adicionada como uma linha, permitindo que você mantenha um histórico completo de como seus dados evoluíram ao longo do tempo.

Exemplos de casos de uso

A seguir, apresentamos exemplos de casos de uso para o Lakehouse Sync, onde você transmite dados de alteração do seu banco de dados transacional Lakebase para o lakehouse.

Caso de uso

Descrição

Análise rápida

agregações de execução e análises sobre dados do Lakebase.

Fonte do medalhão

Utilize o Lakebase como fonte para a arquitetura do seu medalhão. As tabelas Delta podem ser processadas com o pipeline Databricks , o pipeline declarativoSpark (SDP) ou as tabelasDelta ativas (DLT) para construir tabelas subsequentes.

História completa na lakehouse

Preserve o histórico completo de todas as mudanças na lakehouse , mantendo opcionalmente apenas um subconjunto dos dados no Lakebase.

Requisitos

  • Lakebase autoscale: Um projeto Lakebase autoscale executando o Postgres 17.
  • Banco de dados de origem: As tabelas devem residir no banco de dados databricks_postgres no Lakebase (limitação beta). Cada projeto é criado com este banco de dados default .
  • Tipos de dados: As tabelas devem usar apenas tipos de dados de coluna suportados. Tipos não suportados fazem com que a sincronização falhe para essa tabela.
  • Unity Catalog: A identidade que configura a sincronização precisa de USE CATALOG , USE SCHEMA e CREATE TABLE no catálogo e esquema de destino. Consulte Conceder permissões a um objeto.
  • Projeto Lakebase: Sua função no Postgres requer permissões CAN MANAGE no projeto Lakebase do qual você está sincronizando. Se a sua identidade for proprietária do projeto Lakebase, ela terá permissões CAN MANAGE por default. Consulte as permissões de gerenciamento do projeto.

Para começar: defina a identidade de réplica completa nas tabelas que deseja sincronizar (passo 1) e, em seguida, inicie a sincronização no aplicativo Lakebase (passo 2). Seus dados aparecem como tabelas lb_<table_name>_history no catálogo Unity Catalog e no esquema que você escolher. Se você quiser saber mais sobre como o Lakehouse Sync funciona antes de começar, consulte Como funciona o Lakehouse Sync.

Configurando a sincronização

o passo 1: Definir a identidade da réplica completa

Para que uma tabela do Lakebase seja sincronizada com sucesso, ela deve ter a identidade de réplica definida como completa. Você pode configurar a sincronização em um esquema vazio ou em um que já contenha tabelas. Tabelas particionadas não são suportadas.

Por default, o Postgres logs apenas a key primária quando uma linha é atualizada ou excluída. A configuração REPLICA IDENTITY FULL instrui o Postgres a registrar o estado completo da linha antes e depois no log de escrita antecipada. Isso é necessário para que o Lakehouse Sync possa construir um histórico de atualizações completo.

Você pode executar o seguinte comando no Editor SQL do Lakebase ou em qualquer cliente Postgres. Este exemplo utiliza o Editor SQL do Lakebase. Para abrir: no seu workspace Databricks , abra o Lakebase Postgres no seletor de aplicativos (canto superior direito), selecione seu projeto e a ramificação que deseja sincronizar (por exemplo, produção ou principal ), selecione EditorSQL na barra lateral e escolha a ramificação e o banco de dados. Consulte a seção "Consulta no Editor SQL do Lakebase" para obter detalhes.

Para definir a identidade da réplica como completa em uma única tabela, execute o seguinte comando:

SQL
ALTER TABLE <table_name> REPLICA IDENTITY FULL;

Substitua <table_name> pelo nome da sua tabela.

  • Tabelas existentes: execute este comando em todas as tabelas existentes no esquema antes de iniciar a sincronização.
  • Novas tabelas: Para tabelas criadas após a configuração da sincronização, execute este comando antes de inserir quaisquer dados. As linhas inseridas antes da definição desta propriedade não serão sincronizadas.

Verifique quais tabelas têm identidade de réplica definida.

Para ver quais tabelas têm identidade de réplica definida (e quais são full), execute o seguinte comando no Lakebase:

SQL
SELECT n.nspname AS table_schema,
c.relname AS table_name,
CASE c.relreplident
WHEN 'd' THEN 'default'
WHEN 'n' THEN 'nothing'
WHEN 'f' THEN 'full'
WHEN 'i' THEN 'index'
END AS replica_identity
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r'
AND n.nspname = 'public'
ORDER BY n.nspname, c.relname;

Altere n.nspname = 'public' para o nome do seu esquema, se for diferente. Somente as linhas com replica_identity = full estão prontas para a sincronização.

o passo 2: comece o Lakehouse Sync

O Lakehouse Sync é configurado no nível do esquema. Uma vez configurada, todas as tabelas atuais e futuras desse esquema são sincronizadas com o Unity Catalog.

  1. No seu workspace Databricks , abra o Lakebase Postgres no seletor de aplicativos (canto superior direito).

  2. Selecione seu projeto Lakebase e a ramificação que deseja sincronizar (por exemplo, produção ou principal ).

  3. Abra a visão geral do Branch e clique na tab de sincronização do Lakehouse .

  4. Clique em Iniciar sincronização .

  5. Na caixa de diálogo de configuração:

    • Banco de dados: valor padrão databricks_postgres.
    • Esquema: Selecione o esquema Postgres de origem a ser sincronizado.
    • Para catalogar: Selecione o catálogo Unity Catalog de destino.
    • Esquema: Selecione o esquema de destino Unity Catalog .
  6. Clique em Iniciar sincronização para começar a sincronizar os dados.

Visão geral da ramificação com tab de sincronização do Lakehouse mostrando o início da sincronização e a configuração do esquema.

As tabelas aparecem no catálogo e esquema escolhidos do Unity Catalog como lb_<table_name>_history. No ambiente virtual (lakehouse), abra o Catálogo na barra lateral, acesse o catálogo e o esquema de destino e, em seguida, abra a tab Tabelas na Visão Geral do esquema para visualizar as tabelas Delta . Na tab de sincronização da casa no Lakebase, você pode confirmar o status e verificar o que está sendo sincronizado.

O que você vê na tabde sincronização da casa no lago.

Quando a sincronização está ativada, a parte superior da tab exibe o Status: Sincronizando e indica que as alterações estão sendo capturadas e sincronizadas com as tabelas Delta .

A subguia mostra o mapeamento e o progresso por tabela.

Duas subabas mostram o mapeamento e o progresso por tabela:

  • Esquemas: Lista cada esquema de origem e seu catálogo e esquema de destino no Unity Catalog, com um Status (por exemplo, Sincronizando ) para esse esquema.
  • Tabelas: Lista cada tabela de origem, sua tabela de destino lb_<table_name>_history no Unity Catalog, Status ( Sincronizando ou Capturando Snapshot ), LSN Confirmado (até onde a sincronização gravou no Delta; mostrado como - enquanto uma tabela ainda está no Snapshot inicial) e Última Sincronização (quando a tabela foi sincronizada pela última vez).

Você também pode executar SELECT * FROM wal2delta.tables; no Editor SQL do Lakebase (ou em qualquer cliente Postgres) para inspecionar o status de sincronização. O resultado inclui table_oid, status (por exemplo, STREAMING ou SNAPSHOTTING), committed_lsn e last_write_time para cada tabela.

info

O que é wal2delta? O lakehouse Sync é alimentado pela extensão wal2delta do Postgres, que é executada dentro do compute do Lakebase. Ele usa decodificação lógica para capturar as alterações log de gravação antecipada (WAL) e as grava em tabelas Delta no Unity Catalog.

Desativar a sincronização

Desativar a sincronização interrompe a replicação de todos os esquemas do Lakebase que estavam sendo sincronizados.

  1. No seu workspace Databricks , abra o Lakebase Postgres no seletor de aplicativos (canto superior direito).
  2. Selecione seu projeto Lakebase e a ramificação onde você configurou a sincronização (por exemplo, produção ou principal ).
  3. Abra a visão geral do Branch e clique na tab de sincronização do Lakehouse .
  4. Clique em Desativar sincronização . Na caixa de diálogo de confirmação, revise o aviso de que as alterações deixarão de ser sincronizadas com as tabelas Delta e clique em Desativar novamente para confirmar.

Desativar a sincronização não reinicia o seu compute.

atenção

Se você reativar a sincronização posteriormente, o sistema não realizará um novo snapshot completo. Quaisquer alterações que tenham ocorrido enquanto a sincronização estava desativada são permanentemente perdidas da tabela Delta de destino.

Como funciona o Lakehouse Sync

Em vez de sobrescrever a tabela de destino do Unity Catalog para refletir o estado atual dos seus dados no Lakebase, a sincronização adiciona uma nova linha para cada evento de alteração. Isso fornece um log completo e imutável de como seus dados evoluíram ao longo do tempo.

  • Nomeação de destino: As tabelas Delta são criadas no Unity Catalog e usam o padrão de nomenclatura lb_<table_name>_history. Estas são tabelas Delta da UC.
  • Sincronização em nível de esquema: Ao configurar a sincronização para um esquema do Lakebase, todas as tabelas atuais e futuras desse esquema são sincronizadas. Tabelas vazias não são sincronizadas. Deve haver pelo menos uma linha na tabela Lakebase para que ela apareça na sincronização.
  • Tabelas excluídas: Se você excluir uma tabela no Lakebase, a tabela Delta de destino no Unity Catalog será preservada (não excluída).

Você pode monitorar o status de sincronização na tab de sincronização do Lakehouse na visão geral do branch ou executando SELECT * FROM wal2delta.tables; no Lakebase.

Esquema da tabela Delta de destino

Além das suas colunas de dados, a sincronização adiciona estas colunas do sistema a cada tabela Delta de destino no Unity Catalog:

Coluna

Tipo

Descrição

_change_type

TEXT

tipo de operações: insert, delete, update_preimage ou update_postimage.

_timestamp

Timestamp

Tempo commit da transação no Postgres (sem fuso horário).

_lsn

BigInt

Número de sequência do log do Postgres.

_xid

Integer

ID da transação do Postgres.

Padrões de mudança comuns

Esses padrões aparecem nas tabelas Delta de destino no Unity Catalog:

  • Carga inicial: Na primeira execução de sincronização em uma tabela Lakebase existente, cada linha existente é escrita com _change_type = insert.
  • Atualizações: Uma atualização produz duas linhas: uma com _change_type = update_preimage (linha antiga) e uma com _change_type = update_postimage (linha nova).
  • Exclusões: Uma exclusão produz uma linha com _change_type = delete.

Limitações e resolução de problemas

Você pode ver o status da tabela (quais tabelas estão sendo atualizadas, ignoradas ou transmitidas) na tab de sincronização do Lakehouse ou executando o seguinte comando no Lakebase:

SQL
SELECT * FROM wal2delta.tables;

Motivos comuns para uma tabela não sincronizar:

  • IDENTIDADE DE RÉPLICA COMPLETA não definida: Certifique-se de executar ALTER TABLE <table_name> REPLICA IDENTITY FULL; para cada tabela.
  • Tabelas particionadas: Tabelas particionadas no Lakebase não são suportadas. A sincronização de um esquema que contém tabelas particionadas faz com que essas tabelas não sejam sincronizadas.
  • Conflitos de nomenclatura: as tabelas Delta são nomeadas como lb_<table_name>_history sem o prefixo do esquema Postgres de origem. Se você sincronizar dois esquemas Postgres diferentes (por exemplo, sales.users e marketing.users) para o mesmo esquema Unity Catalog , a primeira tabela será sincronizada e a segunda falhará devido à colisão de nomes. Mapear esquemas do Postgres que compartilham nomes de tabelas para diferentes esquemas do Unity Catalog.
  • Tipos de dados não suportados: Se uma tabela tiver um tipo de dados não suportado, a criação dessa tabela Delta falhará e a tabela não será sincronizada. Consulte Mapeamento de tipos de dados.

Mapeamento de tipo de dados

A sincronização é compatível com a maioria dos tipos primitivos padrão do PostgreSQL. Tipos não suportados fazem com que a criação da tabela Delta falhe para essa tabela.

Tipo PostgreSQL

Tipo Databricks Delta

Notas

Booleana

Booleana

INT, SMALLINT, BIGINT

INT, SMALLINT, BIGINT

TEXT, VARCHAR, CARACTERE

String

JSONB

String

Armazenado como uma string JSON .

ENUM

String

Armazenado como o rótulo enum.

NUMÉRICO / DECIMAL

Decimal

Utiliza a precisão/escala da fonte. Se não estiver definido, o valor padrão será DECIMAL(38, 18).

Data

Data

Timestamp

TIMESTAMP_NTZ

MARCAÇÃO DE TEMPO

Timestamp

FLUTUAR, DUPLO

FLUTUAR, DUPLO

Tipos não suportados:

  • Geografia/Geometria (PostGIS): Tipos da extensão PostGIS (por exemplo, geometry, geography).
  • Vetor (pgvector): O tipo vector da extensão pgvector.
  • Tipos compostos/estruturais: Tipos personalizados definidos com CREATE TYPE ... AS (field_name type, ...). São tipos semelhantes a linhas com campos nomeados (às vezes chamados de structs). Colunas cujo tipo é composto não podem ser sincronizadas.
  • Mapa: Tipos key-valor semelhantes a mapas, como hstore (da extensão hstore ). O Postgres não possui um tipo de mapa integrado; hstore é a maneira usual de armazenar pares key-valor em uma coluna.

Encontre colunas que possam estar bloqueando a sincronização.

Para encontrar colunas que possam bloquear a sincronização, execute a seguinte consulta no Lakebase. A lista inclui todas as colunas cujo tipo não está no conjunto de tipos suportados acima (incluindo enums). Qualquer tabela que possua uma ou mais dessas colunas não será sincronizada.

SQL
SELECT c.table_schema, c.table_name, c.column_name, c.udt_name AS data_type
FROM information_schema.columns c
JOIN pg_catalog.pg_type t ON t.typname = c.udt_name
WHERE c.table_schema = 'public'
AND c.table_name IN (SELECT tablename FROM pg_tables WHERE schemaname = c.table_schema)
AND NOT (
c.udt_name IN ('bool', 'int2', 'int4', 'int8', 'text', 'varchar', 'bpchar', 'jsonb', 'numeric', 'date', 'timestamp', 'timestamptz', 'real', 'float4', 'float8')
OR t.typcategory = 'E'
)
ORDER BY c.table_schema, c.table_name, c.ordinal_position;

Gerenciando alterações de esquema

Suportado: Renomear uma tabela no Postgres (por exemplo, ALTER TABLE users RENAME TO customers) permite que a sincronização continue. O nome da tabela Delta de destino não muda. Permanece lb_users_history.

Atenção: Adicionar uma coluna, remover uma coluna ou alterar o tipo de dados de uma coluna não é aplicado automaticamente ao destino. Essas alterações fazem com que a sincronização dessa tabela pare de receber novos eventos.

Solução alternativa: Para aplicar alterações de esquema com segurança sem interromper a sincronização, use os seguintes passos para criar uma nova tabela e swap la. Isso faz com que a sincronização reconheça uma nova tabela e inicie um novo histórico.

  1. Crie a nova tabela com o esquema atualizado.

    SQL
    CREATE TABLE users_v2 (
    id INT PRIMARY KEY,
    name TEXT,
    new_column TEXT -- The new schema change
    );
  2. Defina a identidade da réplica como completa.

    SQL
    ALTER TABLE users_v2 REPLICA IDENTITY FULL;
  3. Preencher com dados da tabela antiga.

    SQL
    INSERT INTO users_v2 SELECT *, NULL FROM users;
  4. Troque os nomes das tabelas.

    SQL
    BEGIN;
    ALTER TABLE users RENAME TO users_backup;
    ALTER TABLE users_v2 RENAME TO users;
    COMMIT;
nota

Com essa abordagem, você perde a história da tabela anterior. A nova tabela inicia sua história de sincronização do zero.

Desduplicar para criar um espelho do estado atual

Para análises que precisam do estado atual da tabela Lakebase (um espelho) em vez do histórico completo, você pode remover duplicatas no Databricks SQL (por exemplo, no editor SQL do Lakehouse ou em um Notebook conectado a um SQL warehouse) usando uma função de janela:

SQL
SELECT
*
FROM
(
SELECT
*,
ROW_NUMBER() OVER (PARTITION BY id ORDER BY _lsn DESC) AS rn
FROM
`<catalog>.<schema>.lb_<table_name>_history`
WHERE
_change_type IN ('insert', 'update_postimage', 'delete')
)
WHERE
rn = 1
AND _change_type != 'delete';

Substitua id pela coluna key primária da sua tabela. A consulta particiona por essa key, ordena por _lsn para obter o evento mais recente e mantém apenas a linha mais recente por key , excluindo as linhas cujo evento mais recente foi uma exclusão.

Próximos passos

Dependendo do seu objetivo, use as tabelas Delta lb_<table_name>_history com outros recursos Databricks :