Apache Airflow を使用した Databricks ジョブのオーケストレーション
この記事では、ApacheAirflow を使用してデータパイプラインをオーケストレーションするためのDatabricks Airflowサポートについて説明し、 をローカルにインストールおよび構成する手順と、 を使用してDatabricks ワークフローをデプロイおよび実行する例を示します。Airflow
データパイプラインでのジョブオーケストレーション
データ処理パイプラインを開発およびデプロイするには、多くの場合、タスク間の複雑な依存関係を管理する必要があります。 たとえば、パイプラインは、ソースからデータを読み取り、データをクリーニングし、クリーニングされたデータを変換し、変換されたデータをターゲットに書き込む場合があります。 また、パイプラインを運用化する際には、テスト、スケジューリング、エラーのトラブルシューティングのサポートも必要です。
ワークフロー システムは、タスク間の依存関係を定義し、パイプラインを実行するタイミングをスケジュールし、ワークフローを監視できるようにすることで、これらの課題に対処します。 Apache Airflow は、データパイプラインを管理およびスケジューリングするためのオープンソースソリューションです。 Airflowは、データパイプラインを操作の有向非巡回グラフ (DAG) として表します。 Python ファイルでワークフローを定義すると、Airflow がスケジュールと実行を管理します。 Airflow Databricks 接続を使用すると、Databricks が提供する最適化された Spark エンジンを Airflow のスケジュール機能で活用できます。
必要条件
- Airflow と Databricks の統合には、Airflow バージョン 2.5.0 以降が必要です。 この記事の例は、Airflow バージョン 2.6.1 でテストされています。
- Airflow には、Python 3.8、3.9、3.10、または 3.11 が必要です。 この記事の例は、Python 3.8 でテストされています。
- この記事の手順でAirflowをインストールして実行するには、Python仮想環境 を作成するためにpipenv が必要です。
Airflowの オペレーターDatabricks
Airflow DAG はタスクで構成され、各タスクは Airflow Operator を実行します。Databricks への統合をサポートする Airflow オペレーターは、 Databricks プロバイダーに実装されます。
Databricks プロバイダーには、 テーブルへのデータのインポート、 SQL クエリの実行、 Databricks Git フォルダーの操作など、Databricks ワークスペースに対してさまざまなタスクを実行するオペレーターが含まれています。
Databricks プロバイダーは、ジョブをトリガーするための 2 つの演算子を実装します。
- DatabricksRunNowOperator には既存の Databricks ジョブが必要であり、POST /api/2.1/ジョブ/実行-now を使用します 実行をトリガーするための API 要求。 Databricks では、ジョブ定義の重複を減らし、この演算子でトリガーされたジョブの実行をジョブ UI で見つけることができるため、
DatabricksRunNowOperator
を使用することをお勧めします。 - DatabricksSubmitRunOperator は、ジョブが Databricks に存在する必要はなく、POST /api/2.1/ジョブ/実行/submit を使用します ジョブ仕様を送信し、実行をトリガーするための API 要求。
新しい Databricks ジョブを作成するか、既存のジョブをリセットするために、Databricks プロバイダーは DatabricksCreateJobsOperator を実装します。 DatabricksCreateJobsOperator
は POST /api/2.1/ジョブ/create を使用します および POST /api/2.1/ジョブ/リセット API 要求。 DatabricksRunNowOperator
と DatabricksCreateJobsOperator
を使用して、ジョブを作成および実行できます。
Databricks オペレーターを使用してジョブをトリガーするには、Databricks 接続構成で資格情報を指定する必要があります。 「Airflow の Databricks 個人用アクセス トークンを作成する」を参照してください。
Databricks Airflow オペレーターは、polling_period_seconds
ごとにジョブ実行ページの URL をAirflowログに書き込みます (デフォルトは 30 秒です)。詳細については、 Web サイトのapache-Airflow -providers-databricks パッケージ ページを参照してください。Airflow
Airflow Databricksインテグレーションをローカルにインストールする
テストと開発のために Airflow と Databricks プロバイダーをローカルにインストールするには、次の手順に従います。 その他のAirflow インストール・オプション (本番運用インストールの作成など)については、Airflow 資料 のインストール を参照してください。
ターミナルを開き、次のコマンドを実行します。
mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>
<firstname>
、<lastname>
、<email>
をユーザー名とEメールに置き換えます。管理者ユーザーのパスワードを入力するように求められます。 このパスワードは、 Airflow UIにログインするために必要であるため、必ず保存してください。
このスクリプトは、次の手順を実行します。
airflow
という名前のディレクトリを作成し、そのディレクトリに変更します。pipenv
を使用して、Python 仮想環境を作成および生成します。Databricks では、Python 仮想環境を使用して、パッケージのバージョンとコードの依存関係をその環境に分離することをお勧めします。 この分離により、予期しないパッケージ バージョンの不一致やコード依存関係の競合を減らすことができます。airflow
ディレクトリのパスに設定されたAIRFLOW_HOME
という名前の環境変数を初期化します。- Airflow と Airflow Databricks プロバイダー パッケージをインストールします。
airflow/dags
ディレクトリを作成します。Airflow は、dags
ディレクトリを使用して DAG 定義を格納します。- Airflowがメタデータの追跡に使用するSQLiteデータベースを初期化します。 本番運用 Airflow デプロイメントでは、標準データベースを使用して Airflow を構成します。 Airflow デプロイの SQLite データベースとデフォルト設定は、
airflow
ディレクトリで初期化されます。 - Airflow の管理者ユーザーを作成します。
Databricks プロバイダーのインストールを確認するには、Airflow インストール ディレクトリで次のコマンドを実行します。
airflow providers list
AirflowのWebサーバーとスケジューラーを起動します
Airflow UI を表示するには、Airflow Web サーバーが必要です。 Webサーバーを起動するには、Airflowインストールディレクトリでターミナルを開き、次のコマンドを実行します。
ポートの競合が原因でAirflow Webサーバーの起動に失敗した場合は、 Airflow構成のデフォルトポートを変更できます。
pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver
スケジューラは、DAG をスケジュールする Airflow コンポーネントです。 スケジューラを起動するには、Airflowインストールディレクトリで新しいターミナルを開き、次のコマンドを実行します。
pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler
Airflow のインストールをテストする
Airflowのインストールを確認するには、Airflowに含まれている DAG の例のいずれかを実行します。
- ブラウザ ウィンドウで、
http://localhost:8080/home
を開きます。 Airflow のインストール時に作成したユーザー名とパスワードを使用してAirflowUI にログインします。[Airflow DAGs ] ページが表示されます。 - [停止する/一時停止解除] 切り替えボタンをクリックして、例の DAG の 1 つ (
example_python_operator
など) の一時停止を解除します。 - 「DAGをトリガー」ボタンをクリックして、 サンプルDAGをトリガーします 。
- DAG 名をクリックすると、DAG の実行ステータスなどの詳細が表示されます。
Airflow の Databricks 個人用アクセス トークンを作成する
Airflow は、Databricks パーソナル アクセス トークン (PAT) を使用して Databricks に接続します。 PAT を作成するには、「 ワークスペース ユーザーの Databricks 個人用アクセス トークン」の手順に従います。
自動化されたツール、システム、スクリプト、アプリで認証する際のセキュリティのベストプラクティスとして、Databricks では OAuth トークンを使用することをお勧めします。
personal access token authentication を使用する場合、 Databricks では、ワークスペース ユーザーではなく 、サービスプリンシパル に属する personal access token を使用することをお勧めします。 サービスプリンシパルのトークンを作成するには、「 サービスプリンシパルのトークンの管理」を参照してください。
また、サービスプリンシパルの [] Databricksを使用して に対して認証することもできます。DatabricksOAuthAirflow のドキュメントの 「Databricks Connection 」を参照してください。
Databricks 接続を構成する
Airflowのインストールには、Databricks のデフォルト接続が含まれています。 上記で作成した個人用アクセストークンを使用してワークスペースに接続するように接続を更新するには:
- ブラウザウィンドウで
http://localhost:8080/connection/list/
を開きます。 サインインを求められたら、管理者のユーザー名とパスワードを入力します。 - [Conn ID ] で [デフォルト ] を見つけて [レコードの編集 ] ボタンをクリックします。
- [ホスト ] フィールドの値を、Databricks デプロイのワークスペース インスタンス名 (
https://adb-123456789.cloud.databricks.com
など) に置き換えます。 - [パスワード ] フィールドに、Databricks の個人用アクセス トークンを入力します。
- [ 保存 ]をクリックします。
例: Databricks ジョブを実行するための Airflow DAG を作成する
次の例は、ローカル コンピューター上で実行され、Databricks での実行をトリガーするサンプル DAG をデプロイする単純な Airflow デプロイを作成する方法を示しています。 この例では、次のことを行います。
- 新しいノートブックを作成し、構成されたパラメーターに基づいてあいさつ文を印刷するコードを追加します。
- ノートブックを実行する 1 つのタスクを含む Databricks ジョブを作成します。
- Databricks ワークスペースへのAirflow接続を構成します。
- ノートブック ジョブをトリガーするAirflow DAG を作成します。 Python スクリプトで DAG を定義するには、
DatabricksRunNowOperator
を使用します。 - Airflow UI を使用して DAG をトリガーし、実行状態を表示します。
ノートブックを作成する
この例では、2 つのセルを含むノートブックを使用します。
- 最初のセルには、 Databricks ユーティリティ テキスト ウィジェット が含まれており、
greeting
という名前の変数がデフォルト値world
に設定されています。 - 2 番目のセルは、プレフィックス
hello
が付いたgreeting
変数の値を出力します。
ノートブックを作成するには:
-
Databricks ワークスペースに移動し、サイドバーの [
新規] をクリックして、[ ノートブック] を選択します。
-
ノートブックに Hello Airflow などの名前を付け、デフォルトの言語が Python に設定されていることを確認します。
-
次のPythonコードをコピーして、ノートブックの最初のセルに貼り付けます。
Pythondbutils.widgets.text("greeting", "world", "Greeting")
greeting = dbutils.widgets.get("greeting") -
最初のセルの下に新しいセルを追加し、次の Python コードをコピーして新しいセルに貼り付けます。
Pythonprint("hello {}".format(greeting))
ジョブを作成する
-
サイドバーの
[ ワークフロー ]をクリックします。
-
[
] をクリックします。
「タスク」 タブが表示され、タスクの作成ダイアログが表示されます。
-
Add a name for your job...(ジョブの名前の追加) をジョブ名に置き換えてください。
-
[タスク名 ] フィールドに、タスクの名前 ( greeting-task など) を入力します。
-
[ 種類 ] ドロップダウン メニューで [ ノートブック ] を選択します。
-
[ソース ] ドロップダウン メニューで、[ ワークスペース] を選択します。
-
[ パス ] テキスト ボックスをクリックし、ファイル ブラウザーを使用して作成したノートブックを検索し、ノートブック名をクリックして [ 確認] をクリックします。
-
パラメーター の下の 追加 をクリックします。 キー フィールドに
greeting
を入力します。 値 フィールドにAirflow user
を入力します。 -
「 タスクを作成 」をクリックします。
[ジョブの詳細 ] パネルで、[ ジョブ ID ] の値をコピーします。この値は、Airflow からジョブをトリガーするために必要です。
ジョブを実行する
Databricks ジョブ UI で新しいジョブをテストするには、右上隅にある [ ] をクリックします。 実行が完了したら、 ジョブ実行の詳細を表示して出力を確認できます。
新しい Airflow DAG を作成する
Airflow DAG は Python ファイルで定義します。 ノートブック ジョブの例をトリガーする DAG を作成するには:
-
テキストエディタまたは IDE で、次の内容で
databricks_dag.py
という名前の新しいファイルを作成します。Pythonfrom airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow'
}
with DAG('databricks_dag',
start_date = days_ago(2),
schedule_interval = None,
default_args = default_args
) as dag:
opr_run_now = DatabricksRunNowOperator(
task_id = 'run_now',
databricks_conn_id = 'databricks_default',
job_id = JOB_ID
)JOB_ID
を、前に保存したジョブ ID の値に置き換えます。 -
ファイルを
airflow/dags
ディレクトリに保存します。 Airflowは、airflow/dags/
に保存されている DAG ファイルを自動的に読み取り、インストールします。
AirflowにDAGをインストールして確認します
Airflow UIでDAGをトリガーして確認するには、次の手順を実行します。
- ブラウザ ウィンドウで、
http://localhost:8080/home
を開きます。 [Airflow DAGs ] 画面が表示されます。 databricks_dag
を見つけて、 停止する/一時停止解除 トグルをクリックして DAG の一時停止を解除します。- 「DAGトリガー」 ボタンをクリックしてDAGをトリガーします 。
- [実行 ] 列で実行をクリックすると、実行のステータスと詳細が表示されます。