Delta Live Tables Python言語リファレンス

この記事では、Delta Live Tables Pythonプログラミングインターフェイスの詳細を説明します。

SQL APIの詳細については、「Delta Live Tables SQL言語リファレンス」を参照してください。

オートローダーの設定に関する詳細は、「オートローダーとは?」を参照してください。

制限事項

Delta Live Tables Pythonインターフェイスには次の制限があります:

  • Python table関数とview関数はDataFrameを返す必要があります。DataFrameを操作する一部の関数はDataFrameを返さないため、使用しないでください。DataFrame変換は完全なデータフローグラフが解決された後に実行されるため、このような操作を使用すると、意図しない副作用が発生する可能性があります。これらの操作には、collect()count()toPandas()save()saveAsTable()などの関数が含まれます。ただし、このコードはグラフの初期化フェーズ中に1回実行されるため、これらの関数をtableまたはview関数定義の外に含めることができます。

  • pivot()関数はサポートされていません。Sparkのpivotオペレーションでは、出力のスキーマを計算するために入力データの積極的な読み込みが必要です。この機能は、Delta Live Tablesではサポートされていません。

dlt Pythonモジュールをインポートする

Delta Live TablesのPython関数は、dltモジュールで定義されています。Python APIを使用して実装されたパイプラインは、このモジュールをインポートする必要があります:

import dlt

Delta Live Tablesマテリアライズドビューまたはストリーミングテーブルを作成する

Pythonでは、Delta Live Tablesは、定義クエリーに基づいてデータセットをマテリアライズドビューとして更新するかストリーミングテーブルとして更新するかを決定します。@tableデコレータは、マテリアライズドビューとストリーミングテーブルの両方を定義するために使用されます。

Pythonでマテリアライズドビューを定義するには、データソースに対して静的読み取りを実行するクエリーに@tableを適用します。ストリーミングテーブルを定義するには、データソースに対してストリーミング読み取りを実行するクエリーに@tableを適用します。どちらのデータセットタイプも、次のように同じ構文仕様を持っています:

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  schema="schema-definition",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Delta Live Tablesビューの作成

Pythonでビューを定義するには、@viewデコレータを適用します。@tableデコレータと同様に、Delta Live Tableのビューは、静的データセットまたはストリーミングデータセットに使用できます。Pythonでビューを定義するための構文を次に示します:

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

例:テーブルとビューを定義する

