パイプラインを開発する
パイプライン コードの開発とテストは、他の Apache Spark ワークロードとは異なります。この記事では、パイプライン コードを開発する際にサポートされる機能、ベスト プラクティス、および考慮事項の概要を説明します。その他の推奨事項とベスト プラクティスについては、 「ソフトウェア開発と DevOps のベスト プラクティスをパイプラインに適用する」を参照してください。
コードを検証したり更新を実行したりするには、パイプライン構成にソース コードを追加する必要があります。「パイプラインの構成」を参照してください。
パイプラインのソース コードに有効なファイルは何ですか?
パイプライン コードは Python または SQL にすることができます。1 つのパイプラインで Python と SQL のソース コード ファイルを混在させることができますが、各ファイルに含めることができる言語は 1 つだけです。「Python でパイプライン コードを開発する」および「SQL でパイプライン コードを開発する」を参照してください。
パイプラインのソース ファイルはワークスペースに保存されます。ワークスペース ファイルは、 Lakeflow Pipelines Editor で作成されたPythonまたはSQLスクリプトを表します。 好みの IDE でローカルにファイルを編集し、ワークスペースに同期することもできます。ワークスペース内のファイルについては、 「ワークスペース ファイルとは何ですか?」を参照してください。 。 Lakeflow Pipelines Editor を使用した編集については、 「 Lakeflow Pipelines Editor を使用したETLパイプラインの開発とデバッグ」を参照してください。 ローカル IDE でのコードの作成については、 「ローカル開発環境でのパイプライン コードの開発」を参照してください。
Python コードをモジュールまたはライブラリとして開発する場合は、コードをインストールしてインポートし、ソース コードとして構成された Python ファイルからメソッドを呼び出す必要があります。「パイプラインの Python 依存関係の管理」を参照してください。
Python ノートブックで任意の SQL コマンドを使用する必要がある場合は、構文パターンspark.sql("<QUERY>")を使用して SQL を Python コードとして実行できます。
Unity Catalogの関数を使用すると、SQLで使用する任意のPythonユーザー定義関数を登録できます。Unity Catalog のユーザー定義関数 (UDF)を参照してください。
パイプライン開発機能の概要
パイプラインは、多くの Databricks 開発機能を拡張および活用し、新しい機能と概念を導入します。次の表は、パイプライン コード開発をサポートする概念と機能の概要を示しています。
機能 | 説明 |
|---|---|
開発モード | パイプラインを対話的に実行すると ( Lakeflow Pipelines Editor を通じて更新を選択することによって)、 開発モード が使用されます。 新しいパイプラインは、スケジュールまたは自動トリガーによって自動的に実行される場合、開発モードをオフにして実行されます。開発モードを参照してください。 |
ドライラン | ドライ ラン 更新では、テーブルの更新を実行せずに、パイプラインのソース コードの正確性を検証します。「テーブルの更新を待たずにパイプラインのエラーをチェックする」を参照してください。 |
Lakeflow Pipelines エディター | パイプラインのソース コードとして構成された Python および SQL ファイルには、コードを検証し、更新を実行するための対話型オプションが用意されています。「 Lakeflow Pipelines Editor を使用したETLパイプラインの開発とデバッグ」を参照してください。 |
パラメータ | ソース コードとパイプライン構成でパラメーターを活用して、テストと拡張性を簡素化します。 パイプラインでのパラメーターの使用を参照してください。 |
Databricksアセットバンドル | Databricks アセット バンドルを使用すると、パイプライン構成とソース コードをワークスペース間で移動できます。「パイプラインを Databricks Asset Bundle プロジェクトに変換する」を参照してください。 |
開発とテスト用のサンプルデータセットを作成する
Databricks では、予想されるデータと、不正な形式または破損の可能性があるレコードを使用してパイプライン ロジックをテストするための開発データセットとテスト データセットを作成することをお勧めします。開発とテストに役立つデータセットを作成するには、次のような複数の方法があります。
- 本番運用データセットからデータのサブセットを選択します。
- 個人情報(PII)を含むソースには匿名化されたデータまたは人工的に生成されたデータを使用します。
fakerライブラリを使用してテスト用のデータを生成するチュートリアルを確認するには、チュートリアル: チェンジデータ キャプチャを使用してETLパイプラインを構築するをご覧ください。 - 下流の変換ロジックに基づいて、明確に定義された結果を持つテスト データを作成します。
- データ スキーマのエクスペクテーションを破るレコードを作成することで、潜在的なデータ破損、不正な形式のレコード、およびアップストリーム データの変更を予測します。
たとえば、次のコードを使用してデータセットを定義するファイルがあるとします。
CREATE OR REFRESH STREAMING TABLE input_data
AS SELECT * FROM STREAM read_files(
"/production/data",
format => "json")
次のようなクエリを使用して、レコードのサブセットを含むサンプル データセットを作成できます。
CREATE OR REFRESH MATERIALIZED VIEW input_data AS
SELECT "2021/09/04" AS date, 22.4 as sensor_reading UNION ALL
SELECT "2021/09/05" AS date, 21.5 as sensor_reading
次の例は、公開データをフィルタリングして、開発またはテスト用の本番運用データのサブセットを作成する方法を示しています。
CREATE OR REFRESH MATERIALIZED VIEW input_data AS SELECT * FROM prod.input_data WHERE date > current_date() - INTERVAL 1 DAY
これらの異なるデータセットを使用するには、変換ロジックを実装するソース コードを使用して複数のパイプラインを作成します。各パイプラインはinput_dataデータセットからデータを読み取ることができますが、環境に固有のデータセットを作成するファイルを含めるように構成されています。
パイプライン データセットはどのようにデータを処理しますか?
次の表は、マテリアライズドビュー、ストリーミングテーブル、およびビューがデータを処理する方法を示しています。
データセットのタイプ | 定義されたクエリーによってレコードが処理される方法とは? |
|---|---|
ストリーミングテーブル | 各レコードは一度だけ処理されます。これは、追加専用のソースを前提としています。 |
マテリアライズドビュー | レコードは、現在のデータ状態に対して正確な結果を返すために必要に応じて処理されます。マテリアライズドビューは、変換、集計、低速クエリや頻繁に使用される計算の事前計算などのデータ処理タスクに使用する必要があります。 結果は更新間でキャッシュされます。 |
ビュー | レコードは、ビューがクエリーされるたびに処理されます。パブリックデータセットに公開すべきではない中間変換やデータ品質チェックにはビューを使用します。 |
パイプラインで最初のデータセットを宣言する
パイプラインは Python と SQL に新しい構文を導入します。パイプライン構文の基本については、 「Python でパイプライン コードを開発する」および「SQL でパイプライン コードを開発する」を参照してください。
パイプラインはデータセット定義を更新処理から分離しており、パイプライン ソースは対話型の実行を目的としていません。
パイプラインをどのように構成しますか?
パイプラインの設定は、大きく分けて 2 つのカテゴリに分類されます。
- パイプライン構文を使用してデータセットを宣言するファイル ( ソース コード と呼ばれる) のコレクションを定義する構成。
- パイプラインのインフラストラクチャ、依存関係の管理、更新の処理方法、ワークスペースでのテーブルの保存方法を制御する設定。
ほとんどの構成はオプションですが、特に運用パイプラインを構成する場合は、注意が必要です。これらには以下が含まれます:
- パイプラインの外部でデータを利用できるようにするには、Hive metastoreに公開する ターゲットスキーマ 、またはUnity Catalogに公開する ターゲットカタログ と ターゲットスキーマ を宣言する必要があります。
- データアクセス権限は、実行に使用されるクラスターを通じて構成されます。 クラスターに、データソースとターゲット ストレージの場所 (指定されている場合) に適切な権限が設定されていることを確認します。
PythonとSQL使用してパイプラインのソース コードを作成する方法の詳細については、 「パイプラインSQL言語リファレンス」およびLakeflow Spark宣言型パイプラインPython言語リファレンス」を参照してください。
パイプラインの設定と構成の詳細については、 「パイプラインの構成」を参照してください。
最初のパイプラインをデプロイして更新をトリガーする
SDP を使用してデータを処理するには、パイプラインを構成します。パイプラインを構成したら、更新をトリガーして、パイプライン内の各データセットの結果を計算できます。パイプラインの使用を開始するには、 「チュートリアル: チェンジデータ キャプチャを使用してETLパイプラインを構築する」を参照してください。
パイプライン更新とは何ですか?
パイプラインはインフラストラクチャをデプロイし、 更新 の開始時にデータの状態を再計算します。更新プログラムは、次の処理を行います:
- 正しい構成でクラスターを開始します。
- 定義されているすべてのテーブルとビューを検出し、無効な列名、依存関係の欠落、構文エラーなどの分析エラーがないかチェックします。
- 使用可能な最新のデータでテーブルとビューを作成または更新します。
パイプラインは、ユースケースのコストとレイテンシーの要件に応じて、継続的に実行することも、スケジュールに従って実行することもできます。「パイプラインの更新を実行する」を参照してください。
パイプラインでデータを取り込む
パイプラインは、 Databricksで利用可能なすべてのデータ ソースをサポートします。
Databricksほとんどの取り込みユースケースでストリーミング テーブルを使用することをお勧めします。 クラウド オブジェクト ストレージに到着するファイルの場合、Databricks では Auto Loader を推奨します。ほとんどのメッセージ バスからパイプラインを使用してデータを直接取り込むことができます。
クラウド ストレージへのアクセスの構成の詳細については、 「ストレージ クラウドの構成」を参照してください。
Auto Loader でサポートされていない形式については、Python または SQL を使用して、Apache Spark でサポートされている任意の形式をクエリできます。「パイプラインでデータをロードする」を参照してください。
データ品質の監視と強化
エクスペクテーション を使用して、データセットの内容に対するデータ品質管理を指定できます。制約を満たさないレコードを追加できない従来のデータベースの CHECK 制約とは異なり、エクスペクテーションは、データ品質要件を満たさないデータを処理するときに柔軟性を提供します。 この柔軟性により、乱雑になると予想されるデータや、厳しい品質要件を満たす必要があるデータを処理および保存できます。 パイプラインの期待値を使用してデータ品質を管理するを参照してください。
Lakeflow Spark宣言型パイプラインとDelta Lakeにはどのような関係がありますか?
SDP は Delta Lake の機能を拡張します。パイプラインによって作成および管理されるテーブルは Delta テーブルであるため、Delta Lake によって提供されるものと同じ保証と機能があります。Databricks の Delta Lake とは何ですか? を参照してください。
パイプラインは、Delta Lake で設定できる多数のテーブル プロパティに加えて、いくつかのテーブル プロパティを追加します。「パイプライン プロパティ リファレンス」およびDeltaテーブル プロパティ リファレンス」を参照してください。
パイプラインによるテーブルの作成と管理方法
Databricksパイプラインによってマネージド テーブルを自動的に作成し、テーブルの現在の状態を正しくコンピュートするために更新をどのように処理する必要があるかを判断し、多数のメンテナンスと最適化タスクを実行します。
ほとんどの操作では、パイプラインがターゲット テーブルへのすべての更新、挿入、および削除を処理できるようにする必要があります。詳細と制限については、 「手動による削除または更新の保持」を参照してください。
パイプラインによって実行されるメンテナンスタスク
Databricks 、予測的最適化を使用して、パイプラインによって管理されているテーブルのメンテナンス タスクを最適な頻度で実行します。 メンテナンスを行うと、古いバージョンのテーブルを削除することでクエリのパフォーマンスが向上し、コストが削減されます。これには完全なOPTIMIZEおよびvacuum操作が含まれます。 メンテナンスタスクは、予測的最適化によって決定されたスケジュールに基づいて、前回のメンテナンス以降にパイプライン更新が実行されている場合にのみ実行されます。
予測的最適化実行の頻度とメンテナンス コストを理解するには、予測的最適化システム テーブル リファレンスを参照してください。
制限事項
制限事項の一覧については、 「パイプラインの制限事項」を参照してください。
Unity Catalog でパイプラインを使用する場合に特有の要件と制限事項の一覧については、 「パイプラインで Unity Catalog を使用する」を参照してください。
その他のリソース
- パイプラインは、Databricks REST API で完全にサポートされています。「パイプラインREST APIを参照してください。
- パイプラインとテーブルの設定については、 「パイプライン プロパティのリファレンス」を参照してください。
- パイプライン SQL 言語リファレンス。
- Lakeflow Spark宣言型パイプラインPython言語リファレンス。