メインコンテンツまでスキップ

チュートリアル: DLT を使用して ETL パイプラインを構築する

DLT と Auto Loader を使用して、データ オーケストレーション用の ETL (抽出、変換、読み込み) パイプラインを作成およびデプロイする方法について説明します。ETL パイプラインは、ソース システムからデータを読み取り、データ品質チェックやレコード重複排除などの要件に基づいてそのデータを変換し、データ ウェアハウスやデータレイクなどのターゲット システムにデータを書き込む手順を実装します。

このチュートリアルでは、 DLT と Auto Loader を使用して次のことを行います。

  • 生のソースデータをターゲットテーブルに取り込みます。
  • 生のソース データを変換し、変換されたデータを 2 つのターゲット マテリアライズドビューに書き込みます。
  • 変換されたデータをクエリします。
  • Databricks ジョブを使用して ETL パイプラインを自動化します。

DLT と Auto Loaderの詳細については、「DLT」および「Auto Loaderとは」を参照してください。

必要条件

このチュートリアルを完了するには、以下の条件を満たす必要があります。

データセットについて

この例で使用されているデータセットは、現代音楽トラックの機能とメタデータのコレクションである Million Song データセットのサブセットです。 このデータセットは、Databricksワークスペースに含まれている サンプルデータセットに含まれています。

ステップ 1: パイプラインを作成する

まず、DLT で ETL パイプラインを作成します。DLT は、DLT 構文を使用してノートブックまたはファイル ( ソース コード と呼ばれます) で定義された依存関係を解決することでパイプラインを作成します。各ソース コード ファイルには 1 つの言語のみを含めることができますが、パイプラインには複数の言語固有のノートブックまたはファイルを追加できます。詳細については、DLTを参照してください。

important

ソース コード フィールドを空白のままにして、ソース コード作成用のノートブックを自動的に作成および構成します。

このチュートリアルでは、サーバレス コンピュート と Unity Catalogを使用します。 指定されていないすべての構成オプションについては、デフォルト設定を使用します。サーバレス コンピュートがワークスペースで有効になっていないか、サポートされていない場合は、デフォルト コンピュートの設定を使用して、記載されているとおりにチュートリアルを完了できます。 デフォルト コンピュート設定を使用する場合は、 Create パイプライン Unity Catalog UI の[Destination] セクションの[Storage options] で [Storage] を手動で選択する必要があります。

DLT で新しい ETL パイプラインを作成するには、次の手順に従います。

  1. サイドバーで、 パイプライン をクリックします。
  2. パイプラインの作成ETL パイプライン をクリックします。
  3. パイプライン名 に、一意のパイプライン名を入力します。
  4. サーバレス チェックボックスを選択します。
  5. 公開先 で、テーブルが公開される Unity Catalog の場所を構成するには、既存の カタログ を選択し、 スキーマ に新しい名前を書き込み、カタログに新しいスキーマを作成します。
  6. 作成 をクリックします。

新しいパイプラインのパイプライン UI が表示されます。

ステップ 2: DLT パイプラインを開発する

important

ノートブックには、1 つのプログラミング言語のみを含めることができます。パイプラインのソースコードノートブックで Python コードと SQL コードを混在させないでください。

この手順では、 Databricks ノートブックを使用して、 DLT パイプラインのソース コードを対話形式で開発および検証します。

このコードでは、インクリメンタル データ取り込みに Auto Loader を使用します。 Auto Loader は、新しいファイルがクラウド オブジェクト ストレージに到着すると、自動的に検出して処理します。 詳細については、Auto Loaderとはを参照してください。

空白のソース コード ノートブックが自動的に作成され、パイプライン用に構成されます。ノートブックは、ユーザー・ディレクトリー内の新しいディレクトリーに作成されます。新しいディレクトリとファイルの名前は、パイプラインの名前と一致します。たとえば、 /Users/someone@example.com/my_pipeline/my_pipelineです。

