メインコンテンツまでスキップ

LakeFlow Spark宣言型パイプラインのベスト プラクティス

Lakeflow Spark宣言型パイプラインを使用してパイプラインを設計、構築、および運用する際に、新しいパイプラインを開始する場合でも、既存のパイプラインを改善する場合でも、これらの推奨パターンを適用してください。

適切なデータセットタイプを選択する

LakeFlow Spark宣言型パイプラインは、ストリーミング テーブル、マテリアライズドビュー、一時ビューの 3 つのデータセット タイプを提供します。 パイプラインの各層に適切なタイプを選択すると、不必要なコンピュート コストが回避され、コードの推論が容易になります。

ストリーミング テーブルは、 データ取り込みと低遅延のストリーミング変換に最適です。 各入力行は 1 回だけ読み取られて処理されるため、追加専用のワークロード、大量のデータ、クラウド ストレージまたはメッセージ バスからのイベント駆動型処理に最適です。

マテリアライズドビューは、複雑な変換や分析クエリに適しています。それらの結果は事前に計算され、増分更新を使用して常に最新の状態に保たれるため、それらに対するクエリは高速です。マテリアライズドビューのデータを直接変更することはできません;出力はクエリ定義によって制御されます。

一時ビューは 、データをストレージに具体化せずに変換ロジックを整理するパイプライン スコープのビューです。独自のテーブルを必要としない中間ステップに使用します。

次の表は、各タイプをいつ使用するかをまとめたものです。

ユースケース

推奨タイプ

理由:

クラウドストレージまたはメッセージバスからの取り込み

ストリーミングテーブル

各レコードを 1 回処理し、大量の追加専用のワークロードを処理します。

CDC ストリーム (挿入、更新、削除)

ストリーミングテーブル

順序付けられた重複排除された CDC 取り込みのAPPLY CHANGES INTOのターゲットとして使用されます。

複雑な集計と結合

マテリアライズドビュー

増分的に更新され、更新ごとに完全な再計算が回避されます。

ダッシュボードクエリの高速化

マテリアライズドビュー

コンピュート前の結果により、生のテーブルに対するクエリよりもクエリが高速になります。

中間変換(下流のリーダーなし)

一時的ビュー

ストレージ コストを発生させずにパイプライン ロジックを整理します。

詳細については、ストリーミングテーブルマテリアライズドビュー、およびLakeflow Spark宣言型パイプラインとはを参照してください。

命令型のMERGEの代わりに宣言型のCDCを使用する

命令型SQL MERGEステートメントを使用してチェンジデータ キャプチャ ( CDC ) を実装するには、イベントの順序付け、重複排除、部分的な更新、スキーマ進化を正しく処理するための大幅なカスタム コードが必要です。 これらの懸念事項はそれぞれ個別に解決する必要があり、結果として得られるコードは保守やテストが困難になります。

Lakeflow Spark宣言型パイプラインは、順序付け、重複排除、順序外イベント、スキーマ進化を宣言的に処理するAPPLY CHANGES INTOステートメント (SQL) とapply_changes()関数 (Python) を提供します。変更フィードとターゲットテーブルの形状を記述すると、残りの処理はパイプラインが行います。APPLY CHANGES INTO は SCDタイプ1(上書き)と SCDタイプ2(履歴保持)の両方をサポートしています。

詳細については、チェンジデータ キャプチャとスナップショット、およびAUTO CDC APIs : パイプラインを使用したチェンジデータ キャプチャの簡素化を参照してください。

期待に沿ったデータ品質の強化

期待値は、データセットを通過するすべての行に適用される true/false の SQL 式です。行が条件に違反すると、パイプラインは設定した違反ポリシーに従って応答します。期待はポリシーに関係なくメトリクスをパイプライン イベント ログに出力するため、長期にわたるデータ品質の傾向を追跡できます。

違反ポリシーを選択してください

3 つの違反ポリシーが利用可能です。不良データに対する許容範囲に一致するものを 1 つ選択します。

  • warn (安全): 無効なレコードはターゲットテーブルに書き込まれ、メトリクスでフラグが付けられます。 すべてのデータを取得する必要があるが、品質の問題を可視化する必要がある場合は、このポリシーを使用します。
  • drop : 有効でないレコードは書き込み前に破棄されます。不正な行が予想され、下流に伝播しないようにする場合にこれを使用します。
  • 失敗 : パイプラインの更新は最初の無効なレコードで停止します。不良レコードが重大なアップストリームの問題を示している重要なデータにこれを使用します。

