メインコンテンツまでスキップ

データセットを定義する関数

pyspark.pipelines (ここではdpと別名で呼ばれる) モジュールは、デコレータを使用してコア機能の多くを実装します。これらのデコレータは、ストリーミング クエリまたはバッチ クエリのいずれかを定義し、Apache Spark DataFrame を返す関数を受け入れます。次の構文は、 LakeFlow宣言型パイプライン データセットを定義する簡単な例を示しています。

Python
from pyspark import pipelines as dp

@dp.table()
def function_name(): # This is the function decorated
return (<query>) # This is the query logic that defines the dataset

このページでは、 Lakeflow 宣言型パイプラインでデータセットを定義する関数とクエリの概要を説明します。 使用可能なデコレーターの完全な一覧については、「宣言型パイプラインLakeflow開発者向けリファレンス」を参照してください。

データセットの定義に使用する関数には、サードパーティのPython への呼び出しなど、データセットに関連しない任意のAPIs ロジックを含めることはできません。Lakeflow 宣言型パイプラインは、計画、検証、および更新中にこれらの関数を複数回実行します。 任意のロジックを含めると、予期しない結果が生じる可能性があります。

データセットの定義を開始するためにデータを読み取る

宣言型パイプライン データセットの定義に使用される関数は Lakeflow 通常、 spark.read または spark.readStream 操作で始まります。 これらの読み取り操作は、DataFrame を返す前に追加の変換を定義するために使用する静的またはストリーミング DataFrame オブジェクトを返します。DataFrame を返すスパーク操作のその他の例としては、 spark.tablespark.rangeなどがあります。

関数は、関数の外部で定義された データフレーム を参照しないでください。 異なるスコープで定義された データフレーム を参照しようとすると、予期しない動作が発生する可能性があります。 複数のテーブルを作成するためのメタプログラミング パターンの例については、for ループでのテーブルの作成を参照してください。

次の例は、バッチまたはストリーミング ロジックを使用してデータを読み取るための基本的な構文を示しています。

Python
from pyspark import pipelines as dp

# Batch read on a table
@dp.materialized_view()
def function_name():
return spark.read.table("catalog_name.schema_name.table_name")

# Batch read on a path
@dp.materialized_view()
def function_name():
return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")


# Streaming read on a table
@dp.table()
def function_name():
return spark.readStream.table("catalog_name.schema_name.table_name")

# Streaming read on a path
@dp.table()
def function_name():
return (spark.read
.format("cloudFiles")
.option("cloudFile.format", "parquet")
.load("/Volumes/catalog_name/schema_name/volume_name/data_path")
)

外部 REST API からデータを読み取る必要がある場合は、Python カスタムデータソースを使用してこの接続を実装します。PySpark カスタムデータソースを参照してください。

注記

、dicts、listなどの データコレクションから任意の を作成することができます。Apache SparkDataFramesPythonPandasDataFramesこれらのパターンは、開発やテスト中に役立つ場合がありますが、ほとんどの本番運用 Lakeflow 宣言型パイプライン データセット定義は、ファイル、外部システム、または既存のテーブルやビューからデータをロードすることから始める必要があります。

連鎖変換

Lakeflow 宣言型パイプラインは、ほぼすべての Apache Spark DataFrame 変換をサポートしています。 データセット定義関数には任意の数の変換を含めることができますが、使用するメソッドが常に DataFrame オブジェクトを返すようにする必要があります。

複数のダウンストリーム ワークロードを駆動する中間変換があるが、それをテーブルとして実現する必要がない場合は、 @dp.temporary_view()を使用してパイプラインに一時ビューを追加します。その後、複数の下流データセット定義でspark.read.table("temp_view_name")を使用してこのビューを参照できます。次の構文はこのパターンを示しています。

Python
from pyspark import pipelines as dp

@dp.temporary_view()
def a():
return spark.read.table("source").filter(...)

@dp.materialized_view()
def b():
return spark.read.table("a").groupBy(...)

@dp.materialized_view()
def c():
return spark.read.table("a").groupBy(...)

これにより Lakeflow 宣言型パイプラインは、パイプライン計画中にビュー内の変換を完全に認識し、データセット定義の外部で実行される任意の Python コードに関連する潜在的な問題を防ぐことができます。

関数内では、次の例のように、増分結果をビュー、マテリアライズドビュー、またはストリーミングテーブルとして書き込むことなく、 データフレーム を連鎖させて新しい データフレーム を作成できます。

Python
from pyspark import pipelines as dp

@dp.table()
def multiple_transformations():
df1 = spark.read.table("source").filter(...)
df2 = df1.groupBy(...)
return df2.filter(...)

すべての データフレーム バッチロジックを使用して初期読み取りを実行する場合、戻り値は静的な データフレームになります。 ストリーミングしているクエリがある場合、返される結果はストリーミング データフレーム です。

データフレーム を返す

@dp.tableを使用して、ストリーミング読み取りの結果からストリーミング テーブルを作成します。 @dp.materialized_viewを使用して、バッチ読み取りの結果からマテリアライズドビューを作成します。 他のほとんどのデコレーターはストリーミング DataFrame と静的DataFrames両方で動作しますが、一部のデコレーターはストリーミングDataFrame必要とします。

データセットの定義に使用される関数は、Spark DataFrame を返す必要があります。ファイルやテーブルを保存または書き込むメソッドを Lakeflow 宣言型パイプライン データセット コードの一部として使用しないでください。

宣言型パイプライン コードで使用 てはならない Apache Spark操作の例を次に示します。Lakeflow

  • collect()
  • count()
  • toPandas()
  • save()
  • saveAsTable()
  • start()
  • toTable()
注記

LakeflowDeclarativePandas Sparkパイプラインは、データセット定義関数の での の使用もサポートしています。Spark の Pandas API を参照してください。

PythonパイプラインでSQLを使用する

PySpark は、SQL を使用して DataFrame コードを記述するための spark.sql 演算子をサポートしています。このパターンを 宣言型パイプライン ソース コードで使用する Lakeflow 、マテリアライズドビュー または ストリーミングテーブルにコンパイルされます。

次のコード例は、データセット クエリ ロジックにspark.read.table("catalog_name.schema_name.table_name")使用するのと同じです。

Python
@dp.materialized_view
def my_table():
return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")

dlt.readdlt.read_stream (レガシー)

古いdltモジュールには、従来のパイプライン公開モードの機能をサポートするために導入されたdlt.read()関数とdlt.read_stream()関数が含まれています。これらのメソッドはサポートされていますが、Databricks では次の理由から、常にspark.read.table()spark.readStream.table()関数を使用することをお勧めします。

  • dlt関数は、現在のパイプラインの外部で定義されたデータセットの読み取りを限定的にサポートしています。
  • spark関数は、読み取り操作にskipChangeCommitsなどのオプションを指定することをサポートしています。オプションの指定はdlt関数ではサポートされていません。
  • dltモジュール自体はpyspark.pipelinesモジュールに置き換えられました。Databricks 、 PythonでLakeFlow宣言型パイプライン コードを作成するときに使用するために、 from pyspark import pipelines as dpを使用してpyspark.pipelinesインポートすることをお勧めします。