ワークフローのスケジュールとオーケストレーション
November 05, 2024
Databricks Workflows には、Databricks でデータ処理タスクをスケジュールおよび調整できるツールがあります。 Databricks Workflows を使用して Databricks ジョブを設定します。
この記事では、 Databricks ジョブを使用した本番運用ワークロードの管理に関連する概念と選択肢について説明します。
Databricks ジョブとは
ジョブは、 Databricksでの本番運用ワークロードをスケジュールおよび調整するための主要な単位です。 ジョブは 1 つ以上のタスクで構成されます。 タスクとジョブを組み合わせて、以下を構成およびデプロイできます。
Spark、SQL、OSS Python、ML、任意のコードなどのカスタムロジック。
コンピュート リソースとカスタム環境とライブラリ。
実行中のワークロードのスケジュールとトリガー。
タスク間の制御フローの条件付きロジック。
ジョブは、 タスク間の関係を定義するための手続き型アプローチを提供します。 Delta Live Tables パイプラインは、 データセット と 変換の間のリレーションシップを定義するための宣言型アプローチを提供します。 Delta Live Tables パイプラインをタスクとしてジョブに含めることができます。 ジョブの Delta Live Tables パイプライン タスクを参照してください。
ジョブの複雑さは、 Databricksノートブックを実行する単一のタスクから、条件付きロジックと依存関係を使用して実行される数千のタスクまでさまざまです。
ジョブを設定して実行するにはどうすればよいですか?
ジョブ UI や Databricks CLI を使用するか、ジョブ API を呼び出すことで、ジョブを作成および実行できます。 UI または API を使用して、失敗したジョブまたはキャンセルされたジョブを修復して再実行できます。 ジョブの実行結果は、UI、 CLI、 API、および通知(Eメール、Webhook宛先、Slack通知など)を使用して監視できます。
ジョブの構成とオーケストレーションに Infrastructure-as-Code (IaC) アプローチを使用する場合は、Databricks アセット バンドル (DAB) を使用します。 バンドルには、ジョブとタスクの YAML 定義を含めることができ、 Databricks CLIを使用して管理され、異なるターゲット ワークスペース (開発、ステージング、本番運用など) で共有および実行できます。 DAB を使用してジョブを構成および調整する方法については、「 Databricks アセット バンドル」を参照してください。
Databricks CLI の使用方法については、 「Databricks CLI とは」を参照してください。 Jobs API の使用方法については、 Jobs APIを参照してください。
ジョブに必要な最小構成は何ですか?
Databricks のすべてのジョブには以下が必要です。
実行するロジックを含むソースコード (Databricks ノートブックなど)。
ロジックを実行するためのコンピュート リソース。 コンピュートリソースには、サーバレスコンピュート、クラシックジョブコンピュート、またはAll-Purposeコンピュートを使用できます。 「ジョブのコンピュートの設定」を参照してください。
ジョブを実行するタイミングについて指定されたスケジュール。 オプションで、スケジュールの設定を省略して、ジョブを手動でトリガーできます。
ユニークな名前。
注:
Databricksノートブックでコードを開発する場合、 [スケジュール]ボタンを使用してそのノートブックをジョブとして構成できます。 「スケジュールされたノートブックジョブの作成と管理」を参照してください。
タスクとは何ですか?
タスクは、ジョブのステップとして実行されるロジックの単位を表します。 タスクの複雑さはさまざまで、次のようなものがあります。
ノートブック
JAR
SQL クエリ
DLTパイプライン
別のジョブ
制御フロータスク
タスク間の依存関係を指定することにより、タスクの実行順序を制御できます。 タスクを順番に実行したり、並行して実行したりするように構成できます。
ジョブは、タスクの状態情報とメタデータと対話しますが、タスクのスコープは分離されています。 タスク値を使用して、スケジュールされたタスク間でコンテキストを共有できます。 「タスク値を使用してタスク間で情報を渡す」を参照してください。
ジョブにはどのような制御フローオプションが利用できますか?
ジョブとジョブのタスクを構成する場合、ジョブ全体と個々のタスクの実行方法を制御する設定をカスタマイズできます。 これらのオプションは次のとおりです。
トリガーの種類
ジョブを構成するときにトリガー タイプを指定する必要があります。 次のトリガーの種類から選択できます。
ジョブを手動でトリガーすることもできますが、これは主に次のような特定のユースケースに限られています。
外部オーケストレーション ツールを使用して、REST API 呼び出しを使用してジョブをトリガーする。
ほとんど実行されず、検証やデータ品質の問題の解決のために手動による介入が必要なジョブ。
移行など、1 回または数回だけ実行する必要があるワークロードを実行する。
スケジュールとトリガーを使用したジョブの自動化を参照してください。
再試行
再試行は、タスクがエラー メッセージで失敗した場合に、特定のタスクを再実行する回数を指定します。 多くの場合、エラーは一時的なもので、再起動によって解決されます。 Databricksの一部の機能 (スキーマ進化と構造化ストリーミングなど) では、ジョブを再試行して環境をリセットし、ワークフローを続行することを前提としています。
タスクの再試行を指定した場合、エラーが発生した場合、タスクは指定された回数まで再起動します。 すべてのジョブ構成がタスクの再試行をサポートしているわけではありません。 再試行ポリシーの設定を参照してください。
連続トリガー モードで実行されている場合、Databricks はエクスポネンシャル バックオフを使用して自動的に再試行します。 「連続ジョブの障害はどのように処理されますか?」を参照してください。
Run if条件付きタスク
Run ifタスク タイプを使用すると、他のタスクの結果に基づいて、後のタスクの条件を指定できます。 ジョブにタスクを追加し、上流に依存するタスクを指定します。 これらのタスクのステータスに基づいて、実行する 1 つ以上のダウンストリーム タスクを構成できます。 ジョブは、次の依存関係をサポートします。
すべて成功しました
少なくとも1つが成功しました
失敗したものはありません
すべて完了
少なくとも1回失敗しました
すべて失敗しました
「タスクの依存関係を構成する」を参照してください
If/else条件付きタスク
If/else タスクタイプを使用して、ある値に基づいて条件を指定できます。「 If/else タスクを使用してジョブに分岐ロジックを追加する」を参照してください。
ジョブは、ロジックで定義する taskValues
をサポートし、タスクからジョブ環境に一部の計算または状態の結果を返すことができます。 If/else 条件は、taskValues
、ジョブ・パラメーター、または動的値に対して定義できます。
Databricks は条件文に次のオペランドをサポートしています。
==
!=
>
>=
<
<=
関連項目:
For eachタスク
For each
タスクを使用して、ループ内で別のタスクを実行し、タスクの各反復に異なるパラメーターのセットを渡します。
For each
タスクをジョブに追加するには、For each
タスクとネストされたタスクの 2 つのタスクを定義する必要があります。入れ子になったタスクは、 For each
タスクの各イテレーションに対して実行するタスクであり、標準の Databricks ジョブ タスクの種類の 1 つです。 入れ子になったタスクにパラメーターを渡すために、複数のメソッドがサポートされています。
「 パラメーター化された Databricks ジョブ タスクをループで実行する」を参照してください。
期間のしきい値
実行時間のしきい値を指定して、指定した期間を超えた場合に警告を送信したり、タスクやジョブを停止したりできます。 この設定を構成する場合の例としては、次のようなものがあります。
ハング状態になりやすいタスクがある。
ワークフローの SLA を超えた場合は、エンジニアに警告する必要がある。
予期しないコストを回避するために、大規模なクラスターで構成されたジョブを失敗させる必要がある。
「ジョブ 実行 duration またはストリーミング バックログ メトリクスのしきい値を構成する」および「タスク 実行 duration またはストリーミング バックログ メトリクスのしきい値を構成する」を参照してください。
並列処理
ほとんどのジョブは、デフォルトの 1 つの並行ジョブで構成されています。 つまり、新しいジョブがトリガーされるまでに前のジョブの実行が完了していない場合、次のジョブの実行はスキップされます。
並列数を増やすユースケースもありますが、ほとんどのワークロードではこの設定を変更する必要はありません。
同時並列数の設定の詳細については、「Databricksのジョブのキューイングと同時並列数の設定」を参照してください。
ジョブを監視するにはどうすればよいですか?
ジョブ UI では、ジョブの実行 (実行進行中を含む) を確認できます。 Databricksジョブのモニタリングと可観測性を参照してください。
ジョブまたはタスクが開始、完了、または失敗したときに通知を受け取ることができます。 通知は、1つ以上のEメールアドレスまたはシステム宛先に送信できます。 「ジョブに通知を追加する」を参照してください。
システムテーブルには、アカウント内のジョブ アクティビティに関連するレコードを表示できるlakeflow
スキーマが含まれています。 「ジョブ システム テーブル リファレンス」を参照してください。
また、ジョブシステムテーブルを請求テーブルと結合して、アカウント全体のジョブのコストを監視することもできます。 Monitor job costs & performance with システムテーブルを参照してください。
制限事項
次の制限があります。
ワークスペースの並列実行タスク数は 2000 に制限されています。 すぐに開始できない実行を要求すると、
429 Too Many Requests
応答が返されます。ワークスペースが 1 時間に作成できるジョブの数は 10000 に制限されています(「実行の送信」を含む)。この制限は、REST API およびノートブックワークフローによって作成されたジョブにも影響します。
ワークスペースには最大 12,000 個の保存されたジョブを含めることができます。
ジョブには最大 100 個のタスクを含めることができます。
ワークフローをプログラムで管理できますか?
Databricks には、次のようなツールと APIs があり、プログラムによってワークフローをスケジュールおよび調整できます。
開発者ツールの詳細については、「 開発者ツール」を参照してください。
Apache AirFlowを使用したワークフローオーケストレーション
Apache Airflowを使用して、データ ワークフローを管理およびスケジュールできます。 Airflow を使用すると、Python ファイルでワークフローを定義し、Airflow がワークフローのスケジュールと実行を管理します。 「Apache Airflow を使用して Databricks ジョブを調整する」を参照してください。