DLT パイプラインを開発する場合は、Python または SQL のいずれかを選択できます。両方の言語の例が含まれています。選択した言語に基づいて、デフォルトのノートブック言語を選択していることを確認します。パイプライン コード開発のノートブック サポートの詳細については、 の「ノートブックを使用したDLTETLパイプラインの開発とデバッグDLT 」を参照してください。

  1. このノートブックにアクセスするためのリンクは、 パイプラインの詳細 パネルの ソース コード フィールドの下にあります。リンクをクリックしてノートブックを開き、次のステップに進みます。

  2. 右上の 接続 をクリックして、コンピュート設定メニューを開きます。

  3. ステップ 1 で作成したパイプラインの名前にカーソルを合わせます。

  4. 接続 をクリックします。

  5. 上部にあるノートブックのタイトルの横にあるノートブックのデフォルト言語(Python または SQL)を選択します。

  6. 次のコードをコピーして、ノートブックのセルに貼り付けます。

    タブ :::タブ-item[Python]

    Python
    # Import modules
    import dlt
    from pyspark.sql.functions import *
    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField

    # Define the path to the source data
    file_path = f"/databricks-datasets/songs/data-001/"

    # Define a streaming table to ingest data from a volume
    schema = StructType(
    [
    StructField("artist_id", StringType(), True),
    StructField("artist_lat", DoubleType(), True),
    StructField("artist_long", DoubleType(), True),
    StructField("artist_location", StringType(), True),
    StructField("artist_name", StringType(), True),
    StructField("duration", DoubleType(), True),
    StructField("end_of_fade_in", DoubleType(), True),
    StructField("key", IntegerType(), True),
    StructField("key_confidence", DoubleType(), True),
    StructField("loudness", DoubleType(), True),
    StructField("release", StringType(), True),
    StructField("song_hotnes", DoubleType(), True),
    StructField("song_id", StringType(), True),
    StructField("start_of_fade_out", DoubleType(), True),
    StructField("tempo", DoubleType(), True),
    StructField("time_signature", DoubleType(), True),
    StructField("time_signature_confidence", DoubleType(), True),
    StructField("title", StringType(), True),
    StructField("year", IntegerType(), True),
    StructField("partial_sequence", IntegerType(), True)
    ]
    )

    @dlt.table(
    comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    )
    def songs_raw():
    return (spark.readStream
    .format("cloudFiles")
    .schema(schema)
    .option("cloudFiles.format", "csv")
    .option("sep","\t")
    .option("inferSchema", True)
    .load(file_path))

    # Define a materialized view that validates data and renames a column
    @dlt.table(
    comment="Million Song Dataset with data cleaned and prepared for analysis."
    )
    @dlt.expect("valid_artist_name", "artist_name IS NOT NULL")
    @dlt.expect("valid_title", "song_title IS NOT NULL")
    @dlt.expect("valid_duration", "duration > 0")
    def songs_prepared():
    return (
    spark.read.table("songs_raw")
    .withColumnRenamed("title", "song_title")
    .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year")
    )

    # Define a materialized view that has a filtered, aggregated, and sorted view of the data
    @dlt.table(
    comment="A table summarizing counts of songs released by the artists who released the most songs each year."
    )
    def top_artists_by_year():
    return (
    spark.read.table("songs_prepared")
    .filter(expr("year > 0"))
    .groupBy("artist_name", "year")
    .count().withColumnRenamed("count", "total_number_of_songs")
    .sort(desc("total_number_of_songs"), desc("year"))
    )

    :::

    タブ-item[sql]

    SQL
    -- Define a streaming table to ingest data from a volume
    CREATE OR REFRESH STREAMING TABLE songs_raw
    (
    artist_id STRING,
    artist_lat DOUBLE,
    artist_long DOUBLE,
    artist_location STRING,
    artist_name STRING,
    duration DOUBLE,
    end_of_fade_in DOUBLE,
    key INT,
    key_confidence DOUBLE,
    loudness DOUBLE,
    release STRING,
    song_hotnes DOUBLE,
    song_id STRING,
    start_of_fade_out DOUBLE,
    tempo DOUBLE,
    time_signature INT,
    time_signature_confidence DOUBLE,
    title STRING,
    year INT,
    partial_sequence STRING,
    value STRING
    )
    COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    AS SELECT *
    FROM STREAM read_files(
    '/databricks-datasets/songs/data-001/');

    -- Define a materialized view that validates data and renames a column
    CREATE OR REFRESH MATERIALIZED VIEW songs_prepared(
    CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL),
    CONSTRAINT valid_title EXPECT (song_title IS NOT NULL),
    CONSTRAINT valid_duration EXPECT (duration > 0)
    )
    COMMENT "Million Song Dataset with data cleaned and prepared for analysis."
    AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year
    FROM songs_raw;

    -- Define a materialized view that has a filtered, aggregated, and sorted view of the data
    CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year
    COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs."
    AS SELECT
    artist_name,
    year,
    COUNT(*) AS total_number_of_songs
    FROM songs_prepared
    WHERE year > 0
    GROUP BY artist_name, year
    ORDER BY total_number_of_songs DESC, year DESC

    ::: ::::

  7. [ 開始 ] をクリックして、接続されたパイプラインの更新を開始します。

