パイプラインのエクスペクテーションによるデータ品質の管理

エクスペクテーションを使用して、ETL パイプラインを流れるデータを検証する品質制約を適用します。 エクスペクテーションにより、データ品質メトリクスに関するより深い知見が得られ、無効なレコードを検出したときに更新を失敗させたり、レコードを削除したりできます。

この記事では、構文の例や動作オプションなど、エクスペクテーションの概要について説明します。 より高度なユース ケースと推奨されるベスト プラクティスについては、エクスペクテーションの推奨事項と高度なパターンを参照してください。

Delta Live Tables のエクスペクテーションフロー グラフ

エクスペクテーションとは?

エクスペクテーションは、パイプラインのマテリアライズドビュー、ストリーミング テーブル、またはビュー作成ステートメントの省略可能な句で、クエリを通過する各レコードにデータ品質チェックを適用します。 エクスペクションは、標準の SQL Boolean ステートメントを使用して制約を指定します。 1 つのデータセットに対して複数のエクスペクテーションを組み合わせて、パイプライン内のすべてのデータセット宣言にエクスペクテーションを設定できます。

次のセクションでは、エクスペクテーションの 3 つのコンポーネントを紹介し、構文の例を示します。

エクスペクテーションの名前

各エクスペクテーションには名前が必要であり、これはエクスペクテーションを追跡および監視するための識別子として使用されます。 検証されるメトリクスを伝える名前を選択します。 次の例では、年齢が 0 歳から 120 歳までであることを確認する valid_customer_age エクスペクテーションを定義しています。

重要

エクスペクテーション名は、特定のデータセットに対して一意である必要があります。 パイプライン内の複数のデータセット間で期待を再利用できます。 ポータブルで再利用可能なエクスペクテーションを参照してください。

@dlt.table
@dlt.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
  return spark.readStream.table("datasets.samples.raw_customers")
CREATE OR REFRESH STREAMING TABLE customers(
  CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);

評価する制約

制約句は、各レコードに対して true または false に評価される必要がある SQL 条件ステートメントです。 制約には、検証対象の実際のロジックが含まれています。 レコードがこの条件に満たない場合、エクスペクテーションがトリガーされます。

制約には有効な SQL 構文を使用する必要があり、次のものを含めることはできません。

  • カスタムPython関数

  • 外部サービスコール

  • 他のテーブルを参照するサブクエリ

次に、データセット作成ステートメントに追加できる制約の例を示します。

# Simple constraint
@dlt.expect("non_negative_price", "price >= 0")

# SQL functions
@dlt.expect("valid_date", "year(transaction_date) >= 2020")

# CASE statements
@dlt.expect("valid_order_status", """
   CASE
     WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
     WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
     ELSE false
   END
""")

# Multiple constraints
@dlt.expect("non_negative_price", "price >= 0")
@dlt.expect("valid_purchase_date", "date <= current_date()")

# Complex business logic
@dlt.expect(
  "valid_subscription_dates",
  """start_date <= end_date
    AND end_date <= current_date()
    AND start_date >= '2020-01-01'"""
)

