Databricks SQL の REPLACE WHERE フロー
ベータ版
Databricks SQLのストリーミング テーブルの REPLACE WHERE フローはベータ版です。
このページでは、REPLACE WHERE フローを使用して、テーブル履歴全体を再処理せずに、 Databricks SQLでストリーミング テーブルの対象サブセットを再計算して上書きする方法について説明します。 REPLACE WHERE フローは、遅れて到着するデータ、上流の再処理、スキーマ進化、およびバックフィルを処理します。
REPLACE WHERE フローでは、対象テーブルに対して述語を定義します。述語に一致するすべての行が削除され、その後、その述語範囲に対してソースクエリが再評価され、新しい結果が挿入されます。述語に一致しない行はそのまま残されます。
Python構文やバックフィル用の述語オーバーライドを含む完全な機能リファレンスについては、 REPLACE WHERE flowsを参照してください。
要件
Databricks SQLでREPLACE WHEREフローを作成する前に、以下の点を確認してください。
-
ストリーミング テーブルは
PREVIEWチャンネルを使用する必要があります。pipelines.channelテーブル プロパティを使用してチャンネルを設定します。SQLCREATE STREAMING TABLE st_preview
TBLPROPERTIES (pipelines.channel = 'PREVIEW')
... -
Databricks 、最高のエクスペリエンスを得るためにUnity Catalogとサーバーレス コンピュートを推奨しています。
REPLACE WHERE フローを使用するタイミング
REPLACE WHERE フローは、以下のシナリオに最適です。
- ストリーミング セマンティクスを使用しない増分バッチ処理: ウォーターマークなどのストリーミングの概念を管理せずに、バッチ内の新しい行を処理します。
- 選択的再処理: 述語に一致する行のみを再計算し、その他の行はそのまま残します。
- 標準的なマテリアライズドビュー機能を超えるシナリオ:
- ソーステーブルよりも保持期間の長いターゲットテーブル
- ディメンションテーブルが変更された際に再計算を防止する
- 履歴全体を再計算せずにスキーマを進化させる
述語設計ガイドライン
集計列または派生列に対して REPLACE WHERE 述語を使用することは避けてください。例えば、 total_salesがSUM()であるtotal_sales > 100000のような述語では、エンジンは実行ごとにすべてのパーティションの集計を再計算する必要があります。dateやregionなどの基本列に述語を使用することで、エンジンはフィルタをソースまでプッシュダウンし、関連するデータのみを処理できます。
REPLACE WHERE フローを作成する
FLOW REPLACE WHERE句をCREATE OR REFRESH STREAMING TABLEとインラインで使用してください。
CREATE OR REFRESH STREAMING TABLE orders_enriched
SCHEDULE EVERY 1 DAY
FLOW REPLACE WHERE date >= date_add(current_timestamp(), -7) BY NAME
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
更新処理中は、対象テーブル内の述語に一致するすべての行が削除され、同じ述語範囲に対してソースクエリが再計算され、新しい結果が挿入されます。この例では、 orders_enrichedから過去 7 日間のすべての行が削除され、ソースクエリから再計算されます。
述語をソースクエリに追加する必要はありません。パイプラインエンジンは、ソースから読み込む際にそれを自動的に適用します。
BY NAME が必要です。位置ではなく名前で列を照合します。
Databricks SQLで作成されたストリーミングテーブルでは、 CREATE FLOW構文とPython構文はサポートされていません。REPLACE WHERE句をCREATE OR REFRESH STREAMING TABLEステートメント内にインラインで定義します。
DMLステートメントによるバックフィル
Databricks SQLで作成されたストリーミングテーブルでは、述語の上書きはサポートされていません。ヒストリカルデータのロード、特定の期間の列の修正、レガシー テーブルからのロードなどのバックフィルを実行するには、ターゲット テーブルで DML ステートメントを直接実行します。
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';
DML によって挿入された行は、REPLACE WHERE 述語の対象外であり、将来の実行の述語範囲内に含まれない限り、スケジュールされた更新後も保持されます。
述語のオーバーライドをサポートするパイプラインについては、 「述語のオーバーライドを使用してバックフィルを実行する」を参照してください。
完全更新動作
REPLACE WHERE フローの完全更新では、述語範囲のみが再実行され、ソースクエリ全体が再実行されるわけではありません。完全な更新を実行する前に、以下の警告を確認してください。
完全更新では、既存のデータがすべて消去され、定義された述語のみを使用してフローが再実行されます。テーブルが7日間の条件で1年間更新されていた場合、完全更新を行うと、テーブルには直近7日間のデータのみが含まれることになります。古い行はすべて完全に削除されます。
REFRESH STREAMING TABLE orders_enriched FULL;
テーブル全体の更新を防ぐには、テーブルプロパティpipelines.reset.allowedをfalseに設定します。
CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.reset.allowed = 'false')
FLOW REPLACE WHERE date >= date_add(current_timestamp(), -7) BY NAME
...
例
以下の例は、Databricks SQLにおける一般的なREPLACE WHERE句のフローパターンを示しています。
保存期間が限定されたソースからの過去の集計データを保持する
この例では、ソーステーブルの保持期間が3日間であるにもかかわらず、生データが古くなっても、日次集計値を無期限に保持します。
CREATE OR REFRESH STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
date,
key,
SUM(val) AS agg
FROM events_raw
GROUP BY ALL;
ディメンションテーブルが変更されたときに再計算を防止する
この例では、ディメンション属性が変更されても、履歴事実の行は変更されません。
CREATE OR REFRESH STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
f.date,
f.user_id,
d.region,
f.revenue
FROM fact_table f
JOIN dim_users d
ON f.user_id = d.user_id;
ユーザーの地域が変更された場合、最新の行のみが再計算されます。履歴行には、書き込み時点の地域値が保持されます。
全履歴を再計算せずに新しいメトリクスを追加
この例では、テーブル定義を進化させ、対象範囲のみをバックフィルする方法を示します。
-
初期テーブルを定義します。
SQLCREATE OR REFRESH STREAMING TABLE clickstream_daily
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
page_id,
COUNT(*) AS clicks
FROM clickstream_raw
GROUP BY ALL; -
クエリを更新して
uniq_usersを追加します。SQLCREATE OR REFRESH STREAMING TABLE clickstream_daily
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
page_id,
COUNT(*) AS clicks,
COUNT(DISTINCT user_id) AS uniq_users
FROM clickstream_raw
GROUP BY ALL; -
DML を使用して新しいメトリクスをバックフィルします。
SQLINSERT INTO clickstream_daily
SELECT
event_date,
page_id,
COUNT(*) AS clicks,
COUNT(DISTINCT user_id) AS uniq_users
FROM clickstream_raw
WHERE event_date BETWEEN '2026-01-01' AND '2026-01-30'
GROUP BY ALL;バックフィルされた範囲よりも古い行には、
uniq_usersに対してNULLが含まれています。