パイプラインによるデータの変換
この記事では LakeFlow 宣言型パイプラインを使用してデータセットの変換を宣言し、クエリ ロジックを通じてレコードの処理方法を指定する方法について説明します。 また、 LakeFlow 宣言型パイプラインを構築するための一般的な変換パターンの例も含まれています。
DataFrame を返す任意のクエリに対してデータセットを定義できます。Apache Spark組み込みオペレーション、UDF、カスタムロジック、およびMLflowモデルを LakeFlow宣言型パイプラインの変換として使用できます。データがパイプラインに取り込まれたら、アップストリーム ソース に対して新しいデータセットを定義して、新しいストリーミングテーブル、マテリアライズドビュー、およびビューを作成できます。
LakeFlow宣言型パイプラインを使用してステートフル処理を効果的に実行する方法についてはウォーターマークを使用して宣言型パイプラインでステートフル 処理を最適化するLakeFlow を参照してください。
ビュー、マテリアライズドビュー、ストリーミングテーブルを使用する場合
パイプラインクエリを実装するときは、効率的で保守性を確保するために、最適なデータセットタイプを選択します。
ビューを使用して次のことを行うことを検討してください。
- 必要な大規模または複雑なクエリを、管理しやすいクエリに分割します。
- エクスペクテーションを使用して中間結果を検証します。
- ストレージとコンピュートのコストを削減し、保持する必要のない結果を得ることができます。 テーブルはマテリアライズされるため、追加の計算リソースとストレージリソースが必要になります。
マテリアライズドビューは、次の場合に使用を検討してください。
- 複数のダウンストリームクエリがテーブルを消費します。 ビューはオンデマンドでコンピュートされるため、ビューがクエリされるたびにビューが再コンピュートされます。
- 他のパイプライン、ジョブ、またはクエリはテーブルを消費します。 ビューは具体化されていないため、同じパイプラインでのみ使用できます。
- 開発中にクエリの結果を表示する必要があります。 テーブルは具体化され、パイプラインの外部で表示およびクエリを実行できるため、開発中にテーブルを使用すると、計算の正確性を検証するのに役立ちます。 検証後、実体化を必要としないクエリをビューに変換します。
次の場合は、ストリーミングテーブルの使用を検討してください。
- クエリーは、継続的または段階的に増加するデータソースに対して定義されます。
- クエリ結果はインクリメンタルにコンピュートする必要があります。
- パイプラインには、高いスループットと低いレイテンシが必要です。
ストリーミングテーブルは、常にストリーミング ソースに対して定義されます。 また、 AUTO CDC ... INTO
でストリーミング ソースを使用して、CDC フィードからの更新を適用することもできます。「The AUTO CDC APIs: Simplify チェンジデータキャプチャ with LakeFlow 宣言型パイプライン」を参照してください。
ターゲットスキーマからテーブルを除外する
外部消費を目的としない中間テーブルを計算する必要がある場合は、 TEMPORARY
キーワードを使用して、中間テーブルがスキーマにパブリッシュされないようにすることができます。一時テーブルは、宣言型パイプラインのセマンティクスに従ってデータを格納および処理 LakeFlow が、現在のパイプラインの外部からアクセスしないでください。 一時テーブルは、それを作成するパイプラインの有効期間中保持されます。次の構文を使用して、テンポラリ・テーブルを宣言します。
- SQL
- Python
CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;
@dlt.table(
temporary=True)
def temp_table():
return ("...")
ストリーミングテーブルとマテリアライズドビューを 1 つのパイプラインに結合
ストリーミングテーブルは、 Apache Spark 構造化ストリーミングの処理保証を継承し、新しいローが変更されるのではなく、常にソーステーブルに挿入される追加専用データソースからのクエリを処理するように構成されています。
デフォルトでは、ストリーミングテーブルには追加専用データソースが必要ですが、ストリーミング ソースが更新または削除が必要な別のストリーミングテーブルである場合は、skipChangeCommits フラグを使用してこの動作をオーバーライドできます
一般的なストリーミング パターンでは、ソース データを取り込んでパイプラインに初期データセットを作成します。 これらの初期データセットは一般にブロンズ テーブルと呼ばれ、多くの場合、単純な変換を実行します。
対照的に、パイプライン内の最終テーブル (一般にゴールド テーブルと呼ばれます) では、多くの場合、複雑な集計や AUTO CDC ... INTO
操作のターゲットからの読み取りが必要になります。 これらのオペレーションは本質的に追加ではなく更新を作成するため、ストリーミングテーブルへの入力としてはサポートされていません。 これらの変換は、マテリアライズドビューに適しています。
ストリーミング テーブルとマテリアライズドビューを 1 つのパイプラインに混在させることで、パイプラインを簡素化し、コストのかかる生データの再取り込みや再処理を回避し、効率的にエンコードおよびフィルター処理されたデータセットに対して複雑な集計を行うための SQL を最大限に活用できます。 次の例は、このタイプの混合処理を示しています。
これらの例では、 Auto Loader を使用してクラウド ストレージからファイルを読み込みます。 Auto LoaderUnity Catalog対応パイプラインで を含むファイルをロードするには、外部ロケーション を使用する必要があります。宣言型パイプラインで を使用する方法の詳細については、「Unity CatalogLakeFlow宣言型パイプラインで Unity Catalogを使用するLakeFlow 」を参照してください。
- Python
- SQL
@dlt.table
def streaming_bronze():
return (
# Since this is a streaming source, this table is incremental.
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("gs://path/to/raw/data")
)
@dlt.table
def streaming_silver():
# Since we read the bronze table as a stream, this silver table is also
# updated incrementally.
return spark.readStream.table("streaming_bronze").where(...)
@dlt.table
def live_gold():
# This table will be recomputed completely by reading the whole silver table
# when it is updated.
return spark.readStream.table("streaming_silver").groupBy("user_id").count()
CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM STREAM read_files(
"gs://path/to/raw/data",
format => "json"
)
CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(streaming_bronze) WHERE...
CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM streaming_silver GROUP BY user_id
Auto LoaderJSONを使用して からGoogle Cloud Storage ファイルを段階的に取り込む方法について詳しくは、こちらをご覧ください。
ストリーム静的結合
Stream-static 結合は、主に静的ディメンション テーブルを使用して追加専用データの連続ストリームを非正規化する場合に適しています。
パイプラインが更新されるたびに、ストリームの新しいレコードが静的テーブルの最新のスナップショットと結合されます。 ストリーミングテーブルの対応するデータが処理された後に静的テーブルにレコードが追加または更新された場合、完全な更新が実行されない限り、結果のレコードは再計算されません。
トリガー実行用に構成されたパイプラインでは、静的テーブルは更新が開始された時点の結果を返します。 連続実行用に構成されたパイプラインでは、テーブルが更新を処理するたびに、静的テーブルの最新バージョンがクエリされます。
次に、ストリーム静的結合の例を示します。
- Python
- SQL
@dlt.table
def customer_sales():
return spark.readStream.table("sales").join(spark.readStream.table("customers"), ["customer_id"], "left")
CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(sales)
INNER JOIN LEFT customers USING (customer_id)
集計を効率的に計算
ストリーミングテーブルを使用して、count、min、max、sum などの単純な分布集計や、平均や標準偏差などの代数集計を増分的に計算できます。 Databricks では、 GROUP BY country
句を含むクエリなど、グループの数が制限されているクエリの増分集計をお勧めします。 更新のたびに、新しい入力データのみが読み取られます。
増分集計を実行する LakeFlow 宣言型パイプライン クエリの記述の詳細については、「 ウォーターマークを使用したウィンドウ集計の実行」を参照してください。
宣言型パイプラインでMLflowモデルを使用するLakeFlow
Unity Catalog 対応パイプラインで MLflow モデルを使用するには、preview
チャンネルを使用するようにパイプラインを構成する必要があります。 current
チャンネルを使用するには、 Hive metastoreにパブリッシュするようにパイプラインを設定する必要があります。
MLflow でトレーニングされたモデルは、宣言型パイプラインで使用できます LakeFlow 。 MLflow モデルは Databricks では変換として扱われ、Spark DataFrame 入力に作用し、結果を Spark DataFrame として返します。LakeFlow宣言型パイプライン に対してデータセットを定義するため、DataFrames Apache Sparkを使用する ワークロードをMLflow LakeFlow宣言型パイプラインに変換 わずか数行のコードで実行できます。MLflow の詳細については、「 ML モデルのライフサイクルに関する MLflow」を参照してください。
モデルを呼び出す Pythonノートブックが既にある場合は、MLflowLakeFlow @dlt.table
デコレーターを使用し 、変換結果を返すように関数が定義されていることを確認することで、このコードを宣言型パイプラインに適合させることができます。LakeFlow 宣言型パイプラインでは MLflow by デフォルトがインストールされていないため、 %pip install mlflow
と一緒に MLFlow ライブラリをインストールし、ノートブックの上部に mlflow
と dlt
をインポートしたことを確認してください。 LakeFlow 宣言型パイプライン構文の概要については、「 Pythonを使用したパイプラインコードの開発」を参照してください。
MLflowLakeFlow宣言型パイプラインで モデルを使用するには、次の手順を実行します。
- MLflow モデルの実行 ID とモデル名を取得します。 実行 ID とモデル名は、MLflow モデルの URI を構築するために使用されます。
- URI を使用して、MLflow モデルを読み込むための Spark UDF を定義します。
- MLflow モデルを使用するには、テーブル定義で UDF を呼び出します。
次の例は、このパターンの基本的な構文を示しています。
%pip install mlflow
import dlt
import mlflow
run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
@dlt.table
def model_predictions():
return spark.read.table(<input-data>)
.withColumn("prediction", loaded_model_udf(<model-features>))
完全な例として、次のコードでは、ローン リスク データでトレーニングされた MLflow モデルを読み込む loaded_model_udf
という名前の Spark UDF を定義しています。 予測を行うために使用されるデータ列は、引数として UDF に渡されます。 loan_risk_predictions
テーブルでは、loan_risk_input_data
の各行の予測が計算されます。
%pip install mlflow
import dlt
import mlflow
from pyspark.sql.functions import struct
run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
categoricals = ["term", "home_ownership", "purpose",
"addr_state","verification_status","application_type"]
numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
"revol_util", "total_acc", "credit_length_in_years"]
features = categoricals + numerics
@dlt.table(
comment="GBT ML predictions of loan risk",
table_properties={
"quality": "gold"
}
)
def loan_risk_predictions():
return spark.read.table("loan_risk_input_data")
.withColumn('predictions', loaded_model_udf(struct(features)))
手動による削除または更新を保持する
LakeFlow 宣言型パイプラインを使用すると、テーブルからレコードを手動で削除または更新し、更新操作を実行してダウンストリーム テーブルを再計算できます。
By Default LakeFlow 宣言型パイプラインは、パイプラインが更新されるたびに入力データに基づいてテーブルの結果を再計算するため、削除されたレコードがソース データから再ロードされないようにする必要があります。 pipelines.reset.allowed
table プロパティを false
に設定すると、テーブルへの更新は防止されますが、テーブルへの増分書き込みや新しいデータがテーブルに流入するのを防ぐことはできません。
次の図は、2 つのストリーミングテーブルを使用した例を示しています。
raw_user_table
ソースから生のユーザーデータを取り込みます。bmi_table
raw_user_table
からの体重と身長を使用してBMIスコアを段階的にコンピュートします。
raw_user_table
からユーザーレコードを手動で削除または更新し、bmi_table
を再計算する場合。
次のコードは、 pipelines.reset.allowed
テーブル プロパティを false
に設定して raw_user_table
の完全更新を無効にし、意図した変更が時間の経過と共に保持されるようにする方法を示しています。これにより、パイプラインの更新が実行されるとダウンストリーム テーブルが再計算されます。
CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM STREAM read_files("/databricks-datasets/iot-stream/data-user", format => "csv");
CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(raw_user_table);