Apache Airflow で Databricks ジョブをオーケストレーション

この記事では、Databricks を使用してデータパイプラインを調整するための Apache Airflow のサポートについて説明し、Airflow をローカルにインストールして構成する手順を示し、Airflow を使用して Databricks ワークフローをデプロイして実行する例を示します。

データパイプライン でのジョブオーケストレーション

データ処理パイプラインの開発とデプロイでは、多くの場合、タスク間の複雑な依存関係を管理する必要があります。 たとえば、パイプラインでは、ソースからデータを読み取り、データをクリーンアップし、クリーンアップされたデータを変換し、変換されたデータをターゲットに書き込む場合があります。 また、パイプラインを運用化するときに、エラーのテスト、スケジュール設定、およびトラブルシューティングのサポートも必要です。

ワークフロー システムは、タスク間の依存関係を定義し、パイプラインを実行するタイミングをスケジュールし、ワークフローを監視できるようにすることで、これらの課題に対処します。 Apache Airflow は、データパイプラインを管理およびスケジューリングするためのオープンソースソリューションです。 エアフローは、データパイプラインを操作の有向非巡回グラフ (DAG) として表します。 Python ファイルでワークフローを定義すると、Airflow がスケジュールと実行を管理します。 Airflow Databricks 接続を使用すると、Databricks が提供する最適化された Spark エンジンを Airflow のスケジュール機能で活用できます。

要件

  • Airflow と Databricks の統合には、Airflow バージョン 2.5.0 以降が必要です。 この記事の例は、Airflow バージョン 2.6.1 でテストされています。

  • Airflow には Python 3.8、3.9、3.10、または 3.11 が必要です。 この記事の例は、Python 3.8 でテストされています。

  • この記事で Airflow をインストールして実行する手順では、 Python 仮想環境 を作成するために pipenv が必要です。

Databricks のためのエアフローオペレータ

Airflow DAG はタスクで構成され、各タスクは Airflow Operator を実行します。Databricks への統合をサポートする Airflow 演算子は、 Databricks プロバイダーに実装されます。

Databricks プロバイダーには、 テーブルへのデータのインポートSQL クエリの実行Databricks Git フォルダーの操作など、Databricks ワークスペースに対してさまざまなタスクを実行するためのオペレーターが含まれています。

Databricks プロバイダーは、ジョブをトリガーするための 2 つの演算子を実装します。

  • DatabricksRunNowOperator には既存の Databricks ジョブが必要であり、POST /api/2.1/jobs/実行-now を使用します 実行をトリガーするための API 要求。 Databricks では、ジョブ定義の重複が減り、この演算子でトリガーされたジョブの実行がジョブ UI に表示されるため、DatabricksRunNowOperatorの使用を推奨しています。

  • DatabricksSubmitRunOperator は、Databricks にジョブが存在する必要はなく、POST /api/2.1/jobs/実行/submit を使用します。 ジョブ仕様を送信し、実行をトリガーするための API 要求。

新しい Databricks ジョブを作成したり、既存のジョブをリセットしたりするために、Databricks プロバイダーは DatabricksCreateJobsOperator を実装しますDatabricksCreateJobsOperatorPOST /api/2.1/ジョブ/create を使用します。 と POST /api/2.1/ジョブ/リセット API 要求。 DatabricksRunNowOperatorDatabricksCreateJobsOperatorを使用して、ジョブを作成および実行できます。

Databricks オペレーターを使用してジョブをトリガーするには、Databricks 接続構成で資格情報を指定する必要があります。 「Airflow の Databricks 個人用アクセストークンを作成する」を参照してください。

Databricks Airflow オペレーターは、ジョブ実行ページの URL を polling_period_seconds ごとに Airflow ログに書き込みます (デフォルトは 30 秒)。 詳細については、Airflow Web サイトの apache-airflow-providers-databricks パッケージ ページを参照してください。

Airflow Databricksインテグレーションをローカルにインストールする

テストと開発のために Airflow と Databricks プロバイダーをローカルにインストールするには、次の手順を使用します。 本番運用インストールの作成など、その他の Airflow インストール オプションについては、Airflow ドキュメントの「 インストール 」を参照してください。

ターミナルを開き、次のコマンドを実行します。

mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>

<firstname><lastname><email> をユーザー名と電子メールに置き換えます。admin ユーザーのパスワードを入力するように求められます。 このパスワードは Airflow UI にログインするために必要であるため、必ず保存してください。

