メインコンテンツまでスキップ

バックフィル ヒストリカルデータ with LakeFlow Declarative パイプライン

データエンジニアリングでは、 バックフィル とは、現在のデータまたはストリーミングデータを処理するために設計されたデータパイプラインを介してヒストリカルデータを遡及的に処理するプロセスを指します。

通常、これは既存のテーブルにデータを送信する別のフローです。次の図は、ヒストリカルデータをパイプラインのブロンズテーブルに送信するバックフィルフローを示しています。

既存のワークフローへのヒストリカルデータ の追加バックフィルフロー

バックフィルが必要になる可能性のあるシナリオには、次のようなものがあります。

  • レガシ システムからヒストリカルデータを処理して、機械学習 (ML) モデルをトレーニングしたり、履歴傾向分析ダッシュボードを構築したりします。
  • アップストリーム データソースのデータ品質の問題により、データのサブセットを再処理します。
  • ビジネス要件が変更されたため、最初のパイプラインでカバーされていなかった別の期間のデータをバックフィルする必要があります。
  • ビジネス ロジックが変更され、履歴データと現在のデータの両方を再処理する必要があります。

LakeFlow 宣言型パイプラインのバックフィルは、ONCE オプションを使用する特殊な追加フローでサポートされています。ONCE オプションの詳細については、append_flow または CREATE FLOW (LakeFlow Declarative パイプライン) を参照してください。

ヒストリカルデータをストリーミングテーブルにバックフィルする際の考慮事項

  • 通常は、データを bronze ストリーミングテーブルに追加します。 ダウンストリームの silver と goldlayer は、ブロンズレイヤーから新しいデータを取得します。
  • 同じデータが複数回追加される場合に備えて、パイプラインが重複データを適切に処理できることを確認します。
  • ヒストリカルデータ スキーマが現在のデータ スキーマと互換性があることを確認します。
  • データ・ボリューム・サイズと必要な処理時間 SLAを考慮し、それに応じてクラスタリング・サイズとバッチ・サイズを構成します。

例: 既存のパイプラインへのバックフィルの追加

この例では、2025 年 1 月 1 日以降、クラウド ストレージ ソースから未加工のイベント登録データを取り込むパイプラインがあるとします。後で、ダウンストリームのレポート作成および分析のユースケースのために、過去 3 年間のヒストリカルデータをバックフィルしたいことに気付きました。 すべてのデータは 1 つの場所にあり、年、月、日ごとに JSON 形式でパーティション分割されています。

初期パイプライン

以下は、クラウドストレージから未加工のイベント登録データを段階的に取り込む開始パイプラインコードです。

Python
import dlt

source_root_path = spark.conf.get("registration_events_source_root_path")
begin_year = spark.conf.get("begin_year")
incremental_load_path = f"{source_root_path}/*/*/*"

# create a streaming table and the default flow to ingest streaming events
@dlt.table(name="registration_events_raw", comment="Raw registration events")
def ingest():
return (
spark
.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.option("cloudFiles.maxFilesPerTrigger", 100)
.option("cloudFiles.schemaEvolutionMode", "addNewColumns")
.option("modifiedAfter", "2025-01-01T00:00:00.000+00:00")
.load(incremental_load_path)
.where(f"year(timestamp) >= {begin_year}") # safeguard to not process data before begin_year
)

ここでは、 modifiedAfter Auto Loader オプションを使用して、クラウド ストレージ パスからのすべてのデータを処理していないことを確認します。インクリメンタル処理は、その境界でカットオフされます。

ヒント

Kafka、Kinesis、Azure Event Hubs などの他のデータソースには、同じ動作を実現するための同等のリーダー オプションがあります。

過去 3 年間のバックフィルデータ

次に、以前のデータをバックフィルするために 1 つ以上のフローを追加します。この例では、次の手順を実行します。

  • append onceフローを使用します。これにより、最初のバックフィルの後も実行を続行せずに、1 回限りのバックフィルが実行されます。コードはパイプラインに残り、パイプラインが完全に更新された場合は、バックフィルが再実行されます。
  • 年ごとに 1 つずつ、合計 3 つのバックフィルフローを作成します(この場合、データはパス内の年ごとに分割されます)。Python では、フローの作成をパラメーター化しますが、SQL では、フローごとに 1 回、コードを 3 回繰り返します。

自分のプロジェクトで作業していて、サーバレス コンピュートを使用していない場合は、パイプラインの max ワーカーを更新することをお勧めします。 max ワーカーを増やすと、ヒストリカルデータを処理しながら、現在のストリーミングデータを予想される SLA内で引き続き処理するためのリソースを確保できます。

ヒント

サーバレス コンピュートと拡張オートスケール (the default) を使用すると、負荷が増加すると自動的にクラスタリングのサイズが大きくなります。

Python
import dlt

source_root_path = spark.conf.get("registration_events_source_root_path")
begin_year = spark.conf.get("begin_year")
backfill_years = spark.conf.get("backfill_years") # e.g. "2024,2023,2022"
incremental_load_path = f"{source_root_path}/*/*/*"

# meta programming to create append once flow for a given year (called later)
def setup_backfill_flow(year):
backfill_path = f"{source_root_path}/year={year}/*/*"
@dlt.append_flow(
target="registration_events_raw",
once=True,
name=f"flow_registration_events_raw_backfill_{year}",
comment=f"Backfill {year} Raw registration events")
def backfill():
return (
spark
.read
.format("json")
.option("inferSchema", "true")
.load(backfill_path)
)

# create the streaming table
dlt.create_streaming_table(name="registration_events_raw", comment="Raw registration events")

# append the original incremental, streaming flow
@dlt.append_flow(
target="registration_events_raw",
name="flow_registration_events_raw_incremental",
comment="Raw registration events")
def ingest():
return (
spark
.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.option("cloudFiles.maxFilesPerTrigger", 100)
.option("cloudFiles.schemaEvolutionMode", "addNewColumns")
.option("modifiedAfter", "2024-12-31T23:59:59.999+00:00")
.load(incremental_load_path)
.where(f"year(timestamp) >= {begin_year}")
)

# parallelize one time multi years backfill for faster processing
# split backfill_years into array
for year in backfill_years.split(","):
setup_backfill_flow(year) # call the previously defined append_flow for each year

この実装では、いくつかの重要なパターンが強調されています。

懸念事項の分離

  • インクリメンタル処理は、バックフィル操作とは無関係です。
  • 各フローには、独自の構成と最適化の設定があります。
  • インクリメンタル操作とバックフィル操作には明確な違いがあります。

制御された実行

  • ONCE オプションを使用すると、各バックフィルが 1 回だけ実行されるようになります。
  • バックフィルフローはパイプライングラフに残りますが、完了するとアイドル状態になります。完全に更新すると、自動的に使用できます。
  • パイプライン定義には、バックフィル操作の明確な監査証跡があります。

処理の最適化

  • 大きなバックフィルを複数の小さなバックフィルに分割して、処理を高速化したり、処理を制御したりすることができます。
  • 拡張オートスケールを使用すると、現在のクラスタリング負荷に基づいてクラスタリングサイズが動的にスケーリングされます。

スキーマの展開

  • schemaEvolutionMode="addNewColumns"を使用すると、スキーマの変更が適切に処理されます。
  • 履歴データと現在のデータ間で一貫したスキーマ推論を行うことができます。
  • 新しいデータでは、新しい列を安全に処理できます。

追加のリソース