Desenvolver código de pipeline com Python
O pipeline declarativo LakeFlow introduz várias novas construções de código Python para definir tabelas de visualização materializada e transmissão no pipeline. O suporte Python para desenvolvimento de pipeline se baseia nos conceitos básicos do PySpark DataFrame e APIs de transmissão estruturada.
Para usuários não familiarizados com Python e DataFrames, o Databricks recomenda usar a interface SQL. Veja Desenvolver código de pipeline com SQL.
Para uma referência completa da sintaxe do pipeline declarativo LakeFlow Python , consulte Referência da linguagem Python do pipeline declarativoLakeFlow.
Noções básicas de Python para desenvolvimento de pipeline
O código Python que cria o conjunto de dados do pipeline declarativo LakeFlow deve retornar DataFrames.
Todas APIs Python do pipeline declarativo LakeFlow são implementadas no módulo pyspark.pipelines
. O código do pipeline declarativo LakeFlow implementado com Python deve importar explicitamente o módulo pipelines
no topo do código-fonte Python . Em nossos exemplos, usamos o seguinte comando de importação e usamos dp
em exemplos para nos referir a pipelines
.
from pyspark import pipelines as dp
A versão pública e de código aberto do pyspark
também inclui o módulo pipelines
. Grande parte do código é compatível com a versão usada no Databricks. Entretanto, há alguns recursos na versão Databricks do pipelines
que não são compatíveis com o OSS pyspark
. Os seguintes recursos não são compatíveis:
dp.create_auto_cdc_flow
dp.create_auto_cdc_from_snapshot_flow
@dp.expect(...)
@dp.temporary_view
Lê e grava default no catálogo e esquema especificados durante a configuração pipeline . Consulte Definir o catálogo de destino e o esquema.
O código Python específico do pipeline declarativo LakeFlow difere de outros tipos de código Python em uma maneira crítica: o código pipeline Python não chama diretamente as funções que executam a extração de dados e transformações para criar o conjunto de dados do pipeline declarativo LakeFlow . Em vez disso, o pipeline declarativo LakeFlow interpreta as funções do decorador do módulo dp
em todos os arquivos de código-fonte configurados em um pipeline e cria um gráfico de fluxo de dados.
Para evitar comportamento inesperado durante a execução do seu pipeline , não inclua código que possa ter efeitos colaterais nas suas funções que definem o conjunto de dados. Para saber mais, consulte a referência do Python.
Crie uma view materializada ou tabela de transmissão com Python
Use @dp.table
para criar uma tabela de transmissão a partir dos resultados de uma leitura de transmissão. Use @dp.materialized_view
para criar uma view materializada a partir dos resultados de uma leitura de lotes.
Por default, os nomes das tabelas de view materializada e transmissão são inferidos a partir dos nomes das funções. O exemplo de código a seguir mostra a sintaxe básica para criar uma view materializada e uma tabela de transmissão:
Ambas as funções fazem referência à mesma tabela no catálogo samples
e usam a mesma função de decorador. Esses exemplos destacam que a única diferença na sintaxe básica para tabelas de visualização materializada e transmissão é usar spark.read
versus spark.readStream
.
Nem todas as fontes de dados suportam leituras de transmissão. Algumas fontes de dados devem sempre ser processadas com semântica de transmissão.
from pyspark import pipelines as dp
@dp.materialized_view()
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")
@dp.table()
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")
Opcionalmente, você pode especificar o nome da tabela usando o argumento name
no decorador @dp.table
. O exemplo a seguir demonstra esse padrão para uma view materializada e uma tabela de transmissão:
from pyspark import pipelines as dp
@dp.materialized_view(name = "trips_mv")
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")
@dp.table(name = "trips_st")
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")
Carregar dados do armazenamento de objetos
O pipeline declarativo LakeFlow suporta o carregamento de dados de todos os formatos suportados pelo Databricks. Veja Opções de formato de dados.
Esses exemplos usam dados disponíveis em /databricks-datasets
montados automaticamente em seu workspace. Databricks recomenda usar caminhos de volume ou URIs cloud para referenciar dados armazenados no armazenamento de objetos cloud . Veja O que são volumes Unity Catalog ?.
Databricks recomenda o uso Auto Loader e das tabelas de transmissão ao configurar cargas de trabalho de ingestão incremental em dados armazenados no armazenamento de objetos cloud . Veja O que é o Auto Loader?.
O exemplo a seguir cria uma tabela de transmissão a partir de arquivos JSON usando Auto Loader:
from pyspark import pipelines as dp
@dp.table()
def ingestion_st():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
O exemplo a seguir usa a semântica de lotes para ler um diretório JSON e criar uma view materializada:
from pyspark import pipelines as dp
@dp.materialized_view()
def batch_mv():
return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")
Validar dados com expectativas
Você pode usar expectativas para definir e impor restrições de qualidade de dados. Veja gerenciar a qualidade dos dados com expectativas pipeline.
O código a seguir usa @dp.expect_or_drop
para definir uma expectativa chamada valid_data
que descarta registros nulos durante a ingestão de dados:
from pyspark import pipelines as dp
@dp.table()
@dp.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
Consultar tabelas de visualização materializada e transmissão definidas em seu pipeline
O exemplo a seguir define quatro conjuntos de dados:
- Uma tabela de transmissão chamada
orders
que carrega dados JSON . - Uma view materializada chamada
customers
que carrega dados CSV . - Uma view materializada chamada
customer_orders
que une registros do conjunto de dadosorders
ecustomers
, converte o registro de data e hora do pedido em uma data e seleciona os camposcustomer_id
,order_number
,state
eorder_date
. - Uma view materializada chamada
daily_orders_by_state
que agrega a contagem diária de pedidos para cada estado.
Ao consultar visualizações ou tabelas em seu pipeline, você pode especificar o catálogo e o esquema diretamente ou pode usar o padrão configurado em seu pipeline. Neste exemplo, as tabelas orders
, customers
e customer_orders
são gravadas e lidas do catálogo e esquema default configurados para seu pipeline.
O modo de publicação legado usa o esquema LIVE
para consultar outras tabelas de exibição materializada e transmissão definidas no seu pipeline. No novo pipeline, a sintaxe do esquema LIVE
é silenciosamente ignorada. Veja o esquema LIVE (legado).
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table()
@dp.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
@dp.materialized_view()
def customers():
return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")
@dp.materialized_view()
def customer_orders():
return (spark.read.table("orders")
.join(spark.read.table("customers"), "customer_id")
.select("customer_id",
"order_number",
"state",
col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
)
)
@dp.materialized_view()
def daily_orders_by_state():
return (spark.read.table("customer_orders")
.groupBy("state", "order_date")
.count().withColumnRenamed("count", "order_count")
)
Crie tabelas em um loop for
Você pode usar loops Python for
para criar várias tabelas programaticamente. Isso pode ser útil quando você tem muitas fontes de dados ou conjuntos de dados de destino que variam apenas em alguns parâmetros, resultando em menos código total para manter e menos redundância de código.
O loop for
avalia a lógica em ordem serial, mas assim que o planejamento do conjunto de dados estiver concluído, o pipeline executa a lógica em paralelo.
Ao usar esse padrão para definir o conjunto de dados, certifique-se de que a lista de valores passada para o loop for
seja sempre aditiva. Se um dataset definido anteriormente em um pipeline for omitido de uma execução futura pipeline , esse dataset será descartado automaticamente do esquema de destino.
O exemplo a seguir cria cinco tabelas que filtram pedidos de clientes por região. Aqui, o nome da região é usado para definir o nome da visualização materializada de destino e para filtrar os dados de origem. As visualizações temporárias são usadas para definir junções das tabelas de origem usadas na construção da visualização materializada final.
from pyspark import pipelines as dp
from pyspark.sql.functions import collect_list, col
@dp.temporary_view()
def customer_orders():
orders = spark.read.table("samples.tpch.orders")
customer = spark.read.table("samples.tpch.customer")
return (orders.join(customer, orders.o_custkey == customer.c_custkey)
.select(
col("c_custkey").alias("custkey"),
col("c_name").alias("name"),
col("c_nationkey").alias("nationkey"),
col("c_phone").alias("phone"),
col("o_orderkey").alias("orderkey"),
col("o_orderstatus").alias("orderstatus"),
col("o_totalprice").alias("totalprice"),
col("o_orderdate").alias("orderdate"))
)
@dp.temporary_view()
def nation_region():
nation = spark.read.table("samples.tpch.nation")
region = spark.read.table("samples.tpch.region")
return (nation.join(region, nation.n_regionkey == region.r_regionkey)
.select(
col("n_name").alias("nation"),
col("r_name").alias("region"),
col("n_nationkey").alias("nationkey")
)
)
# Extract region names from region table
region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]
# Iterate through region names to create new region-specific materialized views
for region in region_list:
@dp.materialized_view(name=f"{region.lower().replace(' ', '_')}_customer_orders")
def regional_customer_orders(region_filter=region):
customer_orders = spark.read.table("customer_orders")
nation_region = spark.read.table("nation_region")
return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
.select(
col("custkey"),
col("name"),
col("phone"),
col("nation"),
col("region"),
col("orderkey"),
col("orderstatus"),
col("totalprice"),
col("orderdate")
).filter(f"region = '{region_filter}'")
)
A seguir está um exemplo do gráfico de fluxo de dados para este pipeline:
Solução de problemas: o loop for
cria muitas tabelas com os mesmos valores
O modelo de execução lenta que o pipeline usa para avaliar o código Python requer que sua lógica faça referência direta a valores individuais quando a função decorada por @dp.materialized_view()
for invocada.
O exemplo a seguir demonstra duas abordagens corretas para definir tabelas com um loop for
. Em ambos os exemplos, cada nome de tabela da lista tables
é explicitamente referenciado dentro da função decorada por @dp.materialized_view()
.
from pyspark import pipelines as dp
# Create a parent function to set local variables
def create_table(table_name):
@dp.materialized_view(name=table_name)
def t():
return spark.read.table(table_name)
tables = ["t1", "t2", "t3"]
for t_name in tables:
create_table(t_name)
# Call `@dp.materialized_view()` within a for loop and pass values as variables
tables = ["t1", "t2", "t3"]
for t_name in tables:
@dp.materialized_view(name=t_name)
def create_table(table_name=t_name):
return spark.read.table(table_name)
O exemplo a seguir não referencia valores corretamente. Este exemplo cria tabelas com nomes distintos, mas todas as tabelas carregam dados do último valor no loop for
:
from pyspark import pipelines as dp
# Don't do this!
tables = ["t1", "t2", "t3"]
for t_name in tables:
@dp.materialized(name=t_name)
def create_table():
return spark.read.table(t_name)
Excluir permanentemente registros de uma view materializada ou tabela de transmissão
Para excluir permanentemente registros de uma view materializada ou tabela de transmissão com vetores de exclusão habilitados, como para compliance GDPR , operações adicionais devem ser executadas nas tabelas Delta subjacentes do objeto. Para garantir a exclusão de registros de uma view materializada, consulte Excluir registros permanentemente de uma view materializada com vetores de exclusão habilitados. Para garantir a exclusão de registros de uma tabela de transmissão, consulte Excluir registros permanentemente de uma tabela de transmissão.