Terraform を使用したクラスター、ノートブック、ジョブの作成

この記事では、 Databricks Terraform プロバイダー を使用して、既存の Databricks ワークスペース に クラスター ノートブック 、 ジョブ を作成する方法について説明します。

この記事は、次の Databricks 入門記事のコンパニオンです。

この記事のTerraform構成を調整して、ワークスペースにカスタム・クラスター、ノートブック、ジョブを作成することもできます。

ステップ 1: Terraform プロジェクトを作成して構成する

  1. Databricks Terraform プロバイダーの概要に関する記事の 「要件 」セクションの手順に従って、Terraform プロジェクトを作成します。

  2. クラスターを作成するには、 cluster.tfという名前のファイルを作成し、そのファイルに次の内容を追加します。 このコンテンツは、許容される最小量のリソースでクラスターを作成します。 このクラスターでは、最新の Databricks Runtime Long Term Support (LTS) バージョンが使用されます。

    Unity Catalogで動作するクラスターの場合:

    variable "cluster_name" {}
    variable "cluster_autotermination_minutes" {}
    variable "cluster_num_workers" {}
    variable "cluster_data_security_mode" {}
    
    # Create the cluster with the "smallest" amount
    # of resources allowed.
    data "databricks_node_type" "smallest" {
      local_disk = true
    }
    
    # Use the latest Databricks Runtime
    # Long Term Support (LTS) version.
    data "databricks_spark_version" "latest_lts" {
      long_term_support = true
    }
    
    resource "databricks_cluster" "this" {
      cluster_name            = var.cluster_name
      node_type_id            = data.databricks_node_type.smallest.id
      spark_version           = data.databricks_spark_version.latest_lts.id
      autotermination_minutes = var.cluster_autotermination_minutes
      num_workers             = var.cluster_num_workers
      data_security_mode      = var.cluster_data_security_mode
    }
    
    output "cluster_url" {
     value = databricks_cluster.this.url
    }
    

    汎用クラスターの場合:

    variable "cluster_name" {
      description = "A name for the cluster."
      type        = string
      default     = "My Cluster"
    }
    
    variable "cluster_autotermination_minutes" {
      description = "How many minutes before automatically terminating due to inactivity."
      type        = number
      default     = 60
    }
    
    variable "cluster_num_workers" {
      description = "The number of workers."
      type        = number
      default     = 1
    }
    
    # Create the cluster with the "smallest" amount
    # of resources allowed.
    data "databricks_node_type" "smallest" {
      local_disk = true
    }
    
    # Use the latest Databricks Runtime
    # Long Term Support (LTS) version.
    data "databricks_spark_version" "latest_lts" {
      long_term_support = true
    }
    
    resource "databricks_cluster" "this" {
      cluster_name            = var.cluster_name
      node_type_id            = data.databricks_node_type.smallest.id
      spark_version           = data.databricks_spark_version.latest_lts.id
      autotermination_minutes = var.cluster_autotermination_minutes
      num_workers             = var.cluster_num_workers
    }
    
    output "cluster_url" {
     value = databricks_cluster.this.url
    }
    
  3. クラスターを作成するには、 cluster.auto.tfvarsという名前の別のファイルを作成し、そのファイルに次の内容を追加します。 このファイルには、クラスターをカスタマイズするための変数値が含まれています。 プレースホルダーの値を独自の値に置き換えます。

    Unity Catalogで動作するクラスターの場合:

    cluster_name                    = "My Cluster"
    cluster_autotermination_minutes = 60
    cluster_num_workers             = 1
    cluster_data_security_mode      = "SINGLE_USER"
    

    汎用クラスターの場合:

    cluster_name                    = "My Cluster"
    cluster_autotermination_minutes = 60
    cluster_num_workers             = 1
    
  4. ノートブックを作成するには、 notebook.tfという名前の別のファイルを作成し、そのファイルに次の内容を追加します。

    variable "notebook_subdirectory" {
      description = "A name for the subdirectory to store the notebook."
      type        = string
      default     = "Terraform"
    }
    
    variable "notebook_filename" {
      description = "The notebook's filename."
      type        = string
    }
    
    variable "notebook_language" {
      description = "The language of the notebook."
      type        = string
    }
    
    resource "databricks_notebook" "this" {
      path     = "${data.databricks_current_user.me.home}/${var.notebook_subdirectory}/${var.notebook_filename}"
      language = var.notebook_language
      source   = "./${var.notebook_filename}"
    }
    
    output "notebook_url" {
     value = databricks_notebook.this.url
    }
    
  5. クラスターを作成する場合は、次のノートブック コードを notebook.tf ファイルと同じディレクトリ内のファイルに保存します。

    Databricks で最初の ETL ワークロードを実行するための Python ノートブックの場合、次の内容を含む notebook-getting-started-etl-quick-start.py という名前のファイル。

    # Databricks notebook source
    # Import functions
    from pyspark.sql.functions import col, current_timestamp
    
    # Define variables used in code below
    file_path = "/databricks-datasets/structured-streaming/events"
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    table_name = f"{username}_etl_quickstart"
    checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"
    
    # Clear out data from previous demo execution
    spark.sql(f"DROP TABLE IF EXISTS {table_name}")
    dbutils.fs.rm(checkpoint_path, True)
    
    # Configure Auto Loader to ingest JSON data to a Delta table
    (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("cloudFiles.schemaLocation", checkpoint_path)
      .load(file_path)
      .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
      .writeStream
      .option("checkpointLocation", checkpoint_path)
      .trigger(availableNow=True)
      .toTable(table_name))
    
    # COMMAND ----------
    
    df = spark.read.table(table_name)
    
    # COMMAND ----------
    
    display(df)
    

    「はじめに: ノートブックからデータをクエリして視覚化する」の SQL ノートブックの場合、次の内容を含むnotebook-getting-started-quick-start.sqlという名前のファイルがあります。

    -- Databricks notebook source
    -- MAGIC %python
    -- MAGIC diamonds = (spark.read
    -- MAGIC   .format("csv")
    -- MAGIC   .option("header", "true")
    -- MAGIC   .option("inferSchema", "true")
    -- MAGIC   .load("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv")
    -- MAGIC )
    -- MAGIC 
    -- MAGIC diamonds.write.format("delta").save("/mnt/delta/diamonds")
    
    -- COMMAND ----------
    
    DROP TABLE IF EXISTS diamonds;
    
    CREATE TABLE diamonds USING DELTA LOCATION '/mnt/delta/diamonds/'
    
    -- COMMAND ----------
    
    SELECT color, avg(price) AS price FROM diamonds GROUP BY color ORDER BY COLOR
    

    チュートリアル用の Python ノートブックの場合: エンドツーエンドのレイクハウス アナリティクス パイプラインを実行する( notebook-getting-started-lakehouse-e2e.py という名前のファイルは、次の内容です。

    # Databricks notebook source
    external_location = "<your_external_location>"
    catalog = "<your_catalog>"
    
    dbutils.fs.put(f"{external_location}/foobar.txt", "Hello world!", True)
    display(dbutils.fs.head(f"{external_location}/foobar.txt"))
    dbutils.fs.rm(f"{external_location}/foobar.txt")
    
    display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))
    
    # COMMAND ----------
    
    from pyspark.sql.functions import col
    
    # Set parameters for isolation in workspace and reset demo
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"{catalog}.e2e_lakehouse_{username}_db"
    source = f"{external_location}/e2e-lakehouse-source"
    table = f"{database}.target_table"
    checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    # Clear out data from previous demo execution
    dbutils.fs.rm(source, True)
    dbutils.fs.rm(checkpoint_path, True)
    
    
    # Define a class to load batches of data to source
    class LoadData:
    
      def __init__(self, source):
        self.source = source
    
      def get_date(self):
        try:
          df = spark.read.format("json").load(source)
        except:
            return "2016-01-01"
        batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
        if batch_date.month == 3:
          raise Exception("Source data exhausted")
          return batch_date
    
      def get_batch(self, batch_date):
        return (
          spark.table("samples.nyctaxi.trips")
            .filter(col("tpep_pickup_datetime").cast("date") == batch_date)
        )
    
      def write_batch(self, batch):
        batch.write.format("json").mode("append").save(self.source)
    
      def land_batch(self):
        batch_date = self.get_date()
        batch = self.get_batch(batch_date)
        self.write_batch(batch)
    
    RawData = LoadData(source)
    
    # COMMAND ----------
    
    RawData.land_batch()
    
    # COMMAND ----------
    
    # Import functions
    from pyspark.sql.functions import col, current_timestamp
    
    # Configure Auto Loader to ingest JSON data to a Delta table
    (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("cloudFiles.schemaLocation", checkpoint_path)
      .load(file_path)
      .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
      .writeStream
      .option("checkpointLocation", checkpoint_path)
      .trigger(availableNow=True)
      .option("mergeSchema", "true")
      .toTable(table))
    
    # COMMAND ----------
    
    df = spark.read.table(table_name)
    
    # COMMAND ----------
    
    display(df)
    
  6. ノートブックを作成する場合は、 notebook.auto.tfvarsという名前の別のファイルを作成し、そのファイルに次の内容を追加します。 このファイルには、ノートブック構成をカスタマイズするための変数値が含まれています。

    Databricksで最初のETLワークロードを実行するためのPythonノートブックの場合:

    notebook_subdirectory = "Terraform"
    notebook_filename     = "notebook-getting-started-etl-quick-start.py"
    notebook_language     = "PYTHON"
    

    「はじめに: ノートブックからデータをクエリして視覚化する」の SQL ノートブックの場合:

    notebook_subdirectory = "Terraform"
    notebook_filename     = "notebook-getting-started-quickstart.sql"
    notebook_language     = "SQL"
    

    チュートリアル用の Python ノートブックの場合: エンドツーエンドのレイクハウス アナリティクス パイプラインを実行する:

    notebook_subdirectory = "Terraform"
    notebook_filename     = "notebook-getting-started-lakehouse-e2e.py"
    notebook_language     = "PYTHON"
    
  7. ノートブックを作成する場合は、Databricks ワークスペースで、次の手順を参照して、ノートブックを正常に実行するための要件を設定してください。

  8. ジョブを作成するには、 job.tfという名前の別のファイルを作成し、そのファイルに次の内容を追加します。 このコンテンツは、ノートブックを実行するジョブを作成します。

    variable "job_name" {
      description = "A name for the job."
      type        = string
      default     = "My Job"
    }
    
    resource "databricks_job" "this" {
      name = var.job_name
      existing_cluster_id = databricks_cluster.this.cluster_id
      notebook_task {
        notebook_path = databricks_notebook.this.path
      }
      email_notifications {
        on_success = [ data.databricks_current_user.me.user_name ]
        on_failure = [ data.databricks_current_user.me.user_name ]
      }
    }
    
    output "job_url" {
      value = databricks_job.this.url
    }
    
  9. ジョブを作成する場合は、 job.auto.tfvarsという名前の別のファイルを作成し、そのファイルに次の内容を追加します。 このファイルには、ジョブ構成をカスタマイズするための変数値が含まれています。

    job_name = "My Job"
    

