Pular para o conteúdo principal

Conectar ao Lakebase

info

Visualização

Esse recurso está em Prévia Pública.

Use a transmissão estructurada para gravar no Lakebase com lotes integrada, novas tentativas automáticas e autenticação gerenciada pelo workspace.

Quando usar o destino do Lakebase

Use o coletor Lakebase para gravações de transmissão de baixa latência no Lakebase. Este coletor não exige a implementação de funções foreachBatch personalizadas para o gerenciamento de lotes, o gerenciamento de conexão e o tratamento de erros.

Casos de uso comuns incluem:

  • Atualize bancos de dados de aplicativos em tempo real para painéis operacionais ou recursos voltados para o cliente.
  • Sincronizar dados em constante alteração, como resultados de transmissão agregados ou filtrados, em um banco de dados transacional.
  • Grave a saída de uma consulta de transmissão estructurada em uma tabela Lakebase com latência de subsegundo usando modo em tempo real.

Requisitos

  • Databricks Runtime 18 e acima
  • Compute clássico com modos de acesso Dedicado ou Padrão.
  • Um banco de dados Lakebase

Conecte-se a um banco de dados

O coletor do Lakebase oferece suporte aos seguintes métodos de conexão:

Tabelas do Lakebase não registradas no Unity Catalog

Para tabelas do Lakebase não registradas no Unity Catalog, o conector gerencia automaticamente as credenciais e usa a identidade do usuário ou da entidade de serviço do Databricks que executa a consulta.

Para gravar em uma tabela Lakebase, use as opções endpoint e dbtable. O exemplo a seguir também inclui as opções opcionais database e upsertkey:

Python
(df.writeStream
.format("postgresql")
.outputMode("update")
.option("endpoint", "<project-id>.<branch-id>.<endpoint-id>")
.option("database", "<database>") # Optional. Defaults to databricks_postgres.
.option("dbtable", "<schema>.<table>")
.option("upsertkey", "<primary-key-column>") # Optional. Inferred from the table's primary key if omitted.
.option("checkpointLocation", "/Volumes/<catalog>/<schema>/<volume>/<checkpoint-name>")
.start()
)

Substitua os seguintes espaços reservados:

  • <project-id>.<branch-id>.<endpoint-id>: Seu endpoint Lakebase. Encontre todos os três valores no nome do recurso no menu Obter ID da tab Computes , que tem o formato projects/<project-id>/branches/<branch-id>/endpoints/<endpoint-id>. Consulte Identificadores de Compute.
  • <database>: Opcional. O nome do banco de dados Postgres de destino. O default é databricks_postgres. Consultar Gerenciar bancos de dados.
  • <schema>.<table>: A tabela de destino no formato schema.table. Se você omitir o esquema, o sink usa o esquema public. Use identificadores simples que comecem com uma letra ou sublinhado e contenham apenas letras, números e sublinhados; identificadores entre aspas e caracteres especiais, como hifens, não são suportados.
  • <primary-key-column>: Opcional. Uma lista separada por vírgulas das colunas que formam a chave de upsert, por exemplo id ou user_id,event_type. Se você omitir upsertkey, o coletor inferirá a chave da chave primária da tabela de destino. Consulte o comportamento de Upsert.
  • /Volumes/<catalog>/<schema>/<volume>/<checkpoint-name>: Um caminho de volume do Unity Catalog onde a consulta armazena seu ponto de verificação. Você também pode usar um URI de armazenamento de objeto em cloud. O local deve ser um armazenamento onde você pode gravar, não um disco local, e deve ser exclusivo para cada consulta de transmissão. Isso é independente da tabela de destino. Consulte pontos de verificação de transmissão estructurada.

Para configurações opcionais, como batchsize e batchinterval, consulte Opções de configuração.

Opções de configuração

O sink gera um erro para opções não reconhecidas, JDBC_STREAMING_SINK_INVALID_OPTIONS.

As seguintes opções se aplicam a todos os métodos de conexão:

Chave

Padrão

Descrição

batchinterval

100 milliseconds

Opcional. O tempo máximo para reter linhas no buffer antes de liberar. Por exemplo, "50 milliseconds".

batchsize

1000

Opcional. O número máximo de linhas para cada transação de banco de dados.

checkpointLocation

Nenhuma

Obrigatório. Caminho para um diretório de ponto de verificação, como um volume do Unity Catalog (/Volumes/<catalog>/<schema>/<volume>/<checkpoint-name>). Deve ser exclusivo para cada consulta. Consulte Pontos de verificação de Transmissão estructurada.

upsertkey

Nenhuma

Opcional. Uma lista separada por vírgulas de nomes de colunas que formam a chave upsert. Por exemplo, "id" ou "user_id,event_type". Se você especificar upsertkey, as colunas devem corresponder à chave primária da tabela, ou a consulta falhará. Se omitir, o sink usa a chave primária automaticamente. Para obter mais informações, consulte comportamento de upsert.