# Complex boolean logic
@dlt.expect("valid_order_state", """
   (status = 'ACTIVE' AND balance > 0)
   OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")
-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)

-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)

-- CASE statements
CONSTRAINT valid_order_status EXPECT (
  CASE
    WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
    WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
    ELSE false
  END
)

-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0)
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())

-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
  start_date <= end_date
  AND end_date <= current_date()
  AND start_date >= '2020-01-01'
)

-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
  (status = 'ACTIVE' AND balance > 0)
  OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)

無効なレコードに対するアクション

レコードが検証チェックに失敗した場合に何が起こるかを決定するアクションを指定する必要があります。 次の表では、使用可能なアクションについて説明します。

操作

SQL 構文

Python 構文

結果

warn (デフォルト)

EXPECT

dlt.expect

無効なレコードがターゲットに書き込まれます。 有効なレコードと無効なレコードの数は、他のデータセットメトリクスとともに記録されます。

drop

EXPECT ... ON VIOLATION DROP ROW

dlt.expect_or_drop

無効なレコードは、データがターゲットに書き込まれる前に削除されます。 ドロップされたレコードの数は、他のデータセットメトリクスとともにログに記録されます。

fail

EXPECT ... ON VIOLATION FAIL UPDATE

dlt.expect_or_fail

レコードが無効な場合、更新は成功しません。 再処理の前に手動による介入が必要です。 この予想により、1 つのフローが失敗し、パイプライン内の他のフローが失敗することはありません。

また、データを失敗させたり削除したりすることなく、無効なレコードを隔離する高度なロジックを実装することもできます。 無効なレコードの検疫を参照してください。

エクスペクテーション追跡メトリクス

パイプライン UI から、 warn または drop アクションの追跡メトリクスを確認できます。 fail により無効なレコードが検出されると更新が失敗するため、メトリクスは記録されません。

エクスペクテーション メトリクスを表示するには、以下のステップを実行します。

  1. サイドバーの Delta Live Tables をクリックします。

  2. パイプラインの 名前をクリックします。

  3. エクスペクテーションが定義されているデータセットをクリックします。

  4. 右側のサイドバーにある データ品質タブを選択します。

データ品質メトリクスを表示するには、 Delta Live Tables イベント ログをクエリします。 「イベント ログからのデータ品質のクエリ」を参照してください。

無効な記録を保持する

無効なレコードの保持は、エクスペクテーションのデフォルトの動作です。 expect演算子は、エクスペクテーションに反するレコードを保持しながら、制約に合格したレコードまたは失敗したレコードの数に関するメトリクスを収集する場合に使用します。エクスペクテーションに反するレコードは、有効なレコードとともにターゲット データセットに追加されます。

@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

無効なレコードを削除する

無効なレコードがそれ以上処理されないようにするには、expect_or_drop演算子を使用します。エクスペクテーションに反するレコードは、ターゲットデータセットから削除されます:

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

無効なレコードで失敗

無効なレコードを許容できない場合は、expect_or_fail演算子を使用して、レコードがバリデーションに失敗したときに直ちに実行を停止します。操作がテーブル更新の場合、システムはトランザクションをアトミックにロールバックする:

@dlt.expect_or_fail("valid_count", "count > 0")
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

重要

パイプラインに複数の並列フローが定義されている場合、1 つのフローで障害が発生しても、他のフローが失敗することはありません。

Delta Live Tables フローの失敗の説明グラフ

期待値から失敗した更新のトラブルシューティング

エクスペクテーション違反によりパイプラインが失敗した場合は、パイプラインを再実行する前に、無効なデータを正しく処理できるようにパイプラインコードを修正する必要があります。

パイプラインが失敗するように構成されたエクスペクテーションでは、違反の検出と報告に必要な情報を追跡するために、変換の Spark クエリ プランが変更されます。 この情報を使用して、多くのクエリで違反の原因となった入力レコードを特定できます。 以下は、エクスペクテーションの例です。

Expectation Violated:
{
  "flowName": "sensor-pipeline",
  "verboseInfo": {
    "expectationsViolated": [
      "temperature_in_valid_range"
    ],
    "inputData": {
      "id": "TEMP_001",
      "temperature": -500,
      "timestamp_ms": "1710498600"
    },
    "outputRecord": {
      "sensor_id": "TEMP_001",
      "temperature": -500,
      "change_time": "2024-03-15 10:30:00"
    },
    "missingInputData": false
  }
}

複数の期待管理

SQL と Python はどちらも 1 つのデータセット内で複数の期待をサポートしていますが、複数の個別のエクスペクテーションをグループ化して集合的なアクションを指定できるのは Python だけです。

複数の期待を持つDelta Live Tables fLowグラフ

複数の期待をグループ化し、 expect_allexpect_all_or_drop、および expect_all_or_fail機能を使用して集合アクションを指定できます。

これらのデコレータは、Python ディクショナリを引数として受け入れ、キーはエクスペクテーションの名前で、値はエクスペクテーション制約です。 パイプライン内の複数のデータセットで同じ期待セットを再利用できます。 次に、 expect_all Python の各演算子の例を示します。

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create a raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create a cleaned and prepared dataset

@dlt.table
@dlt.expect_all_or_fail(valid_pages)
def customer_facing_data():
  # Create cleaned and prepared to share the dataset