ステップ 2: 構成 を実行する

このステップでは、Terraform構成を実行して、クラスター、ノートブックおよびジョブをDatabricksワークスペースにデプロイします。

  1. terraform validate コマンドを実行して、Terraform構成が有効かどうかを確認します。エラーが報告された場合は、それらを修正し、コマンドを再実行します。

    terraform validate
    
  2. terraform plan コマンドを実行して、Terraformが実際に実行する前に、ワークスペースでTerraformが何をするかを確認します。

    terraform plan
    
  3. クラスター、ノートブック、ジョブをワークスペースにデプロイするには、 terraform apply コマンドを実行します。 デプロイを求めるプロンプトが出されたら、「 yes 」と入力して Enter キーを押します。

    terraform apply
    

    Terraform は、プロジェクトで指定されたリソースをデプロイします。 これらのリソース (特にクラスター) のデプロイには数分かかる場合があります。

ステップ 3: 結果 を調べる

  1. クラスターを作成した場合は、 terraform apply コマンドの出力で、[ cluster_url] の横にあるリンクをコピーし、Web ブラウザーのアドレス バーに貼り付けます。

  2. ノートブックを作成した場合は、 terraform apply コマンドの出力で、[ notebook_url] の横にあるリンクをコピーし、Web ブラウザーのアドレス バーに貼り付けます。

    ノートブックを使用する前に、その内容をカスタマイズする必要がある場合があります。 ノートブックをカスタマイズする方法については、関連ドキュメントを参照してください。

  3. ジョブを作成した場合は、 terraform apply コマンドの出力で、[ job_url] の横にあるリンクをコピーし、Web ブラウザーのアドレス バーに貼り付けます。

    ノートブックを実行する前に、その内容をカスタマイズする必要がある場合があります。 ノートブックをカスタマイズする方法に関する関連ドキュメントについては、この記事の冒頭にあるリンクを参照してください。

  4. ジョブを作成した場合は、次のようにジョブを実行します。

    1. ジョブ ページで [ 今すぐ実行 ] をクリックします。

    2. ジョブの実行が完了したら、ジョブの実行結果を表示するには、ジョブ ページの [完了した実行 (過去 60 日間)] ボックスの一覧で、[開始時刻] 列の最新の 時刻 エントリをクリックします。 [ 出力 ] ウィンドウには、ノートブックのコードを実行した結果が表示されます。

ステップ 4: クリーンアップ

このステップでは、ワークスペースから前述のリソースを削除します。

  1. terraform plan コマンドを実行して、Terraformが実際に実行する前に、ワークスペースでTerraformが何をするかを確認します。

    terraform plan
    
  2. terraform destroy コマンドを実行して、ワークスペースからクラスター、ノートブック、ジョブを削除します。削除を求めるメッセージが表示されたら、 yes と入力して Enter キーを押します。

    terraform destroy
    

    Terraform は、プロジェクトで指定されたリソースを削除します。