ワークフローでパイプラインを実行する
Lakeflow Jobs、 Apache Airflow 、またはAzure Data Factory を使用して、データ処理ワークフローの一部としてパイプラインを実行できます。
ジョブ
Databricks ジョブで複数のタスクを調整して、データ処理ワークフローを実装できます。ジョブにパイプラインを含めるには、ジョブの作成時に パイプライン タスクを使用します。 ジョブのパイプライン タスクを参照してください。
Apache Airflow
Apache Airflow は、データワークフローを管理およびスケジューリングするためのオープンソースソリューションです。Airflow は、ワークフローを操作の有向非巡回グラフ (DAG) として表します。Pythonファイルでワークフローを定義し、Airflowがスケジューリングと実行を管理します。DatabricksでのAirflowのインストールと使用に関する情報については、Apache AirflowによるLakeflowジョブのオーケストレーションを参照してください。
Airflow ワークフローの一部としてパイプラインを実行するには、 DatabricksSubmitRunOperatorを使用します。
要件
Lakeflow Spark宣言型パイプラインのAirflowサポートを使用するには、以下が必要です。
- Airflowバージョン2.1.0 またはそれ以降。
- Databricks プロバイダーパッケージ バージョン 2.1.0またはそれ以降。
例
次の例では、識別子8279d543-063c-4d63-9926-dae38e35ce8bを持つパイプラインの更新をトリガーする Airflow DAG を作成します。
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow'
}
with DAG('ldp',
start_date=days_ago(2),
schedule_interval="@once",
default_args=default_args
) as dag:
opr_run_now=DatabricksSubmitRunOperator(
task_id='run_now',
databricks_conn_id='CONNECTION_ID',
pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
)
CONNECTION_ID 、ワークスペースへのAirflow 接続の識別子に置き換えます。
この例をairflow/dagsディレクトリに保存し、Airflow UI を使用して DAG を表示およびトリガーします。パイプライン UI を使用して、パイプライン更新の詳細を表示します。
Azure Data Factory
Lakeflow Spark宣言型パイプラインとAzure Data Factory には、それぞれ、障害発生時の再試行回数を構成するオプションが含まれています。 パイプライン と、 パイプラインを呼び出す Azure Data Factory アクティビティで再試行値が構成されている場合、再試行回数は Azure Data Factory の再試行値にパイプラインの再試行値を掛けた値になります。
たとえば、パイプラインの更新に失敗した場合、 Lakeflow Spark宣言型パイプラインは、当然最大 5 回まで更新を再試行します。 Azure Data Factory の再試行が 3 に設定されていて、パイプラインがデフォルトの 5 回の再試行を使用する場合、失敗したパイプラインは最大 15 回再試行される可能性があります。パイプラインの更新が失敗したときに過度の再試行を回避するために、Databricks では、パイプラインまたはパイプラインを呼び出す Azure Data Factory アクティビティを構成するときに再試行回数を制限することをお勧めします。
パイプラインの再試行設定を変更するには、パイプラインを構成するときにpipelines.numUpdateRetryAttempts設定を使用します。
Azure Data Factory は、データ統合および変換ワークフローを調整できるクラウドベースの ETL サービスです。Azure Data Factory は、ノートブック、JAR タスク、Python スクリプトなどのワークフロー内での Databricks タスクの実行を直接サポートします。Azure Data FactoryWeb アクティビティ からパイプライン RESTAPI を 呼び出して、ワークフローにパイプラインを含めることもできます。たとえば、Azure Data Factory からパイプラインの更新をトリガーするには、次のようにします。
-
データ ファクトリを作成するか、既存のデータ ファクトリを開きます。
-
作成が完了したら、データ ファクトリのページを開き、 Azure Data Factory Studio を開く タイルをクリックします。 Azure データ ファクトリのユーザー インターフェイスが表示されます。
-
Azure Data Factory Studio ユーザー インターフェイスの 新規 ドロップダウン メニューから パイプライン を選択して、新しい Azure Data Factoryパイプラインを作成します。
-
アクティビティ ツールボックスで、 全般 を展開し、 Web アクティビティをパイプライン キャンバスにドラッグします。 設定 タブをクリックし、次の値を入力します。
セキュリティのベストプラクティスとして、自動化されたツール、システム、スクリプト、アプリで認証する場合、Databricks では、ワークスペースユーザーではなく、サービスプリンシパル に属する個人用アクセストークンを使用することをお勧めします。 サービスプリンシパルのトークンを作成するには、「 サービスプリンシパルのトークンの管理」を参照してください。
-
URL :
https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates。<get-workspace-instance>を置き換えます。<pipeline-id>パイプライン識別子に置き換えます。 -
方法 : ドロップダウン メニューから POST を選択します。
-
ヘッダー : + 新規 をクリックします。 名前 テキスト ボックスに、
Authorizationと入力します。 値 テキスト ボックスに、Bearer <personal-access-token>と入力します。<personal-access-token>Databricksの個人アクセス許可に置き換えます。 -
Body : 追加のリクエストを渡すには、パラメーターを含むJSONドキュメントを入力します。 たとえば、更新を開始し、パイプラインのすべてのデータを再処理するには、次のようにします:
{"full_refresh": "true"}。追加のリクエストがない場合は、空の中括弧 ({}) を入力してください。
Web アクティビティをテストするには、Data Factory UI のパイプライン ツール バーで [デバッグ] をクリックします。エラーを含む実行の出力とステータスは、Azure Data Factory パイプラインの [出力] タブに表示されます。パイプラインの更新の詳細を表示するには、パイプライン UI を使用します。
一般的なワークフロー要件は、前のタスクの完了後にタスクを開始することです。パイプラインupdates要求は非同期であるため (要求は更新の開始後、更新が完了する前に返されます)、パイプライン更新に依存する Azure Data Factory パイプライン内のタスクは、更新が完了するまで待機する必要があります。更新の完了を待つオプションは、 Lakeflow Spark宣言型パイプラインの更新をトリガーする Web アクティビティの後にuntil アクティビティを追加することです。 Until アクティビティでは次のようになります。
- 更新が完了するまで設定された秒数待機するための待機アクティビティを追加します。
- パイプライン更新の詳細要求を使用して更新のステータスを取得する Wait アクティビティの後に Web アクティビティを追加します。レスポンスの
stateフィールドには、更新が完了したかどうかも含め、更新の現在の状態が返されます。 stateフィールドの値を使用して、Until アクティビティの終了条件を設定します。変数設定アクティビティを使用して、state値に基づいてパイプライン変数を追加し、この変数を終了条件に使用することもできます。