メインコンテンツまでスキップ

Lakeflow 宣言型パイプラインでパラメーターを使用する

この記事では、 Lakeflow 宣言型パイプライン構成を使用してパイプライン コードをパラメーター化する方法について説明します。

パラメーターの参照

更新中、パイプライン ソース コードは構文を使用してパイプライン パラメーターにアクセスし、 Spark設定の値を取得できます。

キーを使用してパイプライン問題を参照します。 値は、ソース コード ロジックが評価される前に、文字列としてソース コードに挿入されます。

次の構文例では、キー source_catalog と値 dev_catalog を持つパラメーターを使用して、マテリアライズドビューのデータソースを指定します。

SQL
CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
COUNT(txn_id) txn_count,
SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id

パラメーターの設定

任意のキーと値のペアをパイプラインの構成として渡して、パラメーターをパイプラインに渡します。 パイプライン設定を定義または編集する際に、ワークスペース UI または JSON を使用してパラメーターを設定できます。「Lakeflow宣言型パイプラインの構成」を参照してください。

パイプライン問題キーには、 _ - .または英数字のみを含めることができます。 問題の値は文字列として設定されます。

パイプラインの問題は動的な値をサポートしていません。 パイプライン構成内のキーに関連付けられた値を更新する必要があります。

important

予約済みのパイプラインまたは Apache Spark 構成値と競合するキーワードを使用しないでください。

Python または SQL でデータセット宣言をパラメータ化する

データセットを定義する Python および SQL コードは、パイプラインの設定によってパラメーター化できます。パラメータ化により、次のユースケースが可能になります。

  • 長いパスやその他の変数をコードから分離します。
  • 開発環境またはステージング環境で処理されるデータの量を減らして、テストを高速化します。
  • 同じ変換ロジックを再利用して、複数のデータソースから処理します。

次の例では、 startDate構成値を使用して、開発パイプラインを入力データのサブセットに制限します。

SQL
CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
Python
@dp.table
def customer_events():
start_date = spark.conf.get("mypipeline.startDate")
return read("sourceTable").where(col("date") > start_date)
JSON
{
"name": "Data Ingest - DEV",
"configuration": {
"mypipeline.startDate": "2021-01-02"
}
}
JSON
{
"name": "Data Ingest - PROD",
"configuration": {
"mypipeline.startDate": "2010-01-02"
}
}

パラメーターによるデータソースの制御

パイプライン問題を使用すると、同じパイプラインの異なる構成で異なるデータ ソースを指定できます。

たとえば、変数data_source_pathを使用してパイプラインの開発、テスト、本番運用構成で異なるパスを指定し、次のコードを使用してそれを参照できます。

SQL
CREATE STREAMING TABLE bronze AS
SELECT *, _metadata.file_path AS source_file_path
FROM STREAM read_files(
'${data_source_path}',
format => 'csv',
header => true
)

このパターンは、初期取り込み時に取り込みロジックがスキーマまたは不正な形式のデータをどのように処理するかをテストするのに役立ちます。データセットを切り替えながら、すべての環境のパイプライン全体で同一のコードを使用できます。