Terraform を使用したクラスター、ノートブック、ジョブの作成
この記事では、 Databricks Terraform プロバイダー を使用して、既存の Databricks ワークスペース に クラスター 、 ノートブック 、 ジョブ を作成する方法について説明します。
この記事は、次の Databricks 入門記事のコンパニオンです。
汎用クラスター、Python ノートブック、およびノートブックを実行するジョブを使用する Databricks で最初の ETL ワークロードを実行します。
チュートリアル: Unity Catalogと連携するクラスター、Python ノートブック、およびノートブックを実行するジョブを使用する エンドツーエンドのlakehouse アナリティクス パイプラインを実行します 。
この記事のTerraform構成を調整して、ワークスペースにカスタム・クラスター、ノートブック、ジョブを作成することもできます。
ステップ 1: Terraform プロジェクトを作成して構成する
Databricks Terraform プロバイダーの概要に関する記事の 「要件 」セクションの手順に従って、Terraform プロジェクトを作成します。
クラスターを作成するには、
cluster.tf
という名前のファイルを作成し、そのファイルに次の内容を追加します。 このコンテンツは、許容される最小量のリソースでクラスターを作成します。 このクラスターでは、最新の Databricks Runtime Long Term Support (LTS) バージョンが使用されます。Unity Catalogで動作するクラスターの場合:
variable "cluster_name" {} variable "cluster_autotermination_minutes" {} variable "cluster_num_workers" {} variable "cluster_data_security_mode" {} # Create the cluster with the "smallest" amount # of resources allowed. data "databricks_node_type" "smallest" { local_disk = true } # Use the latest Databricks Runtime # Long Term Support (LTS) version. data "databricks_spark_version" "latest_lts" { long_term_support = true } resource "databricks_cluster" "this" { cluster_name = var.cluster_name node_type_id = data.databricks_node_type.smallest.id spark_version = data.databricks_spark_version.latest_lts.id autotermination_minutes = var.cluster_autotermination_minutes num_workers = var.cluster_num_workers data_security_mode = var.cluster_data_security_mode } output "cluster_url" { value = databricks_cluster.this.url }
汎用クラスターの場合:
variable "cluster_name" { description = "A name for the cluster." type = string default = "My Cluster" } variable "cluster_autotermination_minutes" { description = "How many minutes before automatically terminating due to inactivity." type = number default = 60 } variable "cluster_num_workers" { description = "The number of workers." type = number default = 1 } # Create the cluster with the "smallest" amount # of resources allowed. data "databricks_node_type" "smallest" { local_disk = true } # Use the latest Databricks Runtime # Long Term Support (LTS) version. data "databricks_spark_version" "latest_lts" { long_term_support = true } resource "databricks_cluster" "this" { cluster_name = var.cluster_name node_type_id = data.databricks_node_type.smallest.id spark_version = data.databricks_spark_version.latest_lts.id autotermination_minutes = var.cluster_autotermination_minutes num_workers = var.cluster_num_workers } output "cluster_url" { value = databricks_cluster.this.url }
クラスターを作成するには、
cluster.auto.tfvars
という名前の別のファイルを作成し、そのファイルに次の内容を追加します。 このファイルには、クラスターをカスタマイズするための変数値が含まれています。 プレースホルダーの値を独自の値に置き換えます。Unity Catalogで動作するクラスターの場合:
cluster_name = "My Cluster" cluster_autotermination_minutes = 60 cluster_num_workers = 1 cluster_data_security_mode = "SINGLE_USER"
汎用クラスターの場合:
cluster_name = "My Cluster" cluster_autotermination_minutes = 60 cluster_num_workers = 1
ノートブックを作成するには、
notebook.tf
という名前の別のファイルを作成し、そのファイルに次の内容を追加します。variable "notebook_subdirectory" { description = "A name for the subdirectory to store the notebook." type = string default = "Terraform" } variable "notebook_filename" { description = "The notebook's filename." type = string } variable "notebook_language" { description = "The language of the notebook." type = string } resource "databricks_notebook" "this" { path = "${data.databricks_current_user.me.home}/${var.notebook_subdirectory}/${var.notebook_filename}" language = var.notebook_language source = "./${var.notebook_filename}" } output "notebook_url" { value = databricks_notebook.this.url }
クラスターを作成する場合は、次のノートブック コードを
notebook.tf
ファイルと同じディレクトリ内のファイルに保存します。Databricks で最初の ETL ワークロードを実行するための Python ノートブックの場合、次の内容を含む
notebook-getting-started-etl-quick-start.py
という名前のファイル。# Databricks notebook source # Import functions from pyspark.sql.functions import col, current_timestamp # Define variables used in code below file_path = "/databricks-datasets/structured-streaming/events" username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0] table_name = f"{username}_etl_quickstart" checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart" # Clear out data from previous demo execution spark.sql(f"DROP TABLE IF EXISTS {table_name}") dbutils.fs.rm(checkpoint_path, True) # Configure Auto Loader to ingest JSON data to a Delta table (spark.readStream .format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.schemaLocation", checkpoint_path) .load(file_path) .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time")) .writeStream .option("checkpointLocation", checkpoint_path) .trigger(availableNow=True) .toTable(table_name)) # COMMAND ---------- df = spark.read.table(table_name) # COMMAND ---------- display(df)
「はじめに: ノートブックからデータをクエリして視覚化する」の SQL ノートブックの場合、次の内容を含む
notebook-getting-started-quick-start.sql
という名前のファイルがあります。-- Databricks notebook source -- MAGIC %python -- MAGIC diamonds = (spark.read -- MAGIC .format("csv") -- MAGIC .option("header", "true") -- MAGIC .option("inferSchema", "true") -- MAGIC .load("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv") -- MAGIC ) -- MAGIC -- MAGIC diamonds.write.format("delta").save("/mnt/delta/diamonds") -- COMMAND ---------- DROP TABLE IF EXISTS diamonds; CREATE TABLE diamonds USING DELTA LOCATION '/mnt/delta/diamonds/' -- COMMAND ---------- SELECT color, avg(price) AS price FROM diamonds GROUP BY color ORDER BY COLOR
チュートリアル用の Python ノートブックの場合: エンドツーエンドのレイクハウス アナリティクス パイプラインを実行する(
notebook-getting-started-lakehouse-e2e.py
という名前のファイルは、次の内容です。# Databricks notebook source external_location = "<your_external_location>" catalog = "<your_catalog>" dbutils.fs.put(f"{external_location}/foobar.txt", "Hello world!", True) display(dbutils.fs.head(f"{external_location}/foobar.txt")) dbutils.fs.rm(f"{external_location}/foobar.txt") display(spark.sql(f"SHOW SCHEMAS IN {catalog}")) # COMMAND ---------- from pyspark.sql.functions import col # Set parameters for isolation in workspace and reset demo username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0] database = f"{catalog}.e2e_lakehouse_{username}_db" source = f"{external_location}/e2e-lakehouse-source" table = f"{database}.target_table" checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo" spark.sql(f"SET c.username='{username}'") spark.sql(f"SET c.database={database}") spark.sql(f"SET c.source='{source}'") spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") spark.sql("CREATE DATABASE ${c.database}") spark.sql("USE ${c.database}") # Clear out data from previous demo execution dbutils.fs.rm(source, True) dbutils.fs.rm(checkpoint_path, True) # Define a class to load batches of data to source class LoadData: def __init__(self, source): self.source = source def get_date(self): try: df = spark.read.format("json").load(source) except: return "2016-01-01" batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0] if batch_date.month == 3: raise Exception("Source data exhausted") return batch_date def get_batch(self, batch_date): return ( spark.table("samples.nyctaxi.trips") .filter(col("tpep_pickup_datetime").cast("date") == batch_date) ) def write_batch(self, batch): batch.write.format("json").mode("append").save(self.source) def land_batch(self): batch_date = self.get_date() batch = self.get_batch(batch_date) self.write_batch(batch) RawData = LoadData(source) # COMMAND ---------- RawData.land_batch() # COMMAND ---------- # Import functions from pyspark.sql.functions import col, current_timestamp # Configure Auto Loader to ingest JSON data to a Delta table (spark.readStream .format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.schemaLocation", checkpoint_path) .load(file_path) .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time")) .writeStream .option("checkpointLocation", checkpoint_path) .trigger(availableNow=True) .option("mergeSchema", "true") .toTable(table)) # COMMAND ---------- df = spark.read.table(table_name) # COMMAND ---------- display(df)
ノートブックを作成する場合は、
notebook.auto.tfvars
という名前の別のファイルを作成し、そのファイルに次の内容を追加します。 このファイルには、ノートブック構成をカスタマイズするための変数値が含まれています。Databricksで最初のETLワークロードを実行するためのPythonノートブックの場合:
notebook_subdirectory = "Terraform" notebook_filename = "notebook-getting-started-etl-quick-start.py" notebook_language = "PYTHON"
「はじめに: ノートブックからデータをクエリして視覚化する」の SQL ノートブックの場合:
notebook_subdirectory = "Terraform" notebook_filename = "notebook-getting-started-quickstart.sql" notebook_language = "SQL"
チュートリアル用の Python ノートブックの場合: エンドツーエンドのレイクハウス アナリティクス パイプラインを実行する:
notebook_subdirectory = "Terraform" notebook_filename = "notebook-getting-started-lakehouse-e2e.py" notebook_language = "PYTHON"
ノートブックを作成する場合は、Databricks ワークスペースで、次の手順を参照して、ノートブックを正常に実行するための要件を設定してください。
チュートリアル用の Python ノートブック : エンドツーエンドのレイクハウス アナリティクス パイプラインを実行する
ジョブを作成するには、
job.tf
という名前の別のファイルを作成し、そのファイルに次の内容を追加します。 このコンテンツは、ノートブックを実行するジョブを作成します。variable "job_name" { description = "A name for the job." type = string default = "My Job" } variable "task_key" { description = "A name for the task." type = string default = "my_task" } resource "databricks_job" "this" { name = var.job_name task { task_key = var.task_key existing_cluster_id = databricks_cluster.this.cluster_id notebook_task { notebook_path = databricks_notebook.this.path } } email_notifications { on_success = [ data.databricks_current_user.me.user_name ] on_failure = [ data.databricks_current_user.me.user_name ] } } output "job_url" { value = databricks_job.this.url }
ジョブを作成する場合は、
job.auto.tfvars
という名前の別のファイルを作成し、そのファイルに次の内容を追加します。 このファイルには、ジョブ構成をカスタマイズするための変数値が含まれています。job_name = "My Job" task_key = "my_task"
ステップ 2: 構成を実行する
このステップでは、Terraform構成を実行して、クラスター、ノートブックおよびジョブをDatabricksワークスペースにデプロイします。
terraform validate
コマンドを実行して、Terraform構成が有効かどうかを確認します。エラーが報告された場合は、それらを修正し、コマンドを再実行します。terraform validate
terraform plan
コマンドを実行して、Terraformが実際に実行する前に、ワークスペースでTerraformが何をするかを確認します。terraform plan
クラスター、ノートブック、ジョブをワークスペースにデプロイするには、
terraform apply
コマンドを実行します。 デプロイを求めるプロンプトが出されたら、「yes
」と入力して Enter キーを押します。terraform apply
Terraform は、プロジェクトで指定されたリソースをデプロイします。 これらのリソース (特にクラスター) のデプロイには数分かかる場合があります。
ステップ 3: 結果を調べる
クラスターを作成した場合は、
terraform apply
コマンドの出力で、[cluster_url
] の横にあるリンクをコピーし、Web ブラウザーのアドレス バーに貼り付けます。ノートブックを作成した場合は、
terraform apply
コマンドの出力で、[notebook_url
] の横にあるリンクをコピーし、Web ブラウザーのアドレス バーに貼り付けます。注
ノートブックを使用する前に、その内容をカスタマイズする必要がある場合があります。 ノートブックをカスタマイズする方法については、関連ドキュメントを参照してください。
ジョブを作成した場合は、
terraform apply
コマンドの出力で、[job_url
] の横にあるリンクをコピーし、Web ブラウザーのアドレス バーに貼り付けます。注
ノートブックを実行する前に、その内容をカスタマイズする必要がある場合があります。 ノートブックをカスタマイズする方法に関する関連ドキュメントについては、この記事の冒頭にあるリンクを参照してください。
ジョブを作成した場合は、次のようにジョブを実行します。
ジョブ ページで [ 今すぐ実行 ] をクリックします。
ジョブの実行が完了したら、ジョブの実行結果を表示するには、ジョブ ページの [完了した実行 (過去 60 日間)] ボックスの一覧で、[開始時刻] 列の最新の 時刻 エントリをクリックします。 [ 出力 ] ウィンドウには、ノートブックのコードを実行した結果が表示されます。