Pular para o conteúdo principal

Feed de dados de alterações do Lakebase

Disponibilidade regional

Lakebase autoscale é a versão mais recente do Lakebase, com recursos como autoscale compute, escala-to-zero, branching e instant restore. Para regiões compatíveis, consulte Disponibilidade por região. Se você é usuário de provisionamento Lakebase , consulte ProvisionamentoLakebase.

nota

O feed de dados de alterações do Lakebase está em pré-visualização pública.

O que é o Lakebase Change Data Feed?

Lakebase introduz um Feed de Dados de Alteração (CDF) nativo, desbloqueando seus dados operacionais para pipelines, modelos e aplicações subsequentes. Cada inserção, atualização e exclusão em uma tabela Postgres Lakebase é capturada do log de escrita antecipada e armazenada como uma nova linha em uma tabela Delta Unity Catalog , sendo os lotes e liberados a cada ~15 segundos. O histórico de alterações é armazenado em um formato aberto que qualquer mecanismo compute pode ler.

As tabelas de destino seguem o mesmo formato do Delta Change Data Feed: cada linha contém um _pg_change_type, um LSN, um ID de transação e um carimbo de data/hora. As mudanças operacionais se tornam uma fonte de primeira classe para ETL, auditoria e consumidores subsequentes — sem a necessidade de instalar uma infraestrutura CDC externa.

Fluxo de dados CDF do Lakebase do Postgres para tabelas Delta no Unity Catalog, através do wal2delta.

Casos de uso

Lakebase CDF integra dados operacionais ao lakehouse , permitindo que os dutos e aplicativos subsequentes reajam às mudanças em tempo real.

Caso de uso

Descrição

Pipeline ETL

Utilize Lakebase como fonte de bronze para o gasoduto Medallion. Crie um Job incremental SDP ou Spark com base no feed de alterações e atualize as tabelas downstream silver e ouro.

Logs de auditoria

Mantenha um histórico completo e pesquisável de cada inserção, atualização e exclusão em uma tabela Lakebase para compliance e análise forense. A história é imutável, Delta.

Sistemas externos

Armazene os dados de alteração do Lakebase em um formato aberto que qualquer mecanismo possa consumir. Como o destino é uma tabela Delta no Unity Catalog, sistemas externos e leitores que não sejam da Databricks podem acessar o feed diretamente.

Ative esta pré-visualização

O administrador workspace deve ativar a pré-visualização do feed de dados de alteraçõesLakebase na página de pré-visualizações workspace .

Requisitos

  • Lakebase autoscale: Um projetoLakebase autoscale executando o Postgres 17.
  • Banco de dados de origem: As tabelas devem residir no banco de dados databricks_postgres no Lakebase. Cada projeto é criado com este banco de dados default . Essa é uma limitação conhecida.
  • Unity Catalog: O CDF de configuração de identidade precisa de USE CATALOG , USE SCHEMA e CREATE TABLE no catálogo e esquema de destino. Consulte Conceder permissões a um objeto.
  • Armazenamento padrão: Catálogos de destino configurados com armazenamento default não são suportados.
  • Projeto Lakebase: Sua função no Postgres requer permissões CAN MANAGE no projeto Lakebase. Os proprietários de projetos têm CAN MANAGE por default. Consulte as permissões de gerenciamento do projeto.
  • Tipos de dados: Consulte Mapeamento de tipos de dados. Os tipos sem um equivalente Delta direto são armazenados como strings.

Configure o CDF do Lakebase

Para começar, defina a identidade de réplica completa nas tabelas que você deseja no feed (passo 1) e, em seguida, inicie o CDF no aplicativo Lakebase (passo 2). Seus dados aparecem como tabelas Delta lb_<table_name>_history no catálogo Unity Catalog e no esquema que você escolher.

o passo 1: Definir a identidade da réplica completa

Para que uma tabela do Lakebase participe da CDF, ela deve ter REPLICA IDENTITY FULL definido. Por default o Postgres logs apenas a key primária quando uma linha é atualizada ou excluída. A configuração de identidade completa instrui o Postgres a registrar o estado da linha antes e depois da alteração no log de escrita antecipada (write-ahead log), o que o CDF precisa para construir um histórico de alterações completo.

Você pode executar esses comandos no Editor SQL Lakebase ou em qualquer cliente Postgres.

SQL
ALTER TABLE <table_name> REPLICA IDENTITY FULL;

Verifique quais tabelas têm identidade de réplica definida.

Para ver quais tabelas em um esquema têm identidade de réplica configurada, execute o seguinte comando:

