ジョブ API 2.0 から 2.1への更新

Databricks ジョブを使用して複数のタスクをオーケストレーションできるようになりました。 この記事では、複数のタスクを持つジョブをサポートする Jobs API の変更点について詳しく説明し、この新機能で動作するように既存の API クライアントを更新するためのガイダンスを提供します。

Databricks では、特に複数のタスクを持つジョブを使用する場合に、API スクリプトとクライアントに Jobs API 2.1 をお勧めします。

この記事では、1 つのタスクで定義されたジョブを single-task 形式 と呼び、複数のタスクで定義されたジョブを multi-task 形式と呼んでいます。

Jobs API 2.0 と 2.1 で 更新 リクエストがサポートされるようになりました。 リセット要求の代わりに update 要求を使用して既存のジョブを変更し、single-task-format ジョブと multi-task-format ジョブ間の変更を最小限に抑えます。

API の変更

Jobs API では、ジョブ内の各タスクの設定をキャプチャするための TaskSettings オブジェクトが定義されるようになりました。 マルチタスク形式のジョブの場合、TaskSettingsデータ構造の配列である tasks フィールドが JobSettings オブジェクトに含まれます。以前は JobSettings の一部であった一部のフィールドは、マルチタスク形式のジョブのタスク設定の一部になりました。 JobSettings も更新され、 format フィールドが含まれるようになりました。 format フィールドはジョブの形式を示し、SINGLE_TASK または MULTI_TASKに設定されたSTRING値です。

マルチタスク形式のジョブの JobSettings に対するこれらの変更については、既存の API クライアントを更新する必要があります。 必要な変更の詳細については、 API クライアント ガイド を参照してください。

Jobs API 2.1 はマルチタスク形式をサポートしています。 すべての API 2.1 要求はマルチタスク形式に準拠している必要があり、応答はマルチタスク形式で構造化されます。 API 2.1 の新機能が最初にリリースされます。

Jobs API 2.0 が更新され、マルチタスク形式のジョブをサポートするためのフィールドが追加されました。 特に記載のない限り、このドキュメントの例では API 2.0 を使用しています。 ただし、Databricks では、新規および既存の API スクリプトとクライアントに対して API 2.1 を推奨しています。

API 2.0 および 2.1 のマルチタスク形式ジョブを表す JSON ドキュメントの例:

{
  "job_id": 53,
  "settings": {
    "name": "A job with multiple tasks",
    "email_notifications": {},
    "timeout_seconds": 0,
    "max_concurrent_runs": 1,
    "tasks": [
      {
        "task_key": "clean_data",
        "description": "Clean and prepare the data",
        "notebook_task": {
          "notebook_path": "/Users/user@databricks.com/clean-data"
        },
        "existing_cluster_id": "1201-my-cluster",
        "max_retries": 3,
        "min_retry_interval_millis": 0,
        "retry_on_timeout": true,
        "timeout_seconds": 3600,
        "email_notifications": {}
      },
      {
        "task_key": "analyze_data",
        "description": "Perform an analysis of the data",
        "notebook_task": {
          "notebook_path": "/Users/user@databricks.com/analyze-data"
        },
        "depends_on": [
          {
            "task_key": "clean_data"
          }
        ],
        "existing_cluster_id": "1201-my-cluster",
        "max_retries": 3,
        "min_retry_interval_millis": 0,
        "retry_on_timeout": true,
        "timeout_seconds": 3600,
        "email_notifications": {}
      }
    ],
    "format": "MULTI_TASK"
  },
  "created_time": 1625841911296,
  "creator_user_name": "user@databricks.com",
  "run_as_user_name": "user@databricks.com"
}

