Conectar ao Lakebase
Visualização
Esse recurso está em Prévia Pública.
Use a transmissão estructurada para gravar no Lakebase com lotes integrada, novas tentativas automáticas e autenticação gerenciada pelo workspace.
Quando usar o destino do Lakebase
Use o coletor Lakebase para gravações de transmissão de baixa latência no Lakebase. Este coletor não exige a implementação de funções foreachBatch personalizadas para o gerenciamento de lotes, o gerenciamento de conexão e o tratamento de erros.
Casos de uso comuns incluem:
- Atualize bancos de dados de aplicativos em tempo real para painéis operacionais ou recursos voltados para o cliente.
- Sincronizar dados em constante alteração, como resultados de transmissão agregados ou filtrados, em um banco de dados transacional.
- Grave a saída de uma consulta de transmissão estructurada em uma tabela Lakebase com latência de subsegundo usando modo em tempo real.
Para sincronizar dados do Lakebase para tabelas Delta no Lakehouse, na direção inversa, consulte Feed de Dados de Alterações do Lakebase.
Requisitos
- Databricks Runtime 18.3 ou acima
- Compute clássico com modos de acesso Dedicado ou Padrão.
- Um banco de dados Lakebase
Conecte-se a um banco de dados
O coletor do Lakebase oferece suporte aos seguintes métodos de conexão:
Tabelas do Lakebase registradas no Unity Catalog
Para tabelas Lakebase registradas no Unity Catalog, o conector gerencia automaticamente as credenciais e usa a identidade do usuário ou da entidade de serviço do Databricks executando a consulta. Se a tabela não existir, o conector cria a tabela.
Para registrar um banco de dados Lakebase no Unity Catalog, consulte Registrar um banco de dados Lakebase no Unity Catalog.
Para gravar em uma tabela Lakebase, utilize a opção upsertkey e o método .toTable() com um nome de tabela totalmente qualificado, catalog.schema.table:
- Python
- Scala
(df.writeStream
.format("postgresql")
.outputMode("update")
.option("upsertkey", "<primary-key-column>")
.option("checkpointLocation", "/checkpoints/<query-name>")
.toTable("<catalog>.<schema>.<table>")
)
df.writeStream
.format("postgresql")
.outputMode("update")
.option("upsertkey", "<primary-key-column>")
.option("checkpointLocation", "/checkpoints/<query-name>")
.toTable("<catalog>.<schema>.<table>")
Tabelas do Lakebase não registradas no Unity Catalog
Para tabelas do Lakebase não registradas no Unity Catalog, o conector gerencia automaticamente as credenciais e usa a identidade do usuário ou da entidade de serviço do Databricks que executa a consulta.
Para gravar em uma tabela Lakebase, use as opções endpoint, database, dbtable e upsertkey:
- Python
- Scala
(df.writeStream
.format("postgresql")
.outputMode("update")
.option("endpoint", "<project-id>.<branch-id>.<endpoint-id>")
.option("database", "<database>")
.option("dbtable", "<schema>.<table>")
.option("upsertkey", "<primary-key-column>")
.option("checkpointLocation", "/checkpoints/<query-name>")
.start()
)
df.writeStream
.format("postgresql")
.outputMode("update")
.option("endpoint", "<project-id>.<branch-id>.<endpoint-id>")
.option("database", "<database>")
.option("dbtable", "<schema>.<table>")
.option("upsertkey", "<primary-key-column>")
.option("checkpointLocation", "/checkpoints/<query-name>")
.start()
Opções de configuração
O sink gera um erro para opções não reconhecidas, JDBC_STREAMING_SINK_INVALID_OPTIONS.
As seguintes opções se aplicam a todos os métodos de conexão:
Chave | Padrão | Descrição |
|---|---|---|
|
| O tempo máximo para manter linhas no buffer antes do esvaziamento. Por exemplo, |
|
| O número máximo de linhas para cada transação de banco de dados. |
| Nenhuma | Obrigatório. Caminho para o diretório de ponto de verificação. |
| Nenhuma | Uma lista separada por vírgulas de nomes de colunas que formam a chave de upsert. Por exemplo: |
Tabelas do Lakebase não registradas com o Unity Catalog
As seguintes opções se aplicam ao se conectar a uma tabela do Lakebase não registrada com o Unity Catalog:
Chave | Padrão | Descrição |
|---|---|---|
| Nenhuma | O nome da base de dados PostgreSQL de destino. |
| Nenhuma | O nome da tabela de destino no formato |
| Nenhuma | Especifique o endpoint do Lakebase no formato |
Comportamento de Upsert
Quando existirem chaves de upsert, seja especificadas com upsertkey ou inferidas pelo sink a partir das chaves primárias da tabela, o sink realiza upsert na tabela com a sintaxe INSERT INTO ... ON CONFLICT (<upsert_key>) DO UPDATE SET ... do PostgreSQL.
Quando não existem chaves de upsert, o coletor realiza inserções. O modo de saída da consulta não tem efeito sobre o comportamento de upsert ou insert.
As upsertkey colunas devem:
- Seja um subconjunto não vazio das colunas do DataFrame.
- Faça referência a uma coluna da tabela de destino com uma restrição
PRIMARY KEY. - Devem ser tipos comparáveis, como tipos numéricos ou strings. Para evitar impasses no banco de dados durante gravações concorrentes, o coletor ordena as linhas pela chave de upsert dentro de cada lote. Chaves de upsert não suportam tipos complexos ou struct.
Nomes de coluna são automaticamente citados com o default do PostgreSQL, aspas duplas ", que trata de palavras-chave reservadas, nomes com maiúsculas e minúsculas e caracteres especiais.
O destino não cita os nomes das tabelas e os passa como estão para o banco de dados. Você deve colocar entre aspas os nomes de tabela com caracteres especiais, como "my-schema"."my-table".
Ajuste de desempenho
Processamento em lote e contrapressão
Um flush é acionado quando uma das condições é atendida:
- O buffer atinge
batchsizelinhas, cujo padrão é1000. - A idade do buffer excede
batchinterval, que por default é100 milliseconds.
Quando o banco de dados não consegue acompanhar a taxa de dados de entrada, o destino propaga contrapressão a montante para a fonte.
Latência e Taxa de transferência: Orientações
- Para cargas de trabalho de baixa latência com modo em tempo real, diminua
batchintervalpara garantir um tempo máximo menor antes da descarga. Consulte modo em tempo real em Transmissão estructurada. - Para cargas de trabalho de alta taxa de transferência, aumente
batchsizepara reduzir a sobrecarga de cada transação.
Comportamento da conexão
O sink usa pooling de conexão em executors. Por padrão, cada tarefa usa uma conexão de banco de dados.
O Databricks recomenda que você use o valor default de 1 tarefa para cada conexão. Ao aumentar o número de tarefas para cada conexão, poderá causar contenções de conexão e aumentar as latências para conexões de alta taxa de transferência.
Para configurar a proporção de tarefas para conexões, defina a configuração do Spark spark.databricks.sql.streaming.jdbc.tasksPerConnection. Se o banco de dados de destino tiver um limite de conexão baixo, reduza o número de partições em ordem aleatória ou aumente spark.databricks.sql.streaming.jdbc.tasksPerConnection.
O sink tenta novamente, de forma automática, erros transitórios de JDBC, incluindo falhas de conexão, deadlocks e limitação de taxa. Se o coletor esgotar todas as tentativas, a consulta falhará.
Gatilhos e modos de saída compatíveis
Gatilhos
Esta tabela mostra suporte para tipos de trigger de transmissão estructurada.
Trigger | Suportado |
|---|---|
| Sim |
| Sim |
| Sim |
| Sim |
Modos de saída
Esta tabela mostra suporte para modos de saída de transmissão estructurada.
Modo de saída | Suportado |
|---|---|
| Sim |
| Sim. O comportamento é idêntico a |
| Não |
Limitações
- O compute serverless e Lakeflow Spark Declarative Pipelines não são suportados.
- Somente Lakebase tem suporte como destino de gravação. Bancos de dados externos compatíveis com PostgreSQL não são suportados.