次の例は、ストリーミング テーブルに適用される各ポリシーを示しています。

SQL
-- Warn: write invalid records but track them in metrics
CREATE OR REFRESH STREAMING TABLE orders_raw (
CONSTRAINT valid_order_id EXPECT (order_id IS NOT NULL)
) AS SELECT * FROM STREAM read_files("/volumes/raw/orders", format => "json");

-- Drop: discard invalid records before writing
CREATE OR REFRESH STREAMING TABLE orders_clean (
CONSTRAINT non_negative_amount EXPECT (amount >= 0) ON VIOLATION DROP ROW
) AS SELECT * FROM STREAM(orders_raw);

-- Fail: stop the pipeline on any invalid record
CREATE OR REFRESH STREAMING TABLE orders_critical (
CONSTRAINT required_customer_id EXPECT (customer_id IS NOT NULL) ON VIOLATION FAIL UPDATE
) AS SELECT * FROM STREAM(orders_clean);

無効なレコードを隔離する

削除されたレコードを黙って破棄するのではなく、調査のために保存したい場合は、検疫パターンを使用します。2 つのフローを使用して、検証に失敗した行を別のストリーミング テーブルにルーティングします。1 つはメイン テーブルから無効な行を削除し、もう 1 つは無効な行のみを隔離テーブルに書き込みます。 これにより、クリーンなデータセットを汚染することなく、不良データを調査、修正、再処理できます。

隔離パターンの詳細な例については、 「期待される推奨事項と高度なパターン」を参照してください。

期待値の詳細については、 「パイプラインの期待値によるデータ品質の管理」を参照してください。

パイプラインをパラメータ化する

パイプラインにはデフォルトのカタログとスキーマ設定があるため、同じカタログとスキーマ内で読み書きするコードは、パラメーターなしで複数の環境で機能します。しかし、パイプラインが2番目のカタログまたはスキーマを参照する必要がある場合(たとえば、開発と本番運用で異なる共有ソースカタログから読み取る場合)、それらの名前をソースコードに直接ハードコーディングすることは避けてください。代わりに、それらをパイプラインの構成パラメーター(パイプライン設定で設定されたキーと値のペア)として定義し、コードで参照します。これにより、パラメーターの値を入れ替えることで、単一のコードベースを複数の環境で正しく実行できるようになります。

SQL
CREATE OR REFRESH MATERIALIZED VIEW transaction_summary AS
SELECT account_id, COUNT(txn_id) AS txn_count, SUM(amount) AS total_amount
FROM ${source_catalog}.sales.transactions
GROUP BY account_id;

詳細については、 「パイプラインでの使用」を参照してください。

トリガー型パイプラインモードと連続型パイプラインモードのどちらかを選択してください。

トリガー モードでは 、利用可能なすべてのデータを処理してから停止します。これは、スケジュールに従って (時間ごと、日ごと、またはオンデマンドで) 実行され、1 分未満のデータの鮮度を必要としないパイプラインの大半にとって適切な選択です。

継続モードでは 、クラスターの実行が継続され、新しいデータが到着すると処理されます。これは、ユースケースで数秒から数分の範囲の遅延が必要な場合にのみ適切です。継続モードでは常時稼働のクラスターが必要となるため、トリガー モードよりも大幅にコストが高くなります。

リアルタイムモード は、連続モードに基づいて構築されており、不正検出やリアルタイムのパーソナライゼーションなどの運用ワークロード向けに、サブ秒、ミリ秒単位の低遅延を達成します。これには追加の構成とコンピュートの計画が必要です。LakeFlow Spark宣言型パイプラインでリアルタイムモードを使用するを参照してください。

詳細については、トリガー パイプライン モードと継続的パイプライン モードおよびパイプラインの構成を参照してください。

データレイアウトにリキッドクラスタリングを使用する

リキッドクラスタリングは、静的パーティショニングとZORDERを置き換えて、 Deltaテーブルのデータ レイアウトを最適化します。 静的パーティショニングでは、事前にパーティション列を選択してデータを再編成する必要があるため、値が不均等に分布している場合、データの偏りが生じる可能性があります。リキッドクラスタリングは自己調整、スキュー耐性、増分性があり、実行ごとに再編成が必要なデータのみを再書き込みします。

クエリ パターンの変化に応じてテーブル全体を書き換えることなく、いつでもクラスタリング列を変更できます。

Databricks では、クエリのワークロードに基づいて最適なクラスタリング列を Databricks が選択および維持できるようにする自動リキッドクラスタリングを推奨しています。CLUSTER BY AUTOで有効にする

