Delta Live Tables によるデータの変換

この記事では、 Delta Live Tables を使用してデータセットの変換を宣言し、クエリー ロジックを使用してレコードを処理する方法を指定する方法について説明します。 また、 Delta Live Tables パイプラインを構築するときに役立つ一般的な変換パターンの例もいくつか含まれています。

データセットは、データフレームを返す任意のクエリーに対して定義できます。 Apache Spark の組み込み操作、UDF、カスタム ロジック、MLflow モデルを Delta Live Tables パイプラインの変換として使用できます。 データが Delta Live Tables パイプラインに取り込まれたら、アップストリームソースに対して新しいデータセットを定義して、新しいストリーミングテーブル、具体化されたビュー、ビューを作成できます。

Delta Live Tablesでステートフル処理を効果的に実行する方法については、 「ウォーターマークを使用したDelta Live Tablesのステートフル処理の最適化」を参照してください。

ビュー、マテリアライズドビュー、ストリーミングテーブルを使用する場合

パイプラインの効率性と保守性を確保するには、パイプライン クエリを実装するときに最適なデータセットの種類を選択します。

次の場合にビューの使用を検討してください。

  • 大規模または複雑なクエリーがあり、管理しやすいクエリーに分割したい。

  • エクスペクテーションを使用して中間結果を検証する必要があります。

  • 保管コストやコンピュートコストを削減し、クエリー結果の具体化を必要としない場合。 テーブルはマテリアライズされるため、追加の計算リソースとストレージリソースが必要です。

具体化されたビューは、次の場合に使用を検討してください。

  • 複数のダウンストリーム クエリーがテーブルを使用します。 ビューはオンデマンドでコンピュートされるため、ビューがクエリーされるたびにビューが再コンピュートされます。

  • 他のパイプライン、ジョブ、またはクエリーはテーブルを使用します。 ビューは具体化されないため、同じパイプラインでのみ使用できます。

  • 開発中にクエリーの結果を表示する場合。 テーブルは具体化され、パイプラインの外部で表示およびクエリーできるため、開発中にテーブルを使用すると、計算の正確性を検証するのに役立ちます。 検証後、具体化を必要としないクエリーをビューに変換します。

次の場合は、ストリーミング テーブルの使用を検討してください。

  • クエリーは、継続的または段階的に増加する DATA に対して定義されます。

  • クエリー 結果はインクリメンタルにコンピュートにする必要があります。

  • パイプラインには、高いスループットと低い待機時間が必要です。

ストリーミング テーブルは、常にストリーミング ソースに対して定義されます。 また、 APPLY CHANGES INTO でストリーミング ソースを使用して、CDC フィードから更新を適用することもできます。 「Delta Live Tables の APPLY CHANGES API を使用した簡略化されたチェンジデータキャプチャ」を参照してください。

ストリーミングテーブルとマテリアライズドビューを 1 つのパイプラインに結合する

ストリーミング テーブルは、Apache Spark 構造化ストリーミングの処理保証を継承し、追加専用 データソースからのクエリを処理するように構成されており、新しい行は変更されるのではなく、常にソース テーブルに挿入されます。

デフォルトでは、ストリーミングテーブルには追加専用の データソースが必要ですが、ストリーミングソースが更新または削除を必要とする別のストリーミングテーブルである場合は、 skipChangeCommits フラグを使用してこの動作をオーバーライドできます。

一般的なストリーミング パターンには、ソース データを取り込んでパイプラインに初期データセットを作成することが含まれます。 これらの初期データセットは一般に ブロンズ テーブルと呼ばれ、多くの場合、単純な変換を実行します。

対照的に、パイプラインの最後のテーブル (一般に ゴールド テーブルと呼ばれます) では、多くの場合、複雑な集計や、 APPLY CHANGES INTO 操作のターゲットであるソースからの読み取りが必要になります。 これらの操作は本質的に追加ではなく更新を作成するため、ストリーミング テーブルへの入力としてはサポートされていません。 これらの変換は、マテリアライズドビューに適しています。

ストリーミング テーブルと具体化されたビューを 1 つのパイプラインに混在させることで、パイプラインを簡素化し、コストのかかる生データの再取り込みや再処理を回避し、効率的にエンコードおよびフィルター処理されたデータセットに対して複雑な集計を行うための SQL を最大限に活用できます。 次の例は、このタイプの混合処理を示しています。

これらの例では、 Auto Loader を使用してクラウドストレージからファイルを読み込みます。 Unity Catalog 対応パイプラインで Auto Loader を含むファイルを読み込むには、 外部ロケーションを使用する必要があります。 Delta Live Tables で Unity Catalog を使用する方法の詳細については、「 Delta Live Tables パイプラインで Unity Catalog を使用する」を参照してください。

@dlt.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("s3://path/to/raw/data")
  )

@dlt.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally.
  return dlt.read_stream("streaming_bronze").where(...)

@dlt.table
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return dlt.read("streaming_silver").groupBy("user_id").count()
CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM cloud_files(
  "s3://path/to/raw/data", "json"
)

CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(LIVE.streaming_bronze) WHERE...

CREATE OR REFRESH LIVE TABLE live_gold
AS SELECT count(*) FROM LIVE.streaming_silver GROUP BY user_id

Auto Loader を使用して S3 から JSON ファイルを効率的に読み取り、増分処理を行う方法の詳細をご覧ください。

stream-static 結合

Stream-static 結合は、主に静的ディメンション テーブルを使用して追加専用データの連続ストリームを非正規化する場合に適しています。

パイプラインが更新されるたびに、ストリームからの新しいレコードが静的テーブルの最新のスナップショットと結合されます。 ストリーミング テーブルの対応するデータが処理された後に静的テーブルにレコードが追加または更新された場合、完全更新が実行されない限り、結果のレコードは再計算されません。

トリガーされた実行用に構成されたパイプラインでは、静的テーブルは更新が開始された時点の結果を返します。 連続実行用に構成されたパイプラインでは、テーブルが更新を処理するたびに、静的テーブルの最新バージョンがクエリーになります。

ストリーム静的結合の例を次に示します。

@dlt.table
def customer_sales():
  return dlt.read_stream("sales").join(dlt.read("customers"), ["customer_id"], "left")
CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
  INNER JOIN LEFT LIVE.customers USING (customer_id)

集計を効率的に計算する

ストリーミングテーブルを使用して、カウント、最小、最大、合計などの単純な分散集計と、平均や標準偏差などの代数集計を段階的に計算できます。 Databricks では、 GROUP BY country 句を含むクエリーなど、限られた数のグループを持つクエリーの増分集計をお勧めします。 更新のたびに、新しい入力データのみが読み取られます。

増分集計を実行する Delta Live Tables クエリの作成の詳細については、 「ウォーターマークを使用したウィンドウ集計の実行」を参照してください。

Delta Live Tables パイプラインで MLflow モデルを使用する

Unity Catalog 対応パイプラインで MLflow モデルを使用するには、 preview チャネルを使用するようにパイプラインを構成する必要があります。 current チャンネルを使用するには、 Hive metastoreに公開するようにパイプラインを設定する必要があります。

MLflow でトレーニングされたモデルは、Delta Live Tables パイプラインで使用できます。 MLflow モデルは Databricks では変換として扱われ、Spark DataFrame 入力に作用し、結果を Spark DataFrame として返します。 Delta Live Tables では DataFramesに対してデータセットが定義されるため、わずか数行のコードで MLflow を利用する Apache Spark ワークロードを Delta Live Tables に変換できます。 MLflow の詳細については、「 MLflow を使用した機械学習ライフサイクル管理」を参照してください。

モデルを呼び出す Pythonノートブックがすでにある場合は、 デコレータを使用し、変換結果を返すように関数が定義されていることを確認することで、このコードをMLflowDelta Live Tables @dlt.tableに適応させることができます。Delta Live Tables はデフォルトでは MLflow をインストールしないため、必ずノートブックの先頭で%pip install mlflowを実行し、 mlflowdltをインポートしてください。 Delta Live Tables構文の概要については、 「例: ニューヨークの赤ちゃんの名前データの取り込みと処理」を参照してください。

Delta Live Tablesで MLflow モデルを使用するには、次の手順を実行します。

  1. MLflow モデルの実行 ID とモデル名を取得します。 実行 ID とモデル名は、MLflow モデルの URI を構築するために使用されます。

  2. URI を使用して Spark UDF を定義し、MLflow モデルを読み込みます。

  3. テーブル定義で UDF を呼び出して、MLflow モデルを使用します。

次の例は、このパターンの基本的な構文を示しています。

%pip install mlflow

import dlt
import mlflow

run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

@dlt.table
def model_predictions():
  return dlt.read(<input-data>)
    .withColumn("prediction", loaded_model_udf(<model-features>))

完全な例として、次のコードでは、ローン リスク データでトレーニングされた MLflow モデルを読み込む loaded_model_udf という名前の Spark UDF を定義しています。 予測に使用されるデータ列は、引数として UDF に渡されます。 テーブル loan_risk_predictions は、 loan_risk_input_dataの各行の予測を計算します。

%pip install mlflow

import dlt
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dlt.table(
  comment="GBT ML predictions of loan risk",
  table_properties={
    "quality": "gold"
  }
)
def loan_risk_predictions():
  return dlt.read("loan_risk_input_data")
    .withColumn('predictions', loaded_model_udf(struct(features)))

手動による削除または更新を保持する

Delta Live Tables を使用すると、テーブルからレコードを手動で削除または更新し、更新操作を実行してダウンストリーム テーブルを再計算できます。

デフォルトにより、Delta Live Tables はパイプラインが更新されるたびに入力データに基づいてテーブルの結果を再計算するため、削除されたレコードがソースデータから再読み込みされないようにする必要があります。 pipelines.reset.allowed table プロパティを false に設定すると、テーブルへの更新は防止されますが、テーブルへの増分書き込みや新しいデータがテーブルに流れ込むのを防ぐことはできません。

次の図は、2 つのストリーミング テーブルを使用する例を示しています。

  • raw_user_table ソースから生のユーザーデータを取り込みます。

  • bmi_table raw_user_tableから体重と身長を使用してBMIスコアを段階的にコンピュートします。

raw_user_table からユーザー レコードを手動で削除または更新し、 bmi_tableを再計算します。

データ図の保持

次のコードは、 pipelines.reset.allowed table プロパティを false に設定して raw_user_table の完全更新を無効にし、意図した変更が時間の経過と共に保持されるようにする方法を示していますが、パイプラインの更新が実行されるとダウンストリーム テーブルが再計算されます。

CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM cloud_files("/databricks-datasets/iot-stream/data-user", "csv");

CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(LIVE.raw_user_table);