Pular para o conteúdo principal

Recomendações de expectativa e padrões avançados

Este artigo contém recomendações para a implementação de expectativas em escala e exemplos de padrões avançados apoiados por expectativas. Esses padrões usam vários conjuntos de dados em conjunto com as expectativas e exigem que os usuários entendam a sintaxe e a semântica da visualização materializada, das tabelas de transmissão e das expectativas.

Para obter uma visão geral básica do comportamento e da sintaxe das expectativas, consulte gerenciar a qualidade dos dados com pipeline expectations.

Expectativas portáteis e reutilizáveis

A Databricks recomenda as seguintes práticas recomendadas ao implementar expectativas para melhorar a portabilidade e reduzir os encargos de manutenção:

Recomendação

Impacto

Armazene as definições de expectativa separadamente da lógica do pipeline.

Aplique facilmente as expectativas a vários conjuntos de dados ou pipelines. Atualizar, auditar e manter as expectativas sem modificar o código-fonte do pipeline.

Adicione tags personalizadas para criar grupos de expectativas relacionadas.

Filtre as expectativas com base nas tags.

Aplicar as expectativas de forma consistente em um conjunto de dados semelhante.

Use as mesmas expectativas em vários conjuntos de dados e pipelines para avaliar uma lógica idêntica.

Os exemplos a seguir demonstram o uso de uma tabela ou dicionário Delta para criar um repositório central de expectativas. As funções personalizadas do Python aplicam essas expectativas ao conjunto de dados em um exemplo pipeline:

The following example creates a table named rules to maintain rules:

SQL
CREATE OR REPLACE TABLE
rules
AS SELECT
col1 AS name,
col2 AS constraint,
col3 AS tag
FROM (
VALUES
("website_not_null","Website IS NOT NULL","validity"),
("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)

The following Python example defines data quality expectations based on the rules in the rules table. The get_rules() function reads the rules from the rules table and returns a Python dictionary containing rules matching the tag argument passed to the function.

In this example, the dictionary is applied using @dlt.expect_all_or_drop() decorators to enforce data quality constraints.

For example, any records failing the rules tagged with validity will be dropped from the raw_farmers_market table:

Python
import dlt
from pyspark.sql.functions import expr, col

def get_rules(tag):
"""
loads data quality rules from a table
:param tag: tag to match
:return: dictionary of rules that matched the tag
"""
df = spark.read.table("rules").filter(col("tag") == tag).collect()
return {
row['name']: row['constraint']
for row in df
}

@dlt.table
@dlt.expect_all_or_drop(get_rules('validity'))
def raw_farmers_market():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)

@dlt.table
@dlt.expect_all_or_drop(get_rules('maintained'))
def organic_farmers_market():
return (
dlt.read("raw_farmers_market")
.filter(expr("Organic = 'Y'"))
)

Validação da contagem de linhas

O exemplo a seguir valida a igualdade de contagem de linhas entre table_a e table_b para verificar se nenhum dado é perdido durante as transformações:

Gráfico de validação de contagem de linhas DLT com expectativas de uso

Python
@dlt.view(
name="count_verification",
comment="Validates equal row counts between tables"
)
@dlt.expect_or_fail("no_rows_dropped", "a_count == b_count")
def validate_row_counts():
return spark.sql("""
SELECT * FROM
(SELECT COUNT(*) AS a_count FROM table_a),
(SELECT COUNT(*) AS b_count FROM table_b)""")

Detecção de registro ausente

O seguinte exemplo valida que todos os registros esperados estão presentes na tabela report:

Gráfico de detecção de linhas ausentes no DLT com expectativas de uso

Python
@dlt.view(
name="report_compare_tests",
comment="Validates no records are missing after joining"
)
@dlt.expect_or_fail("no_missing_records", "r_key IS NOT NULL")
def validate_report_completeness():
return (
dlt.read("validation_copy").alias("v")
.join(
dlt.read("report").alias("r"),
on="key",
how="left_outer"
)
.select(
"v.*",
"r.key as r_key"
)
)

Primário key exclusividade

O exemplo a seguir valida as restrições primárias key entre as tabelas:

DLT primary key uniqueness gráfico com expectativas de uso

Python
@dlt.view(
name="report_pk_tests",
comment="Validates primary key uniqueness"
)
@dlt.expect_or_fail("unique_pk", "num_entries = 1")
def validate_pk_uniqueness():
return (
dlt.read("report")
.groupBy("pk")
.count()
.withColumnRenamed("count", "num_entries")
)

Padrão de evolução do esquema

O exemplo a seguir mostra como lidar com a evolução do esquema para colunas adicionais. Use esse padrão quando estiver migrando fontes de dados ou lidando com várias versões de dados upstream, garantindo a compatibilidade com versões anteriores e, ao mesmo tempo, reforçando a qualidade dos dados:

Validação da evolução do esquema DLT com expectativas de uso

Python
@dlt.table
@dlt.expect_all_or_fail({
"required_columns": "col1 IS NOT NULL AND col2 IS NOT NULL",
"valid_col3": "CASE WHEN col3 IS NOT NULL THEN col3 > 0 ELSE TRUE END"
})
def evolving_table():
# Legacy data (V1 schema)
legacy_data = spark.read.table("legacy_source")

# New data (V2 schema)
new_data = spark.read.table("new_source")

# Combine both sources
return legacy_data.unionByName(new_data, allowMissingColumns=True)

Padrão de validação baseado em intervalos

O exemplo a seguir demonstra como validar novos pontos de dados em relação a intervalos estatísticos históricos, ajudando a identificar discrepâncias e anomalias em seu fluxo de dados:

Validação baseada na faixa DLT com uso de expectativas

Python
@dlt.view
def stats_validation_view():
# Calculate statistical bounds from historical data
bounds = spark.sql("""
SELECT
avg(amount) - 3 * stddev(amount) as lower_bound,
avg(amount) + 3 * stddev(amount) as upper_bound
FROM historical_stats
WHERE
date >= CURRENT_DATE() - INTERVAL 30 DAYS
""")

# Join with new data and apply bounds
return spark.read.table("new_data").crossJoin(bounds)

@dlt.table
@dlt.expect_or_drop(
"within_statistical_range",
"amount BETWEEN lower_bound AND upper_bound"
)
def validated_amounts():
return dlt.read("stats_validation_view")

Quarentena de registros inválidos

Esse padrão combina expectativas com tabelas e visualizações temporárias para rastrear métricas de qualidade de dados durante as atualizações do site pipeline e permitir caminhos de processamento separados para registros válidos e inválidos em operações downstream.

Padrão de quarentena de dados DLT com expectativas de uso

Python
import dlt
from pyspark.sql.functions import expr

rules = {
"valid_pickup_zip": "(pickup_zip IS NOT NULL)",
"valid_dropoff_zip": "(dropoff_zip IS NOT NULL)",
}
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.view
def raw_trips_data():
return spark.readStream.table("samples.nyctaxi.trips")

@dlt.table(
temporary=True,
partition_cols=["is_quarantined"],
)
@dlt.expect_all(rules)
def trips_data_quarantine():
return (
dlt.readStream("raw_trips_data").withColumn("is_quarantined", expr(quarantine_rules))
)

@dlt.view
def valid_trips_data():
return dlt.read("trips_data_quarantine").filter("is_quarantined=false")

@dlt.view
def invalid_trips_data():
return dlt.read("trips_data_quarantine").filter("is_quarantined=true")