SQL
CREATE OR REFRESH STREAMING TABLE events
CLUSTER BY AUTO
AS SELECT * FROM STREAM read_files("/volumes/raw/events", format => "parquet");

クラスタリング列を自分で選択するには、明示的に指定してください:

SQL
CREATE OR REFRESH STREAMING TABLE events
CLUSTER BY (event_date, region)
AS SELECT * FROM STREAM read_files("/volumes/raw/events", format => "parquet");

詳細については、 「ストリーミングテーブル」と「テーブルにリキッドクラスタリングを使用する」を参照してください。

CI/CDと宣言型自動化バンドルを使用してパイプラインを管理する

パイプラインのソースコードをバージョン管理し、宣言型自動化バンドルを使用して複数の環境にわたるデプロイメントを管理します。

詳細については、 「ソース制御のパイプラインの作成」「パイプラインをバンドル プロジェクトに変換する」 、および「パイプラインでの監視の使用」を参照してください。

パイプラインコードをバージョン管理に保存する

すべてのパイプライン ソース ファイル (Python および SQL) をバンドル構成と一緒に Git リポジトリに保存します。プロジェクト全体をバージョン管理すると、変更の完全な履歴が提供され、共同作業が容易になり、開発環境で変更を本番運用に昇格する前に検証できるようになります。

Databricksは、このワークフローを管理するために宣言型自動化バンドルを推奨しています。バンドルは、ソース コードとともに YAML でパイプライン構成を定義します。また、 databricks bundle CLI使用すると、ターミナルまたはCI/CDシステムからパイプラインを検証、デプロイ、実行できます。

環境分離にバンドルターゲットを使用する

バンドルでは複数の ターゲット (例: devstagingprod ) が有効になり、それぞれのターゲットにカタログ名、クラスターポリシー、通知アドレス、その他の設定に対する独自のオーバーライドのセットが含まれます。 バンドル ターゲットとパイプライン問題を組み合わせて、デプロイ時に正しい環境固有の値を注入し、ソース コードに環境定数を含まないようにします。

典型的なワークフローは次のようになります。

  1. 開発者は機能ブランチで作業し、開発カタログ内の個人開発パイプラインにデプロイします。
  2. メイン ブランチへのマージ時に、CI システムはdatabricks bundle validatedatabricks bundle deploy --target stagingを実行してパイプラインを検証し、ステージング環境にデプロイします。
  3. テストに合格すると、CI システムはdatabricks bundle deploy --target prodを使用して本番運用にデプロイされます。

ストリーミングのベストプラクティス

これらのパターンを使用して、状態を管理し、遅延データを制御し、ストリーミング パイプラインの信頼性を維持します。

詳細については、 「ウォーターマークを使用したステートフル処理の最適化」「ストリーミング チェックポイントの失敗からパイプラインを回復する」 、および「パイプラインを使用したヒストリカル データのバックフィル」を参照してください。

ステートフル操作にウォーターマークを使用する

ウォーターマークは、ウィンドウ集約や重複排除などのステートフル ストリーミング操作中にパイプラインがメモリに保持する状態を制限します。ウォーターマークがないと、パイプラインが考えられるすべてのキーのデータを蓄積するにつれて状態が際限なく増大し、最終的には長時間実行されるパイプラインでメモリ不足エラーが発生します。

ウォーターマークは、タイムスタンプ列と遅延データの許容しきい値を指定します。しきい値を超えた後に到着したレコードは削除されます。遅延データに対する許容度と、その状態を開いたままにするためのメモリ コストのバランスをとるしきい値を選択します。

次の例は、1 分間のタンブリング ウィンドウ集計を 3 分間のウォーターマークでコンピュートします。

SQL
CREATE OR REFRESH STREAMING TABLE event_counts AS
SELECT window(event_time, '1 minute') AS time_window, region, COUNT(*) AS cnt
FROM STREAM(events_raw)
WATERMARK event_time DELAY OF INTERVAL 3 MINUTES
GROUP BY time_window, region;
注記

集計が更新ごとに完全に再計算されるのではなく、増分的に処理されるようにするには、ウォーターマークを定義する必要があります。

ストリーミング状態と完全更新を理解する

