Delta Live Tables パイプラインを開発およびテストする方法

この記事では、 Delta Live Tables パイプラインの開発とテストに使用できるパターンについて説明します。 パイプライン設定を使用して、 Delta Live Tables では、開発環境、テスト環境、および運用環境でパイプラインを分離するための構成を指定できます。 この記事の推奨事項は、SQL コード開発と Python コード開発の両方に適用されます。

開発モードを使用してパイプラインの更新を実行する

Delta Live Tables には、パイプラインの更新を開発モードで実行するか運用モードで実行するかを制御する UI トグルが用意されています。 このモードは、パイプラインの更新の処理方法を制御します。

  • 開発モードでは、更新が成功または失敗した後、コンピュート リソースはすぐには終了しません。 同じコンピュート リソースを再利用して、クラスターの開始を待たずにパイプラインの複数の更新を実行できます。

  • 開発モードでは、タスクが失敗しても自動的に再試行されないため、パイプラインの論理エラーまたは構文エラーをすぐに検出して修正できます。

Databricks では、開発およびテスト中に開発モードを使用し、運用環境に配置するときに常に運用モードに切り替えることをお勧めします。

開発モードと本番モードを参照してください。

テーブルの更新を待たずにパイプラインのソース コードをテストする

開発およびテスト中に、構文エラーや分析エラーなど、パイプラインのソースコードに関する問題をチェックするには、検証 更新を実行します。 Validate 更新では、テーブルに対して実際の更新を実行せずに、パイプライン ソース コードの正確性を検証するだけなので、実際のパイプライン更新を実行する前に、問題をより迅速に特定して修正できます。

すべての開発ライフサイクルフェーズでターゲットスキーマを指定する

Delta Live Tables パイプライン内のすべてのデータセットは、パイプラインの外部からはアクセスできない LIVE 仮想スキーマを参照します。 ターゲット スキーマが指定されている場合、 LIVE 仮想スキーマはターゲット スキーマを指します。 更新中に各テーブルに書き込まれた結果を確認するには、ターゲット スキーマを指定する必要があります。

環境に固有のターゲット スキーマを指定する必要があります。 特定のスキーマ内の各テーブルは、1 つのパイプラインでのみ更新できます。

異なるターゲットで開発、テスト、運用用に個別のパイプラインを作成することで、これらの環境を分離したままにすることができます。 target schema パラメーターを使用すると、文字列補間やその他のウィジェットやパラメーターを使用して DATA とターゲットを制御するロジックを削除できます。

Delta Live Tables パイプラインから Hive metastoreへのデータの発行」を参照してください。

Databricks Git フォルダーを使用して Delta Live Tables パイプラインを管理する

Databricks では、Delta Live Tables パイプラインの開発、テスト、本番運用へのデプロイ中に Git フォルダーを使用することをお勧めします。 Git フォルダーを使用すると、次のことが可能になります。

  • 時間の経過に伴うコードの変化を追跡します。

  • 複数の開発者によって行われた変更をマージします。

  • コードレビューなどのソフトウェア開発プラクティス。

Databricks では、パイプラインに関連するすべてのコードに対して 1 つの Git リポジトリを構成することをお勧めします。

各開発者は、開発用に独自の Databricks Git フォルダーを構成する必要があります。 開発中、ユーザーは Databricks Git フォルダーから独自のパイプラインを構成し、開発データセットと分離されたスキーマと場所を使用して新しいロジックをテストします。 開発作業が完了すると、ユーザーは変更をコミットして中央の Git リポジトリ内のブランチにプッシュし、テスト ブランチまたは QA ブランチに対してプル リクエストを開きます。

結果のブランチは、Databricks Git フォルダーと、テスト データセットと開発スキーマを使用して構成されたパイプラインでチェックアウトする必要があります。 ロジックが期待どおりに実行されると仮定すると、変更を本番運用にプッシュするためにプル リクエストまたはリリース ブランチを準備する必要があります。

Git フォルダーを使用して環境間でコードを同期することはできますが、パイプライン設定は手動または Terraform などのツールを使用して最新の状態に保つ必要があります。

このワークフローは、すべての Databricks ジョブで CI/CD に Git フォルダーを使用する場合と似ています。 「Git および Databricks Git フォルダー (Repos) を使用した CI/CD テクニック」を参照してください。

取り込みと変換のためのセグメント ライブラリ ステップ

Databricks では、データをエンリッチして検証する変換ロジックからデータを取り込むクエリーを分離することをお勧めします。 その後、開発またはテストの Data からデータを取り込むために使用されるライブラリを、運用データの取り込みロジックとは別のディレクトリに整理して、さまざまな環境のパイプラインを簡単に構成できます。 その後、より小さなデータセットをテストに使用して、開発を加速できます。 「 開発およびテスト用のサンプル データセットの作成」を参照してください。

パラメーターを使用して、開発、テスト、および運用の Data を制御することもできます。 パラメーターを使用したデータソースの制御を参照してください。

Delta Live Tables パイプラインは、すべてのデータセット リレーションシップを管理するために LIVE 仮想スキーマを使用するため、サンプル データを読み込むインジェスト ライブラリを使用して開発パイプラインとテスト パイプラインを構成することで、運用テーブル名を使用してサンプル データセットを置き換えてコードをテストできます。 すべての環境で同じ変換ロジックを使用できます。

開発およびテスト用のサンプル データセットを作成する

Databricks では、開発データセットとテスト データセットを作成して、予想されるデータと、形式が正しくないレコードや破損している可能性のあるレコードの両方を使用してパイプライン ロジックをテストすることをお勧めします。 開発とテストに役立つデータセットを作成するには、次のような複数の方法があります。

  • 運用データセットからデータのサブセットを選択します。

  • PII を含むソースには、匿名化または人工的に生成されたデータを使用します。

  • ダウンストリーム変換ロジックに基づいて、明確に定義された結果を持つテスト データを作成します。

  • データ スキーマの期待を破るレコードを作成することで、潜在的なデータ破損、不正な形式のレコード、およびアップストリーム データの変更を予測します。

たとえば、次のコードを使用してデータセットを定義するノートブックがあるとします。

CREATE OR REFRESH STREAMING TABLE input_data AS SELECT * FROM cloud_files("/production/data", "json")

次のようなクエリーを使用して、特定のレコードを含むサンプルデータセットを作成できます。

CREATE OR REFRESH LIVE TABLE input_data AS
SELECT "2021/09/04" AS date, 22.4 as sensor_reading UNION ALL
SELECT "2021/09/05" AS date, 21.5 as sensor_reading

次の例は、パブリッシュされたデータにフィルタを適用して、開発またはテスト用の運用データのサブセットを作成する方法を示しています。

CREATE OR REFRESH LIVE TABLE input_data AS SELECT * FROM prod.input_data WHERE date > current_date() - INTERVAL 1 DAY

これらの異なるデータセットを使用するには、変換ロジックを実装するノートブックを使用して複数のパイプラインを作成します。 各パイプラインは LIVE.input_data データセットからデータを読み取ることができますが、環境に固有のデータセットを作成するノートブックを含めるように構成されています。

パラメーターを使用した Data の制御

パイプライン設定中に設定されたパラメーターをライブラリ内から参照できます。 これらのパラメーターは、パイプライン設定 UI のコン ピュート>高度な>設定 の部分でキーと値のペアとして設定されます。 このパターンを使用すると、同じパイプラインの異なる構成で異なる Data を指定できます。

たとえば、変数 data_source_path を使用してパイプラインの開発、テスト、および運用構成で異なるパスを指定し、次のコードを使用して参照できます。

CREATE STREAMING TABLE bronze
AS (
    SELECT
    *,
    _metadata.file_path AS source_file_path
    FROM cloud_files( '${data_source_path}', 'csv',
            map("header", "true"))
)
import dlt
from pyspark.sql.functions import col

data_source_path = spark.conf.get("data_source_path")

@dlt.table
def bronze():
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", True)
        .load(data_source_path )
        .select("*", col("_metadata.file_path").alias("source_file_name"))
    )

このパターンは、初期インジェスト中にインジェスト ロジックがスキーマや不正な形式のデータに対する変更を処理する方法をテストする必要がある場合に特に役立ちます。 データセットを切り替えながら、すべての環境でパイプライン全体で同じコードを使用できます。