Databricks Asset Bundlesのジョブにタスクを追加する

この記事では、 Databricks アセット バンドルの Databricks ジョブに追加できるさまざまな種類のタスクの例を示します。 「Databricks アセット バンドルとは何ですか?」を参照してください。 。

ほとんどのジョブ タスクの種類には、サポートされている設定の中にタスク固有のパラメーターがありますが、タスクに渡される ジョブ パラメーター を定義することもできます。 動的値参照はジョブ・パラメーターでサポートされており、タスク間でジョブ実行に固有の値を渡すことができます。 動的値参照とはを参照してください

注:

ジョブタスク設定を上書きできます。 「Databricks アセット バンドルのジョブ タスク設定の上書き」を参照してください。

ヒント

Databricks CLIを使用して既存のジョブのリソース構成をすばやく生成するには、 bundle generate jobコマンドを使用できます。 バンドル コマンドを参照してください。

ノートブック タスク

このタスクを使用して、ノートブックを実行します。

次の例では、ノートブック タスクをジョブに追加し、 my_job_run_idという名前のジョブ パラメーターを設定します。 デプロイするノートブックのパスは、このタスクが宣言されている構成ファイルを基準にしています。 タスクは、Databricks ワークスペース内のデプロイされた場所からノートブックを取得します。

resources:
  jobs:
    my-notebook-job:
      name: my-notebook-job
      tasks:
        - task_key: my-notebook-task
          notebook_task:
            notebook_path: ./my-notebook.ipynb
      parameters:
        - name: my_job_run_id
          default: "{{job.run_id}}"

このタスクに設定できるその他のマッピングについては、YAML 形式で表される リファレンスのtasks > notebook_task POST /api/2.1/job/create で定義されている、ジョブ作成操作の要求ペイロードの を参照してください。RESTAPIジョブのノートブック・タスクを参照してください。

If/else 条件タスク

この condition_task を使用すると、if/else 条件付きロジックを持つタスクをジョブに追加できます。 タスクは、他のタスクの実行を制御するために使用できる条件を評価します。 条件タスクは、実行するためにクラスタリングを必要とせず、再試行や通知もサポートしていません。 if/else タスクの詳細については、「 If/else タスクを使用してジョブに分岐ロジックを追加する」を参照してください。

次の例には、条件タスクとノートブックタスクが含まれており、ノートブックタスクはジョブの修復回数が 5 未満の場合にのみ実行されます。

resources:
  jobs:
    my-job:
      name: my-job
      tasks:
        - task_key: condition_task
          condition_task:
            op: LESS_THAN
            left: "{{job.repair_count}}"
            right: "5"
        - task_key: notebook_task
          depends_on:
            - task_key: condition_task
              outcome: "true"
          notebook_task:
            notebook_path: ../src/notebook.ipynb

このタスクに設定できる追加のマッピングについては、REST API リファレンスのPOST /api/2.1/jobs/createで定義されているジョブ作成オペレーションのリクエスト ペイロードのtasks > condition_taskを参照してください (YAML 形式で表現されています)。

各タスクについて

この for_each_task を使用すると、for each ループを持つタスクをジョブに追加できます。 タスクは、指定されたすべての入力に対してネストされたタスクを実行します。 for_each_taskの詳細については、「 ループ内でのパラメーター化された Databricks ジョブ タスクの実行」を参照してください。

次の例では、ジョブに for_each_task を追加し、ジョブは別のタスクの値をループして処理します。

resources:
  jobs:
    my_job:
      name: my_job
      tasks:
        - task_key: generate_countries_list
          notebook_task:
            notebook_path: ../src/generate_countries_list.ipnyb
        - task_key: process_countries
          depends_on:
            - task_key: generate_countries_list
          for_each_task:
            inputs: "{{tasks.generate_countries_list.values.countries}}"
            task:
              task_key: process_countries_iteration
              notebook_task:
                notebook_path: ../src/process_countries_notebook.ipnyb

