Conectar ao Lakebase
Visualização
Esse recurso está em Prévia Pública.
Use a transmissão estructurada para gravar no Lakebase com lotes integrada, novas tentativas automáticas e autenticação gerenciada pelo workspace.
Quando usar o destino do Lakebase
Use o coletor Lakebase para gravações de transmissão de baixa latência no Lakebase. Este coletor não exige a implementação de funções foreachBatch personalizadas para o gerenciamento de lotes, o gerenciamento de conexão e o tratamento de erros.
Casos de uso comuns incluem:
- Atualize bancos de dados de aplicativos em tempo real para painéis operacionais ou recursos voltados para o cliente.
- Sincronizar dados em constante alteração, como resultados de transmissão agregados ou filtrados, em um banco de dados transacional.
- Grave a saída de uma consulta de transmissão estructurada em uma tabela Lakebase com latência de subsegundo usando modo em tempo real.
Requisitos
- Databricks Runtime 18.3 ou acima
- Compute clássico com modos de acesso Dedicado ou Padrão.
- Um banco de dados Lakebase
Conecte-se a um banco de dados
O coletor do Lakebase oferece suporte aos seguintes métodos de conexão:
Tabelas do Lakebase não registradas no Unity Catalog
Para tabelas do Lakebase não registradas no Unity Catalog, o conector gerencia automaticamente as credenciais e usa a identidade do usuário ou da entidade de serviço do Databricks que executa a consulta.
Para gravar em uma tabela Lakebase, use as opções endpoint, database, dbtable e upsertkey:
- Python
- Scala
(df.writeStream
.format("postgresql")
.outputMode("update")
.option("endpoint", "<project-id>.<branch-id>.<endpoint-id>")
.option("database", "<database>")
.option("dbtable", "<schema>.<table>")
.option("upsertkey", "<primary-key-column>")
.option("checkpointLocation", "/checkpoints/<query-name>")
.start()
)
df.writeStream
.format("postgresql")
.outputMode("update")
.option("endpoint", "<project-id>.<branch-id>.<endpoint-id>")
.option("database", "<database>")
.option("dbtable", "<schema>.<table>")
.option("upsertkey", "<primary-key-column>")
.option("checkpointLocation", "/checkpoints/<query-name>")
.start()
Opções de configuração
O sink gera um erro para opções não reconhecidas, JDBC_STREAMING_SINK_INVALID_OPTIONS.
As seguintes opções se aplicam a todos os métodos de conexão:
Chave | Padrão | Descrição |
|---|---|---|
|
| O tempo máximo para manter linhas no buffer antes do esvaziamento. Por exemplo, |
|
| O número máximo de linhas para cada transação de banco de dados. |
| Nenhuma | Obrigatório. Caminho para o diretório de ponto de verificação. |
| Nenhuma | Uma lista separada por vírgulas de nomes de colunas que formam a chave de upsert. Por exemplo: |
Tabelas do Lakebase não registradas com o Unity Catalog
As seguintes opções se aplicam ao se conectar a uma tabela do Lakebase não registrada com o Unity Catalog:
Chave | Padrão | Descrição |
|---|---|---|
| Nenhuma | O nome da base de dados PostgreSQL de destino. |
| Nenhuma | O nome da tabela de destino no formato |
| Nenhuma | Especifique o endpoint do Lakebase no formato |
Comportamento de Upsert
Quando existirem chaves de upsert, seja especificadas com upsertkey ou inferidas pelo sink a partir das chaves primárias da tabela, o sink realiza upsert na tabela com a sintaxe INSERT INTO ... ON CONFLICT (<upsert_key>) DO UPDATE SET ... do PostgreSQL.
Quando não existem chaves de upsert, o coletor realiza inserções. O modo de saída da consulta não tem efeito sobre o comportamento de upsert ou insert.
As upsertkey colunas devem:
- Seja um subconjunto não vazio das colunas do DataFrame.
- Faça referência a uma coluna da tabela de destino com uma restrição
PRIMARY KEY. - Devem ser tipos comparáveis, como tipos numéricos ou strings. Para evitar impasses no banco de dados durante gravações concorrentes, o coletor ordena as linhas pela chave de upsert dentro de cada lote. Chaves de upsert não suportam tipos complexos ou struct.
Nomes de coluna são automaticamente citados com o default do PostgreSQL, aspas duplas ", que trata de palavras-chave reservadas, nomes com maiúsculas e minúsculas e caracteres especiais.
O destino não cita os nomes das tabelas e os passa como estão para o banco de dados. Você deve colocar entre aspas os nomes de tabela com caracteres especiais, como "my-schema"."my-table".
Ajuste de desempenho
Processamento em lote e contrapressão
Um flush é acionado quando uma das condições é atendida:
- O buffer atinge
batchsizelinhas, cujo padrão é1000. - A idade do buffer excede
batchinterval, que por default é100 milliseconds.
Quando o banco de dados não consegue acompanhar a taxa de dados de entrada, o destino propaga contrapressão a montante para a fonte.
Latência e Taxa de transferência: Orientações
- Para cargas de trabalho de baixa latência com modo em tempo real, diminua
batchintervalpara garantir um tempo máximo menor antes da descarga. Consulte modo em tempo real em Transmissão estructurada. - Para cargas de trabalho de alta taxa de transferência, aumente
batchsizepara reduzir a sobrecarga de cada transação.
Comportamento da conexão
O sink usa pooling de conexão em executors. Por padrão, cada tarefa usa uma conexão de banco de dados.
O Databricks recomenda que você use o valor default de 1 tarefa para cada conexão. Ao aumentar o número de tarefas para cada conexão, poderá causar contenções de conexão e aumentar as latências para conexões de alta taxa de transferência.
Para configurar a proporção de tarefas para conexões, defina a configuração do Spark spark.databricks.sql.streaming.jdbc.tasksPerConnection. Se o banco de dados de destino tiver um limite de conexão baixo, reduza o número de partições em ordem aleatória ou aumente spark.databricks.sql.streaming.jdbc.tasksPerConnection.
O sink tenta novamente, de forma automática, erros transitórios de JDBC, incluindo falhas de conexão, deadlocks e limitação de taxa. Se o coletor esgotar todas as tentativas, a consulta falhará.
Gatilhos e modos de saída compatíveis
Gatilhos
Esta tabela mostra suporte para tipos de trigger de transmissão estructurada.
Trigger | Suportado |
|---|---|
| Sim |
| Sim |
| Sim |
| Sim |
Modos de saída
Esta tabela mostra suporte para modos de saída de transmissão estructurada.
Modo de saída | Suportado |
|---|---|
| Sim |
| Sim. O comportamento é idêntico a |
| Não |
Limitações
- O compute serverless e Lakeflow Spark Declarative Pipelines não são suportados.
- Somente Lakebase tem suporte como destino de gravação. Bancos de dados externos compatíveis com PostgreSQL não são suportados.