メインコンテンツまでスキップ

パイプラインでデータを変換する

この記事では Lakeflow 宣言型パイプラインを使用してデータセットの変換を宣言し、クエリ ロジックを通じてレコードの処理方法を指定する方法について説明します。 また、 Lakeflow 宣言型パイプラインを構築するための一般的な変換パターンの例も含まれています。

DataFrame を返す任意のクエリに対してデータセットを定義できます。Apache Spark組み込みオペレーション、UDF、カスタムロジック、およびMLflowモデルを Lakeflow宣言型パイプラインの変換として使用できます。データがパイプラインに取り込まれたら、アップストリーム ソース に対して新しいデータセットを定義して、新しいストリーミングテーブル、マテリアライズドビュー、およびビューを作成できます。

Lakeflow宣言型パイプラインを使用してステートフル処理を効果的に実行する方法についてはウォーターマークを使用して宣言型パイプラインでステートフル 処理を最適化するLakeflow を参照してください。

ビュー、マテリアライズドビュー、ストリーミングテーブルを使用する場合

パイプライン クエリを実装するときは、効率的で保守しやすいように最適なデータセット タイプを選択します。

次のことを実行するにはビューの使用を検討してください。

  • 必要な大規模または複雑なクエリを、管理しやすいクエリに分割します。
  • エクスペクテーションを使用して中間結果を検証します。
  • 永続化する必要のない結果のストレージとコンピュートのコストを削減します。 テーブルはマテリアライズドされるため、追加の計算リソースとストレージ リソースが必要になります。

マテリアライズドビューは、次の場合に使用を検討してください。

  • 複数のダウンストリーム クエリがテーブルを消費します。ビューはオンデマンドでコンピュートされるため、ビューがクエリされるたびにビューが再コンピュートされます。
  • 他のパイプライン、ジョブ、またはクエリがテーブルを使用します。ビューはマテリアライズされないため、同じパイプライン内でのみ使用できます。
  • 開発中にクエリの結果を表示したい。テーブルはマテリアライズされ、パイプラインの外部で表示およびクエリできるため、開発中にテーブルを使用すると計算の正確性を検証するのに役立ちます。検証後、マテリアライズを必要としないクエリをビューに変換します。

次の場合にはストリーミング テーブルの使用を検討してください。

  • クエリーは、継続的または段階的に増加するデータソースに対して定義されます。
  • クエリ結果は段階的にコンピュートされる必要があります。
  • パイプラインには高いスループットと低いレイテンシが必要です。
注記

ストリーミングテーブルは、常にストリーミング ソースに対して定義されます。 また、 AUTO CDC ... INTO でストリーミング ソースを使用して、CDC フィードからの更新を適用することもできます。AUTO CDC API: Lakeflow 宣言型パイプラインでチェンジデータキャプチャをシンプルにを参照してください。

ターゲットスキーマからテーブルを除外する

外部消費を目的としない中間テーブルを計算する必要がある場合は、 TEMPORARY キーワードを使用して、中間テーブルがスキーマにパブリッシュされないようにすることができます。一時テーブルは、宣言型パイプラインのセマンティクスに従ってデータを格納および処理 Lakeflow が、現在のパイプラインの外部からアクセスしないでください。 一時テーブルは、それを作成するパイプラインの有効期間中保持されます。次の構文を使用して、テンポラリ・テーブルを宣言します。

SQL
CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;

ストリーミングテーブルとマテリアライズドビューを単一のパイプラインに結合

ストリーミング テーブルは、 Apache Spark構造化ストリーミングの処理保証を継承し、追加専用のデータ ソースからのクエリを処理するように構成されています。新しい行は変更されるのではなく、常にソース テーブルに挿入されます。

注記

当然のことながら、ストリーミング テーブルには追加専用のデータ ソースが必要ですが、ストリーミング ソースが更新または削除を必要とする別のストリーミング テーブルである場合は、 skipChangeCommits フラグを使用してこの動作をオーバーライドできます。

