メインコンテンツまでスキップ

ノートブックを使用してデータベース・インスタンスにアクセスする

備考

プレビュー

この機能は、us-east-1us-west-2eu-west-1ap-southeast-1ap-southeast-2eu-central-1us-east-2ap-south-1のリージョンでパブリック プレビューとして提供されています。

このページには、 Databricks ノートブック を介して Lakebase データベース インスタンスにアクセスし、Python と Scala を使用してクエリを実行する方法を示すコード例が含まれています。

これらの例では、さまざまなユースケースに合わせてさまざまな接続戦略について説明します。

  • 単一接続 : 単一のデータベース接続を開いたり、使用したり、閉じたりする単純なスクリプトに使用されます。
  • 接続プール : 再利用可能な接続のプールが維持される、同時実行性の高いワークロードに使用されます。
  • M2M OAuth トークンのローテーション : 認証には、有効期間が短く、自動的に更新された OAuth トークンを使用します。

次の例では、セキュアな資格情報をプログラムで生成します。資格情報をノートブックに直接配置することは避けてください。Databricks では、次のいずれかのセキュリティで保護された方法を使用することをお勧めします。

  • Postgres パスワードを Databricks シークレットに格納します。
  • M2M OAuth を使用して OAuth トークンを生成します。

始める前に

データベース・インスタンスにアクセスする前に、次の要件を満たしていることを確認してください。

  • データベース・インスタンスにログインするための対応するPostgresロールがあります。Postgresロールの管理を参照してください。
  • Postgres ロールには、データベース、スキーマ、またはテーブルにアクセスするために必要なアクセス許可が付与されます。
  • データベース・インスタンスに対して認証できます。データベースインスタンスの OAuth トークンを手動で取得する必要がある場合は、 データベースインスタンスへの認証を参照してください。

Python

Databricks Python SDK を使用して、それぞれのデータベース インスタンスの OAuth トークンを取得できます。

次の Python ライブラリを使用して、Databricks ノートブックからデータベース インスタンスに接続します。

  • psycopg2
  • psycopg3
  • SQLAlchemy

psycopg2

コード例は、単一の接続と接続プールの使用を示しています。データベースインスタンスと認証情報をプログラムで取得する方法の詳細については、「 Python SDK を使用して OAuth トークンを取得する方法」を参照してください。

Python
import psycopg2

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Connection parameters
conn = psycopg2.connect(
host = instance.read_write_dns,
dbname = "databricks_postgres",
user = "<YOUR USER>",
password = cred.token,
sslmode = "require"
)

# Execute query
with conn.cursor() as cur:
cur.execute("SELECT version()")
version = cur.fetchone()[0]
print(version)
conn.close()

psycopg3

このコード例は、回転する M2M OAuth を使用した接続プールの使用を示しています。generate_database_credential()を使用します。データベースインスタンスと認証情報をプログラムで取得する方法の詳細については、「 Python SDK を使用して OAuth トークンを取得する方法」を参照してください。

Python
%pip install "psycopg[binary,pool]"
Python
from databricks.sdk import WorkspaceClient
import uuid

import psycopg
import string
from psycopg_pool import ConnectionPool

w = WorkspaceClient()

class CustomConnection(psycopg.Connection):
global w
def __init__(self, *args, **kwargs):
# Call the parent class constructor
super().__init__(*args, **kwargs)

@classmethod
def connect(cls, conninfo='', **kwargs):
# Append the new password to kwargs
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
kwargs['password'] = cred.token

# Call the superclass's connect method with updated kwargs
return super().connect(conninfo, **kwargs)


username = "<YOUR USER>"
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"

pool = ConnectionPool(
conninfo=f"dbname={database} user={username} host={host}",
connection_class=CustomConnection,
min_size=1,
max_size=10,
open=True
)

with pool.connection() as conn:
with conn.cursor() as cursor:
cursor.execute("SELECT version()")
for record in cursor:
print(record)

SQLAlchemy

コード例は、単一の接続と、ローテーション M2M OAuth トークンを使用した接続プールの使用を示しています。データベースインスタンスと認証情報をプログラムで取得する方法の詳細については、「 Python SDK を使用して OAuth トークンを取得する方法」を参照してください。

Python
%pip install sqlalchemy==1.4 psycopg[binary]
Python
from sqlalchemy import create_engine, text

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

user = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
password = cred.token

connection_pool = create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}?sslmode=require")

with connection_pool.connect() as conn:
result = conn.execute(text("SELECT version()"))
for row in result:
print(f"Connected to PostgreSQL database. Version: {row}")

Scala

コード例は、データベース インスタンスと資格情報をプログラムで取得する方法と、単一接続または接続プールを使用してデータベース インスタンスに接続する方法を示しています。

ステップ 1: Databricks Java SDK を使用して OAuth トークンを取得する

データベースインスタンスと資格情報をプログラムで取得する方法の詳細については、 Java SDKを使用してOAuthトークンを取得する方法を参照してください。

ステップ 2: データベースインスタンスに接続する

Scala
import java.sql.{Connection, DriverManager, ResultSet, Statement}

Class.forName("org.postgresql.Driver")

val user = "<YOUR USER>"
val host = instance.getName()
val port = "5432"
val database = "databricks_postgres"
val password = cred.getToken()

val url = f"jdbc:postgresql://${host}:${port}/${database}"

val connection = DriverManager.getConnection(url, user, password)
println("Connected to PostgreSQL database!")

val statement = connection.createStatement()
val resultSet = statement.executeQuery("SELECT version()")

if (resultSet.next()) {
val version = resultSet.getString(1)
println(s"PostgreSQL version: $version")
}