Delta Live Tables によるデータの変換

この記事では、Delta Live Tables を使用してデータセットの変換を宣言し、クエリ ロジックを通じてレコードの処理方法を指定する方法について説明します。 また、Delta Live Tables パイプラインを構築するための一般的な変換パターンの例も含まれています。

DataFrame を返す任意のクエリに対してデータセットを定義できます。 Apache Spark の組み込み操作、UDF、カスタム ロジック、MLflow モデルを Delta Live Tables パイプラインの変換として使用できます。 Delta Live Tables パイプラインにデータが取り込まれたら、アップストリーム ソースに対して新しいデータセットを定義して、新しいストリーミング テーブル、具体化されたビュー、およびビューを作成できます。

Delta Live Tablesでステートフル処理を効果的に実行する方法については、 「ウォーターマークを使用したDelta Live Tablesのステートフル処理の最適化」を参照してください。

ビュー、マテリアライズドビュー、ストリーミングテーブルをどのような場合に使用するのか

パイプラインクエリを実装するときは、効率的で保守性を確保するために、最適なデータセットタイプを選択します。

ビューを使用して次のことを行うことを検討してください。

  • 必要な大規模または複雑なクエリを、管理しやすいクエリに分割します。

  • 期待値を使用して中間結果を検証します。

  • ストレージとコンピュートのコストを削減し、保持する必要のない結果を得ることができます。 テーブルはマテリアライズされるため、追加の計算リソースとストレージリソースが必要になります。

マテリアライズドビューは、次の場合に使用を検討してください。

  • 複数のダウンストリーム クエリーがテーブルを使用します。 ビューはオンデマンドでコンピュートされるため、ビューがクエリーされるたびにビューが再コンピュートされます。

  • 他のパイプライン、ジョブ、またはクエリーはテーブルを使用します。 ビューは具体化されないため、同じパイプラインでのみ使用できます。

  • 開発中にクエリーの結果を表示する場合。 テーブルは具体化され、パイプラインの外部で表示およびクエリーできるため、開発中にテーブルを使用すると、計算の正確性を検証するのに役立ちます。 検証後、具体化を必要としないクエリーをビューに変換します。

次の場合は、ストリーミング テーブルの使用を検討してください。

  • クエリーは、継続的または段階的に増加する DATA に対して定義されます。

  • クエリー 結果はインクリメンタルにコンピュートにする必要があります。

  • パイプラインには、高いスループットと低いレイテンシが必要です。

ストリーミング テーブルは、常にストリーミング ソースに対して定義されます。 また、 APPLY CHANGES INTO でストリーミング ソースを使用して、CDC フィードからの更新を適用することもできます。 「 APPLY CHANGES APIs: Delta Live Tablesによるチェンジデータキャプチャの簡素化 」を参照してください。

ターゲットスキーマからテーブルを除外する

外部消費を目的としない中間テーブルを計算する必要がある場合は、 TEMPORARY キーワードを使用して、中間テーブルがスキーマにパブリッシュされないようにすることができます。 一時テーブルは、Delta Live Tables のセマンティクスに従ってデータを格納および処理しますが、現在のパイプラインの外部からアクセスしないでください。 一時テーブルは、それを作成するパイプラインの有効期間中保持されます。 次の構文を使用して、テンポラリ・テーブルを宣言します。

CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;
@dlt.table(
  temporary=True)
def temp_table():
  return ("...")

ストリーミングテーブルとマテリアライズドビューを 1 つのパイプラインに結合する

ストリーミング テーブルは、Apache Spark 構造化ストリーミングの処理保証を継承し、追加専用 データソースからのクエリを処理するように構成されており、新しい行は変更されるのではなく、常にソース テーブルに挿入されます。

デフォルトでは、ストリーミングテーブルには追加専用の データソースが必要ですが、ストリーミングソースが更新または削除を必要とする別のストリーミングテーブルである場合は、 skipChangeCommits フラグを使用してこの動作をオーバーライドできます。

一般的なストリーミング パターンでは、ソース データを取り込んでパイプラインに初期データセットを作成します。 これらの初期データセットは一般にブロンズ テーブルと呼ばれ、多くの場合、単純な変換を実行します。

対照的に、パイプライン内の最終テーブル (一般にゴールド テーブルと呼ばれます) では、多くの場合、複雑な集計や APPLY CHANGES INTO 操作のターゲットからの読み取りが必要になります。 これらのオペレーションは本質的に追加ではなく更新を作成するため、ストリーミングテーブルへの入力としてはサポートされていません。 これらの変換は、マテリアライズドビューに適しています。

ストリーミング テーブルとマテリアライズドビューを 1 つのパイプラインに混在させることで、パイプラインを簡素化し、コストのかかる生データの再取り込みや再処理を回避し、効率的にエンコードおよびフィルター処理されたデータセットに対して複雑な集計を行うための SQL を最大限に活用できます。 次の例は、このタイプの混合処理を示しています。

これらの例では、 Auto Loader を使用してクラウド ストレージからファイルを読み込みます。 Auto LoaderUnity Catalog対応パイプラインで を含むファイルをロードするには、 外部ロケーション を使用する必要があります。Delta Live Tables で Unity Catalog を使用する方法の詳細については、「 Delta Live Tables パイプラインで Unity Catalog を使用する」を参照してください。

@dlt.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("s3://path/to/raw/data")
  )

@dlt.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally.
  return spark.readStream.table("LIVE.streaming_bronze").where(...)

@dlt.table
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return spark.readStream.table("LIVE.streaming_silver").groupBy("user_id").count()
CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM read_files(
  "s3://path/to/raw/data", "json"
)

CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(LIVE.streaming_bronze) WHERE...