Pythonでテーブルまたはビューを定義するには、@dlt.viewまたは@dlt.tableデコレータを関数に適用します。関数名またはnameパラメーターを使用して、テーブルまたはビューの名前を割り当てることができます。次の例では、2つの異なるデータセットを定義しています。JSONファイルを入力ソースとして受け取るtaxi_rawというビューと、入力としてtaxi_rawビューを受け取るfiltered_dataというテーブルです:

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return dlt.read("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return dlt.read("taxi_raw").where(...)

例:同じパイプラインで定義されたデータセットにアクセスする

外部データソースからの読み取りに加えて、Delta Live Tables read()関数を使用して、同じパイプラインで定義されているデータセットにアクセスできます。次の例は、read()関数を使用してcustomers_filteredデータセットを作成する方法を示しています:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return dlt.read("customers_raw").where(...)

spark.table()関数を使用して、同じパイプラインで定義されたデータセットにアクセスすることもできます。spark.table()関数を使用してパイプラインで定義されたデータセットにアクセスするときは、LIVE関数の引数でデータセット名の前にキーワードを追加します:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredB():
  return spark.table("LIVE.customers_raw").where(...)

例:Metastoreに登録されたテーブルからの読み取り

Hive Metastoreに登録されているテーブルからデータを読み取るには、関数の引数でLIVEキーワードを省略し、必要に応じてテーブル名をデータベース名で修飾します:

@dlt.table
def customers():
  return spark.table("sales.customers").where(...)

Unity Catalog テーブルからの読み取りの例については、「 Unity Catalog パイプラインへのデータの取り込み」を参照してください。

例:spark.sqlを使ってデータセットにアクセスする

クエリー関数でspark.sql式を使用してデータセットを返すこともできます。内部データセットから読み取るには、データセット名の前にLIVE.を追加します:

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")

複数のソースストリームからストリーミングテーブルへの書き込み

プレビュー

Delta Live Tables での @append_flow のサポートは パブリック プレビュー段階です。

@append_flow デコレーターを使用して、複数のストリーミング ソースからストリーミング テーブルに書き込み、次の操作を行うことができます。

  • 既存のストリーミング テーブルにデータを追加するストリーミング ソースを追加および削除し、完全な更新は必要ありません。 たとえば、事業を展開しているすべての地域の地域データを組み合わせたテーブルがあるとします。 新しいリージョンがロールアウトされると、完全な更新を実行せずに新しいリージョン データをテーブルに追加できます。

  • 不足しているヒストリカルデータ (backfilling) を追加して、ストリーミング テーブルを更新します。 たとえば、Apache Kafka トピックによって書き込まれる既存のストリーミングテーブルがあるとします。 また、ストリーミング テーブルに 1 回だけ挿入する必要があるテーブルに格納されているヒストリカルデータがあり、データを挿入する前に複雑な集計を実行する必要があるため、データをストリームできません。

@append_flow処理によって出力されるレコードのターゲット表を作成するには、create_streaming_table() 関数を使用します。

データ品質制約を EXPECTATIONS とともに定義する必要がある場合は、 create_streaming_table() 関数の一部としてターゲット表の EXPECTATIONS を定義します。 @append_flow定義で期待値を定義することはできません。

@append_flowの構文は次のとおりです。

import dlt

dlt.create_streaming_table("<target-table-name>")

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment") # optional
def <function-name>():
  return (<streaming query>)

例: 複数の Kafka トピックからストリーミングテーブルへの書き込み

次の例では、 kafka_target という名前のストリーミングテーブルを作成し、2 つの Kafka トピックからそのストリーミングテーブルに書き込みます。

import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dlt.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

例: 1 回限りのデータバックフィルを実行する

次の例では、クエリーを実行して、ヒストリカルデータをストリーミングテーブルに追加します。

バックフィルクエリーがスケジュールされたベースまたは継続的に実行されるパイプラインの一部である場合に、真の 1 回限りのバックフィルを確保するには、パイプラインを一度実行した後にクエリーを削除します。 新しいデータがバックフィル ディレクトリに到着した場合に追加するには、クエリーをそのままにしておきます。

import dlt

@dlt.table()
def csv_target():
  return spark.readStream.format("csv").load("path/to/sourceDir")

@dlt.append_flow(target = "csv_target")
def backfill():
  return spark.readStream.format("csv").load("path/to/backfill/data/dir")

ストリーミング操作のターゲットとして使用するテーブルを作成する

create_streaming_table()関数を使用して、 apply_changes()@append_flow出力レコードなど、ストリーミング操作によって出力されるレコードのターゲット テーブルを作成します。

create_target_table()関数とcreate_streaming_live_table()関数は非推奨です。Databricksでは、create_streaming_table()関数を使用するように既存のコードを更新することをお勧めします。

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>"
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"}
)

引数

name

タイプ: str

テーブル名。

このパラメーターは必須です。

comment

タイプ: str

テーブルの説明(オプション)。

spark_conf

タイプ: dict

このクエリーを実行するためのSpark構成のオプションのリスト。

table_properties

タイプ: dict

テーブルのテーブルプロパティのオプションのリスト。

partition_cols

タイプ: array

テーブルのパーティション化に使用する1つ以上の列のオプションのリスト。

path

タイプ: str

テーブルデータのオプションの格納場所。設定されていない場合、システムはデフォルトでパイプラインの保存場所を使用します。

schema

タイプ:strまたは StructType

テーブルのオプションのスキーマ定義。スキーマは、SQL DDL文字列として、またはPython StructTypeを使用して定義できます。

expect_all expect_all_or_drop expect_all_or_fail

タイプ: dict

テーブルのデータ品質制約 (省略可能)。 複数のエクスペクテーションを表示します。

テーブルの実体化方法を制御する

