クラスタリング、ノートブック、ジョブを Terraform で作成する
この記事では、DatabricksTerraformDatabricksプロバイダー を使用して、既存の ワークスペース に クラスタリング 、 ノートブック 、 およびジョブ を作成する方法について説明します。
この記事は、次の Databricks の概要記事と関連しています。
-
ETLDatabricks汎用クラスタリング、Python ノートブック、およびジョブを使用してノートブックを実行する で最初の ワークロードを実行します 。
-
はじめに: 汎用クラスタリングと ノートブックを使用する ノートブックからデータのクエリと視覚化SQL を行います。
-
チュートリアル: end-to-end レイクハウス アナリティクス パイプライン, which uses a clustering that works with Unity Catalog、a Python ノートブック、およびジョブを使用してノートブックを実行します。
また、この記事の Terraform 構成を調整して、ワークスペースにカスタム クラスタリング、ノートブック、ジョブを作成することもできます。
ステップ 1: Terraform プロジェクトを作成して構成する
-
Databricks Terraform プロバイダーの概要に関する記事の 「要件 」セクションの指示に従って、Terraform プロジェクトを作成します。
-
クラスタリングを作成するには、
cluster.tf
という名前のファイルを作成し、次の内容をファイルに追加します。 このコンテンツは、許可されるリソースの量を最小限に抑えたクラスタリングを作成します。 このクラスタリングでは、最新の Databricks Runtime Long Term Support (LTS) バージョンを使用します。Unity Catalogと連携するクラスタリングの場合:
variable "cluster_name" {}
variable "cluster_autotermination_minutes" {}
variable "cluster_num_workers" {}
variable "cluster_data_security_mode" {}
# Create the cluster with the "smallest" amount
# of resources allowed.
data "databricks_node_type" "smallest" {
local_disk = true
}
# Use the latest Databricks Runtime
# Long Term Support (LTS) version.
data "databricks_spark_version" "latest_lts" {
long_term_support = true
}
resource "databricks_cluster" "this" {
cluster_name = var.cluster_name
node_type_id = data.databricks_node_type.smallest.id
spark_version = data.databricks_spark_version.latest_lts.id
autotermination_minutes = var.cluster_autotermination_minutes
num_workers = var.cluster_num_workers
data_security_mode = var.cluster_data_security_mode
}
output "cluster_url" {
value = databricks_cluster.this.url
}
All-Purposeクラスターの場合:
variable "cluster_name" {
description = "A name for the cluster."
type = string
default = "My Cluster"
}
variable "cluster_autotermination_minutes" {
description = "How many minutes before automatically terminating due to inactivity."
type = number
default = 60
}
variable "cluster_num_workers" {
description = "The number of workers."
type = number
default = 1
}
# Create the cluster with the "smallest" amount
# of resources allowed.
data "databricks_node_type" "smallest" {
local_disk = true
}
# Use the latest Databricks Runtime
# Long Term Support (LTS) version.
data "databricks_spark_version" "latest_lts" {
long_term_support = true
}
resource "databricks_cluster" "this" {
cluster_name = var.cluster_name
node_type_id = data.databricks_node_type.smallest.id
spark_version = data.databricks_spark_version.latest_lts.id
autotermination_minutes = var.cluster_autotermination_minutes
num_workers = var.cluster_num_workers
}
output "cluster_url" {
value = databricks_cluster.this.url
}
-
クラスタリングを作成するには、
cluster.auto.tfvars
という名前の別のファイルを作成し、次の内容をファイルに追加します。 このファイルには、クラスタリングをカスタマイズするための変数値が含まれています。 プレースホルダーの値を独自の値に置き換えます。Unity Catalogと連携するクラスタリングの場合:
cluster_name = "My Cluster"
cluster_autotermination_minutes = 60
cluster_num_workers = 1
cluster_data_security_mode = "SINGLE_USER"
All-Purposeクラスターの場合:
cluster_name = "My Cluster"
cluster_autotermination_minutes = 60
cluster_num_workers = 1
- ノートブックを作成するには、
notebook.tf
という名前の別のファイルを作成し、次の内容をファイルに追加します。
variable "notebook_subdirectory" {
description = "A name for the subdirectory to store the notebook."
type = string
default = "Terraform"
}
variable "notebook_filename" {
description = "The notebook's filename."
type = string
}
variable "notebook_language" {
description = "The language of the notebook."
type = string
}
resource "databricks_notebook" "this" {
path = "${data.databricks_current_user.me.home}/${var.notebook_subdirectory}/${var.notebook_filename}"
language = var.notebook_language
source = "./${var.notebook_filename}"
}
output "notebook_url" {
value = databricks_notebook.this.url
}
-
クラスタリングを作成する場合は、次のノートブック コードを
notebook.tf
ファイルと同じディレクトリ内のファイルに保存します。「Databricks で初めて ETL ワークロードを実行する」の Python ノートブックの場合、次の内容を含む
notebook-getting-started-etl-quick-start.py
という名前のファイル。
# Databricks notebook source
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"
# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.toTable(table_name))
# COMMAND ----------
df = spark.read.table(table_name)
# COMMAND ----------
display(df)
「はじめに: ノートブックからデータをクエリして視覚化する」の SQL ノートブックの場合、次の内容を含む notebook-getting-started-quick-start.sql
という名前のファイル。
-- Databricks notebook source
-- MAGIC %python
-- MAGIC diamonds = (spark.read
-- MAGIC .format("csv")
-- MAGIC .option("header", "true")
-- MAGIC .option("inferSchema", "true")
-- MAGIC .load("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv")
-- MAGIC )
-- MAGIC
-- MAGIC diamonds.write.format("delta").save("/mnt/delta/diamonds")
-- COMMAND ----------
DROP TABLE IF EXISTS diamonds;
CREATE TABLE diamonds USING DELTA LOCATION '/mnt/delta/diamonds/'
-- COMMAND ----------
SELECT color, avg(price) AS price FROM diamonds GROUP BY color ORDER BY COLOR
チュートリアルの Python ノートブック の場合: エンドツーエンドのレイクハウス アナリティクス パイプライン (次の内容の notebook-getting-started-lakehouse-e2e.py
という名前のファイルを実行してください。
# Databricks notebook source
external_location = "<your_external_location>"
catalog = "<your_catalog>"
dbutils.fs.put(f"{external_location}/foobar.txt", "Hello world!", True)
display(dbutils.fs.head(f"{external_location}/foobar.txt"))
dbutils.fs.rm(f"{external_location}/foobar.txt")
display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))
# COMMAND ----------
from pyspark.sql.functions import col
# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"
spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")
spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")
# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)
# Define a class to load batches of data to source
class LoadData:
def __init__(self, source):
self.source = source
def get_date(self):
try:
df = spark.read.format("json").load(source)
except:
return "2016-01-01"
batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
if batch_date.month == 3:
raise Exception("Source data exhausted")
return batch_date
def get_batch(self, batch_date):
return (
spark.table("samples.nyctaxi.trips")
.filter(col("tpep_pickup_datetime").cast("date") == batch_date)
)
def write_batch(self, batch):
batch.write.format("json").mode("append").save(self.source)
def land_batch(self):
batch_date = self.get_date()
batch = self.get_batch(batch_date)
self.write_batch(batch)
RawData = LoadData(source)
# COMMAND ----------
RawData.land_batch()
# COMMAND ----------
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.option("mergeSchema", "true")
.toTable(table))
# COMMAND ----------
df = spark.read.table(table_name)
# COMMAND ----------
display(df)
-
ノートブックを作成する場合は、
notebook.auto.tfvars
という名前の別のファイルを作成し、次の内容をファイルに追加します。 このファイルには、ノートブック構成をカスタマイズするための変数値が含まれています。Python ノートブックの「 Databricks で初めての ETL ワークロードを実行する」の場合:
notebook_subdirectory = "Terraform"
notebook_filename = "notebook-getting-started-etl-quick-start.py"
notebook_language = "PYTHON"
SQL ノートブックの場合 「はじめに: ノートブックからのデータのクエリと視覚化」の場合:
notebook_subdirectory = "Terraform"
notebook_filename = "notebook-getting-started-quickstart.sql"
notebook_language = "SQL"
For the Python ノートブック for チュートリアル: 実行 an end-to-end レイクハウス アナリティクス パイプライン:
notebook_subdirectory = "Terraform"
notebook_filename = "notebook-getting-started-lakehouse-e2e.py"
notebook_language = "PYTHON"
-
ノートブックを作成する場合は、Databricks ワークスペースで、次の手順を参照して、ノートブックを正常に実行するための要件を必ず設定してください。
-
Databricks で初めての ETL ワークロードを実行するための Python ノートブック
-
「作業の開始: ノートブックからのデータのクエリと視覚化」の SQL ノートブック
-
The Python ノートブック for チュートリアル: 実行 an end-to-end レイクハウス アナリティクス パイプライン
-
-
ジョブを作成するには、
job.tf
という名前の別のファイルを作成し、次の内容をファイルに追加します。 このコンテンツは、ノートブックを実行するジョブを作成します。
variable "job_name" {
description = "A name for the job."
type = string
default = "My Job"
}
variable "task_key" {
description = "A name for the task."
type = string
default = "my_task"
}
resource "databricks_job" "this" {
name = var.job_name
task {
task_key = var.task_key
existing_cluster_id = databricks_cluster.this.cluster_id
notebook_task {
notebook_path = databricks_notebook.this.path
}
}
email_notifications {
on_success = [ data.databricks_current_user.me.user_name ]
on_failure = [ data.databricks_current_user.me.user_name ]
}
}
output "job_url" {
value = databricks_job.this.url
}
- ジョブを作成する場合は、
job.auto.tfvars
という名前の別のファイルを作成し、次の内容をファイルに追加します。 このファイルには、ジョブ設定をカスタマイズするための変数値が含まれています。
job_name = "My Job"
task_key = "my_task"
ステップ 2: 構成を実行する
この手順では、 Terraform 構成を実行して、クラスタリング、ノートブック、およびジョブを Databricks ワークスペースにデプロイします。
-
Terraform構成が有効かどうかを確認するには、
terraform validate
コマンドを実行します。 エラーが報告された場合は、それらを修正し、コマンドを再度実行します。Bashterraform validate
-
Terraform が実際にそれを行う前に、
terraform plan
コマンドを実行して、ワークスペースで Terraform が何を行うかを確認してください。Bashterraform plan
-
クラスタリング、ノートブック、およびジョブをワークスペースにデプロイするには、
terraform apply
コマンドを実行します。 デプロイを求められたら、「yes
」と入力して Enter キーを押します。Bashterraform apply
Terraformは、プロジェクトで指定されたリソースをデプロイします。 これらのリソース (特にクラスタリング) のデプロイには数分かかる場合があります。
ステップ3: 結果を調べてください
-
クラスタリングを作成した場合は、
terraform apply
コマンドの出力で、cluster_url
の横にあるリンクをコピーし、Web ブラウザーのアドレス バーに貼り付けます。 -
ノートブックを作成した場合は、
terraform apply
コマンドの出力で、notebook_url
の横にあるリンクをコピーし、Web ブラウザーのアドレス バーに貼り付けます。
ノートブックを使用する前に、その内容をカスタマイズする必要がある場合があります。 ノートブックのカスタマイズ方法については、関連ドキュメントを参照してください。
- ジョブを作成した場合は、
terraform apply
コマンドの出力で、job_url
の横にあるリンクをコピーし、Web ブラウザーのアドレスバーに貼り付けます。
ノートブックを実行する前に、その内容をカスタマイズする必要がある場合があります。 この記事の冒頭にあるリンクには、ノートブックのカスタマイズ方法に関する関連ドキュメントがあります。
-
ジョブを作成した場合は、次のようにジョブを実行します。
- ジョブページで 「今すぐ実行 」をクリックします。
- ジョブの実行が終了したら、ジョブの実行結果を表示するには、ジョブ ページの [完了した実行 (過去 60 日間)] リストで、[ 開始時刻 ] 列の最新の時刻エントリをクリックします。 [出力 ] ウィンドウには、ノートブックのコードを実行した結果が表示されます。
ステップ4:クリーンアップ
この手順では、ワークスペースから前のリソースを削除します。
-
Terraform が実際にそれを行う前に、
terraform plan
コマンドを実行して、ワークスペースで Terraform が何を行うかを確認してください。Bashterraform plan
-
terraform destroy
コマンドを実行して、クラスタリング、ノートブック、およびジョブをワークスペースから削除します。削除を求めるメッセージが表示されたら、「yes
」と入力して Enter キーを押します。Bashterraform destroy
Terraformは、プロジェクトで指定されたリソースを削除します。