ストリーミング状態は増分的です。パイプラインは、毎回最初から再計算するのではなく、更新全体にわたって状態を構築して維持します。これにより、ステートフル ストリーミングが効率的になりますが、ステートフル クエリのロジックを変更すると (たとえば、ウォーターマークしきい値の変更や集計列の変更など)、既存の状態は新しいロジックと互換性がなくなります。この場合、完全な更新を実行して、新しいロジックですべての履歴データを再処理し、状態を最初から再構築する必要があります。

ソースに履歴データが保持されていない場合、完全に更新するとデータが失われる可能性もあります。 たとえば、保持期間が短い Kafka ソースでは、更新時に最後の数分間のデータしか利用できない可能性があり、その結果、テーブルに含まれるデータは以前よりもはるかに少なくなります。特に、完全な更新にコストがかかる大容量のストリームや、ソースのデータ保持が制限されているストリームの場合は、ステートフル クエリ ロジックの変更を慎重に計画してください。メダリオンアーキテクチャを使用すると、最小限の変換でブロンズ テーブルを作成でき、完全な履歴を含むブロンズ テーブルからシルバーまたはゴールド テーブルを再計算できます。

ストリーム-ストリーム結合

ストリームストリーム結合には、結合の 両側 にウォーターマークと時間制限のある結合条件が必要です。 結合条件の時間間隔は、ストリーミング エンジンにそれ以上一致できなくなったことを伝え、一致できなくなった状態を排除できるようにします。ウォーターマークまたは時間制限条件のいずれかを省略すると、状態は無制限に増加します。

次の例では、広告インプレッション イベントとクリック イベントを結合し、インプレッションから 3 分以内にクリックが発生することを要求します。

SQL
CREATE OR REFRESH STREAMING TABLE impression_clicks AS
SELECT imp.ad_id, imp.impression_time, clk.click_time
FROM STREAM(ad_impressions)
WATERMARK impression_time DELAY OF INTERVAL 3 MINUTES AS imp
JOIN STREAM(user_clicks)
WATERMARK click_time DELAY OF INTERVAL 3 MINUTES AS clk
ON imp.ad_id = clk.ad_id
AND clk.click_time BETWEEN imp.impression_time
AND imp.impression_time + INTERVAL 3 MINUTES;

ストリームを静的テーブルに結合すると (スナップショット結合)、各マイクロバッチの開始時に静的テーブルのスナップショットが更新されます。つまり、遅れて到着したディメンション レコードは、すでに処理されたファクトに遡って適用されることはありません。遡及適用が必要な場合は、マテリアライズドビューを使用するか、パイプラインを再構築してください。

パイプラインのパフォーマンスを最適化する

これらのテクニックを適用してコンピュート コストを削減し、パイプラインの更新を高速化します。

詳細については、 「マテリアライズドビュー」「ウォーターマークを使用したステートフル処理の最適化」を参照してください。

小さなファイルを避ける

低ボリュームのソースでパイプラインを頻繁にトリガーすると、多数の小さなファイルがクラウド ストレージに書き込まれます。小さなファイルでは、各ファイルに個別のメタデータ検索と I/O ラウンドトリップが必要になり、クラウド ストレージAPIs大規模なリスト操作が制限されるため、読み取りパフォーマンスが低下します。 これを回避するには、データ量に一致するトリガー間隔を選択します。トリガーされたパイプラインを、継続的にではなく、更新の間に意味のある量のデータが蓄積されるスケジュールで実行します。

データの偏りを処理する

データ スキューは、結合キーまたは groupBy キーの値がパーティション間で不均等に分散され、少数のタスクでデータの大部分を処理する場合に発生します。これにより、エンドツーエンドの更新時間が長くなるホットスポットが作成されます。リキッドクラスタリングを使用して、格納されたテーブルのスキューに対処します。 実行中の計算中に発生する偏りについては、2 段階でグループ化および集約する前に、ランダムなバケット サフィックスを追加して、偏りの大きいキーにソルトを適用します。

詳細については、 「データ レイアウトにリキッドクラスタリングを使用する」を参照してください。

マテリアライズドビューに増分更新を使用する

大規模な集計にマテリアライズドビューを使用すると、Lakeflow Spark宣言型パイプラインは、結果セット全体を再計算するのではなく、最後の更新以降のアップストリームの変更のみを処理して、増分更新を試みます。増分更新は、パイプラインのトリガーごとにクエリを最初から再実行するよりも、はるかに安価です。マテリアライズドビューが増分的に更新される可能性を最大化するには、シンプルで決定論的な集計クエリを記述し、非決定論的な関数など、増分処理を妨げる構造を避けてください。

マテリアライズドビューについては、「増分更新」を参照してください。

