transformação de dados com pipeline
Este artigo descreve como o senhor pode usar a DLT para declarar transformações no conjunto de dados e especificar como os registros são processados por meio da lógica de consulta. Ele também contém exemplos de padrões de transformações comuns para a criação de pipeline de DLT.
O senhor pode definir um dataset em relação a qualquer consulta que retorne um DataFrame. O senhor pode usar Apache Spark operações integradas, UDFs, lógica personalizada e MLflow modelos como transformações em seu DLT pipeline. Depois que os dados forem ingeridos em seu DLT pipeline, o senhor poderá definir um novo conjunto de dados em relação às fontes upstream para criar novas tabelas de transmissão, visualizações materializadas e visualizações.
Para saber como realizar com eficiência o processamento com estado com DLT, consulte Otimizar o processamento com estado em DLT com marcas d'água.
Quando usar a visualização, a visualização materializada e as tabelas de transmissão
Ao implementar suas consultas em pipeline, escolha o melhor tipo de dataset para garantir que elas sejam eficientes e passíveis de manutenção.
Considere usar o site view para fazer o seguinte:
- Divida uma consulta grande ou complexa que o senhor deseja em consultas mais fáceis de gerenciar.
- Valide os resultados intermediários usando as expectativas.
- Reduza os custos de armazenamento e compute para resultados que o senhor não precisa manter. Como as tabelas são materializadas, elas exigem recursos adicionais de computação e armazenamento.
Considere a possibilidade de usar um view materializado quando o senhor estiver usando:
- Várias consultas downstream consomem a tabela. Como a visualização é computada sob demanda, o view é recomputado toda vez que o view é consultado.
- Outros pipelines, trabalhos ou consultas consomem a tabela. Como as visualizações não são materializadas, o senhor só pode usá-las no mesmo pipeline.
- O senhor deseja acessar view os resultados de uma consulta durante o desenvolvimento. Como as tabelas são materializadas e podem ser visualizadas e consultadas fora do site pipeline, o uso de tabelas durante o desenvolvimento pode ajudar a validar a exatidão dos cálculos. Após a validação, converta as consultas que não exigem materialização em visualização.
Considere a possibilidade de usar uma mesa de transmissão quando o senhor estiver usando:
- Uma consulta é definida em relação a uma fonte de dados que está crescendo de forma contínua ou incremental.
- Os resultados da consulta devem ser computados de forma incremental.
- O site pipeline precisa de alta taxa de transferência e baixa latência.
As tabelas de transmissão são sempre definidas em relação às fontes de transmissão. O senhor também pode usar fontes de transmissão com APPLY CHANGES INTO
para aplicar atualizações dos feeds do site CDC. Consulte o site APPLY CHANGES APIs: Simplificar a captura de dados de alterações (CDC) com DLT.
Excluir tabelas do esquema de destino
Se você precisar calcular tabelas intermediárias não destinadas ao consumo externo, poderá impedir que elas sejam publicadas em um esquema usando a palavra-chave TEMPORARY
. As tabelas temporárias ainda armazenam e processam dados de acordo com a semântica da DLT, mas não devem ser acessadas fora do pipeline atual. Uma tabela temporária persiste durante o tempo de vida do pipeline que a cria. Use a sintaxe a seguir para declarar tabelas temporárias:
- SQL
- Python
CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;
@dlt.table(
temporary=True)
def temp_table():
return ("...")
Combinar tabelas de transmissão e visualização materializada em uma única pipeline
As tabelas de transmissão herdam as garantias de processamento de Apache Spark transmissão estructurada e são configuradas para processar consultas de fontes de dados append-only, em que novas linhas são sempre inseridas na tabela de origem em vez de modificadas.
Embora, pelo site default, as tabelas de transmissão exijam fontes de dados somente anexadas, quando uma fonte de transmissão é outra tabela de transmissão que exige atualizações ou exclusões, o senhor pode substituir esse comportamento com o sinalizador skipChangeCommits.
Um padrão comum de transmissão envolve a ingestão de dados de origem para criar o conjunto de dados inicial em um pipeline. Esses conjuntos de dados iniciais são comumente chamados de tabelas de bronze e geralmente realizam transformações simples.
Por outro lado, as tabelas finais em um pipeline, comumente chamadas de tabelas de ouro, geralmente exigem agregações complicadas ou leitura de alvos de uma APPLY CHANGES INTO
operação. Como essas operações criam inerentemente atualizações em vez de anexações, elas não são compatíveis como entradas para tabelas de transmissão. Essas transformações são mais adequadas para a visualização materializada.
Ao combinar as tabelas de transmissão e a visualização materializada em um único pipeline, o senhor pode simplificar seu pipeline, evitar o caro reingresso ou reprocessamento de dados brutos e ter todo o poder do SQL para compute agregações complexas em um dataset codificado e filtrado com eficiência. O exemplo a seguir ilustra esse tipo de processamento misto:
Esses exemplos usam o Auto Loader para carregar arquivos do armazenamento em nuvem. Para carregar arquivos com o Auto Loader em um pipeline habilitado para o Unity Catalog, o senhor deve usar locais externos. Para saber mais sobre como usar Unity Catalog com DLT, consulte Usar Unity Catalog com seu pipeline DLT.
- Python
- SQL
@dlt.table
def streaming_bronze():
return (
# Since this is a streaming source, this table is incremental.
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("s3://path/to/raw/data")
)
@dlt.table
def streaming_silver():
# Since we read the bronze table as a stream, this silver table is also
# updated incrementally.
return spark.readStream.table("streaming_bronze").where(...)
@dlt.table
def live_gold():
# This table will be recomputed completely by reading the whole silver table
# when it is updated.
return spark.readStream.table("streaming_silver").groupBy("user_id").count()
CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM STREAM read_files(
"s3://path/to/raw/data",
format => "json"
)
CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(streaming_bronze) WHERE...
CREATE OR REFRESH MATERIALIZED VIEW mv_gold
AS SELECT count(*) FROM streaming_silver GROUP BY user_id
Saiba mais sobre como usar o Auto Loader para ingerir arquivos JSON de forma incremental a partir do S3.
junção transmissível-estática
A junção transmissão-estática é uma boa opção ao desnormalizar uma transmissão contínua de dados somente de anexos com uma tabela de dimensão principalmente estática.
A cada atualização do site pipeline, os novos registros da transmissão são unidos ao Snapshot mais atual da tabela estática. Se os registros forem adicionados ou atualizados na tabela estática depois que os dados correspondentes da tabela de transmissão tiverem sido processados, os registros resultantes não serão recalculados, a menos que um refresh completo seja executado.
No pipeline configurado para execução acionada, a tabela estática retorna resultados a partir do momento em que a atualização começa. No pipeline configurado para execução contínua, a versão mais recente da tabela estática é consultada sempre que a tabela processa uma atualização.
A seguir, um exemplo de uma transmissão estática join:
- Python
- SQL
@dlt.table
def customer_sales():
return spark.readStream.table("sales").join(spark.readStream.table("customers"), ["customer_id"], "left")
CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(sales)
INNER JOIN LEFT customers USING (customer_id)
Calcule agregados de forma eficiente
O senhor pode usar tabelas de transmissão para calcular de forma incremental agregados distributivos simples, como contagem, mínimo, máximo ou soma, e agregados algébricos, como média ou desvio padrão. A Databricks recomenda a agregação incremental para consultas com um número limitado de grupos, como uma consulta com uma cláusula GROUP BY country
. Somente novos dados de entrada são lidos com cada atualização.
Para saber mais sobre como escrever consultas DLT que realizam agregações incrementais, consulte Executar agregações em janelas com marcas d'água.
Usar modelos MLflow em um pipeline DLT
Para usar modelos do MLflow em um pipeline habilitado para o Unity Catalog, seu pipeline deve ser configurado para usar o canal preview
. Para usar o canal current
, o senhor deve configurar o pipeline para publicar no Hive metastore.
O senhor pode usar modelos treinados pelo MLflow no pipeline DLT. Os modelos MLflow são tratados como transformações no Databricks, o que significa que eles agem sobre uma entrada Spark DataFrame e retornam resultados como um Spark DataFrame. Como a DLT define o conjunto de dados em relação a DataFrames, o senhor pode converter as cargas de trabalho de Apache Spark que usam MLflow em DLT com apenas algumas linhas de código. Para saber mais sobre o MLflow, consulte MLflow for gen AI agent e ML model lifecycle.
Se o senhor já tiver um notebook Python chamando um modelo MLflow, poderá adaptar esse código ao DLT usando o decorador @dlt.table
e garantindo que as funções sejam definidas para retornar os resultados das transformações. A DLT não instala MLflow por default, portanto, confirme que o senhor instalou a biblioteca MLFlow com %pip install mlflow
e importou mlflow
e dlt
na parte superior do Notebook. Para obter uma introdução à sintaxe DLT, consulte Desenvolver código de pipeline com Python.
Para usar os modelos MLflow no DLT, conclua as etapas a seguir:
- Obtenha a ID de execução e o nome do modelo do MLflow. A ID de execução e o nome do modelo são usados para construir o URI do modelo MLflow.
- Use o URI para definir um Spark UDF para carregar o modelo MLflow.
- Chame o UDF em suas definições de tabela para usar o modelo MLflow.
O exemplo a seguir mostra a sintaxe básica desse padrão:
%pip install mlflow
import dlt
import mlflow
run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
@dlt.table
def model_predictions():
return spark.read.table(<input-data>)
.withColumn("prediction", loaded_model_udf(<model-features>))
Como exemplo completo, o código a seguir define um Spark UDF chamado loaded_model_udf
que carrega um modelo MLflow treinado em dados de risco de empréstimo. As colunas de dados usadas para fazer a previsão são passadas como argumento para o UDF. A tabela loan_risk_predictions
calcula as previsões para cada linha em loan_risk_input_data
.
%pip install mlflow
import dlt
import mlflow
from pyspark.sql.functions import struct
run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
categoricals = ["term", "home_ownership", "purpose",
"addr_state","verification_status","application_type"]
numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
"revol_util", "total_acc", "credit_length_in_years"]
features = categoricals + numerics
@dlt.table(
comment="GBT ML predictions of loan risk",
table_properties={
"quality": "gold"
}
)
def loan_risk_predictions():
return spark.read.table("loan_risk_input_data")
.withColumn('predictions', loaded_model_udf(struct(features)))
Retenha exclusões ou atualizações manuais
A DLT permite excluir ou atualizar manualmente os registros de uma tabela e fazer uma refresh operação para recomputar tabelas downstream.
Em default, a DLT recomputa os resultados da tabela com base nos dados de entrada sempre que um pipeline é atualizado, portanto, o senhor deve garantir que o registro excluído não seja recarregado a partir dos dados de origem. Definir a propriedade da tabela pipelines.reset.allowed
como false
impede a atualização de uma tabela, mas não impede gravações incrementais nas tabelas ou o fluxo de novos dados para a tabela.
O diagrama a seguir ilustra um exemplo usando duas tabelas de transmissão:
raw_user_table
ingere dados brutos do usuário de uma fonte.bmi_table
calcular de forma incremental as pontuações de IMC usando o peso e a altura deraw_user_table
.
Você deseja excluir ou atualizar manualmente os registros do usuário do raw_user_table
e recalcular o bmi_table
.
O código a seguir demonstra a configuração da propriedade da tabela pipelines.reset.allowed
como false
para desativar o refresh completo para raw_user_table
, de modo que as alterações pretendidas sejam mantidas ao longo do tempo, mas as tabelas downstream sejam recomputadas quando uma atualização do pipeline for executada:
CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM STREAM read_files("/databricks-datasets/iot-stream/data-user", format => "csv");
CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(raw_user_table);