transformação de dados com Delta Live Tables

Este artigo descreve como você pode usar Delta Live Tables para declarar transformações no dataset e especificar como os registros são processados por meio da lógica query . Ele também contém alguns exemplos de padrões comuns de transformação que podem ser úteis ao criar pipelines Delta Live Tables.

Você pode definir um dataset em qualquer query que retorne um DataFrame. Você pode usar operações integradas Apache Spark, UDFs, lógica personalizada e modelos MLflow como transformações em seu pipeline Delta Live Tables. Uma vez que os dados tenham sido ingeridos em seu pipeline Delta Live Tables, você pode definir um novo dataset contra fontes upstream para criar novas tabelas transmitidas, view materializada e view.

Para saber como executar com eficácia o processamento com estado com Delta Live Tables, consulte Otimizar o processamento com estado em Delta Live Tables com marcas d'água.

Quando usar visualizações, visualizações materializadas e tabelas transmitidas

Para garantir que seus pipelines sejam eficientes e sustentáveis, escolha o melhor tipo dataset ao implementar sua query de pipeline.

Considere o uso de uma view quando:

  • Você tem uma query grande ou complexa que deseja dividir em query mais fácil de gerenciar .

  • Você deseja validar resultados intermediários usando expectativas.

  • Você deseja reduzir os custos de armazenamento e compute e não requer a materialização dos resultados query . Como as tabelas são materializadas, elas requerem recursos adicionais de computação e armazenamento.

Considere o uso de uma view materializada quando:

  • Várias query downstream consomem a tabela. Como view são compute sob demanda, a view écompute toda vez que a view é query.

  • Outros pipelines, Job ou query consomem a tabela. Como view não são materializadas, você só pode usá-las no mesmo pipeline.

  • Você deseja view os resultados de uma query durante o desenvolvimento. Como as tabelas são materializadas e podem ser view e query fora do pipeline, o uso de tabelas durante o desenvolvimento pode ajudar a validar a exatidão dos cálculos. Após a validação, converta query que não requer materialização em view.

Considere usar uma mesa de transmissão quando:

  • Uma query é definida em uma fonte de dados que cresce continuamente ou de forma incremental.

  • os resultados query devem ser compute de forma incremental.

  • Taxa de transferência alta e baixa latência são desejadas para o pipeline.

Observação

tabelas de transmissão são sempre definidas em relação às fontes de transmissão. Você também pode usar fontes de transmissão com APPLY CHANGES INTO para aplicar atualizações de feeds do CDC. Consulte Captura simplificada de dados de alterações (CDC) com a API APPLY CHANGES em Delta Live Tables.

Combine tabelas de transmissão e visualizações materializadas em um único pipeline

tabelas de transmissão herdam as garantias de processamento do Apache Spark transmissão estruturada e são configuradas para processar query de fontes de dados anexadas somente, onde novas linhas são sempre inseridas na tabela de origem em vez de modificadas.

Observação

Embora, por default, as tabelas de transmissão requeiram fonte de dados apenas anexada, quando uma fonte de transmissão é outra tabela de transmissão que requer atualizações ou exclusões, você pode substituir esse comportamento com o sinalizador skipChangeCommits.

Um padrão de transmissão comum inclui a ingestão de dados de origem para criar o dataset inicial em um pipeline. Esses dataset iniciais são comumente chamados de tabelas de bronze e geralmente executam transformações simples.

Por outro lado, as tabelas finais em um pipeline, geralmente chamadas de tabelas de ouro , geralmente exigem agregações complicadas ou leitura de fontes que são os destinos de uma operação APPLY CHANGES INTO . Como essas operações criam inerentemente atualizações em vez de acréscimos, elas não são suportadas como entradas para tabelas transmitidas. Essas transformações são mais adequadas para view materializada.

Ao misturar tabelas transmitidas e view materializada em um único pipeline, você pode simplificar seu pipeline, evitar a reingestão ou reprocessamento dispendioso de dados brutos e ter todo o poder do SQL para compute agregações complexas em um dataset codificado e filtrado com eficiência. O exemplo a seguir ilustra esse tipo de processamento misto:

Observação

Estes exemplos usam o Auto Loader para carregar arquivos do armazenamento clouds . Para carregar arquivos com o Auto Loader em um pipeline habilitado para Unity Catalog, você deve usar locais externos. Para saber mais sobre como usar o Unity Catalog com Delta Live Tables, consulte Usar o Unity Catalog com seu pipelineDelta Live Tables.

@dlt.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("s3://path/to/raw/data")
  )

@dlt.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally.
  return dlt.read_stream("streaming_bronze").where(...)

@dlt.table
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return dlt.read("streaming_silver").groupBy("user_id").count()
CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM cloud_files(
  "s3://path/to/raw/data", "json"
)

CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(LIVE.streaming_bronze) WHERE...

CREATE OR REFRESH LIVE TABLE live_gold
AS SELECT count(*) FROM LIVE.streaming_silver GROUP BY user_id

Saiba mais sobre como usar o Auto Loader para ler com eficiência arquivos JSON do S3 para processamento incremental.

junções estáticas de transmissão

join estática de transmissão é uma boa escolha ao desnormalizar uma transmissão contínua de dados apenas anexados com uma tabela de dimensão principalmente estática.

