Databricksでエンドツーエンドのデータパイプラインを構築する

この記事では、生データの取り込み、データの変換、処理されたデータの分析の実行方法など、エンドツーエンドのデータ処理パイプラインを作成およびデプロイする方法について説明します。

この記事では、 DatabricksノートブックとDatabricksジョブを使用してワークフローを調整するための完全なデータ パイプラインを作成する方法を説明しますが、 Databricksでは、信頼性があり、保守可能で、テスト可能なデータ処理パイプラインを構築するための宣言型インターフェイスであるDelta Live Tablesを使用することをお勧めします。

データパイプラインとは

データパイプラインは、ソースシステムからデータを移動し、要件に基づいてデータを変換し、ターゲットシステムにデータを格納するために必要な手順を実装します。データパイプラインには、生データをユーザーが使用できる準備済みデータに変換するために必要なすべてのプロセスが含まれています。たとえば、データパイプラインは、データアナリストやデータサイエンティストが分析とレポートを通じてデータから価値を抽出できるようにデータを準備する場合があります。

抽出、変換、読み込み(ETL)ワークフローは、データデータパイプラインパイプラインの一般的な例です。ETL処理では、データはソースシステムから取り込まれ、ステージングエリアに書き込まれ、要件(データ品質の確保、レコードの重複排除など)に基づいて変換され、データウェアハウスやデータレイクなどのターゲットシステムに書き込まれます。

データパイプラインのステップ

Databricksでデータパイプラインの構築を始めるために、この記事に含まれる例ではデータ処理ワークフローの作成について説明します。

  • Databricksの機能を使って生のデータセットを探索する

  • Databricksノートブックを作成し、生のソースデータを取り込み、生のデータをターゲットテーブルに書き込む

  • Databricksノートブックを作成して生のソースデータを変換し、変換後のデータをターゲットテーブルに書き込む

  • Databricksノートブックを作成して、変換されたデータをクエリーする

  • Databricksジョブでデータパイプラインを自動化する

要件

例:Million Songデータセット

この例で使用されるデータセットは、現代音楽トラックの機能とメタデータのコレクションであるMillion Song データセットのサブセットです。 このデータセットは、 Databricksスペースに含まれる サンプル データセット で入手できます。

ステップ1:クラスターを作成する

この例のデータ処理と分析を実行するには、コマンドの実行に必要な計算リソースを提供するクラスターを作成します。

この例では、DBFS に保存されているサンプル データセットを使用し、テーブルをUnity Catalogに永続化することを推奨しているため、シングル ユーザー アクセスモードで構成されたクラスターを作成します。 シングル ユーザー アクセス モードでは、DBFS へのフル アクセスが提供されると同時に、 Unity Catalogへのアクセスも可能になります。 DBFS とUnity Catalogのベスト プラクティスを参照してください。

  1. サイドバーの [コンピュート] をクリックします。

  2. [コンピュート] ページで、 [クラスターの作成]をクリックします。

  3. [新しいクラスター] ページで、クラスターの一意の名前を入力します。

  4. アクセスモード]で、[シングルユーザー]を選択します。

  5. [単一ユーザーまたはサービスプリンシパルアクセス] で、ユーザー名を選択します。

  6. 残りの値はデフォルトの状態のままにして、[クラスターの作成] をクリックします。

Databricks クラスターの詳細については、「 コンピュート」を参照してください。

ステップ2:ソースデータを探索する

Databricks インターフェイスを使用して生のソース データを探索する方法については、「 データパイプラインのソース データの探索」を参照してください。 データの取り込みと準備に直接進む場合は、「 ステップ 3: 生データを取り込む」に進みます。

ステップ3:生データを取り込む

このステップでは、生データをテーブルにロードして、後続処理に使用できるようにします。 Databricks プラットフォーム上のデータ資産 (テーブルなど) を管理するには、Databricks で Unity Catalog をお勧めします。 ただし、テーブルを Unity Catalog に発行するために必要なカタログとスキーマを作成するアクセス許可がない場合でも、テーブルを Hive metastoreに発行することで、次の手順を完了できます。

データを取り込むには、Databricksでは Auto Loaderを使用することを推奨しています。Auto Loaderは、新しいファイルがクラウドオブジェクトストレージに到着すると、自動的に検出して処理します。

