LakeFlow Spark宣言型パイプラインのベスト プラクティス
このページでは、 LakeFlow Spark宣言型パイプラインを使用してパイプラインを設計、構築、運用するための推奨パターンについて説明します。 新しいパイプラインを開始するとき、または既存のパイプラインを改善するときには、これらのガイドラインを適用します。
適切なデータセットタイプを選択する
LakeFlow Spark宣言型パイプラインは、ストリーミング テーブル、マテリアライズドビュー、一時ビューの 3 つのデータセット タイプを提供します。 パイプラインの各層に適切なタイプを選択すると、不必要なコンピュート コストが回避され、コードの推論が容易になります。
ストリーミング テーブルは、 データ取り込みと低遅延のストリーミング変換に最適です。 各入力行は 1 回だけ読み取られて処理されるため、追加専用のワークロード、大量のデータ、クラウド ストレージまたはメッセージ バスからのイベント駆動型処理に最適です。
マテリアライズドビューは、 複雑な変換や分析クエリに最適です。 これらの結果はコンピュート前に作成され、増分更新を使用して最新に保たれるため、それらに対するクエリは高速です。 マテリアライズドビューのデータを直接変更することはできません。クエリ定義によって出力が制御されます。
一時ビューは 、データをストレージに具体化せずに変換ロジックを整理するパイプライン スコープのビューです。独自のテーブルを必要としない中間ステップに使用します。
次の表は、各タイプをいつ使用するかをまとめたものです。
ユースケース | 推奨タイプ | 理由: |
|---|---|---|
クラウドストレージまたはメッセージバスからの取り込み | ストリーミングテーブル | 各レコードを 1 回処理し、大量の追加専用のワークロードを処理します。 |
CDC ストリーム (挿入、更新、削除) | ストリーミングテーブル | 順序付けられた重複排除された CDC 取り込みの |
複雑な集計と結合 | マテリアライズドビュー | 増分的に更新され、更新ごとに完全な再計算が回避されます。 |
ダッシュボードクエリの高速化 | マテリアライズドビュー | コンピュート前の結果により、生のテーブルに対するクエリよりもクエリが高速になります。 |
中間変換(下流のリーダーなし) | 一時的ビュー | ストレージ コストを発生させずにパイプライン ロジックを整理します。 |
詳細については、ストリーミングテーブル 、マテリアライズドビュー 、およびLakeFlowSpark 宣言型パイプラインの概念を 参照してください。
命令型のMERGEの代わりに宣言型のCDCを使用する
命令型SQL MERGEステートメントを使用してチェンジデータ キャプチャ ( CDC ) を実装するには、イベントの順序付け、重複排除、部分的な更新、スキーマ進化を正しく処理するための大幅なカスタム コードが必要です。 これらの懸念事項はそれぞれ個別に解決する必要があり、結果として得られるコードは保守やテストが困難になります。
LakeFlow Spark宣言型パイプラインは、順序付け、重複排除、順序外れのイベント、スキーマ進化を宣言的に処理するAPPLY CHANGES INTOステートメント ( SQL ) とapply_changes()関数 ( Python ) を提供します。 変更フィードとターゲット テーブルの形状を記述すると、パイプラインが残りの処理を行います。APPLY CHANGES INTO 、SCD タイプ 1 (上書き) と SCD タイプ 2 (履歴の保存) の両方をサポートします。
詳細については、 「チェンジデータキャプチャ ( CDC ) とは何ですか?」を参照してください。 およびAUTO CDC APIs : パイプラインを使用して変更データのキャプチャを簡素化します。
期待に沿ったデータ品質の強化
期待値は、データセットを通過するすべての行に適用される true/false の SQL 式です。行が条件に違反すると、パイプラインは設定した違反ポリシーに従って応答します。期待はポリシーに関係なくメトリクスをパイプライン イベント ログに出力するため、長期にわたるデータ品質の傾向を追跡できます。
違反ポリシーを選択してください
3 つの違反ポリシーが利用可能です。不良データに対する許容範囲に一致するものを 1 つ選択します。
- warn (安全): 無効なレコードはターゲットテーブルに書き込まれ、メトリクスでフラグが付けられます。 すべてのデータを取得する必要があるが、品質の問題を可視化する必要がある場合は、このポリシーを使用します。
- drop : 有効でないレコードは書き込み前に破棄されます。不正な行が予想され、下流に伝播しないようにする場合にこれを使用します。
- 失敗 : パイプラインの更新は最初の無効なレコードで停止します。不良レコードが重大なアップストリームの問題を示している重要なデータにこれを使用します。
次の例は、ストリーミング テーブルに適用される各ポリシーを示しています。
- SQL
- Python
-- Warn: write invalid records but track them in metrics
CREATE OR REFRESH STREAMING TABLE orders_raw (
CONSTRAINT valid_order_id EXPECT (order_id IS NOT NULL)
) AS SELECT * FROM STREAM read_files("/volumes/raw/orders", format => "json");
-- Drop: discard invalid records before writing
CREATE OR REFRESH STREAMING TABLE orders_clean (
CONSTRAINT non_negative_amount EXPECT (amount >= 0) ON VIOLATION DROP ROW
) AS SELECT * FROM STREAM(orders_raw);
-- Fail: stop the pipeline on any invalid record
CREATE OR REFRESH STREAMING TABLE orders_critical (
CONSTRAINT required_customer_id EXPECT (customer_id IS NOT NULL) ON VIOLATION FAIL UPDATE
) AS SELECT * FROM STREAM(orders_clean);
from pyspark import pipelines as dp
# Warn: write invalid records but track them in metrics
@dp.table
@dp.expect("valid_order_id", "order_id IS NOT NULL")
def orders_raw():
return spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.load("/volumes/raw/orders")
# Drop: discard invalid records before writing
@dp.table
@dp.expect_or_drop("non_negative_amount", "amount >= 0")
def orders_clean():
return spark.readStream.table("orders_raw")
# Fail: stop the pipeline on any invalid record
@dp.table
@dp.expect_or_fail("required_customer_id", "customer_id IS NOT NULL")
def orders_critical():
return spark.readStream.table("orders_clean")
無効なレコードを隔離する
削除されたレコードを黙って破棄するのではなく、調査のために保存したい場合は、検疫パターンを使用します。2 つのフローを使用して、検証に失敗した行を別のストリーミング テーブルにルーティングします。1 つはメイン テーブルから無効な行を削除し、もう 1 つは無効な行のみを隔離テーブルに書き込みます。 これにより、クリーンなデータセットを汚染することなく、不良データを調査、修正、再処理できます。
隔離パターンの詳細な例については、 「期待される推奨事項と高度なパターン」を参照してください。
期待値の詳細については、 「パイプラインの期待値によるデータ品質の管理」を参照してください。
パイプラインをパラメータ化する
パイプラインにはデフォルトのカタログとスキーマの設定があるため、同じカタログとスキーマ内で読み取りと書き込みを行うコードは、パラメーターなしで環境間で動作します。ただし、パイプラインが 2 番目のカタログまたはスキーマを参照する必要がある場合 (たとえば、開発と本番運用で異なる共有ソース カタログから読み取る場合)、それらの名前をソース コードに直接ハードコーディングすることは避けてください。 代わりに、それらをパイプライン構成の問題 (パイプライン設定で設定されたキーと値のペア) として定義し、コード内で参照します。 これにより、問題の値を交換することで、単一のコードベースを環境間で正しく実行できるようになります。
- SQL
- Python
CREATE OR REFRESH MATERIALIZED VIEW transaction_summary AS
SELECT account_id, COUNT(txn_id) AS txn_count, SUM(amount) AS total_amount
FROM ${source_catalog}.sales.transactions
GROUP BY account_id;
from pyspark import pipelines as dp
from pyspark.sql.functions import count, sum
@dp.materialized_view
def transaction_summary():
source_catalog = spark.conf.get("source_catalog")
return spark.read.table(f"{source_catalog}.sales.transactions") \
.groupBy("account_id") \
.agg(
count("txn_id").alias("txn_count"),
sum("amount").alias("total_amount")
)
詳細については、 「パイプラインでの使用」を参照してください。
各環境に適したパイプラインモードを選択する
開発および本番運用の更新モード
開発モード または 本番運用 更新モードでパイプラインを実行します。 目標に合ったモードを選択してください。
開発モード では、パイプラインは長時間稼働中のクラスターを更新全体で再利用し、エラー時に再試行しません。 これにより、クラスターの再起動を待たずにエラーの詳細をすぐに取得できるため、パイプライン コードを作成およびテストする際の反復サイクルが高速化されます。
本番運用モード では、各更新が完了するとクラスターがすぐにシャットダウンされ、コンピュート コストが削減されます。 パイプラインは、クラスターの再起動を含むエスカレーション再試行も適用し、一時的なインフラストラクチャ障害を自動的に処理します。スケジュールされたすべてのパイプライン実行には本番運用モードを使用します。
トリガーパイプラインモードと連続パイプラインモード
トリガー モードでは 、利用可能なすべてのデータを処理してから停止します。これは、スケジュールに従って (時間ごと、日ごと、またはオンデマンドで) 実行され、1 分未満のデータの鮮度を必要としないパイプラインの大半にとって適切な選択です。
継続モードでは 、クラスターの実行が継続され、新しいデータが到着すると処理されます。これは、ユースケースで数秒から数分の範囲の遅延が必要な場合にのみ適切です。継続モードでは常時稼働のクラスターが必要となるため、トリガー モードよりも大幅にコストが高くなります。
詳細については、 「トリガー パイプライン モードと連続パイプライン モード」および「パイプラインの構成」を参照してください。
データレイアウトにリキッドクラスタリングを使用する
リキッドクラスタリングは、静的パーティショニングとZORDERを置き換えて、 Deltaテーブルのデータ レイアウトを最適化します。 事前にパーティション列を選択する必要があり、値が不均等に分散されている場合にデータ スキューが発生する可能性があるパーティショニングとは異なり、リキッドクラスタリングは自己調整でスキュー耐性があり、増分的です。再編成が必要なデータのみが実行ごとに再書き込みされます。
クエリ パターンの変化に応じてテーブル全体を書き換えることなく、いつでもクラスタリング列を変更できます。
ストリーミング テーブル定義でクラスタリング列を定義します。
- SQL
- Python
CREATE OR REFRESH STREAMING TABLE events
CLUSTER BY (event_date, region)
AS SELECT * FROM STREAM read_files("/volumes/raw/events", format => "parquet");
from pyspark import pipelines as dp
@dp.table(cluster_by=["event_date", "region"])
def events():
return spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "parquet") \
.load("/volumes/raw/events")
どの列をクラスタリングするかわからない場合は、 CLUSTER BY AUTOを使用すると、 Databricksクエリ ワークロードに基づいて最適なクラスタリング列を自動的に選択できるようになります。
詳細については、 「ストリーミングテーブル」と「テーブルにリキッドクラスタリングを使用する」を参照してください。
CI/CD と Databricks アセット バンドルを使用してパイプラインを管理する
パイプラインのソース コードをバージョン管理し、 Databricks アセット バンドルを使用して環境間のデプロイメントを管理します。
詳細については、 「ソース制御のパイプラインを作成する」 、 「パイプラインをDatabricks Asset Bundle プロジェクトに変換する」 、および「パイプラインでの使用」を参照してください。
パイプラインコードをバージョン管理に保存する
すべてのパイプライン ソース ファイル (Python および SQL) をバンドル構成と一緒に Git リポジトリに保存します。プロジェクト全体をバージョン管理すると、変更の完全な履歴が提供され、共同作業が容易になり、開発環境で変更を本番運用に昇格する前に検証できるようになります。
Databricks では、このワークフローを管理するためにDatabricks Asset Bundle を推奨しています。バンドルは、ソース コードとともに YAML でパイプライン構成を定義します。また、 databricks bundle CLI使用すると、ターミナルまたはCI/CDシステムからパイプラインを検証、デプロイ、実行できます。
環境分離にバンドルターゲットを使用する
バンドルでは複数の ターゲット (例: dev 、 staging 、 prod ) が有効になり、それぞれのターゲットにカタログ名、クラスターポリシー、通知アドレス、その他の設定に対する独自のオーバーライドのセットが含まれます。 バンドル ターゲットとパイプライン問題を組み合わせて、デプロイ時に正しい環境固有の値を注入し、ソース コードに環境定数を含まないようにします。
典型的なワークフローは次のようになります。
- 開発者は機能ブランチで作業し、開発カタログ内の個人開発パイプラインにデプロイします。
- メイン ブランチへのマージ時に、CI システムは
databricks bundle validateとdatabricks bundle deploy --target stagingを実行してパイプラインを検証し、ステージング環境にデプロイします。 - テストに合格すると、CI システムは
databricks bundle deploy --target prodを使用して本番運用にデプロイされます。
ストリーミングのベストプラクティス
これらのパターンを使用して、状態を管理し、遅延データを制御し、ストリーミング パイプラインの信頼性を維持します。
詳細については、 「ウォーターマークを使用したステートフル処理の最適化」 、 「ストリーミング チェックポイントの失敗からパイプラインを回復する」 、および「パイプラインを使用したヒストリカル データのバックフィル」を参照してください。
ステートフル操作にウォーターマークを使用する
ウォーターマークは、ウィンドウ集約や重複排除などのステートフル ストリーミング操作中にパイプラインがメモリに保持する状態を制限します。ウォーターマークがないと、パイプラインが考えられるすべてのキーのデータを蓄積するにつれて状態が際限なく増大し、最終的には長時間実行されるパイプラインでメモリ不足エラーが発生します。
ウォーターマークは、タイムスタンプ列と遅延データの許容しきい値を指定します。しきい値を超えた後に到着したレコードは削除されます。遅延データに対する許容度と、その状態を開いたままにするためのメモリ コストのバランスをとるしきい値を選択します。
次の例は、1 分間のタンブリング ウィンドウ集計を 3 分間のウォーターマークでコンピュートします。
- SQL
- Python
CREATE OR REFRESH STREAMING TABLE event_counts AS
SELECT window(event_time, '1 minute') AS time_window, region, COUNT(*) AS cnt
FROM STREAM(events_raw)
WATERMARK event_time DELAY OF INTERVAL 3 MINUTES
GROUP BY time_window, region;
from pyspark import pipelines as dp
from pyspark.sql.functions import window
@dp.table
def event_counts():
return (
spark.readStream.table("events_raw")
.withWatermark("event_time", "3 minutes")
.groupBy(window("event_time", "1 minute"), "region")
.count()
)
集計が更新ごとに完全に再計算されるのではなく、増分的に処理されるようにするには、ウォーターマークを定義する必要があります。
ストリーミング状態と完全更新を理解する
ストリーミング状態は増分的です。パイプラインは、毎回最初から再計算するのではなく、更新全体にわたって状態を構築して維持します。これにより、ステートフル ストリーミングが効率的になりますが、ステートフル クエリのロジックを変更すると (たとえば、ウォーターマークしきい値の変更や集計列の変更など)、既存の状態は新しいロジックと互換性がなくなります。この場合、完全な更新を実行して、新しいロジックですべての履歴データを再処理し、状態を最初から再構築する必要があります。
ソースに履歴データが保持されていない場合、完全に更新するとデータが失われる可能性もあります。 たとえば、保持期間が短い Kafka ソースでは、更新時に最後の数分間のデータしか利用できない可能性があり、その結果、テーブルに含まれるデータは以前よりもはるかに少なくなります。特に、完全な更新にコストがかかる大容量のストリームや、ソースのデータ保持が制限されているストリームの場合は、ステートフル クエリ ロジックの変更を慎重に計画してください。メダリオンアーキテクチャを使用すると、最小限の変換でブロンズ テーブルを作成でき、完全な履歴を含むブロンズ テーブルからシルバーまたはゴールド テーブルを再計算できます。
ストリーム-ストリーム結合
ストリームストリーム結合には、結合の 両側 にウォーターマークと時間制限のある結合条件が必要です。 結合条件の時間間隔は、ストリーミング エンジンにそれ以上一致できなくなったことを伝え、一致できなくなった状態を排除できるようにします。ウォーターマークまたは時間制限条件のいずれかを省略すると、状態は無制限に増加します。
次の例では、広告インプレッション イベントとクリック イベントを結合し、インプレッションから 3 分以内にクリックが発生することを要求します。
- SQL
- Python
CREATE OR REFRESH STREAMING TABLE impression_clicks AS
SELECT imp.ad_id, imp.impression_time, clk.click_time
FROM STREAM(ad_impressions)
WATERMARK impression_time DELAY OF INTERVAL 3 MINUTES AS imp
JOIN STREAM(user_clicks)
WATERMARK click_time DELAY OF INTERVAL 3 MINUTES AS clk
ON imp.ad_id = clk.ad_id
AND clk.click_time BETWEEN imp.impression_time
AND imp.impression_time + INTERVAL 3 MINUTES;
from pyspark import pipelines as dp
from pyspark.sql.functions import expr
dp.create_streaming_table("impression_clicks")
@dp.append_flow(target="impression_clicks")
def join_impressions_and_clicks():
impressions = spark.readStream.table("ad_impressions") \
.withWatermark("impression_time", "3 minutes")
clicks = spark.readStream.table("user_clicks") \
.withWatermark("click_time", "3 minutes")
return impressions.alias("imp").join(
clicks.alias("clk"),
expr("""
imp.ad_id = clk.ad_id AND
clk.click_time BETWEEN imp.impression_time AND imp.impression_time + INTERVAL 3 MINUTES
"""),
"leftOuter"
)
ストリームを静的テーブルに結合すると (スナップショット結合)、各マイクロバッチの開始時に静的テーブルのスナップショットが更新されます。つまり、遅れて到着したディメンション レコードは、すでに処理されたファクトに遡って適用されることはありません。遡及適用が必要な場合は、マテリアライズドビューを使用するか、パイプラインを再構築してください。
パイプラインのパフォーマンスを最適化する
これらのテクニックを適用してコンピュート コストを削減し、パイプラインの更新を高速化します。
詳細については、 「マテリアライズドビュー」と「ウォーターマークを使用したステートフル処理の最適化」を参照してください。
小さなファイルを避ける
低ボリュームのソースでパイプラインを頻繁にトリガーすると、多数の小さなファイルがクラウド ストレージに書き込まれます。小さなファイルでは、各ファイルに個別のメタデータ検索と I/O ラウンドトリップが必要になり、クラウド ストレージAPIs大規模なリスト操作が制限されるため、読み取りパフォーマンスが低下します。 これを回避するには、データ量に一致するトリガー間隔を選択します。トリガーされたパイプラインを、継続的にではなく、更新の間に意味のある量のデータが蓄積されるスケジュールで実行します。
データの偏りを処理する
データ スキューは、結合キーまたは groupBy キーの値がパーティション間で不均等に分散され、少数のタスクでデータの大部分を処理する場合に発生します。これにより、エンドツーエンドの更新時間が長くなるホットスポットが作成されます。リキッドクラスタリングを使用して、格納されたテーブルのスキューに対処します。 実行中の計算中に発生する偏りについては、2 段階でグループ化および集約する前に、ランダムなバケット サフィックスを追加して、偏りの大きいキーにソルトを適用します。
詳細については、 「データ レイアウトにリキッドクラスタリングを使用する」を参照してください。
マテリアライズドビューに増分更新を使用する
大規模な集計にマテリアライズドビューを使用する場合、 LakeFlow Spark宣言型パイプラインはそれを段階的に更新しようとします。つまり、完全な結果セットを再計算するのではなく、最後の更新以降の上流の変更のみを処理します。 増分更新は、パイプライン トリガーごとにクエリを最初から再実行するよりも大幅にコストが削減されます。マテリアライズドビューが増分的に更新される可能性を最大限に高めるには、単純で決定的な集計クエリを作成し、非決定的な関数などの増分処理を妨げる構造を避けます。
マテリアライズドビューについては、「増分更新」を参照してください。
結合を最適化する
片側が小さなディメンション テーブルである結合の場合、シャッフル結合を実行する代わりに、小さい方のテーブルをすべての エグゼキューター にブロードキャストするようにSparkに指示するブロードキャスト ヒントを追加します。
- SQL
- Python
CREATE OR REFRESH MATERIALIZED VIEW enriched_orders AS
SELECT o.*, /*+ BROADCAST(p) */ p.product_name, p.category
FROM orders o
JOIN products p ON o.product_id = p.product_id;
from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast
@dp.materialized_view
def enriched_orders():
orders = spark.read.table("orders")
products = spark.read.table("products")
return orders.join(broadcast(products), "product_id")
時系列の近接結合(たとえば、時間範囲内で最も近いイベントの検索)の場合は、範囲結合条件を使用し、ストリームを結合する場合は両側にウォーターマークがあることを確認するか、結合する前にイベントを時間バケットに事前にビン分けすることを検討してください。
パイプラインを監視する
パイプライン イベント ログは、 LakeFlow Spark宣言型パイプラインの主要な可観測性プリミティブです。 パイプを実行するたびに、実行の進行状況、データ品質の期待結果、ライン データ リネージ、エラーの詳細を含む構造化レコードがイベント ログに書き込まれます。 イベント ログは、直接クエリできる Delta テーブルです。
基になるストレージ パスを知らずにイベント ログをクエリするには、共有クラスターまたはSQLウェアハウスでevent_log()テーブル値関数を使用します。
SELECT * FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
ORDER BY timestamp DESC
LIMIT 100;
期待メトリクスのイベント ログをクエリして、データ品質ダッシュボードを構築します。 details列には、各制約の合格/不合格数を含むネストされたJSON構造が含まれており、これを使用して時間の経過に伴う品質の傾向を追跡し、回帰をアラートできます。
イベント ドリブン アラートの場合、パイプラインが失敗したときやデータ品質のしきい値を超えたときに、イベント フックを使用してカスタム Webhook または通知サービス (Slack や PagerDuty など) をトリガーします。イベント フックは、パイプライン イベントに応答して実行される Python 関数です。
詳細については、 「パイプラインの監視」 、 「パイプライン イベント ログ」 、および「イベント フックを使用したパイプラインのカスタム モニタリングの定義」を参照してください。
サーバレスコンピュートを使用する
Databricks新規パイプラインにはサーバレスコンピュートを推奨しております。 サーバーレスを使用すると、手動でクラスターを構成する必要はなく、 Databricksインフラストラクチャを自動的に管理します。 サーバレス パイプラインは、ワークロードの要求に応じて水平方向 (エグゼキューターの大型化) と垂直方向 (エグゼキューターの大型化) の両方に拡張できる強化されたオートスケールを使用します。 サーバレスパイプラインは常にUnity Catalog使用するため、ガバナンスとリネージ追跡が自動的に組み込まれています。
詳細については、 「サーバレス パイプラインの構成」を参照してください。
メダリオンアーキテクチャでパイプラインを整理する
メダリオンアーキテクチャは、データをブロンズ、シルバー、ゴールドの 3 つの論理レイヤーに編成し、それぞれが明確な目的を持っています。 LakeFlow Spark宣言型パイプライン データセットのタイプを適切なレイヤーにマッピングすると、各レイヤーの責任が明確になり、パイプラインの保守が容易になります。
- ブロンズ : ストリーミング テーブルを使用して、クラウド ストレージ、メッセージ バス、またはCDCソースから生データを取り込みます。 ブロンズ テーブルでは、最小限の変換で生のソース データが保存されるため、要件が変更された場合にシルバーまたはゴールドレイヤーがブロンズ レイヤーのソースから再処理できるようになります。
- シルバー : 行レベルの増分変換 (フィルタリング、クリーニング、解析) にはストリーミング テーブルを使用します。 シルバーレイヤー ロジックに、増分更新の恩恵を受けるディメンション テーブルまたは複雑な集計に対するエンリッチメント結合が含まれる場合は、マテリアライズドビューを使用します。
- ゴールド : マテリアライズドビューを使用して、ダッシュボード、レポート ツール、および下流の消費者に提供される集計、メトリクス、概要を事前にコンピュートします。
可能な限り、取り込み (ブロンズ) と変換 (シルバーとゴールド) を別々のパイプライン DAG に分離します。レイヤーを分離することで、各レイヤーを個別にスケジュール、監視、トラブルシューティングできるようになり、変換パイプラインで障害が発生しても、新しいデータが Bronze に格納されなくなります。
詳細については、ストリーミングテーブルとマテリアライズドビューを参照してください。