Databricks Asset Bundles のジョブにタスクを追加する
このページでは、 Databricksアセット バンドルでジョブ タスクを定義する方法についての情報を提供します。 ジョブタスクの詳細については、 「 Lakeflowジョブでタスクを構成および編集する」を参照してください。
ジョブ git_source フィールドとタスク source フィールドを GIT に設定することは、ローカルの相対パスが Git リポジトリ内の同じコンテンツを指していない可能性があるため、バンドルには推奨されません。バンドルは、デプロイされたジョブがデプロイされた場所のローカル コピーと同じファイルを持つことを想定しています。
代わりに、リポジトリをローカルにクローンし、このリポジトリ内にバンドルプロジェクトを設定して、タスクのソースがワークスペースになるようにします。
タスクを構成する
ジョブ定義のtasksキーでバンドル内のジョブのタスクを定義します。利用可能なタスク タイプのタスク構成の例は、タスク設定セクションにあります。 バンドル内のジョブの定義については、 「ジョブ」を参照してください。
Databricks CLI を使用して既存のジョブのリソース構成をすばやく生成するには、bundle generate job コマンドを使用できます。バンドル・コマンドを参照してください。
タスク値を設定するには、ほとんどのジョブ タスク タイプにタスク固有の確保がありますが、タスクに渡されるジョブ課題を定義することもできます。 動的値参照はジョブ実行に対してサポートされており、タスク間でジョブ実行に固有の値を渡すことができます。 タスク タイプごとにタスク値を渡す方法の詳細については、 「タスク タイプごとの詳細」を参照してください。
一般的なジョブ タスク設定をターゲット ワークスペースの設定で上書きすることもできます。「ターゲット設定で上書きする」を参照してください。
次の構成例では、2 つのノートブック タスクを含むジョブを定義し、最初のタスクから 2 番目のタスクにタスク値を渡します。
resources:
jobs:
pass_task_values_job:
name: pass_task_values_job
tasks:
# Output task
- task_key: output_value
notebook_task:
notebook_path: ../src/output_notebook.ipynb
# Input task
- task_key: input_value
depends_on:
- task_key: output_value
notebook_task:
notebook_path: ../src/input_notebook.ipynb
base_parameters:
received_message: '{{tasks.output_value.values.message}}'
output_notebook.ipynbには、 messageキーのタスク値を設定する次のコードが含まれています。
# Databricks notebook source
# This first task sets a simple output value.
message = "Hello from the first task"
# Set the message to be used by other tasks
dbutils.jobs.taskValues.set(key="message", value=message)
print(f"Produced message: {message}")
input_notebook.ipynbは、タスクの構成で設定された パラメーターreceived_messageの値を取得します。
# This notebook receives the message as a parameter.
dbutils.widgets.text("received_message", "")
received_message = dbutils.widgets.get("received_message")
print(f"Received message: {received_message}")
タスク設定
このセクションには、各ジョブ タスク タイプの設定と例が含まれています。
条件タスク
condition_task使用すると、if/else 条件ロジックを含むタスクをジョブに追加できます。タスクは、他のタスクの実行を制御するために使用できる条件を評価します。 条件タスクはクラスターの実行を必要とせず、再試行や通知をサポートしません。if/else 条件タスクの詳細については、 「If/else タスクを使用してジョブに分岐ロジックを追加する」を参照してください。
条件タスクでは次のキーを使用できます。対応する REST API オブジェクト定義については、 condition_task を参照してください。
Key | Type | 説明 |
|---|---|---|
| String | 必須。条件の左オペランド。文字列値、ジョブ状態、または |
| String | 必須。比較に使用する演算子。有効な値は、 |
| String | 必須。条件の右オペランド。文字列値、ジョブ状態、または動的な値参照にすることができます。 |
例
次の例には、条件タスクとノートブックタスクが含まれており、ノートブックタスクはジョブの修復回数が 5 未満の場合にのみ実行されます。
resources:
jobs:
my-job:
name: my-job
tasks:
- task_key: condition_task
condition_task:
op: LESS_THAN
left: '{{job.repair_count}}'
right: '5'
- task_key: notebook_task
depends_on:
- task_key: condition_task
outcome: 'true'
notebook_task:
notebook_path: ../src/notebook.ipynb
ダッシュボード タスク
このタスクを使用して、ダッシュボードを更新し、スナップショットをサブスクライバーに送信します。バンドル内のダッシュボードの詳細については、 「ダッシュボード」を参照してください。
ダッシュボード タスクでは次のキーを使用できます。対応する REST API オブジェクト定義については、 dashboard_task を参照してください。
Key | Type | 説明 |
|---|---|---|
| String | 必須。更新するダッシュボードの識別子。ダッシュボードがすでに存在している必要があります。 |
| マップ | ダッシュボードのスナップショットを送信するためのサブスクリプション構成。各サブスクリプション オブジェクトでは、ダッシュボードの更新が完了した後にスナップショットを送信する場所の宛先設定を指定できます。サブスクリプションを参照してください。 |
| String | スケジュールに対してダッシュボードを実行するためのウェアハウス ID。 指定しない場合は、ダッシュボードのデフォルトのウェアハウスが使用されます。 |
例
次の例では、ダッシュボード タスクをジョブに追加します。ジョブが実行されると、指定した ID のダッシュボードが更新されます。
resources:
jobs:
my-dashboard-job:
name: my-dashboard-job
tasks:
- task_key: my-dashboard-task
dashboard_task:
dashboard_id: 11111111-1111-1111-1111-111111111111
dbt タスク
このタスクを使用して、1 つ以上の dbt コマンドを実行します。dbtの詳細については、 dbt Cloudへの接続」を参照してください。
dbt タスクでは次のキーが使用できます。対応する REST API オブジェクト定義については、 dbt_task を参照してください。
Key | Type | 説明 |
|---|---|---|
| String | 使用するカタログの名前。カタログ値は、 |
| Sequence | 必須。順番に実行する dbt コマンドのリスト。各コマンドは完全な dbt コマンドである必要があります (例: |
| String | dbt profiles.yml ファイルが含まれているディレクトリへのパス。 |
| String | dbt プロジェクトを含むディレクトリへのパス。指定されていない場合は、リポジトリまたはワークスペース ディレクトリのルートがデフォルトになります。Databricks ワークスペースに保存されているプロジェクトの場合、パスは絶対パスで、スラッシュで始まる必要があります。リモート リポジトリ内のプロジェクトの場合、パスは相対パスである必要があります。 |
| String | 書き込むスキーマ。このパラメーターは、 |
| String | dbt プロジェクトの場所の種類。有効な値は |
| String | dbtコマンドの実行に使用するSQLウェアハウスのID。 指定しない場合は、デフォルトのウェアハウスが使用されます。 |
例
次の例では、ジョブに dbt タスクを追加します。 この dbt タスクは、指定された SQLウェアハウスを使用して、指定された dbt コマンドを実行します。
SQLウェアハウスの ID を取得するには、SQLウェアハウスの設定ページを開き、[ 概要 ] タブの [名前 ] フィールドでウェアハウスの名前の後に括弧で囲まれた ID をコピーします。
Databricks Asset Bundles には、dbtタスクを持つジョブを定義する dbt-sql プロジェクトテンプレートと、デプロイされたdbtジョブのdbtプロファイルも含まれています。Databricks Asset Bundles テンプレートに関する情報については、「デフォルト バンドル テンプレート」を参照してください。
resources:
jobs:
my-dbt-job:
name: my-dbt-job
tasks:
- task_key: my-dbt-task
dbt_task:
commands:
- 'dbt deps'
- 'dbt seed'
- 'dbt run'
project_directory: /Users/someone@example.com/Testing
warehouse_id: 1a111111a1111aa1
libraries:
- pypi:
package: 'dbt-databricks>=1.0.0,<2.0.0'
各タスクについて
この for_each_task を使用すると、for each ループを持つタスクをジョブに追加できます。 タスクは、指定されたすべての入力に対してネストされたタスクを実行します。 for_each_taskの詳細については、「For each タスクを使用してループ内の別のタスクを実行する」を参照してください。
for_each_taskには次のキーが使用できます。対応する REST API オブジェクト定義については、 for_each_task を参照してください。
Key | Type | 説明 |
|---|---|---|
| Integer | 同時に実行できるタスク反復の最大数。指定しない場合は、クラスターとワークスペースの制限に従って、すべての反復が並列で実行される可能性があります。 |
| String | 必須。ループの入力データ。これは、 JSON文字列または配列への参照である可能性があります。 配列内の各要素は、ネストされたタスクの 1 回の反復に渡されます。 |
| マップ | 必須。各入力に対して実行するネストされたタスク定義。このオブジェクトには、 |
例
次の例では、ジョブに for_each_task を追加し、ジョブは別のタスクの値をループして処理します。
resources:
jobs:
my_job:
name: my_job
tasks:
- task_key: generate_countries_list
notebook_task:
notebook_path: ../src/generate_countries_list.ipnyb
- task_key: process_countries
depends_on:
- task_key: generate_countries_list
for_each_task:
inputs: '{{tasks.generate_countries_list.values.countries}}'
task:
task_key: process_countries_iteration
notebook_task:
notebook_path: ../src/process_countries_notebook.ipnyb
JARタスク
このタスクを使用して JAR を実行します。ローカルのJARライブラリ、またはワークスペース、 Unity Catalogボリューム、または外部のクラウド ストレージの場所にある JAR ライブラリを参照できます。 JAR ファイル (Java または Scala)を参照してください。
標準アクセス モードで Unity カタログ対応クラスターにScala JARファイルをコンパイルしてデプロイする方法の詳細については、チュートリアル: サーバーレス コンピュートでのScalaコードの実行を参照してください。
JAR タスクでは次のキーを使用できます。対応する REST API オブジェクト定義については、 jar_task を参照してください。
Key | Type | 説明 |
|---|---|---|
| String | 非推奨。実行する JAR の URI。DBFS およびクラウド ストレージ パスがサポートされています。このフィールドは非推奨なので使用しないでください。代わりに、 |
| String | 必須。実行されるメイン メソッドを含むクラスの完全な名前。このクラスは、ライブラリとして提供される JAR に含まれている必要があります。コードでは、Spark コンテキストを取得するために |
| Sequence | メイン メソッドに渡されるパラメーター。タスクのパラメーター変数を使用して、ジョブ実行に関する情報を含む変数を設定します。 |
例
次の例では、ジョブに JAR タスクを追加します。JAR のパスはボリュームの場所です。
resources:
jobs:
my-jar-job:
name: my-jar-job
tasks:
- task_key: my-jar-task
spark_jar_task:
main_class_name: org.example.com.Main
libraries:
- jar: /Volumes/main/default/my-volume/my-project-0.1.0-SNAPSHOT.jar
ノートブック タスク
このタスクを使用して、ノートブックを実行します。ジョブのノートブック・タスクを参照してください。
ノートブック タスクでは次のキーを使用できます。対応する REST API オブジェクト定義については、 notebook_task を参照してください。
Key | Type | 説明 |
|---|---|---|
| マップ | このジョブの各実行に使用するベースの案。
|
| String | 必須。Databricks ワークスペースまたはリモート リポジトリ内のノートブックのパス (例 |
| String | ノートブックの場所の種類。有効な値は |
| String | ノートブックを実行するウェアハウスの ID。従来のSQLウェアハウスはサポートされていません。 代わりにサーバーレスまたはプロSQLウェアハウスを使用してください。 SQLウェアハウスはSQLセルのみをサポートしていることに注意してください。 ノートブックに SQL 以外のセルが含まれている場合、実行は失敗するため、セル内でPython (またはその他) を使用する必要がある場合は、サーバレスを使用してください。 |
例
次の例では、ノートブック タスクをジョブに追加し、 my_job_run_idという名前のジョブ パラメーターを設定します。 デプロイするノートブックのパスは、このタスクが宣言されている構成ファイルを基準にしています。 タスクは、Databricks ワークスペース内のデプロイされた場所からノートブックを取得します。
resources:
jobs:
my-notebook-job:
name: my-notebook-job
tasks:
- task_key: my-notebook-task
notebook_task:
notebook_path: ./my-notebook.ipynb
parameters:
- name: my_job_run_id
default: '{{job.run_id}}'
パイプライン タスク
このタスクを使用してパイプラインを実行します。Lakeflow Spark宣言型パイプラインを参照してください。
パイプライン タスクでは次のキーを使用できます。対応する REST API オブジェクト定義については、 pipeline_task を参照してください。
Key | Type | 説明 |
|---|---|---|
| Boolean | true の場合、パイプラインの完全な更新がトリガーされ、パイプライン内のすべてのデータセットが完全に再計算されます。false または省略された場合は、増分データのみが処理されます。詳細については、 「パイプライン更新セマンティクス」を参照してください。 |
| String | 必須。実行するパイプラインの ID。パイプラインがすでに存在している必要があります。 |
例
次の例では、パイプライン タスクをジョブに追加します。このタスクは、指定されたパイプラインを実行します。
パイプラインの ID を取得するには、ワークスペースでパイプラインを開き、パイプラインの設定ページの [パイプラインの詳細 ] タブで [パイプライン ID ] の値をコピーします。
resources:
jobs:
my-pipeline-job:
name: my-pipeline-job
tasks:
- task_key: my-pipeline-task
pipeline_task:
pipeline_id: 11111111-1111-1111-1111-111111111111
Power BIタスク
プレビュー
Power BI タスク タイプはパブリック プレビュー段階です。
このタスクを使用して、Power BI セマンティック モデル (旧称データセット) の更新をトリガーします。
Power BI タスクでは次のキーを使用できます。対応する REST API オブジェクト定義については、 power_bi_task を参照してください。
Key | Type | 説明 |
|---|---|---|
| String | 必須。DatabricksからPower BIに認証するためのUnity Catalog接続の名前。 |
| String | 必須。更新する Power BI セマンティック モデル (データセット) の名前。 |
| Boolean | 更新が完了した後に Power BI セマンティック モデルを更新するかどうか。デフォルトは false です。 |
| Sequence | Power BI にエクスポートするテーブル(それぞれマップとして)のリスト。テーブルを参照してください。 |
| String | Power BIデータソースとして使用するSQLウェアハウスの ID。 |
例
次の例では、接続、更新する Power BI モデル、エクスポートする Databricks テーブルを指定する Power BI タスクを定義します。
resources:
jobs:
my_job:
name: my_job
tasks:
- task_key: power_bi_task
power_bi_task:
connection_resource_name: 'connection_name'
power_bi_model:
workspace_name: 'workspace_name'
model_name: 'model_name'
storage_mode: 'DIRECT_QUERY'
authentication_method: 'OAUTH'
overwrite_existing: false
refresh_after_update: false
tables:
- catalog: 'main'
schema: 'tpch'
name: 'customers'
storage_mode: 'DIRECT_QUERY'
warehouse_id: '1a111111a1111aa1'
Python スクリプト タスク
このタスクを使用して、Python ファイルを実行します。
Python スクリプト タスクでは次のキーを使用できます。対応する REST API オブジェクト定義については、 python_task を参照してください。
Key | Type | 説明 |
|---|---|---|
| Sequence | Pythonファイルに渡す課題。 タスクのパラメーター変数を使用して、ジョブ実行に関する情報を含む変数を設定します。 |
| String | 必須。実行する Python ファイルの URI (例 |
| String | Python ファイルの場所の種類。有効な値は |
例
次の例では、Python スクリプト タスクをジョブに追加します。 デプロイする Python ファイルのパスは、このタスクが宣言されている構成ファイルを基準にしています。 タスクは、Databricks ワークスペース内のデプロイされた場所から Python ファイルを取得します。
resources:
jobs:
my-python-script-job:
name: my-python-script-job
tasks:
- task_key: my-python-script-task
spark_python_task:
python_file: ./my-script.py
Python wheel タスク
このタスクを使用して、 Python wheelを実行します。 Databricksアセット バンドルを使用してPython wheelファイルを作成する」を参照してください。
Python wheelタスクでは次のキーが使用できます。 対応する REST API オブジェクト定義については、 python_wheel_task を参照してください。
Key | Type | 説明 |
|---|---|---|
| String | 必須。実行する名前付きエントリ ポイント: 関数またはクラス。パッケージのメタデータに存在しない場合は、 |
| マップ | Python wheelタスクに渡す名前付きの引数。 キーワード引数 とも呼ばれます。 名前付きは、文字列キーと文字列値を持つキーと値のペアです。 |
| String | 必須。実行する Python パッケージの名前。すべての依存関係を環境にインストールする必要があります。パッケージの依存関係はチェックもインストールもされません。 |
| Sequence | Python wheelタスクに渡す課題。 位置引数 とも呼ばれます。 それぞれの疑問は文字列です。 指定する場合は、 |
例
次の例では、Python wheel タスクをジョブに追加します。 デプロイする Python wheel ファイルのパスは、このタスクが宣言されている構成ファイルを基準にしています。 「Databricks Asset Bundles ライブラリの依存関係」を参照してください。
resources:
jobs:
my-python-wheel-job:
name: my-python-wheel-job
tasks:
- task_key: my-python-wheel-task
python_wheel_task:
entry_point: run
package_name: my_package
libraries:
- whl: ./my_package/dist/my_package-*.whl
ジョブタスクの実行
このタスクを使用して、別のジョブを実行します。
実行ジョブタスクでは次のキーが使用できます。対応する REST API オブジェクト定義については、 run_job_task を参照してください。
Key | Type | 説明 |
|---|---|---|
| Integer | 必須。実行するジョブの ID。ジョブはワークスペース内にすでに存在している必要があります。 |
| マップ | 実行中のジョブに渡すジョブレベルのパラメーター。 これらの課題には、ジョブのタスク内でアクセスできます。 |
| マップ | パイプライン タスクのパラメーター。実行中のジョブにパイプライン タスクが含まれている場合にのみ使用されます。パイプラインの完全な更新をトリガーするには、 |
例
次の例は、最初のジョブを実行する 2 番目のジョブの実行 ジョブ タスクを含んでいます。
この例では、 置換 を使用して、実行するジョブの ID を取得します。UI からジョブの ID を取得するには、ワークスペースでジョブを開き、ジョブの設定ページの [ジョブの詳細 ] タブの [ジョブ ID ] の値から ID をコピーします。
resources:
jobs:
my-first-job:
name: my-first-job
tasks:
- task_key: my-first-job-task
new_cluster:
spark_version: '13.3.x-scala2.12'
node_type_id: 'i3.xlarge'
num_workers: 2
notebook_task:
notebook_path: ./src/test.py
my_second_job:
name: my-second-job
tasks:
- task_key: my-second-job-task
run_job_task:
job_id: ${resources.jobs.my-first-job.id}
SQLタスク
このタスクを使用して、SQL ファイル、クエリ、またはアラートを実行します。
SQL タスクでは次のキーを使用できます。対応する REST API オブジェクト定義については、 sql_task を参照してください。
Key | Type | 説明 |
|---|---|---|
| マップ | SQL アラートを実行するための構成。内容:
|
| マップ | SQL ダッシュボードを更新するための構成。内容:
|
| マップ | SQL ファイルを実行するための構成。内容:
|
| マップ | このタスクの各実行に使用されます。 SQLクエリとファイルは、構文 |
| マップ | SQL クエリを実行するための構成。内容:
|
| String | 必須。SQLタスクの実行に使用するSQLウェアハウスのID。 SQLウェアハウスはすでに存在している必要があります。 |
例
SQLウェアハウスの ID を取得するには、SQLウェアハウスの設定ページを開き、[ 概要 ] タブの [名前 ] フィールドでウェアハウスの名前の後に括弧で囲まれた ID をコピーします。
次の例では、SQL ファイル・タスクをジョブに追加します。 この SQL ファイル タスクは、指定された SQLウェアハウスを使用して、指定された SQL ファイルを実行します。
resources:
jobs:
my-sql-file-job:
name: my-sql-file-job
tasks:
- task_key: my-sql-file-task
sql_task:
file:
path: /Users/someone@example.com/hello-world.sql
source: WORKSPACE
warehouse_id: 1a111111a1111aa1
次の例では、ジョブに SQL アラート タスクを追加します。このSQLアラート タスクは、指定されたSQLウェアハウスを使用して、指定されたSQLアラートを更新します。
resources:
jobs:
my-sql-file-job:
name: my-sql-alert-job
tasks:
- task_key: my-sql-alert-task
sql_task:
warehouse_id: 1a111111a1111aa1
alert:
alert_id: 11111111-1111-1111-1111-111111111111
次の例では、ジョブに SQL クエリ タスクを追加します。このSQLクエリタスクは、指定されたSQLウェアハウスを使用して、指定されたSQLクエリを実行します。
resources:
jobs:
my-sql-query-job:
name: my-sql-query-job
tasks:
- task_key: my-sql-query-task
sql_task:
warehouse_id: 1a111111a1111aa1
query:
query_id: 11111111-1111-1111-1111-111111111111
その他のタスク設定
次のタスク設定を使用すると、すべてのタスクの動作を構成できます。 対応する REST API オブジェクト定義については、タスクを参照してください。
Key | Type | 説明 |
|---|---|---|
| String | このタスクに使用するコンピュート リソースのキー。 指定する場合、 |
| Sequence | タスク依存関係のオプションのリスト。各アイテムには次のものが含まれています:
|
| String | タスクのオプションの説明。 |
| Boolean | このタスクの自動最適化を無効にするかどうか。true の場合、適応クエリ実行などの自動最適化は無効になります。 |
| マップ | 実行が開始、完了、または失敗したときに通知する電子メール アドレスのオプションのセット。 各アイテムには次のものが含まれています:
|
| String | ジョブの |
| String | このタスクのすべての実行に使用される既存のクラスターの ID。 |
| マップ | このタスクのヘルス モニタリングのオプションの仕様。評価するヘルス ルールのリストである |
| String | ジョブの |
| Sequence | タスクを実行するクラスターにインストールされるライブラリのオプションのリスト。各ライブラリは、 |
| Integer | タスクが失敗した場合に再試行する最大回数(オプション)。指定しない場合、タスクは再試行されません。 |
| Integer | 失敗した実行の開始と後続の再試行実行の間のオプションの最小間隔(ミリ秒単位)。指定しない場合、デフォルトは 0 (即時再試行) です。 |
| マップ | このタスクの実行ごとに作成される新しいクラスターの仕様。クラスターを参照してください。 |
| マップ | このタスクのオプションの通知設定。各アイテムには次のものが含まれています:
|
| Boolean | タイムアウト時にタスクを再試行するかどうかを指定するオプションのポリシー。指定されていない場合はデフォルトで false になります。 |
| String | タスクを実行する条件を示すオプションの値。有効な値は次のとおりです。
|
| String | 必須。タスクの一意の名前。このフィールドは、 |
| Integer | このタスクの実行ごとに適用されるオプションのタイムアウト。値が 0 の場合、タイムアウトはありません。設定されていない場合は、クラスター構成のデフォルトのタイムアウトが使用されます。 |
| マップ | 実行が開始、完了、または失敗したときに通知するシステム宛先のオプション セット。各アイテムには次のものが含まれています:
|