Delta Live Tables パイプラインを開発する

パイプライン コードの開発とテストは、他の Apache Spark ワークロードとは異なります。 この記事では、パイプライン コードの開発時にサポートされている機能、ベスト プラクティス、および考慮事項の概要について説明します。 その他の推奨事項とベスト プラクティスについては、「 ソフトウェア開発と DevOps のベスト プラクティスを Delta Live Table パイプラインに適用する」を参照してください。

注:

パイプライン構成にソース コードを追加して、コードを検証したり、更新を実行したりする必要があります。 「Delta Live Tables パイプラインの構成」を参照してください。

パイプラインのソースコードにはどのファイルが有効ですか?

Delta Live Tables パイプライン コードには、Python または SQL を指定できます。 1 つのパイプラインをサポートする Python と SQL のソース コード ファイルを組み合わせることができますが、各ファイルに含めることができる言語は 1 つだけです。 「Python を使用したパイプライン コードの開発」および「SQL を使用したパイプライン コードの開発」を参照してください。

パイプラインのソース コードを指定するときに、ノートブックとワークスペース ファイルを使用できます。 ワークスペース ファイルは、任意の IDE または Databricks ファイル エディターで作成された Python または SQL スクリプトを表します。 「ワークスペースファイルとは」を参照してください。

Python コードをモジュールまたはライブラリとして開発する場合は、コードをインストールしてインポートし、ソース コードとして構成された Python ノートブックまたはワークスペース ファイルからメソッドを呼び出す必要があります。 「Delta Live Tables パイプラインの Python 依存関係の管理」を参照してください。

注:

Python ノートブックで任意の SQL コマンドを使用する必要がある場合は、構文パターン パターン spark.sql("<QUERY>") を使用して SQL を Python コードとして実行できます。

関数を使用すると、Unity Catalog で使用する任意の ユーザー定義関数を登録できます。PythonSQL「Unity Catalog のユーザー定義関数 (UDF)」を参照してください。

Delta Live Tables 開発機能の概要

Delta Live Tables は、多くの Databricks 機能を拡張して活用し、新しい機能と概念を導入します。 次の表に、パイプライン コード開発をサポートする概念と機能の概要を示します。

機能

説明

開発モード

新しいパイプラインは、デフォルトで開発モードで実行されるように構成されています。 Databricks では、対話型の開発とテストに開発モードを使用することをお勧めします。 開発モードと本番運用モードを参照してください。

検証

Validate更新では、テーブルで更新を実行せずに、パイプラインのソースコードの正確性を検証します。「テーブルの更新を待たずにパイプラインでエラーを確認する」を参照してください。

ノートブック

Delta Live Tables パイプラインのソース コードとして構成されたノートブックには、コードの検証と更新の実行のための対話型オプションが用意されています。 「ノートブックでの Delta Live Tables パイプラインの開発とデバッグ」を参照してください

パラメータ

ソース コードとパイプライン構成のパラメーターを活用して、テストと拡張性を簡素化します。 「 Delta Live Tables パイプラインでのパラメーターの使用」を参照してください。

Databricksアセットバンドル

Databricks アセット バンドルを使用すると、パイプライン構成とソース コードをワークスペース間で移動できます。 「Delta Live Tables パイプラインを Databricks Asset Bundle プロジェクトに変換する」を参照してください。

開発およびテスト用のサンプル データセットを作成する

Databricks では、開発データセットとテスト データセットを作成して、予期されるデータと、形式が正しくない、または破損している可能性のあるレコードを使用してパイプライン ロジックをテストすることをお勧めします。 開発とテストに役立つデータセットを作成するには、次のような複数の方法があります。

  • 本番運用 データセットからデータのサブセットを選択します。

  • PII を含むソースには、匿名化されたデータまたは人工的に生成されたデータを使用します。

  • ダウンストリームの変換ロジックに基づいて明確に定義された結果を持つテストデータを作成します。

  • データ スキーマのエクスペクテーションを破るレコードを作成することで、潜在的なデータ破損、不正な形式のレコード、およびアップストリーム データの変更を予測します。

たとえば、次のコードを使用してデータセットを定義するノートブックがあるとします。

CREATE OR REFRESH STREAMING TABLE input_data AS SELECT * FROM read_files("/production/data", "json")

特定のレコードを含むサンプル データセットは、次のようなクエリを使用して作成できます。

CREATE OR REFRESH MATERIALIZED VIEW input_data AS
SELECT "2021/09/04" AS date, 22.4 as sensor_reading UNION ALL
SELECT "2021/09/05" AS date, 21.5 as sensor_reading

次の例は、パブリッシュされたデータをフィルタリングして、開発またはテスト用の本番運用データのサブセットを作成する方法を示しています。

CREATE OR REFRESH MATERIALIZED VIEW input_data AS SELECT * FROM prod.input_data WHERE date > current_date() - INTERVAL 1 DAY

これらの異なるデータセットを使用するには、変換ロジックを実装するノートブックを使用して複数のパイプラインを作成します。 各パイプラインは LIVE.input_data データセットからデータを読み取ることができますが、環境に固有のデータセットを作成するノートブックを含むように構成されています。