メインコンテンツまでスキップ

Lakeflow Spark宣言型パイプラインPython言語リファレンス

このセクションには、 Lakeflow Spark宣言型パイプライン (SDP) Pythonプログラミング インターフェイスの詳細が記載されています。

pipelinesモジュールの概要

Lakeflow Spark宣言型パイプラインPython関数は、 pyspark.pipelinesモジュール ( dpとしてインポート) で定義されています。 Python API で実装されたパイプラインでは、このモジュールをインポートする必要があります。

Python
from pyspark import pipelines as dp
注記

パイプライン モジュールは、パイプラインのコンテキストでのみ使用できます。 パイプラインの外部で実行される Python では使用できません。パイプライン コードの編集の詳細については、 「 Lakeflow Pipelines Editor を使用したETLパイプラインの開発とデバッグ」を参照してください。

Apache Spark™パイプライン

Apache Spark には Spark 4.1 以降の 宣言型パイプライン が含まれており、 pyspark.pipelinesモジュールを通じて利用できます。Databricks Runtime 、追加のAPIsと統合を使用してこれらのオープン ソース機能を拡張し、管理された本番運用で使用できるようにします。

オープンソースのpipelinesモジュールで記述されたコードは、変更せずに Databricks で実行されます。次の機能は Apache Spark の一部ではありません。

  • dp.create_auto_cdc_flow
  • dp.create_auto_cdc_from_snapshot_flow
  • @dp.expect(...)
  • @dp.temporary_view

pipelinesモジュールは以前、Databricks ではdltと呼ばれていました。詳細およびApache Sparkとの違いについては、 @dltに何が起こったか?」を参照してください。

データセット定義のための関数

パイプラインは、マテリアライズドビューやストリーミングテーブルなどのデータセットを定義するためにPythonデコレーターを使用します。 データセットを定義する関数を参照してください。

APIリファレンス

Pythonパイプラインに関する考慮事項

Lakeflow Spark宣言型パイプライン (SDP) Pythonインターフェイスを使用してパイプラインを実装するときの重要な考慮事項は次のとおりです。

  • SDP は、計画中およびパイプラインの実行中に、パイプラインを定義するコードを複数回評価します。データセットを定義する Python 関数には、テーブルまたはビューを定義するために必要なコードのみを含める必要があります。データセット定義に含まれる任意の Python ロジックにより、予期しない動作が発生する可能性があります。
  • データセット定義にカスタムモニタリングロジックを実装しようとしないでください。 「イベント フックを使用したパイプラインのカスタム モニタリングの定義」を参照してください。
  • データセットを定義するために使用される関数は、Spark DataFrame を返す必要があります。返される DataFrame に関連しないロジックをデータセット定義に含めないでください。
  • パイプライン データセット コードの一部として、ファイルやテーブルに保存または書き込むメソッドを使用しないでください。

パイプライン コードで使用すべきでない Apache Spark 操作の例:

  • collect()
  • count()
  • toPandas()
  • save()
  • saveAsTable()
  • start()
  • toTable()

@dltに何が起こったのでしょうか?

以前は、Databricks はパイプライン機能をサポートするためにdltモジュールを使用していました。dltモジュールはpyspark.pipelinesモジュールに置き換えられました。引き続きdlt使用することもできますが、Databricks ではpipelinesの使用を推奨しています。

DLT、SDP、Apache Sparkの違い

次の表は、 DLT 、 LakeFlow Spark宣言型パイプライン、およびApache Spark宣言型パイプラインの構文と機能の違いを示しています。

領域

DLT構文

SDP 構文 ( LakeFlowおよびApache 、該当する場合)

Apache Sparkで利用可能

輸入品

import dlt

from pyspark import pipelinesas dp 、オプション)

はい

ストリーミングテーブル

@dlt.table ストリーミングデータフレーム

@dp.table

はい

マテリアライズドビュー

@dlt.table バッチデータフレーム

@dp.materialized_view

はい

ビュー

@dlt.view

@dp.temporary_view

はい

フローの追加

@dlt.append_flow

@dp.append_flow

はい

SQL – ストリーミング

CREATE STREAMING TABLE ...

CREATE STREAMING TABLE ...

はい

SQL – マテリアライズド

CREATE MATERIALIZED VIEW ...

CREATE MATERIALIZED VIEW ...

はい

SQL – フロー

CREATE FLOW ...

CREATE FLOW ...

はい

イベントログ

spark.read.table("event_log")

spark.read.table("event_log")

No

変更を適用 (CDC)

dlt.apply_changes(...)

dp.create_auto_cdc_flow(...)

No

エクスペクテーション

@dlt.expect(...)

dp.expect(...)

No

連続モード

連続トリガーを使用したパイプライン構成

(同じ)

No

シンク

@dlt.create_sink(...)

dp.create_sink(...)

はい