Tabelas do Lakebase não registradas com o Unity Catalog

As seguintes opções se aplicam ao se conectar a uma tabela do Lakebase não registrada com o Unity Catalog:

Chave

Padrão

Descrição

database

databricks_postgres

Opcional. O nome do banco de dados PostgreSQL de destino.

dbtable

Nenhuma

Obrigatório. O nome da tabela de destino no formato schema.table. Se você não especificar um esquema, o valor do esquema default é public. Use identificadores simples que começam com uma letra ou sublinhado e contêm apenas letras, números e sublinhados. Não coloque entre aspas os nomes de tabelas ou esquemas; identificadores entre aspas e nomes com caracteres especiais, como hifens, não são suportados.

endpoint

Nenhuma

Obrigatório. O endpoint do Lakebase, no formato project_id.branch_id ou project_id.branch_id.endpoint_id. O endpoint_id é opcional; caso seja omitido e o branch possua um único endpoint de leitura e gravação, o sink seleciona esse endpoint por default.

Comportamento de Upsert

Quando existirem chaves de upsert, seja especificadas com upsertkey ou inferidas pelo sink a partir das chaves primárias da tabela, o sink realiza upsert na tabela com a sintaxe INSERT INTO ... ON CONFLICT (<upsert_key>) DO UPDATE SET ... do PostgreSQL.

Quando não existem chaves de upsert, o coletor realiza inserções. O modo de saída da consulta não tem efeito sobre o comportamento de upsert ou insert.

As upsertkey colunas devem:

  • Seja um subconjunto não vazio das colunas do DataFrame.
  • Corresponda o(a) PRIMARY KEY da tabela de destino exatamente. Se as colunas especificadas não corresponderem à chave primária, a consulta falha.
  • Devem ser tipos comparáveis, como tipos numéricos ou strings. Para evitar impasses no banco de dados durante gravações concorrentes, o coletor ordena as linhas pela chave de upsert dentro de cada lote. Chaves de upsert não suportam tipos complexos ou struct.

Os nomes das colunas são automaticamente colocados entre aspas com o default do PostgreSQL, aspas duplas ", o que trata palavras reservadas e nomes com maiúsculas e minúsculas misturadas.

Nomes de tabelas e esquemas devem usar identificadores simples que comecem com uma letra ou sublinhado e contenham apenas letras, números e sublinhados. O coletor não suporta identificadores entre aspas ou caracteres especiais, como hífens, em nomes de tabelas ou esquemas.

Ajuste de desempenho

Processamento em lote e contrapressão

Um flush é acionado quando uma das condições é atendida:

  • O buffer atinge batchsize linhas, cujo padrão é 1000.
  • A idade do buffer excede batchinterval, que por default é 100 milliseconds.

Quando o banco de dados não consegue acompanhar a taxa de dados de entrada, o destino propaga contrapressão a montante para a fonte.

Latência e Taxa de transferência: Orientações

  • Para cargas de trabalho de baixa latência com modo em tempo real, diminua batchinterval para garantir um tempo máximo menor antes da descarga. Consulte modo em tempo real em Transmissão estructurada.
  • Para cargas de trabalho de alta taxa de transferência, aumente batchsize para reduzir a sobrecarga de cada transação.

Comportamento da conexão

O sink usa pooling de conexão em executors. Por padrão, cada tarefa usa uma conexão de banco de dados.

O Databricks recomenda que você use o valor default de 1 tarefa para cada conexão. Ao aumentar o número de tarefas para cada conexão, poderá causar contenções de conexão e aumentar as latências para conexões de alta taxa de transferência.

Para configurar a proporção de tarefas para conexões, defina a configuração do Spark spark.databricks.sql.streaming.jdbc.tasksPerConnection. Se o banco de dados de destino tiver um limite de conexão baixo, reduza o número de partições em ordem aleatória ou aumente spark.databricks.sql.streaming.jdbc.tasksPerConnection.

O sink tenta novamente, de forma automática, erros transitórios de JDBC, incluindo falhas de conexão, deadlocks e limitação de taxa. Se o coletor esgotar todas as tentativas, a consulta falhará.

Gatilhos e modos de saída compatíveis

Gatilhos

Esta tabela mostra suporte para tipos de trigger de transmissão estructurada.

Trigger

Suportado

realTime

Sim

ProcessingTime

Sim

AvailableNow

Sim

Once

Sim

Modos de saída

Esta tabela mostra suporte para modos de saída de transmissão estructurada.

Modo de saída

Suportado

update

Sim

append

Sim. O comportamento é idêntico a update. A consulta faz upsert quando a tabela de destino tem uma chave primária, caso contrário, a consulta insere. Ver Comportamento do Upsert.

complete

Não

Limitações

  • O compute serverless e Lakeflow Spark Declarative Pipelines não são suportados.
  • Somente Lakebase tem suporte como destino de gravação. Bancos de dados externos compatíveis com PostgreSQL não são suportados.