メインコンテンツまでスキップ

チュートリアル:ETL LakeFlow宣言型パイプラインを使用して パイプラインを構築する

ETL宣言型パイプラインと を使用して、データ オーケストレーション用の (抽出、変換、読み込み) パイプラインを作成およびデプロイする方法LakeFlow Auto Loader学習します。ETL パイプラインは、ソース システムからデータを読み取り、データ品質チェックやレコード重複排除などの要件に基づいてそのデータを変換し、データ ウェアハウスやデータレイクなどのターゲット システムにデータを書き込む手順を実装します。

このチュートリアルでは、宣言型パイプライン LakeFlow 使用し、次のことを Auto Loader します。

  • 生のソースデータをターゲットテーブルに取り込みます。
  • 生のソース データを変換し、変換されたデータを 2 つのターゲット マテリアライズドビューに書き込みます。
  • 変換されたデータをクエリします。
  • Databricks ジョブを使用して ETL パイプラインを自動化します。

LakeFlow 宣言型パイプラインと Auto Loaderの詳細については、「LakeFlow 宣言型パイプライン」および「Auto Loaderとは」を参照してください。

必要条件

このチュートリアルを完了するには、以下の条件を満たす必要があります。

データセットについて

この例で使用されているデータセットは、現代音楽トラックの機能とメタデータのコレクションである Million Song データセットのサブセットです。 このデータセットは、Databricksワークスペースに含まれている サンプルデータセットに含まれています。

ステップ 1: パイプラインを作成する

まず、宣言型パイプラインで ETL パイプラインを作成します LakeFlow 。 LakeFlow 宣言型パイプラインは、宣言型パイプライン構文を使用してノートブックまたはファイル ( ソース コード と呼ばれます) で定義された依存関係を解決することで LakeFlow パイプラインを作成します。 各ソース コード ファイルには 1 つの言語のみを含めることができますが、パイプラインには複数の言語固有のノートブックまたはファイルを追加できます。詳細については、「LakeFlow 宣言型パイプライン」を参照してください。

important

ソース コード フィールドを空白のままにして、ソース コード作成用のノートブックを自動的に作成および構成します。

このチュートリアルでは、サーバレス コンピュート と Unity Catalogを使用します。 指定されていないすべての構成オプションについては、デフォルト設定を使用します。サーバレス コンピュートがワークスペースで有効になっていないか、サポートされていない場合は、デフォルト コンピュートの設定を使用して、記載されているとおりにチュートリアルを完了できます。 デフォルト コンピュート設定を使用する場合は、 Create パイプライン Unity Catalog UI の[Destination] セクションの[Storage options] で [Storage] を手動で選択する必要があります。

宣言型パイプラインで新しい ETL パイプラインを作成するには LakeFlow 次の手順を実行します。

  1. サイドバーで、 パイプライン をクリックします。
  2. パイプラインの作成ETL パイプライン をクリックします。
  3. パイプライン名 に、一意のパイプライン名を入力します。
  4. サーバレス チェックボックスを選択します。
  5. 公開先 で、テーブルが公開される Unity Catalog の場所を構成するには、既存の カタログ を選択し、 スキーマ に新しい名前を書き込み、カタログに新しいスキーマを作成します。
  6. 作成 をクリックします。

新しいパイプラインのパイプライン UI が表示されます。

ステップ 2: パイプラインを開発する

important

ノートブックには、1 つのプログラミング言語のみを含めることができます。パイプラインのソースコードノートブックで Python コードと SQL コードを混在させないでください。

この手順では、 Databricks ノートブックを使用して、宣言型パイプラインのソース コードを対話形式で開発および検証 LakeFlow 。

このコードでは、インクリメンタル データ取り込みに Auto Loader を使用します。 Auto Loader は、新しいファイルがクラウド オブジェクト ストレージに到着すると、自動的に検出して処理します。 詳細については、Auto Loaderとはを参照してください。

空白のソース コード ノートブックが自動的に作成され、パイプライン用に構成されます。ノートブックは、ユーザー・ディレクトリー内の新しいディレクトリーに作成されます。新しいディレクトリとファイルの名前は、パイプラインの名前と一致します。たとえば、 /Users/someone@example.com/my_pipeline/my_pipelineです。