テーブルはまた、その実体化をさらにコントロールすることもできる:

  • partition_colsを使用してテーブル をパーティション分割 する方法を指定します。パーティショニングを使用すると、クエリーを高速化できます。

  • テーブルのプロパティは、ビューまたはテーブルを定義するときに設定できます。 Delta Live Tables テーブルのプロパティを参照してください。

  • path設定を使用して、テーブルデータの保存場所を設定します。デフォルトでは、pathが設定されていない場合、テーブルデータはパイプラインの保存場所に保存されます。

  • 生成された列 は、スキーマ定義で使用できます。例 : スキーマ列とパーティション列の指定を参照してください。

サイズが1 TB未満のテーブルの場合、Databricksでは、Delta Live Tablesにデータ構成を制御させることをお勧めします。テーブルがテラバイトを超えて大きくなることが予想されない限り、通常はパーティション列を指定しないでください。

例:スキーマとパーティション列を指定する

オプションで、Python StructTypeまたはSQL DDL文字列を使用してテーブルスキーマを指定できます。DDL文字列で指定すると、定義には生成された列を含めることができます

次の例では、Python StructTypeを使用してスキーマを指定した sales というテーブルを作成します。

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

次の例では、DDL文字列を使用してテーブルのスキーマを指定し、生成された列を定義し、パーティション列を定義します:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

デフォルトでは、スキーマを指定しない場合、Delta Live Tablesはtable定義からスキーマを推測します。

ソースストリーミングテーブルの変更を無視するようにストリーミングテーブルを構成する

  • skipChangeCommitsフラグは、option()関数を使用するspark.readStreamでのみ機能します。このフラグは、dlt.read_stream()関数では使用できません。

  • ソース・ストリーミング・テーブルが apply_changes() 関数のターゲットとして定義されている場合、skipChangeCommits フラグを使用することはできません。

既定では、ストリーミング テーブルには追加専用のソースが必要です。 ストリーミング テーブルが別のストリーミング テーブルをソースとして使用し、ソース ストリーミング テーブルで更新または削除 (GDPR の "忘れられる権利" 処理など) が必要な場合、ソース ストリーミング テーブルを読み取るときに skipChangeCommits フラグを設定して、それらの変更を無視できます。 このフラグの詳細については、「 更新と削除を無視する」を参照してください。

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")

Python Delta Live Tablesのプロパティ

以下のテーブルでは、Delta Live Tablesでテーブルやビューを定義する際に指定できるオプションやプロパティについて説明します:

@tableまたは@view

name

タイプ: str

テーブルまたはビューのオプションの名前。定義されていない場合、関数名がテーブル名またはビュー名として使用されます。

comment

タイプ: str

テーブルの説明(オプション)。

spark_conf

タイプ: dict

このクエリーを実行するためのSpark構成のオプションのリスト。

table_properties

タイプ: dict

テーブルのテーブルプロパティのオプションのリスト。

path

タイプ: str

テーブルデータのオプションの格納場所。設定されていない場合、システムはデフォルトでパイプラインの保存場所を使用します。

partition_cols

タイプ: a collection of str

テーブルのパーティション化に使用する1つ以上の列のオプションのコレクション(例:list)。

schema

タイプ:strまたは StructType

テーブルのオプションのスキーマ定義。スキーマは、SQL DDL文字列として、またはPython StructTypeを使用して定義できます。

temporary

タイプ: bool

テーブルを作成しますが、テーブルのメタデータは公開しません。 temporary キーワードは、パイプラインで使用できるがパイプラインの外部からはアクセスできないテーブルを作成するようにDelta Live Tablesに指示します。 処理時間を短縮するために、一時テーブルは 1 回の更新だけでなく、それを作成したパイプラインの存続期間中存続します。

デフォルトは「False」です。

テーブルまたはビューの定義

def <function-name>()

データセットを定義するPython関数。nameパラメーターが設定されていない場合は、<function-name>がターゲットデータセット名として使用されます。

query

Spark DatasetまたはKoalas DataFrameを返すSpark SQL文。

