メインコンテンツまでスキップ

チュートリアル:ETL Lakeflow宣言型パイプラインを使用して パイプラインを構築する

Lakeflow宣言型パイプラインとAuto Loaderを使用して、データ オーケストレーション用のETL (抽出、変換、読み込み) パイプラインを作成およびデプロイする方法を学習します。ETL パイプラインは、ソース システムからデータを読み取り、データ品質チェックやレコード重複排除などの要件に基づいてそのデータを変換し、データ ウェアハウスやデータレイクなどのターゲット システムにデータを書き込む手順を実装します。

このチュートリアルでは、Lakeflow宣言型パイプラインとAuto Loaderを使用し、次のことを行います。

  • 生のソースデータをターゲットテーブルに取り込みます。
  • 生のソース データを変換し、変換されたデータを 2 つのターゲット マテリアライズドビューに書き込みます。
  • 変換されたデータをクエリします。
  • Databricks ジョブを使用して ETL パイプラインを自動化します。

Lakeflow 宣言型パイプラインと Auto Loaderの詳細については、「Lakeflow 宣言型パイプライン」および「Auto Loaderとは」を参照してください。

必要条件

このチュートリアルを完了するには、以下の条件を満たす必要があります。

データセットについて

この例で使用されているデータセットは、現代音楽トラックの機能とメタデータのコレクションである Million Song データセットのサブセットです。 このデータセットは、Databricksワークスペースに含まれている サンプルデータセットに含まれています。

ステップ 1: パイプラインを作成する

まず、Lakeflow宣言型パイプラインで ETL パイプラインを作成します 。 Lakeflow 宣言型パイプラインは、宣言型パイプライン構文を使用してノートブックまたはファイル ( ソース コード と呼ばれます) で定義された依存関係を解決することで Lakeflow パイプラインを作成します。 各ソース コード ファイルには 1 つの言語のみを含めることができますが、パイプラインには複数の言語固有のノートブックまたはファイルを追加できます。詳細については、「Lakeflow 宣言型パイプライン」を参照してください。

important

ソース コード フィールドを空白のままにして、ソース コード作成用のノートブックを自動的に作成および構成します。

このチュートリアルでは、サーバレス コンピュート と Unity Catalogを使用します。 指定されていないすべての構成オプションについては、デフォルト設定を使用します。サーバレス コンピュートがワークスペースで有効になっていないか、サポートされていない場合は、デフォルト コンピュートの設定を使用して、記載されているとおりにチュートリアルを完了できます。 デフォルト コンピュート設定を使用する場合は、 パイプライン作成 UIの 配信先 セクションで ストレージオプションUnity Catalog を手動で選択する必要があります。

Lakeflow宣言型パイプラインで新しい ETL パイプラインを作成するには次の手順を実行します。

  1. ワークスペースで、サイドバーの ワークフローアイコン。 ジョブとパイプライン をクリックします。
  2. [ 新規 ] で、[ ETL パイプライン ] をクリックします。
  3. パイプライン名 に、一意のパイプライン名を入力します。
  4. サーバレス チェックボックスを選択します。
  5. 公開先 で、テーブルが公開される Unity Catalog の場所を構成するには、既存の カタログ を選択し、 スキーマ に新しい名前を書き込み、カタログに新しいスキーマを作成します。
  6. 作成 をクリックします。

新しいパイプラインのパイプライン UI が表示されます。

ステップ 2: パイプラインを開発する

important

ノートブックには、1 つのプログラミング言語のみを含めることができます。パイプラインのソースコードノートブックで Python コードと SQL コードを混在させないでください。

この手順では、 Databricks ノートブックを使用して、Lakeflow宣言型パイプラインのソース コードを対話形式で開発および検証します。

このコードでは、インクリメンタル データ取り込みに Auto Loader を使用します。 Auto Loader は、新しいファイルがクラウド オブジェクト ストレージに到着すると、自動的に検出して処理します。 詳細については、Auto Loaderとはを参照してください。

空白のソース コード ノートブックが自動的に作成され、パイプライン用に構成されます。ノートブックは、ユーザー・ディレクトリー内の新しいディレクトリーに作成されます。新しいディレクトリとファイルの名前は、パイプラインの名前と一致します。たとえば、 /Users/someone@example.com/my_pipeline/my_pipelineです。

パイプラインを開発するときは、Python または SQL のいずれかを選択できます。両方の言語の例が含まれています。選択した言語に基づいて、デフォルトのノートブック言語を選択していることを確認します。Lakeflow宣言型パイプラインのコード開発に対するノートブックのサポートの詳細については、Lakeflow宣言型パイプラインでノートブックを使用してETLパイプラインを開発およびデバッグするを参照してください。

  1. このノートブックにアクセスするためのリンクは、 パイプラインの詳細 パネルの ソース コード フィールドの下にあります。リンクをクリックしてノートブックを開き、次のステップに進みます。

  2. 右上の 接続 をクリックして、コンピュート設定メニューを開きます。

  3. ステップ 1 で作成したパイプラインの名前にカーソルを合わせます。

  4. 接続 をクリックします。

  5. 上部にあるノートブックのタイトルの横にあるノートブックのデフォルト言語(Python または SQL)を選択します。

  6. 次のコードをコピーして、ノートブックのセルに貼り付けます。

Python
# Import modules
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField

# Define the path to the source data
file_path = f"/databricks-datasets/songs/data-001/"