このスクリプトは、次の手順を実行します。

  1. airflow という名前のディレクトリを作成し、そのディレクトリに移動します。

  2. pipenv を使用して Python 仮想環境を作成および生成します。Databricks では、Python 仮想環境を使用して、パッケージのバージョンとコードの依存関係をその環境に分離することをお勧めします。 この分離により、予期しないパッケージ バージョンの不一致やコード依存関係の競合を減らすことができます。

  3. AIRFLOW_HOME という名前の環境変数を初期化しairflowディレクトリのパスに設定します。

  4. Airflow と Airflow Databricks プロバイダー パッケージをインストールします。

  5. airflow/dagsディレクトリを作成します。Airflow は dags ディレクトリを使用して DAG 定義を格納します。

  6. Airflow がメタデータの追跡に使用する SQLite データベースを初期化します。 本番運用 Airflow 展開では、標準データベースを使用して Airflow を構成します。 Airflow 展開の SQLite データベースとデフォルト構成は、 airflow ディレクトリで初期化されます。

  7. Airflow の管理者ユーザーを作成します。

ヒント

Databricks プロバイダーのインストールを確認するには、Airflow インストール ディレクトリで次のコマンドを実行します。

airflow providers list

エアフローウェブサーバーとスケジューラ を起動します

Airflow UI を表示するには、Airflow Web サーバーが必要です。 Webサーバーを起動するには、Airflowインストールディレクトリでターミナルを開き、次のコマンドを実行します。

ポートの競合が原因で Airflow Web サーバの起動に失敗した場合は、 Airflow 構成でデフォルト ポートを変更できます。

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver

スケジューラは、DAG をスケジュールする Airflow コンポーネントです。 スケジューラを起動するには、Airflow インストール ディレクトリで新しいターミナルを開き、次のコマンドを実行します。

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler

エアフローのインストール をテストします

エアフローのインストールを確認するには、エアフローに含まれている DAG の例のいずれかを実行します。

  1. ブラウザ ウィンドウで、 http://localhost:8080/homeを開きます。 Airflow のインストール時に作成したユーザ名とパスワードを使用して Airflow UI にログインします。 [Airflow DAGs ] ページが表示されます。

  2. [停止する/一時停止しない DAG] トグルをクリックして、例の DAG の 1 つ (example_python_operatorなど) の一時停止を解除します。

  3. [Trigger DAG] ボタンをクリックして、サンプルの DAG をトリガーします。

  4. DAG 名をクリックすると、DAG の実行状態などの詳細が表示されます。

Airflow 用の Databricks 個人用アクセストークンを作成する

Airflow は、Databricks 個人用アクセストークン (PAT) を使用して Databricks に接続します。 PAT を作成するには、次の手順を実行します。

  1. Databricks ワークスペースで、上部のバーにある Databricks ユーザー名をクリックし、ドロップダウンから[設定]を選択します。

  2. [ 開発者] をクリックします。

  3. [アクセストークン] の横にある [管理] をクリックします。

  4. [ 新しいトークンの生成] をクリックします。

  5. (任意)今後このトークンを識別するのに役立つコメントを入力し、トークンのデフォルトの有効期間である90日を変更します。有効期間のないトークンを作成するには(非推奨)、[有効期間 (日) ] ボックスを空白のままにしてください。

  6. [生成] をクリックします。

  7. 表示されたトークンを安全な場所にコピーし、[完了] をクリックします。

コピーしたトークンは、必ず安全な場所に保存してください。 コピーしたトークンを他のユーザーと共有しないでください。 コピーしたトークンを紛失した場合、まったく同じトークンを再生成することはできません。 代わりに、この手順を繰り返して新しいトークンを作成する必要があります。 コピーしたトークンを紛失した場合、またはトークンが侵害されたと思われる場合は、アクセストークン ページでトークンの横にあるごみ箱 (取り消し) アイコンをクリックして、ワークスペースからそのトークンをすぐに削除することを強くお勧めします。

ワークスペースでトークンを作成または使用できない場合は、ワークスペース管理者がトークンを無効にしたか、トークンを作成または使用する権限を与えていないことが原因である可能性があります。ワークスペース管理者に問い合わせるか、以下をご覧ください。

自動化されたツール、システム、スクリプト、アプリを使用して認証する場合のセキュリティのベスト プラクティスとして、Databricks ではOAuth トークンを使用することをお勧めします。

個人用アクセストークン認証を使用する場合、Databricks では、ワークスペース ユーザーではなく、 サービスプリンシパル に属する個人用アクセストークンを使用することをお勧めします。 サービスプリンシパルのトークンを作成するには、「 サービスプリンシパルのトークンを管理する」を参照してください。

また、サービスプリンシパルの Databricks OAuth を使用して Databricks に対して認証を行うこともできます。 Airflow ドキュメントの「 Databricks Connection 」を参照してください。

Databricks 接続 を構成する

