Databricks SDK for Python
Databricks では、ジョブやその他の Databricks リソースをソース コードとして作成、開発、デプロイ、テストするために、Databricks アセット バンドルを推奨しています。 「Databricks アセットバンドルとは」を参照してください。
この記事では、 Databricks SDK for Python を使用して Databricks の運用を自動化し、開発を加速する方法について説明します。 この記事は、Read The Docs の Databricks SDK for Python ドキュメント と、GitHub の Databricks SDK for Python リポジトリの コード例 を補足するものです。
用のDatabricksSDK Pythonはベータ版 ですので、本番運用で使用しても問題ありません。
ベータ期間中は、 では、コードが依存する について、Databricks の特定のマイナー バージョンへの依存関係をピン留めすることをお勧めします。DatabricksSDKPythonたとえば、venv
の場合は [requirements.txt
] などのファイルに依存関係をピン留めしたり、Poetry の場合は pyproject.toml
と poetry.lock
をピン留めしたりできます。依存関係のピン留めの詳細については、 の「仮想環境とパッケージ 」または「 のvenv
依存関係のインストール Poetry」を参照してください。
始める前に
Databricks SDK for Pythonは、Databricksノートブック内で、あるいはローカルの開発マシンから使用できます。
- Databricks ノートブック内から Databricks SDK for Python を使用するには、「 Databricks ノートブックから Databricks SDK for Python を使用する」に進んでください。
- ローカルの開発マシンからDatabricks SDK for Pythonを使用するには、このセクションの手順を実行します。
Databricks SDK for Pythonの使用を開始する前に、開発マシンが以下の要件を満たしていることを確認してください。
- Databricks 認証 が構成されています。
- Python 3.8以降がインストールされていること。 Databricksコンピュート リソースを自動化するために、Databricks PythonDatabricksでは、ターゲット コンピュート リソースにインストールされているものと一致する のメジャー バージョンとマイナー バージョンをインストールすることをお勧めします。この記事の例では、Databricks Runtime 3.10 がインストールされている 13.3 を使用したクラスターの自動化について説明します。LTSPythonDatabricks Runtime正しいバージョンについては、「リリースノートのバージョン とクラスターのDatabricks Runtime バージョンの互換性」を参照してください。
- Databricks では、Databricks SDK for Python で使用する Python プロジェクトごとに Python 仮想環境 を作成してアクティブ化することをお勧めします。 Python 仮想環境は、コード プロジェクトが互換性のあるバージョンの Python と Python パッケージ (この場合は Databricks SDK for Python パッケージ) を使用していることを確認するのに役立ちます。 Python仮想環境の詳細については、venv または Poetryを参照してください。
Databricks SDK for Python の使用を開始する
このセクションでは、ローカル開発マシンから Databricks SDK for Python の使用を開始する方法について説明します。 Databricks ノートブック内から Databricks SDK for Python を使用するには、「 Databricks ノートブックから Databricks SDK for Python を使用する」に進んでください。
- Databricks認証が設定され、Pythonがすでにインストールされており、Python仮想環境が有効になっている開発マシン上で、Python Package Index (PyPI) から databricks-sdk パッケージ (とその依存関係) を以下のようにインストールします。
- Venv
- Poetry
Use pip
to install the databricks-sdk
package. (On some systems, you might need to replace pip3
with pip
, here and throughout.)
pip3 install databricks-sdk
poetry add databricks-sdk
Databricks SDK for Python がベータ版のときに特定のバージョンの databricks-sdk
パッケージをインストールするには、パッケージの リリース履歴を参照してください。 たとえば、バージョン 0.1.6
をインストールするには、次のようにします。
- Venv
- Poetry
pip3 install databricks-sdk==0.1.6
poetry add databricks-sdk==0.1.6
既存のDatabricks SDK for Pythonパッケージのインストールを最新バージョンにアップグレードするには、以下のコマンドを実行します。
- Venv
- Poetry
pip3 install --upgrade databricks-sdk
poetry add databricks-sdk@latest
Databricks SDK for Pythonパッケージの現在のVersion
およびその他の詳細を表示するには、以下のコマンドを実行します。
- Venv
- Poetry
pip3 show databricks-sdk
poetry show databricks-sdk
-
Pythonの仮想環境で、Databricks SDK for PythonをインポートするPython コードファイルを作成します。次の例では、以下の内容を含む
main.py
という名前のファイル内に、Databricksワークスペース内のすべてのクラスターが一覧表示されています。Pythonfrom databricks.sdk import WorkspaceClient
w = WorkspaceClient()
for c in w.clusters.list():
print(c.cluster_name) -
python
コマンドを実行して、ファイル名がmain.py
であると仮定してPythonコードファイルを実行します。
- Venv
- Poetry
python3.10 main.py
If you are in the virtual environment’s shell:
python3.10 main.py
If you are not in the virtual environment’s shell:
poetry run python3.10 main.py
前の呼び出しで引数をw = WorkspaceClient()
に設定しないことで、 のDatabricksSDK は、Python 認証の実行にデフォルトDatabricks プロセスを使用します。このデフォルトの動作を上書きするには、次の 認証 セクションを参照してください。
Databricks アカウントまたはワークスペースで Databricks SDK for Python を認証します
このセクションでは、ローカルの開発マシンから Databricks アカウントまたはワークスペースに Databricks SDK for Python を認証する方法について説明します。 Databricks ノートブック内から Databricks SDK for Python を認証するには、「 Databricks ノートブックから Databricks SDK for Python を使用する」に進んでください。
Databricks SDK for Python は、 Databricks クライアント統合認証 標準を実装しており、認証に対する統合された一貫性のあるアーキテクチャおよびプログラムによるアプローチです。 このアプローチにより、Databricks による認証の設定と自動化をより一元化し、予測可能にすることができます。 これにより、Databricks 認証を一度構成すると、認証構成をさらに変更することなく、その構成を複数の Databricks ツールや SDK で使用できます。 詳細については、Pythonのより詳細なコード例を含む、「クライアント統合認証Databricks」を参照してください。
Databricks SDK for Pythonを使用してDatabricks認証を初期化するために使用できるコーディングのパターンには、以下のような例が挙げられます。
-
以下のいずれかを実行して、Databricksのデフォルト認証を使用します。
- ターゲット Databricks 認証の種類に必要なフィールドを持つカスタム Databricks 構成プロファイル を作成または識別します。 次に、
DATABRICKS_CONFIG_PROFILE
環境変数をカスタム構成プロファイルの名前に設定します。 - ターゲットのDatabricks認証タイプに必要とされる環境変数を設定します。
次に、たとえば次のようにDatabricksのデフォルト認証で
WorkspaceClient
オブジェクトをインスタンス化します。Pythonfrom databricks.sdk import WorkspaceClient
w = WorkspaceClient()
# ... - ターゲット Databricks 認証の種類に必要なフィールドを持つカスタム Databricks 構成プロファイル を作成または識別します。 次に、
-
必須フィールドのハードコーディングはサポートされていますが、Databricksパーソナルアクセストークンなどのコード内の機密情報が公開される危険があるため推奨はされません。以下の例では、Databricksトークン認証用にDatabricksホストとアクセストークンの値がハードコーディングされています。
Pythonfrom databricks.sdk import WorkspaceClient
w = WorkspaceClient(
host = 'https://...',
token = '...'
)
# ...
Databricks SDK for Pythonのドキュメントの「認証」も参照してください。
Databricks ノートブックから Databricks SDK for Python を使用する
DatabricksSDKPythonDatabricksDatabricksfor 機能は、DatabricksSDK の がインストールされた クラスターがアタッチされた ノートブックから呼び出すことができます。Pythonこれは、Databricks Databricks Runtime13.3 以上を使用するすべての クラスターにデフォルトによってインストールされます。LTSDatabricksDatabricks Runtime12.2LTS 以前を使用する クラスターの場合は、最初に 12.2DatabricksSDK のPython をインストールする必要があります。「手順 1: Databricks SDK for Python をインストールまたはアップグレードする」を参照してください。
DatabricksSDKPython特定のDatabricks Runtime バージョンにインストールされている バージョンの を確認するには、そのバージョンのDatabricks Runtime リリースノート の 「インストール済み Pythonライブラリ 」セクションを参照してください。
Databricksでは、利用可能な最新バージョンのSDK をPiPy からインストールすることをお勧めしますが、 DatabricksSDKPython0.6.0 以降では、すべての バージョンでデフォルトDatabricks ノートブック認証が使用されるため、少なくとも 0.6.0Databricks Runtime 以降をインストールするか、アップグレードすることをお勧めします。
15.1 は、アップグレード不要のデフォルトの 認証をサポートする for the (0.20.0)Databricks Runtime のバージョンをインストールした最初の です。Databricks RuntimeDatabricksSDKPython
次の表は、Databricks SDK for Python バージョンと Databricks Runtime バージョンのノートブック認証サポートの概要を示しています。
SDK/DBRの | 10.4 LTS | 11.3 LTS | 12.3 LTSの | 13.3 LTS | 14.3 LTSの | 15.1 以上 |
---|---|---|---|---|---|---|
0.1.7 およびそれ以下 | ||||||
0.1.10 | ✓ | ✓ | ✓ | ✓ | ✓ | |
0.6.0 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
0.20.0以上は | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
デフォルトのDatabricksノートブック認証は、Databricksがバックグラウンドで自動的に生成する一時的なDatabricks個人用アクセストークンに依存しています。Databricksはノートブックの実行を停止した後、この一時トークンを削除します。
- デフォルトのDatabricksノートブック認証はクラスターのドライバーノードでのみ動作し、クラスターのワーカーノードやエグゼキューターノードでは動作しません。
- Databricksノートブック認証はDatabricks構成プロファイルでは機能しません。
DatabricksアカウントレベルのAPIs Databricksを呼び出す場合、またはデフォルトDatabricks ノートブック 認証以外の 認証タイプを使用する場合は、次の認証タイプもサポートされています。
認証タイプ | Databricks SDK for Pythonバージョン |
---|---|
0.19.0以上 | |
0.19.0以上 | |
0.14.0以上 | |
0.14.0以上 | |
すべてのバージョン |
手順 1: Databricks SDK for Python をインストールまたはアップグレードする
-
Databricks Pythonノートブックでは、他のPythonライブラリと同様にDatabricks SDK for Pythonを使用できます。接続されているDatabricksクラスタにDatabricks SDK for Pythonライブラリをインストールまたはアップグレードするには、ノートブックセルから
%pip
マジックコマンドを次のように実行します。Python%pip install databricks-sdk --upgrade
-
%pip
マジックコマンドを実行した後、インストールもしくはアップグレードしたライブラリをノートブックで使用できるようにするために、Pythonを再起動する必要があります。これを行うには、%pip
のマジックコマンドのあるセルの直後のノートブックセルから以下のコマンドを実行します。Pythondbutils.library.restartPython()
-
インストールされているDatabricks SDK for Pythonのバージョンを表示するには、ノートブックセルから次のコマンドを実行します。
Python%pip show databricks-sdk | grep -oP '(?<=Version: )\S+'
ステップ 2: コードを実行する
ノートブックのセルで、Databricks SDK for Pythonをインポートして呼び出すPythonコードを作成します。以下の例では、デフォルトのDatabricksノートブック認証を使用して、Databricksワークスペース内のクラスターを一覧表示します。
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
for c in w.clusters.list():
print(c.cluster_name)
このセルを実行すると、Databricksワークスペースで使用可能なすべてのクラスターの名前が一覧表示されます。
別の Databricks 認証の種類を使用するには、「 Databricks の認証方法 」を参照し、対応するリンクをクリックして追加の技術的な詳細を確認してください。
Databricks ユーティリティを使用する
の Databricksユーティリティ を使用して、ローカルの開発マシンまたは DatabricksSDKPythonDatabricksノートブック内からコードを実行する できます。
- ローカルの開発マシンからDatabricksユーティリティがアクセスできるものは、
dbutils.fs
、dbutils.secrets
、dbutils.widgets
、およびdbutils.jobs
のコマンドグループのみです。 - Databricks クラスターに接続されている Databricks ノートブックから、Databricks ユーティリティは使用可能なすべての Databricks ユーティリティ コマンド グループにアクセスできますが、
dbutils.notebook
コマンド グループは、dbutils.notebook.run
やdbutils.notebook.exit
などの 2 つのレベルのコマンドのみに制限されます。
ローカルの開発マシンまたはDatabricksノートブックからDatabricksユーティリティを呼び出すには、WorkspaceClient
内で dbutils
を使用します。このコード例では、デフォルトのDatabricksノートブック認証を使用してWorkspaceClient
内のdbutils
を呼び出し、ワークスペースのDBFSルートにあるすべてのオブジェクトのパスを一覧表示します。
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
d = w.dbutils.fs.ls('/')
for f in d:
print(f.path)
または、dbutils
を直接呼び出すこともできます。ただし、使用できるのはデフォルトのDatabricksノートブック認証のみに限定されます。このコード例では、dbutils
を直接呼び出して、ワークスペースのDBFSルートにあるすべてのオブジェクトを一覧表示します。
from databricks.sdk.runtime import *
d = dbutils.fs.ls('/')
for f in d:
print(f.path)
Unity Catalog ボリュームにアクセスするには、WorkspaceClient
内の files
を使用します。「Unity Catalog ボリューム内のファイルの管理」を参照してください。dbutils
を単独で使用したり、WorkspaceClient
内でボリュームにアクセスしたりすることはできません。
dbutilsの操作も参照してください。
コード例
次のコード例は、DatabricksSDK のPython を使用して、クラスター、実行ジョブ、およびアカウント レベルのグループの作成と削除を行う方法を示しています。これらのコード例では、デフォルトの Databricks ノートブック認証を使用しています。 デフォルトDatabricks ノートブック認証の詳細については、「ノートブックからの DatabricksSDKPythonに を使用するDatabricks 」を参照してください。ノートブックの外部でのデフォルト認証の詳細については、「アカウントまたはワークスペースで DatabricksSDKPythonの を認証Databricks する」を参照してください。
その他のコード例については、GitHubのDatabricks SDK for Pythonリポジトリのexamplesを参照してください。関連項目は次を参照してください。
クラスターを作成する
このコード例では、指定されたDatabricks Runtimeのバージョンとクラスターのノードタイプを用いてクラスターを作成します。このクラスターは1つのワーカーを持ち、クラスターのアイドル状態が15分経過すると自動的に終了します。
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
print("Attempting to create the cluster. Please wait...")
c = w.clusters.create_and_wait(
cluster_name = 'my-cluster',
spark_version = '12.2.x-scala2.12',
node_type_id = 'n2-highmem-4',
autotermination_minutes = 15,
num_workers = 1
)
print(f"View the cluster at " \
f"{w.config.host}#setting/clusters/{c.cluster_id}/configuration\n")
クラスターを完全に削除する
このコード例では、指定されたクラスターIDを持つクラスターをワークスペースから完全に削除します。
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
c_id = input('ID of cluster to delete (for example, 1234-567890-ab123cd4): ')
w.clusters.permanent_delete(cluster_id = c_id)
ジョブを作成する
このコード例では、指定されたクラスター上で特定のノートブックを実行するDatabricksジョブを作成します。コードが実行されると、既存のノートブックのパス、既存のクラスターID、および関連するジョブ設定がターミナルのユーザーから取得されます。
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import Task, NotebookTask, Source
w = WorkspaceClient()
job_name = input("Some short name for the job (for example, my-job): ")
description = input("Some short description for the job (for example, My job): ")
existing_cluster_id = input("ID of the existing cluster in the workspace to run the job on (for example, 1234-567890-ab123cd4): ")
notebook_path = input("Workspace path of the notebook to run (for example, /Users/someone@example.com/my-notebook): ")
task_key = input("Some key to apply to the job's tasks (for example, my-key): ")
print("Attempting to create the job. Please wait...\n")
j = w.jobs.create(
name = job_name,
tasks = [
Task(
description = description,
existing_cluster_id = existing_cluster_id,
notebook_task = NotebookTask(
base_parameters = dict(""),
notebook_path = notebook_path,
source = Source("WORKSPACE")
),
task_key = task_key
)
]
)
print(f"View the job at {w.config.host}/#job/{j.job_id}\n")
Unity Catalog ボリューム内のファイルを管理する
このコード例では、Unity Catalog ボリュームにアクセスするために WorkspaceClient
内の files
機能に対するさまざまな呼び出しを示します。
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
# Define volume, folder, and file details.
catalog = 'main'
schema = 'default'
volume = 'my-volume'
volume_path = f"/Volumes/{catalog}/{schema}/{volume}" # /Volumes/main/default/my-volume
volume_folder = 'my-folder'
volume_folder_path = f"{volume_path}/{volume_folder}" # /Volumes/main/default/my-volume/my-folder
volume_file = 'data.csv'
volume_file_path = f"{volume_folder_path}/{volume_file}" # /Volumes/main/default/my-volume/my-folder/data.csv
upload_file_path = './data.csv'
# Create an empty folder in a volume.
w.files.create_directory(volume_folder_path)
# Upload a file to a volume.
with open(upload_file_path, 'rb') as file:
file_bytes = file.read()
binary_data = io.BytesIO(file_bytes)
w.files.upload(volume_file_path, binary_data, overwrite = True)
# List the contents of a volume.
for item in w.files.list_directory_contents(volume_path):
print(item.path)
# List the contents of a folder in a volume.
for item in w.files.list_directory_contents(volume_folder_path):
print(item.path)
# Print the contents of a file in a volume.
resp = w.files.download(volume_file_path)
print(str(resp.contents.read(), encoding='utf-8'))
# Delete a file from a volume.
w.files.delete(volume_file_path)
# Delete a folder from a volume.
w.files.delete_directory(volume_folder_path)
アカウントレベルのグループを一覧表示する
このコード例では、Databricksアカウント内で使用可能なすべてのグループの表示名を一覧表示します。
from databricks.sdk import AccountClient
a = AccountClient()
for g in a.groups.list():
print(g.display_name)
テスティング
コードをテストするには、pytestのようなPythonのテストフレームワークを使います。Databricks REST APIエンドポイントを呼び出したり、Databricksアカウントやワークスペースの状態を変更することなく、シミュレートされた条件下でコードをテストするには、unittest.mockなどのPythonモッキングライブラリを使用します。
Databricks Labs には、Databricks との統合テストを簡略化するための pytest プラグイン と、コードの品質を確保するための pylint プラグイン が用意されています。
次の helpers.py
という名前のサンプル ファイルには、新しいクラスターに関する情報を返す create_cluster
関数が含まれています。
# helpers.py
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.compute import ClusterDetails
def create_cluster(
w: WorkspaceClient,
cluster_name: str,
spark_version: str,
node_type_id: str,
autotermination_minutes: int,
num_workers: int
) -> ClusterDetails:
response = w.clusters.create(
cluster_name = cluster_name,
spark_version = spark_version,
node_type_id = node_type_id,
autotermination_minutes = autotermination_minutes,
num_workers = num_workers
)
return response
次の main.py
という名前のファイルが create_cluster
関数を呼び出すとします。
# main.py
from databricks.sdk import WorkspaceClient
from helpers import *
w = WorkspaceClient()
# Replace <spark-version> with the target Spark version string.
# Replace <node-type-id> with the target node type string.
response = create_cluster(
w = w,
cluster_name = 'Test Cluster',
spark_version = '<spark-version>',
node_type_id = '<node-type-id>',
autotermination_minutes = 15,
num_workers = 1
)
print(response.cluster_id)
次のtest_helpers.py
という名前のファイルはcreate_cluster
関数が期待された応答を返すかどうかをテストします。このテストでは、対象のワークスペースにクラスターを作成するのではなく、WorkspaceClient
オブジェクトをモックし、モックしたオブジェクトの設定を定義してから、create_cluster
関数にモックしたオブジェクトを渡します。次に、この関数が新しいモックされたクラスタのIDを返すかどうかをチェックします。
# test_helpers.py
from databricks.sdk import WorkspaceClient
from helpers import *
from unittest.mock import create_autospec # Included with the Python standard library.
def test_create_cluster():
# Create a mock WorkspaceClient.
mock_workspace_client = create_autospec(WorkspaceClient)
# Set the mock WorkspaceClient's clusters.create().cluster_id value.
mock_workspace_client.clusters.create.return_value.cluster_id = '123abc'
# Call the actual function but with the mock WorkspaceClient.
# Replace <spark-version> with the target Spark version string.
# Replace <node-type-id> with the target node type string.
response = create_cluster(
w = mock_workspace_client,
cluster_name = 'Test Cluster',
spark_version = '<spark-version>',
node_type_id = '<node-type-id>',
autotermination_minutes = 15,
num_workers = 1
)
# Assert that the function returned the mocked cluster ID.
assert response.cluster_id == '123abc'
このテストを実行するには、コードプロジェクトのルートからpytest
コマンドを実行します。
$ pytest
=================== test session starts ====================
platform darwin -- Python 3.12.2, pytest-8.1.1, pluggy-1.4.0
rootdir: <project-rootdir>
collected 1 item
test_helpers.py . [100%]
======================== 1 passed ==========================
追加のリソース
詳細については、以下を参照してください。
- Databricks SDK for Pythonのドキュメント
- その他のコード例
- DatabricksワークスペースのAPIリファレンス
- DatabricksアカウントのAPIリファレンス
- ロギング
- 長期にわたるオペレーション
- ページ分割されたレスポンス