メインコンテンツまでスキップ

パイプラインにパラメーターを使用する

パイプラインパラメーターを使用すると、複数の環境やデータセットで同じパイプラインソースコードを再利用できます。たとえば、dev および prod のカタログに対して同じ変換を実行したり、実行ごとに別のソースパスからインジェストしたりできます。パイプラインでパラメーターを定義し(または更新を開始するときにそれらを上書きし)、SQLソースコードからそれらを参照します。

備考

ベータ版

この機能はベータ版です。ワークスペース管理者は、 プレビュー ページからこの機能へのアクセスを制御できます。Databricksのプレビューを管理するを参照してください。

このページでは、SQL ソースコードで利用できるパイプライン パラメーター機能について説明します。パイプラインでPythonのソースコードをパラメーター化するには、「 構成フィールドを使用したパラメーターの参照 」に記載されているように、構成 フィールドを使い続けてください。設定は、パイプラインがランタイムで読み取るSpark構成値を設定するためにも使用されます。Spark 設定の詳細については、「パイプライン プロパティ リファレンス」を参照してください。

パイプラインのパラメーターとは何ですか?

パイプラインのパラメーターは、以下のことができるキーと値のペアです。

  • パイプライン設定でデフォルトとして宣言します。
  • パイプライン UI、更新開始 API、または 「異なる設定で実行」 ダイアログから更新を開始する場合に上書きします。
  • ジョブ内のパイプラインタスクを上書きし、オプションでジョブレベルパラメーターをプッシュダウンします。
  • SQLソースコードからの名前付きパラメーター構文を使用した参照。

パラメーターの値は常に文字列です。_``-``.キーには英数字、アンダースコア、ハイフン、ピリオドを含めることができます。

パイプラインパラメーターと Configuration フィールドには、異なる目的があります:

Use パラメーター for...

設定を使用します…

更新間で変化する値(ターゲットカタログ、ソースパス、日付範囲)。

