メインコンテンツまでスキップ

DLT パイプラインの開発

パイプライン コードの開発とテストは、他の Apache Spark ワークロードとは異なります。この記事では、パイプライン コードの開発時にサポートされている機能、ベスト プラクティス、および考慮事項の概要について説明します。その他の推奨事項とベスト プラクティスについては、「 ソフトウェア開発と DevOps のベスト プラクティスを DLT パイプラインに適用する」を参照してください。

注記

パイプライン構成にソース コードを追加して、コードを検証したり、更新を実行したりする必要があります。 DLT パイプラインの設定を参照してください。

パイプラインのソースコードにはどのファイルが有効ですか?

DLT パイプライン コードには、Python または SQL を使用できます。1 つのパイプラインをサポートする Python と SQL のソース コード ファイルを組み合わせることができますが、各ファイルに含めることができる言語は 1 つだけです。 Python を使用したパイプライン コードの開発およびSQL を使用したパイプライン コードの開発を参照してください。

パイプラインのソース コードを指定するときに、ノートブックとワークスペース ファイルを使用できます。 ワークスペース ファイルは、任意の IDE または Databricks ファイル エディターで作成された Python または SQL スクリプトを表します。 「ワークスペースファイルとは」を参照してください。

Python コードをモジュールまたはライブラリとして開発する場合は、コードをインストールしてインポートし、ソース コードとして構成された Python ノートブックまたはワークスペース ファイルからメソッドを呼び出す必要があります。 DLT パイプラインの Python 依存関係の管理を参照してください。

注記

Python ノートブックで任意の SQL コマンドを使用する必要がある場合は、構文パターン パターン spark.sql("<QUERY>") を使用して SQL を Python コードとして実行できます。

関数を使用すると、Unity Catalog で使用する任意の ユーザー定義関数を登録できます。PythonSQL「Unity Catalog のユーザー定義関数 (UDF)」を参照してください。

DLT開発機能の概要

DLT は、Databricks の多くの機能を拡張して活用し、新しい機能と概念を導入します。次の表に、パイプライン コード開発をサポートする概念と機能の概要を示します。

機能

説明

開発モード

新しいパイプラインは、デフォルトで開発モードで実行されるように構成されています。 Databricks では、対話型の開発とテストに開発モードを使用することをお勧めします。 開発モードと本番運用モードを参照してください。

検証

Validate更新では、テーブルで更新を実行せずに、パイプラインのソースコードの正確性を検証します。テーブルの更新を待たずにパイプラインでエラーを確認するを参照してください。

ノートブック

DLT パイプラインのソース コードとして構成されたノートブックには、コードの検証と更新の実行のための対話型オプションが用意されています。ノートブックでの DLT パイプラインの開発とデバッグを参照してください。

パラメータ

ソース コードとパイプライン構成のパラメーターを活用して、テストと拡張性を簡素化します。 DLT パイプラインでのパラメーターの使用を参照してください。

Databricksアセットバンドル

Databricks アセット バンドルを使用すると、パイプライン構成とソース コードをワークスペース間で移動できます。 「DLT パイプラインを Databricks アセット バンドル プロジェクトに変換する」を参照してください。

開発とテスト用のサンプル データセットを作成する

Databricks では、開発データセットとテスト データセットを作成して、予期されるデータと、形式が正しくない、または破損している可能性のあるレコードを使用してパイプライン ロジックをテストすることをお勧めします。 開発とテストに役立つデータセットを作成するには、次のような複数の方法があります。

  • 本番運用 データセットからデータのサブセットを選択します。
  • PII を含むソースには、匿名化されたデータまたは人工的に生成されたデータを使用します。
  • ダウンストリームの変換ロジックに基づいて明確に定義された結果を持つテストデータを作成します。
  • データ スキーマのエクスペクテーションを破るレコードを作成することで、潜在的なデータ破損、不正な形式のレコード、およびアップストリーム データの変更を予測します。

