Gerenciar a qualidade dos dados com pipeline expectativas

Use as expectativas para aplicar restrições de qualidade que validam os dados à medida que eles fluem pelo pipeline ETL. As expectativas proporcionam maior entendimento sobre as métricas de qualidade de dados e permitem que o senhor falhe em atualizações ou elimine registros ao detectar registros inválidos.

Este artigo fornece uma visão geral das expectativas, incluindo exemplos de sintaxe e opções de comportamento. Para casos de uso mais avançados e melhores práticas recomendadas, consulte Recomendações de expectativa e padrões avançados.

Delta Live Tables fluxo de expectativas gráfico

Quais são as expectativas?

Expectations são cláusulas opcionais em pipeline materialized view, transmissão table ou view creation statements que aplicam verificações de qualidade de dados em cada registro que passa por uma consulta. As expectativas usam as declarações padrão do site SQL Boolean para especificar as restrições. O senhor pode combinar várias expectativas para um único dataset e definir expectativas em todas as declarações de dataset em um pipeline.

As seções a seguir apresentam os três componentes de uma expectativa e fornecem exemplos de sintaxe.

Nome da expectativa

Cada expectativa deve ter um nome, que é usado como um identificador para rastrear e monitorar a expectativa. Escolha um nome que comunique as métricas que estão sendo validadas. O exemplo a seguir define a expectativa valid_customer_age para confirmar que a idade está entre 0 e 120 anos:

Importante

Um nome de expectativa deve ser exclusivo para um determinado dataset. O senhor pode reutilizar expectativas em vários conjuntos de dados em um pipeline. Veja as expectativas de portáteis e reutilizáveis.

@dlt.table
@dlt.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
  return spark.readStream.table("datasets.samples.raw_customers")
CREATE OR REFRESH STREAMING TABLE customers(
  CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);

Restrição para avaliar

A cláusula de restrição é uma instrução condicional SQL que deve ser avaliada como verdadeira ou falsa para cada registro. A restrição contém a lógica real do que está sendo validado. Quando um registro falha nessa condição, a expectativa é acionada.

As restrições devem usar a sintaxe SQL válida e não podem conter o seguinte:

  • Funções Python personalizadas

  • Chamadas de serviços externos

  • Subconsultas que fazem referência a outras tabelas

A seguir, exemplos de restrições que podem ser adicionadas às declarações de criação do site dataset:

# Simple constraint
@dlt.expect("non_negative_price", "price >= 0")

# SQL functions
@dlt.expect("valid_date", "year(transaction_date) >= 2020")

# CASE statements
@dlt.expect("valid_order_status", """
   CASE
     WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
     WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
     ELSE false
   END
""")

# Multiple constraints
@dlt.expect("non_negative_price", "price >= 0")
@dlt.expect("valid_purchase_date", "date <= current_date()")

# Complex business logic
@dlt.expect(
  "valid_subscription_dates",
  """start_date <= end_date
    AND end_date <= current_date()
    AND start_date >= '2020-01-01'"""
)

