メインコンテンツまでスキップ

Apache Airflow を使用した Databricks ジョブのオーケストレーション

この記事では、ApacheAirflow を使用してデータパイプラインをオーケストレーションするためのDatabricks Airflowサポートについて説明し、 をローカルにインストールおよび構成する手順と、 を使用してDatabricks ワークフローをデプロイおよび実行する例を示します。Airflow

データパイプラインでのジョブオーケストレーション

データ処理パイプラインを開発およびデプロイするには、多くの場合、タスク間の複雑な依存関係を管理する必要があります。 たとえば、パイプラインは、ソースからデータを読み取り、データをクリーニングし、クリーニングされたデータを変換し、変換されたデータをターゲットに書き込む場合があります。 また、パイプラインを運用化する際には、テスト、スケジューリング、エラーのトラブルシューティングのサポートも必要です。

ワークフロー システムは、タスク間の依存関係を定義し、パイプラインを実行するタイミングをスケジュールし、ワークフローを監視できるようにすることで、これらの課題に対処します。 Apache Airflow は、データパイプラインを管理およびスケジューリングするためのオープンソースソリューションです。 Airflowは、データパイプラインを操作の有向非巡回グラフ (DAG) として表します。 Python ファイルでワークフローを定義すると、Airflow がスケジュールと実行を管理します。 Airflow Databricks 接続を使用すると、Databricks が提供する最適化された Spark エンジンを Airflow のスケジュール機能で活用できます。

必要条件

  • Airflow と Databricks の統合には、Airflow バージョン 2.5.0 以降が必要です。 この記事の例は、Airflow バージョン 2.6.1 でテストされています。
  • Airflow には、Python 3.8、3.9、3.10、または 3.11 が必要です。 この記事の例は、Python 3.8 でテストされています。
  • この記事の手順でAirflowをインストールして実行するには、Python仮想環境 を作成するためにpipenv が必要です。

Airflowの オペレーターDatabricks

Airflow DAG はタスクで構成され、各タスクは Airflow Operator を実行します。Databricks への統合をサポートする Airflow オペレーターは、 Databricks プロバイダーに実装されます。

Databricks プロバイダーには、 テーブルへのデータのインポートSQL クエリの実行Databricks Git フォルダーの操作など、Databricks ワークスペースに対してさまざまなタスクを実行するオペレーターが含まれています。

Databricks プロバイダーは、ジョブをトリガーするための 2 つの演算子を実装します。

新しい Databricks ジョブを作成するか、既存のジョブをリセットするために、Databricks プロバイダーは DatabricksCreateJobsOperator を実装します。 DatabricksCreateJobsOperatorPOST /api/2.1/ジョブ/create を使用します および POST /api/2.1/ジョブ/リセット API 要求。 DatabricksRunNowOperatorDatabricksCreateJobsOperator を使用して、ジョブを作成および実行できます。

注記

Databricks オペレーターを使用してジョブをトリガーするには、Databricks 接続構成で資格情報を指定する必要があります。 「Airflow の Databricks 個人用アクセス トークンを作成する」を参照してください。

Databricks Airflow オペレーターは、polling_period_secondsごとにジョブ実行ページの URL をAirflowログに書き込みます (デフォルトは 30 秒です)。詳細については、 Web サイトのapache-Airflow -providers-databricks パッケージ ページを参照してください。Airflow

Airflow Databricksインテグレーションをローカルにインストールする

テストと開発のために Airflow と Databricks プロバイダーをローカルにインストールするには、次の手順に従います。 その他のAirflow インストール・オプション (本番運用インストールの作成など)については、Airflow 資料 のインストール を参照してください。

ターミナルを開き、次のコマンドを実行します。

Bash
mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>

<firstname><lastname><email>をユーザー名とEメールに置き換えます。管理者ユーザーのパスワードを入力するように求められます。 このパスワードは、 Airflow UIにログインするために必要であるため、必ず保存してください。