パイプラインを開発するときは、Python または SQL のいずれかを選択できます。両方の言語の例が含まれています。選択した言語に基づいて、デフォルトのノートブック言語を選択していることを確認します。LakeFlow宣言型パイプラインのコード開発に対するノートブックのサポートの詳細については、「宣言ETL型パイプライン」 の「ノートブックを使用して パイプラインを開発およびデバッグLakeFlowする」を参照してください。

  1. このノートブックにアクセスするためのリンクは、 パイプラインの詳細 パネルの ソース コード フィールドの下にあります。リンクをクリックしてノートブックを開き、次のステップに進みます。

  2. 右上の 接続 をクリックして、コンピュート設定メニューを開きます。

  3. ステップ 1 で作成したパイプラインの名前にカーソルを合わせます。

  4. 接続 をクリックします。

  5. 上部にあるノートブックのタイトルの横にあるノートブックのデフォルト言語(Python または SQL)を選択します。

  6. 次のコードをコピーして、ノートブックのセルに貼り付けます。

    タブ :::タブ-item[Python]

    Python
    # Import modules
    import dlt
    from pyspark.sql.functions import *
    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField

    # Define the path to the source data
    file_path = f"/databricks-datasets/songs/data-001/"

    # Define a streaming table to ingest data from a volume
    schema = StructType(
    [
    StructField("artist_id", StringType(), True),
    StructField("artist_lat", DoubleType(), True),
    StructField("artist_long", DoubleType(), True),
    StructField("artist_location", StringType(), True),
    StructField("artist_name", StringType(), True),
    StructField("duration", DoubleType(), True),
    StructField("end_of_fade_in", DoubleType(), True),
    StructField("key", IntegerType(), True),
    StructField("key_confidence", DoubleType(), True),
    StructField("loudness", DoubleType(), True),
    StructField("release", StringType(), True),
    StructField("song_hotnes", DoubleType(), True),
    StructField("song_id", StringType(), True),
    StructField("start_of_fade_out", DoubleType(), True),
    StructField("tempo", DoubleType(), True),
    StructField("time_signature", DoubleType(), True),
    StructField("time_signature_confidence", DoubleType(), True),
    StructField("title", StringType(), True),
    StructField("year", IntegerType(), True),
    StructField("partial_sequence", IntegerType(), True)
    ]
    )

    @dlt.table(
    comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    )
    def songs_raw():
    return (spark.readStream
    .format("cloudFiles")
    .schema(schema)
    .option("cloudFiles.format", "csv")
    .option("sep","\t")
    .option("inferSchema", True)
    .load(file_path))

    # Define a materialized view that validates data and renames a column
    @dlt.table(
    comment="Million Song Dataset with data cleaned and prepared for analysis."
    )
    @dlt.expect("valid_artist_name", "artist_name IS NOT NULL")
    @dlt.expect("valid_title", "song_title IS NOT NULL")
    @dlt.expect("valid_duration", "duration > 0")
    def songs_prepared():
    return (
    spark.read.table("songs_raw")
    .withColumnRenamed("title", "song_title")
    .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year")
    )

    # Define a materialized view that has a filtered, aggregated, and sorted view of the data
    @dlt.table(
    comment="A table summarizing counts of songs released by the artists who released the most songs each year."
    )
    def top_artists_by_year():
    return (
    spark.read.table("songs_prepared")
    .filter(expr("year > 0"))
    .groupBy("artist_name", "year")
    .count().withColumnRenamed("count", "total_number_of_songs")
    .sort(desc("total_number_of_songs"), desc("year"))
    )

    :::

    タブ-item[sql]

    SQL
    -- Define a streaming table to ingest data from a volume
    CREATE OR REFRESH STREAMING TABLE songs_raw
    (
    artist_id STRING,
    artist_lat DOUBLE,
    artist_long DOUBLE,
    artist_location STRING,
    artist_name STRING,
    duration DOUBLE,
    end_of_fade_in DOUBLE,
    key INT,
    key_confidence DOUBLE,
    loudness DOUBLE,
    release STRING,
    song_hotnes DOUBLE,
    song_id STRING,
    start_of_fade_out DOUBLE,
    tempo DOUBLE,
    time_signature INT,
    time_signature_confidence DOUBLE,
    title STRING,
    year INT,
    partial_sequence STRING,
    value STRING
    )
    COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    AS SELECT *
    FROM STREAM read_files(
    '/databricks-datasets/songs/data-001/');

    -- Define a materialized view that validates data and renames a column
    CREATE OR REFRESH MATERIALIZED VIEW songs_prepared(
    CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL),
    CONSTRAINT valid_title EXPECT (song_title IS NOT NULL),
    CONSTRAINT valid_duration EXPECT (duration > 0)
    )
    COMMENT "Million Song Dataset with data cleaned and prepared for analysis."
    AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year
    FROM songs_raw;

    -- Define a materialized view that has a filtered, aggregated, and sorted view of the data
    CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year
    COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs."
    AS SELECT
    artist_name,
    year,
    COUNT(*) AS total_number_of_songs
    FROM songs_prepared
    WHERE year > 0
    GROUP BY artist_name, year
    ORDER BY total_number_of_songs DESC, year DESC

    ::: ::::

  7. [ 開始 ] をクリックして、接続されたパイプラインの更新を開始します。

ステップ 3: 変換されたデータのクエリ

この手順では、ETL パイプラインで処理されたデータをクエリして、曲のデータを分析します。これらのクエリは、前の手順で作成した準備済みレコードを使用します。

まず、1990 年以降に毎年最も多くの曲をリリースしたアーティストを見つけるクエリを実行します。

  1. サイドバーで、「SQL エディターのアイコン SQL エディター 」をクリックします。

  2. 「新規タブ 追加またはプラスアイコン 」アイコンをクリックし、メニューから 「新規クエリーを作成 」を選択します。

  3. 次の項目を入力します。

    SQL
    -- Which artists released the most songs each year in 1990 or later?
    SELECT artist_name, total_number_of_songs, year
    FROM <catalog>.<schema>.top_artists_by_year
    WHERE year >= 1990
    ORDER BY total_number_of_songs DESC, year DESC

    <catalog><schema> を、テーブルが含まれているカタログとスキーマの名前に置き換えます。たとえば、 data_pipelines.songs_data.top_artists_by_year.

  4. [選択項目を実行] をクリックします。

次に、4/4 ビートとダンサブルなテンポの曲を見つける別のクエリを実行します。

  1. 新しいタップ アイコン 追加またはプラスアイコン をクリックし、メニューから 新しいクエリの作成 を選択します。

  2. 次のコードを入力します。

    SQL
     -- Find songs with a 4/4 beat and danceable tempo
    SELECT artist_name, song_title, tempo
    FROM <catalog>.<schema>.songs_prepared
    WHERE time_signature = 4 AND tempo between 100 and 140;

    <catalog><schema> を、テーブルが含まれているカタログとスキーマの名前に置き換えます。たとえば、 data_pipelines.songs_data.songs_prepared.

  3. [選択項目を実行] をクリックします。

手順 4: パイプラインを実行するジョブを作成する

次に、 Databricks ジョブを使用してデータ取り込み、処理、分析のステップを自動化するワークフローを作成します。

  1. ワークスペースで、ワークフローアイコン。 サイドバーの ワークフロー をクリックし、 ジョブを作成 をクリックします。
  2. タスク タイトル ボックスで、 新しいジョブ <date and time> をジョブ名に置き換えます。 たとえば、 Songs workflow.
  3. [タスク名] に、最初のタスクの名前を入力します(例:ETL_songs_data)。
  4. 種類パイプライン を選択します。
  5. [パイプライン ] で、ステップ 1 で作成したパイプラインを選択します。
  6. 作成 をクリックします。
  7. ワークフローを実行するには、 今すぐ実行 をクリックします。 実行の詳細を表示するには、 実行 タブをクリックします。 タスクをクリックして、タスク実行の詳細を表示します。
  8. ワークフローの完了時に結果を表示するには、 成功した最新の実行に移動する か、ジョブ実行の開始 時刻 をクリックします。 出力 ページが表示され、クエリ結果が表示されます。

ジョブの実行の詳細については、「LakeFlowジョブのモニタリングと可観測性」を参照してください。

ステップ 5: パイプライン ジョブをスケジュールする

ETL パイプラインをスケジュールに従って実行するには、次の手順を実行します。

  1. サイドバーのワークフローアイコン。ワークフロー ]をクリックします。
  2. [名前] 列で、ジョブ名をクリックします。サイドパネルには ジョブの詳細 が表示されます。
  3. スケジュールとトリガー パネルで トリガーの追加 をクリックし、 トリガー・タイプスケジュール済み を選択します。
  4. 期間、開始時刻、およびタイムゾーンを指定します。
  5. [ 保存 ]をクリックします。

詳細情報