# Complex boolean logic
@dlt.expect("valid_order_state", """
   (status = 'ACTIVE' AND balance > 0)
   OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")
-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)

-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)

-- CASE statements
CONSTRAINT valid_order_status EXPECT (
  CASE
    WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
    WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
    ELSE false
  END
)

-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0)
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())

-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
  start_date <= end_date
  AND end_date <= current_date()
  AND start_date >= '2020-01-01'
)

-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
  (status = 'ACTIVE' AND balance > 0)
  OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)

Ação em registro inválido

Você deve especificar uma ação para determinar o que acontece quando um registro falha na verificação de validação. A tabela a seguir descreve as ações disponíveis:

Ação

Sintaxe SQL

Sintaxe do Python

Resultado

avisar (default)

EXPECT

dlt.expect

Registros inválidos são gravados no destino. A contagem de registros válidos e inválidos é registrada juntamente com outras dataset métricas.

derrubar

EXPECT ... ON VIOLATION DROP ROW

dlt.expect_or_drop

Os registros inválidos são descartados antes que os dados sejam gravados no destino. A contagem de registros descartados é registrada juntamente com outras dataset métricas.

falhar

EXPECT ... ON VIOLATION FAIL UPDATE

dlt.expect_or_fail

Registros inválidos impedem que a atualização seja bem-sucedida. É necessária uma intervenção manual antes do reprocessamento. Essa expectativa causa a falha de um único fluxo e não causa a falha de outros fluxos em seu pipeline.

Você também pode implementar uma lógica avançada para colocar em quarentena registros inválidos sem falhar ou descartar dados. Consulte Quarentena de registros inválidos.

Expectativa acompanhamento métricas

O senhor pode ver o acompanhamento métrico das ações warn ou drop na interface de usuário pipeline. Como fail faz com que a atualização falhe quando um registro inválido é detectado, as métricas não são registradas.

Para view expectation métricas, conclua as etapas a seguir:

  1. Clique em Delta Live Tables na barra lateral.

  2. Clique no nome do seu pipeline.

  3. Clique em dataset com uma expectativa definida.

  4. Selecione Data quality (Qualidade dos dados ) tab na barra lateral direita.

O senhor pode view métricas de qualidade de dados consultando o evento Delta Live Tables log. Consulte Consultar a qualidade dos dados do evento log.

Reter registros inválidos

Manter registros inválidos é o comportamento default das expectativas. Use o operador expect quando quiser manter os registros que violam a expectativa, mas colete métricas sobre quantos registros passam ou falham em uma restrição. Os registros que violam a expectativa são adicionados ao destino dataset juntamente com os registros válidos:

@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

Solte registros inválidos

Use o operador expect_or_drop para evitar o processamento adicional de registros inválidos. Os registros que violam as expectativas são descartados do dataset de destino:

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

Falha em registros inválidos

Quando registros inválidos forem inaceitáveis, use o operador expect_or_fail para interromper a execução imediatamente quando um registro falhar na validação. Se a operação for uma atualização de tabela, o sistema reverte atomicamente a transação:

@dlt.expect_or_fail("valid_count", "count > 0")
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

Importante

Se o senhor tiver vários fluxos paralelos definidos em um pipeline, a falha de um único fluxo não causará a falha de outros fluxos.

Delta Live Tables explicação de falha de fluxo gráfico

Solução de problemas com falhas em atualizações a partir das expectativas

Quando um pipeline falha devido a uma violação de expectativa, é necessário corrigir o código do pipeline para lidar corretamente com os dados inválidos antes de executar o pipeline novamente.

As expectativas configuradas para o pipeline de falhas modificam o plano de consulta Spark de suas transformações para rastrear as informações necessárias para detectar e relatar violações. O senhor pode usar essas informações para identificar qual registro de entrada resultou na violação de muitas consultas. Veja a seguir um exemplo de expectativa:

Expectation Violated:
{
  "flowName": "sensor-pipeline",
  "verboseInfo": {
    "expectationsViolated": [
      "temperature_in_valid_range"
    ],
    "inputData": {
      "id": "TEMP_001",
      "temperature": -500,
      "timestamp_ms": "1710498600"
    },
    "outputRecord": {
      "sensor_id": "TEMP_001",
      "temperature": -500,
      "change_time": "2024-03-15 10:30:00"
    },
    "missingInputData": false
  }
}

Gestão de múltiplas expectativas

Observação

Embora tanto o SQL quanto o Python suportem várias expectativas em um único dataset, somente o Python permite que o senhor agrupe várias expectativas separadas e especifique ações coletivas.

Delta Live Tables com múltiplas expectativas fLow gráfico

Você pode agrupar várias expectativas e especificar ações coletivas usando as funções expect_all, expect_all_or_drop e expect_all_or_fail.

Esses decoradores aceitam um dicionário Python como argumento, em que key é o nome da expectativa e o valor é a restrição da expectativa. O senhor pode reutilizar o mesmo conjunto de expectativas em vários conjuntos de dados em seu site pipeline. A seguir, exemplos de cada um dos operadores expect_all do Python:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create a raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create a cleaned and prepared dataset

@dlt.table
@dlt.expect_all_or_fail(valid_pages)
def customer_facing_data():
  # Create cleaned and prepared to share the dataset