メインコンテンツまでスキップ

Python を使用したパイプライン コードの開発

DLT では、パイプラインでマテリアライズドビューとストリーミングテーブルを定義するための新しい Python コード コンストラクトがいくつか導入されています。 Python パイプラインの開発サポートは、 PySpark データフレーム と構造化ストリーミング APIsの基本に基づいています。

Pythonやデータフレームに詳しくないユーザーには、SQL インターフェイスを使用することをお勧めします。SQL を使用したパイプライン コードの開発を参照してください。

DLT Python 構文の完全なリファレンスについては、「 DLT Python 言語リファレンス」を参照してください。

パイプライン開発のためのPythonの基本

DLT データセットを作成するPythonコードは、データフレームを返却しなくてはなりません。

すべてのDLT Python API はdltモジュールに実装されています。 Python で実装された DLT パイプライン コードでは、Python ノートブックとファイルの先頭にある dlt モジュールを明示的にインポートする必要があります。

パイプラインの構成中に指定されたカタログとスキーマに対してデフォルトを読み書きします。 ターゲット・カタログとスキーマの設定を参照してください。

DLT 固有の Python コードは、他のタイプの Python コードと 1 つの重要な点で異なります。 Python パイプライン コードは、データ取り込みと変換を実行して DLT データセットを作成する関数を直接呼び出しません。 代わりに、DLT は、パイプラインで構成されたすべてのソース コード ファイル内の dlt モジュールからデコレータ関数を解釈し、データフロー グラフを構築します。

important

パイプラインの実行時に予期しない動作が発生しないように、データセットを定義する関数に副作用がある可能性のあるコードを含めないでください。 詳細については、 Python リファレンスを参照してください。

Pythonを使用してマテリアライズドビューまたはストリーミングテーブルを作成する

@dlt.table デコレーターは、関数によって返された結果に基づいてマテリアライズドビュー またはストリーミングテーブルを作成するように DLT に指示します。バッチ読み取りの結果はマテリアライズドビューを作成し、ストリーミング読み取りの結果はストリーミングテーブルを作成します。

デフォルトでは、マテリアライズドビューとストリーミングテーブルの名前は関数名から推論されます。 次のコード例は、マテリアライズドビューとストリーミングテーブルを作成するための基本的な構文を示しています。

注記

どちらの関数も、 samples カタログ内の同じテーブルを参照し、同じデコレータ関数を使用します。 これらの例は、マテリアライズドビューとストリーミングテーブルの基本的な構文の唯一の違いは、 spark.readspark.readStreamを使用することであることを示しています。

すべてのデータソースがストリーミング読み取りをサポートしているわけではありません。 一部のデータソースは、常にストリーミング セマンティクスを使用して処理する必要があります。

Python
import dlt

@dlt.table()
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")

@dlt.table()
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")

必要に応じて、@dlt.tableデコレータの name 引数を使用してテーブル名を指定できます。次の例は、マテリアライズドビューとストリーミングテーブルのこのパターンを示しています。

Python
import dlt

@dlt.table(name = "trips_mv")
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")

@dlt.table(name = "trips_st")
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")

オブジェクトストレージからのデータの読み込み

DLT は、Databricks でサポートされているすべての形式からのデータの読み込みをサポートしています。データ形式のオプションを参照してください。

注記

これらの例では、ワークスペースに自動的にマウントされる /databricks-datasets で使用可能なデータを使用します。 Databricks では、ボリューム パスまたはクラウド URI を使用して、クラウド オブジェクト ストレージに格納されているデータを参照することをお勧めします。 Unity Catalogボリュームとはを参照してください。

Databricks は、クラウド オブジェクト ストレージに格納されているデータに対して増分インジェスト ワークロードを構成する場合は、 Auto Loader とストリーミング テーブルを使用することをお勧めします。 「Auto Loaderとは」を参照してください。

次の例では、Auto Loaderを使用してJSONファイルからストリーミングテーブルを作成します。

Python
import dlt

@dlt.table()
def ingestion_st():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)

次の例では、バッチセマンティクスを使用して JSON ディレクトリを読み取り、マテリアライズドビューを作成します。

Python
import dlt

@dlt.table()
def batch_mv():
return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")

エクスペクテーションでデータを検証

エクスペクテーションを使用して、データ品質の制約を設定および適用できます。 パイプラインのエクスペクテーションを使用してデータ品質を管理するを参照してください。

次のコードでは、 @dlt.expect_or_drop を使用して、データ取り込み中に null のレコードを削除する valid_data という名前のエクスペクテーションを定義します。

Python
import dlt

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)

パイプラインで定義されたマテリアライズドビューとストリーミングテーブルのクエリ

次の例では、4 つのデータセットを定義しています。

  • JSON データをロードする orders という名前のストリーミングテーブル。
  • CSV データをロードする customers という名前のマテリアライズドビュー。
  • orders データセットと customers データセットのレコードを結合し、注文タイムスタンプを日付にキャストし、customer_idorder_numberstateorder_date の各フィールドを選択する customer_orders という名前のマテリアライズドビュー
  • 各州の日次注文数を集計する daily_orders_by_state という名前のマテリアライズドビュー
注記