Jobs API 2.1 は、タスク レベルのクラスターまたは 1 つ以上の共有ジョブ クラスターの構成をサポートします。

  • タスク・レベル・クラスターは、タスクの開始時に作成および開始され、タスクの完了時に終了します。

  • 共有ジョブクラスターでは、同じジョブ内の複数のタスクがクラスターを使用できます。 クラスターは、クラスターを使用する最初のタスクが開始されたときと、クラスターを使用した最後のタスクが完了した後に終了したときに作成および開始されます。 共有ジョブ・クラスターは、アイドル状態では終了せず、それを使用するすべてのタスクが完了した後にのみ終了します。 クラスターを共有する複数の非依存タスクを同時に開始できます。 共有ジョブ・クラスターが失敗するか、すべてのタスクが完了する前に終了した場合は、新しいクラスターが作成されます。

共有ジョブ・クラスターを構成するには、JobSettings オブジェクトに JobCluster 配列を含めます。ジョブごとに最大 100 個のクラスターを指定できます。 次に、2 つの共有クラスターで設定されたジョブの API 2.1 レスポンスの例を示します。

注:

タスクにライブラリの依存関係がある場合は、 task フィールド設定でライブラリを構成する必要があります。ライブラリは、共有ジョブ クラスター構成では構成できません。 次の例では、ingest_orders タスクの構成の libraries フィールドが、ライブラリ依存関係の指定を示しています。

{
  "job_id": 53,
  "settings": {
    "name": "A job with multiple tasks",
    "email_notifications": {},
    "timeout_seconds": 0,
    "max_concurrent_runs": 1,
    "job_clusters": [
      {
        "job_cluster_key": "default_cluster",
        "new_cluster": {
          "spark_version": "7.3.x-scala2.12",
          "node_type_id": "i3.xlarge",
          "spark_conf": {
            "spark.speculation": true
          },
          "aws_attributes": {
            "availability": "SPOT",
            "zone_id": "us-west-2a"
          },
          "autoscale": {
            "min_workers": 2,
            "max_workers": 8
          }
        }
      },
      {
        "job_cluster_key": "data_processing_cluster",
        "new_cluster": {
          "spark_version": "7.3.x-scala2.12",
          "node_type_id": "r4.2xlarge",
          "spark_conf": {
            "spark.speculation": true
          },
          "aws_attributes": {
            "availability": "SPOT",
            "zone_id": "us-west-2a"
          },
          "autoscale": {
            "min_workers": 8,
            "max_workers": 16
          }
        }
      }
    ],
    "tasks": [
      {
        "task_key": "ingest_orders",
        "description": "Ingest order data",
        "depends_on": [ ],
        "job_cluster_key": "auto_scaling_cluster",
        "spark_jar_task": {
          "main_class_name": "com.databricks.OrdersIngest",
          "parameters": [
            "--data",
            "dbfs:/path/to/order-data.json"
          ]
        },
        "libraries": [
          {
            "jar": "dbfs:/mnt/databricks/OrderIngest.jar"
          }
        ],
        "timeout_seconds": 86400,
        "max_retries": 3,
        "min_retry_interval_millis": 2000,
        "retry_on_timeout": false
      },
      {
        "task_key": "clean_orders",
        "description": "Clean and prepare the order data",
        "notebook_task": {
          "notebook_path": "/Users/user@databricks.com/clean-data"
        },
        "job_cluster_key": "default_cluster",
        "max_retries": 3,
        "min_retry_interval_millis": 0,
        "retry_on_timeout": true,
        "timeout_seconds": 3600,
        "email_notifications": {}
      },
      {
        "task_key": "analyze_orders",
        "description": "Perform an analysis of the order data",
        "notebook_task": {
          "notebook_path": "/Users/user@databricks.com/analyze-data"
        },
        "depends_on": [
          {
            "task_key": "clean_data"
          }
        ],
        "job_cluster_key": "data_processing_cluster",
        "max_retries": 3,
        "min_retry_interval_millis": 0,
        "retry_on_timeout": true,
        "timeout_seconds": 3600,
        "email_notifications": {}
      }
    ],
    "format": "MULTI_TASK"
  },
  "created_time": 1625841911296,
  "creator_user_name": "user@databricks.com",
  "run_as_user_name": "user@databricks.com"
}