同じパイプラインで定義されたデータセットから完全な読み取りを実行するには、dlt.read() またはspark.table()を使用します。spark.table()関数を使用して同じパイプラインで定義されたデータセットから読み取る場合は、関数の引数のデータセット名の前にLIVEキーワードを追加します。たとえば、customersという名前のデータセットから読み取るには、次のようにします:

spark.table("LIVE.customers")

spark.table()関数を使用して、LIVEキーワードを省略し、オプションでテーブル名をデータベース名で修飾することで、Metastoreに登録されているテーブルから読み取ることもできます:

spark.table("sales.customers")

同じパイプラインで定義されたデータセットからストリーミング読み取りを実行するには、dlt.read_stream()を使用します。

spark.sql関数を使用してSQLクエリーを定義し、返されるデータセットを作成します。

PySpark構文を使用して、PythonでDelta Live Tablesクエリーを定義します。

エクスペクテーション

@expect("description", "constraint")

descriptionで識別されるデータ品質制約を宣言します。行が期待に反する場合は、その行をターゲットデータセットに含めます。

@expect_or_drop("description", "constraint")

descriptionで識別されるデータ品質制約を宣言します。行が期待に反する場合は、ターゲットデータセットから行を削除します。

@expect_or_fail("description", "constraint")

descriptionで識別されるデータ品質制約を宣言します。行が期待値に違反した場合は、ただちに実行を停止してください。

@expect_all(expectations)

1つ以上のデータ品質制約を宣言します。expectationsはPython辞書で、キーはエクスペクテーションの説明、値はエクスペクテーションの制約です。行が期待に違反する場合は、その行をターゲットデータセットに含めます。

@expect_all_or_drop(expectations)

1つ以上のデータ品質制約を宣言します。expectationsはPython辞書で、キーは期待値の説明、値は期待値の制約です。行が期待に違反する場合は、ターゲットデータセットから行を削除します。

@expect_all_or_fail(expectations)

1つ以上のデータ品質制約を宣言します。expectationsはPython辞書で、キーは期待値の説明、値は期待値の制約です。行がいずれかの期待に反した場合、直ちに実行を停止します。

Delta Live TablesでのPythonを使用した変更データキャプチャ

Delta Live Tables CDC 機能を使用するには、Python API のapply_changes()関数を使用します。 Delta Live Tables Python インターフェイスでは、 create_streaming_table()関数も提供されます。 この関数を使用して、 apply_changes() 関数に必要なターゲット表を作成できます。

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

INSERTUPDATE イベントのデフォルトの動作は、ソースからCDCイベントをアップサートすることです。指定されたキーに一致するターゲットテーブルの行を更新するか、ターゲットテーブルに一致するレコードが存在しない場合に新しい行を挿入します。DELETEイベントの処理はAPPLY AS DELETE WHEN条件で指定できます。

重要

変更を適用するターゲットストリーミングテーブルを宣言する必要があります。オプションでターゲットテーブルのスキーマを指定できます。apply_changesターゲットテーブルのスキーマを指定する場合は、sequence_byフィールドと同じデータ型の__START_AT列と__END_AT列も含める必要があります。

「Delta Live Tables の APPLY CHANGES API を使用した簡略化されたチェンジデータキャプチャ」を参照してください。

引数

target

タイプ: str

更新するテーブルの名前。 create_streaming_table() 関数を使用して、 apply_changes() 関数を実行する前にターゲット表を作成できます。

このパラメーターは必須です。

source

タイプ: str

CDCレコードを含むデータソース。

このパラメーターは必須です。

keys

タイプ: list

ソースデータ内の行を一意に識別する列または列の組み合わせ。これは、どのCDCイベントがターゲットテーブル内の特定のレコードに適用されるかを識別するために使用されます。

