データセットを定義する関数
pyspark.pipelines (ここではdpと別名で呼ばれる) モジュールは、デコレータを使用してコア機能の多くを実装します。これらのデコレータは、ストリーミング クエリまたはバッチ クエリのいずれかを定義し、Apache Spark DataFrame を返す関数を受け入れます。次の構文は、パイプライン データセットを定義するための簡単な例を示しています。
from pyspark import pipelines as dp
@dp.table()
def function_name(): # This is the function decorated
return (<query>) # This is the query logic that defines the dataset
このページでは、パイプラインでデータセットを定義する関数とクエリの概要を説明します。使用可能なデコレータの完全なリストについては、 「パイプライン開発者リファレンス」を参照してください。
データセットの定義に使用する関数には、サードパーティAPIsの呼び出しなど、データセットに関係のない任意のPythonロジックを含めてはいけません。 パイプラインは、計画、検証、および更新中にこれらの関数を複数回実行します。任意のロジックを含めると予期しない結果が生じる可能性があります。
データセットの定義を開始するためにデータを読み取る
パイプライン データセットを定義するために使用される関数は、通常、 spark.readまたはspark.readStream操作で始まります。これらの読み取り操作は、DataFrame を返す前に追加の変換を定義するために使用する静的またはストリーミング DataFrame オブジェクトを返します。DataFrame を返す Spark 操作の他の例としては、 spark.tableやspark.rangeなどがあります。
関数は、関数の外部で定義された データフレーム を参照しないでください。 異なるスコープで定義された データフレーム を参照しようとすると、予期しない動作が発生する可能性があります。 複数のテーブルを作成するためのメタプログラミング パターンの例については、for ループでのテーブルの作成を参照してください。
次の例は、バッチまたはストリーミング ロジックを使用してデータを読み取るための基本的な構文を示しています。
from pyspark import pipelines as dp
# Batch read on a table
@dp.materialized_view()
def function_name():
return spark.read.table("catalog_name.schema_name.table_name")
# Batch read on a path
@dp.materialized_view()
def function_name():
return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")
# Streaming read on a table
@dp.table()
def function_name():
return spark.readStream.table("catalog_name.schema_name.table_name")
# Streaming read on a path
@dp.table()
def function_name():
return (spark.read
.format("cloudFiles")
.option("cloudFile.format", "parquet")
.load("/Volumes/catalog_name/schema_name/volume_name/data_path")
)
外部 REST API からデータを読み取る必要がある場合は、Python カスタムデータソースを使用してこの接続を実装します。PySpark カスタムデータソースを参照してください。
Pandas DataFrames 、dicts、リストなどのPythonデータコレクションから任意のApache Spark DataFramesを作成できます。 これらのパターンは開発やテスト中に役立つ場合がありますが、ほとんどの本番運用パイプライン データセット定義は、ファイル、外部システム、または既存のテーブルやビューからデータをロードすることから始める必要があります。
連鎖変換
パイプラインは、ほぼすべての Apache Spark DataFrame 変換をサポートします。データセット定義関数には任意の数の変換を含めることができますが、使用するメソッドが常に DataFrame オブジェクトを返すようにする必要があります。
複数のダウンストリーム ワークロードを駆動する中間変換があるが、それをテーブルとして実現する必要がない場合は、 @dp.temporary_view()を使用してパイプラインに一時ビューを追加します。その後、複数の下流データセット定義でspark.read.table("temp_view_name")を使用してこのビューを参照できます。次の構文はこのパターンを示しています。
from pyspark import pipelines as dp
@dp.temporary_view()
def a():
return spark.read.table("source").filter(...)
@dp.materialized_view()
def b():
return spark.read.table("a").groupBy(...)
@dp.materialized_view()
def c():
return spark.read.table("a").groupBy(...)
これにより、パイプラインの計画中にパイプラインがビュー内の変換を完全に認識できるようになり、データセット定義外で実行される任意の Python コードに関連する潜在的な問題を回避できます。
関数内では、次の例のように、増分結果をビュー、マテリアライズドビュー、またはストリーミングテーブルとして書き込むことなく、 データフレーム を連鎖させて新しい データフレーム を作成できます。
from pyspark import pipelines as dp
@dp.table()
def multiple_transformations():
df1 = spark.read.table("source").filter(...)
df2 = df1.groupBy(...)
return df2.filter(...)
すべての データフレーム バッチロジックを使用して初期読み取りを実行する場合、戻り値は静的な データフレームになります。 ストリーミングしているクエリがある場合、返される結果はストリーミング データフレーム です。
データフレーム を返す
@dp.tableを使用して、ストリーミング読み取りの結果からストリーミング テーブルを作成します。 @dp.materialized_viewを使用して、バッチ読み取りの結果からマテリアライズドビューを作成します。 他のほとんどのデコレーターはストリーミング DataFrame と静的DataFrames両方で動作しますが、一部のデコレーターはストリーミングDataFrame必要とします。
データセットを定義するために使用される関数は、Spark DataFrame を返す必要があります。パイプライン データセット コードの一部として、ファイルやテーブルに保存または書き込むメソッドを使用しないでください。
パイプライン コードで使用すべきでない Apache Spark 操作の例:
collect()count()toPandas()save()saveAsTable()start()toTable()
パイプラインは、データセット定義関数に Spark 上の Pandas を使用することもサポートしています。Spark 上の Pandas API を参照してください。
PythonパイプラインでSQLを使用する
PySpark は、SQL を使用して DataFrame コードを記述するためのspark.sql演算子をサポートしています。このパターンをパイプライン ソース コードで使用すると、マテリアライズドビューまたはストリーミング テーブルにコンパイルされます。
次のコード例は、データセット クエリ ロジックにspark.read.table("catalog_name.schema_name.table_name")使用するのと同じです。
@dp.materialized_view
def my_table():
return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")
dlt.readとdlt.read_stream (レガシー)
古いdltモジュールには、従来のパイプライン公開モードの機能をサポートするために導入されたdlt.read()関数とdlt.read_stream()関数が含まれています。これらのメソッドはサポートされていますが、Databricks では次の理由から、常にspark.read.table()とspark.readStream.table()関数を使用することをお勧めします。
dlt関数は、現在のパイプラインの外部で定義されたデータセットの読み取りを限定的にサポートしています。spark関数は、読み取り操作にskipChangeCommitsなどのオプションを指定することをサポートしています。オプションの指定はdlt関数ではサポートされていません。dltモジュール自体はpyspark.pipelinesモジュールに置き換えられました。Databricks では、Python でパイプライン コードを記述するときに使用するために、from pyspark import pipelines as dpを使用してpyspark.pipelinesをインポートすることをお勧めします。