単一タスク・フォーマット・ジョブの場合、 JobSettings データ構造は、 format フィールドの追加を除いて変更されません。 TaskSettings配列は含まれず、タスク設定はJobSettingsデータ構造の最上位で定義されたままになります。シングルタスク形式のジョブを処理するために、既存の API クライアントを変更する必要はありません。

API 2.0 のシングルタスク形式のジョブを表す JSON ドキュメントの例:

{
  "job_id": 27,
  "settings": {
    "name": "Example notebook",
    "existing_cluster_id": "1201-my-cluster",
    "libraries": [
      {
        "jar": "dbfs:/FileStore/jars/spark_examples.jar"
      }
    ],
    "email_notifications": {},
    "timeout_seconds": 0,
    "schedule": {
      "quartz_cron_expression": "0 0 0 * * ?",
      "timezone_id": "US/Pacific",
      "pause_status": "UNPAUSED"
    },
    "notebook_task": {
      "notebook_path": "/notebooks/example-notebook",
      "revision_timestamp": 0
    },
    "max_concurrent_runs": 1,
    "format": "SINGLE_TASK"
  },
  "created_time": 1504128821443,
  "creator_user_name": "user@databricks.com"
}

API クライアントガイド

このセクションでは、新しいマルチタスク形式機能の影響を受ける API 呼び出しのガイドライン、例、および必要な変更について説明します。

創造する

ジョブ API の Create a new job 操作 (POST /jobs/create) を使用して単一タスク形式のジョブを作成する場合、既存のクライアントを変更する必要はありません。

To create a multi-task format job, use the tasks field in JobSettings to specify settings for each task. The following example creates a job with two notebook tasks. This example is for API 2.0 and 2.1:

注:

ジョブごとに最大 100 個のタスクを指定できます。

{
  "name": "Multi-task-job",
  "max_concurrent_runs": 1,
  "tasks": [
    {
      "task_key": "clean_data",
      "description": "Clean and prepare the data",
      "notebook_task": {
        "notebook_path": "/Users/user@databricks.com/clean-data"
      },
      "existing_cluster_id": "1201-my-cluster",
      "timeout_seconds": 3600,
      "max_retries": 3,
      "retry_on_timeout": true
    },
    {
      "task_key": "analyze_data",
      "description": "Perform an analysis of the data",
      "notebook_task": {
        "notebook_path": "/Users/user@databricks.com/analyze-data"
      },
      "depends_on": [
        {
          "task_key": "clean_data"
        }
      ],
      "existing_cluster_id": "1201-my-cluster",
      "timeout_seconds": 3600,
      "max_retries": 3,
      "retry_on_timeout": true
    }
  ]
}

実行 submit

ジョブ API の 1 回限りの実行 の作成 とトリガーの操作 (POST /runs/submit) を使用して、単一タスク形式のジョブの 1 回限りの実行を送信する場合、既存のクライアントを変更する必要はありません。

マルチタスク形式ジョブの 1 回限りの実行を送信するには、JobSettingstasks フィールドを使用して、クラスターを含む各タスクの設定を指定します。マルチタスク format ジョブを送信するときは、 runs submit 要求が共有ジョブ クラスターをサポートしていないため、クラスターをタスク レベルで設定する必要があります。 複数のタスクを指定する例については、「 作成JobSettings を参照してください。

更新

[ジョブ] の[ジョブの一部更新] 操作POST /jobs/update () を使用して単一タスク形式のジョブを更新する場合、既存のクライアントを変更する必要はありません。API

マルチタスク形式ジョブの設定を更新するには、一意の task_key フィールドを使用して新しい task 設定を識別する必要があります。 複数のタスクを指定する例については、「 作成JobSettings を参照してください。

リセット

ジョブ API の [ ジョブのすべての設定を上書き する] 操作 (POST /jobs/reset) を使用して、単一タスク形式のジョブの設定を上書きする場合、既存のクライアントを変更する必要はありません。