一般的なストリーミング パターンでは、ソース データを取り込んでパイプラインの初期データセットを作成します。これらの初期データセットは一般にブロンズ テーブルと呼ばれ、多くの場合、単純な変換を実行します。

対照的に、パイプラインの最終テーブル (一般にゴールド テーブルと呼ばれる) では、複雑な集計やAUTO CDC ... INTO操作のターゲットからの読み取りが必要になることがよくあります。これらの操作は本質的に追加ではなく更新を作成するため、ストリーミング テーブルへの入力としてはサポートされていません。 これらの変換はマテリアライズドビューにより適しています。

ストリーミング テーブルとマテリアライズドビューを 1 つのパイプラインに混在させることで、パイプラインを簡素化し、コストのかかる生データの再取り込みや再処理を回避し、効率的にエンコードおよびフィルター処理されたデータセットに対して複雑な集計を行うための SQL を最大限に活用できます。 次の例は、このタイプの混合処理を示しています。

注記

これらの例では、 Auto Loader を使用してクラウド ストレージからファイルを読み込みます。 Auto LoaderUnity Catalog対応パイプラインで を含むファイルをロードするには、外部ロケーション を使用する必要があります。宣言型パイプラインで を使用する方法の詳細については、「Unity CatalogLakeflow宣言型パイプラインで Unity Catalogを使用するLakeflow 」を参照してください。

Python
@dp.table
def streaming_bronze():
return (
# Since this is a streaming source, this table is incremental.
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("s3://path/to/raw/data")
)

@dp.table
def streaming_silver():
# Since we read the bronze table as a stream, this silver table is also
# updated incrementally.
return spark.readStream.table("streaming_bronze").where(...)

@dp.materialized_view
def live_gold():
# This table will be recomputed completely by reading the whole silver table
# when it is updated.
return spark.read.table("streaming_silver").groupBy("user_id").count()

Auto Loaderを使用して S3 から JSON ファイルを段階的に取り込む方法の詳細について説明します。

ストリーム静的結合

Stream-static 結合は、主に静的ディメンション テーブルを使用して追加専用データの連続ストリームを非正規化する場合に適しています。

パイプラインが更新されるたびに、ストリームからの新しいレコードが静的テーブルの最新のスナップショットに結合されます。ストリーミング テーブルからの対応するデータが処理された後に静的テーブルでレコードが追加または更新された場合、完全な更新が実行されない限り、結果のレコードは再計算されません。

トリガー実行用に構成されたパイプラインでは、静的テーブルは更新が開始された時点の結果を返します。継続実行用に構成されたパイプラインでは、テーブルが更新を処理するたびに、静的テーブルの最新バージョンがクエリされます。

以下はストリーム静的結合の例です。

Python
@dp.table
def customer_sales():
return spark.readStream.table("sales").join(spark.read.table("customers"), ["customer_id"], "left")

集計を効率的に計算する

ストリーミング テーブルを使用すると、カウント、最小、最大、合計などの単純な分散集計や、平均や標準偏差などの代数集計を段階的に計算できます。 Databricks では、 GROUP BY country句を含むクエリなど、グループの数が限られているクエリに対して増分集計を推奨しています。更新のたびに新しい入力データのみが読み取られます。

増分集計を実行する Lakeflow 宣言型パイプライン クエリの記述の詳細については、「 ウォーターマークを使用したウィンドウ集計の実行」を参照してください。

宣言型パイプラインでMLflowモデルを使用するLakeflow

注記

Unity カタログ対応のパイプラインでMLflowモデルを使用するには、パイプラインがpreviewチャンネルを使用するように構成されている必要があります。 currentチャンネルを使用するには、 Hive metastoreに公開するようにパイプラインを構成する必要があります。