次のいずれかを指定できます:

  • 文字列のリスト: ["userId", "orderId"]

  • Spark SQL col()関数の一覧を次に示します: [col("userId"), col("orderId"]

col()関数の引数に修飾子を含めることはできません。例えば、col(userId)は使えますが、col(source.userId)は使えません。

このパラメーターは必須です。

sequence_by

タイプ:strまたは col()

ソースデータ内のCDCイベントの論理順序を指定する列名。Delta Live Tablesは、このシーケンスを使用して、順序どおりに到着しない変更イベントを処理します。

次のいずれかを指定できます:

  • 文字列: "sequenceNum"

  • Spark SQL col()関数は次のとおりです: col("sequenceNum")

col()関数の引数に修飾子を含めることはできません。例えば、col(userId)は使えますが、col(source.userId)は使えません。

このパラメーターは必須です。

ignore_null_updates

タイプ: bool

ターゲット列のサブセットを含む更新の取り込みを許可する。CDCイベントが既存の行と一致し、ignore_null_updatesTrueの場合、nullを持つ列はターゲット内の既存の値を保持します。これはnullの値を持つネストされた列にも適用されます。ignore_null_updatesFalseの場合、既存の値はnullの値で上書きされます。

このパラメーターはオプションです。

デフォルトはFalseです。

apply_as_deletes

タイプ:strまたは expr()

CDC イベントをアップサートではなく DELETE として扱うタイミングを指定します。 順不同のデータを処理するために、削除された行は基になる Delta テーブルに廃棄標識として一時的に保持され、これらの廃棄標識を除外するビューがメタストアに作成されます。 保有期間は、 pipelines.cdc.tombstoneGCThresholdInSeconds テーブル プロパティを使用して構成できます。

次のいずれかを指定できます:

  • 文字列: "Operation = 'DELETE'"

  • Spark SQL expr()関数は次のとおりです: expr("Operation = 'DELETE'")

このパラメーターはオプションです。

apply_as_truncates

タイプ:strまたは expr()

CDCイベントを完全なテーブルTRUNCATEとして扱う必要がある場合を指定します。この句はターゲットテーブルの完全な切り捨てをトリガーするため、この機能が必要な特定のユースケースにのみ使用してください。

apply_as_truncatesパラメーターは、SCDタイプ1でのみサポートされます。SCDタイプ2は切り捨てをサポートしません。

次のいずれかを指定できます:

  • 文字列: "Operation = 'TRUNCATE'"

  • Spark SQL expr()関数は次のとおりです: expr("Operation = 'TRUNCATE'")

このパラメーターはオプションです。

column_list

except_column_list

タイプ: list

ターゲットテーブルに含める列のサブセット。column_listを使用して、含める列の完全なリストを指定します。except_column_listを使用して、除外する列を指定します。いずれかの値を文字列のリストとして宣言することも、Spark SQL col()関数として宣言することもできます:

  • column_list = ["userId", "name", "city"].

  • column_list = [col("userId"), col("name"), col("city")]

  • except_column_list = ["operation", "sequenceNum"]

  • except_column_list = [col("operation"), col("sequenceNum")

col()関数の引数に修飾子を含めることはできません。例えば、col(userId)は使えますが、col(source.userId)は使えません。

このパラメーターはオプションです。

デフォルトでは、column_listまたはexcept_column_list引数が関数に渡されない場合、ターゲットテーブル内のすべての列が含まれます。

stored_as_scd_type

タイプ:strまたは int

レコードをSCDタイプ1として保存するか、SCDタイプ2として保存するか。

SCDタイプ1の場合は1、SCDタイプ2の場合は2に設定します。

この句はオプションです。

デフォルトはSCDタイプ1です。

track_history_column_list

track_history_except_column_list

タイプ: list

ターゲット表のヒストリーについて追跡される出力列のサブセット。 track_history_column_list を使用して、追跡する列の完全なリストを指定します。track_history_except_column_list を使用して、追跡から除外する列を指定します。いずれかの値を文字列のリストとして、または Spark SQL col() 関数として宣言できます: - track_history_column_list = ["userId", "name", "city"].- track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum")

col()関数の引数に修飾子を含めることはできません。例えば、col(userId)は使えますが、col(source.userId)は使えません。

このパラメーターはオプションです。

デフォルトでは、track_history_column_listまたはtrack_history_except_column_list引数が関数に渡されない場合、ターゲットテーブル内のすべての列が含まれます。