このタスクに設定できる追加のマッピングについては、REST API リファレンスのPOST /api/2.1/jobs/createで定義されているジョブ作成オペレーションのリクエスト ペイロードのtasks > for_each_taskを参照してください (YAML 形式で表現されています)。

Python スクリプト タスク

このタスクを使用して、Python ファイルを実行します。

次の例では、Python スクリプト タスクをジョブに追加します。 デプロイする Python ファイルのパスは、このタスクが宣言されている構成ファイルを基準にしています。 タスクは、Databricks ワークスペース内のデプロイされた場所から Python ファイルを取得します。

resources:
  jobs:
    my-python-script-job:
      name: my-python-script-job

      tasks:
        - task_key: my-python-script-task
          spark_python_task:
            python_file: ./my-script.py

このタスクに設定できるその他のマッピングについては、YAML 形式で表される リファレンスのtasks > spark_python_task POST /api/2.1/job/create で定義されている、ジョブ作成操作の要求ペイロードの を参照してください。RESTAPIジョブの Python スクリプト タスクも参照してください。

Python wheel タスク

このタスクを使用して、 Python wheelファイルを実行します。

次の例では、Python wheel タスクをジョブに追加します。 デプロイする Python wheel ファイルのパスは、このタスクが宣言されている構成ファイルを基準にしています。 「Databricks Asset Bundles ライブラリの依存関係」を参照してください。

resources:
  jobs:
    my-python-wheel-job:
      name: my-python-wheel-job
      tasks:
        - task_key: my-python-wheel-task
          python_wheel_task:
            entry_point: run
            package_name: my_package
          libraries:
            - whl: ./my_package/dist/my_package-*.whl

このタスクに設定できるその他のマッピングについては、YAML 形式で表される リファレンスのtasks > python_wheel_task POST /api/2.1/job/create で定義されている、ジョブ作成操作の要求ペイロードの を参照してください。RESTAPIPython wheelDatabricksPython wheel アセットバンドルを使用した ファイルの開発 」および 「ジョブのタスク 」も参照してください。

JAR タスク

このタスクを使用して JAR を実行します。 ローカルの JAR ライブラリを参照することも、ワークスペース、Unity Catalog ボリューム、または外部クラウド ストレージの場所にあるライブラリを参照することもできます。 「Databricks アセット バンドル ライブラリの依存関係」を参照してください。

次の例では、ジョブに JAR タスクを追加します。 JAR のパスは、指定されたボリューム・ロケーションです。

resources:
  jobs:
    my-jar-job:
      name: my-jar-job
      tasks:
        - task_key: my-jar-task
          spark_jar_task:
            main_class_name: org.example.com.Main
          libraries:
            - jar: /Volumes/main/default/my-volume/my-project-0.1.0-SNAPSHOT.jar

このタスクに設定できるその他のマッピングについては、YAML 形式で表される リファレンスのtasks > spark_jar_task POST /api/2.1/job/create で定義されている、ジョブ作成操作の要求ペイロードの を参照してください。RESTAPIジョブの JAR タスクを参照してください。

SQLファイルタスク

このタスクを使用して、ワークスペースまたはリモート Git リポジトリにある SQL ファイルを実行します。

次の例では、SQL ファイル・タスクをジョブに追加します。 この SQL ファイル タスクは、指定された SQLウェアハウスを使用して、指定された SQL ファイルを実行します。

resources:
  jobs:
    my-sql-file-job:
      name: my-sql-file-job
      tasks:
        - task_key: my-sql-file-task
          sql_task:
            file:
              path: /Users/someone@example.com/hello-world.sql
              source: WORKSPACE
            warehouse_id: 1a111111a1111aa1

SQLウェアハウスのIDを取得するには、SQLウェアハウスの設定ページを開き、 「概要」タブの「名前」フィールドのウェアハウス名の後ろのかっこ内にあるIDをコピーします。