SQL
SELECT n.nspname AS table_schema,
c.relname AS table_name,
CASE c.relreplident
WHEN 'd' THEN 'default'
WHEN 'n' THEN 'nothing'
WHEN 'f' THEN 'full'
WHEN 'i' THEN 'index'
END AS replica_identity
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r'
AND n.nspname = 'public'
ORDER BY n.nspname, c.relname;

Somente as linhas com replica_identity = 'full' estão prontas para a CDF.

o passo 2: comece o feed de dados de alteração

O CDF do Lakebase é configurado no nível do esquema. Uma vez iniciado, todas as tabelas atuais e futuras do esquema de origem são incluídas no feed.

  1. No seu workspace Databricks , abra Lakebase Postgres no seletor de aplicativos (canto superior direito).

  2. Selecione seu projeto Lakebase e a ramificação que deseja usar (por exemplo, produção ou principal ).

  3. Abra a visão geral da filial e clique na tab Alterar fluxo de dados .

  4. Click começar .

  5. Na caixa de diálogo de configuração:

    • Banco de dados: valor padrão databricks_postgres.
    • Esquema: Selecione o esquema Postgres de origem.
    • Para catalogar: Selecione o catálogo Unity Catalog de destino.
    • Esquema: Selecione o esquema de destino Unity Catalog .
  6. Clique em Iniciar para começar a reprodução do feed.

Visão geral da ramificação com tab Feed de Dados de Alteração, mostrando o início e a configuração do esquema.

As tabelas aparecem no destino como lb_<table_name>_history. Para encontrá-las, abra o Catálogo na barra lateral, navegue até o catálogo e esquema de destino e abra a tab Tabelas .

A tab Feed de Dados de Alteração" no Lakebase possui duas subguias:

A subaba mostra o mapeamento e o progresso por tabela.

  • Esquemas: Lista cada esquema de origem, seu catálogo de destino e esquema no Unity Catalog, além de um status.
  • Tabelas: Lista cada tabela de origem, sua tabela de destino lb_<table_name>_history , status (Streaming ou Snapshotting), LSN confirmado (até que ponto o feed gravou no Delta, mostrado como - enquanto ainda no Snapshot inicial) e Última atualização (última vez que a tabela recebeu alterações).

Você também pode inspecionar o estado do feed do Postgres executando o seguinte comando no Editor SQL do Lakebase:

SQL
SELECT * FROM wal2delta.tables;

O resultado inclui table_oid, status (STREAMING ou SNAPSHOTTING), committed_lsn e last_write_time por tabela.

info

O que é wal2delta? Lakebase CDF é alimentado pela extensão wal2delta do Postgres, que é executada dentro do compute do Lakebase . Ele usa decodificação lógica para capturar as alterações log de gravação antecipada (WAL) e as grava em tabelas Delta no Unity Catalog.

Esquema da tabela de destino

O CDF grava uma tabela Delta por tabela de origem, nomeada lb_<table_name>_history em seu catálogo e esquema de destino. Além das colunas de origem, cada linha contém estas colunas do sistema:

Coluna

Tipo

Descrição

_pg_change_type

TEXT

tipo de operações: insert, delete, update_preimage ou update_postimage.

_pg_lsn

BigInt

Número de sequência do log do Postgres.

_pg_xid

Integer

ID da transação do Postgres.

_timestamp

Timestamp

Carimbo de data e hora em que a alteração foi processada (sem fuso horário).

_sort_by

BigInt

key de classificação monotônica usada para ordenar todas as alterações.

Padrões de mudança comuns

  • Instantâneo inicial: Na primeira execução do CDF em uma tabela existente Lakebase , cada linha existente é escrita com _pg_change_type = 'insert'.
  • Atualizações: Uma atualização produz duas linhas: uma com _pg_change_type = 'update_preimage' (linha antiga) e uma com _pg_change_type = 'update_postimage' (linha nova).
  • Exclusões: Uma exclusão produz uma linha com _pg_change_type = 'delete'.

Esses são os mesmos eventos de alteração que o Delta Change Data Feed, portanto, os mesmos padrões subsequentes se aplicam.

Comportamento operacional

  • Colisões de nomes: Se duas tabelas de origem mapearem para o mesmo nome de destino (por exemplo, sales.users e marketing.users ambas mapeando para lb_users_history), o CDF grava a primeira em lb_users_history e adiciona automaticamente o sufixo lb_users_history_1 à segunda. Você pode renomear qualquer uma das tabelas de destino no Unity Catalog e o feed continuará funcionando.
  • Escopo em nível de esquema: Ao iniciar um CDF em um esquema Lakebase , todas as tabelas atuais e futuras desse esquema são incluídas. Tabelas vazias são ignoradas — uma tabela precisa ter pelo menos uma linha para aparecer no destino.
  • Tabelas de origem excluídas: Se você excluir uma tabela no Lakebase, a tabela Delta de destino no Unity Catalog será preservada.

Construir o oleoduto a jusante

Lakebase CDF foi projetado para dutos de distribuição que reagem a mudanças operacionais. Os padrões abaixo mostram três maneiras de consumir a ração, ordenadas da mais simples à mais flexível.

Exemplo de cenário. Um aplicativo de comércio eletrônico registra pedidos em uma tabela Postgres orders , cada linha contendo um item_id e quantity. A equipe de logística precisa de níveis de estoque em tempo real. Com o CDF, cada alteração em orders é armazenada na tabela Delta lb_orders_history no Unity Catalog. O pipeline downstream lê esse feed de alterações e atualiza uma tabela inventory_levels sempre que um pedido é feito, editado ou cancelado.

Calcular o estoque atual com uma viewmaterializada.

O padrão mais simples é uma viewmaterializada em SQL sobre a tabela história. A tabela MV é atualizada incrementalmente à medida que novos eventos de alteração chegam, e os consumidores subsequentes a consultam como qualquer outra tabela.

SQL
CREATE MATERIALIZED VIEW inventory_levels AS
SELECT
item_id,
SUM(
CASE
-- New orders (and the "new half" of updates) decrement inventory
WHEN _pg_change_type IN ('insert', 'update_postimage') THEN -quantity
-- Cancellations (and the "old half" of updates) restore inventory
WHEN _pg_change_type IN ('delete', 'update_preimage') THEN quantity
ELSE 0
END
) AS current_inventory,
MAX(_timestamp) AS last_transaction_ts,
MAX(_pg_lsn) AS last_lsn
FROM lb_orders_history
GROUP BY item_id;

As duas linhas geradas a cada atualização se cancelam, exceto pela variação líquida, de modo que a soma acumulada permanece correta à medida que os pedidos são editados.

mudanças de transmissão com pipeline declarativo Spark

Para uma arquitetura de medalhão estruturada, use o pipeline declarativoSpark (SDP) para declarar as tabelas bronze, prata e ouro. A execução do SDP funciona como um pipeline conectado, com pontos de verificação e gerenciamento de dependências automatizados.

Python
import dlt
from pyspark.sql import functions as F

@dlt.table
def inventory_adjustments():
return (
spark.readStream.table("<catalog>.<schema>.lb_orders_history")
.withColumn(
"delta",
F.when(F.col("_pg_change_type").isin("insert", "update_postimage"), -F.col("quantity"))
.when(F.col("_pg_change_type").isin("delete", "update_preimage"), F.col("quantity"))
.otherwise(0),
)
.select("item_id", "delta", "_timestamp")
)

@dlt.expect_or_drop("non_negative_stock", "on_hand >= 0")
@dlt.table
def inventory_levels():
return (
spark.read.table("LIVE.inventory_adjustments")
.groupBy("item_id")
.agg(F.sum("delta").alias("on_hand"))
)

inventory_adjustmentslb_orders_history incrementalmente com readStream e produz um delta por evento. inventory_levels agrega por item_id para compute o estoque atual. A expectativa de queda de linhas levaria o estoque a valores negativos, sinalizando um bug em um estágio anterior do processamento.

Para obter um passo a passo completo, consulte o tutorial: Construir um pipeline ETL usando captura de dados de alterações (CDC).

Processamento personalizado com Spark transmissão estruturada

Quando você precisa de controle total — por exemplo, mesclagem personalizada, efeitos colaterais ou vários sinks — leia a tabela de história diretamente com Spark transmissão estruturada e use foreachBatch para escrever em seu destino.

Python
from pyspark.sql import functions as F
from delta.tables import DeltaTable

def update_inventory(batch_df, batch_id):
deltas = (
batch_df
.withColumn(
"delta",
F.when(F.col("_pg_change_type").isin("insert", "update_postimage"), -F.col("quantity"))
.when(F.col("_pg_change_type").isin("delete", "update_preimage"), F.col("quantity"))
.otherwise(0),
)
.groupBy("item_id")
.agg(F.sum("delta").alias("delta"))
)

target = DeltaTable.forName(spark, "<catalog>.<schema>.inventory_levels")
(target.alias("t")
.merge(deltas.alias("s"), "t.item_id = s.item_id")
.whenMatchedUpdate(set={"on_hand": F.expr("t.on_hand + s.delta")})
.whenNotMatchedInsert(values={"item_id": "s.item_id", "on_hand": "s.delta"})
.execute())