A cada atualização do pipeline, novos registros da transmissão são agregados ao Snapshot mais atual da tabela estática. Se os registros forem adicionados ou atualizados na tabela estática após o processamento dos dados correspondentes da tabela de transmissão, os registros resultantes não serão recalculados, a menos que uma refresh completa seja executada.

Em pipelines configurados para execução acionada, a tabela estática retorna resultados a partir do momento em que a atualização começa. Em pipelines configurados para execução contínua, cada vez que a tabela processa uma atualização, a versão mais recente da tabela estática é query.

Veja a seguir um exemplo de join estática transmitida:

@dlt.table
def customer_sales():
  return dlt.read_stream("sales").join(dlt.read("customers"), ["customer_id"], "left")
CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
  INNER JOIN LEFT LIVE.customers USING (customer_id)

Calcule agregados de forma eficiente

Você pode usar tabelas transmitidas para calcular incrementalmente agregados distributivos simples, como contagem, mínimo, máximo ou soma, e agregados algébricos, como média ou desvio padrão. Databricks recomenda agregação incremental para query com um número limitado de grupos, por exemplo, uma query com uma cláusula GROUP BY country. Apenas novos dados de entrada são lidos com cada atualização.

Use modelos MLflow em um pipeline Delta Live Tables

Observação

Para usar modelos MLflow em um pipeline habilitado para o Unity Catalog, seu pipeline deve ser configurado para usar o canal preview . Para usar o canal current, você deve configurar seu pipeline para publicar no Hive metastore.

Você pode usar modelos treinados em MLflow no pipeline Delta Live Tables. Os modelos MLflow são tratados como transformações em Databricks, o que significa que atuam sobre uma entrada Spark DataFrame e retornam resultados como um Spark DataFrame. Como o Delta Live Tables define o conjunto de dados em relação aos DataFrames, você pode converter cargas de trabalho do Apache Spark que aproveitam o MLflow em Delta Live Tables com apenas algumas linhas de código. Para obter mais informações sobre MLflow, consulte Gerenciamento do ciclo de vida de ML usando MLflow.

Se o senhor já tiver um Python Notebook chamando um modelo MLflow, poderá adaptar esse código para Delta Live Tables usando o decorador @dlt.table e garantindo que as funções sejam definidas para retornar os resultados das transformações. Delta Live Tables não instala o MLflow pelo default, portanto, certifique-se de %pip install mlflow e importar mlflow e dlt na parte superior do seu Notebook. Para obter uma introdução à sintaxe do Delta Live Tables, consulte Exemplo: Ingerir e processar dados de nomes de bebês de Nova York.

Para usar modelos MLflow em Delta Live Tables, conclua as passos a seguir:

  1. Obtenha o ID de execução e o nome do modelo do modelo MLflow. O ID de execução e o nome do modelo são usados para construir o URI do modelo MLflow.

  2. Use o URI para definir um Spark UDF para carregar o modelo MLflow.

  3. Chame o UDF em suas definições de tabela para usar o modelo MLflow.

O exemplo a seguir mostra a sintaxe básica desse padrão:

%pip install mlflow

import dlt
import mlflow

run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

@dlt.table
def model_predictions():
  return dlt.read(<input-data>)
    .withColumn("prediction", loaded_model_udf(<model-features>))

Como um exemplo completo, o código a seguir define uma UDF Spark chamada loaded_model_udf que carrega um modelo MLflow treinado em dados de risco de empréstimo. As colunas de dados usadas para fazer a previsão são passadas como um argumento para o UDF. A tabela loan_risk_predictions calcula previsões para cada linha em loan_risk_input_data.

%pip install mlflow

import dlt
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dlt.table(
  comment="GBT ML predictions of loan risk",
  table_properties={
    "quality": "gold"
  }
)
def loan_risk_predictions():
  return dlt.read("loan_risk_input_data")
    .withColumn('predictions', loaded_model_udf(struct(features)))

Manter exclusões ou atualizações manuais

Delta Live Tables permite que você exclua ou atualize manualmente os registros de uma tabela e faça uma operação refresh para recalcular as tabelas downstream.

Por default, o Delta Live Tables recalcula os resultados da tabela com base nos dados de entrada sempre que um pipeline é atualizado, portanto, você deve garantir que o registro excluído não seja recarregado dos dados de origem. Definir a propriedade da tabela pipelines.reset.allowed como false impede refresh de uma tabela, mas não impede gravações incrementais nas tabelas ou impede que novos dados fluam para a tabela.

O diagrama a seguir ilustra um exemplo usando duas tabelas transmitidas:

  • raw_user_table ingere dados brutos do usuário de uma fonte.

  • bmi_table compute de forma incremental as pontuações de IMC usando peso e altura de raw_user_table.

Você deseja excluir ou atualizar manualmente os registros do usuário do raw_user_table e recalcular o bmi_table.

Reter diagrama de dados

O código a seguir demonstra a configuração da propriedade da tabela pipelines.reset.allowed como false para desabilitar refresh completa para raw_user_table para que as alterações pretendidas sejam retidas ao longo do tempo, mas as tabelas downstream são recalculadas quando uma atualização de pipeline é executada:

CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM cloud_files("/databricks-datasets/iot-stream/data-user", "csv");

CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(LIVE.raw_user_table);