フローを置換
ベータ版
フローがベータ版である部分を置き換えます。
このページでは、 LakeFlow Spark宣言型パイプラインの REPLACE WHERE フローを使用して、テーブル履歴全体を再処理せずに、テーブルの対象のサブセットを再計算して上書きする方法について説明します。 REPLACE WHERE フローは、遅れて到着するデータ、上流の再処理、スキーマ進化、およびバックフィルを処理します。
REPLACE WHERE フローでは、対象テーブルに対して述語を定義します。述語に一致するすべての行は削除され、同じ述語範囲に対してソースクエリを再評価することで置き換えられます。述語に一致しない行はそのまま残されます。
要件
- パイプラインは
PREVIEWチャンネルを使用する必要があります。 - Databricks 、最高のエクスペリエンスを得るためにUnity Catalogとサーバーレス コンピュートを推奨しています。
REPLACE WHERE フローを使用するタイミング
REPLACE WHERE フローは、以下のシナリオに最適です。
- ストリーミング セマンティクスを使用しない増分バッチ処理: ウォーターマークなどのストリーミングの概念を使用せずに、バッチ内の新しい行を処理します。
- 選択的再処理: 述語に一致する行のみを再計算し、その他の行はそのまま残します。
- 標準的なマテリアライズドビュー機能を超えるシナリオ:
- ソーステーブルよりも保持期間の長いターゲットテーブル
- ディメンションテーブルが変更された際に再計算を防止する
- 履歴全体を再計算せずにスキーマを進化させる
述語設計ガイドライン
集計列または派生列に対して REPLACE WHERE 述語を使用することは避けてください。例えば、 total_salesがSUM()であるtotal_sales > 100000のような述語では、エンジンは実行ごとにすべてのパーティションの集計を再計算する必要があります。dateやregionなどの基本列に述語を使用することで、エンジンはフィルタをソースまでプッシュダウンし、関連するデータのみを処理できます。
REPLACE WHERE フローを作成する
SQLとPythonの両方で、REPLACE WHERE句を定義できます。
- SQL
- Python
FLOW REPLACE WHERE句をCREATE STREAMING TABLEとインラインで使用してください。
CREATE STREAMING TABLE orders_enriched
FLOW REPLACE WHERE date >= date_add(current_timestamp(), -7) BY NAME
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
長い形式のCREATE FLOW構文を使用することもできます。
CREATE STREAMING TABLE orders_enriched;
CREATE FLOW orders_enriched AS
INSERT INTO orders_enriched BY NAME
REPLACE WHERE date >= date_add(current_timestamp(), -7)
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Pythonでは、テーブルとフローは単一のステートメントで定義されます。フローはテーブルと同じ名前を継承します。
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 7)
)
def orders_enriched():
orders_fct = spark.read.table("orders_fct").select("date", "order_id", "region", "qty", "price")
product_dim = spark.read.table("product_dim")
return orders_fct.join(product_dim, "product_id")
replace_where問題は、 PySpark列式または文字列述語のいずれかを受け入れます。
これらの例では、 orders_enrichedから過去 7 日間のすべての行が削除され、ソースクエリから再計算されます。ソースクエリに述語を追加する必要はありません。パイプラインエンジンがソースから読み込む際に自動的に適用します。
BY NAME SQLでは必須です。位置ではなく名前で列を照合します。
述語オーバーライドを使用してバックフィルを実行します
パイプライン定義を変更することなく、単一のパイプライン更新に対してREPLACE WHERE述語をオーバーライドできます。述語の上書きは一度限りのものであり、現在の更新にのみ適用され、今後の実行には影響しません。
例:初期履歴データの読み込み
最初にパイプラインを設定するときにヒストリカルデータの 1 回限りのバックフィルを実行するには、次の手順を実行します。
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date BETWEEN '2020-01-01' AND '2024-12-31'",
}
]
resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
)
print(resp)
ヒストリカルデータは、増分データと同じソースから取得する必要があります。 別のソースからのヒストリカルデータがある場合は、ターゲット テーブルで DML ステートメントを直接使用します。 DMLステートメントを使用したバックフィルを参照してください。
例:特定の期間の列を修正する
列定義を更新した後、対象とする履歴範囲に対して変更内容を反映させる:
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_timestamp(), -30)",
}
]
resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
refresh_selection=["orders_enriched"],
)
print(resp)
述語のオーバーライドでは、複数の次元を組み合わせることもできます。
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_timestamp(), -30) AND region = 'asia'",
}
]
ヘルパー関数: start_update_with_replace_where
start_update_with_replace_whereノートブックからパイプライン更新APIを使用して述語のオーバーライドを送信します。
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import StartUpdateResponse
def start_update_with_replace_where(
pipeline_id: str,
replace_where_overrides: list[dict],
refresh_selection: list[str] = None,
) -> StartUpdateResponse:
"""Start a pipeline update with REPLACE WHERE predicate overrides."""
client = WorkspaceClient()
body = {
"pipeline_id": pipeline_id,
"cause": "JOB_TASK",
"update_cause_details": {
"job_details": {"performance_target": "PERFORMANCE"}
},
"replace_where_overrides": replace_where_overrides,
}
if refresh_selection:
body["refresh_selection"] = refresh_selection
res = client.api_client.do(
"POST",
f"/api/2.0/pipelines/{pipeline_id}/updates",
body=body,
headers={"Accept": "application/json", "Content-Type": "application/json"},
)
return StartUpdateResponse.from_dict(res)
DMLステートメントによるバックフィル
パイプラインの外部から対象テーブルに対してDMLステートメントを直接実行することで、初期ロードや修正(レガシーテーブルからのロードなど)を実行できます。
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';
DML によって挿入された行は、REPLACE WHERE 述語の対象外であり、将来の実行の述語範囲内に含まれない限り、スケジュールされた更新後も保持されます。
完全更新動作
完全更新では、既存のデータがすべて消去され、定義された述語のみを使用してフローが再実行されます。パイプラインが7日間の述語で1年間実行されている場合、完全な更新を行うと、テーブルには過去7日間のデータのみが含まれることになります。古い行はすべて完全に削除されます。
テーブル全体の更新を防ぐには、テーブルプロパティpipelines.reset.allowedをfalseに設定します。パイプラインプロパティのリファレンスを参照してください。
制限事項
- 対象テーブルはパイプライン内で作成する必要があります。
- 対象テーブルごとに許可されるREPLACE WHERE句は1つのみです。
- REPLACE WHERE フローの対象となるテーブルは、AUTO CDC フローや追加フローなど、他のフロータイプの対象にすることはできません。
- REPLACE WHERE フローの対象となるテーブルでは、期待される動作はサポートされていません。
- Databricks SQLで作成されたストリーミングテーブルについては、構文とバックフィルに関する相違点について、 Databricks SQLのREPLACE WHEREフローを参照してください。
例
保存期間が限定されたソースからの過去の集計データを保持する
この例では、ソーステーブルから生データが削除された後(3日間の保持期間)でも、日次集計値を無期限に保持します。
CREATE STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
date,
key,
SUM(val) AS agg
FROM events_raw
GROUP BY ALL;
ディメンションテーブルが変更されたときに再計算を防止する
この例では、ディメンション属性が変更されても、履歴事実の行は変更されません。
CREATE STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
f.date,
f.user_id,
d.region,
f.revenue
FROM fact_table f
JOIN dim_users d
ON f.user_id = d.user_id;
ユーザーの地域が変更された場合、最新の行のみが再計算されます。履歴行には、書き込み時点の地域値が保持されます。述語オーバーライドを使用したターゲットバックフィルにより、後から過去の行を修正できます。
全履歴を再計算せずに新しいメトリクスを追加
この例では、テーブル定義を進化させ、対象範囲のみをバックフィルする方法を示します。
-
初期テーブルを定義します。
SQLCREATE STREAMING TABLE clickstream_daily
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
page_id,
COUNT(*) AS clicks
FROM clickstream_raw
GROUP BY ALL; -
クエリを更新して
uniq_usersを追加します。SQLCREATE STREAMING TABLE clickstream_daily
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
page_id,
COUNT(*) AS clicks,
COUNT(DISTINCT user_id) AS uniq_users
FROM clickstream_raw
GROUP BY ALL; -
過去 30 日間の新しいメトリクスをバックフィルします。
Pythonoverrides = [
{
"flow_name": "clickstream_daily",
"predicate_override": "event_date BETWEEN '2026-01-01' AND '2026-01-30'",
}
]
resp = start_update_with_replace_where(
pipeline_id="<pipeline-id>",
replace_where_overrides=overrides,
refresh_selection=["clickstream_daily"],
)バックフィルされた範囲よりも古い行には、
uniq_usersに対してNULLが含まれています。