パイプライン内のビューまたはテーブルに対してクエリを実行する場合は、カタログとスキーマを直接指定することも、パイプラインで設定されたデフォルトを使用することもできます。 この例では、 orderscustomers、および customer_orders テーブルは、パイプラインに設定されたデフォルトのカタログとスキーマから書き込みと読み取りが行われます。

従来のパブリッシング モードでは、 LIVE スキーマを使用して、パイプラインで定義されている他のマテリアライズドビュー とストリーミング テーブルに対してクエリを実行します。 新しいパイプラインでは、 LIVE スキーマ構文は警告なしで無視されます。 LIVE スキーマ (レガシー)を参照してください。

Python
import dlt
from pyspark.sql.functions import col

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)

@dlt.table()
def customers():
return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")

@dlt.table()
def customer_orders():
return (spark.read.table("orders")
.join(spark.read.table("customers"), "customer_id")
.select("customer_id",
"order_number",
"state",
col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
)
)

@dlt.table()
def daily_orders_by_state():
return (spark.read.table("customer_orders")
.groupBy("state", "order_date")
.count().withColumnRenamed("count", "order_count")
)

forループでのテーブルの作成

Python for ループを使用して、プログラムで複数のテーブルを作成できます。 これは、いくつかのパラメーターのみが異なる多数のデータソースまたはターゲット データセットがあり、その結果、維持するコードの合計が少なくなり、コードの冗長性が減る場合に便利です。

for ループはロジックを順番に評価しますが、データセットの計画が完了すると、パイプラインはロジックを並列で実行します。

important

このパターンを使用してデータセットを定義する場合は、 for ループに渡される値のリストが常に加算的であることを確認してください。 パイプラインで以前に定義されたデータセットが将来のパイプライン実行から省略された場合、そのデータセットはターゲット スキーマから自動的に削除されます。

次の例では、顧客の注文を地域別にフィルター処理する 5 つのテーブルを作成します。 ここでは、リージョン名を使用して、ターゲットのマテリアライズドビューの名前を設定し、ソースデータをフィルタリングします。 テンポラリ・ビューは、最終的なマテリアライズドビューの構築に使用されるソース・テーブルからのジョインを定義するために使用されます。

Python
import dlt
from pyspark.sql.functions import collect_list, col

@dlt.view()
def customer_orders():
orders = spark.read.table("samples.tpch.orders")
customer = spark.read.table("samples.tpch.customer")

return (orders.join(customer, orders.o_custkey == customer.c_custkey)
.select(
col("c_custkey").alias("custkey"),
col("c_name").alias("name"),
col("c_nationkey").alias("nationkey"),
col("c_phone").alias("phone"),
col("o_orderkey").alias("orderkey"),
col("o_orderstatus").alias("orderstatus"),
col("o_totalprice").alias("totalprice"),
col("o_orderdate").alias("orderdate"))
)

@dlt.view()
def nation_region():
nation = spark.read.table("samples.tpch.nation")
region = spark.read.table("samples.tpch.region")

return (nation.join(region, nation.n_regionkey == region.r_regionkey)
.select(
col("n_name").alias("nation"),
col("r_name").alias("region"),
col("n_nationkey").alias("nationkey")
)
)

# Extract region names from region table

region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]

# Iterate through region names to create new region-specific materialized views

for region in region_list:

@dlt.table(name=f"{region.lower().replace(' ', '_')}_customer_orders")
def regional_customer_orders(region_filter=region):

customer_orders = spark.read.table("customer_orders")
nation_region = spark.read.table("nation_region")

return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
.select(
col("custkey"),
col("name"),
col("phone"),
col("nation"),
col("region"),
col("orderkey"),
col("orderstatus"),
col("totalprice"),
col("orderdate")
).filter(f"region = '{region_filter}'")
)

このパイプラインのデータ フロー グラフの例を次に示します。

5 つの地域テーブルにつながる 2 つのビューのデータ フロー グラフ。

トラブルシューティング: ループ for 同じ値を持つ多数のテーブルを作成します

パイプラインが Python コードの評価に使用する遅延実行モデルでは、 @dlt.table() によってデコレートされた関数が呼び出されたときに、ロジックが個々の値を直接参照する必要があります。

次の例は、 for ループを使用してテーブルを定義する 2 つの正しい方法を示しています。 どちらの例でも、 tables リストの各テーブル名は、 @dlt.table()で修飾された関数内で明示的に参照されます。

Python
import dlt

# Create a parent function to set local variables

def create_table(table_name):
@dlt.table(name=table_name)
def t():
return spark.read.table(table_name)

tables = ["t1", "t2", "t3"]
for t_name in tables:
create_table(t_name)

# Call `@dlt.table()` within a for loop and pass values as variables

tables = ["t1", "t2", "t3"]
for t_name in tables:

@dlt.table(name=t_name)
def create_table(table_name=t_name):
return spark.read.table(table_name)

次の例では、値を正しく 参照していません 。 この例では、異なる名前のテーブルを作成しますが、すべてのテーブルは for ループの最後の値からデータを読み込みます。

Python
import dlt

# Don't do this!

tables = ["t1", "t2", "t3"]
for t_name in tables:

@dlt.table(name=t_name)
def create_table():
return spark.read.table(t_name)