メインコンテンツまでスキップ

パイプラインのエクスペクテーションによるデータ品質の管理

エクスペクテーションを使用して、ETL パイプラインを流れるデータを検証する品質制約を適用します。 エクスペクテーションにより、データ品質メトリクスに関するより深い知見が得られ、無効なレコードを検出したときに更新を失敗させたり、レコードを削除したりできます。

この記事では、構文の例や動作オプションなど、エクスペクテーションの概要について説明します。 より高度なユース ケースと推奨されるベスト プラクティスについては、「 期待される推奨事項と高度なパターン」を参照してください。

DLT 期待フローグラフ

エクスペクテーションとは?

エクスペクテーションは、パイプラインのマテリアライズドビュー、ストリーミング テーブル、またはビュー作成ステートメントの省略可能な句で、クエリを通過する各レコードにデータ品質チェックを適用します。 エクスペクションは、標準の SQL Boolean ステートメントを使用して制約を指定します。 1 つのデータセットに対して複数のエクスペクテーションを組み合わせて、パイプライン内のすべてのデータセット宣言にエクスペクテーションを設定できます。

次のセクションでは、エクスペクテーションの 3 つのコンポーネントを紹介し、構文の例を示します。

期待の名前

各エクスペクテーションには名前が必要であり、これはエクスペクテーションを追跡および監視するための識別子として使用されます。 検証されるメトリクスを伝える名前を選択します。 次の例では、年齢が 0 歳から 120 歳までであることを確認する valid_customer_age エクスペクテーションを定義しています。

important

エクスペクテーション名は、特定のデータセットに対して一意である必要があります。 パイプライン内の複数のデータセット間で期待を再利用できます。 「ポータブルで再利用可能な期待」を参照してください。

Python
@dlt.table
@dlt.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
return spark.readStream.table("datasets.samples.raw_customers")

評価する制約

制約句は、各レコードに対して true または false に評価される必要がある SQL 条件ステートメントです。 制約には、検証対象の実際のロジックが含まれています。 レコードがこの条件に満たない場合、エクスペクテーションがトリガーされます。

制約には有効な SQL 構文を使用する必要があり、次のものを含めることはできません。

  • カスタムPython関数
  • 外部サービスコール
  • 他のテーブルを参照するサブクエリ

次に、データセット作成ステートメントに追加できる制約の例を示します。

Python
# Simple constraint
@dlt.expect("non_negative_price", "price >= 0")

# SQL functions
@dlt.expect("valid_date", "year(transaction_date) >= 2020")

# CASE statements
@dlt.expect("valid_order_status", """
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
""")

# Multiple constraints
@dlt.expect("non_negative_price", "price >= 0")
@dlt.expect("valid_purchase_date", "date <= current_date()")

# Complex business logic
@dlt.expect(
"valid_subscription_dates",
"""start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'"""
)

# Complex boolean logic
@dlt.expect("valid_order_state", """
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")

無効なレコードに対するアクション

レコードが検証チェックに失敗した場合に何が起こるかを決定するアクションを指定する必要があります。 次の表では、使用可能なアクションについて説明します。

操作

SQL 構文

Python 構文

結果

warn (デフォルト)

EXPECT

dlt.expect

無効なレコードがターゲットに書き込まれます。 有効なレコードと無効なレコードの数は、他のデータセットメトリクスとともに記録されます。

drop

EXPECT ... ON VIOLATION DROP ROW

dlt.expect_or_drop

無効なレコードは、データがターゲットに書き込まれる前に削除されます。 ドロップされたレコードの数は、他のデータセットメトリクスとともにログに記録されます。

fail

EXPECT ... ON VIOLATION FAIL UPDATE

dlt.expect_or_fail

レコードが無効な場合、更新は成功しません。 再処理の前に手動による介入が必要です。 この予想により、1 つのフローが失敗し、パイプライン内の他のフローが失敗することはありません。

また、データを失敗させたり削除したりすることなく、無効なレコードを隔離する高度なロジックを実装することもできます。 「無効なレコードの検疫」を参照してください。

Expectation tracking メトリクス

パイプライン UI から、 warn または drop アクションの追跡メトリクスを確認できます。 fail により無効なレコードが検出されると更新が失敗するため、メトリクスは記録されません。

エクスペクテーション メトリクスを表示するには、以下のステップを実行します。

  1. サイドバーの 「DLT 」をクリックします。
  2. パイプラインの 名前 をクリックします。
  3. エクスペクテーションが定義されているデータセットをクリックします。
  4. 右側のサイドバーにある データ品質 タブを選択します。

データ品質メトリクスを表示するには、DLT イベント・ログを照会します。「イベント ログからのデータ品質のクエリ」を参照してください。

無効なレコードを保持する

無効なレコードの保持は、エクスペクテーションのデフォルトの動作です。 expect演算子は、エクスペクテーションに反するレコードを保持しながら、制約に合格したレコードまたは失敗したレコードの数に関するメトリクスを収集する場合に使用します。期待に違反するレコードは、有効なレコードとともにターゲットデータセットに追加されます:

Python
@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")

無効なレコードを削除する

無効なレコードがそれ以上処理されないようにするには、expect_or_drop演算子を使用します。期待に反するレコードは、ターゲットデータセットから削除されます:

Python
@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

無効なレコードで失敗する

無効なレコードを許容できない場合は、expect_or_fail演算子を使用して、レコードがバリデーションに失敗したときに直ちに実行を停止します。操作がテーブル更新の場合、システムはトランザクションをアトミックにロールバックする:

Python
@dlt.expect_or_fail("valid_count", "count > 0")
important

パイプラインに複数の並列フローが定義されている場合、1 つのフローで障害が発生しても、他のフローが失敗することはありません。

DLTフロー失敗説明グラフ

エクスペクテーションから失敗した更新のトラブルシューティング

エクスペクテーション違反によりパイプラインが失敗した場合は、パイプラインを再実行する前に、無効なデータを正しく処理できるようにパイプラインコードを修正する必要があります。

パイプラインが失敗するように構成されたエクスペクテーションでは、違反の検出と報告に必要な情報を追跡するために、変換の Spark クエリ プランが変更されます。 この情報を使用して、多くのクエリで違反の原因となった入力レコードを特定できます。 以下は、エクスペクテーションの例です。

Console
Expectation Violated:
{
"flowName": "sensor-pipeline",
"verboseInfo": {
"expectationsViolated": [
"temperature_in_valid_range"
],
"inputData": {
"id": "TEMP_001",
"temperature": -500,
"timestamp_ms": "1710498600"
},
"outputRecord": {
"sensor_id": "TEMP_001",
"temperature": -500,
"change_time": "2024-03-15 10:30:00"
},
"missingInputData": false
}
}

複数の期待管理

注記

SQL と Python はどちらも 1 つのデータセット内で複数の期待をサポートしていますが、複数の個別のエクスペクテーションをグループ化して集合的なアクションを指定できるのは Python だけです。

複数の期待を持つDLT fLowグラフ

複数の期待をグループ化し、 expect_allexpect_all_or_drop、および expect_all_or_fail機能を使用して集合アクションを指定できます。

これらのデコレータは、Python ディクショナリを引数として受け入れ、キーはエクスペクテーションの名前で、値はエクスペクテーション制約です。 パイプライン内の複数のデータセットで同じ期待セットを再利用できます。 次に、 expect_all Python の各演算子の例を示します。

Python
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
# Create a raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
# Create a cleaned and prepared dataset

@dlt.table
@dlt.expect_all_or_fail(valid_pages)
def customer_facing_data():
# Create cleaned and prepared to share the dataset