このスクリプトは、次の手順を実行します。

  1. airflow という名前のディレクトリを作成し、そのディレクトリに変更します。
  2. pipenv を使用して、Python 仮想環境を作成および生成します。Databricks では、Python 仮想環境を使用して、パッケージのバージョンとコードの依存関係をその環境に分離することをお勧めします。 この分離により、予期しないパッケージ バージョンの不一致やコード依存関係の競合を減らすことができます。
  3. airflow ディレクトリのパスに設定された AIRFLOW_HOME という名前の環境変数を初期化します。
  4. Airflow と Airflow Databricks プロバイダー パッケージをインストールします。
  5. airflow/dagsディレクトリを作成します。Airflow は、 dags ディレクトリを使用して DAG 定義を格納します。
  6. Airflowがメタデータの追跡に使用するSQLiteデータベースを初期化します。 本番運用 Airflow デプロイメントでは、標準データベースを使用して Airflow を構成します。 Airflow デプロイの SQLite データベースとデフォルト設定は、 airflow ディレクトリで初期化されます。
  7. Airflow の管理者ユーザーを作成します。
ヒント

Databricks プロバイダーのインストールを確認するには、Airflow インストール ディレクトリで次のコマンドを実行します。

Bash
airflow providers list

AirflowのWebサーバーとスケジューラーを起動します

Airflow UI を表示するには、Airflow Web サーバーが必要です。 Webサーバーを起動するには、Airflowインストールディレクトリでターミナルを開き、次のコマンドを実行します。

注記

ポートの競合が原因でAirflow Webサーバーの起動に失敗した場合は、 Airflow構成のデフォルトポートを変更できます。

Bash
pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver

スケジューラは、DAG をスケジュールする Airflow コンポーネントです。 スケジューラを起動するには、Airflowインストールディレクトリで新しいターミナルを開き、次のコマンドを実行します。

Bash
pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler

Airflow のインストールをテストする

Airflowのインストールを確認するには、Airflowに含まれている DAG の例のいずれかを実行します。

  1. ブラウザ ウィンドウで、 http://localhost:8080/homeを開きます。 Airflow のインストール時に作成したユーザー名とパスワードを使用してAirflowUI にログインします。[Airflow DAGs ] ページが表示されます。
  2. [停止する/一時停止解除] 切り替えボタンをクリックして、例の DAG の 1 つ (example_python_operatorなど) の一時停止を解除します。
  3. 「DAGをトリガー」ボタンをクリックして、 サンプルDAGをトリガーします
  4. DAG 名をクリックすると、DAG の実行ステータスなどの詳細が表示されます。

Airflow の Databricks 個人用アクセス トークンを作成する

Airflow は、Databricks パーソナル アクセス トークン (PAT) を使用して Databricks に接続します。 PAT を作成するには、「 ワークスペース ユーザーの Databricks 個人用アクセス トークン」の手順に従います。

注記

自動化されたツール、システム、スクリプト、アプリで認証する際のセキュリティのベストプラクティスとして、Databricks では OAuth トークンを使用することをお勧めします。

personal access token authentication を使用する場合、 Databricks では、ワークスペース ユーザーではなく 、サービスプリンシパル に属する personal access token を使用することをお勧めします。 サービスプリンシパルのトークンを作成するには、「 サービスプリンシパルのトークンの管理」を参照してください。

また、サービスプリンシパルの [] Databricksを使用して に対して認証することもできます。DatabricksOAuthAirflow のドキュメントの 「Databricks Connection 」を参照してください。

Databricks 接続を構成する