パイプラインの動作を制御するSpark構成(pipelines.enzyme.enabledpipelines.clusterLabelsV2Enabled

ジョブまたはタスクからプッシュダウンしたい値。

静的、構造的パイプライン プロパティ

名前付きパラメーター構文を使用してSQLで参照される値。

SQL の ${key} 構文、または Python の spark.conf.get("key") 構文で参照する値。

パイプラインパラメーターの定義

パイプライン設定でデフォルトのパラメーター値を定義できます。アップデートが上書きなしで実行されると、パイプラインはこれらのデフォルトを使用します。

パイプライン UI を使用する

  1. ワークスペースで、サイドバーの ワークフロー アイコン。 ジョブとパイプライン をクリックし、ご自身のパイプラインを選択します。
  2. 設定 をクリックします。
  3. **パイプライン設定**サイドバーで、**パラメーター**セクションの**編集**をクリックします。
  4. キー のエントリを追加し、 保存 をクリックします。

JSON または REST API を使用する

parameters マップをパイプライン定義に追加:

JSON
{
"name": "Sales pipeline",
"parameters": {
"source_catalog": "dev_catalog",
"source_schema": "sales",
"start_date": "2026-01-01"
}
}

完全なパイプライン JSON リファレンスについては、「パイプラインの構成」を参照してください。

SQL ソースコードでパラメーターを参照する

キーの前にコロンを付けてパラメーターを参照します。Databricks は値を更新時に文字列としてバインドします。

SQL
CREATE OR REFRESH MATERIALIZED VIEW transaction_summary AS
SELECT account_id,
COUNT(txn_id) AS txn_count,
SUM(txn_amount) AS account_revenue
FROM :source_catalog.sales.transactions
WHERE txn_date >= :start_date
GROUP BY account_id

カタログ、スキーマ、またはテーブル名などの識別子の位置でパラメーターを使用するには、IDENTIFIER() で囲みます。

SQL
USE CATALOG IDENTIFIER(:source_catalog);
USE SCHEMA IDENTIFIER(:source_schema);

CREATE OR REFRESH MATERIALIZED VIEW daily_sales AS
SELECT date(timestamp) AS date,
SUM(price) AS total_sales
FROM transactions
GROUP BY date;

ソースコードが更新時に値のないパラメーターを参照している場合、更新はエラーで失敗します。パイプラインは、コードが参照しない追加のパラメーターを無視します。

更新時にパラメーターを上書きする

保存済みのデフォルトを変更することなく、一度の更新でパラメーター値を上書きできます。

  • パイプライン UI から、[ 異なる設定で実行 ]をクリックし、 パラメーター セクションを編集します。
  • ジョブ内のパイプラインタスクから、タスクの [パラメーター] フィールドでパラメーターのオーバーライドを設定します。See パラメーター.
  • APIから、更新の開始リクエストでparametersマップを渡します。

Databricks は特定の更新のパラメーターを更新履歴に記録し、パイプライン実行リストの ラン パラメーター 列に表示します。

パラメーターの優先度

同じキーを複数箇所で定義した場合、優先順位が最も高い値が適用されます。高いものから低いものの順:

  1. ジョブ実行パラメーター: 単一のジョブ実行に指定される値(上書き)
  2. ジョブ パラメーター : 親ジョブで定義されたデフォルトです。
  3. パイプライン タスク パラメーター :パイプライン タスクに設定された値です。
  4. パイプラインパラメーター :パイプライン設定で定義されているデフォルトです。

これは、他のジョブパラメータータスクのタイプで使用される優先順位と一致します。

Lakeflow ジョブのパイプラインパラメーター

パイプラインをジョブのパイプライン タスクとしてスケジュールすると、そのタスクは、パイプラインのデフォルトを上書きするパラメーターを提供できます。パラメーター値は、動的値参照を使用して、ジョブ実行時の値(例:{{job.trigger.time.iso_date}}{{job.parameters.region}})を注入できます。

LakeFlow ジョブは、ノートブックタスクやSQLタスクにプッシュダウンするのと同じように、すべてのジョブパラメーターをパイプラインタスクに自動的にプッシュダウンします。パイプラインのソース コードは、名前付きパラメーター構文を使用して、プッシュダウンされた値を参照できます。パイプライン設定でのパラメーターの宣言はオプションであり、上書きのない実行に対してのみデフォルトが設定されます。

注意事項および既知の制限事項

  • パイプラインは一度に1つの更新を実行 :一度に1つの更新のみ実行できます。複数の更新が重なってジョブが失敗するのを防ぐため、Databricksは2つのシナリオで同時実行数を1に制限しています。

    • パイプライン タスクを含み、複数のmax_concurrent_runsで構成されているジョブです。
    • 反復回数にかかわらず、for-each タスクでラップされたパイプライン タスク。

    この上限が有効になると、ジョブ UI に通知が表示されます。多くのパラメーターの組み合わせで実行する予定のパラメーター化されたパイプラインを設計する際は、上限を考慮してください。

  • 日付フィルターは完全な更新をトリガーする場合があります :一般的なパラメータ化のユースケースは、データを日付でフィルタリングすることです。述語には注意してください。日付範囲の両側をフィルタリングすると、マテリアライズドビューでの増分処理が無効になり、更新ごとに完全更新がトリガーされます。

    SQL
    -- Triggers a full refresh on each update
    CREATE OR REFRESH MATERIALIZED VIEW recent_orders AS
    SELECT * FROM orders
    WHERE order_date >= :start_date AND order_date < :end_date;
    SQL
    -- Processes incrementally
    CREATE OR REFRESH MATERIALIZED VIEW recent_orders AS
    SELECT * FROM orders
    WHERE order_date >= :start_date;
  • 名前付きパラメーターはSQL専用です:このベータ版では、名前付きパラメーター構文はSQLソースコードでのみ使用できます。Pythonソースコードをパラメータ化するには、 で**Configuration**フィールドを引き続き使用してください。 spark.conf.get()構成フィールドを使用したパラメーターの参照を参照してください。

構成フィールドを使用してパラメーターを参照します。

パイプラインの 設定 フィールドは、Sparkの設定値として公開される任意のキーと値のペアを受け入れます。これはレガシーのパラメータ化メカニズムであり、パイプライン パラメーターと並行して引き続き動作します。Python ソースコード、および名前付きパラメーター構文ではなく spark.conf.get() を使用して読み取るキーに使用します。

次の例では、mypipeline.start_date構成値を使用して開発パイプラインを入力データのサブセットに制限します。

SQL
CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM source_table WHERE date > '${mypipeline.start_date}';

構成値は、パイプライン設定の 構成 セクション、またはパイプライン JSON の configuration フィールドで設定します。予約済みのパイプラインまたは Apache Spark 構成値と競合するキーは使用しないでください。

関連記事