メインコンテンツまでスキップ

パイプラインのエクスペクテーションによるデータ品質の管理

エクスペクテーションを使用して、ETL パイプラインを流れるデータを検証する品質制約を適用します。 エクスペクテーションにより、データ品質メトリクスに関するより深い知見が得られ、無効なレコードを検出したときに更新を失敗させたり、レコードを削除したりできます。

この記事では、構文の例や動作オプションなど、期待値の概要について説明します。より高度なユース ケースと推奨されるベスト プラクティスについては、「 期待される推奨事項と高度なパターン」を参照してください。

LakeFlow 宣言型パイプラインの期待フローグラフ

エクスペクテーションとは?

エクスペクテーションは、パイプラインのマテリアライズドビュー、ストリーミング テーブル、またはビュー作成ステートメントの省略可能な句で、クエリを通過する各レコードにデータ品質チェックを適用します。 エクスペクションは、標準の SQL Boolean ステートメントを使用して制約を指定します。 1 つのデータセットに対して複数のエクスペクテーションを組み合わせて、パイプライン内のすべてのデータセット宣言にエクスペクテーションを設定できます。

次のセクションでは、エクスペクテーションの 3 つのコンポーネントを紹介し、構文の例を示します。

エクスペクテーションの名前

各エクスペクテーションには名前が必要であり、これはエクスペクテーションを追跡および監視するための識別子として使用されます。 検証されるメトリクスを伝える名前を選択します。 次の例では、年齢が 0 歳から 120 歳までであることを確認する valid_customer_age エクスペクテーションを定義しています。

important

エクスペクテーション名は、特定のデータセットに対して一意である必要があります。 パイプライン内の複数のデータセット間で期待を再利用できます。 ポータブルで再利用可能な期待を参照してください。

Python
@dlt.table
@dlt.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
return spark.readStream.table("datasets.samples.raw_customers")

評価のための制約

制約句は、各レコードに対して true または false に評価される必要がある SQL 条件ステートメントです。 制約には、検証対象の実際のロジックが含まれています。 レコードがこの条件に満たない場合、エクスペクテーションがトリガーされます。

制約には有効な SQL 構文を使用する必要があり、次のものを含めることはできません。

  • カスタムPython関数
  • 外部サービスコール
  • 他のテーブルを参照するサブクエリ

次に、データセット作成ステートメントに追加できる制約の例を示します。

Python での制約の構文は次のとおりです。

Python
@dlt.expect(<constraint-name>, <constraint-clause>)

複数の制約を指定できます。

Python
@dlt.expect(<constraint-name>, <constraint-clause>)
@dlt.expect(<constraint2-name>, <constraint2-clause>)

例:

Python
# Simple constraint
@dlt.expect("non_negative_price", "price >= 0")

# SQL functions
@dlt.expect("valid_date", "year(transaction_date) >= 2020")

# CASE statements
@dlt.expect("valid_order_status", """
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
""")

# Multiple constraints
@dlt.expect("non_negative_price", "price >= 0")
@dlt.expect("valid_purchase_date", "date <= current_date()")

# Complex business logic
@dlt.expect(
"valid_subscription_dates",
"""start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'"""
)

