パイプラインにパラメーターを使用する
パイプラインパラメーターを使用すると、複数の環境やデータセットで同じパイプラインソースコードを再利用できます。たとえば、dev および prod のカタログに対して同じ変換を実行したり、実行ごとに別のソースパスからインジェストしたりできます。パイプラインでパラメーターを定義し(または更新を開始するときにそれらを上書きし)、SQLソースコードからそれらを参照します。
ベータ版
この機能はベータ版です。ワークスペース管理者は、 プレビュー ページからこの機能へのアクセスを制御できます。Databricksのプレビューを管理するを参照してください。
このページでは、SQL ソースコードで利用できるパイプライン パラメーター機能について説明します。パイプラインでPythonのソースコードをパラメーター化するには、「 構成フィールドを使用したパラメーターの参照 」に記載されているように、構成 フィールドを使い続けてください。設定は、パイプラインがランタイムで読み取るSpark構成値を設定するためにも使用されます。Spark 設定の詳細については、「パイプライン プロパティ リファレンス」を参照してください。
パイプラインのパラメーターとは何ですか?
パイプラインのパラメーターは、以下のことができるキーと値のペアです。
- パイプライン設定でデフォルトとして宣言します。
- パイプライン UI、更新開始 API、または 「異なる設定で実行」 ダイアログから更新を開始する場合に上書きします。
- ジョブ内のパイプラインタスクを上書きし、オプションでジョブレベルパラメーターをプッシュダウンします。
- SQLソースコードからの名前付きパラメーター構文を使用した参照。
パラメーターの値は常に文字列です。_``-``.キーには英数字、アンダースコア、ハイフン、ピリオドを含めることができます。
パイプラインパラメーターと Configuration フィールドには、異なる目的があります:
Use パラメーター for... | 設定を使用します… |
|---|---|
更新間で変化する値(ターゲットカタログ、ソースパス、日付範囲)。 | パイプラインの動作を制御するSpark構成( |
ジョブまたはタスクからプッシュダウンしたい値。 | 静的、構造的パイプライン プロパティ |
名前付きパラメーター構文を使用してSQLで参照される値。 | SQL の |
パイプラインパラメーターの定義
パイプライン設定でデフォルトのパラメーター値を定義できます。アップデートが上書きなしで実行されると、パイプラインはこれらのデフォルトを使用します。
パイプライン UI を使用する
- ワークスペースで、サイドバーの
ジョブとパイプライン をクリックし、ご自身のパイプラインを選択します。
- 設定 をクリックします。
- **パイプライン設定**サイドバーで、**パラメーター**セクションの**編集**をクリックします。
- キー と 値 のエントリを追加し、 保存 をクリックします。
JSON または REST API を使用する
parameters マップをパイプライン定義に追加:
{
"name": "Sales pipeline",
"parameters": {
"source_catalog": "dev_catalog",
"source_schema": "sales",
"start_date": "2026-01-01"
}
}
完全なパイプライン JSON リファレンスについては、「パイプラインの構成」を参照してください。
SQL ソースコードでパラメーターを参照する
キーの前にコロンを付けてパラメーターを参照します。Databricks は値を更新時に文字列としてバインドします。
CREATE OR REFRESH MATERIALIZED VIEW transaction_summary AS
SELECT account_id,
COUNT(txn_id) AS txn_count,
SUM(txn_amount) AS account_revenue
FROM :source_catalog.sales.transactions
WHERE txn_date >= :start_date
GROUP BY account_id
カタログ、スキーマ、またはテーブル名などの識別子の位置でパラメーターを使用するには、IDENTIFIER() で囲みます。
USE CATALOG IDENTIFIER(:source_catalog);
USE SCHEMA IDENTIFIER(:source_schema);
CREATE OR REFRESH MATERIALIZED VIEW daily_sales AS
SELECT date(timestamp) AS date,
SUM(price) AS total_sales
FROM transactions
GROUP BY date;
ソースコードが更新時に値のないパラメーターを参照している場合、更新はエラーで失敗します。パイプラインは、コードが参照しない追加のパラメーターを無視します。
更新時にパラメーターを上書きする
保存済みのデフォルトを変更することなく、一度の更新でパラメーター値を上書きできます。
- パイプライン UI から、[ 異なる設定で実行 ]をクリックし、 パラメーター セクションを編集します。
- ジョブ内のパイプラインタスクから、タスクの [パラメーター] フィールドでパラメーターのオーバーライドを設定します。See パラメーター.
- APIから、更新の開始リクエストで
parametersマップを渡します。
Databricks は特定の更新のパラメーターを更新履歴に記録し、パイプライン実行リストの ラン パラメーター 列に表示します。
パラメーターの優先度
同じキーを複数箇所で定義した場合、優先順位が最も高い値が適用されます。高いものから低いものの順:
- ジョブ実行パラメーター: 単一のジョブ実行に指定される値(上書き)
- ジョブ パラメーター : 親ジョブで定義されたデフォルトです。
- パイプライン タスク パラメーター :パイプライン タスクに設定された値です。
- パイプラインパラメーター :パイプライン設定で定義されているデフォルトです。
これは、他のジョブパラメータータスクのタイプで使用される優先順位と一致します。
Lakeflow ジョブのパイプラインパラメーター
パイプラインをジョブのパイプライン タスクとしてスケジュールすると、そのタスクは、パイプラインのデフォルトを上書きするパラメーターを提供できます。パラメーター値は、動的値参照を使用して、ジョブ実行時の値(例:{{job.trigger.time.iso_date}}や{{job.parameters.region}})を注入できます。
LakeFlow ジョブは、ノートブックタスクやSQLタスクにプッシュダウンするのと同じように、すべてのジョブパラメーターをパイプラインタスクに自動的にプッシュダウンします。パイプラインのソース コードは、名前付きパラメーター構文を使用して、プッシュダウンされた値を参照できます。パイプライン設定でのパラメーターの宣言はオプションであり、上書きのない実行に対してのみデフォルトが設定されます。
注意事項および既知の制限事項
-
パイプラインは一度に1つの更新を実行 :一度に1つの更新のみ実行できます。複数の更新が重なってジョブが失敗するのを防ぐため、Databricksは2つのシナリオで同時実行数を1に制限しています。
- パイプライン タスクを含み、複数の
max_concurrent_runsで構成されているジョブです。 - 反復回数にかかわらず、for-each タスクでラップされたパイプライン タスク。
この上限が有効になると、ジョブ UI に通知が表示されます。多くのパラメーターの組み合わせで実行する予定のパラメーター化されたパイプラインを設計する際は、上限を考慮してください。
- パイプライン タスクを含み、複数の
-
日付フィルターは完全な更新をトリガーする場合があります :一般的なパラメータ化のユースケースは、データを日付でフィルタリングすることです。述語には注意してください。日付範囲の両側をフィルタリングすると、マテリアライズドビューでの増分処理が無効になり、更新ごとに完全更新がトリガーされます。
SQL-- Triggers a full refresh on each update
CREATE OR REFRESH MATERIALIZED VIEW recent_orders AS
SELECT * FROM orders
WHERE order_date >= :start_date AND order_date < :end_date;SQL-- Processes incrementally
CREATE OR REFRESH MATERIALIZED VIEW recent_orders AS
SELECT * FROM orders
WHERE order_date >= :start_date; -
名前付きパラメーターはSQL専用です:このベータ版では、名前付きパラメーター構文はSQLソースコードでのみ使用できます。Pythonソースコードをパラメータ化するには、 で**Configuration**フィールドを引き続き使用してください。
spark.conf.get()構成フィールドを使用したパラメーターの参照を参照してください。
構成フィールドを使用してパラメーターを参照します。
パイプラインの 設定 フィールドは、Sparkの設定値として公開される任意のキーと値のペアを受け入れます。これはレガシーのパラメータ化メカニズムであり、パイプライン パラメーターと並行して引き続き動作します。Python ソースコード、および名前付きパラメーター構文ではなく spark.conf.get() を使用して読み取るキーに使用します。
次の例では、mypipeline.start_date構成値を使用して開発パイプラインを入力データのサブセットに制限します。
- SQL
- Python
CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM source_table WHERE date > '${mypipeline.start_date}';
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table
def customer_events():
start_date = spark.conf.get("mypipeline.start_date")
return spark.read.table("source_table").where(col("date") > start_date)
構成値は、パイプライン設定の 構成 セクション、またはパイプライン JSON の configuration フィールドで設定します。予約済みのパイプラインまたは Apache Spark 構成値と競合するキーは使用しないでください。