Databricks で初めての ETL ワークロードの実行
Databricksの本番対応ツールを使用して、データオーケストレーション用の最初の抽出、変換、ロード(ETL)パイプラインを開発およびデプロイする方法をご紹介します。
この記事の終わりまでに、以下をスムーズに行えるようになります。
- Databricks all-purposeコンピュート クラスターを起動。
- Databricks ノートブックを作成。
- Auto LoaderでDelta Lakeに増分データ取り込みをするための設定。
- データを処理、クエリ、プレビューするために、ノートブックのセルを実行。
- ノートブックを Databricks ジョブとしてスケジュールする。
このチュートリアルでは、対話型ノートブックを使用して、PythonまたはScalaによる一般的なETLタスクを完了します。
DLT を使用して ETL パイプラインを構築することもできます。Databricks は、本番運用 ETL パイプラインの構築、デプロイ、保守の複雑さを軽減するために DLT を作成しました。 「 チュートリアル: 初めての DLT パイプラインを実行する」を参照してください。
Databricks Terraform プロバイダーを使用して、この記事のリソースを作成することもできます。「Terraformを使用したクラスター、ノートブック、ジョブの作成」を参照してください。
必要条件
- Databricksワークスペースにログインしていること。
- クラスターを作成する権限があります。
クラスター制御権限がない場合でも、 クラスターにアクセスできる限り、以下のほとんどの手順を完了できます。
ステップ 1: クラスターを作成する
探索的データ分析とデータエンジニアリングを行うには、コマンドの実行に必要なコンピュートリソースを提供するクラスターを作成します。
- サイドバーの
[ コンピュート ]をクリックします。
- 「コンピュート」ページで、「 クラスターを作成 」をクリックします。これにより、「新しいクラスター」ページが開きます。
- クラスターの一意の名前を指定し、残りの値はデフォルトのままにして、「 クラスターを作成 」をクリックします。
Databricks クラスターの詳細については、「コンピュート」を参照してください。
手順 2: Databricks ノートブックを作成する
ワークスペースにノートブックを作成するには、サイドバーの「 新規 」をクリックし、「 ノートブック 」をクリックします。空白のノートブックがワークスペースで開きます。
ノートブックの作成と管理の詳細については、「 ノートブックの管理」を参照してください。
ステップ 3: Delta Lakeにデータを取り込むようにAuto Loaderを設定する
Databricks では、増分データ取り込みに Auto Loader を使用することをお勧めします。Auto Loader は、新しいファイルがクラウド オブジェクト ストレージに到着すると、自動的に検出して処理します。
Databricks では、 Delta Lake を使用してデータを格納することをお勧めします。 Delta Lake は、 ACIDトランザクションを提供し、データレイクハウスを有効にするオープンソース ストレージ レイヤーです。 Delta Lake は、 Databricksで作成されたテーブルのデフォルト形式です。
Delta Lakeテーブルにデータを取り込むようにAuto Loaderを設定するには、以下のコードをコピーしてノートブックの空のセルに貼り付けます。
- Python
- Scala
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"
# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.toTable(table_name))
// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._
// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"
// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)
// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(Trigger.AvailableNow)
.toTable(table_name)
このコードで定義された変数を使用すると、既存のワークスペースアセットや他のユーザーと競合するリスクを負うことなく、コードを安全に実行できます。ネットワークまたはストレージのアクセス許可が制限されていると、このコードを実行するとエラーが発生します。これらの制限のトラブルシューティングについては、ワークスペース管理者に問い合わせてください。
Auto Loaderの詳細については、「Auto Loaderとは」を参照してください。
ステップ 4: データの処理と操作
ノートブックは、セルごとにロジックを実行します。セル内のロジックを実行するには、以下を実行します。
-
前の手順で完了したセルを実行するには、セルを選択して SHIFT+ENTER キーを押します。
-
作成したテーブルを検索するには、以下のコードをコピーして空のセルに貼り付け、 Shift+Enter キーを押してセルを実行します。
- Python
- Scala
df = spark.read.table(table_name)
val df = spark.read.table(table_name)
- データフレーム内のデータをプレビューするには、以下のコードを空のセルにコピーして貼り付け、 Shift + Enter キーを押してセルを実行します。
- Python
- Scala
display(df)
display(df)
データを視覚化するための対話型オプションの詳細については、「 Databricks ノートブックでの視覚化」を参照してください。
ステップ 5: ジョブをスケジュールする
DatabricksノートブックをDatabricksジョブのタスクとして追加することで、Databricksノートブックを本番運用スクリプトとして実行できます。このステップでは、手動でトリガーできる新しいジョブを作成します。
ノートブックをタスクとしてスケジュールするには、以下を実行します。
- ヘッダーバーの右側にある「 スケジュール 」をクリックします。
- 「 ジョブ名 」に一意の名前を入力します。
- 「 手動 」をクリックします。
- 「 クラスター 」ドロップダウンで、ステップ1で作成したクラスターを選択します。
- 作成 をクリックします。
- 表示されるウィンドウで、「 今すぐ実行 」をクリックします。
- ジョブの実行結果を表示するには、
最終実行 タイムスタンプの横にある アイコンをクリックします。
ジョブの詳細については、「 ジョブとは」を参照してください。
追加の統合
Databricksを使用したデータエンジニアリングのための統合とツールの詳細をご覧ください。