# Complex boolean logic
@dlt.expect("valid_order_state", """
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")

無効なレコードに対するアクション

レコードが検証チェックに失敗した場合に何が起こるかを決定するアクションを指定する必要があります。 次の表では、使用可能なアクションについて説明します。

操作

SQL 構文

Python 構文

結果

warn (デフォルト)

EXPECT

dlt.expect

無効なレコードがターゲットに書き込まれます。

drop

EXPECT ... ON VIOLATION DROP ROW

dlt.expect_or_drop

無効なレコードは、データがターゲットに書き込まれる前に削除されます。 ドロップされたレコードの数は、他のデータセットメトリクスとともにログに記録されます。

fail

EXPECT ... ON VIOLATION FAIL UPDATE

dlt.expect_or_fail

レコードが無効な場合、更新は成功しません。 再処理の前に手動による介入が必要です。 この予想により、1 つのフローが失敗し、パイプライン内の他のフローが失敗することはありません。

また、データを失敗させたり削除したりすることなく、無効なレコードを隔離する高度なロジックを実装することもできます。 無効なレコードの検疫を参照してください。

エクスペクテーションの追跡メトリクス

パイプライン UI から、 warn または drop アクションの追跡メトリクスを確認できます。 fail により無効なレコードが検出されると更新が失敗するため、メトリクスは記録されません。

エクスペクテーション メトリクスを表示するには、以下のステップを実行します。

  1. Databricks ワークスペースのサイドバーで、[ ジョブとパイプライン] をクリックします。
  2. パイプラインの 名前 をクリックします。
  3. エクスペクテーションが定義されているデータセットをクリックします。
  4. 右側のサイドバーにある データ品質 タブを選択します。

データ品質メトリクスを表示するには、 LakeFlow 宣言型パイプライン イベント ログをクエリします。 「イベント ログからのデータ品質のクエリ」を参照してください。

無効なレコードを保持する

無効なレコードの保持は、エクスペクテーションのデフォルトの動作です。 expect演算子は、エクスペクテーションに反するレコードを保持しながら、制約に合格したレコードまたは失敗したレコードの数に関するメトリクスを収集する場合に使用します。期待に違反するレコードは、有効なレコードとともにターゲットデータセットに追加されます:

Python
@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")

無効なレコードを削除する

無効なレコードがそれ以上処理されないようにするには、expect_or_drop演算子を使用します。期待に反するレコードは、ターゲットデータセットから削除されます:

Python
@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

無効なレコードで失敗する

無効なレコードを許容できない場合は、expect_or_fail演算子を使用して、レコードがバリデーションに失敗したときに直ちに実行を停止します。操作がテーブル更新の場合、システムはトランザクションをアトミックにロールバックする:

Python
@dlt.expect_or_fail("valid_count", "count > 0")
important

パイプラインに複数の並列フローが定義されている場合、1 つのフローで障害が発生しても、他のフローが失敗することはありません。

LakeFlow 宣言型パイプライン フローの失敗説明グラフ

エクスペクテーションから失敗した更新のトラブルシューティング

エクスペクテーション違反によりパイプラインが失敗した場合は、パイプラインを再実行する前に、無効なデータを正しく処理できるようにパイプラインコードを修正する必要があります。

パイプラインが失敗するように構成された期待値では、違反の検出と報告に必要な情報を追跡するために、変換の Spark クエリ プランが変更されます。この情報を使用して、多くのクエリで違反の原因となった入力レコードを特定できます。LakeFlow 宣言型パイプラインには、このような違反を報告するための専用のエラー メッセージが表示されます。 期待違反のエラー メッセージの例を次に示します。

Console
[EXPECTATION_VIOLATION.VERBOSITY_ALL] Flow 'sensor-pipeline' failed to meet the expectation. Violated expectations: 'temperature_in_valid_range'. Input data: '{"id":"TEMP_001","temperature":-500,"timestamp_ms":"1710498600"}'. Output record: '{"sensor_id":"TEMP_001","temperature":-500,"change_time":"2024-03-15 10:30:00"}'. Missing input data: false

複数のエクスペクテーションの管理

注記

SQL と Python はどちらも 1 つのデータセットで複数の期待値をサポートしていますが、複数の期待値をグループ化して集合的なアクションを指定できるのは Python だけです。

LakeFlow 複数の期待値を持つ宣言型パイプライン fLow グラフ

複数のエクスペクテーションをグループ化し、 expect_allexpect_all_or_drop、および expect_all_or_fail機能を使用して集合アクションを指定できます。

これらのデコレータは、Python ディクショナリを引数として受け入れ、キーはエクスペクテーションの名前で、値はエクスペクテーション制約です。 パイプライン内の複数のデータセットで同じエクスペクテーションセットを再利用できます。 次に、 expect_all Python の各演算子の例を示します。

Python
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
# Create a raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
# Create a cleaned and prepared dataset

@dlt.table
@dlt.expect_all_or_fail(valid_pages)
def customer_facing_data():
# Create cleaned and prepared to share the dataset

制限

  • ストリーミングテーブルとマテリアライズドビューのみが期待値をサポートしているため、データ品質メトリクスはこれらのオブジェクトタイプでのみサポートされます。

  • Data Quality メトリクスは、次の場合は使用できません。

    • クエリには期待値が定義されていません。
    • フローは、期待をサポートしていない演算子を使用しています。
    • LakeFlow 宣言型パイプライン シンクなどのフロー タイプは、期待をサポートしていません。
    • 特定のフロー実行に関連付けられたストリーミングテーブルまたはマテリアライズドビューには更新がありません。
    • パイプライン設定には、 pipelines.metrics.flowTimeReporter.enabledなどのメトリクスをキャプチャするために必要な設定は含まれていません。
  • 場合によっては、 COMPLETED フローにメトリクスが含まれていないことがあります。代わりに、メトリクスは flow_progress イベントの各マイクロバッチでステータスが RUNNINGで報告されます。