Databricks でエンドツーエンドのデータパイプラインを構築する
この記事では、生データの取り込み、データの変換、処理されたデータの分析の実行方法など、エンドツーエンドのデータ処理パイプラインを作成およびデプロイする方法について説明します。
この記事では、Databricksノートブック とDatabricksジョブ を使用してワークフローを調整する完全なデータ パイプラインを作成する方法について説明しますが、Databricks では、信頼性が高く、保守可能で、テスト可能なデータ処理パイプラインを構築するための宣言型インターフェイスであるDLT を使用することをお勧めします。
データパイプラインとは?
データパイプラインは、ソースシステムからデータを移動し、要件に基づいてデータを変換し、ターゲットシステムにデータを格納するために必要な手順を実装します。データパイプラインには、生データをユーザーが使用できる準備済みデータに変換するために必要なすべてのプロセスが含まれています。たとえば、データパイプラインは、データアナリストやデータサイエンティストが分析とレポートを通じてデータから価値を抽出できるようにデータを準備する場合があります。
抽出、変換、読み込み(ETL)ワークフローは、データデータパイプラインパイプラインの一般的な例です。ETL処理では、データはソースシステムから取り込まれ、ステージングエリアに書き込まれ、要件(データ品質の確保、レコードの重複排除など)に基づいて変換され、データウェアハウスやデータレイクなどのターゲットシステムに書き込まれます。
データパイプラインのステップ
Databricksでデータパイプラインの構築を始めるために、この記事に含まれる例ではデータ処理ワークフローの作成について説明します。
- Databricksの機能を使って生のデータセットを探索する
- Databricksノートブックを作成し、生のソースデータを取り込み、生のデータをターゲットテーブルに書き込む
- Databricksノートブックを作成して生のソースデータを変換し、変換後のデータをターゲットテーブルに書き込む
- Databricksノートブックを作成して、変換されたデータをクエリーする
- Databricksジョブでデータパイプラインを自動化する
必要条件
- Databricks にログインし、データサイエンス& エンジニアリングワークスペースにいる
- コンピュート リソースを作成する権限、またはコンピュート リソースにアクセスする権限があります。
- (オプション)テーブルを Unity Catalog に発行するには、Unity Catalog で カタログ と スキーマ を作成する必要があります。
例: Million Song データセット
この例で使用されているデータセットは、現代音楽トラックの機能とメタデータのコレクションである Million Song データセットのサブセットです。 このデータセットは、Databricksワークスペースに含まれている サンプルデータセット に含まれています。
ステップ 1: コンピュート リソースを作成する
この例でデータ処理と分析を実行するには、コマンドを実行するコンピュート リソースを作成します。
この例では、DBFS に格納されたサンプル データセットを使用し、 Unity Catalogにテーブルを保持することを推奨しているため、専用アクセス・モードで構成されたコンピュート リソースを作成します。専用アクセス モードでは、DBFS へのフル アクセスを提供しながら、Unity Catalog へのアクセスも可能にします。 「DBFS と Unity Catalog のベスト プラクティス」を参照してください。
- サイドバーの [ コンピュート ] をクリックします。
- 「コンピュート」ページで、「 コンピュートを作成 」をクリックします。
- 新しいコンピュート ページで、コンピュート リソースの一意の名前を入力します。
- [詳細設定] で、アクセス モード設定を [手動 ] に切り替え、[ 専用 ] を選択します。
- [ シングル ユーザーまたはグループ ] で、ユーザー名を選択します。
- 残りの値はデフォルトの状態のままにして、[ 作成 ] をクリックします。
Databricks コンピュート リソースの詳細については、「コンピュート」を参照してください。
ステップ 2: ソース データを探索する
Databricks インターフェイスを使用して未加工のソース データを探索する方法については、「データ パイプラインのソース データの探索」を参照してください。データの取り込みと準備に直接進む場合は、「 ステップ 3: 生データを取り込む」に進みます。
ステップ 3: 生データを取り込む
このステップでは、生データをテーブルにロードして、さらに処理できるようにします。 テーブルなどの Databricks プラットフォーム上のデータ資産を管理するために、Databricks では Unity Catalog をお勧めします。 ただし、テーブルを Unity Catalogに公開するために必要なカタログとスキーマを作成する権限がない場合でも、テーブルを Hive metastoreに公開することで、次の手順を完了できます。
データを取り込むには、Databricks を使用することをお勧めしますAuto Loader 。Auto Loader は、新しいファイルがクラウド オブジェクト ストレージに到着すると、自動的に検出して処理します。
読み込まれたデータのスキーマを自動的に検出するようにAuto Loaderを構成すると、データスキーマを明示的に宣言せずにテーブルを初期化し、新しい列の導入に応じてテーブルスキーマを進化させることができます。これにより、時間の経過とともにスキーマの変更を手動で追跡して適用する必要がなくなります。DatabricksはAuto Loaderを使用する際にスキーマ推論を推奨しています。ただし、データ探索ステップで見られるように、曲データにはヘッダー情報が含まれていません。ヘッダーはデータとともに保存されないため、次の例に示すように、スキーマを明示的に定義する必要があります。
-
サイドバーで
[新規] をクリックし、メニューから [ノートブック] を選択します。 [ノートブックの作成] ダイアログが表示されます。
-
ノートブックの名前を入力します(例:
Ingest songs data
)。デフォルトでは、- Python が選択された言語です。
- ノートブックは、最後に使用したコンピュート リソースにアタッチされます。 この場合は、「 ステップ 1: コンピュート リソースを作成する」で作成したリソースです。
-
ノートブックの最初のセルに次のように入力します。
Pythonfrom pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
# Define variables used in the code below
file_path = "/databricks-datasets/songs/data-001/"
table_name = "<table-name>"
checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data"
schema = StructType(
[
StructField("artist_id", StringType(), True),
StructField("artist_lat", DoubleType(), True),
StructField("artist_long", DoubleType(), True),
StructField("artist_location", StringType(), True),
StructField("artist_name", StringType(), True),
StructField("duration", DoubleType(), True),
StructField("end_of_fade_in", DoubleType(), True),
StructField("key", IntegerType(), True),
StructField("key_confidence", DoubleType(), True),
StructField("loudness", DoubleType(), True),
StructField("release", StringType(), True),
StructField("song_hotnes", DoubleType(), True),
StructField("song_id", StringType(), True),
StructField("start_of_fade_out", DoubleType(), True),
StructField("tempo", DoubleType(), True),
StructField("time_signature", DoubleType(), True),
StructField("time_signature_confidence", DoubleType(), True),
StructField("title", StringType(), True),
StructField("year", IntegerType(), True),
StructField("partial_sequence", IntegerType(), True)
]
)
(spark.readStream
.format("cloudFiles")
.schema(schema)
.option("cloudFiles.format", "csv")
.option("sep","\t")
.load(file_path)
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.toTable(table_name)
)Unity Catalogを使用している場合は、
<table-name>
を取り込まれたレコードを含めるカタログ、スキーマ、テーブル名に置き換えます(例:data_pipelines.songs_data.raw_song_data
)。それ以外の場合は、<table-name>
を取り込まれたレコードを含めるテーブルの名前(raw_song_data
など) に置き換えます。<checkpoint-path>
を、チェックポイントファイルを管理するためのDBFS内のディレクトリへのパス(例:/tmp/pipeline_get_started/_checkpoint/song_data
)に置き換えます。 -
「
」をクリックし、「 セルの実行」 を選択します。 この例では、
README
からの情報を使用してデータスキーマを定義し、file_path
に含まれるすべてのファイルから曲データを取り込み、table_name
で指定されたテーブルにデータを書き込みます。
ステップ 4: 生データを準備する
分析用の生データを準備するために、次の手順で、不要な列をフィルターで除外し、新しいレコードを作成するためのタイムスタンプを含む新しいフィールドを追加することによって、生の曲データを変換します。
-
サイドバーで
[新規] をクリックし、メニューから [ノートブック] を選択します。 [ノートブックの作成] ダイアログが表示されます。
-
ノートブックの名前を入力します。たとえば、
Prepare songs data
などです。デフォルトの言語を SQL に変更します。 -
ノートブックの最初のセルに次のように入力します。
SQLCREATE OR REPLACE TABLE
<table-name> (
artist_id STRING,
artist_name STRING,
duration DOUBLE,
release STRING,
tempo DOUBLE,
time_signature DOUBLE,
title STRING,
year DOUBLE,
processed_time TIMESTAMP
);
INSERT INTO
<table-name>
SELECT
artist_id,
artist_name,
duration,
release,
tempo,
time_signature,
title,
year,
current_timestamp()
FROM
<raw-songs-table-name>Unity Catalogを使用している場合は、フィルターおよび変換されたレコード(例:
data_pipelines.songs_data.prepared_song_data
)を含めるために、<table-name>
をカタログ、スキーマ、およびテーブル名に置き換えます。それ以外の場合は、<table-name>
をフィルター処理および変換されたレコードを含むテーブルの名前(例:prepared_song_data
)に置き換えます。<raw-songs-table-name>
を、前の手順で取り込まれた生の曲レコードを含むテーブルの名前に置き換えます。 -
「
」をクリックし、「 セルの実行」 を選択します。
ステップ 5: 変換されたデータのクエリ
このステップでは、曲データを分析するためのクエリーを追加して処理パイプラインを拡張します。これらのクエリーは、前のステップで作成された準備済みレコードを使用します。
-
サイドバーで
[新規] をクリックし、メニューから [ノートブック] を選択します。 [ノートブックの作成] ダイアログが表示されます。
-
ノートブックの名前を入力します。たとえば、
Analyze songs data
などです。デフォルトの言語を SQL に変更します。 -
ノートブックの最初のセルに次のように入力します。
SQL-- Which artists released the most songs each year?
SELECT
artist_name,
count(artist_name)
AS
num_songs,
year
FROM
<prepared-songs-table-name>
WHERE
year > 0
GROUP BY
artist_name,
year
ORDER BY
num_songs DESC,
year DESC<prepared-songs-table-name>
を、準備されたデータを含むテーブルの名前に置き換えます。たとえば、data_pipelines.songs_data.prepared_song_data
などです。 -
セルのアクションメニューで
をクリックし、 [下にセルを追加] を選択し、新しいセルに次の内容を入力します。
SQL-- Find songs for your DJ list
SELECT
artist_name,
title,
tempo
FROM
<prepared-songs-table-name>
WHERE
time_signature = 4
AND
tempo between 100 and 140;<prepared-songs-table-name>
を、前のステップで作成した準備済みテーブルの名前に置き換えます。たとえば、data_pipelines.songs_data.prepared_song_data
などです。 -
クエリを実行して出力を表示するには、 [すべて実行] をクリックします。
手順 6: パイプラインを実行する Databricks ジョブを作成する
Databricksジョブを使用して、データの取り込み、処理、分析のステップの実行を自動化するワークフローを作成できます。
-
データサイエンス & エンジニアリングワークスペースで、次のいずれかを実行します。
サイドバー の「 ワークフロー
」をクリックし、「 」をクリックします。
- サイドバーの
[ 新規 ]をクリックし、[ ジョブ ]を選択します。
-
タスクダイアログボックスの [タスク] タブで、 [ジョブの名前を追加…] をジョブ名に置き換えます。たとえば、「曲のワークフロー」などです。
-
[タスク名] に、最初のタスクの名前を入力します(例:
Ingest_songs_data
)。 -
[ タイプ ] で、タスクのタイプとして [ ノートブック ] を選択します。
-
[ ソース ] で [ ワークスペース ] を選択します。
-
[パス ] フィールドで、ファイル ブラウザーを使用してデータ取り込み ノートブックを検索し、[ 確認] をクリックします。
-
コンピュート で、
Create a compute resource
手順で作成したコンピュート リソースを選択します。 -
作成 をクリックします。
-
作成したタスクの下にある
をクリックし、[ ノートブック] を選択します。
-
[タスク名] にタスクの名前を入力します(例:
Prepare_songs_data
)。 -
[ タイプ ] で、タスクのタイプとして [ ノートブック ] を選択します。
-
[ ソース ] で [ ワークスペース ] を選択します。
-
ファイルブラウザを使用してデータ準備ノートブックを見つけ、ノートブック名をクリックして、 [確認] をクリックします。
-
コンピュート で、
Create a compute resource
手順で作成したコンピュート リソースを選択します。 -
作成 をクリックします。
-
作成したタスクの下にある
をクリックし、[ ノートブック] を選択します。
-
[タスク名] にタスクの名前を入力します(例:
Analyze_songs_data
)。 -
[ タイプ ] で、タスクのタイプとして [ ノートブック ] を選択します。
-
[ ソース ] で [ ワークスペース ] を選択します。
-
ファイルブラウザを使用してデータ分析ノートブックを見つけ、ノートブック名をクリックして、 [確認] をクリックします。
-
コンピュート で、
Create a compute resource
手順で作成したコンピュート リソースを選択します。 -
作成 をクリックします。
-
ワークフローを実行するには、[
] をクリックします。 実行の詳細 を表示するには、ジョブ実行 ビューで実行の[開始時刻] 列のリンクをクリックします。各タスクをクリックすると、タスク実行の詳細が表示されます。
-
ワークフローの完了時に結果を表示するには、最終データ分析タスクをクリックします。 「出力」 ページが表示され、クエリー結果が表示されます。
ステップ 7: データパイプライン ジョブのスケジュール
Databricks ジョブを使用してスケジュールされたワークフローを調整する方法を示すために、この入門例では、インジェスト、準備、分析のステップを別々のノートブックに分割し、各ノートブックを使用してジョブにタスクを作成します。すべての処理が 1 つのノートブックに含まれている場合は、Databricks ノートブック UI から直接ノートブックを簡単にスケジュールできます。 「スケジュールされたノートブック ジョブの作成と管理」を参照してください。
一般的な要件は、定期的にデータパイプラインを実行することです。パイプラインを実行するジョブのスケジュールを定義するには、以下の手順を実行します。
- サイドバーの
[ ワークフロー ]をクリックします。
- [名前] 列で、ジョブ名をクリックします。サイドパネルには ジョブの詳細 が表示されます。
- ジョブの詳細パネルで [ トリガーを追加 ] をクリックし、[ トリガータイプ ] で [ スケジュール済み ] を選択します。
- 期間、開始時刻、およびタイムゾーンを指定します。 Quartz Cron Syntax でスケジュールを表示および編集するには、オプションで Cron Syntaxを表示 チェックボックスを選択します。
- [ 保存 ]をクリックします。
詳細情報
- Databricks ノートブックの詳細については、「 Databricks ノートブックの概要」を参照してください。
- Databricks ジョブの詳細については、「 ジョブとは」を参照してください。
- Delta Lake の詳細については、「 Delta Lake とは」を参照してください。
- DLT を使用したデータ処理パイプラインの詳細については、「 DLT とは」を参照してください。