メインコンテンツまでスキップ

ワークフローでのLakeflow 宣言型パイプラインの実行

Lakeflow 宣言型パイプラインは、Lakeflow Jobs、Apache Airflow、または Azure Data Factory を使用してデータ処理ワークフローの一部として実行できます。

ジョブ

Databricks ジョブで複数のタスクをオーケストレーションして、データ処理ワークフローを実装できます。パイプラインをジョブに含めるには、ジョブの作成時に パイプライン タスクを使用します。 ジョブのパイプラインタスクを参照してください。

Apache Airflow

Apache Airflow は、データワークフローを管理およびスケジューリングするためのオープンソースソリューションです。Airflow は、ワークフローを操作の有向非巡回グラフ (DAG) として表します。Pythonファイルでワークフローを定義し、Airflowがスケジューリングと実行を管理します。DatabricksでのAirflowのインストールと使用に関する情報については、Apache AirflowによるLakeflowジョブのオーケストレーションを参照してください。

Airflow ワークフローの一部としてパイプラインを実行するには、 DatabricksSubmitRunOperator を使用します。

必要条件

AirflowLakeflow宣言型パイプラインの サポートを使用するには、次のものが必要です。

  • Airflowバージョン2.1.0 またはそれ以降。
  • Databricks プロバイダー パッケージ バージョン 2.1.0またはそれ以降。

次の例では、識別子 が 次の 8279d543-063c-4d63-9926-dae38e35ce8bでパイプラインの更新をトリガーする Airflow DAG を作成します。

Python
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

default_args = {
'owner': 'airflow'
}

with DAG('dlt',
start_date=days_ago(2),
schedule_interval="@once",
default_args=default_args
) as dag:

opr_run_now=DatabricksSubmitRunOperator(
task_id='run_now',
databricks_conn_id='CONNECTION_ID',
pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
)

CONNECTION_ID をワークスペースへの Airflow 接続の識別子に置き換えます。

この例を airflow/dags ディレクトリに保存し、Airflow UI を使用して DAGを 表示およびトリガー します。Lakeflow 宣言型パイプライン UI を使用して、パイプラインの更新の詳細を表示します。

Azure Data Factory

注記

Lakeflow 宣言型パイプラインと Azure Data Factory にはそれぞれ、障害発生時の再試行回数を構成するオプションが含まれています。 再試行値がパイプライン 、そのパイプラインを呼び出す Azure Data Factory アクティビティで構成されている場合、再試行回数は、 Azure Data Factory の再試行値に Lakeflow 宣言型パイプラインの再試行値を掛けた値になります。

たとえば、パイプラインの更新が失敗した場合、 Lakeflow 宣言型パイプラインはデフォルトによって最大 5 回更新を再試行します。 Azure Data Factory の再試行が 3 回に設定され、パイプラインがデフォルトの 5 回の再試行を使用している場合、失敗したパイプラインは最大 15 回再試行される可能性があります。パイプラインの更新が失敗した場合に過剰な再試行を避けるために、Databricks では、パイプラインまたはパイプラインを呼び出す Azure Data Factory アクティビティを構成するときに、再試行の回数を制限することをお勧めします。

パイプラインの再試行設定を変更するには、パイプラインの設定時に pipelines.numUpdateRetryAttempts 設定を使用します。

Azure Data Factory は、データ統合と変換のワークフローを調整できるクラウドベースの ETL サービスです。Azure Data Factory は、ノートブック、JAR タスク、Python スクリプトなど、ワークフローでの Databricks タスクの実行を直接サポートします。また、 Data FactoryWeb アクティビティ LakeflowからAPI DeclarativeAzure パイプライン を呼び出すことで、パイプラインをワークフローに含めることもできます。たとえば、Azure Data Factory からパイプラインの更新をトリガーするには、次のようにします。

  1. データ ファクトリを作成する か、既存のデータ ファクトリを開きます。

  2. 作成が完了したら、データ ファクトリのページを開き、 Azure Data Factory Studio を開く タイルをクリックします。 Azure データ ファクトリのユーザー インターフェイスが表示されます。

  3. Azure Data Factory Studio ユーザー インターフェイスの 新規 ドロップダウン メニューから パイプライン を選択して、新しい Azure Data Factoryパイプラインを作成します。

  4. アクティビティ ツールボックスで、 全般 を展開し、 Web アクティビティをパイプライン キャンバスにドラッグします。 設定 タブをクリックし、次の値を入力します。

注記

自動化されたツール、システム、スクリプト、アプリで認証する際のセキュリティのベストプラクティスとして、Databricks では OAuth トークンを使用することをお勧めします。

パーソナルアクセストークン認証 を使用する場合、 Databricks では、ワークスペース ユーザーではなく、サービスプリンシパル に属する パーソナルアクセストークン を使用することをお勧めします。 サービスプリンシパルのトークンを作成するには、「 サービスプリンシパルのトークンの管理」を参照してください。

  • URL : https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates.

    <get-workspace-instance>を置き換えます。

    <pipeline-id> をパイプライン識別子に置き換えます。

  • 方法 : ドロップダウン メニューから POST を選択します。

  • ヘッダー : + 新規 をクリックします。 名前 テキスト ボックスに、 Authorizationと入力します。 テキスト ボックスに、 Bearer <personal-access-token>と入力します。

    <personal-access-token> を Databricks 個人用アクセス トークンに置き換えます。

  • 本文 : 追加の要求パラメーターを渡すには、パラメーターを含む JSON ドキュメントを入力します。 たとえば、更新を開始し、パイプラインのすべてのデータを再処理するには、 {"full_refresh": "true"}です。 追加の要求パラメーターがない場合は、空の中括弧 ({}) を入力します。

Web アクティビティをテストするには、Data Factory UI のパイプライン ツール バーで デバッグ をクリックします。実行の出力と状態 (エラーを含む) は、Azure Data Factory パイプラインの 出力 タブに表示されます。Lakeflow 宣言型パイプライン UI を使用して、パイプラインの更新の詳細を表示します。

ヒント

一般的なワークフロー要件は、前のタスクの完了後にタスクを開始することです。Lakeflow宣言型パイプラインのupdates 要求は非同期であるため (更新の開始後、更新が完了する前に要求が返される)、Azure Lakeflow宣言型パイプラインの更新に依存する Data Factory パイプラインのタスクは、更新が完了するまで待機する必要があります。更新の完了を待つオプションとして、Lakeflow 宣言型パイプラインの更新をトリガーする Web アクティビティの後に Until アクティビティ を追加する方法があります。Until アクティビティでは、次の操作を行います。

  1. Wait アクティビティを追加して、更新が完了するまで設定された秒数だけ待機します。
  2. Wait アクティビティの後に、 Lakeflow 宣言型パイプライン更新の詳細要求を使用して更新のステータスを取得する Web アクティビティを追加します。 応答の state フィールドは、更新が完了したかどうかを含め、更新の現在の状態を返します。
  3. state フィールドの値を使用して、Until アクティビティの終了条件を設定します。また、 Set Variableアクティビティ を使用して、 state 値に基づいてパイプライン変数を追加し、この変数を終了条件に使用することもできます。