チュートリアル: LakeFlow Pipelines Editor を使用して最初のパイプラインを作成する
データ オーケストレーションとAuto Loader用にLakeFlow Spark宣言型パイプライン (SDP) を使用して新しいパイプラインを作成する方法を学びます。 このチュートリアルでは、データをクリーンアップし、上位 100 人のユーザーを検索するクエリを作成して、サンプル パイプラインを拡張します。
このチュートリアルでは、 LakeFlow Pipelines Editor を使用して次のことを行う方法を学習します。
- デフォルトのフォルダー構造を使用して新しいパイプラインを作成し、一連のサンプル ファイルから開始します。
- 期待値を使用してデータ品質の制約を定義します。
- エディター機能を使用して、新しい変換でパイプラインを拡張し、データの分析を実行します。
要件
このチュートリアルを開始する前に、次の作業を行う必要があります。
- Databricks ワークスペースにログインしている必要があります。
- ワークスペースで Unity Catalog を有効にします。
- ワークスペースに対してLakeFlow Pipelinesエディターを有効にし、オプトインする必要があります。 LakeFlow Pipelines Editor の有効化」と「モニタリングの更新」を参照してください。
- コンピュート リソースを作成する権限、またはコンピュート リソースにアクセスする権限を持っています。
- カタログに新しいスキーマを作成する権限を持っています。必要な権限は
ALL PRIVILEGESまたはUSE CATALOGとCREATE SCHEMAです。
ステップ 1: パイプラインを作成する
このステップでは、当然のフォルダー構造とコード サンプルを使用してパイプラインを作成します。 コード サンプルは、 wanderbricksサンプル データ ソースのusersテーブルを参照します。
-
Databricksワークスペースで、 をクリックします。
新しい 、それから
ETLパイプライン 。 これにより、パイプラインの作成ページでパイプライン エディターが開きます。
-
ヘッダーをクリックしてパイプラインに名前を付けます。
-
名前のすぐ下で、出力テーブルのデフォルトのカタログとスキーマを選択します。これらは、パイプライン定義でカタログとスキーマを指定しない場合に使用されます。
-
パイプラインの次のステップ で、次のいずれかをクリックします。
SQLのサンプルコードから始める か、
言語の好みに応じて、 Python のサンプル コードから始めます 。これにより、サンプル コードの デフォルト 言語が変更されますが、後で他の言語でコードを追加できます。 これにより、開始するためのサンプル コードを含むデフォルトのフォルダー構造が作成されます。
-
ワークスペースの左側にあるパイプライン アセット ブラウザーでサンプル コードを表示できます。
transformationsの下には、それぞれ 1 つのパイプライン データセットを生成する 2 つのファイルがあります。explorationsの下には、パイプラインの出力を表示するのに役立つコードが含まれるノートブックがあります。ファイルをクリックすると、エディターでコードを表示および編集できます。出力データセットはまだ作成されておらず、画面の右側の パイプライン グラフ は空です。
-
パイプラインコード(
transformationsフォルダ内のコード)を実行するには、画面右上の パイプライン実行を クリックします。実行が完了すると、ワークスペースの下部に、作成された 2 つの新しいテーブル
sample_users_<pipeline-name>とsample_aggregation_<pipeline-name>が表示されます。また、ワークスペースの右側の パイプライン グラフ に 2 つのテーブルが表示され、sample_usersがsample_aggregationのソースになっていることもわかります。
ステップ 2: データ品質チェックを適用する
このステップでは、 sample_usersテーブルにデータ品質チェックを追加します。パイプラインの期待値を使用してデータを制限します。この場合、有効な電子メール アドレスを持たないユーザー レコードを削除し、クリーンアップされたテーブルをusers_cleanedとして出力します。
-
パイプラインアセットブラウザで、
をクリックし、 変換 を選択します。
-
新しい変換ファイルの作成 ダイアログで、次の選択を行います。
- 言語 として Python または SQL の いずれかを選択します。これは以前の選択と一致する必要はありません。
- ファイルに名前を付けます。この場合は、
users_cleanedを選択します。 - 宛先パス については、デフォルトのままにします。
- データセットの種類 については、 [なし] を選択したまま にするか、 マテリアライズドビュー を選択します。 マテリアライズドビュー を選択すると、サンプルコードが生成されます。
-
新しいコード ファイルで、次のコードと一致するようにコードを編集します (前の画面での選択に基づいて、SQL または Python を使用します)。
<pipeline-name>sample_usersテーブルの完全な名前に置き換えます。
- SQL
- Python
-- Drop all rows that do not have an email address
CREATE MATERIALIZED VIEW users_cleaned
(
CONSTRAINT non_null_email EXPECT (email IS NOT NULL) ON VIOLATION DROP ROW
) AS
SELECT *
FROM sample_users_<pipeline-name>;
from pyspark import pipelines as dp
# Drop all rows that do not have an email address
@dp.table
@dp.expect_or_drop("no null emails", "email IS NOT NULL")
def users_cleaned():
return (
spark.read.table("sample_users_<pipeline_name>")
)
- パイプラインを更新するには、 [パイプラインの実行] をクリックします。これで 3 つのテーブルが作成されます。
ステップ 3: トップユーザーを分析する
次に、作成した予約数に基づいて上位 100 人のユーザーを取得します。wanderbricks.bookingsテーブルをusers_cleanedマテリアライズドビューに結合します。
-
パイプラインアセットブラウザで、
をクリックし、 変換 を選択します。
-
新しい変換ファイルの作成 ダイアログで、次の選択を行います。
- 言語 として Python または SQL の いずれかを選択します。以前の選択内容と一致する必要はありません。
- ファイルに名前を付けます。この場合は、
users_and_bookingsを選択します。 - 宛先パス については、デフォルトのままにします。
- 「データセットタイプ」 は 「なし」を選択した ままにします。
-
新しいコード ファイルで、次のコードと一致するようにコードを編集します (前の画面での選択に基づいて、SQL または Python を使用します)。
- SQL
- Python
-- Get the top 100 users by number of bookings
CREATE OR REFRESH MATERIALIZED VIEW users_and_bookings AS
SELECT u.name AS name, COUNT(b.booking_id) AS booking_count
FROM users_cleaned u
JOIN samples.wanderbricks.bookings b ON u.user_id = b.user_id
GROUP BY u.name
ORDER BY booking_count DESC
LIMIT 100;
from pyspark import pipelines as dp
from pyspark.sql.functions import col, count, desc
# Get the top 100 users by number of bookings
@dp.table
def users_and_bookings():
return (
spark.read.table("users_cleaned")
.join(spark.read.table("samples.wanderbricks.bookings"), "user_id")
.groupBy(col("name"))
.agg(count("booking_id").alias("booking_count"))
.orderBy(desc("booking_count"))
.limit(100)
)
-
パイプラインの実行 をクリックしてデータセットを更新します。実行が完了すると、 パイプライン グラフ に新しい
users_and_bookingsテーブルを含む 4 つのテーブルがあることが表示されます。
次のステップ
LakeFlow Pipelinesエディターのいくつかの機能の使い方とパイプラインの作成方法を学習したので、次に、さらに学習するその他の機能について説明します。
-
パイプラインの作成中に変換を操作およびデバッグするためのツール:
- 選択的処刑
- データプレビュー
- インタラクティブ DAG (パイプライン内のデータセットのグラフ)
-
組み込みのDatabricks Asset Bundles統合により、エディターから直接、効率的なコラボレーション、バージョン管理、CI/CD 統合が可能になります。