Usar parâmetros com o pipeline Delta Live Tables

Este artigo explica como o senhor pode usar as configurações do Delta Live Tables pipeline para parametrizar o código pipeline.

Parâmetros de referência

Durante as atualizações, o código-fonte do pipeline pode acessar os parâmetros do pipeline usando a sintaxe para obter valores para as configurações do Spark.

O senhor faz referência aos parâmetros do pipeline usando o key. O valor é injetado em seu código-fonte como uma cadeia de caracteres antes da avaliação da lógica do código-fonte.

A sintaxe do exemplo a seguir usa um parâmetro com key source_catalog e valor dev_catalog para especificar a fonte de dados de um site materializado view:

CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
  COUNT(txn_id) txn_count,
  SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id
import dlt
from pyspark.sql.functions import col, sum, count

@dlt.table
def transaction_summary():

  source_catalog = spark.conf.get("source_catalog")

  return (spark.read
      .table(f"{source_catalog}.sales.transactions_table")
      .groupBy("account_id")
      .agg(
        count(col("txn_id").alias("txn_count")),
        sum(col("txn_amount").alias("account_revenue"))
      )
    )

Definir parâmetros

Passe parâmetros para o pipeline passando parâmetros arbitrários key-value como configurações para o pipeline. O senhor pode definir parâmetros ao definir ou editar uma configuração pipeline usando a UI workspace ou JSON. Consulte Configurar um pipeline do Delta Live Tables.

A chave de parâmetro do pipeline só pode conter _ - . ou caracteres alfanuméricos. Os valores dos parâmetros são definidos como strings.

Os parâmetros do pipeline não são compatíveis com valores dinâmicos. O senhor deve atualizar o valor associado a um key na configuração do pipeline.

Importante

Não use palavras-chave que entrem em conflito com valores reservados de configuração do pipeline ou do Apache Spark.

Parametrizar declarações de conjuntos de dados em Python ou SQL

Os códigos Python e SQL que definem seu conjunto de dados podem ser parametrizados pelas configurações do pipeline. A parametrização permite os seguintes casos de uso:

  • Separando caminhos longos e outras variáveis do seu código.

  • Reduzir a quantidade de dados processados em ambientes de desenvolvimento ou de teste para acelerar os testes.

  • Reutilizar a mesma lógica de transformações para processar a partir de várias fontes de dados.

O exemplo a seguir usa o valor de configuração startDate para limitar o pipeline de desenvolvimento a um subconjunto dos dados de entrada:

CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
  start_date = spark.conf.get("mypipeline.startDate")
  return read("sourceTable").where(col("date") > start_date)
{
  "name": "Data Ingest - DEV",
  "configuration": {
    "mypipeline.startDate": "2021-01-02"
  }
}
{
  "name": "Data Ingest - PROD",
  "configuration": {
    "mypipeline.startDate": "2010-01-02"
  }
}

Controle a fonte de dados com parâmetros

O senhor pode usar os parâmetros do pipeline para especificar diferentes fontes de dados em diferentes configurações do mesmo pipeline.

Por exemplo, o senhor pode especificar caminhos diferentes nas configurações de desenvolvimento, teste e produção para um pipeline usando a variável data_source_path e, em seguida, fazer referência a ela usando o código a seguir:

CREATE STREAMING TABLE bronze
AS (
    SELECT
    *,
    _metadata.file_path AS source_file_path
    FROM read_files( '${data_source_path}', 'csv',
            map("header", "true"))
)
import dlt
from pyspark.sql.functions import col

data_source_path = spark.conf.get("data_source_path")

@dlt.table
def bronze():
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", True)
        .load(data_source_path )
        .select("*", col("_metadata.file_path").alias("source_file_name"))
    )

Esse padrão é benéfico para testar como a lógica de ingestão pode lidar com esquemas ou dados malformados durante a ingestão inicial. O senhor pode usar o código idêntico em todo o site pipeline em todos os ambientes, alternando o conjunto de dados.