メインコンテンツまでスキップ

データセットを定義する関数

dltモジュールは、デコレータを使用してそのコア機能の多くを実装します。これらのデコレータは、ストリーミングクエリまたはバッチクエリを定義し、Apache Spark DataFrame を返す関数を受け入れます。次の構文は、 Lakeflow 宣言型パイプライン データセットを定義する簡単な例を示しています。

import dlt

@dlt.table()
def function_name(): # This is the function decorated
return (<query>) # This is the query logic that defines the dataset

このページでは、 Lakeflow 宣言型パイプラインでデータセットを定義する関数とクエリの概要を説明します。 使用可能なデコレーターの完全な一覧については、「宣言型パイプラインLakeflow開発者向けリファレンス」を参照してください。

データセットの定義に使用する関数には、サードパーティのPython への呼び出しなど、データセットに関連しない任意のAPIs ロジックを含めることはできません。Lakeflow 宣言型パイプラインは、計画、検証、および更新中にこれらの関数を複数回実行します。 任意のロジックを含めると、予期しない結果が生じる可能性があります。

データを読み取ってデータセット定義を開始する

宣言型パイプライン データセットの定義に使用される関数は Lakeflow 通常、 spark.read または spark.readStream 操作で始まります。 これらの読み取り操作は、DataFrame を返す前に追加の変換を定義するために使用する静的またはストリーミング DataFrame オブジェクトを返します。DataFrame を返すスパーク操作のその他の例としては、 spark.tablespark.rangeなどがあります。

関数は、関数の外部で定義された データフレーム を参照しないでください。 異なるスコープで定義された データフレーム を参照しようとすると、予期しない動作が発生する可能性があります。 複数のテーブルを作成するためのメタプログラミング パターンの例については、for ループでのテーブルの作成を参照してください。

次の例は、バッチ ロジックまたはストリーミング ロジックを使用してデータを読み取るための基本的な構文を示しています。

Python
import dlt

# Batch read on a table
@dlt.table()
def function_name():
return spark.read.table("catalog_name.schema_name.table_name")

# Batch read on a path
@dlt.table()
def function_name():
return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")


# Streaming read on a table
@dlt.table()
def function_name():
return spark.readStream.table("catalog_name.schema_name.table_name")

# Streaming read on a path
@dlt.table()
def function_name():
return (spark.read
.format("cloudFiles")
.option("cloudFile.format", "parquet")
.load("/Volumes/catalog_name/schema_name/volume_name/data_path")
)

外部 REST API からデータを読み取る必要がある場合は、Python カスタムデータソースを使用してこの接続を実装します。PySpark カスタムデータソースを参照してください。

注記

、dicts、listなどの データコレクションから任意の を作成することができます。Apache SparkDataFramesPythonPandasDataFramesこれらのパターンは、開発やテスト中に役立つ場合がありますが、ほとんどの本番運用 Lakeflow 宣言型パイプライン データセット定義は、ファイル、外部システム、または既存のテーブルやビューからデータをロードすることから始める必要があります。

変換の連鎖

Lakeflow 宣言型パイプラインは、ほぼすべての Apache Spark DataFrame 変換をサポートしています。 データセット定義関数には任意の数の変換を含めることができますが、使用するメソッドが常に DataFrame オブジェクトを返すようにする必要があります。

複数のダウンストリーム ワークロードを駆動する中間変換があるが、それをテーブルとして具体化する必要がない場合は、 @dlt.view() を使用してパイプラインに一時的なビューを追加します。その後、複数のダウンストリーム データセット定義で spark.read.table("temp_view_name") を使用して、このビューを参照できます。次の構文は、このパターンを示しています。

Python
import dlt

@dlt.view()
def a():
return spark.read.table("source").filter(...)

@dlt.table()
def b():
return spark.read.table("b").groupBy(...)

@dlt.table()
def c():
return spark.read.table("c").groupBy(...)

これにより Lakeflow 宣言型パイプラインは、パイプライン計画中にビュー内の変換を完全に認識し、データセット定義の外部で実行される任意の Python コードに関連する潜在的な問題を防ぐことができます。

関数内では、次の例のように、増分結果をビュー、マテリアライズドビュー、またはストリーミングテーブルとして書き込むことなく、 データフレーム を連鎖させて新しい データフレーム を作成できます。

Python
import dlt

@dlt.table()
def multiple_transformations():
df1 = spark.read.table("source").filter(...)
df2 = df1.groupBy(...)
return df2.filter(...)

すべての データフレーム バッチロジックを使用して初期読み取りを実行する場合、戻り値は静的な データフレームになります。 ストリーミングしているクエリがある場合、返される結果はストリーミング データフレーム です。

データフレーム を返す

@dlt.table() デコレータの場合、静的な データフレーム を返すことは、マテリアライズドビューを定義していることを意味します。ストリーミング データフレーム を返すということは、ストリーミングテーブルを定義していることを意味します。 ほとんどのデコレータはストリーミングと静的 データフレームの両方で動作しますが、他のデコレータはストリーミング データフレームを必要とします。

データセットの定義に使用される関数は、Spark DataFrame を返す必要があります。ファイルやテーブルを保存または書き込むメソッドを Lakeflow 宣言型パイプライン データセット コードの一部として使用しないでください。

宣言型パイプライン コードで使用 てはならない Apache Spark操作の例を次に示します。Lakeflow

  • collect()
  • count()
  • toPandas()
  • save()
  • saveAsTable()
  • start()
  • toTable()
注記

LakeflowDeclarativePandas Sparkパイプラインは、データセット定義関数の での の使用もサポートしています。Spark の Pandas API を参照してください。

Python パイプラインで SQL を使用する

PySpark は、SQL を使用して DataFrame コードを記述するための spark.sql 演算子をサポートしています。このパターンを 宣言型パイプライン ソース コードで使用する Lakeflow 、マテリアライズドビュー または ストリーミングテーブルにコンパイルされます。

次のコード例は、データセットのクエリ ロジックに spark.read.table("catalog_name.schema_name.table_name") を使用する場合と同じです。

Python
@dlt.table
def my_table():
return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")

dlt.readdlt.read_stream (レガシー)

dlt モジュールには、従来のパイプライン発行モードでの機能をサポートするために導入されたdlt.read()関数とdlt.read_stream()関数が含まれています。これらの方法はサポートされていますが、Databricks では、次の理由から、常に spark.read.table() 関数と spark.readStream.table() 関数を使用することをお勧めします。

  • dlt 関数では、現在のパイプラインの外部で定義されたデータセットの読み取りに対するサポートが制限されています。
  • spark 関数は、読み取り操作に対するオプション (skipChangeCommitsなど) の指定をサポートしています。オプションの指定は、 dlt 関数ではサポートされていません。