Usar parâmetros com o pipeline declarativo LakeFlow
Este artigo explica como você pode usar as configurações de pipeline declarativas LakeFlow para parametrizar o código pipeline .
Parâmetros de referência
Durante as atualizações, o código-fonte do pipeline pode acessar os parâmetros do pipeline usando a sintaxe para obter valores para as configurações do Spark.
Você faz referência aos parâmetros pipeline usando a key. O valor é injetado no seu código-fonte como uma string antes da avaliação da lógica do código-fonte.
A sintaxe de exemplo a seguir usa um parâmetro com key source_catalog
e valor dev_catalog
para especificar a fonte de dados para uma view materializada:
- SQL
- Python
CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
COUNT(txn_id) txn_count,
SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id
from pyspark import pipelines as dp
from pyspark.sql.functions import col, sum, count
@dp.table
def transaction_summary():
source_catalog = spark.conf.get("source_catalog")
return (spark.read
.table(f"{source_catalog}.sales.transactions_table")
.groupBy("account_id")
.agg(
count(col("txn_id").alias("txn_count")),
sum(col("txn_amount").alias("account_revenue"))
)
)
Definir parâmetros
Passe parâmetros para o pipeline passando keye valores par arbitrários como configurações para o pipeline. Você pode definir parâmetros ao definir ou editar uma configuração pipeline usando a interface do usuário workspace ou JSON. Consulte Configurar pipeline declarativo LakeFlow.
A chave do parâmetro do pipeline pode conter apenas _ - .
ou caracteres alfanuméricos. Os valores dos parâmetros são definidos como strings.
Os parâmetros do pipeline não suportam valores dinâmicos. Você deve atualizar o valor associado a uma key na configuração pipeline .
Não use palavras-chave que entrem em conflito com valores de configuração do pipeline reservado ou do Apache Spark.
Parametrizar declarações de dataset em Python ou SQL
O código Python e SQL que define seu conjunto de dados pode ser parametrizado pelas configurações do pipeline. A parametrização permite os seguintes casos de uso:
- Separando caminhos longos e outras variáveis do seu código.
- Reduzir a quantidade de dados processados em ambientes de desenvolvimento ou preparação para acelerar os testes.
- Reutilizando a mesma lógica de transformações para processar a partir de múltiplas fontes de dados.
O exemplo a seguir usa o valor de configuração startDate
para limitar o pipeline de desenvolvimento a um subconjunto dos dados de entrada:
CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dp.table
def customer_events():
start_date = spark.conf.get("mypipeline.startDate")
return read("sourceTable").where(col("date") > start_date)
{
"name": "Data Ingest - DEV",
"configuration": {
"mypipeline.startDate": "2021-01-02"
}
}
{
"name": "Data Ingest - PROD",
"configuration": {
"mypipeline.startDate": "2010-01-02"
}
}
Controlar fonte de dados com parâmetros
Você pode usar parâmetros pipeline para especificar diferentes fontes de dados em diferentes configurações do mesmo pipeline.
Por exemplo, você pode especificar caminhos diferentes em configurações de desenvolvimento, teste e produção para um pipeline usando a variável data_source_path
e, em seguida, referenciá-la usando o seguinte código:
- SQL
- Python
CREATE STREAMING TABLE bronze AS
SELECT *, _metadata.file_path AS source_file_path
FROM STREAM read_files(
'${data_source_path}',
format => 'csv',
header => true
)
from pyspark import pipelines as dp
from pyspark.sql.functions import col
data_source_path = spark.conf.get("data_source_path")
@dp.table
def bronze():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", True)
.load(data_source_path )
.select("*", col("_metadata.file_path").alias("source_file_name"))
)
Esse padrão é benéfico para testar como a lógica de ingestão pode lidar com esquemas ou dados malformados durante a ingestão inicial. Você pode usar o mesmo código em todo o seu pipeline , em todos os ambientes, ao alternar o conjunto de dados.