ステップ 3: 変換されたデータのクエリ

この手順では、ETL パイプラインで処理されたデータをクエリして、曲のデータを分析します。これらのクエリは、前の手順で作成した準備済みレコードを使用します。

まず、1990 年以降に毎年最も多くの曲をリリースしたアーティストを見つけるクエリを実行します。

  1. サイドバーで、「SQL エディターのアイコン SQL エディター 」をクリックします。

  2. 「新規タブ 追加またはプラスアイコン 」アイコンをクリックし、メニューから 「新規クエリーを作成 」を選択します。

  3. 次の項目を入力します。

    SQL
    -- Which artists released the most songs each year in 1990 or later?
    SELECT artist_name, total_number_of_songs, year
    FROM <catalog>.<schema>.top_artists_by_year
    WHERE year >= 1990
    ORDER BY total_number_of_songs DESC, year DESC

    <catalog><schema> を、テーブルが含まれているカタログとスキーマの名前に置き換えます。たとえば、 data_pipelines.songs_data.top_artists_by_year.

  4. [選択項目を実行] をクリックします。

次に、4/4 ビートとダンサブルなテンポの曲を見つける別のクエリを実行します。

  1. 新しいタップ アイコン 追加またはプラスアイコン をクリックし、メニューから 新しいクエリの作成 を選択します。

  2. 次のコードを入力します。

    SQL
     -- Find songs with a 4/4 beat and danceable tempo
    SELECT artist_name, song_title, tempo
    FROM <catalog>.<schema>.songs_prepared
    WHERE time_signature = 4 AND tempo between 100 and 140;

    <catalog><schema> を、テーブルが含まれているカタログとスキーマの名前に置き換えます。たとえば、 data_pipelines.songs_data.songs_prepared.

  3. [選択項目を実行] をクリックします。

ステップ 4: DLT パイプラインを実行するジョブを作成する

次に、 Databricks ジョブを使用してデータ取り込み、処理、分析のステップを自動化するワークフローを作成します。

  1. ワークスペースで、ワークフローアイコン サイドバーの ワークフロー をクリックし、 ジョブを作成 をクリックします。
  2. タスク タイトル ボックスで、 新しいジョブ <date and time> をジョブ名に置き換えます。 たとえば、 Songs workflow.
  3. [タスク名] に、最初のタスクの名前を入力します(例:ETL_songs_data)。
  4. 種類パイプライン を選択します。
  5. パイプライン で、ステップ 1 で作成した DLT パイプラインを選択します。
  6. 作成 をクリックします。
  7. ワークフローを実行するには、 今すぐ実行 をクリックします。 実行の詳細を表示するには、 実行 タブをクリックします。 タスクをクリックして、タスク実行の詳細を表示します。
  8. ワークフローの完了時に結果を表示するには、 成功した最新の実行に移動する か、ジョブ実行の開始 時刻 をクリックします。 出力 ページが表示され、クエリ結果が表示されます。

ジョブの実行の詳細については、「Databricksジョブのモニタリングと可観測性」を参照してください。

ステップ 5: DLT パイプライン ジョブをスケジュールする

ETL パイプラインをスケジュールに従って実行するには、次の手順を実行します。

  1. サイドバーのワークフローアイコンワークフロー ]をクリックします。
  2. [名前] 列で、ジョブ名をクリックします。サイドパネルには ジョブの詳細 が表示されます。
  3. スケジュールとトリガー パネルで トリガーの追加 をクリックし、 トリガー・タイプスケジュール済み を選択します。
  4. 期間、開始時刻、およびタイムゾーンを指定します。
  5. [ 保存 ]をクリックします。

詳細情報