このタスクに設定できるその他のマッピングについては、YAML 形式で表される リファレンスのtasks > sql_task > file POST /api/2.1/job/create で定義されている、ジョブ作成操作の要求ペイロードの を参照してください。RESTAPIジョブの SQL タスクを参照してください。

Delta Live Tablesパイプラインタスク

このタスクを使用して、Delta Live Tables パイプラインを実行します。 「Delta Live Tablesとは」を参照してください。

次の例では、Delta Live Tables パイプライン タスクをジョブに追加します。 この Delta Live Tables パイプライン タスクは、指定されたパイプラインを実行します。

resources:
  jobs:
    my-pipeline-job:
      name: my-pipeline-job
      tasks:
        - task_key: my-pipeline-task
          pipeline_task:
            pipeline_id: 11111111-1111-1111-1111-111111111111

パイプラインの ID を取得するには、ワークスペースでパイプラインを開き、パイプラインの設定ページのパイプライン詳細タブでパイプライン ID値をコピーします。

このタスクに設定できるその他のマッピングについては、YAML 形式で表される リファレンスのtasks > pipeline_task POST /api/2.1/job/create で定義されている、ジョブ作成操作の要求ペイロードの を参照してください。RESTAPIジョブの Delta Live Tables パイプライン タスクを参照してください。

dbtタスク

このタスクを使用して、1 つ以上の dbt コマンドを実行します。 dbt Cloudへの接続」を参照してください。

次の例では、ジョブに dbt タスクを追加します。 この dbt タスクは、指定された SQLウェアハウスを使用して、指定された dbt コマンドを実行します。

resources:
  jobs:
    my-dbt-job:
      name: my-dbt-job
      tasks:
        - task_key: my-dbt-task
          dbt_task:
            commands:
              - "dbt deps"
              - "dbt seed"
              - "dbt run"
            project_directory: /Users/someone@example.com/Testing
            warehouse_id: 1a111111a1111aa1
          libraries:
            - pypi:
                package: "dbt-databricks>=1.0.0,<2.0.0"

SQLウェアハウスのIDを取得するには、SQLウェアハウスの設定ページを開き、 「概要」タブの「名前」フィールドのウェアハウス名の後ろのかっこ内にあるIDをコピーします。

このタスクに設定できるその他のマッピングについては、YAML 形式で表される リファレンスのtasks > dbt_task POST /api/2.1/job/create で定義されている、ジョブ作成操作の要求ペイロードの を参照してください。RESTAPI ジョブのdbtタスクを参照してください。

Databricks Asset Bundles には、dbtタスクを持つジョブを定義する dbt-sql プロジェクトテンプレートと、デプロイされたdbtジョブのdbtプロファイルも含まれています。アセットバンドルテンプレート Databricks に関する情報については、「 デフォルトバンドルテンプレートの使用」を参照してください。

ジョブタスクの実行

このタスクを使用して、別のジョブを実行します。

次の例には、最初のジョブを実行する 2 番目のジョブに実行ジョブ タスクが含まれています。

resources:
  jobs:
    my-first-job:
      name: my-first-job
      tasks:
        - task_key: my-first-job-task
          new_cluster:
            spark_version: "13.3.x-scala2.12"
            node_type_id: "i3.xlarge"
            num_workers: 2
          notebook_task:
            notebook_path: ./src/test.py
    my_second_job:
      name: my-second-job
      tasks:
        - task_key: my-second-job-task
          run_job_task:
            job_id: ${resources.jobs.my-first-job.id}

この例では、 置換 を使用して、実行するジョブの ID を取得します。 UI からジョブの ID を取得するには、ワークスペースでジョブを開き、ジョブの設定ページの [ジョブの詳細] タブの [ジョブ ID] の値から ID をコピーします。

このタスクに設定できる追加のマッピングについては、REST API リファレンスのPOST /api/2.1/jobs/createで定義されているジョブ作成オペレーションのリクエスト ペイロードのtasks > run_job_taskを参照してください (YAML 形式で表現されています)。