チュートリアル: LakeFlow Pipelines Editor を使用して最初のパイプラインを作成する
データ オーケストレーションとAuto Loader用にLakeFlow Spark宣言型パイプライン (SDP) を使用して新しいパイプラインを作成する方法を学びます。 このチュートリアルでは、データをクリーンアップし、上位 100 人のユーザーを検索するクエリを作成して、サンプル パイプラインを拡張します。
このチュートリアルでは、 LakeFlow Pipelines Editor を使用して次のことを行う方法を学習します。
- デフォルトのフォルダー構造を使用して新しいパイプラインを作成し、一連のサンプル ファイルから開始します。
- エクスペクテーションを使用してデータ品質の制約を定義します。
- エディター機能を使用して、新しい変換でパイプラインを拡張し、データの分析を実行します。
要件
このチュートリアルを開始する前に、次の作業を行う必要があります。
- Databricks ワークスペースにログインしている必要があります。
- ワークスペースで Unity Catalog を有効にします。
- コンピュート リソースを作成する権限、またはコンピュート リソースにアクセスする権限を持っています。
- カタログに新しいスキーマを作成する権限を持っています。必要な権限は
ALL PRIVILEGESまたはUSE CATALOGとCREATE SCHEMAです。
ステップ 1: パイプラインを作成する
このステップでは、当然のフォルダー構造とコード サンプルを使用してパイプラインを作成します。 コード サンプルは、 wanderbricksサンプル データ ソースのusersテーブルを参照します。
-
Databricksワークスペースで、
新しい 、それから
ETLパイプライン 。 これにより、
New Pipeline <date> <time>のようなデフォルトのパイプライン名でパイプラインエディタが開きます。 -
(オプション)パイプラインの名前を選択し、分かりやすい名前を入力してください。
-
(オプション)名前の右側にあるカタログとスキーマをクリックして、異なるデフォルト設定を行います。
-
(オプション)作成された
my_transformationソースファイルで、言語ドロップダウンリストから Python または SQL を選択して、ファイルの言語を設定します。 -
クリック
サンプルコードを使用してください 。
選択した言語のサンプルコードは、
transformationsフォルダ内のmy_transformationソースファイルに表示されます。出力データセットはまだ作成されておらず、画面右側の パイプライングラフ は空です。 -
パイプラインコード(
transformationsフォルダ内のコード)を実行するには、画面右上の パイプライン実行 をクリックします。実行が完了すると、ワークスペースの下部に作成された 2 つの新しいテーブル
sample_users_<date_time>とsample_aggregation_<date_time>が表示されます。ワークスペースの右側にある パイプライングラフ には、sample_usersがsample_aggregationソースであることを含め、2 つのテーブルが表示されています。sample_users_<date_time>テーブル名全体をメモしておいてください。次のステップで参照します。
ステップ 2: データ品質チェックを適用する
このステップでは、 sample_usersテーブルにデータ品質チェックを追加します。パイプラインの期待値を使用してデータを制限します。この場合、有効な電子メール アドレスを持たないユーザー レコードを削除し、クリーンアップされたテーブルをusers_cleanedとして出力します。
-
左側のパイプラインアセットブラウザーで、クリックします
、そして 「変換」 を選択します。
-
新しい変換ファイルの作成 ダイアログで、次の選択を行います。
- 言語 として Python または SQL のいずれかを選択します。これは以前の選択と一致する必要はありません。
- ファイルに名前を付けます。この場合は、
users_cleanedを選択します。 - 宛先パス については、デフォルトのままにします。
- データセットの種類 については、 [なし] を選択したまま にするか、 マテリアライズドビュー を選択します。 マテリアライズドビュー を選択すると、サンプルコードが生成されます。
-
「作成」 をクリックして、変換コードファイルを作成します。
-
新しいコードファイルで、以下の内容に合わせてコードを編集してください(前の画面で選択した内容に応じて、SQLまたはPythonを使用してください)。
sample_users_<date_time>前のセクションで作成したsample_usersテーブルの完全な名前に置き換えてください。
- SQL
- Python
-- Drop all rows that do not have an email address
CREATE MATERIALIZED VIEW users_cleaned
(
CONSTRAINT non_null_email EXPECT (email IS NOT NULL) ON VIOLATION DROP ROW
) AS
SELECT *
FROM sample_users_<date_time>;
from pyspark import pipelines as dp
# Drop all rows that do not have an email address
@dp.materialized_view
@dp.expect_or_drop("no null emails", "email IS NOT NULL")
def users_cleaned():
return (
spark.read.table("sample_users_<date_time>")
)
- パイプラインを更新するには、 [パイプラインの実行] をクリックします。これで 3 つのテーブルが作成されます。
ステップ 3: トップユーザーを分析する
次に、作成した予約数に基づいて上位100人のユーザーを取得します。wanderbricks.bookingsテーブルをusers_cleanedマテリアライズドビューに結合します。
-
左側のパイプラインアセットブラウザーで、クリックします
、そして 「変換」 を選択します。
-
新しい変換ファイルの作成 ダイアログで、次の選択を行います。
- 言語 として Python または SQL のいずれかを選択します。以前の選択内容と一致する必要はありません。
- ファイルに名前を付けます。この場合は、
users_and_bookingsを選択します。 - 宛先パス については、デフォルトのままにします。
- 「データセットタイプ」 は 「なし」を選択した ままにします。
-
「作成」 をクリックして、変換コードファイルを作成します。
-
新しいコード ファイルで、次のコードと一致するようにコードを編集します (前の画面での選択に基づいて、SQL または Python を使用します)。
- SQL
- Python
-- Get the top 100 users by number of bookings
CREATE OR REFRESH MATERIALIZED VIEW users_and_bookings AS
SELECT u.name AS name, COUNT(b.booking_id) AS booking_count
FROM users_cleaned u
JOIN samples.wanderbricks.bookings b ON u.user_id = b.user_id
GROUP BY u.name
ORDER BY booking_count DESC
LIMIT 100;
from pyspark import pipelines as dp
from pyspark.sql.functions import col, count, desc
# Get the top 100 users by number of bookings
@dp.materialized_view
def users_and_bookings():
return (
spark.read.table("users_cleaned")
.join(spark.read.table("samples.wanderbricks.bookings"), "user_id")
.groupBy(col("name"))
.agg(count("booking_id").alias("booking_count"))
.orderBy(desc("booking_count"))
.limit(100)
)
-
パイプラインの実行 をクリックしてデータセットを更新します。実行が完了すると、 パイプライン グラフ に新しい
users_and_bookingsテーブルを含む 4 つのテーブルがあることが表示されます。
次のステップ
LakeFlow Pipelinesエディターのいくつかの機能の使い方とパイプラインの作成方法を学習したので、次に、さらに学習するその他の機能について説明します。
-
パイプラインの作成中に変換を操作およびデバッグするためのツール:
- 選択的実行
- データプレビュー
- 対話型パイプライングラフ(パイプライン内のデータセットのグラフ)
-
エディターから直接、効率的なコラボレーション、バージョン管理、CI/CD統合を実現する、組み込みの宣言型自動化バンドル統合機能: