Usar parâmetros com o pipeline Delta Live Tables
Este artigo explica como o senhor pode usar as configurações do Delta Live Tables pipeline para parametrizar o código pipeline.
Parâmetros de referência
Durante as atualizações, o código-fonte do pipeline pode acessar os parâmetros do pipeline usando a sintaxe para obter valores para as configurações do Spark.
O senhor faz referência aos parâmetros do pipeline usando o key. O valor é injetado em seu código-fonte como uma cadeia de caracteres antes da avaliação da lógica do código-fonte.
A sintaxe do exemplo a seguir usa um parâmetro com key source_catalog
e valor dev_catalog
para especificar a fonte de dados de um site materializado view:
CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
COUNT(txn_id) txn_count,
SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id
import dlt
from pyspark.sql.functions import col, sum, count
@dlt.table
def transaction_summary():
source_catalog = spark.conf.get("source_catalog")
return (spark.read
.table(f"{source_catalog}.sales.transactions_table")
.groupBy("account_id")
.agg(
count(col("txn_id").alias("txn_count")),
sum(col("txn_amount").alias("account_revenue"))
)
)
Definir parâmetros
Passe parâmetros para o pipeline passando parâmetros arbitrários key-value como configurações para o pipeline. O senhor pode definir parâmetros ao definir ou editar uma configuração pipeline usando a UI workspace ou JSON. Consulte Configurar um pipeline do Delta Live Tables.
A chave de parâmetro do pipeline só pode conter _ - .
ou caracteres alfanuméricos. Os valores dos parâmetros são definidos como strings.
Os parâmetros do pipeline não são compatíveis com valores dinâmicos. O senhor deve atualizar o valor associado a um key na configuração do pipeline.
Importante
Não use palavras-chave que entrem em conflito com valores reservados de configuração do pipeline ou do Apache Spark.
Parametrizar declarações de conjuntos de dados em Python ou SQL
Os códigos Python e SQL que definem seu conjunto de dados podem ser parametrizados pelas configurações do pipeline. A parametrização permite os seguintes casos de uso:
Separando caminhos longos e outras variáveis do seu código.
Reduzir a quantidade de dados processados em ambientes de desenvolvimento ou de teste para acelerar os testes.
Reutilizar a mesma lógica de transformações para processar a partir de várias fontes de dados.
O exemplo a seguir usa o valor de configuração startDate
para limitar o pipeline de desenvolvimento a um subconjunto dos dados de entrada:
CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
start_date = spark.conf.get("mypipeline.startDate")
return read("sourceTable").where(col("date") > start_date)
{
"name": "Data Ingest - DEV",
"configuration": {
"mypipeline.startDate": "2021-01-02"
}
}
{
"name": "Data Ingest - PROD",
"configuration": {
"mypipeline.startDate": "2010-01-02"
}
}
Controle a fonte de dados com parâmetros
O senhor pode usar os parâmetros do pipeline para especificar diferentes fontes de dados em diferentes configurações do mesmo pipeline.
Por exemplo, o senhor pode especificar caminhos diferentes nas configurações de desenvolvimento, teste e produção para um pipeline usando a variável data_source_path
e, em seguida, fazer referência a ela usando o código a seguir:
CREATE STREAMING TABLE bronze
AS (
SELECT
*,
_metadata.file_path AS source_file_path
FROM read_files( '${data_source_path}', 'csv',
map("header", "true"))
)
import dlt
from pyspark.sql.functions import col
data_source_path = spark.conf.get("data_source_path")
@dlt.table
def bronze():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", True)
.load(data_source_path )
.select("*", col("_metadata.file_path").alias("source_file_name"))
)
Esse padrão é benéfico para testar como a lógica de ingestão pode lidar com esquemas ou dados malformados durante a ingestão inicial. O senhor pode usar o código idêntico em todo o site pipeline em todos os ambientes, alternando o conjunto de dados.