パイプラインの単体テスト
ベータ版
この機能はベータ版です。
DatabricksでのPython単体テストに関する一般的な情報については、Python単体テストを参照してください。
Lakeflow Spark宣言型パイプラインは、ウェブベースのLakeflow Pipelines エディターでPythonユニットテストの記述に対応しています。これにより、モックデータを使用して Python または SQL の変換ロジックを検証できます。パイプライン テスト フレームワークを使用すると、エッジ ケースをテストし、独自のパイプライン APIs(CDC、ストリーミングテーブル、期待値、追加フロー)を検証し、ライブ データセットに影響を与えることなく迅速に反復できます。
- 隔離されたテスト実行 :このフレームワークは、完全に隔離されたカタログと通信するSparkSessionの作成を可能にするプリミティブを提供し、テスト内で入力データをモックできるようにします。
- 柔軟なテストスコープ :テストSparkSessionを使用して、パイプラインのコンピュートで、パイプラインのサブセット(個々のテーブル、依存テーブルのチェーン、またはパイプライン全体)を実行します。
- 結果の検証 :標準のpytestアサーションを使用して、テストで作成された分離された出力テーブルの結果を検証します。
単体テストを使用するタイミング
一般的な使用例は次のとおりです。
- 新しい変換ロジックの検証: 本番運用データに対して実行する前に、変換が期待されるスキーマ、行数、集計、ビジネスロジックを生成することをテストしてください。
- Testing Auto CDC specifications :モックデータを使用して、Auto CDC フロー定義が、挿入、更新、削除、slowly changing dimensions タイプといった変更イベントを正しく処理していることを検証します。
- 期待値とデータ品質ルールの検証 : 期待値が適切に不合格となり、データが有効な場合には合格することを確認してください。
- 依存するテーブル間でのテスト :パイプライン DAG を通じてデータが正しく流れることを検証するために、変換チェーン(たとえば、ブロンズ、シルバー、ゴールド)をテストします。
要件
can_runパイプラインに対するアクセス許可に加え、パイプラインのデフォルトカタログ内にあるUSE_CATALOGおよびCREATE_SCHEMAのアクセス許可。- パイプラインはトリガー(非連続)モードで設定する必要があります。
- パイプラインは**プレビュー**チャンネルを使用する必要があります。単体テストはベータ版で、プレビューでのみご利用いただけます。
制限事項:
- Editor-only execution : テストはWebベースのLakeFlow Pipelines Editorから実行する必要があります。
- Python テストのみ: テストはPythonで記述する必要があります。SQL パイプラインをテストできますが、テスト自体は Python である必要があります。
- ガバナンスの制限 : カタログ/スキーマレベルで設定された行フィルターまたは列マスクは、モックデータには適用されません。
- 並列実行不可 :パイプラインの更新実行中はテストを実行しないでください。本番運用のワークロードのパフォーマンスに深刻な影響を与える可能性があります。
ステップ 1: パイプライン設定を更新する
パイプラインを**トリガー** モードで**PREVIEW** チャンネルで実行するように設定してください。
- UIでパイプラインを開き、 設定 > 詳細設定 > チャンネル > プレビュー をクリックします。
- パイプライン モードを**トリガー**に設定します(連続は使用しないでください)。
または、パイプライン設定JSONを直接編集:
"continuous": false,
"channel": "PREVIEW"
ステップ 2:テストファイルを作成する
Lakeflowパイプラインエディターで、**[パイプラインを編集]**> **[パイプラインアセットを追加]** > **[テスト]** をクリックします。これは、testsフォルダとテストファイルを作成しますが、これらはパイプラインのソースコードには含まれません。

あるいは、フォルダとファイルを手動で作成してください。
- パイプラインを編集 > パイプラインアセットを追加 > 新規パイプラインフォルダー をクリックします。
- 名前に
testsと入力します。 testsフォルダーをクリックし、[ファイルを作成]> [Python] をクリックします。test_*.pyまたは*_test.pyのパターンを使用してファイルに名前を付けてください(たとえば、test_transformations.py)。
ステップ 3: テストを生成
Genie Codeはテストスキャフォールディングを生成できます:
-
テストファイル内で、 テストを生成 ボタンをクリックします。

