Pular para o conteúdo principal

Conectar ao Lakebase

info

Visualização

Esse recurso está em Prévia Pública.

Use a transmissão estructurada para gravar no Lakebase com lotes integrada, novas tentativas automáticas e autenticação gerenciada pelo workspace.

Quando usar o destino do Lakebase

Use o coletor Lakebase para gravações de transmissão de baixa latência no Lakebase. Este coletor não exige a implementação de funções foreachBatch personalizadas para o gerenciamento de lotes, o gerenciamento de conexão e o tratamento de erros.

Casos de uso comuns incluem:

  • Atualize bancos de dados de aplicativos em tempo real para painéis operacionais ou recursos voltados para o cliente.
  • Sincronizar dados em constante alteração, como resultados de transmissão agregados ou filtrados, em um banco de dados transacional.
  • Grave a saída de uma consulta de transmissão estructurada em uma tabela Lakebase com latência de subsegundo usando modo em tempo real.

Para sincronizar dados do Lakebase para tabelas Delta no Lakehouse, na direção inversa, consulte Feed de Dados de Alterações do Lakebase.

Requisitos

  • Databricks Runtime 18.3 ou acima
  • Compute clássico com modos de acesso Dedicado ou Padrão.
  • Um banco de dados Lakebase

Conecte-se a um banco de dados

O coletor do Lakebase oferece suporte aos seguintes métodos de conexão:

Tabelas do Lakebase registradas no Unity Catalog

Para tabelas Lakebase registradas no Unity Catalog, o conector gerencia automaticamente as credenciais e usa a identidade do usuário ou da entidade de serviço do Databricks executando a consulta. Se a tabela não existir, o conector cria a tabela.

Para registrar um banco de dados Lakebase no Unity Catalog, consulte Registrar um banco de dados Lakebase no Unity Catalog.

Para gravar em uma tabela Lakebase, utilize a opção upsertkey e o método .toTable() com um nome de tabela totalmente qualificado, catalog.schema.table:

Python
(df.writeStream
.format("postgresql")
.outputMode("update")
.option("upsertkey", "<primary-key-column>")
.option("checkpointLocation", "/checkpoints/<query-name>")
.toTable("<catalog>.<schema>.<table>")
)

Tabelas do Lakebase não registradas no Unity Catalog

Para tabelas do Lakebase não registradas no Unity Catalog, o conector gerencia automaticamente as credenciais e usa a identidade do usuário ou da entidade de serviço do Databricks que executa a consulta.

Para gravar em uma tabela Lakebase, use as opções endpoint, database, dbtable e upsertkey:

Python
(df.writeStream
.format("postgresql")
.outputMode("update")
.option("endpoint", "<project-id>.<branch-id>.<endpoint-id>")
.option("database", "<database>")
.option("dbtable", "<schema>.<table>")
.option("upsertkey", "<primary-key-column>")
.option("checkpointLocation", "/checkpoints/<query-name>")
.start()
)

Opções de configuração

O sink gera um erro para opções não reconhecidas, JDBC_STREAMING_SINK_INVALID_OPTIONS.

As seguintes opções se aplicam a todos os métodos de conexão:

Chave

Padrão

Descrição

batchinterval

100 milliseconds

O tempo máximo para manter linhas no buffer antes do esvaziamento. Por exemplo, "50 milliseconds".

batchsize

1000

O número máximo de linhas para cada transação de banco de dados.

checkpointLocation

Nenhuma

Obrigatório. Caminho para o diretório de ponto de verificação.

upsertkey

Nenhuma

Uma lista separada por vírgulas de nomes de colunas que formam a chave de upsert. Por exemplo: "id" ou "user_id,event_type". A tabela de destino deve ter uma restrição PRIMARY KEY nas colunas especificadas. Se não for especificada uma chave de upsert, o coletor infere a chave principal do esquema da tabela de destino. Se não houver uma chave primária, a consulta insere a linha, em vez de atualizar.

Tabelas do Lakebase não registradas com o Unity Catalog

As seguintes opções se aplicam ao se conectar a uma tabela do Lakebase não registrada com o Unity Catalog:

Chave

Padrão

