Usar um Notebook para acessar uma instância de banco de dados
Visualização
Esse recurso está em Public Preview nas seguintes regiões: us-east-1
, us-west-2
, eu-west-1
, ap-southeast-1
, ap-southeast-2
, eu-central-1
, us-east-2
, ap-south-1
.
Esta página contém exemplos de código que mostram como acessar a instância do banco de dados do Lakebase por meio do Databricks Notebook e executar consultas usando Python e Scala.
Os exemplos abrangem diferentes estratégias de conexão para se adequar a diferentes casos de uso:
- Conexão única : usada para scripts simples em que uma única conexão de banco de dados é aberta, usada e fechada.
- Conexão pool : Usada para cargas de trabalho de alta simultaneidade, em que é mantida uma pool de conexões reutilizáveis.
- Tokens rotativos M2M OAuth: Usa OAuth tokens de curta duração, atualizados automaticamente para autenticação.
Os exemplos a seguir geram credenciais seguras de forma programática. Evite colocar credenciais diretamente em um Notebook. A Databricks recomenda o uso de um dos seguintes métodos seguros:
- Armazene as senhas do Postgres nos segredos do Databricks.
- Gerar tokens OAuth usando o M2M OAuth.
Antes de começar
Certifique-se de atender aos seguintes requisitos antes de acessar sua instância de banco de dados:
- O senhor tem uma função Postgres correspondente para log in na instância do banco de dados. Consulte gerenciar funções do Postgres.
- Sua função Postgres recebe as permissões necessárias para acessar o banco de dados, esquema ou tabela.
- Você pode se autenticar na instância do banco de dados. Se o senhor precisar obter manualmente um token OAuth para a instância do banco de dados, consulte Autenticação em uma instância do banco de dados.
Python
O Databricks Python SDK pode ser usado para obter tokens OAuth para uma respectiva instância de banco de dados.
Conecte-se à instância do banco de dados a partir de um Notebook Databricks usando a seguinte Python biblioteca:
psycopg2
psycopg3
SQLAlchemy
psycopg2
Os exemplos de código demonstram uma única conexão e o uso de um pool de conexões. Para saber mais sobre como obter a instância do banco de dados e as credenciais de forma programática, consulte como obter tokens OAuth usando o Python SDK .
- Single connection
- Connection pool
import psycopg2
from databricks.sdk import WorkspaceClient
import uuid
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
# Connection parameters
conn = psycopg2.connect(
host = instance.read_write_dns,
dbname = "databricks_postgres",
user = "<YOUR USER>",
password = cred.token,
sslmode = "require"
)
# Execute query
with conn.cursor() as cur:
cur.execute("SELECT version()")
version = cur.fetchone()[0]
print(version)
conn.close()
import psycopg2
from psycopg2 import sql, pool
from pyspark.sql.functions import udf
from databricks.sdk import WorkspaceClient
import uuid
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
# Create a connection pool
connection_pool = psycopg2.pool.ThreadedConnectionPool(
minconn = 1, # Minimum number of connections in the pool
maxconn = 10, # Maximum number of connections in the pool
user = "<YOUR USER>",
password = cred.token,
host = instance.read_write_dns,
port = '5432',
database = 'databricks_postgres'
)
if connection_pool:
print("Connection pool created successfully")
def executeWithPgConnection(execFn):
connection = None
try:
# Get a connection from the pool
connection = connection_pool.getconn()
if connection:
print("Successfully received a connection from the pool")
execFn(connection)
finally:
# Release the connection back to the pool
if connection:
connection_pool.putconn(connection)
print("Connection returned to the pool")
def printVersion(connection):
cursor = connection.cursor()
cursor.execute("SELECT version()")
version = cursor.fetchone()
print(f"Connected to PostgreSQL database. Version: {version}")
executeWithPgConnection(printVersion)
psycopg3
O exemplo de código demonstra o uso de um pool de conexões com um M2M OAuth rotativo. Ele usa generate_database_credential()
. Para saber mais sobre como obter a instância do banco de dados e as credenciais de forma programática, consulte como obter tokens OAuth usando o Python SDK .
%pip install "psycopg[binary,pool]"
from databricks.sdk import WorkspaceClient
import uuid
import psycopg
import string
from psycopg_pool import ConnectionPool
w = WorkspaceClient()
class CustomConnection(psycopg.Connection):
global w
def __init__(self, *args, **kwargs):
# Call the parent class constructor
super().__init__(*args, **kwargs)
@classmethod
def connect(cls, conninfo='', **kwargs):
# Append the new password to kwargs
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
kwargs['password'] = cred.token
# Call the superclass's connect method with updated kwargs
return super().connect(conninfo, **kwargs)
username = "<YOUR USER>"
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
pool = ConnectionPool(
conninfo=f"dbname={database} user={username} host={host}",
connection_class=CustomConnection,
min_size=1,
max_size=10,
open=True
)
with pool.connection() as conn:
with conn.cursor() as cursor:
cursor.execute("SELECT version()")
for record in cursor:
print(record)
SQLAlchemy
Os exemplos de código demonstram uma única conexão e o uso de uma conexão pool com tokens rotativos M2M OAuth. Para saber mais sobre como obter a instância do banco de dados e as credenciais de forma programática, consulte como obter tokens OAuth usando o Python SDK .
- Single connection
- Connection pool & rotating M2M OAuth
%pip install sqlalchemy==1.4 psycopg[binary]
from sqlalchemy import create_engine, text
from databricks.sdk import WorkspaceClient
import uuid
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
user = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
password = cred.token
connection_pool = create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}?sslmode=require")
with connection_pool.connect() as conn:
result = conn.execute(text("SELECT version()"))
for row in result:
print(f"Connected to PostgreSQL database. Version: {row}")
%pip install sqlalchemy==1.4 psycopg[binary]
from databricks.sdk import WorkspaceClient
import uuid
import time
from sqlalchemy import create_engine, text, event
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
username = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
# sqlalchemy setup + function to refresh the OAuth token that is used as the Postgres password every 15 minutes.
connection_pool = create_engine(f"postgresql+psycopg2://{username}:@{host}:{port}/{database}")
postgres_password = None
last_password_refresh = time.time()
@event.listens_for(connection_pool, "do_connect")
def provide_token(dialect, conn_rec, cargs, cparams):
global postgres_password, last_password_refresh, host
if postgres_password is None or time.time() - last_password_refresh > 900:
print("Refreshing PostgreSQL OAuth token")
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
postgres_password = cred.token
last_password_refresh = time.time()
cparams["password"] = postgres_password
with connection_pool.connect() as conn:
result = conn.execute(text("SELECT version()"))
for row in result:
print(f"Connected to PostgreSQL database. Version: {row}")
Scala
Os exemplos de código mostram como obter programaticamente a instância e as credenciais do banco de dados e como se conectar a uma instância do banco de dados usando uma única conexão ou um pool de conexões.
Etapa 1: Use o endereço Databricks Java SDK para obter tokens OAuth
Para obter detalhes sobre como obter a instância do banco de dados e as credenciais de forma programática, consulte como obter tokens OAuth usando o Java SDK .
Etapa 2: conectar-se a uma instância de banco de dados
- Single connection
- Connection pool
import java.sql.{Connection, DriverManager, ResultSet, Statement}
Class.forName("org.postgresql.Driver")
val user = "<YOUR USER>"
val host = instance.getName()
val port = "5432"
val database = "databricks_postgres"
val password = cred.getToken()
val url = f"jdbc:postgresql://${host}:${port}/${database}"
val connection = DriverManager.getConnection(url, user, password)
println("Connected to PostgreSQL database!")
val statement = connection.createStatement()
val resultSet = statement.executeQuery("SELECT version()")
if (resultSet.next()) {
val version = resultSet.getString(1)
println(s"PostgreSQL version: $version")
}
import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import java.sql.Connection
// Configure HikariCP
val config = new HikariConfig()
config.setJdbcUrl("jdbc:postgresql://instance.getName():5432/databricks_postgres")
config.setUsername("<YOUR USER>")
config.setPassword(cred.getToken())
config.setMaximumPoolSize(10)
// Create a data source
val dataSource = new HikariDataSource(config)
// Function to get a connection and execute a query
def runQuery(): Unit = {
var connection: Connection = null
try {
// Get a connection from the pool
connection = dataSource.getConnection()
// Create a statement
val statement = connection.createStatement()
// Execute a query
val resultSet = statement.executeQuery("SELECT version() AS v;")
// Process the result set
while (resultSet.next()) {
val v = resultSet.getString("v")
println(s"*******Connected to PostgreSQL database. Version: $v")
}
} catch {
case e: Exception => e.printStackTrace()
} finally {
// Close the connection which returns it to the pool
if (connection != null) connection.close()
}
}
// Run the query
runQuery()
// Close the data source
dataSource.close()