ロードされたデータのスキーマを自動的に検出するようにオートローダーを構成すると、データスキーマを明示的に宣言せずにテーブルを初期化し、新しい列の導入に応じてテーブルスキーマを進化させることができます。これにより、時間の経過とともにスキーマの変更を手動で追跡して適用する必要がなくなります。DatabricksはAuto Loaderを使用する際にスキーマ推論を推奨しています。ただし、データ探索ステップで見られるように、曲データにはヘッダー情報が含まれていません。ヘッダーはデータとともに保存されないため、次の例に示すように、スキーマを明示的に定義する必要があります。

  1. サイドバーで 新しいアイコン [ 新規 ] をクリックし、メニューから [ノートブック ] を選択します。[ ノートブックの作成] ダイアログが表示されます。

  2. ノートブックの名前を入力します(例:Ingest songs data)。デフォルトでは、

    • Pythonが選択された言語です。

    • ノートブックは、最後に使用したクラスターにアタッチされます。 この場合、「 ステップ 1: クラスターを作成する」で作成したクラスターです。

  3. ノートブックの最初のセルに次のように入力します。

    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define variables used in the code below
    file_path = "/databricks-datasets/songs/data-001/"
    table_name = "<table-name>"
    checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data"
    
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    (spark.readStream
      .format("cloudFiles")
      .schema(schema)
      .option("cloudFiles.format", "csv")
      .option("sep","\t")
      .load(file_path)
      .writeStream
      .option("checkpointLocation", checkpoint_path)
      .trigger(availableNow=True)
      .toTable(table_name)
    )
    

    Unity Catalogを使用している場合は、<table-name> を取り込まれたレコードを含めるカタログ、スキーマ、テーブル名に置き換えます(例:data_pipelines.songs_data.raw_song_data)。それ以外の場合は、<table-name> を取り込まれたレコードを含めるテーブルの名前(raw_song_dataなど) に置き換えます。

    <checkpoint-path> を、チェックポイントファイルを管理するためのDBFS内のディレクトリへのパス(例:/tmp/pipeline_get_started/_checkpoint/song_data)に置き換えます。

  4. 実行メニュー をクリックし、「セルを実行」を選択します。この例では、 READMEからの情報を使用してデータ スキーマを定義し、 file_pathに含まれるすべてのファイルから songs データを取り込み、 table_nameで指定されたテーブルにデータを書き込みます。

ステップ4:生データを準備する

分析用の生データを準備するために、次の手順で、不要な列をフィルターで除外し、新しいレコードを作成するためのタイムスタンプを含む新しいフィールドを追加することによって、生の曲データを変換します。

  1. サイドバーで 新しいアイコン [ 新規 ] をクリックし、メニューから [ノートブック ] を選択します。[ ノートブックの作成] ダイアログが表示されます。

  2. ノートブックの名前を入力します。たとえば、Prepare songs dataなどです。デフォルトの言語をSQLに変更します。

  3. ノートブックの最初のセルに次のように入力します。

    CREATE OR REPLACE TABLE
      <table-name> (
        artist_id STRING,
        artist_name STRING,
        duration DOUBLE,
        release STRING,
        tempo DOUBLE,
        time_signature DOUBLE,
        title STRING,
        year DOUBLE,
        processed_time TIMESTAMP
      );
    
    INSERT INTO
      <table-name>
    SELECT
      artist_id,
      artist_name,
      duration,
      release,
      tempo,
      time_signature,
      title,
      year,
      current_timestamp()
    FROM
      <raw-songs-table-name>
    

    Unity Catalogを使用している場合は、フィルターおよび変換されたレコード(例:data_pipelines.songs_data.prepared_song_data)を含めるために、 <table-name> をカタログ、スキーマ、およびテーブル名に置き換えます。それ以外の場合は、<table-name> をフィルター処理および変換されたレコードを含むテーブルの名前(例:prepared_song_data)に置き換えます。

    <raw-songs-table-name> を、前の手順で取り込まれた生の曲レコードを含むテーブルの名前に置き換えます。

  4. 実行メニュー をクリックし、「セルを実行」を選択します。

ステップ5:変換されたデータをクエリーする

このステップでは、曲データを分析するためのクエリーを追加して処理パイプラインを拡張します。これらのクエリーは、前のステップで作成された準備済みレコードを使用します。

  1. サイドバーで 新しいアイコン [ 新規 ] をクリックし、メニューから [ノートブック ] を選択します。[ ノートブックの作成] ダイアログが表示されます。

  2. ノートブックの名前を入力します。たとえば、Analyze songs dataなどです。デフォルトの言語をSQLに変更します。

  3. ノートブックの最初のセルに次のように入力します。

    -- Which artists released the most songs each year?
    SELECT
      artist_name,
      count(artist_name)
    AS
      num_songs,
      year
    FROM
      <prepared-songs-table-name>
    WHERE
      year > 0
    GROUP BY
      artist_name,
      year
    ORDER BY
      num_songs DESC,
      year DESC
    

    <prepared-songs-table-name>を、準備されたデータを含むテーブルの名前に置き換えます。たとえば、data_pipelines.songs_data.prepared_song_data などです。

  4. セルアクションメニューをクリックし ダウンキャレット 、「 下にセルを追加 」を選択して、新しいセルに次のように入力します。

     -- Find songs for your DJ list
     SELECT
       artist_name,
       title,
       tempo
     FROM
       <prepared-songs-table-name>
     WHERE
       time_signature = 4
       AND
       tempo between 100 and 140;
    

    <prepared-songs-table-name> を、前のステップで作成した準備済みテーブルの名前に置き換えます。たとえば、data_pipelines.songs_data.prepared_song_data などです。

  5. クエリを実行して出力を表示するには、[すべて実行] をクリックします。