MLflow でトレーニングされたモデルは、宣言型パイプラインで使用できます Lakeflow 。 MLflow モデルは Databricks では変換として扱われ、Spark DataFrame 入力に作用し、結果を Spark DataFrame として返します。Lakeflow宣言型パイプライン に対してデータセットを定義するため、DataFrames Apache Sparkを使用する ワークロードをMLflow Lakeflow宣言型パイプラインに変換 わずか数行のコードで実行できます。MLflow の詳細については、「 ML モデルのライフサイクルに関する MLflow」を参照してください。

MLflowモデルを呼び出すPythonスクリプトがすでにある場合は、 @dp.tableまたは@dp.materialized_viewデコレーターを使用し、変換結果を返すように関数が定義されていることを確認することで、このコードをLakeFlow宣言型パイプラインに適合させることができます。 LakeFlow宣言型パイプラインはMLflowを当然インストールしないので、MLFlowライブラリを%pip install mlflowでインストールし、ソースの先頭にmlflowdpインポートしていることを確認してください。 LakeFlow宣言型パイプライン構文の概要については、 Pythonを使用したパイプライン コードの開発」を参照してください。

MLflowLakeflow宣言型パイプラインで モデルを使用するには、次の手順を実行します。

  1. MLflow モデルの実行 ID とモデル名を取得します。実行 ID とモデル名は、MLflow モデルの URI を構築するために使用されます。
  2. URI を使用して Spark UDF を定義し、MLflow モデルをロードします。
  3. MLflow モデルを使用するには、テーブル定義で UDF を呼び出します。

次の例は、このパターンの基本的な構文を示しています。

Python
%pip install mlflow

from pyspark import pipelines as dp
import mlflow

run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

@dp.materialized_view
def model_predictions():
return spark.read.table(<input-data>)
.withColumn("prediction", loaded_model_udf(<model-features>))

完全な例として、次のコードは、ローン リスク データでトレーニングされた MLflow モデルを読み込むloaded_model_udfという名前の Spark UDF を定義します。予測を行うために使用されるデータ列は、引数として UDF に渡されます。テーブルloan_risk_predictionsは、 loan_risk_input_dataの各行の予測を計算します。

Python
%pip install mlflow

from pyspark import pipelines as dp
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
"addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
"revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dp.materialized_view(
comment="GBT ML predictions of loan risk",
table_properties={
"quality": "gold"
}
)
def loan_risk_predictions():
return spark.read.table("loan_risk_input_data")
.withColumn('predictions', loaded_model_udf(struct(features)))

手動での削除や更新を保持する

Lakeflow 宣言型パイプラインを使用すると、テーブルからレコードを手動で削除または更新し、更新操作を実行してダウンストリーム テーブルを再計算できます。

By Default Lakeflow 宣言型パイプラインは、パイプラインが更新されるたびに入力データに基づいてテーブルの結果を再計算するため、削除されたレコードがソース データから再ロードされないようにする必要があります。 pipelines.reset.allowed table プロパティを false に設定すると、テーブルへの更新は防止されますが、テーブルへの増分書き込みや新しいデータがテーブルに流入するのを防ぐことはできません。

次の図は、2 つのストリーミング テーブルを使用する例を示しています。

  • raw_user_table ソースから生のユーザーデータを取り込みます。
  • bmi_table raw_user_tableからの体重と身長を使用して BMI スコアを段階的にコンピュートします。

raw_user_tableからユーザー レコードを手動で削除または更新し、 bmi_tableを再計算します。

データ図を保持する

次のコードは、 pipelines.reset.allowedテーブル プロパティをfalseに設定してraw_user_tableの完全更新を無効にし、意図した変更が長期間保持されるようにし、パイプラインの更新が実行されると下流のテーブルが再計算されるようにする方法を示しています。

SQL
CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM STREAM read_files("/databricks-datasets/iot-stream/data-user", format => "csv");

CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(raw_user_table);