(spark.readStream.table("<catalog>.<schema>.lb_orders_history")
.writeStream
.foreachBatch(update_inventory)
.option("checkpointLocation", "/Volumes/<catalog>/<schema>/checkpoints/inventory_levels")
.start())

Cada microlote agrega os eventos de mudança por item_id e mescla os deltas líquidos em inventory_levels.

Mapeamento de tipo de dados

O CDF suporta a maioria dos tipos primitivos padrão do PostgreSQL. Os tipos sem um equivalente Delta direto são armazenados como strings.

Tipo PostgreSQL

Tipo Databricks Delta

Notas

Booleana

Booleana

INT, SMALLINT, BIGINT

INT, SMALLINT, BIGINT

TEXT, VARCHAR, CARACTERE

String

JSONB

String

Armazenado como uma string JSON .

ENUM

String

Armazenado como o rótulo enum.

NUMÉRICO / DECIMAL

DECIMAL ou strings

Utiliza a precisão/escala da fonte sempre que possível. Realiza reescalonamento sem perda de dados para valores de precisão/escala incompatíveis. Recorre a strings quando a precisão excede 38 ou quando a precisão/escala não estão definidas (NUMERIC ilimitado). Todas as colunas NUMERIC/DECIMAL podem ser nulas, pois os valores NaN são mapeados para NULL. Consulte Tipos numéricos do PostgreSQL.

Data

Data

Timestamp

TIMESTAMP_NTZ

MARCAÇÃO DE TEMPO

Timestamp

FLUTUAR, DUPLO

FLUTUAR, DUPLO

Tipos armazenados como strings:

  • Geografia/Geometria (PostGIS): Tipos da extensão PostGIS (por exemplo, geometry, geography).
  • Vetor (pgvector): O tipo vector da extensão pgvector.
  • Tipos compostos/estruturais: Tipos personalizados definidos com CREATE TYPE ... AS (field_name type, ...). Esses são tipos semelhantes a linhas com campos nomeados.
  • Mapa: Tipos key-valor semelhantes a mapas, como hstore (da extensão hstore ). O Postgres não possui um tipo de mapa integrado. hstore é a maneira comum de armazenar par key-valor em uma coluna.

Gerenciando alterações de esquema

  • Renomear uma tabela no Postgres (por exemplo, ALTER TABLE users RENAME TO customers) permite que o feed continue. O nome da tabela Delta de destino não muda — permanece lb_users_history.
  • Alterações no esquema (adicionar uma coluna, remover uma coluna ou alterar o tipo de dados de uma coluna) acionam um novo snapshot da tabela afetada. O CDF lê novamente a tabela inteira do Postgres e a reescreve na tabela Delta de destino.

Desativar CDF do Lakebase

Desativar o CDF interrompe o feed para todos os esquemas do Lakebase no projeto.

  1. No seu workspace Databricks , abra Lakebase Postgres no seletor de aplicativos (canto superior direito).
  2. Selecione seu projeto Lakebase e a ramificação onde você configurou o CDF.
  3. Abra a visão geral da filial e clique na tab Alterar fluxo de dados .
  4. Clique em Desativar . Na caixa de diálogo de confirmação, revise o aviso de que as alterações deixarão de ser propagadas para as tabelas Delta e clique em Desativar novamente para confirmar.

Desativar o CDF não reinicia o seu compute.

atenção

Se você reativar o CDF posteriormente, o sistema não realizará um novo snapshot completo. Quaisquer alterações que tenham ocorrido enquanto o CDF estava desativado são permanentemente perdidas das tabelas Delta de destino.

Limitações e resolução de problemas

Você pode ver o status de cada tabela (em snapshot, ignorada ou transmissão) na tab Alterar Feed de Dados ou executando o seguinte comando no Lakebase:

SQL
SELECT * FROM wal2delta.tables;

Motivos comuns pelos quais uma tabela não aparece no feed:

  • REPLICA IDENTITY FULL não definido: execução ALTER TABLE <table_name> REPLICA IDENTITY FULL; para a tabela. Veja o passo 1: Defina a identidade da réplica como completa.
  • Tabelas particionadas: Tabelas particionadas no Lakebase não são suportadas. Um esquema que contém tabelas particionadas faz com que essas tabelas falhem.
  • Tabelas vazias: Uma tabela com zero linhas é ignorada até que exista pelo menos uma linha.

Próximos passos