ノートブックを使用してデータベースインスタンスにアクセスする
プレビュー
この機能は、us-east-1
、us-west-2
、eu-west-1
、ap-southeast-1
、ap-southeast-2
、eu-central-1
、us-east-2
、ap-south-1
の各地域でパブリック プレビュー段階です。
このページには、 Databricks ノートブック を使用して Lakebase データベースインスタンスにアクセスし、Python と Scala を使用してクエリを実行する方法を示すコード例が含まれています。
これらの例では、さまざまなユースケースに合わせてさまざまな接続戦略を取り上げています。
- 単一接続 : 単一のデータベース接続が開かれ、使用され、閉じられる単純なスクリプトに使用されます。
- 接続プール : 再利用可能な接続のプールが維持される、同時実行性の高いワークロードに使用されます。
- M2M OAuth トークンのローテーション : 認証に、自動的に更新される短命の OAuth トークンを使用します。
次の例では、セキュリティで保護された資格情報をプログラムで生成します。ノートブックに資格情報を直接入力することは避けてください。Databricks では、次のいずれかの安全な方法を使用することをお勧めします。
- Postgres のパスワードを Databricks シークレットに格納します。
- M2M OAuth を使用して OAuth トークンを生成します。
始める前に
データベース・インスタンスにアクセスする前に、次の要件を満たしていることを確認してください。
- データベースインスタンスにログインするための対応するPostgresロールがあります。Databricks ID の Postgres ロールを作成および管理するを参照してください。
- Postgres ロールには、データベース、スキーマ、またはテーブルにアクセスするために必要なアクセス許可が付与されます。
- データベース・インスタンスに対して認証できます。データベース・インスタンスの OAuth トークンを手動で取得する必要がある場合は、 データベース・インスタンスへの認証を参照してください。
Python
Databricks Python SDK を使用して、それぞれのデータベース インスタンスの OAuth トークンを取得できます。
次の Python ライブラリを使用して、Databricks ノートブックからデータベース インスタンスに接続します。
psycopg2
psycopg3
SQLAlchemy
Databricks Python SDK を使用して OAuth トークンを取得する
Databricks SDK バインディングは、Python SDK バージョン v0.56.0 で使用できます。古いバージョンの SDK で実行している場合は、最初にこれらのコマンドを実行します。
%pip install --upgrade databricks-sdk
%restart_python
このDatabricks SDKは、データベース・インスタンスのcred
、instance
のセキュアOAuthトークンを生成します。必要に応じて、データベースインスタンス名を入力します。
from databricks.sdk import WorkspaceClient
import uuid
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
psycopg2
これらのコード例では、1 つの接続と接続プールの使用方法を示します。データベース インスタンスと資格情報をプログラムで取得する方法の詳細については、「 Databricks Python SDK を使用して OAuth トークンを取得する」を参照してください。
- Single connection
- Connection pool
import psycopg2
from databricks.sdk import WorkspaceClient
import uuid
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
# Connection parameters
conn = psycopg2.connect(
host = instance.read_write_dns,
dbname = "databricks_postgres",
user = "<YOUR USER>",
password = cred.token,
sslmode = "require"
)
# Execute query
with conn.cursor() as cur:
cur.execute("SELECT version()")
version = cur.fetchone()[0]
print(version)
conn.close()
import psycopg2
from psycopg2 import sql, pool
from pyspark.sql.functions import udf
from databricks.sdk import WorkspaceClient
import uuid
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
# Create a connection pool
connection_pool = psycopg2.pool.ThreadedConnectionPool(
minconn = 1, # Minimum number of connections in the pool
maxconn = 10, # Maximum number of connections in the pool
user = "<YOUR USER>",
password = cred.token,
host = instance.read_write_dns,
port = '5432',
database = 'databricks_postgres'
)
if connection_pool:
print("Connection pool created successfully")
def executeWithPgConnection(execFn):
connection = None
try:
# Get a connection from the pool
connection = connection_pool.getconn()
if connection:
print("Successfully received a connection from the pool")
execFn(connection)
finally:
# Release the connection back to the pool
if connection:
connection_pool.putconn(connection)
print("Connection returned to the pool")
def printVersion(connection):
cursor = connection.cursor()
cursor.execute("SELECT version()")
version = cursor.fetchone()
print(f"Connected to PostgreSQL database. Version: {version}")
executeWithPgConnection(printVersion)
psycopg3
このコード例は、ローテーション M2M OAuth で接続プールを使用する方法を示しています。generate_database_credential()
を使用します。データベース インスタンスと資格情報をプログラムで取得する方法の詳細については、「 Databricks Python SDK を使用して OAuth トークンを取得する」を参照してください。
%pip install "psycopg[binary,pool]"
from databricks.sdk import WorkspaceClient
import uuid
import psycopg
import string
from psycopg_pool import ConnectionPool
w = WorkspaceClient()
class CustomConnection(psycopg.Connection):
global w
def __init__(self, *args, **kwargs):
# Call the parent class constructor
super().__init__(*args, **kwargs)
@classmethod
def connect(cls, conninfo='', **kwargs):
# Append the new password to kwargs
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[host])
kwargs['password'] = cred.token
# Call the superclass's connect method with updated kwargs
return super().connect(conninfo, **kwargs)
username = "<YOUR USER>"
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
pool = ConnectionPool(
conninfo=f"dbname={database} user={username} host={host}",
connection_class=CustomConnection,
min_size=1,
max_size=10,
open=True
)
with pool.connection() as conn:
with conn.cursor() as cursor:
cursor.execute("SELECT version()")
for record in cursor:
print(record)
SQLAlchemy
コード例は、単一の接続と、ローテーションされる M2M OAuth トークンを使用した接続プールの使用を示しています。データベース インスタンスと資格情報をプログラムで取得する方法の詳細については、「 Databricks Python SDK を使用して OAuth トークンを取得する」を参照してください。
- Single connection
- Connection pool & rotating M2M OAuth
%pip install sqlalchemy==1.4 psycopg[binary]
from sqlalchemy import create_engine, text
from databricks.sdk import WorkspaceClient
import uuid
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
user = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
password = cred.token
connection_pool = create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}?sslmode=require")
with connection_pool.connect() as conn:
result = conn.execute(text("SELECT version()"))
for row in result:
print(f"Connected to PostgreSQL database. Version: {row}")
%pip install sqlalchemy==1.4 psycopg[binary]
from databricks.sdk import WorkspaceClient
import uuid
import time
from sqlalchemy import create_engine, text, event
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
username = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
# sqlalchemy setup + function to refresh the OAuth token that is used as the Postgres password every 15 minutes.
connection_pool = create_engine(f"postgresql+psycopg2://{username}:@{host}:{port}/{database}")
postgres_password = None
last_password_refresh = time.time()
@event.listens_for(connection_pool, "do_connect")
def provide_token(dialect, conn_rec, cargs, cparams):
global postgres_password, last_password_refresh, host
if postgres_password is None or time.time() - last_password_refresh > 900:
print("Refreshing PostgreSQL OAuth token")
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[host])
postgres_password = cred.token
last_password_refresh = time.time()
cparams["password"] = postgres_password
with connection_pool.connect() as conn:
result = conn.execute(text("SELECT version()"))
for row in result:
print(f"Connected to PostgreSQL database. Version: {row}")
Scala
これらのコード例は、データベース・インスタンスと認証情報をプログラムで取得する方法と、1 つの接続または接続プールを使用してデータベース・インスタンスに接続する方法を示しています。
手順 1: Databricks Java SDK を使用して OAuth トークンを取得する
データベース SDK バインディングは、Java SDK バージョン v0.53.0 以降で使用できます。古いバージョンの SDK で実行している場合は、インポートした SDK の更新が必要になることがあります。詳細については、 こちらを参照してください。
%scala
import com.databricks.sdk.WorkspaceClient
import com.databricks.sdk.service.database.GetDatabaseInstanceRequest
import com.databricks.sdk.service.database.GenerateDatabaseCredentialRequest
import com.databricks.sdk.service.database.DatabaseInstance
import com.databricks.sdk.service.database.DatabaseCredential
import java.util.Collections
import java.util.UUID
val w = new WorkspaceClient()
val instanceName = "<YOUR INSTANCE>"
val databaseName = "databricks_postgres"
val userName = "<YOUR USER>"
// Get database instance
val instance = w.database().getDatabaseInstance(
new GetDatabaseInstanceRequest().setName(instanceName)
);
// Generate database credential
val cred = w.database().generateDatabaseCredential(
new GenerateDatabaseCredentialRequest()
.setRequestId(UUID.randomUUID().toString())
.setInstanceNames(Collections.singletonList(instanceName))
);
// Print out credential details
System.out.println("Database instance: " + instance.getName());
System.out.println("Credential: " + cred.getToken());
ステップ 2: データベースインスタンスに接続する
- Single connection
- Connection pool
import java.sql.{Connection, DriverManager, ResultSet, Statement}
Class.forName("org.postgresql.Driver")
val user = "<YOUR USER>"
val host = instance.getName()
val port = "5432"
val database = "databricks_postgres"
val password = cred.getToken()
val url = f"jdbc:postgresql://${host}:${port}/${database}"
val connection = DriverManager.getConnection(url, user, password)
println("Connected to PostgreSQL database!")
val statement = connection.createStatement()
val resultSet = statement.executeQuery("SELECT version()")
if (resultSet.next()) {
val version = resultSet.getString(1)
println(s"PostgreSQL version: $version")
}
import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import java.sql.Connection
// Configure HikariCP
val config = new HikariConfig()
config.setJdbcUrl("jdbc:postgresql://instance.getName():5432/databricks_postgres")
config.setUsername("<YOUR USER>")
config.setPassword(cred.getToken())
config.setMaximumPoolSize(10)
// Create a data source
val dataSource = new HikariDataSource(config)
// Function to get a connection and execute a query
def runQuery(): Unit = {
var connection: Connection = null
try {
// Get a connection from the pool
connection = dataSource.getConnection()
// Create a statement
val statement = connection.createStatement()
// Execute a query
val resultSet = statement.executeQuery("SELECT version() AS v;")
// Process the result set
while (resultSet.next()) {
val v = resultSet.getString("v")
println(s"*******Connected to PostgreSQL database. Version: $v")
}
} catch {
case e: Exception => e.printStackTrace()
} finally {
// Close the connection which returns it to the pool
if (connection != null) connection.close()
}
}
// Run the query
runQuery()
// Close the data source
dataSource.close()