メインコンテンツまでスキップ

データセットを定義する関数

dltモジュールは、デコレータを使用してそのコア機能の多くを実装します。これらのデコレータは、ストリーミングクエリまたはバッチクエリを定義し、Apache Spark データフレーム を返す関数を受け入れます。次の構文は、DLT データセットを定義する簡単な例を示しています。

import dlt

@dlt.table()
def function_name(): # This is the function decorated
return (<query>) # This is the query logic that defines the dataset

このページでは、DLT でデータセットを定義する関数とクエリの概要について説明します。利用可能なデコレータの完全なリストについては、 DLT 開発者リファレンスを参照してください。

データセットの定義に使用する関数には、サードパーティのPython への呼び出しなど、データセットに関連しない任意のAPI ロジックを含めることはできません。DLT は、計画、検証、および更新中にこれらの関数を複数回実行します。任意のロジックを含めると、予期しない結果が生じる可能性があります。

データを読み取ってデータセット定義を開始する

DLT データセットの定義に使用される関数は、通常、 spark.read 操作または spark.readStream 操作で始まります。これらの読み取り操作は、データフレーム を返す前に追加の変換を定義するために使用する静的またはストリーミング データフレーム オブジェクトを返します。データフレーム を返すスパーク操作のその他の例としては、 spark.tablespark.rangeなどがあります。

関数は、関数の外部で定義された データフレーム を参照しないでください。 異なるスコープで定義された データフレーム を参照しようとすると、予期しない動作が発生する可能性があります。 複数のテーブルを作成するためのメタプログラミング パターンの例については、for ループでのテーブルの作成を参照してください。

次の例は、バッチ ロジックまたはストリーミング ロジックを使用してデータを読み取るための基本的な構文を示しています。

Python
import dlt

# Batch read on a table
@dlt.table()
def function_name():
return spark.read.table("catalog_name.schema_name.table_name")

# Batch read on a path
@dlt.table()
def function_name():
return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")


# Streaming read on a table
@dlt.table()
def function_name():
return spark.readStream.table("catalog_name.schema_name.table_name")

# Streaming read on a path
@dlt.table()
def function_name():
return (spark.read
.format("cloudFiles")
.option("cloudFile.format", "parquet")
.load("/Volumes/catalog_name/schema_name/volume_name/data_path")
)

外部 REST API からデータを読み取る必要がある場合は、Python カスタムデータソースを使用してこの接続を実装します。PySpark カスタムデータソースを参照してください。

注記

pandasデータフレーム、dict、listなどの データコレクションから任意のApache Sparkデータフレームを作成することができます。これらのパターンは、開発やテスト中に役立つ場合がありますが、ほとんどの本番運用 DLT データセットの定義は、ファイル、外部システム、または既存のテーブルやビューからデータをロードすることから始める必要があります。

変換の連鎖

DLT は、ほぼすべての Apache Spark データフレーム 変換をサポートしています。データセット定義関数には任意の数の変換を含めることができますが、使用するメソッドが常に データフレーム オブジェクトを返すようにする必要があります。

複数のダウンストリーム ワークロードを駆動する中間変換があるが、それをテーブルとして具体化する必要がない場合は、 @dlt.view() を使用してパイプラインに一時的なビューを追加します。その後、複数のダウンストリーム データセット定義で spark.read.table("temp_view_name") を使用して、このビューを参照できます。次の構文は、このパターンを示しています。

Python
import dlt

@dlt.view()
def a():
return spark.read.table("source").filter(...)

@dlt.table()
def b():
return spark.read.table("b").groupBy(...)

@dlt.table()
def c():
return spark.read.table("c").groupBy(...)

これにより、DLT はパイプライン計画中にビュー内の変換を完全に認識し、データセット定義の外部で実行される任意の Python コードに関連する潜在的な問題を防ぐことができます。

関数内では、次の例のように、増分結果をビュー、マテリアライズドビュー、またはストリーミングテーブルとして書き込むことなく、 データフレーム を連鎖させて新しい データフレーム を作成できます。

Python
import dlt

@dlt.table()
def multiple_transformations():
df1 = spark.read.table("source").filter(...)
df2 = df1.groupBy(...)
return df2.filter(...)

すべての データフレーム バッチロジックを使用して初期読み取りを実行する場合、戻り値は静的な データフレームになります。 ストリーミングしているクエリがある場合、返される結果はストリーミング データフレーム です。

データフレーム を返す

@dlt.table() デコレータの場合、静的な データフレーム を返すことは、マテリアライズドビューを定義していることを意味します。ストリーミング データフレーム を返すということは、ストリーミングテーブルを定義していることを意味します。 ほとんどのデコレータはストリーミングと静的 データフレームの両方で動作しますが、他のデコレータはストリーミング データフレームを必要とします。

データセットの定義に使用される関数は、Spark データフレーム を返す必要があります。DLT データセット コードの一部としてファイルやテーブルを保存または書き込むメソッドは絶対に使用しないでください。

DLT コードで使用すべきでない Apache Spark 操作の例:

  • collect()
  • count()
  • toPandas()
  • save()
  • saveAsTable()
  • start()
  • toTable()
注記

DLT では、データセット定義関数に Spark 上の Pandas を使用することもサポートしています。Spark の Pandas API を参照してください。

Python パイプラインで SQL を使用する

PySpark は、SQL を使用して データフレーム コードを記述するための spark.sql 演算子をサポートしています。このパターンを DLT ソースコードで使用すると、マテリアライズドビューまたはストリーミングテーブルにコンパイルされます。

次のコード例は、データセットのクエリ ロジックに spark.read.table("catalog_name.schema_name.table_name") を使用する場合と同じです。

Python
@dlt.table
def my_table():
return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")

dlt.readdlt.read_stream (レガシー)

dlt モジュールには、従来のパイプライン発行モードでの機能をサポートするために導入されたdlt.read()関数とdlt.read_stream()関数が含まれています。これらの方法はサポートされていますが、Databricks では、次の理由から、常に spark.read.table() 関数と spark.readStream.table() 関数を使用することをお勧めします。

  • dlt 関数では、現在のパイプラインの外部で定義されたデータセットの読み取りに対するサポートが制限されています。
  • spark 関数は、読み取り操作に対するオプション (skipChangeCommitsなど) の指定をサポートしています。オプションの指定は、 dlt 関数ではサポートされていません。