Auto Loaderのベストプラクティス
このページでは、Auto Loaderをユースケースに合わせて信頼性高く、費用対効果が高く、大規模に実行するように構成するためのベストプラクティスについて説明します。
これらのベストプラクティスは、運用上のオーバーヘッドを削減し、本番運用で診断が困難な、次のような一般的な問題を防止します。フルディレクトリのスキャンによる不要なLIST APIコスト、スキーマのドリフトによるサイレントなデータ損失、およびチェックポイントの誤設定によって引き起こされるパイプラインの再起動。
本番運用の構成の詳細については、本番運用のワークロード用に Auto Loader を構成するを参照してください。モニタリングとオブザーバビリティについては、Auto Loaderの監視と観測を参照してください。
適切な実行フレームワークを選択する
ユースケースに最適な実行フレームワークは、パイプラインに対する制御の必要度と、管理したい運用オーバーヘッドの量によって異なります。ほとんどのユーザーや本番運用のパイプラインにとって、Lakeflow Spark宣言型パイプラインを使用したAuto Loaderは適しています。ただし、最大限の制御とカスタマイズが必要な場合は、構造化ストリーミングでAuto Loaderを使用してください。マネージドエクスペリエンスで最も簡単なセットアップを行うには、利用可能な場合にマネージドLakeFlowコネクタを使用してください。
Lakeflow Spark宣言型パイプラインは、オートスケール、データ品質チェック、スキーマ進化の処理、およびイベントログを通じたモニタリングにより、構造化ストリーミングを拡張します。Databricksでは、ほとんどの本番運用取り込みワークロードにLakeflow Spark宣言型パイプラインを使用することをお勧めします。
適切なスケジューリングとトリガータイプを選択してください
ユースケースに最適なスケジューリングとトリガータイプは、レイテンシ要件とファイルの到着パターンによって異なります。ほとんどのユースケースでは、Databricksはファイルイベントが有効なファイル到着トリガーを推奨しています。これは、新しいファイルが到着したときにのみコンピュートが実行されるため、低コストで低レイテンシの取り込みを実現します。3つのトリガータイプは、パイプラインの開始時期と頻度によって異なります。
- **継続的**: パイプラインは停止せずに実行されます。継続的なコンピュートはよりコストがかかるため、サブ秒のレイテンシーが厳格な要件である場合にのみ使用します。ファイルイベントと組み合わせます。
- **ファイル到着トリガー**:新しいファイルがソースロケーションに到着するとパイプラインが開始されます。低から中程度のレイテンシー、または不規則なファイル到着パターンに最適です。ファイルイベントを有効にする必要があります。新しいファイルが到着したときにジョブをトリガーするを参照してください。
- スケジュール済み :パイプラインは時間ベースのスケジュール(例:毎時)で実行されます。レイテンシ要件が緩やかである場合(数分から数時間)に使用します。ディレクトリリストと連携しますが、ファイルイベントは、完全なディレクトリのスキャンを回避することで、スケジュールモードでもコストを削減します。
バッチスケジューリングにTrigger.AvailableNowを使用する方法の詳細については、Trigger.AvailableNow とレート制限の使用を参照してください。
適切なファイル検出モードを選択してください
Auto Loaderは、セットアップの複雑さ、スケーラビリティ、コストにそれぞれ異なるトレードオフを持つ3つのファイル検出モードをサポートしています。
モード | セットアップの複雑さ | スケーラビリティ | コスト | 使用する場合 |
|---|---|---|---|---|
ファイルイベント(推奨) | 低(1回限りのアクセス許可の設定) | 1時間あたり数百万のファイル | 最低 | ほとんどのワークロードのデフォルトです。 |
クラシックファイル通知 | 高(21以上のクラウド構成オプション) | 1時間あたり数百万のファイル | M | ファイルイベントが利用できない場合 |
ディレクトリ一覧 | なし | ディレクトリサイズによって制限されます | 最も高い (LIST APIコスト) | 小規模なディレクトリ、1回限りのバックフィル、またはセキュリティポリシーによってファイルイベントが防止される場合 |
ファイル イベントは、ストリームごとに1つではなく、外部ロケーションごとに1つのサブスクリプションとキューを使用することで、クラウド ストレージ リソースを集約します。大規模環境では、パフォーマンスの差は大きくなります。ディレクトリ一覧では、トリガーごとにソース ディレクトリ全体をスキャンする必要があるため、取り込み時間はディレクトリのサイズに応じて増加します。ファイル イベントは新しいファイル通知を直接配信するため、ディレクトリ内のオブジェクト数に関係なく、取り込み時間は低いまま維持されます。
ファイルイベントを有効にする
ファイルイベントには、一度限りのクラウドアクセス許可付与と、マネージドファイルイベントサービスを使用するように構成された外部ロケーションが必要です。設定すると、その外部ロケーションから読み取るすべてのAuto Loaderストリームは、追加の構成なしでファイルイベントを使用できます。
-
クラウドプロバイダー側で必要なクラウド権限を付与します。要件はクラウドプロバイダーによって異なります。外部ロケーションのファイルイベントを設定するを参照してください。
-
Auto Loaderクエリで
cloudFiles.useManagedFileEventsをtrueに設定します。Pythondf = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.useManagedFileEvents", "true")
.load("/path/to/data/dir"))完全なセットアップステップについては、ファイルイベントを用いたAuto Loaderへの移行を参照してください。
ファイルイベントを使用できない場合
次の場合は、ファイル イベントを使用できない場合があります。
- 外部ロケーションはファイルイベントで構成されていません。
- 組織のセキュリティポリシーでは、共有外部ロケーションでのファイルイベントの有効化は許可されていません。
これらの場合、クラシックファイル通知モードまたはディレクトリリストモードを使用します。ファイル検出モードの完全な比較については、Auto Loaderファイル検出モードの比較を参照してください。
スキーマ進化の管理
Auto Loaderはスキーマを自動的に推論しますが、スキーマ進化をどのように構成するかによって、データの完全性とパイプラインの安定性に影響します。戦略を選択するには、次の表を使用してください。
シナリオ | 推奨事項 |
|---|---|
スキーマは既知で固定されています | 明示的なスキーマを使用して |
スキーマは不明ですが、追加の変更が予想されます |
|
スキーマは不明であり、型の変更が予想されます。 |
|
厳密なスキーマ契約が必要です |
|
任意または予測不能なスキーマ |
|
戦略を選択したら、スキーマ進化の動作を微調整するために次のプラクティスを適用します。
既知のフィールドタイプにスキーマヒントを使用する
cloudFiles.schemaHintsオプションを使用して、事前にわかっているフィールドの型を強制し、他のフィールドのスキーマ推論も可能にします。
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "id long, amount double")
.load("/path/to/data/dir"))
互換性のある型変更には型拡張を使用します
addNewColumnsWithTypeWideningスキーマ進化モードでは、互換性のある型を自動的に拡張し(例:intからlong)、データを_rescued_data列にルーティングする代わりに処理します。これにより、単純な型プロモーションを処理するための後処理ジョブが不要になります。
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "parquet")
.option("cloudFiles.schemaEvolutionMode", "addNewColumnsWithTypeWidening")
.load("/path/to/data/dir"))
予測不能なスキーマをVariant型として取り込む
データが特定のスキーマに準拠しない場合、またはスキーマが継続的に変更される場合は、データを Variant 型として取り込みます。
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "data")
.load("/path/to/data/dir"))
Variant クエリ時にスキーマオンリードを提供しますが、構造化された列をクエリするよりも効率が低下します。スキーマ推論と進化の完全なメカニズムについては、Auto Loaderでのスキーマ推論と進化の設定を参照してください。
不良データとデータ品質の処理
次のプラクティスは、不適切なデータがダウンストリームレイヤーに伝播する前に、それを検出、キャプチャ、分離するのに役立ちます。
_rescued_dataと有効にする _corrupt_record
Auto Loaderには、クリーンに解析できなかったデータをキャプチャするための2つの列が用意されています。
_rescued_data現在のスキーマと一致しないフィールドをキャプチャします。Auto Loaderによって自動的に追加されます。_corrupt_recordまったく解析できない行をキャプチャします。columnNameOfCorruptRecordを使用して有効にします:
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "_corrupt_record string")
.option("columnNameOfCorruptRecord", "_corrupt_record")
.load("/path/to/data/dir"))
Databricksでは、破損したレコードを見逃す可能性がある競合状態を避けるために、badRecordsPathよりもcolumnNameOfCorruptRecordを推奨しています。
モニタリングにLakeflow Spark宣言型パイプラインの期待値を使用する
Lakeflow Spark宣言型パイプラインの期待を設定し、通常の条件下で_rescued_dataと_corrupt_recordがNULLであることを確認します。NULL以外の値は、スキーマドリフトまたはデータ破損を示します。
import dlt
@dlt.table
@dlt.expect("no rescued data", "_rescued_data IS NULL")
@dlt.expect("no corrupt records", "_corrupt_record IS NULL")
def bronze_table():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "_corrupt_record string")
.option("columnNameOfCorruptRecord", "_corrupt_record")
.load("/path/to/data/dir"))
破損したデータの分離
解析不能なデータを含む行は、調査のために専用のシンクに分離してください。これにより、破損したデータがダウンストリームレイヤーに伝播するのを防ぎます。
import dlt
@dlt.table
def corrupt_records_sink():
return dlt.read_stream("bronze_table").where("_corrupt_record IS NOT NULL")
@dlt.view
def clean_table():
return dlt.read_stream("bronze_table").where("_corrupt_record IS NULL")
ソースファイルのメタデータでデータにアノテーションを付ける
Auto Loaderの取り込みクエリに _metadata 列を含めます。最小限、file_pathとfile_modification_timeをキャプチャします。これにより、データの問題を特定のソースファイルまで追跡し、ファイルのライフサイクル全体で cloud_files_state() と結合できます。
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/path/to/data/dir")
.select("*", "_metadata.file_path", "_metadata.file_modification_time"))
詳細については、ファイルメタデータ列を参照してください。
コストとパフォーマンスの最適化
以下の対策により、Auto Loaderの3つの主なコスト要因(クラウドLIST APIコール、アイドルコンピュート、長期ストレージの増加)を削減します。
-
ファイルイベントを使用してLIST APIコストを最小限に抑える : ファイルイベントは増分ファイル検出を提供し、実行ごとに完全なディレクトリリストを作成する必要がありません。これは、Auto Loaderにとって最も影響の大きいコスト最適化です。
-
イベント駆動型処理にはファイル到着トリガーを使用します :ファイル到着トリガーは、新しいファイルが到着した場合にのみパイプラインを開始するため、アイドル状態のコンピュート料金は発生しません。新しいファイルが到着したときにジョブをトリガーするを参照してください。
-
**cloudFiles.cleanSourceを使用して処理済みファイルをアーカイブします**。
cloudFiles.cleanSourceを使用して、処理済みファイルを自動的に削除または移動します。これにより、ストレージコストと長期間使用されるストリームのディレクトリリストのコストの両方が削減されます。詳細については、コスト削減のためにソースディレクトリ内のファイルをアーカイブするを参照してください。- 取り込み後にファイルを削除するには、
deleteモードを使用します。 moveモードを使用して、コンプライアンスまたは監査のためにファイルを別の場所にアーカイブします。
Pythondf = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.cleanSource", "delete")
.load("/path/to/data/dir")) - 取り込み後にファイルを削除するには、
複数のAuto Loaderストリームまたは他のクライアントが同じソースディレクトリから読み取る場合、cloudFiles.cleanSourceを有効にしないでください。
- パフォーマンスの向上を活用する :最新のDatabricks Runtimeにアップグレードするか、サーバレスコンピュートを使用して、最近のAuto Loaderのパフォーマンス向上を活用してください。
チェックポイント管理
チェックポイントは、ストリームの進行状況とファイルの状態を格納します。チェックポイントの構成ミスまたは損失は完全な再起動を必要とするため、重要なインフラストラクチャとして扱ってください。
- チェックポイントロケーションにクラウドオブジェクトライフサイクルポリシーを適用しないでください。チェックポイントファイルが削除された場合、ストリームの状態が破損するため、最初から再起動する必要があります。
- 各ストリームとソースディレクトリに個別のチェックポイントを使用します。
- 状態の増加を制限するには、長期間にわたる大容量のストリームに
cloudFiles.maxFileAgeを検討してください。保守的な設定を使用してください(90日間が最小推奨です)。この値を積極的に設定しすぎると、Auto Loaderがすでに取り込んだファイルがウィンドウ外に落ちた場合に再処理するリスクがあります。
詳細については、ファイルイベントの追跡を参照してください。
ファイルイベントで最適なファイル検出のためにボリュームを使用します。
ファイルイベントによるパフォーマンス向上のため、Auto Loaderがロードする各パスまたはサブディレクトリに外部ボリュームを作成します。クラウドパス(例:s3://bucket/path)の代わりに、ボリュームパス(例:/Volumes/catalog/schema/volume)をAuto Loaderに提供します。これにより、最適化されたデータアクセスパターンを通じてファイル検出が最適化されます。
ファイルイベントのベストプラクティスの詳細については、ファイルイベントを使用したAuto Loaderのベストプラクティスを参照してください。