Delta Live Tablesでデータ品質を管理する

エクスペクテーションを使用して、データセットの内容に対するデータ品質制約を定義します。エクスペクテーションにより、テーブルに到着するデータがデータ品質要件を満たしていることを保証し、パイプライン更新ごとにデータ品質についての洞察を得ることができます。PythonデコレータまたはSQL制約句を使用して、クエリーにエクスペクテーションを適用します。

Delta Live Tablesのエクスペクテーションとは?

エクスペクテーションとは、Delta Live Tablesデータセット宣言に追加するオプションの句で、クエリーを通過する各レコードにデータ品質チェックを適用します。

期待値は、次の3つの要素で構成されます:

  • 説明。一意の識別子として機能し、制約のメトリックを追跡できるようにします。

  • 指定された条件に基づいて常に真または偽を返すブーリアンテートメント。

  • レコードが期待を裏切った場合に実行するアクション。ブーリアン値がfalseを返すことを意味します。

次のマトリックスは、無効なレコードに適用できる3つのアクションを示しています:

操作

結果

warn (デフォルト)

無効なレコードはターゲットに書き込まれ、失敗はデータセットのメトリックとして報告される。

drop

無効なレコードは、データがターゲットに書き込まれる前に削除されます。失敗はデータセットのメトリクスとして報告されます。

fail

無効なレコードがあると、更新が成功しません。再処理の前に手動による介入が必要です。

Delta Live Tablesイベントログをクエリーすることで、期待に反するレコードの数などのデータ品質メトリクスを表示できます。「Delta Live Tablesパイプラインの監視」を参照してください。

Delta Live Tablesデータセット宣言構文の完全なリファレンスは、「Delta Live Tables Python言語リファレンス」または「Delta Live Tables SQL言語リファレンス」を参照してください。

  • 任意エクスペクテーションに複数の句を含めることができますが、複数のエクスペクテーションに基づくアクションの定義をサポートしているのは Python だけです。 「複数のエクスペクテーション」を参照してください。

  • エクスペクテーションは、SQL 式を使用して定義する必要があります。 エクスペクテーションを定義するときに、SQL 以外の構文 (Python 関数など) を使用することはできません。

無効な記録を保持する

期待に反するレコードを保持したい場合は、expect演算子を使用します。期待に違反するレコードは、有効なレコードとともにターゲットデータセットに追加されます:

@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

無効なレコードを削除する

無効なレコードがそれ以上処理されないようにするには、expect or drop演算子を使用します。期待に反するレコードは、ターゲットデータセットから削除されます:

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

無効なレコードで失敗

無効なレコードを許容できない場合は、expect or fail演算子を使用して、レコードがバリデーションに失敗したときに直ちに実行を停止します。操作がテーブル更新の場合、システムはトランザクションをアトミックにロールバックする:

@dlt.expect_or_fail("valid_count", "count > 0")
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

重要

パイプラインに複数の並列フローが定義されている場合、1 つのフローで障害が発生しても、他のフローが失敗することはありません。

期待値違反によりパイプラインが失敗した場合は、パイプラインを再実行する前に、無効なデータを正しく処理できるようにパイプラインコードを修正する必要があります。

エクスペクテーションに失敗すると、変換のSparkクエリープランが変更され、違反の検出とレポートに必要な情報が追跡されます。多くのクエリーでは、この情報を使用して、違反が発生した入力レコードを特定できます。例外の例を次に示します:

Expectation Violated:
{
  "flowName": "a-b",
  "verboseInfo": {
    "expectationsViolated": [
      "x1 is negative"
    ],
    "inputData": {
      "a": {"x1": 1,"y1": "a },
      "b": {
        "x2": 1,
        "y2": "aa"
      }
    },
    "outputRecord": {
      "x1": 1,
      "y1": "a",
      "x2": 1,
      "y2": "aa"
    },
    "missingInputData": false
  }
}

複数のエクスペクテーション

Pythonパイプラインで1つ以上のデータ品質制約を使用してエクスペクテーションを定義できます。これらのデコレータは、Pythonディクショナリを引数として受け入れ、キーはエクスペクテーションの名前、値はエクスペクテーション制約です。

検証に失敗したレコードを対象データセットに含める必要がある場合、複数のデータ品質制約を指定するにはexpect_allを使用します:

@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

expect_all_or_dropを使用して、検証に失敗したレコードをターゲットデータセットから削除する必要がある場合に、複数のデータ品質制約を指定します:

@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

expect_all_or_failを使用して、検証に失敗したレコードがパイプラインの実行を停止する必要がある場合に、複数のデータ品質制約を指定します:

@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

エクスペクテーションのコレクションを変数として定義し、それをパイプライン内の1つ以上のクエリーに渡すこともできます:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create cleaned and prepared dataset

無効なデータを隔離する

次の例では、一時テーブルおよびビューと組み合わせてエクスペクテーションを使用します。このパターンは、パイプラインの更新中にエクスペクテーションのチェックに合格したレコードのメトリクスを提供し、さまざまなダウンストリームパスを通じて有効なレコードと無効なレコードを処理する方法を提供します。

この例では 、Databricks データセットに含まれるサンプル データを読み取ります。 Databricks データセットは Unity Catalogにパブリッシュするパイプラインではサポートされていないため、この例は Hive metastoreにパブリッシュするように設定されたパイプラインでのみ機能します。ただし、このパターンは Unity Catalog 対応パイプラインでも機能しますが、 外部ロケーションからデータを読み取る必要があります。 Delta Live Tables で Unity Catalog を使用する方法の詳細については、「 Delta Live Tables パイプラインで Unity Catalog を使用する」を参照してください。

import dlt
from pyspark.sql.functions import expr

rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.table(
  name="raw_farmers_market"
)
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="farmers_market_quarantine",
  temporary=True,
  partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
  return (
    dlt.read("raw_farmers_market")
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime")
      .withColumn("is_quarantined", expr(quarantine_rules))
  )

@dlt.view(
  name="valid_farmers_market"
)
def get_valid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=false")
  )

