プログラムによって複数のテーブル を作成する

Python を Delta Live Tables と共に使用すると、プログラムで複数のテーブルを作成し、コードの冗長性を減らすことができます。

少数のパラメーターのみが異なる複数のフローまたはデータセット定義を含むパイプラインがある場合があります。 この冗長性により、エラーが発生しやすく、保守が困難なパイプラインが発生します。 たとえば、次の図は、消防署のデータセットを使用して、さまざまなカテゴリの緊急通報に対して応答時間が最も速い近隣地域を検索するパイプラインのグラフを示しています。 この例では、並列フローはいくつかのパラメーターのみが異なります。

火災データセットフロー図

Python を使用したメタプログラミングDelta Live Tables の例

この例では、 Databricks データセットに含まれるサンプル データを読み取ります。 Databricks データセットは Unity Catalog に発行するパイプラインではサポートされていないため、この例は、 Hive metastoreに発行するように構成されたパイプラインでのみ機能します。 ただし、このパターンは Unity Catalog 対応パイプラインでも機能しますが、 外部ロケーションからデータを読み取る必要があります。 Delta Live Tables で Unity Catalog を使用する方法の詳細については、「 Delta Live Tables パイプラインで Unity Catalog を使用する」を参照してください。

メタプログラミング・パターンを使用して、冗長フロー定義の生成と保守のオーバーヘッドを削減できます。 Delta Live Tables でのメタプログラミングは、Pythonの内部関数を使用して行われます。これらの関数は遅延評価されるため、入力パラメーターを除いて同一のフローを作成するために使用できます。 各呼び出しには、次の例に示すように、各テーブルの生成方法を制御する異なるパラメーターのセットを含めることができます。

重要

Delta Live Tables デコレータを使用する Python 関数は遅延して呼び出されるため、ループでデータセットを作成する場合は、別の関数を呼び出してデータセットを作成し、正しいパラメーター値が使用されるようにする必要があります。 別の関数でデータセットを作成しないと、ループの最終実行のパラメーターを使用する複数のテーブルが作成されます。

次の例では、ループ内で create_table() 関数を呼び出して、テーブル t1t2を作成します。

def create_table(name):
  @dlt.table(name=name)
  def t():
    return spark.read.table(name)

tables = ["t1", "t2"]
for t in tables:
  create_table(t)
import dlt
from pyspark.sql.functions import *

@dlt.table(
  name="raw_fire_department",
  comment="raw table for fire department response"
)
@dlt.expect_or_drop("valid_received", "received IS NOT NULL")
@dlt.expect_or_drop("valid_response", "responded IS NOT NULL")
@dlt.expect_or_drop("valid_neighborhood", "neighborhood != 'None'")
def get_raw_fire_department():
  return (
    spark.read.format('csv')
      .option('header', 'true')
      .option('multiline', 'true')
      .load('/databricks-datasets/timeseries/Fires/Fire_Department_Calls_for_Service.csv')
      .withColumnRenamed('Call Type', 'call_type')
      .withColumnRenamed('Received DtTm', 'received')
      .withColumnRenamed('Response DtTm', 'responded')
      .withColumnRenamed('Neighborhooods - Analysis Boundaries', 'neighborhood')
    .select('call_type', 'received', 'responded', 'neighborhood')
  )

all_tables = []

def generate_tables(call_table, response_table, filter):
  @dlt.table(
    name=call_table,
    comment="top level tables by call type"
  )
  def create_call_table():
    return (
      spark.sql("""
        SELECT
          unix_timestamp(received,'M/d/yyyy h:m:s a') as ts_received,
          unix_timestamp(responded,'M/d/yyyy h:m:s a') as ts_responded,
          neighborhood
        FROM LIVE.raw_fire_department
        WHERE call_type = '{filter}'
      """.format(filter=filter))
    )

  @dlt.table(
    name=response_table,
    comment="top 10 neighborhoods with fastest response time "
  )
  def create_response_table():
    return (
      spark.sql("""
        SELECT
          neighborhood,
          AVG((ts_received - ts_responded)) as response_time
        FROM LIVE.{call_table}
        GROUP BY 1
        ORDER BY response_time
        LIMIT 10
      """.format(call_table=call_table))
    )

  all_tables.append(response_table)

generate_tables("alarms_table", "alarms_response", "Alarms")
generate_tables("fire_table", "fire_response", "Structure Fire")
generate_tables("medical_table", "medical_response", "Medical Incident")

@dlt.table(
  name="best_neighborhoods",
  comment="which neighbor appears in the best response time list the most"
)
def summary():
  target_tables = [dlt.read(t) for t in all_tables]
  unioned = functools.reduce(lambda x,y: x.union(y), target_tables)
  return (
    unioned.groupBy(col("neighborhood"))
      .agg(count("*").alias("score"))
      .orderBy(desc("score"))
  )