メインコンテンツまでスキップ

Databricks で初めての ETL ワークロードの実行

Databricksの本番対応ツールを使用して、データオーケストレーション用の最初の抽出、変換、ロード(ETL)パイプラインを開発およびデプロイする方法をご紹介します。

この記事の終わりまでに、以下をスムーズに行えるようになります。

  1. Databricks all-purposeコンピュート クラスターを起動
  2. Databricks ノートブックを作成
  3. Auto LoaderでDelta Lakeに増分データ取り込みをするための設定
  4. データを処理、クエリ、プレビューするために、ノートブックのセルを実行
  5. ノートブックを Databricks ジョブとしてスケジュールする

このチュートリアルでは、対話型ノートブックを使用して、PythonまたはScalaによる一般的なETLタスクを完了します。

DLT を使用して ETL パイプラインを構築することもできます。Databricks は、本番運用 ETL パイプラインの構築、デプロイ、保守の複雑さを軽減するために DLT を作成しました。 「 チュートリアル: 初めての DLT パイプラインを実行する」を参照してください。

Databricks Terraform プロバイダーを使用して、この記事のリソースを作成することもできます。「Terraformを使用したクラスター、ノートブック、ジョブの作成」を参照してください。

必要条件

注記

クラスター制御権限がない場合でも、 クラスターにアクセスできる限り、以下のほとんどの手順を完了できます。

ステップ 1: クラスターを作成する

探索的データ分析とデータエンジニアリングを行うには、コマンドの実行に必要なコンピュートリソースを提供するクラスターを作成します。

  1. サイドバーのコンピュートアイコンコンピュート ]をクリックします。
  2. 「コンピュート」ページで、「 クラスターを作成 」をクリックします。これにより、「新しいクラスター」ページが開きます。
  3. クラスターの一意の名前を指定し、残りの値はデフォルトのままにして、「 クラスターを作成 」をクリックします。

Databricks クラスターの詳細については、「コンピュート」を参照してください。

手順 2: Databricks ノートブックを作成する

ワークスペースにノートブックを作成するには、サイドバーの「新しいアイコン 新規 」をクリックし、「 ノートブック 」をクリックします。空白のノートブックがワークスペースで開きます。

ノートブックの作成と管理の詳細については、「 ノートブックの管理」を参照してください。

ステップ 3: Delta Lakeにデータを取り込むようにAuto Loaderを設定する

Databricks では、増分データ取り込みに Auto Loader を使用することをお勧めします。Auto Loader は、新しいファイルがクラウド オブジェクト ストレージに到着すると、自動的に検出して処理します。

Databricks では、 Delta Lake を使用してデータを格納することをお勧めします。 Delta Lake は、 ACIDトランザクションを提供し、データレイクハウスを有効にするオープンソース ストレージ レイヤーです。 Delta Lake は、 Databricksで作成されたテーブルのデフォルト形式です。

Delta Lakeテーブルにデータを取り込むようにAuto Loaderを設定するには、以下のコードをコピーしてノートブックの空のセルに貼り付けます。

Python
# Import functions
from pyspark.sql.functions import col, current_timestamp

# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"

# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.toTable(table_name))
注記

このコードで定義された変数を使用すると、既存のワークスペースアセットや他のユーザーと競合するリスクを負うことなく、コードを安全に実行できます。ネットワークまたはストレージのアクセス許可が制限されていると、このコードを実行するとエラーが発生します。これらの制限のトラブルシューティングについては、ワークスペース管理者に問い合わせてください。

Auto Loaderの詳細については、「Auto Loaderとは」を参照してください。

ステップ 4: データの処理と操作

ノートブックは、セルごとにロジックを実行します。セル内のロジックを実行するには、以下を実行します。

  1. 前の手順で完了したセルを実行するには、セルを選択して SHIFT+ENTER キーを押します。

  2. 作成したテーブルを検索するには、以下のコードをコピーして空のセルに貼り付け、 Shift+Enter キーを押してセルを実行します。

Python
df = spark.read.table(table_name)
  1. データフレーム内のデータをプレビューするには、以下のコードを空のセルにコピーして貼り付け、 Shift + Enter キーを押してセルを実行します。
Python
display(df)

データを視覚化するための対話型オプションの詳細については、「 Databricks ノートブックでの視覚化」を参照してください。

ステップ 5: ジョブをスケジュールする

DatabricksノートブックをDatabricksジョブのタスクとして追加することで、Databricksノートブックを本番運用スクリプトとして実行できます。このステップでは、手動でトリガーできる新しいジョブを作成します。

ノートブックをタスクとしてスケジュールするには、以下を実行します。

  1. ヘッダーバーの右側にある「 スケジュール 」をクリックします。
  2. ジョブ名 」に一意の名前を入力します。
  3. 手動 」をクリックします。
  4. クラスター 」ドロップダウンで、ステップ1で作成したクラスターを選択します。
  5. 作成 をクリックします。
  6. 表示されるウィンドウで、「 今すぐ実行 」をクリックします。
  7. ジョブの実行結果を表示するには、外部リンク 最終実行 タイムスタンプの横にある アイコンをクリックします。

ジョブの詳細については、「 ジョブとは」を参照してください。

追加の統合

Databricksを使用したデータエンジニアリングのための統合とツールの詳細をご覧ください。