Teste de unidade para pipelines
Beta
Esse recurso está em Beta.
Para obter informações gerais sobre testes de unidade Python no Databricks, consulte Teste de unidade Python.
Lakeflow Spark Declarative Pipelines permite escrever testes de unidade Python no Editor do Lakeflow Pipelines baseado na web. Isso permite validar a lógica de transformação Python ou SQL usando dados simulados. Com a estrutura de teste de pipeline, é possível testar casos extremos, validar APIs de pipeline proprietárias (CDC automático, tabelas de transmissão, expectativas, fluxos de anexação) e iterar rapidamente sem afetar datasets ativos.
- Execução de teste isolada: A estrutura fornece primitivas que permitem a criação de uma SparkSession que se comunica com um catálogo totalmente isolado, permitindo a simulação de dados de entrada dentro dos testes.
- **Escopo de teste flexível**: Execute um subconjunto de um pipeline (tabelas individuais, cadeias de tabelas dependentes ou pipelines inteiros) no compute do pipeline usando o SparkSession de teste.
- Validação de resultados : verifique os resultados de tabelas de saída isoladas criadas em um teste com asserções pytest padrão.
Quando usar o teste de unidade
Casos de uso típicos incluem:
- Validação de nova lógica de transformação : Teste se sua transformação produz o esquema esperado, as contagens de linha, as agregações e a lógica de negócios antes de executar em dados de produção.
- Teste das especificações do Auto CDC : Valide se as suas definições de fluxo do Auto CDC processam corretamente os eventos de alteração — tratando inserções, atualizações, exclusões e tipos de SCD (dimensões que mudam lentamente (SCD)) — usando dados simulados.
- Teste de expectativas e regras de qualidade dos dados : verifique se as expectativas falham quando deveriam e são aprovadas quando os dados são válidos.
- Teste em tabelas dependentes: Teste cadeias de transformações (por exemplo, bronze, prata e ouro) para validar se os dados fluem corretamente através do DAG do seu pipeline.
Requisitos
can_runpermissão no pipeline, além das permissõesUSE_CATALOGeCREATE_SCHEMAno catálogo default do pipeline.- O pipeline deve ser configurado no modo acionado (não contínuo).
- O pipeline deve estar no canal de **Pré-visualização**. Teste de unidade está em Beta e está disponível apenas em prévia.
Limitações
- Execução exclusiva do Editor : Testes devem ser executados a partir do Editor de Lakeflow Pipelines baseado na web.
- Somente testes Python : Testes devem ser escritos em Python. É possível testar pipelines SQL, mas os testes em si devem ser Python.
- Limitações de Governança: Dados simulados não seguirão a filtragem de linha ou o mascaramento de coluna definidos no nível de catálogo/esquema.
- Nenhuma execução concorrente: Não execute testes enquanto o pipeline estiver executando uma atualização. Pode impactar severamente o desempenho de sua carga de trabalho de produção.
O passo 1: Atualizar as configurações do pipeline
Configure o pipeline para ser executado no canal **PRÉ-VISUALIZAÇÃO** no modo triggered.
- Na interface do usuário, abra seu pipeline e clique em Configurações > Configurações Avançadas > Canal > Visualização
- Selecione **Acionado** para **Modo Pipeline** (não utilize Contínuo).
Alternativamente, edite as definições de JSON do pipeline diretamente:
"continuous": false,
"channel": "PREVIEW"
Etapa 2: Criar um arquivo de teste
No Editor de **LakeFlow Pipelines**, clique em Editar pipeline > Adicionar ativos de pipeline > Testar . Isso cria uma pasta tests e um arquivo de teste que não é incluído no código-fonte do seu pipeline.

Alternativamente, é possível criar a pasta e o arquivo manualmente:
- Clique em Editar pipeline > Adicionar ativos de pipeline > Nova pasta de pipeline .
- Dê o **Nome**
tests. - Clique na
testspasta, depois em"Criar arquivo > Python". - Nomeie seu arquivo usando o padrão
test_*.pyou*_test.py(por exemplo,test_transformations.py).
Passo 3: Gerar testes
O Genie Code pode gerar um esqueleto de teste:
-
Dentro do arquivo de teste, clique no botão Gerar testes .