ステップ6:パイプラインを実行するDatabricksジョブを作成する

Databricksジョブを使用して、データの取り込み、処理、分析のステップの実行を自動化するワークフローを作成できます。

  1. データサイエンス & エンジニアリングワークスペースで、次のいずれかを実行します。

    • クリックワークフローアイコンサイドバーのワークフローをクリックして「ジョブを作成」ボタン

    • サイドバーで、[新しいアイコン 新規 ] をクリックし 、[ジョブ] を選択します。

  2. タスクダイアログボックスの [タスク] タブで、 [ジョブの名前を追加…] をジョブ名に置き換えます。たとえば、「曲のワークフロー」などです。

  3. [タスク名] に、最初のタスクの名前を入力します(例:Ingest_songs_data)。

  4. [タイプ] で、タスクのタイプとして [ノートブック] を選択します。

  5. [ソース] で [ワークスペース] を選択します。

  6. ファイルブラウザーを使用してデータ取り込みノートブックを見つけ、ノートブック名をクリックして、[確認] をクリックします。

  7. [クラスター] で、 Shared_job_cluster または Create a cluster ステップで作成したクラスターを選択します。

  8. [作成] をクリックします。

  9. 作成したタスクの下をクリックし 「タスクを追加」ボタン 、[ ノートブック] を選択します。

  10. [タスク名] にタスクの名前を入力します(例:Prepare_songs_data)。

  11. [タイプ] で、タスクのタイプとして [ノートブック] を選択します。

  12. [ソース] で [ワークスペース] を選択します。

  13. ファイルブラウザを使用してデータ準備ノートブックを見つけ、ノートブック名をクリックして、[確認] をクリックします。

  14. [クラスター] で、 Shared_job_cluster または Create a cluster ステップで作成したクラスターを選択します。

  15. [作成] をクリックします。

  16. 作成したタスクの下をクリックし 「タスクを追加」ボタン 、[ ノートブック] を選択します。

  17. [タスク名] にタスクの名前を入力します(例:Analyze_songs_data)。

  18. [タイプ] で、タスクのタイプとして [ノートブック] を選択します。

  19. [ソース] で [ワークスペース] を選択します。

  20. ファイルブラウザを使用してデータ分析ノートブックを見つけ、ノートブック名をクリックして、 [確認] をクリックします。

  21. [クラスター] で、 Shared_job_cluster または Create a cluster ステップで作成したクラスターを選択します。

  22. [作成] をクリックします。

  23. ワークフローを実行するには、 をクリックします 「今すぐ実行」ボタン実行の詳細 を表示するには、 ジョブ実行 ビューで実行の[開始時刻] 列のリンクをクリックします。各タスクをクリックすると、タスク実行の詳細が表示されます。

  24. ワークフローの完了時に結果を表示するには、最終データ分析タスクをクリックします。「出力」ページが表示され、クエリー結果が表示されます。

ステップ7:データパイプラインジョブをスケジュールする

Databricksジョブを使用してスケジュールされたワークフローをオーケストレーションすることを示すために、この開始例では取り込み、準備、分析の各ステップを別々のノートブックに分離し、各ノートブックを使用してジョブのタスクを作成します。すべての処理が1つのノートブックに含まれている場合は、DatabricksノートブックUIから直接ノートブックを簡単にスケジュールできます。「スケジュールされたノートブックジョブの作成と管理」を参照してください。

一般的な要件は、定期的にデータパイプラインを実行することです。パイプラインを実行するジョブのスケジュールを定義するには、以下の手順を実行します。

  1. サイドバーのワークフローアイコンワークフロー]をクリックします。

  2. [名前] 列で、ジョブ名をクリックします。サイドパネルにはジョブの詳細が表示されます。

  3. ジョブの詳細パネルで [トリガーを追加] をクリックし、[トリガータイプ] で [スケジュール済み] を選択します。

  4. 期間、開始時刻、およびタイムゾーンを指定します。Quartz Cron Syntax でスケジュールを表示および編集するには、オプションで Cron Syntaxを表示 チェックボックスを選択します。

  5. [保存]をクリックします。

詳細情報