Pythonでパイプラインコードを開発する
Lakeflow 宣言型パイプラインでは、パイプラインでマテリアライズドビューとストリーミングテーブルを定義するための新しい Python コードコンストラクトがいくつか導入されています。 Python パイプラインの開発サポートは、 PySpark DataFrame と構造化ストリーミング APIsの基本に基づいています。
Pythonやデータフレームに詳しくないユーザーには、SQL インターフェイスを使用することをお勧めします。SQL を使用したパイプライン コードの開発を参照してください。
Lakeflow 宣言型パイプライン Python構文の完全なリファレンスについては、「Lakeflow 宣言型パイプライン Python 言語リファレンス」を参照してください。
パイプライン開発のためのPythonの基礎
Python 宣言型パイプライン データセットを作成するコードは Lakeflow DataFrames.
すべてのLakeFlow宣言型パイプラインPython APIs 、 pyspark.pipelines
モジュールに実装されています。 Pythonで実装されたLakeFlow宣言型パイプライン コードは、 Pythonソースの先頭にあるpipelines
モジュールを明示的にインポートする必要があります。 この例では、次のインポート コマンドを使用し、例のdp
を使用してpipelines
を参照します。
from pyspark import pipelines as dp
pyspark
の公開オープンソース バージョンには、 pipelines
モジュールも含まれています。コードの多くは、Databricks 内で使用されているバージョンと互換性があります。ただし、 pipelines
の Databricks バージョンには、OSS pyspark
と互換性のない機能がいくつかあります。次の機能は互換性がありません:
dp.create_auto_cdc_flow
dp.create_auto_cdc_from_snapshot_flow
@dp.expect(...)
@dp.temporary_view
読み取りと書き込みは、パイプライン構成時に指定されたカタログとスキーマにデフォルト設定されます。「ターゲット カタログとスキーマを設定する」を参照してください。
Lakeflow 宣言型パイプライン固有の Python コードは、他の種類の Python コードと 1 つの重要な点で異なります。 Python パイプライン コードでは、データ取り込みと変換を実行して 宣言型パイプライン データセットを作成する関数を直接呼び出 Lakeflow 。 代わりに、宣言型パイプライン Lakeflow 、パイプラインで構成されたすべてのソース コード ファイル内の dp
モジュールからデコレーター関数を解釈し、データフロー グラフを構築します。
パイプラインの実行時に予期しない動作を回避するには、データセットを定義する関数に副作用をもたらす可能性のあるコードを含めないでください。詳細については、 Python リファレンスを参照してください。
Pythonでマテリアライズドビューまたはストリーミングテーブルを作成する
@dp.table
を使用して、ストリーミング読み取りの結果からストリーミング テーブルを作成します。 @dp.materialized_view
を使用して、バッチ読み取りの結果からマテリアライズドビューを作成します。
マテリアライズドビューとストリーミングテーブルの名前は関数名から推測されています。 次のコード例は、マテリアライズドビューとストリーミング テーブルを作成するための基本的な構文を示しています。
どちらの関数もsamples
カタログ内の同じテーブルを参照し、同じデコレータ関数を使用します。これらの例は、マテリアライズドビューとストリーミング テーブルの基本構文の唯一の違いが、 spark.read
とspark.readStream
使用であることを強調しています。
すべてのデータソースがストリーミング読み取りをサポートしているわけではありません。 一部のデータソースは常にストリーミング セマンティクスで処理する必要があります。
from pyspark import pipelines as dp
@dp.materialized_view()
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")
@dp.table()
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")
オプションで、 @dp.table
デコレータのname
引数を使用してテーブル名を指定できます。次の例は、マテリアライズドビューとストリーミング テーブルのこのパターンを示しています。
from pyspark import pipelines as dp
@dp.materialized_view(name = "trips_mv")
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")
@dp.table(name = "trips_st")
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")
オブジェクトストレージからデータをロードする
Lakeflow 宣言型パイプラインは、 Databricksでサポートされているすべての形式からのデータの読み込みをサポートしています。 データ形式のオプションを参照してください。
これらの例では、ワークスペースに自動的にマウントされる /databricks-datasets
で使用可能なデータを使用します。 Databricks では、ボリューム パスまたはクラウド URI を使用して、クラウド オブジェクト ストレージに格納されているデータを参照することをお勧めします。 Unity Catalogボリュームとはを参照してください。
Databricks 、クラウド オブジェクト ストレージに保存されているデータに対して増分取り込みワークロードを構成する場合、 Auto Loaderとストリーミング テーブルを使用することをお勧めします。 「Auto Loader とは何ですか?」を参照してください。
次の例では、Auto Loaderを使用してJSONファイルからストリーミングテーブルを作成します。
from pyspark import pipelines as dp
@dp.table()
def ingestion_st():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
次の例では、バッチセマンティクスを使用して JSON ディレクトリを読み取り、マテリアライズドビューを作成します。
from pyspark import pipelines as dp
@dp.materialized_view()
def batch_mv():
return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")
エクスペクテーションでデータを検証
エクスペクテーションを使用して、データ品質の制約を設定および適用できます。 パイプラインのエクスペクテーションを使用してデータ品質を管理するを参照してください。
次のコードは、 @dp.expect_or_drop
を使用して、データ取り込み中に null のレコードを削除するvalid_data
という名前の期待値を定義します。
from pyspark import pipelines as dp
@dp.table()
@dp.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
パイプラインで定義されたマテリアライズドビューとストリーミングテーブルをクエリする
次の例では、4 つのデータセットを定義します。
- JSONデータをロードする
orders
という名前のストリーミング テーブル。 - CSVデータをロードする
customers
という名前のマテリアライズドビュー。 orders
データセットとcustomers
データセットのレコードを結合し、注文タイムスタンプを日付にキャストし、customer_id
、order_number
、state
、order_date
の各フィールドを選択するcustomer_orders
という名前のマテリアライズドビュー- 各州の日次注文数を集計する
daily_orders_by_state
という名前のマテリアライズドビュー
パイプライン内のビューまたはテーブルをクエリする場合、カタログとスキーマを直接指定することも、パイプラインで構成されたデフォルトを使用することもできます。この例では、パイプラインに構成されたデフォルトのカタログとスキーマから、 orders
、 customers
、およびcustomer_orders
テーブルが書き込まれ、読み取られます。
レガシー公開モードは、 LIVE
スキーマを使用して、パイプラインに定義されている他のマテリアライズドビューとストリーミング テーブルをクエリします。 新しいパイプラインでは、 LIVE
スキーマ構文は暗黙的に無視されます。LIVE スキーマ (レガシー)を参照してください。
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table()
@dp.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
@dp.materialized_view()
def customers():
return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")
@dp.materialized_view()
def customer_orders():
return (spark.read.table("orders")
.join(spark.read.table("customers"), "customer_id")
.select("customer_id",
"order_number",
"state",
col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
)
)
@dp.materialized_view()
def daily_orders_by_state():
return (spark.read.table("customer_orders")
.groupBy("state", "order_date")
.count().withColumnRenamed("count", "order_count")
)
for
ループでテーブルを作成する
Python for
ループを使用して、プログラムで複数のテーブルを作成できます。これは、ほんのわずかしか変化しないデータ ソースまたはターゲット データセットが多数あり、その結果、維持するコードの総量が減り、コードの冗長性が少なくなる場合に便利です。
for
ループはロジックを順番に評価しますが、データセットの計画が完了すると、パイプラインはロジックを並列に実行します。
このパターンを使用してデータセットを定義する場合は、 for
ループに渡される値のリストが常に追加的であることを確認してください。パイプラインで以前に定義されたデータセットが将来のパイプライン実行から省略された場合、そのデータセットはターゲット スキーマから自動的に削除されます。
次の例では、顧客注文を地域別にフィルターする 5 つのテーブルを作成します。ここで、リージョン名は、ターゲットのマテリアライズドビューの名前を設定し、ソース データをフィルタリングするために使用されます。 一時ビューは、最終的なマテリアライズドビューの構築に使用されるソース テーブルからの結合を定義するために使用されます。
from pyspark import pipelines as dp
from pyspark.sql.functions import collect_list, col
@dp.temporary_view()
def customer_orders():
orders = spark.read.table("samples.tpch.orders")
customer = spark.read.table("samples.tpch.customer")
return (orders.join(customer, orders.o_custkey == customer.c_custkey)
.select(
col("c_custkey").alias("custkey"),
col("c_name").alias("name"),
col("c_nationkey").alias("nationkey"),
col("c_phone").alias("phone"),
col("o_orderkey").alias("orderkey"),
col("o_orderstatus").alias("orderstatus"),
col("o_totalprice").alias("totalprice"),
col("o_orderdate").alias("orderdate"))
)
@dp.temporary_view()
def nation_region():
nation = spark.read.table("samples.tpch.nation")
region = spark.read.table("samples.tpch.region")
return (nation.join(region, nation.n_regionkey == region.r_regionkey)
.select(
col("n_name").alias("nation"),
col("r_name").alias("region"),
col("n_nationkey").alias("nationkey")
)
)
# Extract region names from region table
region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]
# Iterate through region names to create new region-specific materialized views
for region in region_list:
@dp.materialized_view(name=f"{region.lower().replace(' ', '_')}_customer_orders")
def regional_customer_orders(region_filter=region):
customer_orders = spark.read.table("customer_orders")
nation_region = spark.read.table("nation_region")
return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
.select(
col("custkey"),
col("name"),
col("phone"),
col("nation"),
col("region"),
col("orderkey"),
col("orderstatus"),
col("totalprice"),
col("orderdate")
).filter(f"region = '{region_filter}'")
)
以下は、このパイプラインのデータ フロー グラフの例です。
トラブルシューティング: for
ループは同じ値を持つテーブルを多数作成します
パイプラインが Python コードを評価するのに使用する遅延実行モデルでは、 @dp.materialized_view()
で装飾された関数が呼び出されたときにロジックが個々の値を直接参照する必要があります。
次の例は、 for
ループを使用してテーブルを定義する 2 つの正しい方法を示しています。どちらの例でも、 tables
リストの各テーブル名は、 @dp.materialized_view()
で装飾された関数内で明示的に参照されます。
from pyspark import pipelines as dp
# Create a parent function to set local variables
def create_table(table_name):
@dp.materialized_view(name=table_name)
def t():
return spark.read.table(table_name)
tables = ["t1", "t2", "t3"]
for t_name in tables:
create_table(t_name)
# Call `@dp.materialized_view()` within a for loop and pass values as variables
tables = ["t1", "t2", "t3"]
for t_name in tables:
@dp.materialized_view(name=t_name)
def create_table(table_name=t_name):
return spark.read.table(table_name)
次の例では、値を正しく 参照していません 。 この例では、異なる名前のテーブルを作成しますが、すべてのテーブルは for
ループの最後の値からデータを読み込みます。
from pyspark import pipelines as dp
# Don't do this!
tables = ["t1", "t2", "t3"]
for t_name in tables:
@dp.materialized(name=t_name)
def create_table():
return spark.read.table(t_name)
マテリアライズドビューまたはストリーミングテーブルからレコードを完全に削除する
GDPRコンプライアンスなどの場合、削除を有効にしてマテリアライズドビューまたはストリーミング テーブルからレコードを永久に削除するには、オブジェクトの基礎となるDeltaテーブルに対して追加の操作を実行する必要があります。 マテリアライズドビューからレコードを確実に削除するには、 「削除を有効にしてマテリアライズドビューからレコードを完全に削除する」を参照してください。 ストリーミング テーブルからレコードを確実に削除するには、 「ストリーミング テーブルからレコードを完全に削除する」を参照してください。