-
Alternativamente, utilize
/testsno modo de agente do Genie Code.
Utilize o Genie Code para gerar código padrão e, em seguida, personalize para os casos de borda.
Alternativamente, pode-se escrever o código de teste. Adicione as seguintes importações na parte superior de cada arquivo de teste:
import pytest
from pyspark.pipelines.testing import TestPipeline, test_spark
test_pipeline = TestPipeline.active()
Passo 4: executar testes
Executar testes no Editor de Lakeflow Pipelines:
- Clique no botão
(play) na medianiz ao lado de uma função de teste para executar um teste individual.
- Clique em Executar testes no arquivo na parte superior do arquivo de teste para executar todos os testes naquele arquivo.
Os resultados dos testes (sucesso ou falha) são exibidos no painel inferior do Editor. Analise erros de asserção para depurar falhas.
Testar APIs
API | Descrição |
|---|---|
| Retorna um objeto |
| Executa sincronicamente uma atualização do pipeline, realizando um refresh seletivo caso nomes de tabelas sejam especificados. Retorna após a execução do pipeline ser bem-sucedida ou ser encerrada com uma exceção. |
| Cria uma SparkSession isolada, que redireciona automaticamente todas as leituras e gravações de tabela para um esquema de teste temporário, garantindo que os dados de produção nunca sejam afetados. |
Criar dados simulados
Você pode simular dados de entrada usando SQL ou createDataFrame:
# Option 1: Using SQL
test_spark.sql("""
CREATE TABLE catalog.schema.table_name AS
SELECT * FROM VALUES
(1, 'value1'),
(2, 'value2')
AS t(id, name)
""")
# Option 2: Using createDataFrame
df = test_spark.createDataFrame(
[(1, 'value1'), (2, 'value2')],
schema=["id", "name"]
)
df.write.saveAsTable("catalog.schema.table_name")
Para gerar volumes maiores de dados sintéticos realistas, é possível usar a biblioteca Faker. Execute %pip install faker primeiro em seu pipeline e, depois, crie um DataFrame a partir de UDFs Faker-backed.
# Option 3: Using Faker for synthetic data
from pyspark.sql import functions as F
from faker import Faker
fake = Faker()
fake_firstname = F.udf(fake.first_name)
fake_lastname = F.udf(fake.last_name)
fake_email = F.udf(fake.ascii_company_email)
df = (
test_spark.range(0, 100)
.withColumn("firstname", fake_firstname())
.withColumn("lastname", fake_lastname())
.withColumn("email", fake_email())
)
df.write.saveAsTable("catalog.schema.table_name")
Execute o pipeline ou tabelas específicas
# Run specific tables
test_pipeline.run(test_spark, set(["catalog.schema.table1", "catalog.schema.table2"]))
# Run all tables in the pipeline
test_pipeline.run(test_spark)
Exemplos
Exemplo 1: Teste de Agregações com Contagem de Linhas, Esquema e Tratamento de Nulos
Objetivo: Validar se a agregação de usuários conta corretamente os usuários por tipo, lida com emails nulos e produz o esquema esperado.
Transformações de pipeline:
Essas transformações criam um pipeline simples de duas tabelas: users seleciona dados do usuário, e counts agrupa usuários por tipo e contabiliza o total de usuários e emails válidos.
from pyspark import pipelines as dp
from pyspark.sql.functions import col, count, count_if
@dp.table
def users():
return (
spark.read.table("catalog.schema.wanderbricks_users")
.select("user_id", "email", "name", "user_type")
)
@dp.table
def counts():
return (
spark.read.table("catalog.schema.users")
.withColumn("valid_email", col("email").isNotNull())
.groupBy("user_type")
.agg(
count("user_id").alias("total_count"),
count_if("valid_email").alias("count_valid_emails")
)
)
Testes :
Esses testes validam contagens de linhas, estrutura de esquema, tratamento de nulos e lógica de agregação, criando dados de usuário simulados com nulos intencionais e executando o pipeline isoladamente.
import pytest
from pyspark.pipelines.testing import TestPipeline, test_spark
from pyspark.testing import assertDataFrameEqual
test_pipeline = TestPipeline.active()
# Mock data fixture
def mock_users(session):
session.sql("""
CREATE TABLE catalog.schema.wanderbricks_users AS
SELECT * FROM VALUES
(1, 'alice@example.com', 'Alice', 'admin'),
(2, NULL, 'Bob', 'user'),
(3, 'charlie@example.com', 'Charlie', 'user'),
(4, NULL, 'Dana', 'admin')
AS t(user_id, email, name, user_type)
""")
# Test 1: Row count
def test_users_row_count(test_spark):
mock_users(test_spark)
test_pipeline.run(test_spark, set(["catalog.schema.users"]))
result = test_spark.table("catalog.schema.users")
assert result.count() == 4
# Test 2: Schema validation
def test_users_schema(test_spark):
mock_users(test_spark)
test_pipeline.run(test_spark, set(["catalog.schema.users"]))
result = test_spark.table("catalog.schema.users")
expected_fields = {"user_id", "email", "name", "user_type"}
actual_fields = set(f.name for f in result.schema.fields)
assert expected_fields == actual_fields
# Test 3: Null handling
def test_users_null_handling(test_spark):
mock_users(test_spark)
test_pipeline.run(test_spark, set(["catalog.schema.users"]))
result = test_spark.table("catalog.schema.users")
null_emails = result.filter("email IS NULL").count()
assert null_emails == 2
# Test 4: Aggregation
def test_counts(test_spark):
mock_users(test_spark)
# Run both tables since counts depends on users
test_pipeline.run(test_spark, set(["catalog.schema.users", "catalog.schema.counts"]))
result = test_spark.table("catalog.schema.counts")
# Check counts for each user_type
admin_row = result.filter("user_type = 'admin'").collect()[0]
user_row = result.filter("user_type = 'user'").collect()[0]
assert admin_row["total_count"] == 2
assert admin_row["count_valid_emails"] == 1
assert user_row["total_count"] == 2
assert user_row["count_valid_emails"] == 1
# Test 5: Full DataFrame comparison with assertDataFrameEqual
def test_counts_full_dataframe(test_spark):
mock_users(test_spark)
test_pipeline.run(test_spark, set(["catalog.schema.users", "catalog.schema.counts"]))
result = test_spark.table("catalog.schema.counts")
expected = test_spark.createDataFrame(
[("admin", 2, 1), ("user", 2, 1)],
schema=["user_type", "total_count", "count_valid_emails"]
)
assertDataFrameEqual(result, expected)
Exemplo 2: Teste do Auto CDC
Objetivo: validar que o Auto CDC processa corretamente o feed de alterações com inserções e atualizações.
Transformação de pipeline :
Esta transformação configura o Auto CDC a partir de um feed de alterações, que lê alterações em transmissão e as aplica à tabela de destino como SCD Tipo 1 (mantém apenas a versão mais recente).
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.view
def users():
return spark.readStream.table("catalog.schema.change_feed")
dp.create_streaming_table("target_autocdc")
dp.create_auto_cdc_flow(
target="target_autocdc",
source="users",
keys=["userId"],
sequence_by=col("ts"),
stored_as_scd_type=1
)
Testes :
O primeiro teste cria um feed de alterações simulado com vários registros para o mesmo userId (simulando uma atualização) e verifica se somente o registro mais recente é retido no destino. O segundo teste simula eventos com chegada tardia e fora de ordem, executando o pipeline, anexando mais eventos ao feed de alterações e executando o pipeline novamente.
import pytest
from pyspark.pipelines.testing import TestPipeline, test_spark
test_pipeline = TestPipeline.active()
# Test 1: Standard inserts and updates
def test_auto_cdc_flow(test_spark):
# Create a mock change feed table
test_spark.sql("""
CREATE TABLE catalog.schema.change_feed AS
SELECT * FROM VALUES
(1, 'Alice', 1000),
(2, 'Bob', 1001),
(1, 'Alice Updated', 1002)
AS t(userId, name, ts)
""")
# Run the pipeline
test_pipeline.run(test_spark, set(["catalog.schema.target_autocdc"]))
# Read the output
result = test_spark.table("catalog.schema.target_autocdc")
# Verify two users exist
user_ids = set(row["userId"] for row in result.collect())
assert user_ids == {1, 2}
# Verify latest record for userId=1 has ts=1002
latest_user1 = result.filter("userId = 1").collect()[0]
assert latest_user1["ts"] == 1002
assert latest_user1["name"] == "Alice Updated"
# Verify userId=2 has ts=1001
user2 = result.filter("userId = 2").collect()[0]
assert user2["ts"] == 1001
# Test 2: Late-arriving and out-of-order events
def test_auto_cdc_late_arriving(test_spark):
# First batch of change events
test_spark.sql("""
CREATE TABLE catalog.schema.change_feed AS
SELECT * FROM VALUES
(1, 'Alice', 1000),
(2, 'Bob', 1001)
AS t(userId, name, ts)
""")
# Run the pipeline with the initial batch
test_pipeline.run(test_spark, set(["catalog.schema.target_autocdc"]))
# Append late-arriving events to the change feed:
# - A newer event for userId=1 (ts=1003) that arrived after the first run
# - A stale event for userId=2 (ts=999) with a timestamp older than what is already applied
test_spark.sql("""
INSERT INTO catalog.schema.change_feed VALUES
(1, 'Alice Updated', 1003),
(2, 'Bob (stale)', 999)
""")
# Re-run the pipeline. sequence_by=ts ensures stale events do not overwrite newer state.
test_pipeline.run(test_spark, set(["catalog.schema.target_autocdc"]))
result = test_spark.table("catalog.schema.target_autocdc")
# userId=1 should reflect the newer late-arriving event
alice = result.filter("userId = 1").collect()[0]
assert alice["ts"] == 1003
assert alice["name"] == "Alice Updated"
# userId=2 should be unchanged: the stale event with an older ts is ignored
bob = result.filter("userId = 2").collect()[0]
assert bob["ts"] == 1001
assert bob["name"] == "Bob"
Exemplo 3: Teste de Auto CDC a partir de Snapshot
Objetivo: Validar que a CDC processa corretamente as alterações do Snapshot, incluindo inserções, atualizações e exclusões.
Transformação de pipeline :
Esta transformação configura o Auto CDC de Snapshot, que lê de uma tabela de Snapshot e rastreia as alterações ao longo do tempo como SCD Tipo 2 (mantém história completa).
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return spark.read.table("catalog.schema.snapshot")
dp.create_streaming_table("catalog.schema.target")
dp.create_auto_cdc_from_snapshot_flow(
target="target",
source="source",
keys=["userId"],
stored_as_scd_type=2
)
Teste :
Este teste cria um Snapshot inicial, executa o pipeline, então simula uma atualização de Snapshot truncando e inserindo novos dados para verificar se o CDC captura todas as alterações.
import pytest
from pyspark.pipelines.testing import TestPipeline, test_spark
test_pipeline = TestPipeline.active()
def test_auto_cdc_from_snapshot_flow(test_spark):
# Create initial snapshot
test_spark.sql("""
CREATE TABLE catalog.schema.snapshot AS
SELECT * FROM VALUES
(1, 'Alice', '2024-01-01'),
(2, 'Bob', '2024-01-02')
AS t(userId, name, created_at)
""")
# Run the pipeline
test_pipeline.run(test_spark, set(["catalog.schema.target"]))
# Simulate a new snapshot by truncating and inserting updated data
test_spark.sql("TRUNCATE TABLE catalog.schema.snapshot")
test_spark.sql("INSERT INTO catalog.schema.snapshot VALUES (2, 'Bob', '2024-01-03')")
test_pipeline.run(test_spark, set(["catalog.schema.target"]))
# Verify SCD Type 2: should have 3 rows (original Alice, original Bob, updated Bob)
result = test_spark.table("catalog.schema.target")
assert result.count() == 3
user_ids = [row["userId"] for row in result.collect()]
assert set(user_ids) == {1, 2}
Exemplo 4: Testando joins e expectativas
Objetivo: Validar que as junções funcionam corretamente e as expectativas filtram dados inválidos.
Transformação de pipeline :
Esta transformação join imagens de propriedades com comodidades e aplica uma expectativa para filtrar imagens com upload feito antes de janeiro de 2024.
from pyspark import pipelines as dp
@dp.table
@dp.expect_or_drop("uploaded after Jan 2024", "uploaded_at > '2024-01-01'")
def property_images_amenities_join():
return (
spark.read.table("catalog.schema.property_images")
.join(
spark.read.table("catalog.schema.property_amenities"),
on="property_id",
how="inner"
)
)
Testes :
Esses testes verificam que a join produz o número correto de linhas e que a expectativa filtra com sucesso os registros com datas de upload inválidas.
import pytest
from pyspark.pipelines.testing import TestPipeline, test_spark
test_pipeline = TestPipeline.active()
# Mock property datasets
def mock_properties(session):
session.sql("""
CREATE TABLE catalog.schema.property_images AS
SELECT * FROM VALUES
(101, 'img1.jpg', '2024-02-01'),
(102, 'img2.jpg', '2024-01-15'),
(103, 'img3.jpg', '2024-12-20')
AS t(property_id, image_url, uploaded_at)
""")
session.sql("""
CREATE TABLE catalog.schema.property_amenities AS
SELECT * FROM VALUES
(101, 'wifi'),
(102, 'pool'),
(103, 'parking')
AS t(property_id, amenity)
""")
# Test 1: Join
def test_property_join(test_spark):
mock_properties(test_spark)
test_pipeline.run(test_spark, set(["catalog.schema.property_images_amenities_join"]))
result = test_spark.table("catalog.schema.property_images_amenities_join")
# Should have 3 rows after join
assert result.count() == 3
# Check all property_ids are present
property_ids = set(row["property_id"] for row in result.collect())
assert property_ids == {101, 102, 103}
# Test 2: Expectation
def test_property_expectation(test_spark):
mock_properties(test_spark)
# Add a row with uploaded_at before Jan 2024
test_spark.sql("""
INSERT INTO catalog.schema.property_images VALUES (104, 'img4.jpg', '2023-12-31')
""")
# Add a matching row in the amenities table for the join
test_spark.sql("""
INSERT INTO catalog.schema.property_amenities VALUES (104, 'gym')
""")
test_pipeline.run(test_spark, set(["catalog.schema.property_images_amenities_join"]))
result = test_spark.table("catalog.schema.property_images_amenities_join")
# Only property_ids with uploaded_at > '2024-01-01' should be present
valid_ids = set(row["property_id"] for row in result.collect())
assert 104 not in valid_ids
assert valid_ids == {101, 102, 103}