CREATE OR REFRESH MATERIALIZED VIEW mv_gold
AS SELECT count(*) FROM LIVE.streaming_silver GROUP BY user_id

Auto LoaderJSONを使用して からS3 ファイルを段階的に取り込む方法について詳しくは、こちらをご覧ください。

stream-static 結合

Stream-static 結合は、主に静的ディメンション テーブルを使用して追加専用データの連続ストリームを非正規化する場合に適しています。

パイプラインが更新されるたびに、ストリームからの新しいレコードが静的テーブルの最新のスナップショットと結合されます。 ストリーミング テーブルの対応するデータが処理された後に静的テーブルにレコードが追加または更新された場合、完全更新が実行されない限り、結果のレコードは再計算されません。

トリガー実行用に構成されたパイプラインでは、静的テーブルは更新が開始された時点の結果を返します。 連続実行用に構成されたパイプラインでは、テーブルが更新を処理するたびに、静的テーブルの最新バージョンがクエリされます。

ストリーム静的結合の例を次に示します。

@dlt.table
def customer_sales():
  return spark.readStream.table("LIVE.sales").join(spark.readStream.table("LIVE.customers"), ["customer_id"], "left")
CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
  INNER JOIN LEFT LIVE.customers USING (customer_id)

集計を効率的に計算する

ストリーミングテーブルを使用して、count、min、max、sum などの単純な分布集計や、平均や標準偏差などの代数集計を増分的に計算できます。 Databricks では、 GROUP BY country 句を含むクエリなど、グループの数が制限されているクエリの増分集計をお勧めします。 更新のたびに、新しい入力データのみが読み取られます。

増分集計を実行する Delta Live Tables クエリの作成の詳細については、 「ウォーターマークを使用したウィンドウ集計の実行」を参照してください。

Delta Live Tables パイプラインで MLflow モデルを使用する

Unity Catalog 対応パイプラインで MLflow モデルを使用するには、 preview チャネルを使用するようにパイプラインを構成する必要があります。 current チャンネルを使用するには、 Hive metastoreに公開するようにパイプラインを設定する必要があります。

MLflow でトレーニングされたモデルは、Delta Live Tables パイプラインで使用できます。 MLflow モデルは Databricks では変換として扱われ、Spark DataFrame 入力に作用し、結果を Spark DataFrame として返します。 では Delta Live Tablesに対してデータセットが定義されているため、 を使用する ワークロードをわずか数行のコードでDataFrames Apache SparkMLflowDelta Live Tablesに変換できます。MLflow の詳細については、「 MLflow を使用した ML ライフサイクル管理」を参照してください。

MLflow モデルを呼び出す Python ノートブックが既にある場合は、 @dlt.table デコレーターを使用し、変換結果を返すように関数が定義されていることを確認することで、このコードを Delta Live Tables に適応させることができます。 Delta Live Tables は MLflow by デフォルトをインストールしませんので、%pip install mlflow と一緒に MLFlow ライブラリをインストールし、ノートブックの上部に mlflowdlt をインポートしたことを確認してください。 Delta Live Tables 構文の概要については、「 Python を使用したパイプライン コードの開発」を参照してください。

Delta Live Tablesで MLflow モデルを使用するには、次の手順を実行します。

  1. MLflow モデルの実行 ID とモデル名を取得します。 実行 ID とモデル名は、MLflow モデルの URI を構築するために使用されます。

  2. URI を使用して Spark UDF を定義し、MLflow モデルを読み込みます。

  3. テーブル定義で UDF を呼び出して、MLflow モデルを使用します。

次の例は、このパターンの基本的な構文を示しています。

%pip install mlflow

import dlt
import mlflow

run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

@dlt.table
def model_predictions():
  return spark.read.table(<input-data>)
    .withColumn("prediction", loaded_model_udf(<model-features>))

完全な例として、次のコードでは、ローン リスク データでトレーニングされた MLflow モデルを読み込む loaded_model_udf という名前の Spark UDF を定義しています。 予測に使用されるデータ列は、引数として UDF に渡されます。 テーブル loan_risk_predictions は、 loan_risk_input_dataの各行の予測を計算します。

%pip install mlflow

import dlt
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dlt.table(
  comment="GBT ML predictions of loan risk",
  table_properties={
    "quality": "gold"
  }
)
def loan_risk_predictions():
  return spark.read.table("loan_risk_input_data")
    .withColumn('predictions', loaded_model_udf(struct(features)))

手動による削除または更新を保持する

Delta Live Tables を使用すると、テーブルからレコードを手動で削除または更新し、更新操作を実行してダウンストリーム テーブルを再計算できます。

デフォルトでは、 Delta Live Tables はパイプラインが更新されるたびに入力データに基づいてテーブルの結果を再計算するため、削除されたレコードがソース データから再ロードされないようにする必要があります。 pipelines.reset.allowed table プロパティを false に設定すると、テーブルへの更新は防止されますが、テーブルへの増分書き込みや新しいデータがテーブルに流入するのを防ぐことはできません。

次の図は、2 つのストリーミング テーブルを使用する例を示しています。

  • raw_user_table ソースから生のユーザーデータを取り込みます。

  • bmi_table raw_user_tableから体重と身長を使用してBMIスコアを段階的にコンピュートします。

raw_user_table からユーザー レコードを手動で削除または更新し、 bmi_tableを再計算します。

データ図の保持

次のコードは、 pipelines.reset.allowed table プロパティを false に設定して raw_user_table の完全更新を無効にし、意図した変更が時間の経過と共に保持されるようにする方法を示していますが、パイプラインの更新が実行されるとダウンストリーム テーブルが再計算されます。

CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM read_files("/databricks-datasets/iot-stream/data-user", "csv");

CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(LIVE.raw_user_table);