メインコンテンツまでスキップ

チュートリアル: Databricks プラットフォーム上の Apache Spark を使用して ETL パイプラインを構築する

このチュートリアルでは、Apache Spark を使用したデータ オーケストレーション用の最初の ETL (抽出、変換、読み込み) パイプラインを開発してデプロイする方法を示します。このチュートリアルでは Databricks 汎用コンピュートを使用しますが、ワークスペースで有効になっている場合は、サーバレス コンピュートを使用することもできます。

Lakeflow Spark宣言型パイプラインを使用してETLパイプラインを構築することもできます。 Databricks Lakeflow Spark宣言型パイプラインは、本番運用ETLラインの構築、デプロイ、保守の複雑さを軽減します。 「チュートリアル: Lakeflow Spark宣言型パイプラインを使用してETLパイプラインを構築する」を参照してください。

この記事の終わりまでに、次の方法がわかります。

  1. Databricks汎用コンピュート リソースを起動します
  2. Databricksノートブックを作成します
  3. Auto Loaderを使用してDelta Lakeへの増分データ取り込みを設定します
  4. データを処理し、データとやり取りする
  5. ノートブックをDatabricksジョブとしてスケジュールします

このチュートリアルでは、対話型ノートブックを使用して、PythonまたはScalaによる一般的なETLタスクを完了します。

Databricks Terraform プロバイダーを使用して、この記事のリソースを作成することもできます。Terraformを使用したクラスター、ノートブック、ジョブの作成を参照してください。

必要条件

注記

コンピュート制御権限を持っていない場合でも、コンピュートリソースにアクセスできる限り、以下のステップのほとんどを完了できます。

ステップ 1: コンピュート リソースを作成する

探索的データ分析やデータエンジニアリングを行うには、コマンドを実行するためのコンピュート リソースを作成します。

  1. サイドバーのコンピュートアイコンコンピュート ]をクリックします。
  2. 「コンピュート」ページで、「 コンピュートを作成 」をクリックします。
  3. コンピュート リソースに一意の名前を指定し、残りの値はそのままの状態にして、 [コンピュートの作成] を クリックします。

Databricksコンピュートの詳細については、「コンピュートを参照してください。

手順 2: Databricks ノートブックを作成する

ワークスペースにノートブックを作成するには、サイドバーの「新しいアイコン 新規 」をクリックし、「 ノートブック 」をクリックします。空白のノートブックがワークスペースで開きます。

ノートブックの作成と管理の詳細については、「 ノートブックの管理」を参照してください。

ステップ 3: Delta Lakeにデータを取り込むようにAuto Loaderを設定する

Databricks では、増分データ取り込みに Auto Loader を使用することをお勧めします。Auto Loader は、新しいファイルがクラウド オブジェクト ストレージに到着すると、自動的に検出して処理します。

Databricks では、 Delta Lake を使用してデータを格納することをお勧めします。 Delta Lake は、 ACIDトランザクションを提供し、データレイクハウスを有効にするオープンソース ストレージ レイヤーです。 Delta Lake は、 Databricksで作成されたテーブルのデフォルト形式です。

Delta Lakeテーブルにデータを取り込むようにAuto Loaderを設定するには、以下のコードをコピーしてノートブックの空のセルに貼り付けます。

Python
# Import functions
from pyspark.sql.functions import col, current_timestamp

# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"

# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.toTable(table_name))
注記

このコードで定義された変数を使用すると、既存のワークスペースアセットや他のユーザーと競合するリスクを負うことなく、コードを安全に実行できます。ネットワークまたはストレージのアクセス許可が制限されていると、このコードを実行するとエラーが発生します。これらの制限のトラブルシューティングについては、ワークスペース管理者に問い合わせてください。

Auto Loaderの詳細については、「Auto Loaderとは」を参照してください。

ステップ 4: データの処理と操作

ノートブックは、セルごとにロジックを実行します。セル内のロジックを実行するには、以下を実行します。

  1. 前の手順で完了したセルを実行するには、セルを選択して SHIFT+ENTER キーを押します。

  2. 作成したテーブルに対してクエリを実行するには、次のコードをコピーして空のセルに貼り付け、 Shift + Enter キーを押してセルを実行します。

Python
df = spark.read.table(table_name)
  1. データフレーム内のデータをプレビューするには、以下のコードを空のセルにコピーして貼り付け、 Shift + Enter キーを押してセルを実行します。
Python
display(df)

データを視覚化するための対話型オプションの詳細については、「 Databricks ノートブックと SQL エディターでの視覚化」を参照してください。

ステップ 5: ジョブをスケジュールする

DatabricksノートブックをDatabricksジョブのタスクとして追加することで、Databricksノートブックを本番運用スクリプトとして実行できます。このステップでは、手動でトリガーできる新しいジョブを作成します。

ノートブックをタスクとしてスケジュールするには、以下を実行します。

  1. ヘッダーバーの右側にある「 スケジュール 」をクリックします。
  2. ジョブ名 」に一意の名前を入力します。
  3. 手動 」をクリックします。
  4. コンピュート ドロップダウンで、ステップ 1 で作成したコンピュート リソースを選択します。
  5. 作成 をクリックします。
  6. 表示されるウィンドウで、「 今すぐ実行 」をクリックします。
  7. ジョブの実行結果を表示するには、外部リンク 最終実行 タイムスタンプの横にある アイコンをクリックします。

ジョブの詳細については、「 ジョブとは」を参照してください。

追加の統合

Databricksを使用したデータエンジニアリングのための統合とツールの詳細をご覧ください。