Gerenciar a qualidade dos dados com o Delta Live Tables

Você utiliza expectativas para estabelecer restrições de qualidade de dados para o conteúdo de um conjunto de dados. As expectativas permitem que você garanta que os dados que chegam às tabelas atendam aos requisitos de qualidade de dados e forneçam insights sobre a qualidade de dados para cada atualização do pipeline.Você aplica expectativas para consultas usando decoradores Python ou cláusulas de restrição SQL.

Quais são as expectativas do Delta Live Tables?

As expectativas são cláusulas opcionais que você adiciona às declarações de conjuntos de dados no Delta Live Tables, que aplicam verificações de qualidade de dados em cada registro que passa por uma consulta.

Uma expectativa consiste em três coisas:

  • Uma descrição, que atua como um identificador exclusivo e permite que você rastreie métricas para a restrição.

  • Uma instrução Boolean que sempre retorna verdadeiro ou falso com base em alguma condição declarada.

  • Uma ação a tomar quando um registro falha na expectativa, o que significa que o Boolean retorna falso.

A matriz a seguir mostra as três ações que você pode aplicar a registros inválidos:

Ação

Resultado

avisar (default)

Registros inválidos são gravados no alvo; a falha é relatada como uma métrica para o dataset.

derrubar

Registros inválidos são eliminados antes que os dados sejam gravados no destino; a falha é relatada como uma métrica para o dataset.

falhar

Registros inválidos impedem que a atualização seja bem-sucedida. É necessária uma intervenção manual antes do reprocessamento.

você pode visualizar métricas de qualidade de dados, como o número de registros que violam uma expectativa, consultando o log de eventos do Delta Live Tables. Consulte Monitorar pipelines do Delta Live Tables.

Para obter uma referência completa da sintaxe de declaração do dataset do Delta Live Tables, consulte Referência da linguagem Python do Delta Live Tables ou Referência da linguagem SQL do Delta Live Tables.

Observação

  • Embora você possa incluir várias cláusulas em qualquer expectativa, apenas o Python oferece suporte à definição de ações com base em várias expectativas. Consulte Múltiplas expectativas.

  • As expectativas devem ser definidas usando expressões SQL. O senhor não pode usar sintaxe não-SQL (por exemplo, funções Python) ao definir uma expectativa.

Reter registros inválidos

Use o operador expect quando quiser manter registros que violem a expectativa. Registros que violam a expectativa são adicionados ao dataset de destino junto com registros válidos:

@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

Solte registros inválidos

Use o operador expect or drop para evitar o processamento adicional de registros inválidos. Os registros que violam as expectativas são descartados do dataset de destino:

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

Falha em registros inválidos

Quando registros inválidos forem inaceitáveis, use o operador expect or fail para interromper a execução imediatamente quando um registro falhar na validação. Se a operação for uma atualização de tabela, o sistema reverte atomicamente a transação:

@dlt.expect_or_fail("valid_count", "count > 0")
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

Importante

Se o senhor tiver vários fluxos paralelos definidos em um pipeline, a falha de um único fluxo não causará a falha de outros fluxos.

Quando um pipeline falha devido a uma violação de expectativa, é necessário corrigir o código do pipeline para lidar corretamente com os dados inválidos antes de executar o pipeline novamente.

As expectativas de falha modificam o plano de consulta Spark de suas transformações para rastrear as informações necessárias para detectar e relatar violações.Para muitas consultas, é possível usar essas informações para identificar qual registro de entrada resultou na violação. A seguir, um exemplo de exceção:

Expectation Violated:
{
  "flowName": "a-b",
  "verboseInfo": {
    "expectationsViolated": [
      "x1 is negative"
    ],
    "inputData": {
      "a": {"x1": 1,"y1": "a },
      "b": {
        "x2": 1,
        "y2": "aa"
      }
    },
    "outputRecord": {
      "x1": 1,
      "y1": "a",
      "x2": 1,
      "y2": "aa"
    },
    "missingInputData": false
  }
}

Múltiplas expectativas

Você pode definir expectativas com um ou mais critérios de qualidade de dados em pipelines Python.Esses decoradores aceitam um dicionário Python como argumento, onde a chave é o nome da expectativa e o valor é a restrição da expectativa.

Use expect_all para especificar várias restrições de qualidade de dados quando os registros que falham na validação devem ser incluídos no dataset de destino:

@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Utilize o expect_all_or_drop para especificar múltiplas restrições de qualidade de dados quando registros que falham na validação devem ser descartados do dataset de destino:

@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Use expect_all_or_fail para especificar várias restrições de qualidade de dados quando os registros que falham na validação devem interromper a execução do pipeline:

@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Você também pode definir uma coleção de expectativas como uma variável e passá-la para uma ou mais consultas em seu pipeline:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create cleaned and prepared dataset

Colocar dados inválidos em quarentena

O exemplo a seguir utiliza expectativas em combinação com tabelas e visualizações temporárias. Esse padrão oferece métricas para registros que passam pelas verificações de expectativa durante as atualizações do pipeline e proporciona uma maneira de processar registros válidos e inválidos por caminhos downstream distintos.

Observação

Este exemplo lê dados de amostra incluídos no conjunto de dadosDatabricks . Como o conjunto de dados Databricks não é compatível com um pipeline que publica em Unity Catalog, este exemplo funciona somente com um pipeline configurado para publicar em Hive metastore. No entanto, esse padrão também funciona com o pipeline habilitado para Unity Catalog, mas o senhor deve ler os dados de locais externos. Para saber mais sobre o uso do Unity Catalog com o Delta Live Tables, consulte Use o Unity Catalog com o pipeline Delta Live Tables .

import dlt
from pyspark.sql.functions import expr

rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.table(
  name="raw_farmers_market"
)
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="farmers_market_quarantine",
  temporary=True,
  partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
  return (
    spark.read.table("LIVE.raw_farmers_market")
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime")
      .withColumn("is_quarantined", expr(quarantine_rules))
  )

@dlt.view(
  name="valid_farmers_market"
)
def get_valid_farmers_market():
  return (
    spark.read.table("LIVE.farmers_market_quarantine")
      .filter("is_quarantined=false")
  )

@dlt.view(
  name="invalid_farmers_market"
)
def get_invalid_farmers_market():
  return (
    spark.read.table("LIVE.farmers_market_quarantine")
      .filter("is_quarantined=true")
  )

Validar a contagem de linhas nas tabelas

O senhor pode adicionar uma tabela adicional ao site pipeline que defina uma expectativa para comparar as contagens de linhas entre duas visualizações materializadas ou tabelas de transmissão. Os resultados dessa expectativa aparecem no evento log e na UI Delta Live Tables. O exemplo a seguir valida contagens de linhas iguais entre as tabelas tbla e tblb:

CREATE OR REFRESH MATERIALIZED VIEW count_verification(
  CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
  (SELECT COUNT(*) AS a_count FROM LIVE.tbla),
  (SELECT COUNT(*) AS b_count FROM LIVE.tblb)

Realize validação avançada com as expectativas do Delta Live Tables

O senhor pode definir a visualização materializada usando consultas agregadas e join e usar os resultados dessas consultas como parte da verificação de expectativas. Isso é útil se você quiser realizar verificações complexas de qualidade de dados, por exemplo, garantindo que uma tabela derivada contenha todos os registros da tabela de origem ou garantindo a igualdade de uma coluna numérica nas tabelas.

O seguinte exemplo valida que todos os registros esperados estão presentes na tabela report:

CREATE MATERIALIZED VIEW report_compare_tests(
  CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key

O exemplo a seguir usa um agregado para garantir a exclusividade de uma chave primária:

CREATE MATERIALIZED VIEW report_pk_tests(
  CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk

Torne as expectativas portáteis e reutilizáveis

Você pode manter as regras de qualidade de dados separadamente das implementações de pipeline.

A Databricks recomenda armazenar as regras em uma tabela Delta com cada regra categorizada por uma tag. Use essa marca nas definições de dataset para determinar quais regras devem ser aplicadas.

O exemplo seguinte cria uma tabela denominada rules para manter as regras:

CREATE OR REPLACE TABLE
  rules
AS SELECT
  col1 AS name,
  col2 AS constraint,
  col3 AS tag
FROM (
  VALUES
  ("website_not_null","Website IS NOT NULL","validity"),
  ("location_not_null","Location IS NOT NULL","validity"),
  ("state_not_null","State IS NOT NULL","validity"),
  ("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
  ("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)

O exemplo de Python a seguir define as expectativas de qualidade dos dados com base nas regras armazenadas na rules tabela. A get_rules() função lê as regras da rules tabela e retorna um dicionário Python contendo regras que correspondem ao tag argumento passado para a função. O dicionário é aplicado nos @dlt.expect_all_*() decoradores para impor restrições de qualidade de dados. Por exemplo, todos os registros que falharem nas regras marcadas com validity serão retirados da raw_farmers_market tabela:

Observação

Este exemplo lê dados de amostra incluídos no conjunto de dadosDatabricks . Como o conjunto de dados Databricks não é compatível com um pipeline que publica em Unity Catalog, este exemplo funciona somente com um pipeline configurado para publicar em Hive metastore. No entanto, esse padrão também funciona com o pipeline habilitado para Unity Catalog, mas o senhor deve ler os dados de locais externos. Para saber mais sobre o uso do Unity Catalog com o Delta Live Tables, consulte Use o Unity Catalog com o pipeline Delta Live Tables .

import dlt
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  df = spark.read.table("rules")
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    spark.read.table("LIVE.raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )

Em vez de criar uma tabela chamada rules para manter as regras, o senhor poderia criar um módulo Python para as regras principais, por exemplo, em um arquivo chamado rules_module.py na mesma pasta do site Notebook:

def get_rules_as_list_of_dict():
  return [
    {
      "name": "website_not_null",
      "constraint": "Website IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "location_not_null",
      "constraint": "Location IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "state_not_null",
      "constraint": "State IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "fresh_data",
      "constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
      "tag": "maintained"
    },
    {
      "name": "social_media_access",
      "constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
      "tag": "maintained"
    }
  ]

Em seguida, modifique o site Notebook anterior importando o módulo e alterando a função get_rules() para ler do módulo em vez de ler da tabela rules:

import dlt
from rules_module import *
from pyspark.sql.functions import expr, col

df = spark.createDataFrame(get_rules_as_list_of_dict())

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    spark.read.table("LIVE.raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )