Use a notebook to access a database instance
This feature is in Public Preview in the following regions: us-east-1, us-west-2, eu-west-1, ap-southeast-1, ap-southeast-2, eu-central-1, us-east-2, ap-south-1.
This page contains code examples that show you how to access your Lakebase database instance through Databricks notebooks and run queries using Python and Scala.
The examples cover different connection strategies to suit different use cases:
- Single connection: Used for simple scripts where a single database connection is opened, used, and closed.
- Connection pool: Used for high-concurrency workloads, where a pool of reusable connections is maintained.
- Rotating M2M OAuth token: Uses short-lived, automatically refreshed OAuth tokens for authentication.
The following examples generate secure credentials programmatically. Avoid hardcoding credentials in a notebook. Databricks recommends using one of the following secure methods:
- Store Postgres passwords in Databricks secrets.
- Generate OAuth tokens using M2M OAuth.
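As a rough illustration of the first option, a notebook can read a stored password at run time with `dbutils.secrets.get` instead of embedding it in a cell. In this sketch, `read_postgres_password` is a hypothetical helper, the scope name `lakebase` and key name `postgres-password` are placeholders, and the secrets utility is passed in as an argument so the helper can also be exercised outside a notebook.

```python
def read_postgres_password(secrets, scope="lakebase", key="postgres-password"):
    """Return the Postgres password stored in a Databricks secret scope.

    `secrets` is expected to be `dbutils.secrets` when called from a notebook.
    The default scope and key names are placeholders, not real defaults.
    """
    password = secrets.get(scope=scope, key=key)
    if not password:
        # Fail early rather than attempting a login with an empty password
        raise ValueError(f"Secret {scope}/{key} is empty")
    return password
```

In a notebook, `password = read_postgres_password(dbutils.secrets)` would then supply the value for the `password` connection parameter.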
Before you begin
Ensure you meet the following requirements before accessing your database instance:
- You have a corresponding Postgres role to log in to the database instance. See Create and manage Postgres roles for Databricks identities.
- Your Postgres role is granted the necessary permissions to access the database, schema, or table.
- You can authenticate to the database instance. If you must manually obtain an OAuth token for your database instance, see Authenticate to database instance.
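One way to confirm the permission requirement after connecting is to ask Postgres directly with its built-in `has_table_privilege` function. The following is a minimal sketch, assuming a DB-API connection such as the ones created later on this page; `can_select` is a hypothetical helper name, and the table name passed to it is whatever table you intend to query.

```python
def can_select(connection, table_name):
    """Return True if the current Postgres role has SELECT on `table_name`.

    `connection` is any DB-API connection whose cursor supports the
    context-manager protocol and %s placeholders (psycopg2 and psycopg do).
    """
    with connection.cursor() as cur:
        # The two-argument form checks the privilege for the current role
        cur.execute("SELECT has_table_privilege(%s, 'SELECT')", (table_name,))
        return bool(cur.fetchone()[0])
```

A `False` result suggests that the role exists but still needs a grant on that table.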
Python
Use the Databricks Python SDK to obtain an OAuth token for a database instance.
Connect to your database instance from a Databricks notebook using the following Python libraries:
- psycopg2
- psycopg3
- SQLAlchemy
Use the Databricks Python SDK to obtain an OAuth token
Databricks SDK bindings for database instances are available in Python SDK version v0.56.0 and later. If you are running an older version of the SDK, run these commands first.
%pip install --upgrade databricks-sdk
%restart_python
The Databricks SDK generates a secure OAuth token, cred, for your database instance, instance. Enter your database instance name where needed.
from databricks.sdk import WorkspaceClient
import uuid
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
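The two SDK calls above can be wrapped in one helper so that every example obtains the host name and a fresh token the same way. This is a minimal sketch and not part of the SDK: `fetch_connection_info` and `ConnectionInfo` are hypothetical names, and the client is passed in as a parameter so the helper relies only on the `get_database_instance` and `generate_database_credential` calls shown above.

```python
import uuid
from dataclasses import dataclass


@dataclass(frozen=True)
class ConnectionInfo:
    """Host name and short-lived token for one database instance (hypothetical type)."""
    host: str
    token: str


def fetch_connection_info(w, instance_name):
    """Look up the instance's read-write DNS name and generate a fresh OAuth token.

    `w` is expected to be a databricks.sdk.WorkspaceClient; only the two
    calls used earlier on this page are made on it.
    """
    instance = w.database.get_database_instance(name=instance_name)
    cred = w.database.generate_database_credential(
        request_id=str(uuid.uuid4()),  # unique ID for this credential request
        instance_names=[instance_name],
    )
    return ConnectionInfo(host=instance.read_write_dns, token=cred.token)
```

In a notebook, `info = fetch_connection_info(WorkspaceClient(), "<YOUR INSTANCE>")` would return the values to pass as `host` and `password` in the connection examples that follow.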
psycopg2
The code examples demonstrate a single connection and the use of a connection pool. For more on how to obtain the database instance and credentials programmatically, see Use the Databricks Python SDK to obtain an OAuth token.
- Single connection
- Connection pool
import psycopg2
from databricks.sdk import WorkspaceClient
import uuid
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
# Connection parameters
conn = psycopg2.connect(
    host = instance.read_write_dns,
    dbname = "databricks_postgres",
    user = "<YOUR USER>",
    password = cred.token,
    sslmode = "require"
)
# Execute query
with conn.cursor() as cur:
    cur.execute("SELECT version()")
    version = cur.fetchone()[0]
    print(version)
conn.close()
import psycopg2
from psycopg2 import pool
from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Create a connection pool
connection_pool = psycopg2.pool.ThreadedConnectionPool(
    minconn = 1,  # Minimum number of connections in the pool
    maxconn = 10,  # Maximum number of connections in the pool
    user = "<YOUR USER>",
    password = cred.token,
    host = instance.read_write_dns,
    port = '5432',
    database = 'databricks_postgres',
    sslmode = 'require'  # Same SSL setting as the single-connection example
)
if connection_pool:
    print("Connection pool created successfully")

def executeWithPgConnection(execFn):
    connection = None
    try:
        # Get a connection from the pool
        connection = connection_pool.getconn()
        if connection:
            print("Successfully received a connection from the pool")
            execFn(connection)
    finally:
        # Release the connection back to the pool
        if connection:
            connection_pool.putconn(connection)
            print("Connection returned to the pool")

def printVersion(connection):
    # The cursor is closed automatically when the with block exits
    with connection.cursor() as cursor:
        cursor.execute("SELECT version()")
        version = cursor.fetchone()[0]
    print(f"Connected to PostgreSQL database. Version: {version}")

executeWithPgConnection(printVersion)
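The borrow-and-return pattern in the pool example above can also be expressed as a context manager, which keeps the release logic in one place. This is a sketch rather than part of psycopg2: `pooled_connection` is a hypothetical helper that assumes only that the pool object exposes `getconn()` and `putconn()`, as `ThreadedConnectionPool` does.

```python
from contextlib import contextmanager


@contextmanager
def pooled_connection(pool):
    """Borrow a connection from `pool` and always hand it back.

    Works with any object exposing getconn() and putconn(), such as
    psycopg2.pool.ThreadedConnectionPool.
    """
    connection = pool.getconn()
    try:
        yield connection
    finally:
        # Runs on success and on error, so the pool does not leak connections
        pool.putconn(connection)
```

With this helper, `with pooled_connection(connection_pool) as connection:` followed by the query code would replace the explicit try/finally block.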
psycopg3
The code example demonstrates the use of a connection pool with a rotating M2M OAuth token. It uses generate_database_credential(). For more on how to obtain the database instance and credentials programmatically, see Use the Databricks Python SDK to obtain an OAuth token.
%pip install "psycopg[binary,pool]"
from databricks.sdk import WorkspaceClient
import uuid
import psycopg
from psycopg_pool import ConnectionPool

w = WorkspaceClient()

class CustomConnection(psycopg.Connection):
    @classmethod
    def connect(cls, conninfo='', **kwargs):
        # Generate a fresh OAuth token for each new connection and pass it as the password
        cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
        kwargs['password'] = cred.token
        # Call the superclass's connect method with updated kwargs
        return super().connect(conninfo, **kwargs)

username = "<YOUR USER>"
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"

pool = ConnectionPool(
    conninfo=f"dbname={database} user={username} host={host} port={port} sslmode=require",
    connection_class=CustomConnection,
    min_size=1,
    max_size=10,
    open=True
)

with pool.connection() as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT version()")
        for record in cursor:
            print(record)
SQLAlchemy
The code examples demonstrate a single connection and the use of a connection pool with a rotating M2M OAuth token. For more on how to obtain the database instance and credentials programmatically, see Use the Databricks Python SDK to obtain an OAuth token.
- Single connection
- Connection pool & rotating M2M OAuth
%pip install sqlalchemy==1.4 psycopg2-binary
from sqlalchemy import create_engine, text
from databricks.sdk import WorkspaceClient
import uuid
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
user = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
password = cred.token
connection_pool = create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}?sslmode=require")
with connection_pool.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    for row in result:
        print(f"Connected to PostgreSQL database. Version: {row}")
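The single-connection URL above interpolates the user name and token directly. If either value contains characters that are reserved in URLs (a user name that is an email address contains `@`, for example), percent-encoding them first avoids ambiguity when the URL is parsed. A minimal sketch using only the standard library, where `build_postgres_url` is a hypothetical helper name:

```python
from urllib.parse import quote


def build_postgres_url(user, password, host, database, port=5432, sslmode="require"):
    """Build a postgresql:// URL with the user name and password percent-encoded."""
    # safe="" also encodes "/" so that no reserved character is left unescaped
    encoded_user = quote(user, safe="")
    encoded_password = quote(password, safe="")
    return (
        f"postgresql://{encoded_user}:{encoded_password}"
        f"@{host}:{port}/{database}?sslmode={sslmode}"
    )
```

The resulting string can be passed to `create_engine` in place of the hand-built f-string.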
%pip install sqlalchemy==1.4 psycopg2-binary
from databricks.sdk import WorkspaceClient
import uuid
import time
from sqlalchemy import create_engine, text, event
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
username = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
# sqlalchemy setup + function to refresh the OAuth token that is used as the Postgres password every 15 minutes.
connection_pool = create_engine(f"postgresql+psycopg2://{username}:@{host}:{port}/{database}")
postgres_password = None
last_password_refresh = time.time()
@event.listens_for(connection_pool, "do_connect")
def provide_token(dialect, conn_rec, cargs, cparams):
    global postgres_password, last_password_refresh
    if postgres_password is None or time.time() - last_password_refresh > 900:
        print("Refreshing PostgreSQL OAuth token")
        # Credentials are requested by instance name, not by host name
        cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
        postgres_password = cred.token
        last_password_refresh = time.time()
    cparams["password"] = postgres_password

with connection_pool.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    for row in result:
        print(f"Connected to PostgreSQL database. Version: {row}")
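The refresh logic in the `do_connect` listener above can be factored into a small cache object, which avoids module-level globals and makes the 15-minute interval easy to test. This is a sketch under assumptions: `TokenCache` is a hypothetical class, `fetch_token` stands for any zero-argument callable that returns a new token (for example, a function that calls `generate_database_credential`), and the clock is injectable only so that tests do not have to wait.

```python
import threading
import time


class TokenCache:
    """Cache a token and refresh it once it is older than `max_age_seconds`."""

    def __init__(self, fetch_token, max_age_seconds=900, clock=time.monotonic):
        self._fetch_token = fetch_token
        self._max_age = max_age_seconds
        self._clock = clock
        self._lock = threading.Lock()  # a pool may open connections from several threads
        self._token = None
        self._fetched_at = None

    def get(self):
        """Return the cached token, fetching a new one if it is missing or stale."""
        with self._lock:
            now = self._clock()
            if self._token is None or now - self._fetched_at > self._max_age:
                self._token = self._fetch_token()
                self._fetched_at = now
            return self._token
```

Inside the listener, `cparams["password"] = token_cache.get()` would then take the place of the global variables and the inline time check.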
Scala
The code examples show how to programmatically obtain the database instance and credentials, and how to connect to a database instance using a single connection or a connection pool.
Step 1: Use the Databricks Java SDK to obtain an OAuth token
Database SDK bindings are available in Java SDK version v0.53.0 or newer. If you are running with an older version of the SDK, you might need to refresh the imported SDK. For more information, see here.
%scala
import com.databricks.sdk.WorkspaceClient
import com.databricks.sdk.service.database.GetDatabaseInstanceRequest
import com.databricks.sdk.service.database.GenerateDatabaseCredentialRequest
import com.databricks.sdk.service.database.DatabaseInstance
import com.databricks.sdk.service.database.DatabaseCredential
import java.util.Collections
import java.util.UUID
val w = new WorkspaceClient()
val instanceName = "<YOUR INSTANCE>"
val databaseName = "databricks_postgres"
val userName = "<YOUR USER>"
// Get database instance
val instance = w.database().getDatabaseInstance(
  new GetDatabaseInstanceRequest().setName(instanceName)
);
// Generate database credential
val cred = w.database().generateDatabaseCredential(
  new GenerateDatabaseCredentialRequest()
    .setRequestId(UUID.randomUUID().toString())
    .setInstanceNames(Collections.singletonList(instanceName))
);
// Print out credential details
System.out.println("Database instance: " + instance.getName());
System.out.println("Credential: " + cred.getToken());
Step 2: Connect to a database instance
- Single connection
- Connection pool
import java.sql.{Connection, DriverManager, ResultSet, Statement}
Class.forName("org.postgresql.Driver")
val user = "<YOUR USER>"
// Connect to the instance's read-write DNS name, not the instance name
val host = instance.getReadWriteDns()
val port = "5432"
val database = "databricks_postgres"
val password = cred.getToken()
val url = s"jdbc:postgresql://${host}:${port}/${database}"
val connection = DriverManager.getConnection(url, user, password)
println("Connected to PostgreSQL database!")
val statement = connection.createStatement()
val resultSet = statement.executeQuery("SELECT version()")
if (resultSet.next()) {
  val version = resultSet.getString(1)
  println(s"PostgreSQL version: $version")
}
// Close the connection when finished
connection.close()
import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import java.sql.Connection
// Configure HikariCP
val config = new HikariConfig()
// Interpolate the instance's read-write DNS name into the JDBC URL
config.setJdbcUrl(s"jdbc:postgresql://${instance.getReadWriteDns()}:5432/databricks_postgres")
config.setUsername("<YOUR USER>")
config.setPassword(cred.getToken())
config.setMaximumPoolSize(10)
// Create a data source
val dataSource = new HikariDataSource(config)
// Function to get a connection and execute a query
def runQuery(): Unit = {
  var connection: Connection = null
  try {
    // Get a connection from the pool
    connection = dataSource.getConnection()
    // Create a statement
    val statement = connection.createStatement()
    // Execute a query
    val resultSet = statement.executeQuery("SELECT version() AS v;")
    // Process the result set
    while (resultSet.next()) {
      val v = resultSet.getString("v")
      println(s"*******Connected to PostgreSQL database. Version: $v")
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    // Close the connection which returns it to the pool
    if (connection != null) connection.close()
  }
}
// Run the query
runQuery()
// Close the data source
dataSource.close()