@dlt.view(
  name="invalid_farmers_market"
)
def get_invalid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=true")
  )

テーブル全体の行数を検証する

2 つのマテリアライズド ビューまたはストリーミング テーブル間の行数を比較するための期待値を定義する追加のテーブルをパイプラインに追加できます。 この期待の結果は、イベント ログと Delta Live Tables UI に表示されます。 次の例では、 tbla テーブルと tblb テーブルの間で等しい行数を検証します。

CREATE OR REFRESH MATERIALIZED VIEW count_verification(
  CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
  (SELECT COUNT(*) AS a_count FROM LIVE.tbla),
  (SELECT COUNT(*) AS b_count FROM LIVE.tblb)

Delta Live Tablesのエクスペクテーションに基づいた高度な検証を実行する

集計クエリとジョインクエリを使用してマテリアライズドビューを定義し、それらのクエリの結果を期待値チェックの一部として使用できます。 これは、派生テーブルにソース テーブルのすべてのレコードが含まれていることを確認したり、テーブル間で数値列の等価性を保証したりするなど、複雑なデータ品質チェックを実行する場合に便利です。

次の例では、予期されるすべてのレコードがreportテーブルに存在することを検証します:

CREATE MATERIALIZED VIEW report_compare_tests(
  CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key

以下の例では、主キーの一意性を確保するために集約を使用しています:

CREATE MATERIALIZED VIEW report_pk_tests(
  CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk

エクスペクテーションをポータブルかつ再利用可能にする

データ品質ルールは、パイプラインの実装とは別に管理できます。

Databricksでは、各ルールをタグで分類したDeltaテーブルにルールを格納することをお勧めします。データセット定義でこのタグを使用して、適用するルールを決定します。

次の例では、ルールを維持するためにrulesという名前のテーブルを作成します:

CREATE OR REPLACE TABLE
  rules
AS SELECT
  col1 AS name,
  col2 AS constraint,
  col3 AS tag
FROM (
  VALUES
  ("website_not_null","Website IS NOT NULL","validity"),
  ("location_not_null","Location IS NOT NULL","validity"),
  ("state_not_null","State IS NOT NULL","validity"),
  ("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
  ("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)

次のPythonの例では、rulesテーブルに保存されているルールに基づいてデータ品質のエクスペクテーションを定義します。get_rules()関数は、rulesテーブルからルールを読み取り、関数に渡されたtag 引数に一致するルールを含むPythonディクショナリを返します。ディクショナリは、データ品質制約を強制するために@dlt.expect_all_*()デコレータに適用されます。たとえば、validityでタグ付けされたルールに違反したレコードは、raw_farmers_marketテーブルから削除されます:

この例では 、Databricks データセットに含まれるサンプル データを読み取ります。 Databricks データセットは Unity Catalogにパブリッシュするパイプラインではサポートされていないため、この例は Hive metastoreにパブリッシュするように設定されたパイプラインでのみ機能します。ただし、このパターンは Unity Catalog 対応パイプラインでも機能しますが、 外部ロケーションからデータを読み取る必要があります。 Delta Live Tables で Unity Catalog を使用する方法の詳細については、「 Delta Live Tables パイプラインで Unity Catalog を使用する」を参照してください。

import dlt
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  df = spark.read.table("rules")
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )

ルールを維持するためにrulesという名前のテーブルを作成する代わりに、メイン ルールに対する Python モジュールを、たとえばノートブックと同じフォルダー内のrules_module.pyという名前のファイルに作成できます。

def get_rules_as_list_of_dict():
  return [
    {
      "name": "website_not_null",
      "constraint": "Website IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "location_not_null",
      "constraint": "Location IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "state_not_null",
      "constraint": "State IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "fresh_data",
      "constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
      "tag": "maintained"
    },
    {
      "name": "social_media_access",
      "constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
      "tag": "maintained"
    }
  ]

次に、モジュールをインポートし、 rulesテーブルからではなくモジュールから読み取るようにget_rules()関数を変更して、前述のノートブックを変更します。

import dlt
from rules_module import *
from pyspark.sql.functions import expr, col

df = spark.createDataFrame(get_rules_as_list_of_dict())

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )