gerenciar a qualidade dos dados com expectativas pipeline
Use expectativas para aplicar restrições de qualidade que validem os dados conforme eles fluem pelo pipeline ETL . As expectativas fornecem maior compreensão das métricas de qualidade de dados e permitem que você falhe nas atualizações ou descarte registros ao detectar registros inválidos.
Este artigo tem uma visão geral das expectativas, incluindo exemplos de sintaxe e opções de comportamento. Para casos de uso mais avançados e práticas recomendadas, consulte Recomendações de expectativa e padrões avançados.
O que são expectativas?
Expectativas são cláusulas opcionais em view materializadas pipeline , tabelas de transmissão ou instruções de criação view que aplicam verificações de qualidade de dados em cada registro que passa por uma consulta. Expectativas usam instruções Boolean SQL padrão para especificar restrições. Você pode combinar várias expectativas para um único dataset e definir expectativas em todas as declarações de dataset em um pipeline.
As seções a seguir apresentam os três componentes de uma expectativa e fornecem exemplos de sintaxe.
Nome da expectativa
Cada expectativa deve ter um nome, que é usado como um identificador para rastrear e monitorar a expectativa. Escolha um nome que comunique as métricas que estão sendo validadas. O exemplo a seguir define a expectativa valid_customer_age
para confirmar que a idade está entre 0 e 120 anos:
Um nome de expectativa deve ser exclusivo para um determinado dataset. Você pode reutilizar expectativas em vários conjuntos de dados em um pipeline. Veja Expectativas portáteis e reutilizáveis.
- Python
- SQL
@dp.table
@dp.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
return spark.readStream.table("datasets.samples.raw_customers")
CREATE OR REFRESH STREAMING TABLE customers(
CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);
Restrição para avaliar
A cláusula de restrição é uma instrução condicional SQL que deve ser avaliada como verdadeira ou falsa para cada registro. A restrição contém a lógica real do que está sendo validado. Quando um registro falha nessa condição, a expectativa é acionada.
As restrições devem usar sintaxe SQL válida e não podem conter o seguinte:
- Funções Python personalizadas
- Chamadas de serviço externo
- Subconsultas que fazem referência a outras tabelas
A seguir estão exemplos de restrições que podem ser adicionadas às instruções de criação dataset :
- Python
- SQL
A sintaxe para uma restrição em Python é:
@dp.expect(<constraint-name>, <constraint-clause>)
Várias restrições podem ser especificadas:
@dp.expect(<constraint-name>, <constraint-clause>)
@dp.expect(<constraint2-name>, <constraint2-clause>)
Exemplos:
# Simple constraint
@dp.expect("non_negative_price", "price >= 0")
# SQL functions
@dp.expect("valid_date", "year(transaction_date) >= 2020")
# CASE statements
@dp.expect("valid_order_status", """
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
""")
# Multiple constraints
@dp.expect("non_negative_price", "price >= 0")
@dp.expect("valid_purchase_date", "date <= current_date()")
# Complex business logic
@dp.expect(
"valid_subscription_dates",
"""start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'"""
)
# Complex boolean logic
@dp.expect("valid_order_state", """
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")
A sintaxe para uma restrição em SQL é:
CONSTRAINT <constraint-name> EXPECT ( <constraint-clause> )
Várias restrições devem ser separadas por uma vírgula:
CONSTRAINT <constraint-name> EXPECT ( <constraint-clause> ),
CONSTRAINT <constraint2-name> EXPECT ( <constraint2-clause> )
Exemplos:
-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)
-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)
-- CASE statements
CONSTRAINT valid_order_status EXPECT (
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
)
-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0),
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())
-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'
)
-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)
Ação sobre registro inválido
Você deve especificar uma ação para determinar o que acontece quando um registro falha na verificação de validação. A tabela a seguir descreve as ações disponíveis:
Ação | Sintaxe SQL | Sintaxe Python | Resultado |
---|---|---|---|
avisar (default) |
|
| Registros inválidos são gravados no destino. |
|
| Registros inválidos são descartados antes que os dados sejam gravados no destino. A contagem de registros descartados é registrada juntamente com outras métricas dataset . | |
|
| Registros inválidos impedem que a atualização seja bem-sucedida. É necessária intervenção manual antes do reprocessamento. Essa expectativa causa falha em um único fluxo e não causa falha em outros fluxos no seu pipeline. |
Você também pode implementar lógica avançada para colocar registros inválidos em quarentena sem falhar ou descartar dados. Consulte Quarentena de registros inválidos.
Expectativa de acompanhamento de métricas
Você pode ver as métricas de acompanhamento para ações warn
ou drop
na interface pipeline . Como fail
faz com que a atualização falhe quando um registro inválido é detectado, as métricas não são registradas.
Para view as métricas de expectativa, siga os seguintes passos:
- Na barra lateral do seu workspace Databricks , clique em Jobs & pipeline .
- Clique no nome do seu pipeline.
- Clique em um dataset com uma expectativa definida.
- Selecione a tab Qualidade de dados na barra lateral direita.
Você pode view métricas de qualidade de dados consultando o log de eventos do pipeline declarativo LakeFlow . Consulte Métricas de qualidade ou expectativas de dados de consulta.
Manter registros inválidos
Manter registros inválidos é o comportamento default das expectativas. Use o operador expect
quando quiser manter registros que violam a expectativa, mas coletar métricas sobre quantos registros passam ou falham em uma restrição. Registros que violam a expectativa são adicionados ao dataset de destino junto com registros válidos:
- Python
- SQL
@dp.expect("valid timestamp", "timestamp > '2012-01-01'")
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')
Eliminar registros inválidos
Use o operador expect_or_drop
para evitar o processamento adicional de registros inválidos. Os registros que violam as expectativas são descartados do dataset de destino:
- Python
- SQL
@dp.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW
Falha em registros inválidos
Quando registros inválidos forem inaceitáveis, use o operador expect_or_fail
para interromper a execução imediatamente quando um registro falhar na validação. Se a operação for uma atualização de tabela, o sistema reverte atomicamente a transação:
- Python
- SQL
@dp.expect_or_fail("valid_count", "count > 0")
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE
Se você tiver vários fluxos paralelos definidos em um pipeline, a falha de um único fluxo não causará falha em outros fluxos.
Solução de problemas de atualizações com falha em relação às expectativas
Quando um pipeline falha devido a uma violação de expectativa, é necessário corrigir o código do pipeline para lidar corretamente com os dados inválidos antes de executar o pipeline novamente.
As expectativas configuradas para falhar no pipeline modificam o plano de consulta Spark de suas transformações para rastrear informações necessárias para detectar e relatar violações. Você pode usar essas informações para identificar qual registro de entrada resultou na violação para muitas consultas. O pipeline declarativo LakeFlow fornece uma mensagem de erro dedicada para relatar tais violações. Aqui está um exemplo de uma mensagem de erro de violação de expectativa:
[EXPECTATION_VIOLATION.VERBOSITY_ALL] Flow 'sensor-pipeline' failed to meet the expectation. Violated expectations: 'temperature_in_valid_range'. Input data: '{"id":"TEMP_001","temperature":-500,"timestamp_ms":"1710498600"}'. Output record: '{"sensor_id":"TEMP_001","temperature":-500,"change_time":"2024-03-15 10:30:00"}'. Missing input data: false
Gestão de múltiplas expectativas
Embora SQL e Python suportem múltiplas expectativas em um único dataset, somente Python permite agrupar múltiplas expectativas e especificar ações coletivas.
Você pode agrupar várias expectativas e especificar ações coletivas usando as funções expect_all
, expect_all_or_drop
e expect_all_or_fail
.
Esses decoradores aceitam um dicionário Python como argumento, onde a key é o nome da expectativa e o valor é a restrição da expectativa. Você pode reutilizar o mesmo conjunto de expectativas em vários conjuntos de dados em seu pipeline. A seguir, são mostrados exemplos de cada um dos operadores Python expect_all
:
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}
@dp.table
@dp.expect_all(valid_pages)
def raw_data():
# Create a raw dataset
@dp.table
@dp.expect_all_or_drop(valid_pages)
def prepared_data():
# Create a cleaned and prepared dataset
@dp.table
@dp.expect_all_or_fail(valid_pages)
def customer_facing_data():
# Create cleaned and prepared to share the dataset
Limitações
-
Como apenas tabelas de transmissão e visualizações materializadas dão suporte às expectativas, as métricas de qualidade de dados são suportadas apenas para esses tipos de objetos.
-
As métricas de qualidade de dados não estão disponíveis quando:
- Nenhuma expectativa é definida em uma consulta.
- Um fluxo usa um operador que não suporta as expectativas.
- O tipo de fluxo, como coletores de pipeline declarativosLakeFlow, não atende às expectativas.
- Não há atualizações na tabela de transmissão associada ou view materializada para uma determinada execução de fluxo.
- A configuração do pipeline não inclui as configurações necessárias para capturar métricas, como
pipelines.metrics.flowTimeReporter.enabled
.
-
Em alguns casos, um fluxo
COMPLETED
pode não conter métricas. Em vez disso, as métricas são relatadas em cada microlote em um eventoflow_progress
com o statusRUNNING
.