メインコンテンツまでスキップ

ノートブックを使用してデータベースインスタンスにアクセスする

備考

プレビュー

この機能は、us-east-1us-west-2eu-west-1ap-southeast-1ap-southeast-2eu-central-1us-east-2ap-south-1の各地域でパブリック プレビュー段階です。

このページには、 Databricks ノートブック を使用して Lakebase データベースインスタンスにアクセスし、Python と Scala を使用してクエリを実行する方法を示すコード例が含まれています。

これらの例では、さまざまなユースケースに合わせてさまざまな接続戦略を取り上げています。

  • 単一接続 : 単一のデータベース接続が開かれ、使用され、閉じられる単純なスクリプトに使用されます。
  • 接続プール : 再利用可能な接続のプールが維持される、同時実行性の高いワークロードに使用されます。
  • M2M OAuth トークンのローテーション : 認証に、自動的に更新される短命の OAuth トークンを使用します。

次の例では、セキュリティで保護された資格情報をプログラムで生成します。ノートブックに資格情報を直接入力することは避けてください。Databricks では、次のいずれかの安全な方法を使用することをお勧めします。

  • Postgres のパスワードを Databricks シークレットに格納します。
  • M2M OAuth を使用して OAuth トークンを生成します。

始める前に

データベース・インスタンスにアクセスする前に、次の要件を満たしていることを確認してください。

  • データベースインスタンスにログインするための対応するPostgresロールがあります。Databricks ID の Postgres ロールを作成および管理するを参照してください。
  • Postgres ロールには、データベース、スキーマ、またはテーブルにアクセスするために必要なアクセス許可が付与されます。
  • データベース・インスタンスに対して認証できます。データベース・インスタンスの OAuth トークンを手動で取得する必要がある場合は、 データベース・インスタンスへの認証を参照してください。

Python

Databricks Python SDK を使用して、それぞれのデータベース インスタンスの OAuth トークンを取得できます。

次の Python ライブラリを使用して、Databricks ノートブックからデータベース インスタンスに接続します。

  • psycopg2
  • psycopg3
  • SQLAlchemy

Databricks Python SDK を使用して OAuth トークンを取得する

Databricks SDK バインディングは、Python SDK バージョン v0.56.0 で使用できます。古いバージョンの SDK で実行している場合は、最初にこれらのコマンドを実行します。

Python
%pip install --upgrade databricks-sdk
%restart_python

このDatabricks SDKは、データベース・インスタンスのcredinstanceのセキュアOAuthトークンを生成します。必要に応じて、データベースインスタンス名を入力します。

Python
from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)

cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

psycopg2

これらのコード例では、1 つの接続と接続プールの使用方法を示します。データベース インスタンスと資格情報をプログラムで取得する方法の詳細については、「 Databricks Python SDK を使用して OAuth トークンを取得する」を参照してください。

Python
import psycopg2

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Connection parameters
conn = psycopg2.connect(
host = instance.read_write_dns,
dbname = "databricks_postgres",
user = "<YOUR USER>",
password = cred.token,
sslmode = "require"
)

# Execute query
with conn.cursor() as cur:
cur.execute("SELECT version()")
version = cur.fetchone()[0]
print(version)
conn.close()

psycopg3

このコード例は、ローテーション M2M OAuth で接続プールを使用する方法を示しています。generate_database_credential()を使用します。データベース インスタンスと資格情報をプログラムで取得する方法の詳細については、「 Databricks Python SDK を使用して OAuth トークンを取得する」を参照してください。

Python
%pip install "psycopg[binary,pool]"
Python
from databricks.sdk import WorkspaceClient
import uuid

import psycopg
import string
from psycopg_pool import ConnectionPool

w = WorkspaceClient()

class CustomConnection(psycopg.Connection):
global w
def __init__(self, *args, **kwargs):
# Call the parent class constructor
super().__init__(*args, **kwargs)

@classmethod
def connect(cls, conninfo='', **kwargs):
# Append the new password to kwargs
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[host])
kwargs['password'] = cred.token

# Call the superclass's connect method with updated kwargs
return super().connect(conninfo, **kwargs)


username = "<YOUR USER>"
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"

pool = ConnectionPool(
conninfo=f"dbname={database} user={username} host={host}",
connection_class=CustomConnection,
min_size=1,
max_size=10,
open=True
)

with pool.connection() as conn:
with conn.cursor() as cursor:
cursor.execute("SELECT version()")
for record in cursor:
print(record)

SQLAlchemy

コード例は、単一の接続と、ローテーションされる M2M OAuth トークンを使用した接続プールの使用を示しています。データベース インスタンスと資格情報をプログラムで取得する方法の詳細については、「 Databricks Python SDK を使用して OAuth トークンを取得する」を参照してください。

Python
%pip install sqlalchemy==1.4 psycopg[binary]
Python
from sqlalchemy import create_engine, text

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

user = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
password = cred.token

connection_pool = create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}?sslmode=require")

with connection_pool.connect() as conn:
result = conn.execute(text("SELECT version()"))
for row in result:
print(f"Connected to PostgreSQL database. Version: {row}")

Scala

これらのコード例は、データベース・インスタンスと認証情報をプログラムで取得する方法と、1 つの接続または接続プールを使用してデータベース・インスタンスに接続する方法を示しています。

手順 1: Databricks Java SDK を使用して OAuth トークンを取得する

データベース SDK バインディングは、Java SDK バージョン v0.53.0 以降で使用できます。古いバージョンの SDK で実行している場合は、インポートした SDK の更新が必要になることがあります。詳細については、 こちらを参照してください。

Scala
%scala

import com.databricks.sdk.WorkspaceClient
import com.databricks.sdk.service.database.GetDatabaseInstanceRequest
import com.databricks.sdk.service.database.GenerateDatabaseCredentialRequest
import com.databricks.sdk.service.database.DatabaseInstance
import com.databricks.sdk.service.database.DatabaseCredential
import java.util.Collections
import java.util.UUID

val w = new WorkspaceClient()

val instanceName = "<YOUR INSTANCE>"
val databaseName = "databricks_postgres"
val userName = "<YOUR USER>"

// Get database instance
val instance = w.database().getDatabaseInstance(
new GetDatabaseInstanceRequest().setName(instanceName)
);

// Generate database credential
val cred = w.database().generateDatabaseCredential(
new GenerateDatabaseCredentialRequest()
.setRequestId(UUID.randomUUID().toString())
.setInstanceNames(Collections.singletonList(instanceName))
);

// Print out credential details
System.out.println("Database instance: " + instance.getName());
System.out.println("Credential: " + cred.getToken());

ステップ 2: データベースインスタンスに接続する

Scala
import java.sql.{Connection, DriverManager, ResultSet, Statement}

Class.forName("org.postgresql.Driver")

val user = "<YOUR USER>"
val host = instance.getName()
val port = "5432"
val database = "databricks_postgres"
val password = cred.getToken()

val url = f"jdbc:postgresql://${host}:${port}/${database}"

val connection = DriverManager.getConnection(url, user, password)
println("Connected to PostgreSQL database!")

val statement = connection.createStatement()
val resultSet = statement.executeQuery("SELECT version()")

if (resultSet.next()) {
val version = resultSet.getString(1)
println(s"PostgreSQL version: $version")
}