Usar um Notebook para acessar uma instância de banco de dados
O provisionamento Lakebase é a oferta original Lakebase que usa compute de provisionamento que você escala manualmente. Para regiões compatíveis, consulte Disponibilidade por região. Para obter a versão mais recente do Lakebase, com compute, redução de recursos a zero, ramificação e restauração instantânea, consulte a documentação sobre escalonamento automáticoLakebase.
Desde 12 de março de 2026, novas instâncias Lakebase são criadas como projetos de escalonamento automático. As instâncias de provisionamento existentes serão atualizadas automaticamente para escalonamento automático a partir de junho de 2026. Para obter detalhes, consulte Atualização para dimensionamento automático Lakebase.
Esta página contém exemplos de código que mostram como acessar a instância do banco de dados do Lakebase por meio do Databricks Notebook e executar consultas usando Python e Scala.
Os exemplos abrangem diferentes estratégias de conexão para se adequar a diferentes casos de uso:
- Conexão única : usada para scripts simples em que uma única conexão de banco de dados é aberta, usada e fechada.
- Conexão pool : Usada para cargas de trabalho de alta simultaneidade, em que é mantida uma pool de conexões reutilizáveis.
- Tokens rotativos M2M OAuth: Usa OAuth tokens de curta duração, atualizados automaticamente para autenticação.
Os exemplos a seguir geram credenciais seguras de forma programática. Evite colocar credenciais diretamente em um Notebook. A Databricks recomenda o uso de um dos seguintes métodos seguros:
- Armazene as senhas do Postgres nos segredos do Databricks.
- Gerar tokens OAuth usando o M2M OAuth.
Antes de começar
Certifique-se de atender aos seguintes requisitos antes de acessar sua instância de banco de dados:
- Você possui uma função correspondente no Postgres para log in na instância do banco de dados. Consulte Funções do Postgres.
- Sua função Postgres recebe as permissões necessárias para acessar o banco de dados, esquema ou tabela.
- Você pode se autenticar na instância do banco de dados. Se o senhor precisar obter manualmente um token OAuth para a instância do banco de dados, consulte Autenticação em uma instância do banco de dados.
Se você estiver usando um link privado, precisará usar um cluster de usuário único.
Python
O Databricks Python SDK pode ser usado para obter tokens OAuth para uma respectiva instância de banco de dados.
Conecte-se à instância do banco de dados a partir de um Notebook Databricks usando a seguinte Python biblioteca:
psycopg(psycopg3)psycopgcom agrupamento de conexõesSQLAlchemy
Pré-requisitos
Antes de executar os exemplos de código a seguir, atualize o SDK Databricks para Python para a versão 0.61.0 ou superior e reinicie Python.
%pip install databricks-sdk>=0.61.0
%restart_python
psycopg
Os exemplos de código demonstram uma única conexão e o uso de um pool de conexões. Para saber mais sobre como obter a instância do banco de dados e as credenciais de forma programática, consulte como obter tokens OAuth usando o Python SDK .
%pip install "psycopg[binary,pool]"
- Single connection
- Connection pool
import psycopg
from databricks.sdk import WorkspaceClient
import uuid
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
# Connection parameters
conn = psycopg.connect(
host = instance.read_write_dns,
dbname = "databricks_postgres",
user = "<YOUR USER>",
password = cred.token,
sslmode = "require"
)
# Execute query
with conn.cursor() as cur:
cur.execute("SELECT version()")
version = cur.fetchone()[0]
print(version)
conn.close()
import psycopg
from psycopg_pool import ConnectionPool
from databricks.sdk import WorkspaceClient
import uuid
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
# Create a connection pool
connection_pool = ConnectionPool(
conninfo=f"dbname=databricks_postgres user=<YOUR USER> host={instance.read_write_dns} port=5432 password={cred.token} sslmode=require",
min_size=1, # Minimum number of connections in the pool
max_size=10, # Maximum number of connections in the pool
open=True
)
print("Connection pool created successfully")
def executeWithPgConnection(execFn):
with connection_pool.connection() as connection:
print("Successfully received a connection from the pool")
execFn(connection)
print("Connection returned to the pool")
def printVersion(connection):
cursor = connection.cursor()
cursor.execute("SELECT version()")
version = cursor.fetchone()
print(f"Connected to PostgreSQL database. Version: {version}")
executeWithPgConnection(printVersion)
psycopg3
O exemplo de código demonstra o uso de um pool de conexões com um M2M OAuth rotativo. Ele usa generate_database_credential(). Para saber mais sobre como obter a instância do banco de dados e as credenciais de forma programática, consulte como obter tokens OAuth usando o Python SDK .
%pip install "psycopg[binary,pool]"
from databricks.sdk import WorkspaceClient
import uuid
import psycopg
import string
from psycopg_pool import ConnectionPool
w = WorkspaceClient()
class CustomConnection(psycopg.Connection):
global w
def __init__(self, *args, **kwargs):
# Call the parent class constructor
super().__init__(*args, **kwargs)
@classmethod
def connect(cls, conninfo='', **kwargs):
# Append the new password to kwargs
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
kwargs['password'] = cred.token
# Call the superclass's connect method with updated kwargs
return super().connect(conninfo, **kwargs)
username = "<YOUR USER>"
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
pool = ConnectionPool(
conninfo=f"dbname={database} user={username} host={host}",
connection_class=CustomConnection,
min_size=1,
max_size=10,
open=True
)
with pool.connection() as conn:
with conn.cursor() as cursor:
cursor.execute("SELECT version()")
for record in cursor:
print(record)
SQLAlchemy
Os exemplos de código demonstram uma única conexão e o uso de uma conexão pool com tokens rotativos M2M OAuth. Para saber mais sobre como obter a instância do banco de dados e as credenciais de forma programática, consulte como obter tokens OAuth usando o Python SDK .
- Single connection
- Connection pool & rotating M2M OAuth
%pip install "sqlalchemy>=2.0" "psycopg[binary]"
from sqlalchemy import create_engine, text
from databricks.sdk import WorkspaceClient
import uuid
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
user = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
password = cred.token
connection_pool = create_engine(f"postgresql+psycopg://{user}:{password}@{host}:{port}/{database}?sslmode=require")
with connection_pool.connect() as conn:
result = conn.execute(text("SELECT version()"))
for row in result:
print(f"Connected to PostgreSQL database. Version: {row}")
%pip install "sqlalchemy>=2.0" "psycopg[binary]"
from databricks.sdk import WorkspaceClient
import uuid
import time
from sqlalchemy import create_engine, text, event
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
username = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
# sqlalchemy setup + function to refresh the OAuth token that is used as the Postgres password every 15 minutes.
connection_pool = create_engine(f"postgresql+psycopg://{username}:@{host}:{port}/{database}")
postgres_password = None
last_password_refresh = time.time()
@event.listens_for(connection_pool, "do_connect")
def provide_token(dialect, conn_rec, cargs, cparams):
global postgres_password, last_password_refresh, host
if postgres_password is None or time.time() - last_password_refresh > 900:
print("Refreshing PostgreSQL OAuth token")
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
postgres_password = cred.token
last_password_refresh = time.time()
cparams["password"] = postgres_password
with connection_pool.connect() as conn:
result = conn.execute(text("SELECT version()"))
for row in result:
print(f"Connected to PostgreSQL database. Version: {row}")
Scala
Os exemplos de código mostram como obter programaticamente a instância e as credenciais do banco de dados e como se conectar a uma instância do banco de dados usando uma única conexão ou um pool de conexões.
o passo 1: Use o endereço Databricks Java SDK para obter um token OAuth
Para obter detalhes sobre como obter a instância do banco de dados e as credenciais de forma programática, consulte como obter tokens OAuth usando o Java SDK .
o passo 2: Conectar-se a uma instância de banco de dados
- Single connection
- Connection pool
import java.sql.{Connection, DriverManager, ResultSet, Statement}
Class.forName("org.postgresql.Driver")
val user = "<YOUR USER>"
val host = instance.getName()
val port = "5432"
val database = "databricks_postgres"
val password = cred.getToken()
val url = f"jdbc:postgresql://${host}:${port}/${database}"
val connection = DriverManager.getConnection(url, user, password)
println("Connected to PostgreSQL database!")
val statement = connection.createStatement()
val resultSet = statement.executeQuery("SELECT version()")
if (resultSet.next()) {
val version = resultSet.getString(1)
println(s"PostgreSQL version: $version")
}
import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import java.sql.Connection
// Configure HikariCP
val config = new HikariConfig()
config.setJdbcUrl("jdbc:postgresql://instance.getName():5432/databricks_postgres")
config.setUsername("<YOUR USER>")
config.setPassword(cred.getToken())
config.setMaximumPoolSize(10)
// Create a data source
val dataSource = new HikariDataSource(config)
// Function to get a connection and execute a query
def runQuery(): Unit = {
var connection: Connection = null
try {
// Get a connection from the pool
connection = dataSource.getConnection()
// Create a statement
val statement = connection.createStatement()
// Execute a query
val resultSet = statement.executeQuery("SELECT version() AS v;")
// Process the result set
while (resultSet.next()) {
val v = resultSet.getString("v")
println(s"*******Connected to PostgreSQL database. Version: $v")
}
} catch {
case e: Exception => e.printStackTrace()
} finally {
// Close the connection which returns it to the pool
if (connection != null) connection.close()
}
}
// Run the query
runQuery()
// Close the data source
dataSource.close()