たとえば、次のコードを使用してデータセットを定義するノートブックがあるとします。

SQL
CREATE OR REFRESH STREAMING TABLE input_data
AS SELECT * FROM STREAM read_files(
"/production/data",
format => "json")

特定のレコードを含むサンプル データセットは、次のようなクエリを使用して作成できます。

SQL
CREATE OR REFRESH MATERIALIZED VIEW input_data AS
SELECT "2021/09/04" AS date, 22.4 as sensor_reading UNION ALL
SELECT "2021/09/05" AS date, 21.5 as sensor_reading

次の例は、パブリッシュされたデータをフィルタリングして、開発またはテスト用の本番運用データのサブセットを作成する方法を示しています。

SQL
CREATE OR REFRESH MATERIALIZED VIEW input_data AS SELECT * FROM prod.input_data WHERE date > current_date() - INTERVAL 1 DAY

これらの異なるデータセットを使用するには、変換ロジックを実装するノートブックを使用して複数のパイプラインを作成します。 各パイプラインは input_data データセットからデータを読み取ることができますが、環境に固有のデータセットを作成するノートブックを含むように構成されています。

DLTデータセットはどのようにデータを処理しますか?

次の表では、マテリアライズドビュー、ストリーミングテーブル、およびビューがデータを処理する方法について説明します。

データセットのタイプ

定義されたクエリーによってレコードが処理される方法とは?

ストリーミングテーブル

各レコードは一度だけ処理されます。これは、追加専用のソースを前提としています。

マテリアライズドビュー

レコードは、現在のデータ状態の正確な結果を返すために、必要に応じて処理されます。マテリアライズドビューは、変換、集計、低速クエリや頻繁に使用される計算の事前計算などのデータ処理タスクに使用する必要があります。

ビュー

レコードは、ビューがクエリーされるたびに処理されます。パブリックデータセットに公開すべきではない中間変換やデータ品質チェックにはビューを使用します。

DLTで初めてのデータセットの宣言

DLT では、Python と SQL の新しい構文が導入されています。パイプライン構文の基本については、 Python を使用したパイプライン コードの開発 および SQL を使用したパイプライン コードの開発を参照してください。

注記

DLT はデータセット定義を更新処理から分離し、DLT ノートブックは対話型の実行を目的としていません。

DLT パイプラインはどのように構成しますか?

DLT パイプラインの設定は、大きく分けて 2 つのカテゴリに分類されます。

  1. DLT 構文を使用してデータセットを宣言するノートブックまたはファイルのコレクション ( ソース コード と呼ばれます) を定義する設定。
  2. パイプラインのインフラストラクチャ、依存関係の管理、更新の処理方法、ワークスペースでのテーブルの保存方法を制御する設定。

ほとんどの構成はオプションですが、特に運用パイプラインを構成する場合は、注意が必要です。これらには以下が含まれます:

  • パイプラインの外部でデータを利用できるようにするには、Hive metastoreに公開する ターゲットスキーマ 、またはUnity Catalogに公開する ターゲットカタログターゲットスキーマ を宣言する必要があります。
  • データアクセス権限は、実行に使用されるクラスターを通じて構成されます。 クラスターに、データソースとターゲット ストレージの場所 (指定されている場合) に適切な権限が設定されていることを確認します。

Python と SQL を使用してパイプラインのソース コードを記述する方法の詳細については、「 DLT SQL 言語リファレンス 」と「 DLT Python 言語リファレンス」を参照してください。

パイプラインの設定と設定の詳細については、「 DLT パイプラインの設定」を参照してください。

最初のパイプラインをデプロイし、更新をトリガーする

DLT でデータを処理する前に、パイプラインを構成する必要があります。パイプラインを設定したら、更新をトリガーして、パイプライン内の各データセットの結果を計算することができます。DLT パイプラインの使用を開始するには、「チュートリアル: 初めての DLT パイプラインの実行」を参照してください。