# Define a streaming table to ingest data from a volume
schema = StructType(
[
StructField("artist_id", StringType(), True),
StructField("artist_lat", DoubleType(), True),
StructField("artist_long", DoubleType(), True),
StructField("artist_location", StringType(), True),
StructField("artist_name", StringType(), True),
StructField("duration", DoubleType(), True),
StructField("end_of_fade_in", DoubleType(), True),
StructField("key", IntegerType(), True),
StructField("key_confidence", DoubleType(), True),
StructField("loudness", DoubleType(), True),
StructField("release", StringType(), True),
StructField("song_hotnes", DoubleType(), True),
StructField("song_id", StringType(), True),
StructField("start_of_fade_out", DoubleType(), True),
StructField("tempo", DoubleType(), True),
StructField("time_signature", DoubleType(), True),
StructField("time_signature_confidence", DoubleType(), True),
StructField("title", StringType(), True),
StructField("year", IntegerType(), True),
StructField("partial_sequence", IntegerType(), True)
]
)

@dlt.table(
comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
)
def songs_raw():
return (spark.readStream
.format("cloudFiles")
.schema(schema)
.option("cloudFiles.format", "csv")
.option("sep","\t")
.option("inferSchema", True)
.load(file_path))

# Define a materialized view that validates data and renames a column
@dlt.table(
comment="Million Song Dataset with data cleaned and prepared for analysis."
)
@dlt.expect("valid_artist_name", "artist_name IS NOT NULL")
@dlt.expect("valid_title", "song_title IS NOT NULL")
@dlt.expect("valid_duration", "duration > 0")
def songs_prepared():
return (
spark.read.table("songs_raw")
.withColumnRenamed("title", "song_title")
.select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year")
)

# Define a materialized view that has a filtered, aggregated, and sorted view of the data
@dlt.table(
comment="A table summarizing counts of songs released by the artists who released the most songs each year."
)
def top_artists_by_year():
return (
spark.read.table("songs_prepared")
.filter(expr("year > 0"))
.groupBy("artist_name", "year")
.count().withColumnRenamed("count", "total_number_of_songs")
.sort(desc("total_number_of_songs"), desc("year"))
)
  1. [ 開始 ] をクリックして、接続されたパイプラインの更新を開始します。

ステップ 3: 変換されたデータのクエリ

この手順では、ETL パイプラインで処理されたデータをクエリして、曲のデータを分析します。これらのクエリは、前の手順で作成した準備済みレコードを使用します。

まず、1990 年以降に毎年最も多くの曲をリリースしたアーティストを見つけるクエリを実行します。

  1. サイドバーで、「SQL エディターのアイコン SQL エディター 」をクリックします。

  2. 追加またはプラスアイコン新規タブアイコンをクリックし、メニューから 新規クエリーを作成 を選択します。

  3. 次の項目を入力します。

    SQL
    -- Which artists released the most songs each year in 1990 or later?
    SELECT artist_name, total_number_of_songs, year
    FROM <catalog>.<schema>.top_artists_by_year
    WHERE year >= 1990
    ORDER BY total_number_of_songs DESC, year DESC

    <catalog><schema> を、テーブルが含まれているカタログとスキーマの名前に置き換えます。たとえば、 data_pipelines.songs_data.top_artists_by_year.

  4. [選択項目を実行] をクリックします。

次に、4/4 ビートとダンサブルなテンポの曲を見つける別のクエリを実行します。

  1. 新しいタップ アイコン 追加またはプラスアイコン をクリックし、メニューから 新しいクエリの作成 を選択します。

  2. 次のコードを入力します。

    SQL
     -- Find songs with a 4/4 beat and danceable tempo
    SELECT artist_name, song_title, tempo
    FROM <catalog>.<schema>.songs_prepared
    WHERE time_signature = 4 AND tempo between 100 and 140;

    <catalog><schema> を、テーブルが含まれているカタログとスキーマの名前に置き換えます。たとえば、 data_pipelines.songs_data.songs_prepared.

  3. [選択項目を実行] をクリックします。

手順 4: パイプラインを実行するジョブを作成する

次に、 Databricks ジョブを使用してデータ取り込み、処理、分析のステップを自動化するワークフローを作成します。

  1. ワークスペースで、サイドバーの ワークフローアイコン。 ジョブとパイプライン をクリックします。
  2. 新規 で、 ジョブ をクリックします。
  3. タスク タイトル ボックスで、 新しいジョブ <date and time> をジョブ名に置き換えます。 たとえば、 Songs workflow.
  4. [タスク名] に、最初のタスクの名前を入力します(例:ETL_songs_data)。
  5. 種類パイプライン を選択します。
  6. パイプライン で、ステップ 1 で作成したパイプラインを選択します。
  7. 作成 をクリックします。
  8. ワークフローを実行するには、 今すぐ実行 をクリックします。 実行の詳細を表示するには、 実行 タブをクリックします。 タスクをクリックして、タスク実行の詳細を表示します。
  9. ワークフローの完了時に結果を表示するには、 成功した最新の実行に移動する か、ジョブ実行の開始 時刻 をクリックします。 出力 ページが表示され、クエリ結果が表示されます。

ジョブの実行の詳細については、「Lakeflowジョブのモニタリングと可観測性」を参照してください。

ステップ 5: パイプライン ジョブをスケジュールする

ETL パイプラインをスケジュールに従って実行するには、次の手順を実行します。

  1. ジョブと同じ Databricks ワークスペースの ジョブとパイプライン UI に移動します。
  2. 必要に応じて、[ ジョブ ] と [自分が所有] フィルターを選択します。
  3. [名前] 列で、ジョブ名をクリックします。サイドパネルには ジョブの詳細 が表示されます。
  4. スケジュールとトリガー パネルで トリガーの追加 をクリックし、 トリガー・タイプスケジュール済み を選択します。
  5. 期間、開始時刻、およびタイムゾーンを指定します。
  6. [ 保存 ]をクリックします。

詳細情報