マルチタスク形式ジョブの設定を上書きするには、TaskSettingsデータ構造の配列を持つJobSettingsデータ構造を指定します。複数のタスクを指定する例については、「 作成JobSettings を参照してください。

[更新] を使用すると、シングルタスク形式からマルチタスク形式に切り替えずに個々のフィールドを変更できます。

リスト

単一タスク形式のジョブの場合、ジョブ API の List all jobs 操作 (GET /jobs/list) からの応答を処理するために、クライアントを変更する必要はありません。

マルチタスク フォーマット ジョブの場合、ほとんどの設定はジョブ レベルではなくタスク レベルで定義されます。 クラスター コンフィギュレーションは、タスク レベルまたはジョブ レベルで設定できます。 Job構造体で返されるマルチタスク形式ジョブのクラスターまたはタスク設定にアクセスするようにクライアントを変更するには、次のようにします。

  • マルチタスク形式ジョブの job_id フィールドを解析します。

  • ジョブの詳細を取得するには、ジョブ の ジョブの取得 job_id操作 ()GET /jobs/get に を渡します。APIマルチタスク形式ジョブの Get API 呼び出しからの応答例については、「Get」を参照してください。

次の例は、シングルタスク形式とマルチタスク形式のジョブを含むレスポンスを示しています。 次の例は API 2.0 用です。

{
  "jobs": [
    {
      "job_id": 36,
      "settings": {
        "name": "A job with a single task",
        "existing_cluster_id": "1201-my-cluster",
        "email_notifications": {},
        "timeout_seconds": 0,
        "notebook_task": {
          "notebook_path": "/Users/user@databricks.com/example-notebook",
          "revision_timestamp": 0
        },
        "max_concurrent_runs": 1,
        "format": "SINGLE_TASK"
      },
      "created_time": 1505427148390,
      "creator_user_name": "user@databricks.com"
    },
    {
      "job_id": 53,
      "settings": {
        "name": "A job with multiple tasks",
        "email_notifications": {},
        "timeout_seconds": 0,
        "max_concurrent_runs": 1,
        "format": "MULTI_TASK"
      },
      "created_time": 1625841911296,
      "creator_user_name": "user@databricks.com"
    }
  ]
}

取得

single-task format ジョブの場合、ジョブ の Get a ジョブ 操作 ()GET /jobs/get APIからの応答を処理するためにクライアントを変更する必要はありません。

マルチタスク形式のジョブは、タスク設定を含む task データ構造の配列を返します。 タスク レベルの詳細にアクセスする必要がある場合は、 tasks 配列を反復処理し、必要なフィールドを抽出するようにクライアントを変更する必要があります。

次に、マルチタスク形式のジョブに対する Get API 呼び出しからの応答の例を示します。 次の例は、API 2.0 と 2.1 用です。

{
  "job_id": 53,
  "settings": {
    "name": "A job with multiple tasks",
    "email_notifications": {},
    "timeout_seconds": 0,
    "max_concurrent_runs": 1,
    "tasks": [
      {
        "task_key": "clean_data",
        "description": "Clean and prepare the data",
        "notebook_task": {
          "notebook_path": "/Users/user@databricks.com/clean-data"
        },
        "existing_cluster_id": "1201-my-cluster",
        "max_retries": 3,
        "min_retry_interval_millis": 0,
        "retry_on_timeout": true,
        "timeout_seconds": 3600,
        "email_notifications": {}
      },
      {
        "task_key": "analyze_data",
        "description": "Perform an analysis of the data",
        "notebook_task": {
          "notebook_path": "/Users/user@databricks.com/analyze-data"
        },
        "depends_on": [
          {
            "task_key": "clean_data"
          }
        ],
        "existing_cluster_id": "1201-my-cluster",
        "max_retries": 3,
        "min_retry_interval_millis": 0,
        "retry_on_timeout": true,
        "timeout_seconds": 3600,
        "email_notifications": {}
      }
    ],
    "format": "MULTI_TASK"
  },
  "created_time": 1625841911296,
  "creator_user_name": "user@databricks.com",
  "run_as_user_name": "user@databricks.com"
}