エアフローのインストールには、Databricks のデフォルト接続が含まれています。 上記で作成した個人用アクセストークンを使用してワークスペースに接続するように接続を更新するには:

  1. ブラウザ ウィンドウで、 http://localhost:8080/connection/list/を開きます。 サインインを求められたら、管理者のユーザー名とパスワードを入力します。

  2. [Conn ID] で [デフォルト] を見つけて、[レコードの編集] ボタンをクリックします。

  3. [ ホスト ] フィールドの値を、Databricks デプロイの ワークスペース インスタンス名 (例: https://adb-123456789.cloud.databricks.com) に置き換えます。

  4. [ パスワード ] フィールドに、Databricks の個人用アクセストークンを入力します。

  5. [保存]をクリックします。

例: Airflow DAG を作成して Databricks ジョブを実行する

次の例は、ローカル コンピューターで実行される単純な Airflow デプロイを作成し、Databricks で実行をトリガーするサンプル DAG をデプロイする方法を示しています。 この例では、次のことを行います。

  1. 新しいノートブックを作成し、構成されたパラメーターに基づいてあいさつ文を印刷するコードを追加します。

  2. ノートブックを実行する 1 つのタスクを含む Databricks ジョブを作成します。

  3. Databricks ワークスペースへのエアフロー接続を構成します。

  4. ノートブック ジョブをトリガーするエアフロー DAG を作成します。 Python スクリプトで DAG を定義するには、 DatabricksRunNowOperatorを使用します。

  5. エアフロー UI を使用して DAG をトリガーし、実行状態を表示します。

ノートブック を作成する

この例では、2 つのセルを含むノートブックを使用します。

ノートブックを作成するには:

  1. Databricks ワークスペースに移動し、サイドバーの [新しいアイコン 新規 ] をクリックして、 [ノートブック ] を選択します。

  2. ノートブックに 「Hello Airflow」などの名前を付け、既定の言語が Python に設定されていることを確認します。

  3. 次の Python コードをコピーし、ノートブックの最初のセルに貼り付けます。

    dbutils.widgets.text("greeting", "world", "Greeting")
    greeting = dbutils.widgets.get("greeting")
    
  4. 最初のセルの下に新しいセルを追加し、次の Python コードをコピーして新しいセルに貼り付けます。

    print("hello {}".format(greeting))
    

ジョブ の作成

  1. ジョブアイコン サイドバー の [ワークフロー] をクリックします

  2. をクリックします [ジョブの作成] ボタン

    「タスク」タブが表示され、タスクの作成ダイアログが表示されます。

    最初のタスクの作成ダイアログ
  3. [ジョブの名前を追加する...] をジョブ名に置き換えます。

  4. [ タスク名 ] フィールドに、タスクの名前を入力します (例: あいさつタスク)。

  5. [ 種類 ] ドロップダウン メニューで、[ ノートブック] を選択します。

  6. ソース 」ドロップダウンメニューで、「 ワークスペース」を選択します。

  7. [ パス ] テキスト ボックスをクリックし、ファイル ブラウザーを使用して作成したノートブックを検索し、ノートブック名をクリックして [ 確認] をクリックします。

  8. [パラメーター] の下の [追加] をクリックします。キー フィールドに、 greetingと入力します。 フィールドに、 Airflow userと入力します。

  9. [ タスクの作成] をクリックします。

[ ジョブの詳細 ] パネルで、[ ジョブ ID ] の値をコピーします。 この値は、Airflow からジョブをトリガーするために必要です。

ジョブ の実行

Databricks ワークフロー UI で新しいジョブをテストするには、右上隅の をクリックします [今すぐ実行] ボタン 。 実行が完了したら、 ジョブ実行の詳細を表示して出力を確認できます。

新しいエアフロー DAG を作成する

エアフロー DAG は Python ファイルで定義します。 ノートブック ジョブの例をトリガーする DAG を作成するには:

  1. テキストエディタまたは IDE で、次の内容の databricks_dag.py という名前の新しいファイルを作成します。

    from airflow import DAG
    from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
    from airflow.utils.dates import days_ago
    
    default_args = {
      'owner': 'airflow'
    }
    
    with DAG('databricks_dag',
      start_date = days_ago(2),
      schedule_interval = None,
      default_args = default_args
      ) as dag:
    
      opr_run_now = DatabricksRunNowOperator(
        task_id = 'run_now',
        databricks_conn_id = 'databricks_default',
        job_id = JOB_ID
      )
    

    JOB_ID を、前に保存したジョブ ID の値に置き換えます。

  2. ファイルを airflow/dags ディレクトリに保存します。 エアフローは、 airflow/dags/に保存されている DAG ファイルを自動的に読み取り、インストールします。

エアフロー に DAG をインストールして確認します

エアフローUIでDAGをトリガーして確認するには、次の手順を実行します。

  1. ブラウザ ウィンドウで、 http://localhost:8080/homeを開きます。 [Airflow DAGs ] 画面が表示されます。

  2. databricks_dag を見つけて、 停止する/一時停止しない DAG トグルをクリックして DAG の一時停止を解除します。

  3. [Trigger DAG] ボタンをクリックして DAG をトリガーします。

  4. [実行] 列で実行をクリックして、実行の状態と詳細を表示します。