Databricks SDK for R
この記事では、 Databricks SDK for R を使用して Databricks ワークスペースで Databricks 操作を自動化する方法について説明します。 この記事は、 Databricks SDK for R のドキュメントを補足するものです。
Databricks SDK for R は、Databricks アカウントでの操作の自動化をサポートしていません。 アカウント レベルの操作を呼び出すには、次のように別の Databricks SDK を使用します。
始める前に
Databricks SDK for R の使用を開始する前に、開発マシンに次のものが必要です。
- 自動化するターゲット Databricks ワークスペースの Databricks 個人用アクセス トークン 。
Databricks SDK for R は、Databricks パーソナル アクセス トークン認証のみをサポートします。
- R、およびオプションでR互換の統合開発環境(IDE)。 Databricks では RStudio Desktop が推奨されており、この記事の手順で使用されています。
Databricks SDK for R の使用を開始する
-
Databricks ワークスペースの URL と個人用アクセス トークンを R プロジェクトのスクリプトで使用できるようにします。 たとえば、R プロジェクトの
.Renviron
ファイルに次のものを追加できます。<your-workspace-url>
をワークスペース インスタンスの URL に置き換えます (例:https://1234567890123456.7.gcp.databricks.com
)。<your-personal-access-token>
を Databricks の個人用アクセス トークン (dapi12345678901234567890123456789012
など) に置き換えます。DATABRICKS_HOST=<your-workspace-url>
DATABRICKS_TOKEN=<your-personal-access-token>Databricks個人用アクセス トークンを作成するには、「ワークスペース ユーザー用の個人用アクセス トークンDatabricks」の手順に従います。
Databricks ワークスペース URL と個人用アクセス トークンを提供するその他の方法については、GitHub の Databricks SDK for R リポジトリでの 認証 に関するページを参照してください。
バージョン管理システムに .Renviron
ファイルを追加すると、Databricks の個人用アクセス トークンなどの機密情報が公開されるリスクがあるため、追加しないでください。
-
Databricks SDK for R パッケージをインストールします。 たとえば、RStudio Desktop の [コンソール ] ビュー ( [表示] > [フォーカスをコンソールに移動 ]) で、次のコマンドを一度に 1 つずつ実行します。
Rinstall.packages("devtools")
library(devtools)
install_github("databrickslabs/databricks-sdk-r")
Databricks SDK for R パッケージは CRAN では使用できません。
-
R の Databricks SDK を参照し、 Databricks ワークスペース内のすべてのクラスターを一覧表示するコードを追加します。 たとえば、プロジェクトの
main.r
ファイルでは、コードは次のようになります。Rrequire(databricks)
client <- DatabricksClient()
list_clusters(client)[, "cluster_name"] -
スクリプトを実行します。 たとえば、RStudio Desktop で、プロジェクトの
main.r
ファイルをアクティブにしたスクリプト エディタで、[ Source (ソース)] > [Source (ソース )] または [Source with Echo (エコー付きソース )] をクリックします。 -
クラスターのリストが表示されます。 たとえば、RStudio Desktop では、これは コンソール ビューにあります。
コード例
次のコード例は、R の Databricks SDK を使用してクラスターを作成および削除し、ジョブを作成する方法を示しています。
クラスターを作成する
このコード例では、指定した Databricks Runtime バージョンとクラスター ノード タイプでクラスターを作成します。 このクラスターには 1 人のワーカーがおり、クラスターはアイドル時間が 15 分経過すると自動的に終了します。
require(databricks)
client <- DatabricksClient()
response <- create_cluster(
client = client,
cluster_name = "my-cluster",
spark_version = "12.2.x-scala2.12",
node_type_id = "n2-highmem-4",
autotermination_minutes = 15,
num_workers = 1
)
# Get the workspace URL to be used in the following results message.
get_client_debug <- strsplit(client$debug_string(), split = "host=")
get_host <- strsplit(get_client_debug[[1]][2], split = ",")
host <- get_host[[1]][1]
# Make sure the workspace URL ends with a forward slash.
if (endsWith(host, "/")) {
} else {
host <- paste(host, "/", sep = "")
}
print(paste(
"View the cluster at ",
host,
"#setting/clusters/",
response$cluster_id,
"/configuration",
sep = "")
)
クラスターを完全に削除する
このコード例では、指定されたクラスターIDを持つクラスターをワークスペースから完全に削除します。
require(databricks)
client <- DatabricksClient()
cluster_id <- readline("ID of the cluster to delete (for example, 1234-567890-ab123cd4):")
delete_cluster(client, cluster_id)
ジョブを作成する
このコード例では、指定したクラスターで指定したノートブックを実行するために使用できる Databricks ジョブを作成します。 このコードを実行すると、コンソールでユーザーから既存のノートブックのパス、既存のクラスター ID、および関連するジョブ設定が取得されます。
require(databricks)
client <- DatabricksClient()
job_name <- readline("Some short name for the job (for example, my-job):")
description <- readline("Some short description for the job (for example, My job):")
existing_cluster_id <- readline("ID of the existing cluster in the workspace to run the job on (for example, 1234-567890-ab123cd4):")
notebook_path <- readline("Workspace path of the notebook to run (for example, /Users/someone@example.com/my-notebook):")
task_key <- readline("Some key to apply to the job's tasks (for example, my-key):")
print("Attempting to create the job. Please wait...")
notebook_task <- list(
notebook_path = notebook_path,
source = "WORKSPACE"
)
job_task <- list(
task_key = task_key,
description = description,
existing_cluster_id = existing_cluster_id,
notebook_task = notebook_task
)
response <- create_job(
client,
name = job_name,
tasks = list(job_task)
)
# Get the workspace URL to be used in the following results message.
get_client_debug <- strsplit(client$debug_string(), split = "host=")
get_host <- strsplit(get_client_debug[[1]][2], split = ",")
host <- get_host[[1]][1]
# Make sure the workspace URL ends with a forward slash.
if (endsWith(host, "/")) {
} else {
host <- paste(host, "/", sep = "")
}
print(paste(
"View the job at ",
host,
"#job/",
response$job_id,
sep = "")
)
伐採
一般的な logging
パッケージを使用して、メッセージをログに記録できます。 このパッケージは、複数のログレベルとカスタムログ形式をサポートします。 このパッケージを使用して、メッセージをコンソールまたはファイルに記録できます。 メッセージをログに記録するには、次の操作を行います。
-
logging
パッケージをインストールします。たとえば、RStudio Desktop の [コンソール ] ビュー ( [表示] > [フォーカスをコンソールに移動 ]) で、次のコマンドを実行します。Rinstall.packages("logging")
library(logging) -
ロギングパッケージをブートストラップし、メッセージを記録する場所を設定し、ログレベルを設定します。 たとえば、次のコードは
ERROR
以下のすべてのメッセージをresults.log
ファイルに記録します。RbasicConfig()
addHandler(writeToFile, file="results.log")
setLevel("ERROR") -
必要に応じてメッセージをログに記録します。 たとえば、次のコードでは、コードが認証できないか、使用可能なクラスターの名前を一覧表示できない場合に、エラーをログに記録します。
Rrequire(databricks)
require(logging)
basicConfig()
addHandler(writeToFile, file="results.log")
setLevel("ERROR")
tryCatch({
client <- DatabricksClient()
}, error = function(e) {
logerror(paste("Error initializing DatabricksClient(): ", e$message))
return(NA)
})
tryCatch({
list_clusters(client)[, "cluster_name"]
}, error = function(e) {
logerror(paste("Error in list_clusters(client): ", e$message))
return(NA)
})
テスティング
コードをテストするには、 testthat などの R テスト フレームワークを使用できます。 Databricks REST API エンドポイントを呼び出したり、Databricks アカウントやワークスペースの状態を変更したりせずに、シミュレートされた条件下でコードをテストするには、 mockery などの R モック ライブラリを使用できます。
たとえば、新しいクラスターに関する情報を返すcreateCluster
関数を含むhelpers.r
という名前の次のファイルがあるとします。
library(databricks)
createCluster <- function(
databricks_client,
cluster_name,
spark_version,
node_type_id,
autotermination_minutes,
num_workers
) {
response <- create_cluster(
client = databricks_client,
cluster_name = cluster_name,
spark_version = spark_version,
node_type_id = node_type_id,
autotermination_minutes = autotermination_minutes,
num_workers = num_workers
)
return(response)
}
そして、main.R
関数を呼び出すcreateCluster
という名前の次のファイルが与えられます。
library(databricks)
source("helpers.R")
client <- DatabricksClient()
# Replace <spark-version> with the target Spark version string.
# Replace <node-type-id> with the target node type string.
response = createCluster(
databricks_client = client,
cluster_name = "my-cluster",
spark_version = "<spark-version>",
node_type_id = "<node-type-id>",
autotermination_minutes = 15,
num_workers = 1
)
print(response$cluster_id)
次のtest-helpers.py
という名前のファイルはcreateCluster
関数が期待された応答を返すかどうかをテストします。このテストでは、対象のワークスペースにクラスターを作成するのではなく、DatabricksClient
オブジェクトをモックし、モックしたオブジェクトの設定を定義してから、createCluster
関数にモックしたオブジェクトを渡します。次に、この関数が新しいモックされたクラスタのIDを返すかどうかをチェックします。
# install.packages("testthat")
# install.pacakges("mockery")
# testthat::test_file("test-helpers.R")
lapply(c("databricks", "testthat", "mockery"), library, character.only = TRUE)
source("helpers.R")
test_that("createCluster mock returns expected results", {
# Create a mock response.
mock_response <- list(cluster_id = "abc123")
# Create a mock function for create_cluster().
mock_create_cluster <- mock(return_value = mock_response)
# Run the test with the mock function.
with_mock(
create_cluster = mock_create_cluster,
{
# Create a mock Databricks client.
mock_client <- mock()
# Call the function with the mock client.
# Replace <spark-version> with the target Spark version string.
# Replace <node-type-id> with the target node type string.
response <- createCluster(
databricks_client = mock_client,
cluster_name = "my-cluster",
spark_version = "<spark-version>",
node_type_id = "<node-type-id>",
autotermination_minutes = 15,
num_workers = 1
)
# Check that the function returned the correct mock response.
expect_equal(response$cluster_id, "abc123")
}
)
})
追加のリソース
詳細については、以下を参照してください。