Airflowのインストールには、Databricks のデフォルト接続が含まれています。 上記で作成した個人用アクセストークンを使用してワークスペースに接続するように接続を更新するには:

  1. ブラウザウィンドウで http://localhost:8080/connection/list/ を開きます。 サインインを求められたら、管理者のユーザー名とパスワードを入力します。
  2. [Conn ID ] で [デフォルト ] を見つけて [レコードの編集 ] ボタンをクリックします。
  3. [ホスト ] フィールドの値を、Databricks デプロイのワークスペース インスタンス名 (https://adb-123456789.cloud.databricks.comなど) に置き換えます。
  4. [パスワード ] フィールドに、Databricks の個人用アクセス トークンを入力します。
  5. [ 保存 ]をクリックします。

例: Databricks ジョブを実行するための Airflow DAG を作成する

次の例は、ローカル コンピューター上で実行され、Databricks での実行をトリガーするサンプル DAG をデプロイする単純な Airflow デプロイを作成する方法を示しています。 この例では、次のことを行います。

  1. 新しいノートブックを作成し、構成されたパラメーターに基づいてあいさつ文を印刷するコードを追加します。
  2. ノートブックを実行する 1 つのタスクを含む Databricks ジョブを作成します。
  3. Databricks ワークスペースへのAirflow接続を構成します。
  4. ノートブック ジョブをトリガーするAirflow DAG を作成します。 Python スクリプトで DAG を定義するには、 DatabricksRunNowOperatorを使用します。
  5. Airflow UI を使用して DAG をトリガーし、実行状態を表示します。

ノートブックを作成する

この例では、2 つのセルを含むノートブックを使用します。

ノートブックを作成するには:

  1. Databricks ワークスペースに移動し、サイドバーの [新しいアイコン 新規] をクリックして、[ ノートブック] を選択します。

  2. ノートブックに Hello Airflow などの名前を付け、デフォルトの言語が Python に設定されていることを確認します。

  3. 次のPythonコードをコピーして、ノートブックの最初のセルに貼り付けます。

    Python
    dbutils.widgets.text("greeting", "world", "Greeting")
    greeting = dbutils.widgets.get("greeting")
  4. 最初のセルの下に新しいセルを追加し、次の Python コードをコピーして新しいセルに貼り付けます。

    Python
    print("hello {}".format(greeting))

ジョブを作成する

  1. サイドバーのワークフローアイコンワークフロー ]をクリックします。

  2. [ 「ジョブを作成」ボタン] をクリックします。

    「タスク」 タブが表示され、タスクの作成ダイアログが表示されます。

    最初のタスクダイアログの作成

  3. Add a name for your job...(ジョブの名前の追加) をジョブ名に置き換えてください。

  4. [タスク名 ] フィールドに、タスクの名前 ( greeting-task など) を入力します。

  5. [ 種類 ] ドロップダウン メニューで [ ノートブック ] を選択します。

  6. [ソース ] ドロップダウン メニューで、[ ワークスペース] を選択します。

  7. [ パス ] テキスト ボックスをクリックし、ファイル ブラウザーを使用して作成したノートブックを検索し、ノートブック名をクリックして [ 確認] をクリックします。

  8. パラメーター の下の 追加 をクリックします。 キー フィールドにgreetingを入力します。 フィールドにAirflow userを入力します。

  9. タスクを作成 」をクリックします。

[ジョブの詳細 ] パネルで、[ ジョブ ID ] の値をコピーします。この値は、Airflow からジョブをトリガーするために必要です。

ジョブを実行する

Databricks ジョブ UI で新しいジョブをテストするには、右上隅にある [ 「今すぐ実行」ボタン ] をクリックします。 実行が完了したら、 ジョブ実行の詳細を表示して出力を確認できます。

新しい Airflow DAG を作成する

Airflow DAG は Python ファイルで定義します。 ノートブック ジョブの例をトリガーする DAG を作成するには:

  1. テキストエディタまたは IDE で、次の内容で databricks_dag.py という名前の新しいファイルを作成します。

    Python
    from airflow import DAG
    from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
    from airflow.utils.dates import days_ago

    default_args = {
    'owner': 'airflow'
    }

    with DAG('databricks_dag',
    start_date = days_ago(2),
    schedule_interval = None,
    default_args = default_args
    ) as dag:

    opr_run_now = DatabricksRunNowOperator(
    task_id = 'run_now',
    databricks_conn_id = 'databricks_default',
    job_id = JOB_ID
    )

    JOB_ID を、前に保存したジョブ ID の値に置き換えます。

  2. ファイルを airflow/dags ディレクトリに保存します。 Airflowは、 airflow/dags/に保存されている DAG ファイルを自動的に読み取り、インストールします。

AirflowにDAGをインストールして確認します

Airflow UIでDAGをトリガーして確認するには、次の手順を実行します。

  1. ブラウザ ウィンドウで、 http://localhost:8080/homeを開きます。 [Airflow DAGs ] 画面が表示されます。
  2. databricks_dagを見つけて、 停止する/一時停止解除 トグルをクリックして DAG の一時停止を解除します。
  3. 「DAGトリガー」 ボタンをクリックしてDAGをトリガーします
  4. [実行 ] 列で実行をクリックすると、実行のステータスと詳細が表示されます。