-
または、Genie Codeエージェントモードで
/testsを使用します。
Genie Codeを使用してボイラープレートを生成し、その後、エッジケースに合わせてカスタマイズしてください。
または、テストコードを自分で書くこともできます。各テストファイルの先頭に以下のインポートを追加してください。
import pytest
from pyspark.pipelines.testing import TestPipeline, test_spark
test_pipeline = TestPipeline.active()
ステップ 4: テストを実行する
Lakeflow Pipelines Editorからテストを実行します:
- 個別のテストを実行するには、テスト機能の横にあるガターの
(play) ボタンをクリックします。
- テストファイルの上部にある [ **ファイル内のテストを実行** ] をクリックして、そのファイル内のすべてのテストを実行します。
テスト結果(成功または失敗)はエディターの下部パネルに表示されます。失敗をデバッグするには、アサーションエラーを確認します。
APIs のテスト
API | 説明 |
|---|---|
| Lakeflow Pipelines Editorで現在編集されているパイプラインの |
| パイプラインの更新を同期的に実行し、テーブル名が指定されている場合は、選択的な更新を行います。パイプラインの実行が成功するか、例外で終了した後に返されます。 |
| 隔離されたSparkSessionを作成し、すべてのテーブルの読み取りと書き込みを一時的なテストスキーマに自動的にリダイレクトすることで、本番運用データが影響を受けないようにします。 |
モックデータの作成
入力データをSQLまたはcreateDataFrameを使用してモックできます:
# Option 1: Using SQL
test_spark.sql("""
CREATE TABLE catalog.schema.table_name AS
SELECT * FROM VALUES
(1, 'value1'),
(2, 'value2')
AS t(id, name)
""")
# Option 2: Using createDataFrame
df = test_spark.createDataFrame(
[(1, 'value1'), (2, 'value2')],
schema=["id", "name"]
)
df.write.saveAsTable("catalog.schema.table_name")
より大量の現実的な合成データを生成するには、Faker ライブラリを使用できます。まずパイプラインで%pip install fakerを実行し、その後、FakerベースのUDFからDataFrameを構築してください。
# Option 3: Using Faker for synthetic data
from pyspark.sql import functions as F
from faker import Faker
fake = Faker()
fake_firstname = F.udf(fake.first_name)
fake_lastname = F.udf(fake.last_name)
fake_email = F.udf(fake.ascii_company_email)
df = (
test_spark.range(0, 100)
.withColumn("firstname", fake_firstname())
.withColumn("lastname", fake_lastname())
.withColumn("email", fake_email())
)
df.write.saveAsTable("catalog.schema.table_name")
パイプラインまたは特定のテーブルを実行します。
# Run specific tables
test_pipeline.run(test_spark, set(["catalog.schema.table1", "catalog.schema.table2"]))
# Run all tables in the pipeline
test_pipeline.run(test_spark)
例
例 1: 行数、スキーマ、および NULL 処理の集計検証
目的 : ユーザー集計がタイプ別にユーザーを正しくカウントし、Null Eメールを処理し、予期されるスキーマを生成すること。
パイプライン変換:
これらの変換は、シンプルな2テーブルのパイプラインを作成します: usersはユーザー データを選択し、countsはユーザーをタイプ別にグループ化し、ユーザーの総数と有効なEメールをカウントします。
from pyspark import pipelines as dp
from pyspark.sql.functions import col, count, count_if
@dp.table
def users():
return (
spark.read.table("catalog.schema.wanderbricks_users")
.select("user_id", "email", "name", "user_type")
)
@dp.table
def counts():
return (
spark.read.table("catalog.schema.users")
.withColumn("valid_email", col("email").isNotNull())
.groupBy("user_type")
.agg(
count("user_id").alias("total_count"),
count_if("valid_email").alias("count_valid_emails")
)
)
テスト :
これらのテストは、意図的なnull値を含むモックユーザーデータを作成し、パイプラインを分離して実行することで、行数、スキーマ構造、Null処理、および集計ロジックを検証します。
import pytest
from pyspark.pipelines.testing import TestPipeline, test_spark
from pyspark.testing import assertDataFrameEqual
test_pipeline = TestPipeline.active()
# Mock data fixture
def mock_users(session):
session.sql("""
CREATE TABLE catalog.schema.wanderbricks_users AS
SELECT * FROM VALUES
(1, 'alice@example.com', 'Alice', 'admin'),
(2, NULL, 'Bob', 'user'),
(3, 'charlie@example.com', 'Charlie', 'user'),
(4, NULL, 'Dana', 'admin')
AS t(user_id, email, name, user_type)
""")
# Test 1: Row count
def test_users_row_count(test_spark):
mock_users(test_spark)
test_pipeline.run(test_spark, set(["catalog.schema.users"]))
result = test_spark.table("catalog.schema.users")
assert result.count() == 4
# Test 2: Schema validation
def test_users_schema(test_spark):
mock_users(test_spark)
test_pipeline.run(test_spark, set(["catalog.schema.users"]))
result = test_spark.table("catalog.schema.users")
expected_fields = {"user_id", "email", "name", "user_type"}
actual_fields = set(f.name for f in result.schema.fields)
assert expected_fields == actual_fields
# Test 3: Null handling
def test_users_null_handling(test_spark):
mock_users(test_spark)
test_pipeline.run(test_spark, set(["catalog.schema.users"]))
result = test_spark.table("catalog.schema.users")
null_emails = result.filter("email IS NULL").count()
assert null_emails == 2
# Test 4: Aggregation
def test_counts(test_spark):
mock_users(test_spark)
# Run both tables since counts depends on users
test_pipeline.run(test_spark, set(["catalog.schema.users", "catalog.schema.counts"]))
result = test_spark.table("catalog.schema.counts")
# Check counts for each user_type
admin_row = result.filter("user_type = 'admin'").collect()[0]
user_row = result.filter("user_type = 'user'").collect()[0]
assert admin_row["total_count"] == 2
assert admin_row["count_valid_emails"] == 1
assert user_row["total_count"] == 2
assert user_row["count_valid_emails"] == 1
# Test 5: Full DataFrame comparison with assertDataFrameEqual
def test_counts_full_dataframe(test_spark):
mock_users(test_spark)
test_pipeline.run(test_spark, set(["catalog.schema.users", "catalog.schema.counts"]))
result = test_spark.table("catalog.schema.counts")
expected = test_spark.createDataFrame(
[("admin", 2, 1), ("user", 2, 1)],
schema=["user_type", "total_count", "count_valid_emails"]
)
assertDataFrameEqual(result, expected)
例 2: Auto CDC テスト
目標:Auto CDC がインサートと更新を含むチェンジフィードを正しく処理することを検証します。
パイプライン変換 :
この変換は、変更フィードからAuto CDCを設定し、ストリーミングの変更を読み取り、SCD Type 1としてターゲットテーブルに適用します(最新バージョンのみを保持します)。
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.view
def users():
return spark.readStream.table("catalog.schema.change_feed")
dp.create_streaming_table("target_autocdc")
dp.create_auto_cdc_flow(
target="target_autocdc",
source="users",
keys=["userId"],
sequence_by=col("ts"),
stored_as_scd_type=1
)
テスト :
最初のテストでは、同じuserIdに対して複数のレコードを持つモック変更フィードを作成し(更新をシミュレートしています)、ターゲットには最新のレコードのみが保持されることを検証します。2番目のテストは、パイプラインを実行し、変更フィードにイベントを追加し、パイプラインを再度実行することで、遅延イベントや順不同イベントをシミュレートします。
import pytest
from pyspark.pipelines.testing import TestPipeline, test_spark
test_pipeline = TestPipeline.active()
# Test 1: Standard inserts and updates
def test_auto_cdc_flow(test_spark):
# Create a mock change feed table
test_spark.sql("""
CREATE TABLE catalog.schema.change_feed AS
SELECT * FROM VALUES
(1, 'Alice', 1000),
(2, 'Bob', 1001),
(1, 'Alice Updated', 1002)
AS t(userId, name, ts)
""")
# Run the pipeline
test_pipeline.run(test_spark, set(["catalog.schema.target_autocdc"]))
# Read the output
result = test_spark.table("catalog.schema.target_autocdc")
# Verify two users exist
user_ids = set(row["userId"] for row in result.collect())
assert user_ids == {1, 2}
# Verify latest record for userId=1 has ts=1002
latest_user1 = result.filter("userId = 1").collect()[0]
assert latest_user1["ts"] == 1002
assert latest_user1["name"] == "Alice Updated"
# Verify userId=2 has ts=1001
user2 = result.filter("userId = 2").collect()[0]
assert user2["ts"] == 1001
# Test 2: Late-arriving and out-of-order events
def test_auto_cdc_late_arriving(test_spark):
# First batch of change events
test_spark.sql("""
CREATE TABLE catalog.schema.change_feed AS
SELECT * FROM VALUES
(1, 'Alice', 1000),
(2, 'Bob', 1001)
AS t(userId, name, ts)
""")
# Run the pipeline with the initial batch
test_pipeline.run(test_spark, set(["catalog.schema.target_autocdc"]))
# Append late-arriving events to the change feed:
# - A newer event for userId=1 (ts=1003) that arrived after the first run
# - A stale event for userId=2 (ts=999) with a timestamp older than what is already applied
test_spark.sql("""
INSERT INTO catalog.schema.change_feed VALUES
(1, 'Alice Updated', 1003),
(2, 'Bob (stale)', 999)
""")
# Re-run the pipeline. sequence_by=ts ensures stale events do not overwrite newer state.
test_pipeline.run(test_spark, set(["catalog.schema.target_autocdc"]))
result = test_spark.table("catalog.schema.target_autocdc")
# userId=1 should reflect the newer late-arriving event
alice = result.filter("userId = 1").collect()[0]
assert alice["ts"] == 1003
assert alice["name"] == "Alice Updated"
# userId=2 should be unchanged: the stale event with an older ts is ignored
bob = result.filter("userId = 2").collect()[0]
assert bob["ts"] == 1001
assert bob["name"] == "Bob"
例3:スナップショットからのAuto CDCテスト
目標: CDC が、挿入、更新、削除を含むスナップショットの変更内容を正しく処理していることを検証します。
パイプライン変換 :
この変換は、スナップショットからAuto CDCを設定します。これにより、スナップショットテーブルからデータを読み込み、完全な履歴を保持するSCDタイプ2として、時間の経過に伴う変更を追跡します。
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return spark.read.table("catalog.schema.snapshot")
dp.create_streaming_table("catalog.schema.target")
dp.create_auto_cdc_from_snapshot_flow(
target="target",
source="source",
keys=["userId"],
stored_as_scd_type=2
)
テスト :
このテストでは、初期のスナップショットを作成し、パイプラインを実行し、次に、新しいデータを切り捨てて挿入することでスナップショットの更新をシミュレートし、CDCがすべての変更をキャプチャすることを検証します。
import pytest
from pyspark.pipelines.testing import TestPipeline, test_spark
test_pipeline = TestPipeline.active()
def test_auto_cdc_from_snapshot_flow(test_spark):
# Create initial snapshot
test_spark.sql("""
CREATE TABLE catalog.schema.snapshot AS
SELECT * FROM VALUES
(1, 'Alice', '2024-01-01'),
(2, 'Bob', '2024-01-02')
AS t(userId, name, created_at)
""")
# Run the pipeline
test_pipeline.run(test_spark, set(["catalog.schema.target"]))
# Simulate a new snapshot by truncating and inserting updated data
test_spark.sql("TRUNCATE TABLE catalog.schema.snapshot")
test_spark.sql("INSERT INTO catalog.schema.snapshot VALUES (2, 'Bob', '2024-01-03')")
test_pipeline.run(test_spark, set(["catalog.schema.target"]))
# Verify SCD Type 2: should have 3 rows (original Alice, original Bob, updated Bob)
result = test_spark.table("catalog.schema.target")
assert result.count() == 3
user_ids = [row["userId"] for row in result.collect()]
assert set(user_ids) == {1, 2}
例 4: 結合とエクスペクテーションのテスト
目的 :結合が正しく機能し、エクスペクテーションが無効なデータをフィルターすることを検証します。
パイプライン変換 :
この変換は、プロパティ画像をアメニティと結合し、2024年1月以前にアップロードされた画像をフィルターで除外する条件を適用します。
from pyspark import pipelines as dp
@dp.table
@dp.expect_or_drop("uploaded after Jan 2024", "uploaded_at > '2024-01-01'")
def property_images_amenities_join():
return (
spark.read.table("catalog.schema.property_images")
.join(
spark.read.table("catalog.schema.property_amenities"),
on="property_id",
how="inner"
)
)
テスト :
これらのテストは、結合が正しい行数を生成し、エクスペクテーションが無効なアップロード日のレコードを正常にフィルターで除外することを確認します。
import pytest
from pyspark.pipelines.testing import TestPipeline, test_spark
test_pipeline = TestPipeline.active()
# Mock property datasets
def mock_properties(session):
session.sql("""
CREATE TABLE catalog.schema.property_images AS
SELECT * FROM VALUES
(101, 'img1.jpg', '2024-02-01'),
(102, 'img2.jpg', '2024-01-15'),
(103, 'img3.jpg', '2024-12-20')
AS t(property_id, image_url, uploaded_at)
""")
session.sql("""
CREATE TABLE catalog.schema.property_amenities AS
SELECT * FROM VALUES
(101, 'wifi'),
(102, 'pool'),
(103, 'parking')
AS t(property_id, amenity)
""")
# Test 1: Join
def test_property_join(test_spark):
mock_properties(test_spark)
test_pipeline.run(test_spark, set(["catalog.schema.property_images_amenities_join"]))
result = test_spark.table("catalog.schema.property_images_amenities_join")
# Should have 3 rows after join
assert result.count() == 3
# Check all property_ids are present
property_ids = set(row["property_id"] for row in result.collect())
assert property_ids == {101, 102, 103}
# Test 2: Expectation
def test_property_expectation(test_spark):
mock_properties(test_spark)
# Add a row with uploaded_at before Jan 2024
test_spark.sql("""
INSERT INTO catalog.schema.property_images VALUES (104, 'img4.jpg', '2023-12-31')
""")
# Add a matching row in the amenities table for the join
test_spark.sql("""
INSERT INTO catalog.schema.property_amenities VALUES (104, 'gym')
""")
test_pipeline.run(test_spark, set(["catalog.schema.property_images_amenities_join"]))
result = test_spark.table("catalog.schema.property_images_amenities_join")
# Only property_ids with uploaded_at > '2024-01-01' should be present
valid_ids = set(row["property_id"] for row in result.collect())
assert 104 not in valid_ids
assert valid_ids == {101, 102, 103}