Databricks Asset Bundlesのジョブにタスクを追加する
この記事では、 Databricks アセット バンドルの Databricks ジョブに追加できるさまざまな種類のタスクの例を示します。 「Databricks アセット バンドルとは何ですか?」を参照してください。 。
ほとんどのジョブ タスクの種類には、サポートされている設定の中にタスク固有のパラメーターがありますが、タスクに渡される ジョブ パラメーター を定義することもできます。 動的値参照はジョブ・パラメーターでサポートされており、タスク間でジョブ実行に固有の値を渡すことができます。 動的値参照とはを参照してください。
注:
ジョブタスク設定を上書きできます。 「Databricks アセット バンドルのジョブ タスク設定の上書き」を参照してください。
ヒント
Databricks CLIを使用して既存のジョブのリソース構成をすばやく生成するには、 bundle generate job
コマンドを使用できます。 バンドル コマンドを参照してください。
ノートブック タスク
このタスクを使用して、ノートブックを実行します。
次の例では、ノートブック タスクをジョブに追加し、 my_job_run_id
という名前のジョブ パラメーターを設定します。 デプロイするノートブックのパスは、このタスクが宣言されている構成ファイルを基準にしています。 タスクは、Databricks ワークスペース内のデプロイされた場所からノートブックを取得します。
resources:
jobs:
my-notebook-job:
name: my-notebook-job
tasks:
- task_key: my-notebook-task
notebook_task:
notebook_path: ./my-notebook.ipynb
parameters:
- name: my_job_run_id
default: "{{job.run_id}}"
このタスクに設定できるその他のマッピングについては、YAML 形式で表される リファレンスのtasks > notebook_task
POST /api/2.1/job/create で定義されている、ジョブ作成操作の要求ペイロードの を参照してください。RESTAPIジョブのノートブック・タスクを参照してください。
If/else 条件タスク
この condition_task
を使用すると、if/else 条件付きロジックを持つタスクをジョブに追加できます。 タスクは、他のタスクの実行を制御するために使用できる条件を評価します。 条件タスクは、実行するためにクラスタリングを必要とせず、再試行や通知もサポートしていません。 if/else タスクの詳細については、「 If/else タスクを使用してジョブに分岐ロジックを追加する」を参照してください。
次の例には、条件タスクとノートブックタスクが含まれており、ノートブックタスクはジョブの修復回数が 5 未満の場合にのみ実行されます。
resources:
jobs:
my-job:
name: my-job
tasks:
- task_key: condition_task
condition_task:
op: LESS_THAN
left: "{{job.repair_count}}"
right: "5"
- task_key: notebook_task
depends_on:
- task_key: condition_task
outcome: "true"
notebook_task:
notebook_path: ../src/notebook.ipynb
このタスクに設定できる追加のマッピングについては、REST API リファレンスのPOST /api/2.1/jobs/createで定義されているジョブ作成オペレーションのリクエスト ペイロードのtasks > condition_task
を参照してください (YAML 形式で表現されています)。
各タスクについて
この for_each_task
を使用すると、for each ループを持つタスクをジョブに追加できます。 タスクは、指定されたすべての入力に対してネストされたタスクを実行します。 for_each_task
の詳細については、「 ループ内でのパラメーター化された Databricks ジョブ タスクの実行」を参照してください。
次の例では、ジョブに for_each_task
を追加し、ジョブは別のタスクの値をループして処理します。
resources:
jobs:
my_job:
name: my_job
tasks:
- task_key: generate_countries_list
notebook_task:
notebook_path: ../src/generate_countries_list.ipnyb
- task_key: process_countries
depends_on:
- task_key: generate_countries_list
for_each_task:
inputs: "{{tasks.generate_countries_list.values.countries}}"
task:
task_key: process_countries_iteration
notebook_task:
notebook_path: ../src/process_countries_notebook.ipnyb
このタスクに設定できる追加のマッピングについては、REST API リファレンスのPOST /api/2.1/jobs/createで定義されているジョブ作成オペレーションのリクエスト ペイロードのtasks > for_each_task
を参照してください (YAML 形式で表現されています)。
Python スクリプト タスク
このタスクを使用して、Python ファイルを実行します。
次の例では、Python スクリプト タスクをジョブに追加します。 デプロイする Python ファイルのパスは、このタスクが宣言されている構成ファイルを基準にしています。 タスクは、Databricks ワークスペース内のデプロイされた場所から Python ファイルを取得します。
resources:
jobs:
my-python-script-job:
name: my-python-script-job
tasks:
- task_key: my-python-script-task
spark_python_task:
python_file: ./my-script.py
このタスクに設定できるその他のマッピングについては、YAML 形式で表される リファレンスのtasks > spark_python_task
POST /api/2.1/job/create で定義されている、ジョブ作成操作の要求ペイロードの を参照してください。RESTAPIジョブの Python スクリプト タスクも参照してください。
Python wheel タスク
このタスクを使用して、 Python wheelファイルを実行します。
次の例では、Python wheel タスクをジョブに追加します。 デプロイする Python wheel ファイルのパスは、このタスクが宣言されている構成ファイルを基準にしています。 「Databricks Asset Bundles ライブラリの依存関係」を参照してください。
resources:
jobs:
my-python-wheel-job:
name: my-python-wheel-job
tasks:
- task_key: my-python-wheel-task
python_wheel_task:
entry_point: run
package_name: my_package
libraries:
- whl: ./my_package/dist/my_package-*.whl
このタスクに設定できるその他のマッピングについては、YAML 形式で表される リファレンスのtasks > python_wheel_task
POST /api/2.1/job/create で定義されている、ジョブ作成操作の要求ペイロードの を参照してください。RESTAPIPython wheelDatabricks「Python wheel アセットバンドルを使用した ファイルの開発 」および 「ジョブのタスク 」も参照してください。
JAR タスク
このタスクを使用して JAR を実行します。 ローカルの JAR ライブラリを参照することも、ワークスペース、Unity Catalog ボリューム、または外部クラウド ストレージの場所にあるライブラリを参照することもできます。 「Databricks アセット バンドル ライブラリの依存関係」を参照してください。
次の例では、ジョブに JAR タスクを追加します。 JAR のパスは、指定されたボリューム・ロケーションです。
resources:
jobs:
my-jar-job:
name: my-jar-job
tasks:
- task_key: my-jar-task
spark_jar_task:
main_class_name: org.example.com.Main
libraries:
- jar: /Volumes/main/default/my-volume/my-project-0.1.0-SNAPSHOT.jar
このタスクに設定できるその他のマッピングについては、YAML 形式で表される リファレンスのtasks > spark_jar_task
POST /api/2.1/job/create で定義されている、ジョブ作成操作の要求ペイロードの を参照してください。RESTAPIジョブの JAR タスクを参照してください。
SQLファイルタスク
このタスクを使用して、ワークスペースまたはリモート Git リポジトリにある SQL ファイルを実行します。
次の例では、SQL ファイル・タスクをジョブに追加します。 この SQL ファイル タスクは、指定された SQLウェアハウスを使用して、指定された SQL ファイルを実行します。
resources:
jobs:
my-sql-file-job:
name: my-sql-file-job
tasks:
- task_key: my-sql-file-task
sql_task:
file:
path: /Users/someone@example.com/hello-world.sql
source: WORKSPACE
warehouse_id: 1a111111a1111aa1
SQLウェアハウスのIDを取得するには、SQLウェアハウスの設定ページを開き、 「概要」タブの「名前」フィールドのウェアハウス名の後ろのかっこ内にあるIDをコピーします。
このタスクに設定できるその他のマッピングについては、YAML 形式で表される リファレンスのtasks > sql_task > file
POST /api/2.1/job/create で定義されている、ジョブ作成操作の要求ペイロードの を参照してください。RESTAPIジョブの SQL タスクを参照してください。
Delta Live Tablesパイプラインタスク
このタスクを使用して、Delta Live Tables パイプラインを実行します。 「Delta Live Tablesとは」を参照してください。
次の例では、Delta Live Tables パイプライン タスクをジョブに追加します。 この Delta Live Tables パイプライン タスクは、指定されたパイプラインを実行します。
resources:
jobs:
my-pipeline-job:
name: my-pipeline-job
tasks:
- task_key: my-pipeline-task
pipeline_task:
pipeline_id: 11111111-1111-1111-1111-111111111111
パイプラインの ID を取得するには、ワークスペースでパイプラインを開き、パイプラインの設定ページのパイプライン詳細タブでパイプライン ID値をコピーします。
このタスクに設定できるその他のマッピングについては、YAML 形式で表される リファレンスのtasks > pipeline_task
POST /api/2.1/job/create で定義されている、ジョブ作成操作の要求ペイロードの を参照してください。RESTAPIジョブの Delta Live Tables パイプライン タスクを参照してください。
dbtタスク
このタスクを使用して、1 つ以上の dbt コマンドを実行します。 dbt Cloudへの接続」を参照してください。
次の例では、ジョブに dbt タスクを追加します。 この dbt タスクは、指定された SQLウェアハウスを使用して、指定された dbt コマンドを実行します。
resources:
jobs:
my-dbt-job:
name: my-dbt-job
tasks:
- task_key: my-dbt-task
dbt_task:
commands:
- "dbt deps"
- "dbt seed"
- "dbt run"
project_directory: /Users/someone@example.com/Testing
warehouse_id: 1a111111a1111aa1
libraries:
- pypi:
package: "dbt-databricks>=1.0.0,<2.0.0"
SQLウェアハウスのIDを取得するには、SQLウェアハウスの設定ページを開き、 「概要」タブの「名前」フィールドのウェアハウス名の後ろのかっこ内にあるIDをコピーします。
このタスクに設定できるその他のマッピングについては、YAML 形式で表される リファレンスのtasks > dbt_task
POST /api/2.1/job/create で定義されている、ジョブ作成操作の要求ペイロードの を参照してください。RESTAPI ジョブのdbtタスクを参照してください。
Databricks Asset Bundles には、dbtタスクを持つジョブを定義する dbt-sql
プロジェクトテンプレートと、デプロイされたdbtジョブのdbtプロファイルも含まれています。アセットバンドルテンプレート Databricks に関する情報については、「 デフォルトバンドルテンプレートの使用」を参照してください。
ジョブタスクの実行
このタスクを使用して、別のジョブを実行します。
次の例には、最初のジョブを実行する 2 番目のジョブに実行ジョブ タスクが含まれています。
resources:
jobs:
my-first-job:
name: my-first-job
tasks:
- task_key: my-first-job-task
new_cluster:
spark_version: "13.3.x-scala2.12"
node_type_id: "i3.xlarge"
num_workers: 2
notebook_task:
notebook_path: ./src/test.py
my_second_job:
name: my-second-job
tasks:
- task_key: my-second-job-task
run_job_task:
job_id: ${resources.jobs.my-first-job.id}
この例では、 置換 を使用して、実行するジョブの ID を取得します。 UI からジョブの ID を取得するには、ワークスペースでジョブを開き、ジョブの設定ページの [ジョブの詳細] タブの [ジョブ ID] の値から ID をコピーします。
このタスクに設定できる追加のマッピングについては、REST API リファレンスのPOST /api/2.1/jobs/createで定義されているジョブ作成オペレーションのリクエスト ペイロードのtasks > run_job_task
を参照してください (YAML 形式で表現されています)。