結合を最適化する

片側が小さなディメンション テーブルである結合の場合、シャッフル結合を実行する代わりに、小さい方のテーブルをすべての エグゼキューター にブロードキャストするようにSparkに指示するブロードキャスト ヒントを追加します。

SQL
CREATE OR REFRESH MATERIALIZED VIEW enriched_orders AS
SELECT o.*, /*+ BROADCAST(p) */ p.product_name, p.category
FROM orders o
JOIN products p ON o.product_id = p.product_id;

時系列の近接結合(たとえば、時間範囲内で最も近いイベントの検索)の場合は、範囲結合条件を使用し、ストリームを結合する場合は両側にウォーターマークがあることを確認するか、結合する前にイベントを時間バケットに事前にビン分けすることを検討してください。

パイプラインを監視する

パイプライン イベント ログは、 LakeFlow Spark宣言型パイプラインの主要な可観測性プリミティブです。 パイプを実行するたびに、実行の進行状況、データ品質の期待結果、ライン データ リネージ、エラーの詳細を含む構造化レコードがイベント ログに書き込まれます。 イベント ログは、直接クエリできる Delta テーブルです。

基になるストレージ パスを知らずにイベント ログをクエリするには、共有クラスターまたはSQLウェアハウスでevent_log()テーブル値関数を使用します。

SQL
SELECT * FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
ORDER BY timestamp DESC
LIMIT 100;

期待メトリクスのイベント ログをクエリして、データ品質ダッシュボードを構築します。 details列には、各制約の合格/不合格数を含むネストされたJSON構造が含まれており、これを使用して時間の経過に伴う品質の傾向を追跡し、回帰をアラートできます。

イベント ドリブン アラートの場合、パイプラインが失敗したときやデータ品質のしきい値を超えたときに、イベント フックを使用してカスタム Webhook または通知サービス (Slack や PagerDuty など) をトリガーします。イベント フックは、パイプライン イベントに応答して実行される Python 関数です。

詳細については、 「パイプラインの監視」「パイプライン イベント ログ」 、および「イベント フックを使用したパイプラインのカスタム モニタリングの定義」を参照してください。

サーバレスコンピュートを使用する

Databricksは新しいパイプラインにはサーバレス コンピュートを推奨しています。Databricksがインフラストラクチャを自動的に管理するため、サーバーレスでは手動のクラスター構成は不要です。サーバレス パイプラインは、ワークロードの需要に応じて水平方向 (エグゼキューター数の増加) と垂直方向 (エグゼキューター サイズの拡大) の両方にスケールできる拡張オートスケールを使用します。サーバレス パイプラインは常に Unity Catalog を使用するため、ガバナンスとリネージの追跡がデフォルトで組み込まれています。

詳細については、 「サーバレス パイプラインの構成」を参照してください。

メダリオンアーキテクチャでパイプラインを整理する

メダリオンアーキテクチャは、それぞれ明確な目的を持つ3つの論理レイヤー(ブロンズ、シルバー、ゴールド)にデータを整理します。Lakeflow Spark宣言型パイプラインのデータセットタイプを適切なレイヤーにマッピングすることで、各レイヤーの責務が明確になり、パイプラインの保守が容易になります。

  • ブロンズ : ストリーミング テーブルを使用して、クラウド ストレージ、メッセージ バス、またはCDCソースから生データを取り込みます。 ブロンズ テーブルでは、最小限の変換で生のソース データが保存されるため、要件が変更された場合にシルバーまたはゴールドレイヤーがブロンズ レイヤーのソースから再処理できるようになります。
  • シルバー : 行レベルの増分変換 (フィルタリング、クリーニング、解析) にはストリーミング テーブルを使用します。 シルバーレイヤー ロジックに、増分更新の恩恵を受けるディメンション テーブルまたは複雑な集計に対するエンリッチメント結合が含まれる場合は、マテリアライズドビューを使用します。
  • ゴールド : マテリアライズドビューを使用して、ダッシュボード、レポート ツール、および下流の消費者に提供される集計、メトリクス、概要を事前にコンピュートします。

可能な限り、取り込み(ブロンズ)と変換(シルバーとゴールド)を別々のパイプラインに分離してください。各レイヤーを分離することで、各レイヤーを個別にスケジュール設定、監視、トラブルシューティングすることが可能になり、変換パイプラインの障害が発生しても、新しいデータがBronzeに取り込まれるのを妨げることはありません。

詳細については、ストリーミングテーブルマテリアライズドビューを参照してください。