transformação de dados com pipeline
Este artigo descreve como você pode usar o pipeline declarativo LakeFlow para declarar transformações em um conjunto de dados e especificar como os registros são processados por meio da lógica de consulta. Ele também contém exemplos de padrões de transformações comuns para a construção do pipeline declarativo LakeFlow .
Você pode definir um dataset para qualquer consulta que retorne um DataFrame. Você pode usar operações integradas Apache Spark , UDFs, lógica personalizada e modelos MLflow como transformações no pipeline declarativo LakeFlow . Depois que os dados forem ingeridos no seu pipeline, você pode definir um novo conjunto de dados em relação às fontes upstream para criar novas tabelas de transmissão, visualizações materializadas e visualizações.
Para saber como executar efetivamente o processamento com estado com o pipeline declarativo LakeFlow , consulte Otimizar o processamento com estado no pipeline declarativo LakeFlow com marcas d'água.
Quando usar tabelas de visualização, visualização materializada e transmissão
Ao implementar suas consultas pipeline , escolha o melhor tipo de dataset para garantir que elas sejam eficientes e fáceis de manter.
Considere usar uma view para fazer o seguinte:
- Divida uma consulta grande ou complexa que você deseja em consultas mais fáceis de gerenciar.
- Valide resultados intermediários usando expectativas.
- Reduza os custos de armazenamento e compute para obter resultados que você não precisa persistir. Como as tabelas são materializadas, elas exigem recursos adicionais de computação e armazenamento.
Considere usar uma view materializada quando:
- Várias consultas posteriores consomem a tabela. Como as visualizações são computadas sob demanda, view são computadas novamente sempre que são view .
- Outros pipelines, trabalhos ou consultas consomem a tabela. Como as visualizações não são materializadas, você só pode usá-las no mesmo pipeline.
- Você deseja view os resultados de uma consulta durante o desenvolvimento. Como as tabelas são materializadas e podem ser visualizadas e consultadas fora do pipeline, o uso de tabelas durante o desenvolvimento pode ajudar a validar a correção dos cálculos. Após a validação, converta as consultas que não exigem materialização em exibição.
Considere usar uma tabela de transmissão quando:
- Uma consulta é definida em relação a uma fonte de dados que cresce de forma contínua ou incremental.
- Os resultados da consulta devem ser calculados incrementalmente.
- O pipeline precisa de alta Taxa de transferência e baixa latência.
tabelas de transmissão são sempre definidas em relação às fontes de transmissão. Você também pode usar fontes de transmissão com AUTO CDC ... INTO
para aplicar atualizações de feeds CDC . Consulte APIs AUTO CDC : Simplifique a captura de dados de alterações (CDC) com pipeline declarativo LakeFlow.
Excluir tabelas do esquema de destino
Se você precisar calcular tabelas intermediárias não destinadas ao consumo externo, poderá impedir que elas sejam publicadas em um esquema usando a palavra-chave TEMPORARY
. As tabelas temporárias ainda armazenam e processam dados de acordo com a semântica do pipeline declarativo LakeFlow , mas não devem ser acessadas fora do pipeline atual. Uma tabela temporária persiste durante o tempo de vida do pipeline que a cria. Use a seguinte sintaxe para declarar tabelas temporárias:
- SQL
- Python
CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;
@dp.table(
temporary=True)
def temp_table():
return ("...")
Combine tabelas de transmissão e visualização materializada em um único pipeline
As tabelas de transmissão herdam as garantias de processamento do Apache Spark e são configuradas para processar consultas somente de anexação, onde novas linhas são sempre inseridas na tabela de origem em vez de modificadas.
Embora, por default, as tabelas de transmissão exijam somente anexação de fonte de dados, quando uma fonte de transmissão é outra tabela de transmissão que requer atualizações ou exclusões, você pode substituir esse comportamento com o sinalizador skipChangeCommits
Um padrão de transmissão comum envolve a ingestão de dados de origem para criar o conjunto de dados inicial em um pipeline. Esses conjuntos de dados iniciais são comumente chamados de tabelas de bronze e geralmente realizam transformações simples.
Por outro lado, as tabelas finais em um pipeline, comumente chamadas de tabelas ouro, geralmente exigem agregações complicadas ou leituras de alvos de uma AUTO CDC ... INTO
operação. Como essas operações criam inerentemente atualizações em vez de acréscimos, elas não são suportadas como entradas para tabelas de transmissão. Essas transformações são mais adequadas para visualização materializada.
Ao misturar tabelas de transmissão e visualizações materializadas em um único pipeline, você pode simplificar seu pipeline, evitar a custosa reingestão ou reprocessamento de dados brutos e ter todo o poder do SQL para compute agregações complexas em um dataset eficientemente codificado e filtrado. O exemplo a seguir ilustra esse tipo de processamento misto:
Esses exemplos usam Auto Loader para carregar arquivos do armazenamento cloud . Para carregar arquivos com o Auto Loader em um pipeline habilitado para o Unity Catalog, você deve usar locais externos. Para saber mais sobre como usar Unity Catalog com o pipeline declarativo LakeFlow , consulte Usar Unity Catalog com seu pipeline declarativo LakeFlow.
- Python
- SQL
@dp.table
def streaming_bronze():
return (
# Since this is a streaming source, this table is incremental.
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("gs://path/to/raw/data")
)
@dp.table
def streaming_silver():
# Since we read the bronze table as a stream, this silver table is also
# updated incrementally.
return spark.readStream.table("streaming_bronze").where(...)
@dp.materialized_view
def live_gold():
# This table will be recomputed completely by reading the whole silver table
# when it is updated.
return spark.read.table("streaming_silver").groupBy("user_id").count()
CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM STREAM read_files(
"gs://path/to/raw/data",
format => "json"
)
CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(streaming_bronze) WHERE...
CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM streaming_silver GROUP BY user_id
Saiba mais sobre como usar o Auto Loader para ingerir incrementalmente arquivos JSON do Google Cloud Storage.
transmissão-estática
transmissão-static join é uma boa escolha ao desnormalizar uma transmissão contínua de dados somente de acréscimo com uma tabela de dimensão principalmente estática.
Com cada atualização pipeline , novos registros da transmissão são unidos ao Snapshot mais atual da tabela estática. Se registros forem adicionados ou atualizados na tabela estática após os dados correspondentes da tabela de transmissão terem sido processados, os registros resultantes não serão recalculados, a menos que uma refresh completa seja executada.
No pipeline configurado para execução disparada, a tabela estática retorna resultados a partir do momento em que a atualização é iniciada. No pipeline configurado para execução contínua, a versão mais recente da tabela estática é consultada sempre que a tabela processa uma atualização.
A seguir está um exemplo de uma join transmissão-estática:
- Python
- SQL
@dp.table
def customer_sales():
return spark.readStream.table("sales").join(spark.read.table("customers"), ["customer_id"], "left")
CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(sales)
INNER JOIN LEFT customers USING (customer_id)
Calcular agregados de forma eficiente
Você pode usar tabelas de transmissão para calcular incrementalmente agregados distributivos simples, como contagem, mínimo, máximo ou soma, e agregados algébricos, como média ou desvio padrão. O Databricks recomenda agregação incremental para consultas com um número limitado de grupos, como uma consulta com uma cláusula GROUP BY country
. Somente novos dados de entrada são lidos a cada atualização.
Para saber mais sobre como escrever consultas de pipeline declarativas LakeFlow que executam agregações incrementais, consulte Executar agregações em janelas com marcas d'água.
Use modelos MLflow no pipeline declarativo LakeFlow
Para usar modelos MLflow em um pipeline habilitado para Unity Catalog, seu pipeline deve ser configurado para usar o canal preview
. Para usar o canal current
, você deve configurar seu pipeline para publicar no Hive metastore.
Você pode usar modelos treinados pelo MLflow no pipeline declarativo LakeFlow . Os modelos do MLflow são tratados como transformações no Databricks, o que significa que eles agem em uma entrada do Spark DataFrame e retornam resultados como um Spark DataFrame. Como o pipeline declarativo LakeFlow define o conjunto de dados em relação DataFrames, você pode converter cargas de trabalho Apache Spark que usam MLflow para o pipeline declarativo LakeFlow com apenas algumas linhas de código. Para mais informações sobre o MLflow, consulte MLflow para ciclo de vida do modelo ML.
Se você já tiver um script Python chamando um modelo MLflow , poderá adaptar esse código ao pipeline declarativo LakeFlow usando o decorador @dp.table
ou @dp.materialized_view
e garantindo que as funções sejam definidas para retornar resultados de transformações. O pipeline declarativo LakeFlow não instala MLflow por default, então confirme se você instalou a biblioteca do MLFlow com %pip install mlflow
e importou mlflow
e dp
no topo do seu código-fonte. Para uma introdução à sintaxe do pipeline declarativo LakeFlow , consulte Desenvolver código pipeline com Python.
Para usar modelos MLflow no pipeline declarativo LakeFlow , siga os seguintes passos:
- Obtenha o ID de execução e o nome do modelo do MLflow. O ID de execução e o nome do modelo são usados para construir o URI do modelo MLflow.
- Use o URI para definir um Spark UDF para carregar o modelo MLflow.
- Chame a UDF nas definições da sua tabela para usar o modelo MLflow.
O exemplo a seguir mostra a sintaxe básica deste padrão:
%pip install mlflow
from pyspark import pipelines as dp
import mlflow
run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
@dp.materialized_view
def model_predictions():
return spark.read.table(<input-data>)
.withColumn("prediction", loaded_model_udf(<model-features>))
Como um exemplo completo, o código a seguir define uma UDF Spark chamada loaded_model_udf
que carrega um modelo MLflow treinado em dados de risco de empréstimo. As colunas de dados usadas para fazer a previsão são passadas como um argumento para o UDF. A tabela loan_risk_predictions
calcula previsões para cada linha em loan_risk_input_data
.
%pip install mlflow
from pyspark import pipelines as dp
import mlflow
from pyspark.sql.functions import struct
run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
categoricals = ["term", "home_ownership", "purpose",
"addr_state","verification_status","application_type"]
numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
"revol_util", "total_acc", "credit_length_in_years"]
features = categoricals + numerics
@dp.materialized_view(
comment="GBT ML predictions of loan risk",
table_properties={
"quality": "gold"
}
)
def loan_risk_predictions():
return spark.read.table("loan_risk_input_data")
.withColumn('predictions', loaded_model_udf(struct(features)))
Manter exclusões ou atualizações manuais
O pipeline declarativo LakeFlow permite que você exclua ou atualize manualmente registros de uma tabela e faça operações de refresh para recalcular tabelas posteriores.
Por default, o pipeline declarativo LakeFlow recalcula os resultados da tabela com base nos dados de entrada sempre que um pipeline é atualizado. Portanto, você deve garantir que o registro excluído não seja recarregado a partir dos dados de origem. Definir a propriedade da tabela pipelines.reset.allowed
como false
impede a atualização de uma tabela, mas não impede gravações incrementais nas tabelas ou que novos dados fluam para a tabela.
O diagrama a seguir ilustra um exemplo usando duas tabelas de transmissão:
raw_user_table
ingere dados brutos do usuário de uma fonte.bmi_table
calcular incrementalmente as pontuações do IMC usando peso e altura deraw_user_table
.
Você deseja excluir ou atualizar manualmente os registros do usuário do raw_user_table
e recalcular o bmi_table
.
O código a seguir demonstra a configuração da propriedade da tabela pipelines.reset.allowed
como false
para desabilitar refresh completa para raw_user_table
, de modo que as alterações pretendidas sejam mantidas ao longo do tempo, mas as tabelas posteriores sejam recalculadas quando uma atualização pipeline for executada:
CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM STREAM read_files("/databricks-datasets/iot-stream/data-user", format => "csv");
CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(raw_user_table);