LakeFlow宣言型パイプラインの開発
パイプライン コードの開発とテストは、他の Apache Spark ワークロードとは異なります。この記事では、パイプライン コードの開発時にサポートされている機能、ベスト プラクティス、および考慮事項の概要について説明します。その他の推奨事項とベストプラクティスについては、LakeFlow宣言型パイプラインへのソフトウェア開発とDevOpsベスト プラクティスの適用を参照してください。
パイプライン構成にソース コードを追加して、コードを検証したり、更新を実行したりする必要があります。「LakeFlow宣言型パイプラインの構成」を参照してください。
パイプラインのソースコードにはどのファイルが有効ですか?
LakeFlow 宣言型パイプライン コードは、 Python または SQLにすることができます。 1 つのパイプラインをサポートする Python と SQL のソース コード ファイルを組み合わせることができますが、各ファイルに含めることができる言語は 1 つだけです。Python を使用したパイプライン コードの開発およびSQL を使用したパイプライン コードの開発を参照してください。
パイプラインのソース コードを指定するときに、ノートブックとワークスペース ファイルを使用できます。 ワークスペース ファイルは、任意の IDE または Databricks ファイル エディターで作成された Python または SQL スクリプトを表します。 「ワークスペースファイルとは」を参照してください。
Python コードをモジュールまたはライブラリとして開発する場合は、コードをインストールしてインポートし、ソース コードとして構成された Python ノートブックまたはワークスペース ファイルからメソッドを呼び出す必要があります。LakeFlow宣言型パイプラインのPython依存関係の管理を参照してください。
Python ノートブックで任意の SQL コマンドを使用する必要がある場合は、構文パターン パターン spark.sql("<QUERY>")
を使用して SQL を Python コードとして実行できます。
Unity Catalogの関数を使用すると、SQLで使用する任意のPythonユーザー定義関数を登録できます。Unity Catalog のユーザー定義関数 (UDF)を参照してください。
LakeFlow 宣言型パイプライン開発機能の概要
LakeFlow 宣言型パイプラインは、多くの Databricks 機能を拡張して活用し、新しい機能と概念を導入します。 次の表に、パイプライン コード開発をサポートする概念と機能の概要を示します。
機能 | 説明 |
---|---|
開発モード | 新しいパイプラインは、デフォルトで開発モードで実行されるように構成されています。 Databricks では、対話型の開発とテストに開発モードを使用することをお勧めします。 開発モードと本番運用モードを参照してください。 |
検証 |
|
ノートブック | LakeFlow宣言型パイプラインのソース コードとして構成されたノートブックはコードの検証と更新の実行のための対話型オプションを提供します。 LakeFlow宣言型パイプラインのノートブックを使用したETLパイプラインの開発とデバッグを参照してください 。 |
パラメータ | ソース コードとパイプライン構成のパラメーターを活用して、テストと拡張性を簡素化します。「LakeFlow宣言型パイプラインでパラメーターを使用する」を参照してください。 |
Databricksアセットバンドル | Databricks アセット バンドルを使用すると、パイプライン構成とソース コードをワークスペース間で移動できます。「LakeFlow宣言型パイプラインを Databricks アセット バンドル プロジェクトに変換する」を参照してください。 |
開発とテスト用のサンプル データセットを作成する
Databricks では、開発データセットとテスト データセットを作成して、予期されるデータと、形式が正しくない、または破損している可能性のあるレコードを使用してパイプライン ロジックをテストすることをお勧めします。 開発とテストに役立つデータセットを作成するには、次のような複数の方法があります。
- 本番運用 データセットからデータのサブセットを選択します。
- PII を含むソースには、匿名化されたデータまたは人工的に生成されたデータを使用します。
- ダウンストリームの変換ロジックに基づいて明確に定義された結果を持つテストデータを作成します。
- データ スキーマのエクスペクテーションを破るレコードを作成することで、潜在的なデータ破損、不正な形式のレコード、およびアップストリーム データの変更を予測します。
たとえば、次のコードを使用してデータセットを定義するノートブックがあるとします。
CREATE OR REFRESH STREAMING TABLE input_data
AS SELECT * FROM STREAM read_files(
"/production/data",
format => "json")
特定のレコードを含むサンプル データセットは、次のようなクエリを使用して作成できます。
CREATE OR REFRESH MATERIALIZED VIEW input_data AS
SELECT "2021/09/04" AS date, 22.4 as sensor_reading UNION ALL
SELECT "2021/09/05" AS date, 21.5 as sensor_reading
次の例は、パブリッシュされたデータをフィルタリングして、開発またはテスト用の本番運用データのサブセットを作成する方法を示しています。
CREATE OR REFRESH MATERIALIZED VIEW input_data AS SELECT * FROM prod.input_data WHERE date > current_date() - INTERVAL 1 DAY
これらの異なるデータセットを使用するには、変換ロジックを実装するノートブックを使用して複数のパイプラインを作成します。 各パイプラインは input_data
データセットからデータを読み取ることができますが、環境に固有のデータセットを作成するノートブックを含むように構成されています。
LakeFlow 宣言型パイプライン データセットはデータをどのように処理しますか?
次の表では、マテリアライズドビュー、ストリーミングテーブル、およびビューがデータを処理する方法について説明します。
データセットのタイプ | 定義されたクエリーによってレコードが処理される方法とは? |
---|---|
ストリーミングテーブル | 各レコードは一度だけ処理されます。これは、追加専用のソースを前提としています。 |
マテリアライズドビュー | レコードは、現在のデータ状態の正確な結果を返すために、必要に応じて処理されます。マテリアライズドビューは、変換、集計、低速クエリや頻繁に使用される計算の事前計算などのデータ処理タスクに使用する必要があります。 |
ビュー | レコードは、ビューがクエリーされるたびに処理されます。パブリックデータセットに公開すべきではない中間変換やデータ品質チェックにはビューを使用します。 |
最初のデータセットを宣言する LakeFlow 宣言型パイプライン
LakeFlow 宣言型パイプラインでは、 Python と SQLの新しい構文が導入されています。 パイプライン構文の基本については、Python を使用したパイプライン コードの開発 および SQL を使用したパイプライン コードの開発を参照してください。
LakeFlow 宣言型パイプラインは、データセット定義を更新処理から分離し、 LakeFlow 宣言型パイプライン ノートブックは対話式実行を目的としていません。
宣言型パイプライン LakeFlow はどのように構成しますか?
LakeFlow宣言型パイプラインの設定は、大きく分けて 2 つのカテゴリに分類されます。
- ノートブックまたはファイルのコレクションを定義する構成 ( ソース コード と呼ばれます) LakeFlow 宣言型パイプライン構文を使用してデータセットを宣言します。
- パイプラインのインフラストラクチャ、依存関係の管理、更新の処理方法、ワークスペースでのテーブルの保存方法を制御する設定。
ほとんどの構成はオプションですが、特に運用パイプラインを構成する場合は、注意が必要です。これらには以下が含まれます:
- パイプラインの外部でデータを利用できるようにするには、Hive metastoreに公開する ターゲットスキーマ 、またはUnity Catalogに公開する ターゲットカタログ と ターゲットスキーマ を宣言する必要があります。
- データアクセス権限は、実行に使用されるクラスターを通じて構成されます。 クラスターに、データソースとターゲット ストレージの場所 (指定されている場合) に適切な権限が設定されていることを確認します。
Python と SQL を使用してパイプラインのソース コードを記述する方法の詳細については、LakeFlow 宣言型パイプライン SQL 言語リファレンスおよびLakeFlow 宣言型パイプライン Python 言語リファレンスを参照してください。
パイプラインの設定と構成の詳細については、「LakeFlow宣言型パイプラインの構成」を参照してください。
最初のパイプラインをデプロイし、更新をトリガーする
宣言型パイプラインでデータを処理する前に LakeFlow パイプラインを構成する必要があります。 パイプラインを設定したら、更新をトリガーして、パイプライン内の各データセットの結果を計算することができます。LakeFlow宣言型パイプラインの使用を開始するには、チュートリアル: チェンジデータキャプチャとLakeFlow宣言型パイプラインを使用してETLパイプラインを構築するを参照してください。
パイプラインの更新とは
パイプラインはインフラストラクチャをデプロイし、 更新 の開始時にデータの状態を再計算します。更新プログラムは、次の処理を行います:
- 正しい構成でクラスターを開始します。
- 定義されているすべてのテーブルとビューを検出し、無効な列名、依存関係の欠落、構文エラーなどの分析エラーをチェックします。
- 使用可能な最新のデータでテーブルとビューを作成または更新します。
パイプラインは、ユースケースのコストとレイテンシの要件に応じて、継続的に実行することも、スケジュールに従って実行することもできます。LakeFlow宣言型パイプラインでの更新の実行を参照してください。
LakeFlow 宣言型パイプラインを使用したデータの取り込み
LakeFlow 宣言型パイプラインは、 Databricksで使用可能なすべてのデータソースをサポートしています。
Databricksでは、ほとんどのインジェストのユースケースでストリーミングテーブルを使用することをお勧めします。 クラウド オブジェクト ストレージに到着するファイルの場合、Databricks では Auto Loader をお勧めします。LakeFlow宣言型パイプラインを使用することで、ほとんどのメッセージバスからデータを直接取り込むことができます。
クラウドストレージへのアクセスの設定の詳細については、「 クラウドストレージの設定」を参照してください。
Auto Loaderでサポートされていない形式については、Python または SQL を使用して、Apache Sparkでサポートされている任意の形式をクエリできます。「LakeFlow宣言型パイプラインを使用したデータの読み込みを参照してください。
データ品質の監視と適用
エクスペクテーション を使用して、データセットの内容に対するデータ品質管理を指定できます。制約を満たさないレコードを追加できない従来のデータベースの CHECK
制約とは異なり、エクスペクテーションは、データ品質要件を満たさないデータを処理するときに柔軟性を提供します。 この柔軟性により、乱雑になると予想されるデータや、厳しい品質要件を満たす必要があるデータを処理および保存できます。 パイプラインのエクスペクテーションを使用してデータ品質を管理するを参照してください。
LakeFlow宣言型パイプラインとDelta Lakeはどのように関連していますか?
LakeFlow 宣言型パイプラインは、 Delta Lake. 宣言型パイプラインによって作成および管理 LakeFlow テーブルは Delta テーブルであるため、 Delta Lakeによって提供されるものと同じ保証と機能を備えています。 「Databricks の Delta Lake とは」を参照してください。
LakeFlow 宣言型パイプラインは、 Delta Lakeで設定できる多くのテーブル プロパティに加えて、いくつかのテーブル プロパティを追加します。 「宣言型パイプラインのプロパティLakeFlowリファレンス」および「Deltaテーブルのプロパティリファレンス」を参照してください。
LakeFlow宣言型パイプラインによるテーブルの作成と管理方法
Databricks 宣言型パイプラインを使用して自動的に管理テーブル LakeFlow 作成され、テーブルの現在の状態を正しくコンピュートするために更新をどのように処理する必要があるかを判断し、さまざまなメンテナンスおよび最適化タスクを実行します。
ほとんどの操作では、宣言型パイプライン LakeFlow ターゲット テーブルに対するすべての更新、挿入、および削除を処理できるようにする必要があります。 詳細と制限事項については、「 手動による削除または更新の保持」を参照してください。
LakeFlow 宣言型パイプラインによって実行されるメンテナンス タスク
LakeFlow 宣言型パイプラインは、 予測的最適化を使用して、最適な頻度でメンテナンス タスクを実行します。 メンテナンスにより、古いバージョンのテーブルを削除することで、クエリのパフォーマンスを向上させ、コストを削減できます。これには、フル OPTIMIZE および vacuum 操作が含まれます。 メンテナンス タスクは、予測的最適化によって決定されたスケジュールで、前回のメンテナンス以降にパイプラインの更新が実行された場合にのみ実行されます。
予測的最適化 実行の頻度、および保守コストを理解するには、 予測的最適化 システムテーブル・リファレンスを参照してください。
制限
制限事項の一覧については、「LakeFlow 宣言型パイプラインの制限事項」を参照してください。
で 宣言型パイプラインを使用する場合 固有の要件と制限事項の一覧については、「LakeFlowUnity Catalog 宣言型パイプラインで Unity Catalogを使用する 」を参照してくださいLakeFlow
追加のリソース
- LakeFlow 宣言型パイプラインは、 Databricks REST APIを完全にサポートしています。LakeFlow 宣言型パイプライン APIを参照してください。
- パイプラインとテーブルの設定については、「LakeFlow 宣言型パイプラインのプロパティ リファレンス」を参照してください。
- LakeFlow 宣言型パイプライン SQL 言語リファレンス。
- LakeFlow 宣言型パイプライン Python 言語リファレンス。