ワークフローのスケジュールとオーケストレーション

Databricks Workflows は、Databricks でデータ処理タスクをスケジュールおよび調整できるツールのコレクションを提供します。 Databricks Workflowsジョブを構成するには、Databricks を使用します。

この記事では、 Databricksジョブを使用して本番運用ワークロードを管理することに関連する概念について説明します。

注:

Delta Live Tables は、データ処理パイプラインを作成するための宣言型構文を提供します。 「Delta Live Tables とは何ですか?」を参照してください。

Databricksジョブとは何ですか?

Databricksジョブを使用すると、指定したスケジュールで指定したコンピュート環境でタスクを実行するように構成できます。 Delta Live Tablesパイプラインと並んで、ジョブは、データ処理とDatabricks MLロジックを本番運用にデプロイするために で使用される主要なツールです。

ジョブの複雑さは、 Databricksノートブックを実行する単一のタスクから、条件付きロジックと依存関係を使用して実行される数千のタスクまでさまざまです。

ジョブを設定して実行するにはどうすればよいですか?

ジョブの作成と実行は、ジョブ UI、Databricks CLI、および Jobs API を使って行うことができます。失敗したジョブまたはキャンセルされたジョブは、UI または API を使用して修復および再実行できます。UI、CLI、API、および通知(電子メール、Webhook 宛先、Slack 通知など)を使用して、ジョブの実行結果を監視できます。

Databricks CLI の使用方法については、 「Databricks CLI とは」を参照してください。 Jobs API の使用方法については、 Jobs APIを参照してください。

ジョブに必要な最小構成は何ですか?

Databricks のすべてのジョブには以下が必要です。

  • 実行されるロジックを含むソース コード。

  • ロジックを実行するためのコンピュート リソース。 コンピュート リソースには、サーバレス コンピュート、クラシック ジョブ コンピュート、または汎用コンピュートがあります。 「ジョブでDatabricksコンピュートを使用する」を参照してください。

  • ジョブを実行するタイミングを指定するスケジュール、または手動トリガー。

  • ユニークな名前。

注:

Databricksノートブックでコードを開発する場合、 [スケジュール]ボタンを使用してそのノートブックをジョブとして構成できます。 「スケジュールされたノートブック ジョブの作成と管理」を参照してください。

タスクとは何ですか?

タスクはジョブ内のロジックの単位を表します。 タスクの複雑さはさまざまですが、次のようなものがあります。

  • 読書

  • JAR

  • SQLクエリ

  • DLTパイプライン

  • もう一つのジョブ

  • 制御フロータスク

タスク間の依存関係を指定することにより、タスクの実行順序を制御できます。 タスクを順番に実行したり、並行して実行したりするように構成できます。

ジョブはタスクの状態情報およびメタデータと対話しますが、タスクのスコープは分離されています。 タスク値を使用して、スケジュールされたタスク間でコンテキストを共有できます。 「Databricks ジョブ内のタスク間で情報を共有する」を参照してください。

ジョブにはどのような制御フロー オプションが利用できますか?

ジョブとジョブ内でタスクを構成する場合、ジョブ全体と個々のタスクの実行方法を制御する設定をカスタマイズできます。

トリガーの種類

ジョブを構成するときにトリガー タイプを指定する必要があります。 次のトリガーの種類から選択できます。

ジョブを手動でトリガーすることもできますが、これは主に次のような特定のユースケースにのみ使用されます。

  • REST API 呼び出しを使用してジョブをトリガーするには、外部オーケストレーション ツールを使用します。

  • めったに実行されないジョブがあり、検証やデータ品質の問題の解決に人間による介入が必要です。

  • 移行など、1 回または数回だけ実行する必要があるワークロードを実行しています。

「新しいファイルが到着したときにジョブをトリガーする」を参照してください。

再試行

再試行は、ジョブがエラー メッセージで失敗した場合に、特定のジョブまたはタスクを何回再実行するかを指定します。 多くの場合、エラーは一時的なものであり、再起動によって解決されます。構造化ストリーミングによるスキーマ進化などのDatabricksの一部の機能は、環境をリセットしてワークフローを続行できるようにするために、再試行してジョブを実行することを前提としています。

再試行を設定するためのオプションが、サポートされているコンテキストの UI に表示されます。 これには、次のものが含まれます。

  • ジョブ全体に対して再試行を指定できます。つまり、いずれかのタスクが失敗した場合にジョブ全体が再開されます。

  • タスクの再試行を指定できます。その場合、エラーが発生した場合、タスクは指定された回数まで再起動されます。