パイプラインの更新とは

パイプラインはインフラストラクチャをデプロイし、 更新 の開始時にデータの状態を再計算します。更新プログラムは、次の処理を行います:

  • 正しい構成でクラスターを開始します。
  • 定義されているすべてのテーブルとビューを検出し、無効な列名、依存関係の欠落、構文エラーなどの分析エラーをチェックします。
  • 使用可能な最新のデータでテーブルとビューを作成または更新します。

パイプラインは、ユースケースのコストとレイテンシの要件に応じて、継続的に実行することも、スケジュールに従って実行することもできます。 DLT パイプラインで更新プログラムを実行するを参照してください。

DLTによるデータの取り込み

DLT は、 Databricksで利用可能なすべてのデータソースをサポートしています。

Databricks 、ほとんどのインジェストのユースケースでストリーミングテーブルを使用することをお勧めします。 クラウド オブジェクト ストレージに到着するファイルの場合、Databricks では Auto Loader をお勧めします。DLT を使用して、ほとんどのメッセージバスからデータを直接取り込むことができます。

クラウドストレージへのアクセスの設定の詳細については、「 クラウドストレージの設定」を参照してください。

Auto Loaderでサポートされていない形式については、Python または SQL を使用して、Apache Sparkでサポートされている任意の形式をクエリできます。DLT によるデータのロードを参照してください。

データ品質の監視と適用

エクスペクテーション を使用して、データセットの内容に対するデータ品質管理を指定できます。制約を満たさないレコードを追加できない従来のデータベースの CHECK 制約とは異なり、エクスペクテーションは、データ品質要件を満たさないデータを処理するときに柔軟性を提供します。 この柔軟性により、乱雑になると予想されるデータや、厳しい品質要件を満たす必要があるデータを処理および保存できます。 パイプラインのエクスペクテーションを使用してデータ品質を管理するを参照してください。

DLTとDelta Lakeはどのように関連していますか?

DLT は Delta Lake の機能を拡張します。DLT によって作成および管理されるテーブルは Delta テーブルであるため、Delta Lake によって提供されるものと同じ保証と機能を備えています。「Delta Lake とは」を参照してください。

DLT は、Delta Lake で設定できる多くのテーブル プロパティに加えて、いくつかのテーブル プロパティを追加します。DLT プロパティのリファレンスDelta テーブルのプロパティのリファレンスを参照してください。

DLT によるテーブルの作成と管理方法

Databricks 、 DLTで作成された自動的にマネージドテーブルであり、テーブルの現在の状態を正しくコンピュートするために更新をどのように処理する必要があるかを判断し、さまざまなメンテナンスおよび最適化タスクを実行します。

ほとんどの操作では、DLT がターゲット テーブルに対するすべての更新、挿入、および削除を処理できるようにする必要があります。詳細と制限事項については、「 手動による削除または更新の保持」を参照してください。

DLTによって実行されるメンテナンスタスク

DLT は、テーブルが更新されてから 24 時間以内にメンテナンス タスクを実行します。メンテナンスにより、古いバージョンのテーブルを削除することで、クエリのパフォーマンスを向上させ、コストを削減できます。デフォルトによって、システムは完全な OPTIMIZE 操作を実行し、その後に vacuumを実行します。 テーブルの OPTIMIZE を無効にするには、テーブルのテーブルプロパティpipelines.autoOptimize.managed = false を設定します。メンテナンス タスクは、メンテナンス タスクがスケジュールされる前の 24 時間以内にパイプラインの更新が実行された場合にのみ実行されます。

Delta Live Tables が DLT になりました

以前は Delta Live Tables と呼ばれていた製品は、DLT になりました。

制限

制限事項の一覧については、「 DLT の制限事項」を参照してください。

Unity Catalog で DLT を使用する場合に固有の要件と制限事項の一覧については、「DLT パイプラインで Unity Catalog を使用する」を参照してください

追加のリソース