Databricksで最初のETLワークロードを実行する

Databricksの本番対応ツールを使用して、データオーケストレーション用の最初の抽出、変換、ロード(ETL)パイプラインを開発およびデプロイする方法をご紹介します。

この記事の終わりまでに、以下をスムーズに行えるようになります。

  1. Databricks 汎用コンピュート クラスターの起動

  2. Databricks ノートブックの作成

  3. Auto Loaderを使用したDelta Lakeへの増分データ取り込みの構成

  4. ノートブックのセルを実行して、データを処理、クエリー、およびプレビュー

  5. ノートブックを Databricks ジョブとしてスケジュール

このチュートリアルでは、対話型ノートブックを使用して、PythonまたはScalaによる一般的なETLタスクを完了します。

Delta Live Tables使用してETLパイプラインを構築することもできます。 DatabricksDelta Live Tables本番運用ETL パイプラインの構築、デプロイ、保守の複雑さを軽減するために を作成しました。「チュートリアル: 最初の Delta Live Tables パイプラインを実行する」を参照してください。

Databricks Terraformプロバイダーを使用してこの記事のリソースを作成することもできます。「Terraformでクラスター、ノートブック、ジョブを作成する」を参照してください。

要件

クラスター制御権限がない場合でも、 クラスターにアクセスできる限り、以下のステップのほとんどを完了できます。

ステップ1:クラスターを作成する

探索的データ分析とデータエンジニアリングを行うには、コマンドの実行に必要なコンピュートリソースを提供するクラスターを作成します。

  1. コンピュートアイコン サイドバー の 「コンピュート 」をクリックします

  2. 「コンピュート」ページで、「クラスターを作成」をクリックします。これにより、「新しいクラスター」ページが開きます。

  3. クラスターの一意の名前を指定し、残りの値はデフォルトのままにして、「クラスターを作成」をクリックします。

Databricks クラスターの詳細については、「 コンピュート」を参照してください。

ステップ2:Databricksノートブックを作成する

Databricksで対話型コードの作成と実行を開始するには、ノートブックを作成します。

  1. 新しいアイコン サイドバーで 「 新規 」をクリック し 、「 ノートブック」 をクリックします。

  2. 「ノートブックの作成」ページで、以下の操作を行います。

    • ノートブックの一意の名前を指定します。

    • デフォルトの言語がPythonまたはScalaに設定されていることを確認してください。

    • クラスター」ドロップダウンからステップ1で作成したクラスターを選択します。

    • 作成」をクリックします。

ノートブックが開き、上部に空のセルが表示されます。

ノートブックの作成と管理の詳細については、「ノートブックの管理」を参照してください。

ステップ3:データをDelta Lakeに取り込むようにAuto Loaderを構成する

Databricks では、増分データの取り込みにAuto Loaderを使用することをお勧めしています。Auto Loaderは、新しいファイルがクラウドオブジェクトストレージに到着すると、自動的に検出して処理します。

Databricksでは、Delta Lakeを使用してデータを保存することをお勧めします。Delta Lakeは、ACIDトランザクションを提供し、データレイクハウスを可能にするオープンソースストレージレイヤーです。Delta Lakeは、Databricksで作成されるデフォルトのテーブル形式です。

Delta Lakeテーブルにデータを取り込むようにAuto Loaderを設定するには、以下のコードをコピーしてノートブックの空のセルに貼り付けます。

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"

# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))
// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"

// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)

// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(Trigger.AvailableNow)
  .toTable(table_name)

このコードで定義された変数を使用すると、既存のワークスペースアセットや他のユーザーと競合するリスクを負うことなく、コードを安全に実行できます。ネットワークまたはストレージのアクセス許可が制限されていると、このコードを実行するとエラーが発生します。これらの制限のトラブルシューティングについては、ワークスペース管理者に問い合わせてください。

Auto Loaderの詳細については、「Auto Loaderとは」を参照してください。

ステップ4:データを処理し、操作する

ノートブックは、セルごとにロジックを実行します。セル内のロジックを実行するには、以下を実行します。

  1. 前の手順で完了したセルを実行するには、セルを選択してSHIFT+ENTERキーを押します。

  2. 作成したテーブルを検索するには、以下のコードをコピーして空のセルに貼り付け、Shift+Enterキーを押してセルを実行します。

    df = spark.read.table(table_name)
    
    val df = spark.read.table(table_name)
    
  3. DataFrame内のデータをプレビューするには、以下のコードを空のセルにコピーして貼り付け、Shift + Enterキーを押してセルを実行します。

    display(df)
    
    display(df)
    

データを視覚化するための対話型オプションの詳細については、「Databricksノートブックでのビジュアライゼーション」を参照してください。

ステップ5:ジョブをスケジュールする

DatabricksノートブックをDatabricksジョブのタスクとして追加することで、Databricksノートブックを本番運用スクリプトとして実行できます。このステップでは、手動でトリガーできる新しいジョブを作成します。

ノートブックをタスクとしてスケジュールするには、以下を実行します。

  1. ヘッダーバーの右側にある「スケジュール」をクリックします。

  2. ジョブ名」に一意の名前を入力します。

  3. 手動」をクリックします。

  4. クラスター」ドロップダウンで、ステップ1で作成したクラスターを選択します。

  5. 作成」をクリックします。

  6. 表示されるウィンドウで、「今すぐ実行」をクリックします。

  7. ジョブ実行の結果を表示するには、[外部リンク 最終実行 タイムスタンプ] の横にあるアイコンをクリックします 。

ジョブの詳細については、「 Databricks ジョブとは」を参照してください。

その他の統合

Databricksを使用したデータエンジニアリングのための統合とツールの詳細をご覧ください。