REPLACE WHERE フローを使用したバッチ処理
ベータ版
フローがベータ版である部分を置き換えます。
このページでは、 LakeFlow Spark宣言型パイプラインの REPLACE WHERE フローを使用して、テーブル履歴全体を再処理せずに、テーブルの対象のサブセットを再計算して上書きする方法について説明します。 REPLACE WHERE フローは、遅れて到着するデータ、上流の再処理、スキーマ進化、およびバックフィルを処理します。
REPLACE WHERE フローでは、対象テーブルに対して述語を定義します。述語に一致するすべての行は削除され、同じ述語範囲に対してソースクエリを再評価することで置き換えられます。述語に一致しない行はそのまま残されます。
要件
REPLACE WHERE フローには、以下の要件があります。
- パイプラインは
PREVIEWチャンネルを使用する必要があります。 - DatabricksではUnity Catalogとサーバレス コンピュートを推奨しています。 増分更新はサーバレス コンピュートでのみサポートされます。
REPLACE WHERE フローを使用するタイミング
以下のシナリオでは、REPLACE WHERE フローを使用してください。
- ストリーミング セマンティクスを使用しない増分バッチ処理 : ウォーターマークなどのストリーミングの概念を管理せずに、バッチ内の新しい行を処理します。
- 選択的再処理 :述語に一致する行のみを再計算し、その他の行はそのまま残します。
- 標準的なマテリアライズドビュー機能を超えるシナリオ :
- ソーステーブルよりも保持期間の長いターゲットテーブル
- ディメンションテーブルが変更された際に再計算を防止する
- 履歴全体を再計算せずにスキーマを進化させる
REPLACE WHERE フローを作成する
SQLまたはPythonでREPLACE WHERE句のフローを定義します。
- SQL
- Python
FLOW REPLACE WHERE句をCREATE STREAMING TABLEとインラインで使用してください。
CREATE STREAMING TABLE orders_enriched
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
または、長い形式のCREATE FLOW構文を使用してください。
CREATE STREAMING TABLE orders_enriched;
CREATE FLOW orders_enriched AS
INSERT INTO orders_enriched BY NAME
REPLACE WHERE date >= date_add(current_date(), -7)
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Pythonでは、テーブルとフローは単一のステートメントで定義されます。フローはテーブルと同じ名前を継承します。
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 7)
)
def orders_enriched():
orders_fct = spark.read.table("orders_fct").select("date", "order_id", "region", "qty", "price")
product_dim = spark.read.table("product_dim")
return orders_fct.join(product_dim, "product_id")
replace_where問題は、 PySpark列式または文字列述語のいずれかを受け入れます。
これらの例では、 orders_enrichedから過去 7 日間のすべての行が削除され、ソース クエリを使用して再計算されます。述語をソースクエリに追加する必要はありません。パイプラインエンジンは、ソースから読み込む際にそれを自動的に適用します。
BY NAME SQLでは必須です。位置ではなく名前で列を照合します。
バックフィル履歴データ
スケジュールされた更新時間外に履歴行または修正行をターゲット テーブルに書き込むには、履歴データが存在する場所に基づいて 2 つのメカニズムから選択します。
- 述語の上書き :一度限りの述語範囲に対して、フローのソースクエリを再実行します。ヒストリカルデータが増分データと同じソースから取得される場合に使用します。
- DMLステートメント :フローをバイパスして、ターゲットテーブルに直接挿入します。ヒストリカルデータが増分データとは異なるソースに存在する場合に使用します。
述語のオーバーライド
パイプライン定義を変更せずに、単一のパイプライン更新に対して REPLACE WHERE 述語をオーバーライドします。述語の上書きは一度限りのものであり、現在の更新にのみ適用され、今後の実行には影響しません。
例:初期履歴データの読み込み
最初にパイプラインを設定するときにヒストリカルデータの 1 回限りのバックフィルを実行するには、次の手順を実行します。
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date BETWEEN '2020-01-01' AND '2024-12-31'",
}
]
resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
)
print(resp)
例:特定の期間の列を修正する
列定義を更新した後、対象とする履歴範囲に対して変更内容を反映させる:
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_date(), -30)",
}
]
resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
refresh_selection=["orders_enriched"],
)
print(resp)
複数の次元を単一の述語オーバーライドに組み合わせる:
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_date(), -30) AND region = 'asia'",
}
]
ヘルパー関数: start_update_with_replace_where
start_update_with_replace_whereノートブックからパイプライン更新APIを使用して述語のオーバーライドを送信します。
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import StartUpdateResponse
def start_update_with_replace_where(
pipeline_id: str,
replace_where_overrides: list[dict],
refresh_selection: list[str] = None,
) -> StartUpdateResponse:
"""Start a pipeline update with REPLACE WHERE predicate overrides."""
client = WorkspaceClient()
body = {
"pipeline_id": pipeline_id,
"cause": "JOB_TASK",
"update_cause_details": {
"job_details": {"performance_target": "PERFORMANCE"}
},
"replace_where_overrides": replace_where_overrides,
}
if refresh_selection:
body["refresh_selection"] = refresh_selection
res = client.api_client.do(
"POST",
f"/api/2.0/pipelines/{pipeline_id}/updates",
body=body,
headers={"Accept": "application/json", "Content-Type": "application/json"},
)
return StartUpdateResponse.from_dict(res)
DMLステートメント
パイプラインの外部から対象テーブルに対してDMLステートメントを直接実行して、初期ロードや修正(レガシーテーブルからのロードなど)を実行します。
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';
DML によって挿入された行は、REPLACE WHERE 述語の対象外であり、将来の実行の述語範囲内に含まれない限り、スケジュールされた更新後も保持されます。
完全更新動作
REPLACE WHERE フローの完全な更新では、現在の述語のみを使用してソースクエリが再実行されます。述語の上書きまたは現在の述語範囲外のDMLステートメントによって挿入された行は、完全に削除されます。
完全更新では、既存のデータがすべて消去され、定義された述語のみを使用してフローが再実行されます。パイプラインが7日間の述語で1年間実行されている場合、完全な更新を行うと、テーブルには過去7日間のデータのみが含まれることになります。古い行はすべて完全に削除されます。
テーブル全体の更新を防ぐには、テーブルプロパティpipelines.reset.allowedをfalseに設定します。パイプラインプロパティのリファレンスを参照してください。
増分更新
REPLACE WHERE フローでは、可能な場合は増分更新を使用し、置換ウィンドウ全体を再計算するのではなく、前回の更新以降に変更されたソースデータのみを再処理します。増分更新にはサーバーレス コンピュートが必要です。
増分更新が適用される場合
以下のすべてが真実でなければならない。
- この取り込みパイプラインはサーバレスコンピュートで実行されます。
- クエリ形状はサポートされています。 サポートされている演算子セットについては、 「増分更新」を参照してください。
- 述語は、ソーステーブルの基本列を参照します。 集計値やウィンドウ関数の出力など、派生値に対する述語はソースにプッシュできないため、増分更新が無効になります。
- 現在の置換ウィンドウ内で、外部DMLによって変更された行はありません。 現在のウィンドウ外の行を変更するDML操作は影響を受けません。
- 現在の置換ウィンドウには、前回の述語で除外された行は含まれません。 述語の範囲を拡張して、これまで処理されていなかった範囲をカバーする場合、その1回の更新処理は完全な再計算にフォールバックします。後続の更新は、再度増分更新の対象となります。
- 述語は決定論的である。
rand()のような非決定性関数を使用する述語は、増分更新を無効にします。current_date()のような時間関数は許可されています。
どのフローにおいても、最初の更新は常に完全な計算処理となります。いずれかの条件が満たされない場合、その更新処理は現在の置換ウィンドウの完全な再計算にフォールバックします。
増分更新のベストプラクティス
REPLACE WHERE フローが増分更新の対象となるように、以下のガイドラインに従ってください。
移動下限値を使用する
下限値が変動する述語は、無期限に増分更新の対象となります。
FLOW REPLACE WHERE date >= date_add(current_date(), -7)
date BETWEEN date_add(current_date(), -7) AND current_date()のような移動する上限値によって、ウィンドウが移動して以前に除外されていた行が含まれるようになり、一度限りの完全な再計算へのフォールバックがトリガーされます。
グループ BY に述語列を含める
集計を行う際は、 GROUP BYに述語列を含めてください。そうすることで、エンジンが述語を集計の下に配置できます。
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;
述語列がGROUP BYに存在しない場合、述語は集計の下にプッシュできず、ソース全体がスキャンされます。
述語列を結合キーに含める
結合条件に述語列を含めることで、エンジンが結合されたすべてのソースを絞り込むことができます。
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;
結合されたテーブルが述語列を公開していない場合、そのテーブルは更新のたびに完全にスキャンされます。
診断時に完全な再計算にフォールバックする
更新処理が完全な再計算にフォールバックした場合、その理由はフローのplanning_informationイベントで報告されます。パイプラインイベントログの監視を参照してください。以下の表は、当該事象で報告された理由を一覧にしたものです。
理由: | 意味 |
|---|---|
| 外部DMLによって、現在の置換ウィンドウ内の行が変更されました。 |
| 述語は非決定的な表現を使用する。 |
| 前回の更新では、非決定的な述語が使用されていました。 |
| 述語をどのソースにもプッシュできない、現在のウィンドウに前の述語で処理されなかった行が含まれている、または実行で述語のオーバーライドが使用されている。 |
制限事項
REPLACE WHERE フローには、以下の制限があります。
- 対象テーブルはパイプライン内で作成する必要があります。
- 対象テーブルごとに許可されるREPLACE WHERE句は1つのみです。
- REPLACE WHERE フローの対象となるテーブルは、AUTO CDC フローや追加フローなど、他のフロータイプの対象にすることはできません。
- REPLACE WHERE フローの対象となるテーブルでは、期待される動作はサポートされていません。
- Databricks SQLで作成されたストリーミングテーブルについては、構文とバックフィルに関する相違点について、スタンドアロンストリーミングテーブルのREPLACE WHEREフローを参照してください。
例
以下の例は、一般的な REPLACE WHERE フローパターンを示しています。
例1:保持期間が限定されたソースからの履歴集計データを保持する
この例では、ソーステーブルから生データが削除された後(3日間の保持期間)でも、日次集計値を無期限に保持します。
- SQL
- Python
CREATE STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
date,
key,
SUM(val) AS agg
FROM events_raw
GROUP BY ALL;
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 3)
)
def events_agg():
return (
spark.read.table("events_raw")
.groupBy("date", "key")
.agg(F.sum("val").alias("agg"))
)
例2:ディメンションテーブルが変更されたときに再計算を防止する
この例では、ディメンション属性が変更されても、履歴事実の行は変更されません。
- SQL
- Python
CREATE STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
f.date,
f.user_id,
d.region,
f.revenue
FROM fact_table f
JOIN dim_users d
ON f.user_id = d.user_id;
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 1)
)
def fact_dim_join():
fact_table = spark.read.table("fact_table").alias("f")
dim_users = spark.read.table("dim_users").alias("d")
return (
fact_table.join(dim_users, col("f.user_id") == col("d.user_id"))
.select(
col("f.date"),
col("f.user_id"),
col("d.region"),
col("f.revenue"),
)
)
ユーザーの地域が変更された場合、最新の行のみが再計算されます。履歴行には、書き込み時点の地域値が保持されます。過去の行を修正するには、述語オーバーライドを使用して対象を絞ったバックフィルを実行します。
例 3: 完全な履歴を再計算せずに新しいメトリクスを追加する
この例では、テーブル定義を進化させ、対象範囲のみをバックフィルする方法を示します。
- 初期テーブルを定義します。
- SQL
- Python
CREATE STREAMING TABLE clickstream_daily
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
page_id,
COUNT(*) AS clicks
FROM clickstream_raw
GROUP BY ALL;
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
)
def clickstream_daily():
return (
spark.read.table("clickstream_raw")
.groupBy("event_date", "page_id")
.agg(F.count("*").alias("clicks"))
)
- クエリを更新して
uniq_usersを追加します。
- SQL
- Python
CREATE STREAMING TABLE clickstream_daily
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
page_id,
COUNT(*) AS clicks,
COUNT(DISTINCT user_id) AS uniq_users
FROM clickstream_raw
GROUP BY ALL;
@dp.table(
replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
)
def clickstream_daily():
return (
spark.read.table("clickstream_raw")
.groupBy("event_date", "page_id")
.agg(
F.count("*").alias("clicks"),
F.countDistinct("user_id").alias("uniq_users"),
)
)
-
過去 30 日間の新しいメトリクスをバックフィルします。
Pythonoverrides = [
{
"flow_name": "clickstream_daily",
"predicate_override": "event_date BETWEEN '2026-01-01' AND '2026-01-30'",
}
]
resp = start_update_with_replace_where(
pipeline_id="<pipeline-id>",
replace_where_overrides=overrides,
refresh_selection=["clickstream_daily"],
)バックフィルされた範囲よりも古い行には、
uniq_usersに対してNULLが含まれています。
例4:完全な履歴を埋め込む前に、小さなウィンドウで反復処理を行う
この例では、履歴データ全体を処理する前に、小さなデータウィンドウでクエリロジックを検証する方法を示します。
クエリを修正する間、更新のたびに過去7日間のデータのみが再計算されるように、まずは短い期間から始めましょう。
- SQL
- Python
CREATE STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
)
def revenue_attribution():
return (
spark.read.table("marketing_events")
.groupBy("event_date", "campaign_id")
.agg(F.sum("revenue").alias("total_revenue"))
)
クエリが確定したら、述語オーバーライドを使用して、一度限りの履歴データバックフィルを実行します。
overrides = [
{
"flow_name": "revenue_attribution",
"predicate_override": "event_date >= date_add(current_date(), -365)",
}
]
resp = start_update_with_replace_where(
pipeline_id="<pipeline-id>",
replace_where_overrides=overrides,
refresh_selection=["revenue_attribution"],
)