Conectar ao Lakebase
Visualização
Esse recurso está em Prévia Pública.
Use a transmissão estructurada para gravar no Lakebase com lotes integrada, novas tentativas automáticas e autenticação gerenciada pelo workspace.
Quando usar o destino do Lakebase
Use o coletor Lakebase para gravações de transmissão de baixa latência no Lakebase. Este coletor não exige a implementação de funções foreachBatch personalizadas para o gerenciamento de lotes, o gerenciamento de conexão e o tratamento de erros.
Casos de uso comuns incluem:
- Atualize bancos de dados de aplicativos em tempo real para painéis operacionais ou recursos voltados para o cliente.
- Sincronizar dados em constante alteração, como resultados de transmissão agregados ou filtrados, em um banco de dados transacional.
- Grave a saída de uma consulta de transmissão estructurada em uma tabela Lakebase com latência de subsegundo usando modo em tempo real.
Para sincronizar dados do Lakebase com as tabelas do Delta Lake no Lakehouse, no sentido inverso, consulte Lakebase Change Data Feed.
Requisitos
- Databricks Runtime 18 e acima
- Compute clássico com modos de acesso Dedicado ou Padrão.
- Um banco de dados Lakebase
Conecte-se a um banco de dados
O coletor do Lakebase oferece suporte aos seguintes métodos de conexão:
Tabelas do Lakebase registradas no Unity Catalog
Para tabelas Lakebase registradas no Unity Catalog, o conector gerencia automaticamente as credenciais e usa a identidade do usuário ou da entidade de serviço do Databricks executando a consulta. Se a tabela não existir, o conector cria a tabela.
Para registrar um banco de dados Lakebase no Unity Catalog, consulte Registrar um banco de dados Lakebase no Unity Catalog.
Para gravar em uma tabela Lakebase, use o método .toTable() com um nome de tabela totalmente qualificado, catalog.schema.table. O exemplo a seguir mostra as opções necessárias, além da opção opcional upsertkey:
- Python
- Scala
(df.writeStream
.format("postgresql")
.outputMode("update")
.option("upsertkey", "<primary-key-column>") # Optional. Inferred from the table's primary key if omitted.
.option("checkpointLocation", "/Volumes/<catalog>/<schema>/<volume>/<checkpoint-name>")
.toTable("<catalog>.<schema>.<table>")
)
df.writeStream
.format("postgresql")
.outputMode("update")
.option("upsertkey", "<primary-key-column>") // Optional. Inferred from the table's primary key if omitted.
.option("checkpointLocation", "/Volumes/<catalog>/<schema>/<volume>/<checkpoint-name>")
.toTable("<catalog>.<schema>.<table>")
Substitua os seguintes espaços reservados:
<catalog>.<schema>.<table>: O nome totalmente qualificado da tabela de destino. Ocatalogé o catálogo do Unity Catalog que você criou quando registrou o banco de dados Lakebase. Consulte Registrar um banco de dados Lakebase no Unity Catalog. Se a tabela não existir, o conector a cria.<primary-key-column>: Opcional. Uma lista separada por vírgulas das colunas que formam a chave de upsert, por exemploidouuser_id,event_type. Se você omitirupsertkey, o coletor inferirá a chave da chave primária da tabela de destino. Consulte o comportamento de Upsert./Volumes/<catalog>/<schema>/<volume>/<checkpoint-name>: Um caminho de volume do Unity Catalog onde a consulta armazena seu ponto de verificação. Você também pode usar um URI de armazenamento de objeto em cloud. O local deve ser um armazenamento onde você pode gravar, não um disco local, e deve ser exclusivo para cada consulta de transmissão. Isso é independente da tabela de destino. Consulte pontos de verificação de transmissão estructurada.
Para configurações opcionais, como batchsize e batchinterval, consulte Opções de configuração.
Tabelas do Lakebase não registradas no Unity Catalog
Para tabelas do Lakebase não registradas no Unity Catalog, o conector gerencia automaticamente as credenciais e usa a identidade do usuário ou da entidade de serviço do Databricks que executa a consulta.
Para gravar em uma tabela Lakebase, use as opções endpoint e dbtable. O exemplo a seguir também inclui as opções opcionais database e upsertkey:
- Python
- Scala
(df.writeStream
.format("postgresql")
.outputMode("update")
.option("endpoint", "<project-id>.<branch-id>.<endpoint-id>")
.option("database", "<database>") # Optional. Defaults to databricks_postgres.
.option("dbtable", "<schema>.<table>")
.option("upsertkey", "<primary-key-column>") # Optional. Inferred from the table's primary key if omitted.
.option("checkpointLocation", "/Volumes/<catalog>/<schema>/<volume>/<checkpoint-name>")
.start()
)
df.writeStream
.format("postgresql")
.outputMode("update")
.option("endpoint", "<project-id>.<branch-id>.<endpoint-id>")
.option("database", "<database>") // Optional. Defaults to databricks_postgres.
.option("dbtable", "<schema>.<table>")
.option("upsertkey", "<primary-key-column>") // Optional. Inferred from the table's primary key if omitted.
.option("checkpointLocation", "/Volumes/<catalog>/<schema>/<volume>/<checkpoint-name>")
.start()
Substitua os seguintes espaços reservados:
<project-id>.<branch-id>.<endpoint-id>: Seu endpoint Lakebase. Encontre todos os três valores no nome do recurso no menu Obter ID da tab Computes , que tem o formatoprojects/<project-id>/branches/<branch-id>/endpoints/<endpoint-id>. Consulte Identificadores de Compute.<database>: Opcional. O nome do banco de dados Postgres de destino. O default édatabricks_postgres. Consultar Gerenciar bancos de dados.<schema>.<table>: A tabela de destino no formatoschema.table. Se você omitir o esquema, o sink usa o esquemapublic. Use identificadores simples que comecem com uma letra ou sublinhado e contenham apenas letras, números e sublinhados; identificadores entre aspas e caracteres especiais, como hifens, não são suportados.<primary-key-column>: Opcional. Uma lista separada por vírgulas das colunas que formam a chave de upsert, por exemploidouuser_id,event_type. Se você omitirupsertkey, o coletor inferirá a chave da chave primária da tabela de destino. Consulte o comportamento de Upsert./Volumes/<catalog>/<schema>/<volume>/<checkpoint-name>: Um caminho de volume do Unity Catalog onde a consulta armazena seu ponto de verificação. Você também pode usar um URI de armazenamento de objeto em cloud. O local deve ser um armazenamento onde você pode gravar, não um disco local, e deve ser exclusivo para cada consulta de transmissão. Isso é independente da tabela de destino. Consulte pontos de verificação de transmissão estructurada.
Para configurações opcionais, como batchsize e batchinterval, consulte Opções de configuração.
Opções de configuração
O sink gera um erro para opções não reconhecidas, JDBC_STREAMING_SINK_INVALID_OPTIONS.
As seguintes opções se aplicam a todos os métodos de conexão:
Chave | Padrão | Descrição |
|---|---|---|
|
| Opcional. O tempo máximo para reter linhas no buffer antes de liberar. Por exemplo, |
|
| Opcional. O número máximo de linhas para cada transação de banco de dados. |
| Nenhuma | Obrigatório. Caminho para um diretório de ponto de verificação, como um volume do Unity Catalog ( |
| Nenhuma | Opcional. Uma lista separada por vírgulas de nomes de colunas que formam a chave upsert. Por exemplo, |
Tabelas do Lakebase não registradas com o Unity Catalog
As seguintes opções se aplicam ao se conectar a uma tabela do Lakebase não registrada com o Unity Catalog:
Chave | Padrão | Descrição |
|---|---|---|
|
| Opcional. O nome do banco de dados PostgreSQL de destino. |
| Nenhuma | Obrigatório. O nome da tabela de destino no formato |
| Nenhuma | Obrigatório. O endpoint do Lakebase, no formato |
Comportamento de Upsert
Quando existirem chaves de upsert, seja especificadas com upsertkey ou inferidas pelo sink a partir das chaves primárias da tabela, o sink realiza upsert na tabela com a sintaxe INSERT INTO ... ON CONFLICT (<upsert_key>) DO UPDATE SET ... do PostgreSQL.
Quando não existem chaves de upsert, o coletor realiza inserções. O modo de saída da consulta não tem efeito sobre o comportamento de upsert ou insert.
As upsertkey colunas devem:
- Seja um subconjunto não vazio das colunas do DataFrame.
- Corresponda o(a)
PRIMARY KEYda tabela de destino exatamente. Se as colunas especificadas não corresponderem à chave primária, a consulta falha. - Devem ser tipos comparáveis, como tipos numéricos ou strings. Para evitar impasses no banco de dados durante gravações concorrentes, o coletor ordena as linhas pela chave de upsert dentro de cada lote. Chaves de upsert não suportam tipos complexos ou struct.
Os nomes das colunas são automaticamente colocados entre aspas com o default do PostgreSQL, aspas duplas ", o que trata palavras reservadas e nomes com maiúsculas e minúsculas misturadas.
Nomes de tabelas e esquemas devem usar identificadores simples que comecem com uma letra ou sublinhado e contenham apenas letras, números e sublinhados. O coletor não suporta identificadores entre aspas ou caracteres especiais, como hífens, em nomes de tabelas ou esquemas.
Ajuste de desempenho
Processamento em lote e contrapressão
Um flush é acionado quando uma das condições é atendida:
- O buffer atinge
batchsizelinhas, cujo padrão é1000. - A idade do buffer excede
batchinterval, que por default é100 milliseconds.
Quando o banco de dados não consegue acompanhar a taxa de dados de entrada, o destino propaga contrapressão a montante para a fonte.
Latência e Taxa de transferência: Orientações
- Para cargas de trabalho de baixa latência com modo em tempo real, diminua
batchintervalpara garantir um tempo máximo menor antes da descarga. Consulte modo em tempo real em Transmissão estructurada. - Para cargas de trabalho de alta taxa de transferência, aumente
batchsizepara reduzir a sobrecarga de cada transação.
Comportamento da conexão
O sink usa pooling de conexão em executors. Por padrão, cada tarefa usa uma conexão de banco de dados.
O Databricks recomenda que você use o valor default de 1 tarefa para cada conexão. Ao aumentar o número de tarefas para cada conexão, poderá causar contenções de conexão e aumentar as latências para conexões de alta taxa de transferência.
Para configurar a proporção de tarefas para conexões, defina a configuração do Spark spark.databricks.sql.streaming.jdbc.tasksPerConnection. Se o banco de dados de destino tiver um limite de conexão baixo, reduza o número de partições em ordem aleatória ou aumente spark.databricks.sql.streaming.jdbc.tasksPerConnection.
O sink tenta novamente, de forma automática, erros transitórios de JDBC, incluindo falhas de conexão, deadlocks e limitação de taxa. Se o coletor esgotar todas as tentativas, a consulta falhará.
Gatilhos e modos de saída compatíveis
Gatilhos
Esta tabela mostra suporte para tipos de trigger de transmissão estructurada.
Trigger | Suportado |
|---|---|
| Sim |
| Sim |
| Sim |
| Sim |
Modos de saída
Esta tabela mostra suporte para modos de saída de transmissão estructurada.
Modo de saída | Suportado |
|---|---|
| Sim |
| Sim. O comportamento é idêntico a |
| Não |
Limitações
- O compute serverless e Lakeflow Spark Declarative Pipelines não são suportados.
- Somente Lakebase tem suporte como destino de gravação. Bancos de dados externos compatíveis com PostgreSQL não são suportados.