実行 取得

単一タスク形式のジョブの場合、ジョブ の Get a ジョブ実行 操作 ()GET /jobs/runs/get APIからの応答を処理するためにクライアントを変更する必要はありません。

The response for a multi-task format job run contains an array of TaskSettings. To retrieve run results for each task:

  • 各タスクを反復処理します。

  • 各タスクの run_id を解析します。

  • run_idを使用して Get the output for a run operation (GET /jobs/runs/get-output) を呼び出し、各タスクの実行の詳細を取得します。このリクエストからの応答の例を次に示します。

{
  "job_id": 53,
  "run_id": 759600,
  "number_in_job": 7,
  "original_attempt_run_id": 759600,
  "state": {
    "life_cycle_state": "TERMINATED",
    "result_state": "SUCCESS",
    "state_message": ""
  },
  "cluster_spec": {},
  "start_time": 1595943854860,
  "setup_duration": 0,
  "execution_duration": 0,
  "cleanup_duration": 0,
  "trigger": "ONE_TIME",
  "creator_user_name": "user@databricks.com",
  "run_name": "Query logs",
  "run_type": "JOB_RUN",
  "tasks": [
    {
      "run_id": 759601,
      "task_key": "query-logs",
      "description": "Query session logs",
      "notebook_task": {
        "notebook_path": "/Users/user@databricks.com/log-query"
      },
      "existing_cluster_id": "1201-my-cluster",
      "state": {
        "life_cycle_state": "TERMINATED",
        "result_state": "SUCCESS",
        "state_message": ""
      }
    },
    {
      "run_id": 759602,
      "task_key": "validate_output",
      "description": "Validate query output",
      "depends_on": [
        {
          "task_key": "query-logs"
        }
      ],
      "notebook_task": {
        "notebook_path": "/Users/user@databricks.com/validate-query-results"
      },
      "existing_cluster_id": "1201-my-cluster",
      "state": {
        "life_cycle_state": "TERMINATED",
        "result_state": "SUCCESS",
        "state_message": ""
      }
    }
  ],
  "format": "MULTI_TASK"
}

get output を実行します

単一タスク形式のジョブの場合、ジョブ API の実行操作 (GET /jobs/runs/get-output) の出力の取得からの応答を処理するために、クライアントを変更する必要はありません。

マルチタスク フォーマット ジョブの場合、親実行で Runs get output を呼び出すと、実行出力は個々のタスクでのみ使用できるため、エラーが発生します。 マルチタスクフォーマットジョブの出力とメタデータを取得するには:

  • 実行要求 の出力の取得 を呼び出します。

  • 応答の子フィールド run_id フィールドを反復処理します。

  • run_id の値を使用して Runs get outputを呼び出します。

実行 list

シングルタスク形式のジョブの場合、ジョブ操作 (GET /jobs/runs/list) の List 実行からの応答を処理するために、クライアントを変更する必要はありません。

マルチタスク形式のジョブの場合、空の tasks 配列が返されます。 run_idを Get a ジョブ実行操作 (GET /jobs/runs/get) に渡して、タスクを取得します。以下は、マルチタスク形式ジョブの Runs list API 呼び出しからの応答の例を示しています。

{
  "runs": [
    {
      "job_id": 53,
      "run_id": 759600,
      "number_in_job": 7,
      "original_attempt_run_id": 759600,
      "state": {
          "life_cycle_state": "TERMINATED",
          "result_state": "SUCCESS",
          "state_message": ""
      },
      "cluster_spec": {},
      "start_time": 1595943854860,
      "setup_duration": 0,
      "execution_duration": 0,
      "cleanup_duration": 0,
      "trigger": "ONE_TIME",
      "creator_user_name": "user@databricks.com",
      "run_name": "Query logs",
      "run_type": "JOB_RUN",
      "tasks": [],
      "format": "MULTI_TASK"
    }
  ],
  "has_more": false
}