メインコンテンツまでスキップ

ワークフローで DLT パイプラインを実行する

DLT パイプラインは、Databricks ジョブ、Apache Airflow、または Azure Data Factory を使用して、データ処理ワークフローの一部として実行できます。

ジョブ

Databricks ジョブで複数のタスクをオーケストレーションして、データ処理ワークフローを実装できます。 DLT パイプラインをジョブに含めるには、ジョブの作成時に パイプライン タスクを使用します。 ジョブの DLT パイプライン タスクを参照してください。

Apache Airflow

Apache Airflow は、データワークフローを管理およびスケジューリングするためのオープンソースソリューションです。 Airflow は、ワークフローを操作の有向非巡回グラフ (DAG) として表します。 Pythonファイルでワークフローを定義し、Airflowがスケジューリングと実行を管理します。 with のインストールと使用に関する情報については、「AirflowDatabricks を使用してDatabricks ジョブをオーケストレーションApacheAirflow する」を参照してください。

DLT パイプラインを Airflow ワークフローの一部として実行するには、 DatabricksSubmitRunOperator を使用します。

必要条件

DLT の Airflow サポートを使用するには、次のものが必要です。

  • Airflowバージョン2.1.0 またはそれ以降。
  • Databricks プロバイダー パッケージ バージョン 2.1.0またはそれ以降。

次の例では、識別子 が DLT パイプラインの更新をトリガーする Airflow DAG を作成します8279d543-063c-4d63-9926-dae38e35ce8b

Python
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

default_args = {
'owner': 'airflow'
}

with DAG('dlt',
start_date=days_ago(2),
schedule_interval="@once",
default_args=default_args
) as dag:

opr_run_now=DatabricksSubmitRunOperator(
task_id='run_now',
databricks_conn_id='CONNECTION_ID',
pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
)

CONNECTION_ID をワークスペースへの Airflow 接続の識別子に置き換えます。

この例を airflow/dags ディレクトリに保存し、Airflow UI を使用して DAG を表示およびトリガー します。 DLT UI を使用して、パイプラインの更新の詳細を表示します。

Azure データ ファクトリ

注記

DLT と Azure Data Factory にはそれぞれ、エラー発生時の再試行回数を構成するオプションが含まれています。DLT パイプライン 、パイプラインを呼び出す Azure Data Factory アクティビティで再試行値が構成されている場合、再試行回数は、Azure Data Factory の再試行値に DLT 再試行値を掛けた値になります。

たとえば、パイプラインの更新が失敗した場合、DLT はデフォルトによって最大 5 回更新を再試行します。 Azure Data Factory の再試行が 3 回に設定され、DLT パイプラインが 5 回の再試行のデフォルトを使用している場合、失敗した DLT パイプラインは最大 15 回再試行される可能性があります。パイプラインの更新が失敗したときに過剰な再試行を避けるために、Databricks では、DLT パイプラインまたはパイプラインを呼び出す Azure Data Factory アクティビティを構成するときに、再試行回数を制限することをお勧めします。

DLT パイプラインの再試行設定を変更するには、パイプラインの設定時に pipelines.numUpdateRetryAttempts 設定を使用します。

Azure Data Factory は、データ統合と変換のワークフローを調整できるクラウドベースの ETL サービスです。 Azure Data Factory は、ノートブック、JAR タスク、Python スクリプトなど、ワークフローでの Databricks タスクの実行を直接サポートします。 また、Azure Data FactoryWeb アクティビティ から DLTAPI を呼び出すことで、ワークフローにパイプラインを含めることもできます。たとえば、Azure Data Factory からパイプラインの更新をトリガーするには、次のようにします。

  1. データ ファクトリを作成する か、既存のデータ ファクトリを開きます。

  2. 作成が完了したら、データ ファクトリのページを開き、 Azure Data Factory Studio を開く タイルをクリックします。 Azure データ ファクトリのユーザー インターフェイスが表示されます。

  3. Azure Data Factory Studio ユーザー インターフェイスの 新規 ドロップダウン メニューから パイプライン を選択して、新しい Azure Data Factoryパイプラインを作成します。

  4. アクティビティ ツールボックスで、 全般 を展開し、 Web アクティビティをパイプライン キャンバスにドラッグします。 設定 タブをクリックし、次の値を入力します。

注記

自動化されたツール、システム、スクリプト、アプリで認証する際のセキュリティのベストプラクティスとして、Databricks では OAuth トークンを使用することをお勧めします。

personal access token authentication を使用する場合、 Databricks では、ワークスペース ユーザーではなく 、サービスプリンシパル に属する personal access token を使用することをお勧めします。 サービスプリンシパルのトークンを作成するには、「 サービスプリンシパルのトークンの管理」を参照してください。

  • URL : https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates.

    <get-workspace-instance>を置き換えます。

    <pipeline-id> をパイプライン識別子に置き換えます。

  • 方法 : ドロップダウン メニューから POST を選択します。

  • ヘッダー : + 新規 をクリックします。 名前 テキスト ボックスに、 Authorizationと入力します。 テキスト ボックスに、 Bearer <personal-access-token>と入力します。

    <personal-access-token> を Databricks 個人用アクセス トークンに置き換えます。

  • 本文 : 追加の要求パラメーターを渡すには、パラメーターを含む JSON ドキュメントを入力します。 たとえば、更新を開始し、パイプラインのすべてのデータを再処理するには、 {"full_refresh": "true"}です。 追加の要求パラメーターがない場合は、空の中括弧 ({}) を入力します。

Web アクティビティをテストするには、Data Factory UI のパイプライン ツール バーで [デバッグ ] をクリックします。実行の出力と状態 (エラーを含む) は、Azure Data Factory パイプラインの [出力 ] タブに表示されます。DLT UI を使用して、パイプラインの更新の詳細を表示します。

ヒント

一般的なワークフロー要件は、前のタスクの完了後にタスクを開始することです。 DLT updates 要求は非同期であるため (更新の開始後、更新が完了する前に要求が返されるため)、DLT 更新に依存する Azure Data Factory パイプライン内のタスクは、更新が完了するまで待機する必要があります。更新の完了を待つオプションとして、DLT 更新をトリガーする Web アクティビティの後に Until アクティビティ を追加する方法があります。Until アクティビティでは、次の操作を行います。

  1. Wait アクティビティを追加して、更新が完了するまで設定された秒数だけ待機します。
  2. DLT 更新の詳細要求を使用して更新のステータスを取得する Wait アクティビティの後に Web アクティビティを追加します。応答の state フィールドは、更新が完了したかどうかを含め、更新の現在の状態を返します。
  3. state フィールドの値を使用して、Until アクティビティの終了条件を設定します。また、 Set Variableアクティビティ を使用して、 state 値に基づいてパイプライン変数を追加し、この変数を終了条件に使用することもできます。