連続トリガー モードで実行している場合、Databricks は指数バックオフで自動的に再試行します。 「継続的なジョブの障害はどのように処理されますか?」を参照してください。

条件付きタスクを実行する

実行 ifタスク タイプを使用すると、他のタスクの結果に基づいて、後のタスクの条件を指定できます。 ジョブにタスクを追加し、上流に依存するタスクを指定します。 これらのタスクのステータスに基づいて、実行する 1 つ以上のダウンストリーム タスクを構成できます。 ジョブは、次の依存関係をサポートします。

  • すべて成功しました

  • 少なくとも1つが成功しました

  • 失敗したものはありません

  • すべて完了

  • 少なくとも1回失敗しました

  • すべて失敗しました

Databricksジョブの条件付き実行タスクを参照してください。

If/else条件付きタスク

If/elseタスク タイプを使用すると、何らかの値に基づいて条件を指定できます。 「If/else条件タスクを使用してジョブに分岐ロジックを追加する」を参照してください。

ジョブは、ロジック内で定義するtaskValuesをサポートし、タスクからジョブ環境に計算または状態の結果を返すことができます。 taskValues 、ジョブ パラメーター、または動的な値に対してIf/else条件を定義できます。

Databricks は条件文に次のオペランドをサポートしています。

  • ==

  • !=

  • >

  • >=

  • <

  • <=

関連項目:

各タスク

For each タスクを使用して、ループ内で別のタスクを実行し、タスクの各反復に異なるパラメーターのセットを渡します。

For each タスクをジョブに追加するには、For each タスクとネストされたタスクの 2 つのタスクを定義する必要があります。入れ子になったタスクは、 For each タスクの各イテレーションに対して実行するタスクであり、標準の Databricks ジョブ タスクの種類の 1 つです。 入れ子になったタスクにパラメーターを渡すために、複数のメソッドがサポートされています。

パラメーター化された Databricks ジョブ タスクをループで実行する」を参照してください。

期間のしきい値

期間のしきい値を指定して、指定した期間を超えた場合に警告を送信するか、タスクまたはジョブを停止することができます。 この設定を構成する場合の例としては、次のようなものがあります。

  • ハング状態のままになりやすいタスクがあります。

  • ワークフローの SLA を超えた場合はエンジニアに警告する必要があります。

  • 予期しないコストを回避するために、大規模なクラスターで構成されたジョブを失敗させたいと考えています。

並行 処理

ほとんどのジョブは、一応 1 つの並行ジョブで構成されています。 つまり、新しいジョブがトリガーされるまでに前のジョブの実行が完了していない場合、次のジョブの実行はスキップされます。

同時実行性を向上させるユースケースがいくつかありますが、ほとんどのワークロードではこの設定を変更する必要はありません。

ジョブを監視するにはどうすればよいですか?

ジョブまたはタスクが開始、完了、または失敗したときに通知を受け取ることができます。 通知を 1 つ以上の電子メール アドレスまたはシステムの宛先に送信できます。 「ジョブ イベントの電子メールとシステム通知の追加」を参照してください。

システムテーブルには、アカウント内のジョブ アクティビティに関連するレコードを表示できるlakeflowスキーマが含まれています。 「ジョブ システム テーブル リファレンス」を参照してください。

ジョブ システム テーブルを請求テーブルと結合して、アカウント全体のジョブのコストを監視することもできます。 「システムテーブルを使用したジョブコストの監視」を参照してください。

制限事項

次の制限があります。

  • ワークスペースの同時タスク実行数は 1000 に制限されています。すぐに開始できない実行を要求すると、429 Too Many Requests 応答が返されます。

  • ワークスペースが 1 時間に作成できるジョブの数は 10000 に制限されています(「実行の送信」を含む)。この制限は、REST API およびノートブックワークフローによって作成されたジョブにも影響します。

  • ワークスペースには最大 12,000 個の保存されたジョブを含めることができます。

  • ジョブには最大 100 個のタスクを含めることができます。

ワークフローをプログラムで管理できますか?

Databricks 、次のようなワークフローをプログラムでスケジュールおよび調整できるツールとAPIsを提供します。

開発者ツールの詳細については、 「開発者ツールとガイダンス」を参照してください。

Apache AirFlowを使用したワークフローオーケストレーション

Apache Airflowを使用して、データ ワークフローを管理およびスケジュールできます。 Airflow を使用すると、Python ファイルでワークフローを定義し、Airflow がワークフローのスケジュールと実行を管理します。 「Apache Airflow を使用して Databricks ジョブを調整する」を参照してください。