メインコンテンツまでスキップ

エクスペクテーションの推奨事項と高度なパターン

この記事には、エクスペクテーションを大規模に実装するための推奨事項と、エクスペクテーションによってサポートされる高度なパターンの例が含まれています。 これらのパターンでは、複数のデータセットをエクスペクテーションと組み合わせて使用し、ユーザーはマテリアライズドビュー、ストリーミング テーブル、およびエクスペクテーションの構文とセマンティクスを理解する必要があります。

エクスペクテーションの動作と構文の基本的な概要については、「 パイプラインのエクスペクテーションを使用してデータ品質を管理する」を参照してください。

ポータブルで再利用可能な期待

Databricks では、移植性を向上させ、メンテナンスの負担を軽減するためのエクスペクテーションを実装する際に、次のベスト プラクティスを推奨しています。

推奨事項

インパクト

エクスペクテーションの定義をパイプライン ロジックとは別に格納します。

複数のデータセットやパイプラインに期待事項を簡単に適用できます。 パイプラインのソースコードを変更せずに、エクスペクテーションを更新、監査、維持します。

カスタムタグを追加して、関連するエクスペクテーションのグループを作成します。

タグに基づいてエクスペクテーションをフィルタリングします。

類似したデータセットに一貫してエクスペクテーションを適用します。

複数のデータセットとパイプラインで同じエクスペクテーションを使用して、同一のロジックを評価します。

次の例は、Delta テーブルまたはディクショナリを使用して中央のエクスペクテーションリポジトリを作成する方法を示しています。 次に、カスタム Python 関数がこれらのエクスペクテーションをサンプル パイプラインのデータセットに適用します。

The following example creates a table named rules to maintain rules:

SQL
CREATE OR REPLACE TABLE
rules
AS SELECT
col1 AS name,
col2 AS constraint,
col3 AS tag
FROM (
VALUES
("website_not_null","Website IS NOT NULL","validity"),
("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)

The following Python example defines data quality expectations based on the rules in the rules table. The get_rules() function reads the rules from the rules table and returns a Python dictionary containing rules matching the tag argument passed to the function.

In this example, the dictionary is applied using @dlt.expect_all_or_drop() decorators to enforce data quality constraints.

For example, any records failing the rules tagged with validity will be dropped from the raw_farmers_market table:

Python
import dlt
from pyspark.sql.functions import expr, col

def get_rules(tag):
"""
loads data quality rules from a table
:param tag: tag to match
:return: dictionary of rules that matched the tag
"""
df = spark.read.table("rules").filter(col("tag") == tag).collect()
return {
row['name']: row['constraint']
for row in df
}

@dlt.table
@dlt.expect_all_or_drop(get_rules('validity'))
def raw_farmers_market():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)

@dlt.table
@dlt.expect_all_or_drop(get_rules('maintained'))
def organic_farmers_market():
return (
dlt.read("raw_farmers_market")
.filter(expr("Organic = 'Y'"))
)

行数の検証

次の例では、 table_atable_b の間の行数の等価性を検証して、変換中にデータが失われないことを確認します。

DLT 行数検証グラフと期待使用量

Python
@dlt.view(
name="count_verification",
comment="Validates equal row counts between tables"
)
@dlt.expect_or_fail("no_rows_dropped", "a_count == b_count")
def validate_row_counts():
return spark.sql("""
SELECT * FROM
(SELECT COUNT(*) AS a_count FROM table_a),
(SELECT COUNT(*) AS b_count FROM table_b)""")

レコードの欠落検出

次の例では、予期されるすべてのレコードがreportテーブルに存在することを検証します:

DLT 欠落行検出グラフと期待使用量

Python
@dlt.view(
name="report_compare_tests",
comment="Validates no records are missing after joining"
)
@dlt.expect_or_fail("no_missing_records", "r_key IS NOT NULL")
def validate_report_completeness():
return (
dlt.read("validation_copy").alias("v")
.join(
dlt.read("report").alias("r"),
on="key",
how="left_outer"
)
.select(
"v.*",
"r.key as r_key"
)
)

主キーの一意性

次の例では、テーブル間のプライマリ・キー制約を検証します。

DLT 主キーの一意性グラフと期待使用量

Python
@dlt.view(
name="report_pk_tests",
comment="Validates primary key uniqueness"
)
@dlt.expect_or_fail("unique_pk", "num_entries = 1")
def validate_pk_uniqueness():
return (
dlt.read("report")
.groupBy("pk")
.count()
.withColumnRenamed("count", "num_entries")
)

スキーマ進化パターン

次の例は、追加の列のスキーマ進化を処理する方法を示しています。 データソースを移行する場合や、複数のバージョンのアップストリーム データを処理する場合は、このパターンを使用して、データ品質を強化しながら下位互換性を確保します。

DLT、スキーマ進化、検証、エクスペクテーション、使用

Python
@dlt.table
@dlt.expect_all_or_fail({
"required_columns": "col1 IS NOT NULL AND col2 IS NOT NULL",
"valid_col3": "CASE WHEN col3 IS NOT NULL THEN col3 > 0 ELSE TRUE END"
})
def evolving_table():
# Legacy data (V1 schema)
legacy_data = spark.read.table("legacy_source")

# New data (V2 schema)
new_data = spark.read.table("new_source")

# Combine both sources
return legacy_data.unionByName(new_data, allowMissingColumns=True)

範囲ベースの検証パターン

次の例は、新しいデータポイントを過去の統計範囲に対して検証し、データ フローの外れ値と異常を特定する方法を示しています。

DLT範囲ベースの検証と予想される使用

Python
@dlt.view
def stats_validation_view():
# Calculate statistical bounds from historical data
bounds = spark.sql("""
SELECT
avg(amount) - 3 * stddev(amount) as lower_bound,
avg(amount) + 3 * stddev(amount) as upper_bound
FROM historical_stats
WHERE
date >= CURRENT_DATE() - INTERVAL 30 DAYS
""")

# Join with new data and apply bounds
return spark.read.table("new_data").crossJoin(bounds)

@dlt.table
@dlt.expect_or_drop(
"within_statistical_range",
"amount BETWEEN lower_bound AND upper_bound"
)
def validated_amounts():
return dlt.read("stats_validation_view")

無効なレコードを隔離する

このパターンは、エクスペクテーションと一時テーブルおよびビューを組み合わせて、パイプラインの更新中にデータ品質メトリクスを追跡し、ダウンストリーム操作で有効なレコードと無効なレコードに対して別々の処理パスを有効にします。

DLT データ検疫パターンと予想される使用量

Python
import dlt
from pyspark.sql.functions import expr

rules = {
"valid_pickup_zip": "(pickup_zip IS NOT NULL)",
"valid_dropoff_zip": "(dropoff_zip IS NOT NULL)",
}
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.view
def raw_trips_data():
return spark.readStream.table("samples.nyctaxi.trips")

@dlt.table(
temporary=True,
partition_cols=["is_quarantined"],
)
@dlt.expect_all(rules)
def trips_data_quarantine():
return (
dlt.readStream("raw_trips_data").withColumn("is_quarantined", expr(quarantine_rules))
)

@dlt.view
def valid_trips_data():
return dlt.read("trips_data_quarantine").filter("is_quarantined=false")

@dlt.view
def invalid_trips_data():
return dlt.read("trips_data_quarantine").filter("is_quarantined=true")