Apache Airflow で Databricksジョブをオーケストレーション
この記事では、ApacheAirflow を使用してデータ パイプラインをオーケストレーションするためのDatabricks Airflowサポートについて説明し、 ローカルにインストールして構成する手順を示し、 を使用してDatabricks ワークフローをデプロイおよび実行する例を示します。Airflow
データパイプラインでのジョブオーケストレーション
データ処理パイプラインの開発と展開では、多くの場合、タスク間の複雑な依存関係の管理が必要になります。 たとえば、パイプラインはソースからデータを読み取り、データをクリーンアップし、クリーンアップされたデータを変換し、変換されたデータをターゲットに書き込みます。 パイプラインを運用化するときに、テスト、スケジュール設定、エラーのトラブルシューティングのサポートも必要です。
ワークフロー システムは、タスク間の依存関係を定義し、パイプラインを実行するタイミングをスケジュールし、ワークフローを監視できるようにすることで、これらの課題に対処します。 Apache Airflow は、データパイプラインを管理およびスケジューリングするためのオープンソースソリューションです。 Airflowは、データパイプラインを操作の有向非巡回グラフ (DAG) として表します。 Python ファイルでワークフローを定義すると、Airflow がスケジュールと実行を管理します。 Airflow Databricks 接続を使用すると、Databricks が提供する最適化された Spark エンジンを Airflow のスケジュール機能で活用できます。
要件
Airflow と Databricks の統合には、Airflow バージョン 2.5.0 以降が必要です。 この記事の例は、Airflow バージョン 2.6.1 でテストされています。
Airflow には Python 3.8、3.9、3.10、または 3.11 が必要です。 この記事の例は Python 3.8 でテストされています。
この記事の Airflow をインストールして実行する手順では、 Python 仮想環境 を作成するために pipenv が 必要です。
DatabricksのAirflowオペレータ
Airflow DAG はタスクで構成され、各タスクはAirflow Operator を実行します。 Databricks への統合をサポートする Airflow オペレーターは、 Databricks プロバイダーに実装されています。
Databricks プロバイダーには、 テーブルへのデータのインポート、 SQL クエリの実行、 Databricks Git フォルダーの操作など、Databricks ワークスペースに対してさまざまなタスクを実行するための演算子が含まれています。
Databricks プロバイダーは、ジョブをトリガーするための 2 つの演算子を実装します。
DatabricksRunNowOperator には既存の Databricks ジョブが必要であり、POST /api/2.1/ジョブ/実行-now を使用します 実行をトリガーするための API 要求。 Databricks では、ジョブ定義の重複を減らし、この演算子でトリガーされたジョブの実行をジョブ UI で見つけることができるため、
DatabricksRunNowOperator
を使用することをお勧めします。DatabricksSubmitRunOperator は、 Databricksにジョブが存在する必要はなく、 POST /api/2.1/ジョブ/実行/submitを使用します。 ジョブ仕様を送信し、実行をトリガーするための API リクエスト。
新しい Databricks ジョブを作成したり、既存のジョブをリセットしたりするために、Databricks プロバイダーはDatabricksCreateJobsOperatorを実装します。 DatabricksCreateJobsOperator
はPOST /api/2.1/Job/createを使用します。 そしてPOST /api/2.1/Job/reset API リクエスト。 DatabricksCreateJobsOperator
とDatabricksRunNowOperator
を組み合わせてジョブを作成し、実行することができます。
注:
Databricks オペレーターを使用してジョブをトリガーするには、Databricks 接続構成で資格情報を提供する必要があります。 「 Databricks用の パーソナル アクセスの作成」を 参照してください。Airflow
Databricks Airflow オペレーターは、ジョブ実行ページの URL をpolling_period_seconds
ごとに Airflow ログに書き込みます (デフォルトは 30 秒)。 詳細については、 Web サイトの apache-Airflow providers-databricks パッケージ ページを参照してください。Airflow
Airflow Databricksインテグレーションをローカルにインストールする
テストと開発のためにAirflowとDatabricksプロバイダーをローカルにインストールするには、次のステップを使用します。 インストールのその他のオプションAirflow (本番運用インストールの作成を含む) については、Airflow ドキュメントの インストール を参照してください。
ターミナルを開き、次のコマンドを実行します。
mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>
<firstname>
、 <lastname>
、 <email>
をユーザー名とEメールで置き換えます。 管理者ユーザーのパスワードを入力するように求められます。 このパスワードは Airflow UI にログインするために必要なので、必ず保存してください。
このスクリプトは次のステップを実行します。
airflow
という名前のディレクトリを作成し、そのディレクトリに変更します。pipenv
を使用して、Python 仮想環境を作成して生成します。 Databricks では、パッケージ バージョンとコードの依存関係をその環境に分離するために、Python 仮想環境を使用することをお勧めします。 この分離により、予期しないパッケージ バージョンの不一致やコード依存関係の競合を減らすことができます。airflow
ディレクトリのパスに設定されたAIRFLOW_HOME
という名前の環境変数を初期化します。Airflow および Airflow Databricks プロバイダー パッケージをインストールします。
airflow/dags
ディレクトリを作成します。Airflow はdags
ディレクトリを使用して DAG 定義を保存します。Airflow がメタデータを追跡するために使用する SQLite データベースを初期化します。 本番運用Airflowデプロイメントでは、標準データベースを使用してAirflowを構成します。 Airflow デプロイメントの SQLite データベースとデフォルト構成は、
airflow
ディレクトリで初期化されます。Airflow の管理者ユーザーを作成します。
ヒント
Databricks プロバイダーのインストールを確認するには、Airflow インストール ディレクトリで次のコマンドを実行します。
airflow providers list
Airflowウェブサーバーとスケジューラを起動します
Airflow UI を表示するには、Airflow Web サーバーが必要です。 Web サーバーを起動するには、Airflow インストール ディレクトリでターミナルを開き、次のコマンドを実行します。
注:
ポートの競合により Airflow Web サーバーの起動に失敗した場合は、 Airflow 構成でデフォルト ポートを変更できます。
pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver
スケジューラは、DAG をスケジュールする Airflow コンポーネントです。 スケジューラを起動するには、Airflow インストール ディレクトリで新しいターミナルを開き、次のコマンドを実行します。
pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler
Airflowのインストールをテストします
Airflowのインストールを確認するには、Airflowに含まれている DAG の例のいずれかを実行します。
ブラウザ ウィンドウで、
http://localhost:8080/home
を開きます。 Airflow のインストール時に作成したユーザー名とパスワードを使用して、Airflow UI にログインします。 Airflow DAGページが表示されます。DAG の停止/一時停止解除トグルをクリックして、サンプル DAG の 1 つ (例:
example_python_operator
を一時停止解除します。「 DAG のトリガー 」ボタンをクリックして、サンプルの DAG をトリガーします。
DAG 名をクリックすると、DAG の実行ステータスなどの詳細が表示されます。
Airflow 用の Databricks 個人用アクセストークンを作成する
Airflow は、Databricks 個人用アクセストークン (PAT) を使用して Databricks に接続します。PAT を作成するには、ワークスペース ユーザー向けの個人用アクセストークンDatabricksのステップに従います。
注:
自動化されたツール、システム、スクリプト、アプリを使用して認証する場合のセキュリティのベスト プラクティスとして、Databricks ではOAuth トークンの使用を推奨しています。
個人アクセスウイルス認証を使用する場合、 Databricks ワークスペース ユーザーではなくサービスプリンシパルに属する個人アクセスウイルスを使用することをお勧めします。 サービスプリンシパル用のウイルスを作成するには、 「サービスプリンシパル用のウイルスの管理」を参照してください。
サービスプリンシパルの Databricksを使用して に対して認証することもできます。DatabricksOAuthAirflow ドキュメントの「Databricks 接続」を参照してください。
Databricks 接続を構成する
Airflowのインストールには、Databricks のデフォルト接続が含まれています。 上記で作成した個人用アクセストークンを使用してワークスペースに接続するように接続を更新するには:
ブラウザ ウィンドウで、
http://localhost:8080/connection/list/
を開きます。 ログインを求められたら、管理者のユーザー名とパスワードを入力します。[Conn ID]で[安全]を見つけ、 [レコードの編集]ボタンをクリックします。
ホストフィールドの値を、Databricks デプロイメントのワークスペース インスタンス名(例:
https://adb-123456789.cloud.databricks.com
に置き換えます。[パスワード]フィールドに、 Databricksの個人アクセス権を入力します。
[保存]をクリックします。
例: Airflow DAG を作成して Databricks ジョブを実行する
次の例は、ローカル マシンで実行され、サンプル DAG をデプロイして Databricks で実行をトリガーする単純な Airflow デプロイを作成する方法を示しています。 この例では、次のことを行います。
新しいノートブックを作成し、構成された問題に基づいて挨拶を印刷するコードを追加します。
ノートブックを実行する単一のタスクを含む Databricks ジョブを作成します。
Databricks ワークスペースへのAirflow接続を構成します。
ノートブック ジョブをトリガーするAirflow DAG を作成します。 Python スクリプトで DAG を定義するには、
DatabricksRunNowOperator
を使用します。Airflow UI を使用して DAG をトリガーし、実行状態を表示します。
ノートブックを作成する
この例では、2 つのセルを含むノートブックを使用します。
最初のセルには、当然の値
world
に設定されたgreeting
という名前の変数を定義するDatabricksテキスト ウィジェットが含まれています。2 番目のセルは、プレフィックス
hello
が付いたgreeting
変数の値を出力します。
ノートブックを作成するには:
Databricksワークスペースに移動し、クリックしますサイドバーで「新規」をクリックし、 「ノートブック」を選択します。
ノートブックに「Hello Airflow 」などの名前を付け、デフォルトの言語が「Python」に設定されていることを確認します。
次のPythonコードをコピーして、ノートブックの最初のセルに貼り付けます。
dbutils.widgets.text("greeting", "world", "Greeting") greeting = dbutils.widgets.get("greeting")
最初のセルの下に新しいセルを追加し、次の Python コードをコピーして新しいセルに貼り付けます。
print("hello {}".format(greeting))
ジョブの作成
サイドバーの[ワークフロー]をクリックします。
[ ] をクリックします。
「タスク」タブが表示され、タスクの作成ダイアログが表示されます。
Add a name for your job...(ジョブの名前の追加)をジョブ名に置き換えてください。
[タスク名]フィールドにタスクの名前を入力します (たとえば、 greeting-タスク) 。
[タイプ]ドロップダウン メニューで、 [ノートブック]を選択します。
[ソース]ドロップダウン メニューで、 [ワークスペース]を選択します。
[パス] テキスト ボックスをクリックし、ファイル ブラウザーを使用して作成したノートブックを見つけ、ノートブック名をクリックして、 [確認] をクリックします。
パラメーターの下の追加をクリックします。キーフィールドに
greeting
を入力します。値フィールドにAirflow user
を入力します。「タスクを作成」をクリックします。
ジョブの詳細パネルで、ジョブ ID の値をコピーします。 この値は、Airflow からジョブをトリガーするために必要です。
ジョブの実行
Databricks ジョブ UI で新しいジョブをテストするには、右上隅の をクリックします 。 実行が完了したら、 ジョブ実行の詳細を表示して出力を確認できます。
新しいAirflow DAGを作成する
Airflow DAG は Python ファイルで定義します。 ノートブック ジョブの例をトリガーする DAG を作成するには:
テキストエディタまたは IDE で、次の内容で
databricks_dag.py
という名前の新しいファイルを作成します。from airflow import DAG from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator from airflow.utils.dates import days_ago default_args = { 'owner': 'airflow' } with DAG('databricks_dag', start_date = days_ago(2), schedule_interval = None, default_args = default_args ) as dag: opr_run_now = DatabricksRunNowOperator( task_id = 'run_now', databricks_conn_id = 'databricks_default', job_id = JOB_ID )
JOB_ID
を、先ほど保存したジョブ ID の値に置き換えます。ファイルを
airflow/dags
ディレクトリに保存します。 Airflowは、airflow/dags/
に保存されている DAG ファイルを自動的に読み取り、インストールします。