Lakeflow 宣言型パイプライン Python 言語リファレンス
このセクションでは、 Lakeflow 宣言型パイプライン Python プログラミング インターフェイスについて詳しく説明します。
- Python for Lakeflow 宣言型パイプラインの概念情報と使用の概要については、「Pythonを使用したパイプラインコードの開発」を参照してください。
- SQLリファレンスについては、Lakeflow 宣言型パイプライン SQL 言語リファレンスを参照してください。
- Auto Loader の設定に関する詳細については、 「Auto Loader とは」を参照してください。
pipelinesモジュールの概要
LakeFlow宣言型パイプラインPython関数は、 pyspark.pipelinesモジュール ( dpとしてインポート) で定義されています。 Python API で実装されたパイプラインでは、このモジュールをインポートする必要があります。
from pyspark import pipelines as dp
パイプライン モジュールは、パイプラインのコンテキストでのみ使用できます。 パイプラインの外部で実行される Python では使用できません。パイプライン コードの編集の詳細については、 「 LakeFlow Pipelines Editor を使用したETLパイプラインの開発とデバッグ」を参照してください。
Apache Spark パイプライン
Apache Spark には Spark 4.1 以降の 宣言型パイプライン が含まれており、 pyspark.pipelinesモジュールを通じて利用できます。Databricks Runtime 、追加のAPIsと統合を使用してこれらのオープン ソース機能を拡張し、管理された本番運用で使用できるようにします。
オープンソースのpipelinesモジュールで記述されたコードは、変更せずに Databricks で実行されます。次の機能は Apache Spark の一部ではありません。
dp.create_auto_cdc_flowdp.create_auto_cdc_from_snapshot_flow@dp.expect(...)@dp.temporary_view
:::
@dltに何が起こったのでしょうか?
以前は、 Databricks dltモジュールを使用してLakeFlow宣言型パイプライン機能をサポートしていました。 dltモジュールはpyspark.pipelinesモジュールに置き換えられました。引き続きdlt使用することもできますが、Databricks ではpipelines使用を推奨しています。
データセット定義のための関数
LakeFlow宣言型パイプラインは、マテリアライズドビューやストリーミングテーブルなどのデータセットの定義にPythonデコレーターを使用します。 データセットを定義する関数を参照してください。
APIリファレンス
- append_flow
- 自動cdcフローを作成する
- スナップショットフローから自動 CDC を作成する
- シンクを作成する
- ストリーミングテーブルの作成
- エクスペクテーション
- マテリアライズドビュー
- テーブル
- 一時ビュー
Python Lakeflow 宣言型パイプラインに関する考慮事項
Lakeflow 宣言型パイプライン Python インターフェイスを使用してパイプラインを実装する場合の重要な考慮事項を次に示します。
- Lakeflow 宣言型パイプラインは、計画中およびパイプラインの実行中に、パイプラインを定義するコードを複数回評価します。 データセットを定義する Python 関数には、テーブルまたはビューの定義に必要なコードのみを含める必要があります。データセット定義に任意の Python ロジックが含まれていると、予期しない動作が発生する可能性があります。
- データセット定義にカスタム・モニタリング・ロジックを実装しようとしないでください。 「 イベント フックを使用した Lakeflow 宣言型パイプラインのカスタム モニタリングの定義」を参照してください。
- データセットを定義するために使用される関数は、Spark DataFrame を返す必要があります。返される DataFrame に関連しないロジックをデータセット定義に含めないでください。
- ファイルやテーブルを保存または書き込むメソッドを Lakeflow 宣言型パイプライン データセット コードの一部として使用しないでください。
宣言型パイプライン コードで使用 てはならない Apache Spark操作の例を次に示します。Lakeflow
collect()count()toPandas()save()saveAsTable()start()toTable()