Descrição

database

Nenhuma

O nome da base de dados PostgreSQL de destino.

dbtable

Nenhuma

O nome da tabela de destino no formato schema.table. Se você não especificar um esquema, o valor do esquema default é public.

endpoint

Nenhuma

Especifique o endpoint do Lakebase no formato project_id.branch_id.endpoint_id.

Comportamento de Upsert

Quando existirem chaves de upsert, seja especificadas com upsertkey ou inferidas pelo sink a partir das chaves primárias da tabela, o sink realiza upsert na tabela com a sintaxe INSERT INTO ... ON CONFLICT (<upsert_key>) DO UPDATE SET ... do PostgreSQL.

Quando não existem chaves de upsert, o coletor realiza inserções. O modo de saída da consulta não tem efeito sobre o comportamento de upsert ou insert.

As upsertkey colunas devem:

  • Seja um subconjunto não vazio das colunas do DataFrame.
  • Faça referência a uma coluna da tabela de destino com uma restrição PRIMARY KEY.
  • Devem ser tipos comparáveis, como tipos numéricos ou strings. Para evitar impasses no banco de dados durante gravações concorrentes, o coletor ordena as linhas pela chave de upsert dentro de cada lote. Chaves de upsert não suportam tipos complexos ou struct.

Nomes de coluna são automaticamente citados com o default do PostgreSQL, aspas duplas ", que trata de palavras-chave reservadas, nomes com maiúsculas e minúsculas e caracteres especiais.

O destino não cita os nomes das tabelas e os passa como estão para o banco de dados. Você deve colocar entre aspas os nomes de tabela com caracteres especiais, como "my-schema"."my-table".

Ajuste de desempenho

Processamento em lote e contrapressão

Um flush é acionado quando uma das condições é atendida:

  • O buffer atinge batchsize linhas, cujo padrão é 1000.
  • A idade do buffer excede batchinterval, que por default é 100 milliseconds.

Quando o banco de dados não consegue acompanhar a taxa de dados de entrada, o destino propaga contrapressão a montante para a fonte.

Latência e Taxa de transferência: Orientações

  • Para cargas de trabalho de baixa latência com modo em tempo real, diminua batchinterval para garantir um tempo máximo menor antes da descarga. Consulte modo em tempo real em Transmissão estructurada.
  • Para cargas de trabalho de alta taxa de transferência, aumente batchsize para reduzir a sobrecarga de cada transação.

Comportamento da conexão

O sink usa pooling de conexão em executors. Por padrão, cada tarefa usa uma conexão de banco de dados.

O Databricks recomenda que você use o valor default de 1 tarefa para cada conexão. Ao aumentar o número de tarefas para cada conexão, poderá causar contenções de conexão e aumentar as latências para conexões de alta taxa de transferência.

Para configurar a proporção de tarefas para conexões, defina a configuração do Spark spark.databricks.sql.streaming.jdbc.tasksPerConnection. Se o banco de dados de destino tiver um limite de conexão baixo, reduza o número de partições em ordem aleatória ou aumente spark.databricks.sql.streaming.jdbc.tasksPerConnection.

O sink tenta novamente, de forma automática, erros transitórios de JDBC, incluindo falhas de conexão, deadlocks e limitação de taxa. Se o coletor esgotar todas as tentativas, a consulta falhará.

Gatilhos e modos de saída compatíveis

Gatilhos

Esta tabela mostra suporte para tipos de trigger de transmissão estructurada.

Trigger

Suportado

realTime

Sim

ProcessingTime

Sim

AvailableNow

Sim

Once

Sim

Modos de saída

Esta tabela mostra suporte para modos de saída de transmissão estructurada.

Modo de saída

Suportado

update

Sim

append

Sim. O comportamento é idêntico a update. A consulta faz upsert quando a tabela de destino tem uma chave primária, caso contrário, a consulta insere. Ver Comportamento do Upsert.

complete

Não

Limitações

  • O compute serverless e Lakeflow Spark Declarative Pipelines não são suportados.
  • Somente Lakebase tem suporte como destino de gravação. Bancos de dados externos compatíveis com PostgreSQL não são suportados.