パイプラインのエクスペクテーションによるデータ品質の管理
エクスペクテーションを使用して、ETL パイプラインを流れるデータを検証する品質制約を適用します。 エクスペクテーションにより、データ品質メトリクスに関するより深い知見が得られ、無効なレコードを検出したときに更新を失敗させたり、レコードを削除したりできます。
この記事では、構文の例や動作オプションなど、エクスペクテーションの概要について説明します。 より高度なユース ケースと推奨されるベスト プラクティスについては、「 期待される推奨事項と高度なパターン」を参照してください。
エクスペクテーションとは?
エクスペクテーションは、パイプラインのマテリアライズドビュー、ストリーミング テーブル、またはビュー作成ステートメントの省略可能な句で、クエリを通過する各レコードにデータ品質チェックを適用します。 エクスペクションは、標準の SQL Boolean ステートメントを使用して制約を指定します。 1 つのデータセットに対して複数のエクスペクテーションを組み合わせて、パイプライン内のすべてのデータセット宣言にエクスペクテーションを設定できます。
次のセクションでは、エクスペクテーションの 3 つのコンポーネントを紹介し、構文の例を示します。
期待の名前
各エクスペクテーションには名前が必要であり、これはエクスペクテーションを追跡および監視するための識別子として使用されます。 検証されるメトリクスを伝える名前を選択します。 次の例では、年齢が 0 歳から 120 歳までであることを確認する valid_customer_age
エクスペクテーションを定義しています。
エクスペクテーション名は、特定のデータセットに対して一意である必要があります。 パイプライン内の複数のデータセット間で期待を再利用できます。 「ポータブルで再利用可能な期待」を参照してください。
- Python
- SQL
@dlt.table
@dlt.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
return spark.readStream.table("datasets.samples.raw_customers")
CREATE OR REFRESH STREAMING TABLE customers(
CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);
評価する制約
制約句は、各レコードに対して true または false に評価される必要がある SQL 条件ステートメントです。 制約には、検証対象の実際のロジックが含まれています。 レコードがこの条件に満たない場合、エクスペクテーションがトリガーされます。
制約には有効な SQL 構文を使用する必要があり、次のものを含めることはできません。
- カスタムPython関数
- 外部サービスコール
- 他のテーブルを参照するサブクエリ
次に、データセット作成ステートメントに追加できる制約の例を示します。
- Python
- SQL
# Simple constraint
@dlt.expect("non_negative_price", "price >= 0")
# SQL functions
@dlt.expect("valid_date", "year(transaction_date) >= 2020")
# CASE statements
@dlt.expect("valid_order_status", """
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
""")
# Multiple constraints
@dlt.expect("non_negative_price", "price >= 0")
@dlt.expect("valid_purchase_date", "date <= current_date()")
# Complex business logic
@dlt.expect(
"valid_subscription_dates",
"""start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'"""
)
# Complex boolean logic
@dlt.expect("valid_order_state", """
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")
-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)
-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)
-- CASE statements
CONSTRAINT valid_order_status EXPECT (
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
)
-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0)
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())
-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'
)
-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)
無効なレコードに対するアクション
レコードが検証チェックに失敗した場合に何が起こるかを決定するアクションを指定する必要があります。 次の表では、使用可能なアクションについて説明します。
操作 | SQL 構文 | Python 構文 | 結果 |
---|---|---|---|
warn (デフォルト) |
|
| 無効なレコードがターゲットに書き込まれます。 有効なレコードと無効なレコードの数は、他のデータセットメトリクスとともに記録されます。 |
|
| 無効なレコードは、データがターゲットに書き込まれる前に削除されます。 ドロップされたレコードの数は、他のデータセットメトリクスとともにログに記録されます。 | |
|
| レコードが無効な場合、更新は成功しません。 再処理の前に手動による介入が必要です。 この予想により、1 つのフローが失敗し、パイプライン内の他のフローが失敗することはありません。 |
また、データを失敗させたり削除したりすることなく、無効なレコードを隔離する高度なロジックを実装することもできます。 「無効なレコードの検疫」を参照してください。
Expectation tracking メトリクス
パイプライン UI から、 warn
または drop
アクションの追跡メトリクスを確認できます。 fail
により無効なレコードが検出されると更新が失敗するため、メトリクスは記録されません。
エクスペクテーション メトリクスを表示するには、以下のステップを実行します。
- サイドバーの 「DLT 」をクリックします。
- パイプラインの 名前 をクリックします。
- エクスペクテーションが定義されているデータセットをクリックします。
- 右側のサイドバーにある データ品質 タブを選択します。
データ品質メトリクスを表示するには、DLT イベント・ログを照会します。「イベント ログからのデータ品質のクエリ」を参照してください。
無効なレコードを保持する
無効なレコードの保持は、エクスペクテーションのデフォルトの動作です。 expect
演算子は、エクスペクテーションに反するレコードを保持しながら、制約に合格したレコードまたは失敗したレコードの数に関するメトリクスを収集する場合に使用します。期待に違反するレコードは、有効なレコードとともにターゲットデータセットに追加されます:
- Python
- SQL
@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')
無効なレコードを削除する
無効なレコードがそれ以上処理されないようにするには、expect_or_drop
演算子を使用します。期待に反するレコードは、ターゲットデータセットから削除されます:
- Python
- SQL
@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW
無効なレコードで失敗する
無効なレコードを許容できない場合は、expect_or_fail
演算子を使用して、レコードがバリデーションに失敗したときに直ちに実行を停止します。操作がテーブル更新の場合、システムはトランザクションをアトミックにロールバックする:
- Python
- SQL
@dlt.expect_or_fail("valid_count", "count > 0")
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE
パイプラインに複数の並列フローが定義されている場合、1 つのフローで障害が発生しても、他のフローが失敗することはありません。
エクスペクテーションから失敗した更新のトラブルシューティング
エクスペクテーション違反によりパイプラインが失敗した場合は、パイプラインを再実行する前に、無効なデータを正しく処理できるようにパイプラインコードを修正する必要があります。
パイプラインが失敗するように構成されたエクスペクテーションでは、違反の検出と報告に必要な情報を追跡するために、変換の Spark クエリ プランが変更されます。 この情報を使用して、多くのクエリで違反の原因となった入力レコードを特定できます。 以下は、エクスペクテーションの例です。
Expectation Violated:
{
"flowName": "sensor-pipeline",
"verboseInfo": {
"expectationsViolated": [
"temperature_in_valid_range"
],
"inputData": {
"id": "TEMP_001",
"temperature": -500,
"timestamp_ms": "1710498600"
},
"outputRecord": {
"sensor_id": "TEMP_001",
"temperature": -500,
"change_time": "2024-03-15 10:30:00"
},
"missingInputData": false
}
}
複数の期待管理
SQL と Python はどちらも 1 つのデータセット内で複数の期待をサポートしていますが、複数の個別のエクスペクテーションをグループ化して集合的なアクションを指定できるのは Python だけです。
複数の期待をグループ化し、 expect_all
、 expect_all_or_drop
、および expect_all_or_fail
機能を使用して集合アクションを指定できます。
これらのデコレータは、Python ディクショナリを引数として受け入れ、キーはエクスペクテーションの名前で、値はエクスペクテーション制約です。 パイプライン内の複数のデータセットで同じ期待セットを再利用できます。 次に、 expect_all
Python の各演算子の例を示します。
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}
@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
# Create a raw dataset
@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
# Create a cleaned and prepared dataset
@dlt.table
@dlt.expect_all_or_fail(valid_pages)
def customer_facing_data():
# Create cleaned and prepared to share the dataset