ワークフローでのLakeflow 宣言型パイプラインの実行
Lakeflow 宣言型パイプラインは、Lakeflow Jobs、Apache Airflow、または Azure Data Factory を使用してデータ処理ワークフローの一部として実行できます。
求人
Databricks ジョブで複数のタスクを調整して、データ処理ワークフローを実装できます。ジョブにパイプラインを含めるには、ジョブの作成時に パイプライン タスクを使用します。 ジョブのパイプライン タスクを参照してください。
Apache Airflow
Apache Airflow は、データワークフローを管理およびスケジューリングするためのオープンソースソリューションです。Airflow は、ワークフローを操作の有向非巡回グラフ (DAG) として表します。Pythonファイルでワークフローを定義し、Airflowがスケジューリングと実行を管理します。DatabricksでのAirflowのインストールと使用に関する情報については、Apache AirflowによるLakeflowジョブのオーケストレーションを参照してください。
Airflow ワークフローの一部としてパイプラインを実行するには、 DatabricksSubmitRunOperatorを使用します。
要件
AirflowLakeflow宣言型パイプラインの サポートを使用するには、次のものが必要です。
- Airflowバージョン2.1.0 またはそれ以降。
- Databricks プロバイダーパッケージ バージョン 2.1.0またはそれ以降。
例
次の例では、識別子8279d543-063c-4d63-9926-dae38e35ce8b
を持つパイプラインの更新をトリガーする Airflow DAG を作成します。
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow'
}
with DAG('ldp',
start_date=days_ago(2),
schedule_interval="@once",
default_args=default_args
) as dag:
opr_run_now=DatabricksSubmitRunOperator(
task_id='run_now',
databricks_conn_id='CONNECTION_ID',
pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
)
CONNECTION_ID
、ワークスペースへのAirflow 接続の識別子に置き換えます。
この例を airflow/dags
ディレクトリに保存し、Airflow UI を使用して DAGを 表示およびトリガー します。Lakeflow 宣言型パイプライン UI を使用して、パイプラインの更新の詳細を表示します。
Azure Data Factory
Lakeflow 宣言型パイプラインと Azure Data Factory にはそれぞれ、障害発生時の再試行回数を構成するオプションが含まれています。 再試行値がパイプライン と 、そのパイプラインを呼び出す Azure Data Factory アクティビティで構成されている場合、再試行回数は、 Azure Data Factory の再試行値に Lakeflow 宣言型パイプラインの再試行値を掛けた値になります。
たとえば、パイプラインの更新が失敗した場合、 Lakeflow 宣言型パイプラインはデフォルトによって最大 5 回更新を再試行します。 Azure Data Factory の再試行が 3 回に設定され、パイプラインがデフォルトの 5 回の再試行を使用している場合、失敗したパイプラインは最大 15 回再試行される可能性があります。パイプラインの更新が失敗した場合に過剰な再試行を避けるために、Databricks では、パイプラインまたはパイプラインを呼び出す Azure Data Factory アクティビティを構成するときに、再試行の回数を制限することをお勧めします。
パイプラインの再試行設定を変更するには、パイプラインを構成するときにpipelines.numUpdateRetryAttempts
設定を使用します。
Azure Data Factory は、データ統合と変換のワークフローを調整できるクラウドベースの ETL サービスです。Azure Data Factory は、ノートブック、JAR タスク、Python スクリプトなど、ワークフローでの Databricks タスクの実行を直接サポートします。また、 Data FactoryWeb アクティビティ LakeflowからAPI DeclarativeAzure パイプライン を呼び出すことで、パイプラインをワークフローに含めることもできます。たとえば、Azure Data Factory からパイプラインの更新をトリガーするには、次のようにします。
-
データ ファクトリを作成するか、既存のデータ ファクトリを開きます。
-
作成が完了したら、データ ファクトリのページを開き、 Azure Data Factory Studio を開く タイルをクリックします。 Azure データ ファクトリのユーザー インターフェイスが表示されます。
-
Azure Data Factory Studio ユーザー インターフェイスの 新規 ドロップダウン メニューから パイプライン を選択して、新しい Azure Data Factoryパイプラインを作成します。
-
アクティビティ ツールボックスで、 全般 を展開し、 Web アクティビティをパイプライン キャンバスにドラッグします。 設定 タブをクリックし、次の値を入力します。
自動化されたツール、システム、スクリプト、アプリで認証する際のセキュリティのベストプラクティスとして、DatabricksではOAuth トークンを使用することをお勧めします。
パーソナルアクセストークン認証 を使用する場合、 Databricks では、ワークスペース ユーザーではなく、サービスプリンシパル に属する パーソナルアクセストークン を使用することをお勧めします。 サービスプリンシパルのトークンを作成するには、「 サービスプリンシパルのトークンの管理」を参照してください。
-
URL :
https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates
。<get-workspace-instance>
を置き換えます。<pipeline-id>
パイプライン識別子に置き換えます。 -
方法 : ドロップダウン メニューから POST を選択します。
-
ヘッダー : + 新規 をクリックします。 名前 テキスト ボックスに、
Authorization
と入力します。 値 テキスト ボックスに、Bearer <personal-access-token>
と入力します。<personal-access-token>
Databricksの個人アクセス許可に置き換えます。 -
Body : 追加のリクエストを渡すには、問題を含むJSONドキュメントを入力します。 たとえば、更新を開始し、パイプラインのすべてのデータを再処理するには、次のようにします:
{"full_refresh": "true"}
。追加のリクエストがない場合は、空の中括弧 ({}
) を入力してください。
Web アクティビティをテストするには、Data Factory UI のパイプライン ツール バーで デバッグ をクリックします。実行の出力と状態 (エラーを含む) は、Azure Data Factory パイプラインの 出力 タブに表示されます。Lakeflow 宣言型パイプライン UI を使用して、パイプラインの更新の詳細を表示します。
一般的なワークフロー要件は、前のタスクの完了後にタスクを開始することです。Lakeflow宣言型パイプラインのupdates
要求は非同期であるため (更新の開始後、更新が完了する前に要求が返される)、Azure Lakeflow宣言型パイプラインの更新に依存する Data Factory パイプラインのタスクは、更新が完了するまで待機する必要があります。更新の完了を待つオプションとして、Lakeflow 宣言型パイプラインの更新をトリガーする Web アクティビティの後に Until アクティビティ を追加する方法があります。Until アクティビティでは、次の操作を行います。
- 更新が完了するまで設定された秒数待機するための待機アクティビティを追加します。
- Wait アクティビティの後に、 Lakeflow 宣言型パイプライン更新の詳細要求を使用して更新のステータスを取得する Web アクティビティを追加します。 応答の
state
フィールドは、更新が完了したかどうかを含め、更新の現在の状態を返します。 state
フィールドの値を使用して、Until アクティビティの終了条件を設定します。変数設定アクティビティを使用して、state
値に基づいてパイプライン変数を追加し、この変数を終了条件に使用することもできます。