スタンドアロン ストリーミング テーブルの REPLACE WHERE フロー
ベータ版
スタンドアロン ストリーミング テーブルの REPLACE WHERE フローはベータ版です。
このページでは、REPLACE WHERE フローを使用して、テーブル履歴全体を再処理せずに、スタンドアロン ストリーミング テーブルの対象サブセットを再計算して上書きする方法について説明します。 REPLACE WHERE フローは、遅れて到着するデータ、上流の再処理、スキーマ進化、およびバックフィルを処理します。
REPLACE WHERE フローでは、対象テーブルに対して述語を定義します。述語に一致するすべての行は削除され、同じ述語範囲に対してソースクエリを再評価することで置き換えられます。述語に一致しない行はそのまま残されます。
要件
REPLACE WHERE フローには、以下の要件があります。
- ストリーミング テーブルは
PREVIEWチャンネルを使用する必要があります。 パイプライン構成のchannelを参照してください。 - DatabricksではUnity Catalogとサーバレス コンピュートを推奨しています。 増分更新はサーバレス コンピュートでのみサポートされます。
REPLACE WHERE フローを使用するタイミング
以下のシナリオでは、REPLACE WHERE フローを使用してください。
- ストリーミング セマンティクスを使用しない増分バッチ処理 : ウォーターマークなどのストリーミングの概念を管理せずに、バッチ内の新しい行を処理します。
- 選択的再処理 :述語に一致する行のみを再計算し、その他の行はそのまま残します。
- 標準的なマテリアライズドビュー機能を超えるシナリオ :
- ソーステーブルよりも保持期間の長いターゲットテーブル
- ディメンションテーブルが変更された際に再計算を防止する
- 履歴全体を再計算せずにスキーマを進化させる
REPLACE WHERE フローを作成する
FLOW REPLACE WHERE句をCREATE OR REFRESH STREAMING TABLEとインラインで使用してください。
CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.channel = 'PREVIEW')
SCHEDULE EVERY 1 DAY
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
更新処理中は、対象テーブル内の述語に一致するすべての行が削除され、同じ述語範囲に対してソースクエリが再計算され、新しい結果が挿入されます。この例では、 orders_enrichedから過去 7 日間のすべての行が削除され、ソース クエリを使用して再計算されます。
述語をソースクエリに追加する必要はありません。パイプラインエンジンは、ソースから読み込む際にそれを自動的に適用します。
BY NAME が必要です。これにより、列は位置ではなく名前で照合されることが保証されます。
バックフィル履歴データ
バックフィルを実行するには、対象テーブルに対してDMLステートメントを直接実行します。
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';
完全更新動作
REPLACE WHERE フローの完全な更新では、現在の述語のみを使用してソースクエリが再実行されます。現在の述語範囲外のDMLステートメントによって挿入された行は、完全に削除されます。
完全更新では、既存のデータがすべて消去され、定義された述語のみを使用してフローが再実行されます。パイプラインが7日間の述語で1年間実行されている場合、完全な更新を行うと、テーブルには過去7日間のデータのみが含まれることになります。古い行はすべて完全に削除されます。
REFRESH STREAMING TABLE orders_enriched FULL;
テーブル全体の更新を防ぐには、テーブルプロパティpipelines.reset.allowedをfalseに設定します。
CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.reset.allowed = 'false')
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
...
増分更新
REPLACE WHERE フローでは、可能な場合は増分更新を使用し、置換ウィンドウ全体を再計算するのではなく、前回の更新以降に変更されたソースデータのみを再処理します。増分更新にはサーバーレス コンピュートが必要です。
増分更新が適用される場合
以下のすべてが真実でなければならない。
- この取り込みパイプラインはサーバレスコンピュートで実行されます。
- クエリ形状はサポートされています。 サポートされている演算子セットについては、 「増分更新」を参照してください。
- 述語は、ソーステーブルの基本列を参照します。 集計値やウィンドウ関数の出力など、派生値に対する述語はソースにプッシュできないため、増分更新が無効になります。
- 現在の置換ウィンドウ内で、外部DMLによって変更された行はありません。 現在のウィンドウ外の行を変更するDML操作は影響を受けません。
- 現在の置換ウィンドウには、前回の述語で除外された行は含まれません。 述語の範囲を拡張して、これまで処理されていなかった範囲をカバーする場合、その1回の更新処理は完全な再計算にフォールバックします。後続の更新は、再度増分更新の対象となります。
- 述語は決定論的である。
rand()のような非決定性関数を使用する述語は、増分更新を無効にします。current_date()のような時間関数は許可されています。
どのフローにおいても、最初の更新は常に完全な計算処理となります。いずれかの条件が満たされない場合、その更新処理は現在の置換ウィンドウの完全な再計算にフォールバックします。
増分更新のベストプラクティス
REPLACE WHERE フローが増分更新の対象となるように、以下のガイドラインに従ってください。
移動下限値を使用する
下限値が変動する述語は、無期限に増分更新の対象となります。
FLOW REPLACE WHERE date >= date_add(current_date(), -7)
date BETWEEN date_add(current_date(), -7) AND current_date()のような移動する上限値によって、ウィンドウが移動して以前に除外されていた行が含まれるようになり、一度限りの完全な再計算へのフォールバックがトリガーされます。
グループ BY に述語列を含める
集計を行う際は、 GROUP BYに述語列を含めてください。そうすることで、エンジンが述語を集計の下に配置できます。
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;
述語列がGROUP BYに存在しない場合、述語は集計の下にプッシュできず、ソース全体がスキャンされます。
述語列を結合キーに含める
結合条件に述語列を含めることで、エンジンが結合されたすべてのソースを絞り込むことができます。
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;
結合されたテーブルが述語列を公開していない場合、そのテーブルは更新のたびに完全にスキャンされます。
診断時に完全な再計算にフォールバックする
更新処理が完全な再計算にフォールバックした場合、その理由はフローのplanning_informationイベントで報告されます。パイプラインイベントログの監視を参照してください。以下の表は、当該事象で報告された理由を一覧にしたものです。
理由: | 意味 |
|---|---|
| 外部DMLによって、現在の置換ウィンドウ内の行が変更されました。 |
| 述語は非決定的な表現を使用する。 |
| 前回の更新では、非決定的な述語が使用されていました。 |
| 述語をどのソースにもプッシュできない、現在のウィンドウに前の述語で処理されなかった行が含まれている、または実行で述語のオーバーライドが使用されている。 |
例
以下の例は、一般的な REPLACE WHERE フローパターンを示しています。
例1:保持期間が限定されたソースからの履歴集計データを保持する
この例では、ソーステーブルから生データが削除された後(3日間の保持期間)でも、日次集計値を無期限に保持します。
CREATE OR REFRESH STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
date,
key,
SUM(val) AS agg
FROM events_raw
GROUP BY ALL;
例2:ディメンションテーブルが変更されたときに再計算を防止する
この例では、ディメンション属性が変更されても、履歴事実の行は変更されません。
CREATE OR REFRESH STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
f.date,
f.user_id,
d.region,
f.revenue
FROM fact_table f
JOIN dim_users d
ON f.user_id = d.user_id;
ユーザーの地域が変更された場合、最新の行のみが再計算されます。履歴行には、書き込み時点の地域値が保持されます。
例 3: 完全な履歴を再計算せずに新しいメトリクスを追加する
この例では、テーブル定義を進化させ、対象範囲のみをバックフィルする方法を示します。
-
初期テーブルを定義します。
SQLCREATE OR REFRESH STREAMING TABLE clickstream_daily
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
page_id,
COUNT(*) AS clicks
FROM clickstream_raw
GROUP BY ALL; -
クエリを更新して
uniq_usersを追加します。SQLCREATE OR REFRESH STREAMING TABLE clickstream_daily
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
page_id,
COUNT(*) AS clicks,
COUNT(DISTINCT user_id) AS uniq_users
FROM clickstream_raw
GROUP BY ALL;7日間のウィンドウより古い行には、
uniq_usersに対してNULLが含まれています。
例4:完全な履歴を埋め込む前に、小さなウィンドウで反復処理を行う
この例では、履歴データ全体を処理する前に、小さなデータウィンドウでクエリロジックを検証する方法を示します。
短いウィンドウでメトリクスを検証し、より低いコンピュート コストでビジネス ロジックを反復処理することから始めます。
CREATE OR REFRESH STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;
短いウィンドウでは更新ごとに過去 7 日間のみが再計算されるため、完全な履歴実行をコミットする前に、必要なだけクエリを修正してください。
クエリが完了したら、DMLを使用して履歴範囲全体をバックフィルします。
INSERT INTO revenue_attribution
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
WHERE event_date < date_add(current_date(), -7)
GROUP BY ALL;