Delta Live Tables Python言語リファレンス

この記事には、Delta Live Tables Python プログラミング インターフェイスの詳細が記載されています。

SQL APIの詳細については、「Delta Live Tables SQL言語リファレンス」を参照してください。

オートローダーの設定に関する詳細は、「オートローダーとは?」を参照してください。

始める前に

Delta Live Tables Python インターフェースを使用してパイプラインを実装する場合の重要な考慮事項は次のとおりです。

  • Python table() 関数と view() 関数は、パイプライン更新の計画と実行中に複数回呼び出されるため、これらの関数のいずれかに副作用が発生する可能性のあるコード (たとえば、データを変更したり電子メールを送信したりするコード) を含めないでください。 予期しない動作を回避するには、データセットを定義する Python 関数に、テーブルまたはビューの定義に必要なコードのみを含める必要があります。

  • 特に、パラメータを定義する関数内で、メールの送信や外部モニタリング サービスとの統合などの操作を実行するには、イベント フックを使用します。 データセットを定義する関数にこれらの操作を実装すると、予期しない動作が発生します。

  • Python table関数とview関数はDataFrameを返す必要があります。DataFrameを操作する一部の関数はDataFrameを返さないため、使用しないでください。これらの操作には、collect()count()toPandas()save()saveAsTable()などの関数が含まれます。DataFrame変換は完全なデータフローグラフが解決された後に実行されるため、このような操作を使用すると、意図しない副作用が発生する可能性があります。

dlt Pythonモジュールをインポートする

Delta Live TablesのPython関数は、dltモジュールで定義されています。Python APIを使用して実装されたパイプラインは、このモジュールをインポートする必要があります:

import dlt

Delta Live Tablesマテリアライズドビューまたはストリーミングテーブルを作成する

Python では、Delta Live Tables は定義クエリに基づいて、データセットをマテリアライズド ビューとして更新するか、ストリーミング テーブルとして更新するかを決定します。 @tableデコレータは、マテリアライズド ビューとストリーミング テーブルの両方を定義するために使用できます。

Pythonでマテリアライズド ビューを定義するには、データソースに対して静的読み取りを実行するクエリに @table を適用します。 ストリーミング テーブルを定義するには、データ ソースに対してストリーミング読み取りを実行するクエリに@tableを適用するか、 create_streaming_table() 関数を使用します。 両方のデータセット タイプには、次の同じ構文仕様があります。

cluster_by引数を使用してリキッドクラスタリングを有効にするには、プレビューチャンネルを使用するようにパイプラインを設定する必要があります。

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Delta Live Tablesビューの作成

Pythonでビューを定義するには、@viewデコレータを適用します。@tableデコレータと同様に、Delta Live Tableのビューは、静的データセットまたはストリーミングデータセットに使用できます。Pythonでビューを定義するための構文を次に示します:

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

例:テーブルとビューを定義する

Pythonでテーブルまたはビューを定義するには、@dlt.viewまたは@dlt.tableデコレータを関数に適用します。関数名またはnameパラメーターを使用して、テーブルまたはビューの名前を割り当てることができます。次の例では、2つの異なるデータセットを定義しています。JSONファイルを入力ソースとして受け取るtaxi_rawというビューと、入力としてtaxi_rawビューを受け取るfiltered_dataというテーブルです:

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return dlt.read("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return dlt.read("taxi_raw").where(...)

例:同じパイプラインで定義されたデータセットにアクセスする

外部データソースからの読み取りに加えて、Delta Live Tables read()関数を使用して、同じパイプラインで定義されているデータセットにアクセスできます。次の例は、read()関数を使用してcustomers_filteredデータセットを作成する方法を示しています:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return dlt.read("customers_raw").where(...)

spark.table()関数を使用して、同じパイプラインで定義されたデータセットにアクセスすることもできます。spark.table()関数を使用してパイプラインで定義されたデータセットにアクセスするときは、LIVE関数の引数でデータセット名の前にキーワードを追加します:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredB():
  return spark.table("LIVE.customers_raw").where(...)

例:Metastoreに登録されたテーブルからの読み取り

Hive metastoreに登録されたテーブルからデータを読み取るには、関数の引数で LIVE キーワードを省略し、オプションでテーブル名をデータベース名で修飾します。

@dlt.table
def customers():
  return spark.table("sales.customers").where(...)

Unity Catalog テーブルからの読み取りの例については、「 Unity Catalog パイプラインへのデータの取り込み」を参照してください。

例:spark.sqlを使ってデータセットにアクセスする

クエリー関数でspark.sql式を使用してデータセットを返すこともできます。内部データセットから読み取るには、データセット名の前にLIVE.を追加します:

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")

ストリーミング操作のターゲットとして使用するテーブルを作成する

create_streaming_table()関数を使用して、 apply_changes()apply_changes_from_snapshot()@append_flow出力レコードなどのストリーミング操作によって出力されるレコードのターゲット テーブルを作成します。

create_target_table()関数とcreate_streaming_live_table()関数は非推奨です。Databricksでは、create_streaming_table()関数を使用するように既存のコードを更新することをお勧めします。

cluster_by引数を使用してリキッドクラスタリングを有効にするには、プレビューチャンネルを使用するようにパイプラインを設定する必要があります。

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>"
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
  row_filter = "row-filter-clause"
)

引数

name

タイプ: str

テーブル名。

このパラメーターは必須です。

comment

タイプ: str

テーブルの説明(オプション)。

spark_conf

タイプ: dict

このクエリーを実行するためのSpark構成のオプションのリスト。

table_properties

タイプ: dict

テーブルのテーブルプロパティのオプションのリスト。

partition_cols

タイプ: array

テーブルのパーティション化に使用する1つ以上の列のオプションのリスト。

cluster_by

タイプ: array

オプションで、テーブルでリキッドクラスタリングを有効にし、クラスタリングキーとして使用する列を定義します。

Delta テーブルに リキッドクラスタリングを使用する」を参照してください。

path

タイプ: str

テーブル・データのオプションのストレージ・ロケーション。 設定されていない場合は、パイプラインの保存場所がデフォルトになります。

schema

タイプ:strまたは StructType

テーブルのオプションのスキーマ定義。 スキーマは、SQL DDL 文字列または Python StructTypeを使用して定義できます。

expect_all expect_all_or_drop expect_all_or_fail

タイプ: dict

テーブルのデータ品質制約 (省略可能)。 複数のエクスペクテーションを表示します。

row_filter (パブリック プレビュー)

タイプ: str

テーブルのオプションの行フィルタ句。 行フィルターと列マスクを使用したテーブルのパブリッシュを参照してください。

テーブルの実体化方法を制御する

テーブルはまた、その実体化をさらにコントロールすることもできる:

  • partition_colsを使用してテーブル をパーティション分割 する方法を指定します。パーティショニングを使用すると、クエリーを高速化できます。

  • テーブルのプロパティは、ビューまたはテーブルを定義するときに設定できます。 Delta Live Tables テーブルのプロパティを参照してください。

  • path設定を使用して、テーブルデータの保存場所を設定します。デフォルトでは、pathが設定されていない場合、テーブルデータはパイプラインの保存場所に保存されます。

  • 生成された列 は、スキーマ定義で使用できます。例 : スキーマ列とパーティション列の指定を参照してください。

テーブルのサイズが 1 TB 未満の場合、Databricks では Delta Live Tables でデータ編成を制御することを推奨しています。 パーティション列は、テーブルがテラバイトを超えて大きくなることが予想される場合を除き、指定しないでください。

例:スキーマとパーティション列を指定する

オプションで、Python StructTypeまたはSQL DDL文字列を使用してテーブルスキーマを指定できます。DDL文字列で指定すると、定義には生成された列を含めることができます

次の例では、Python StructTypeを使用してスキーマを指定した sales というテーブルを作成します。

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

次の例では、DDL文字列を使用してテーブルのスキーマを指定し、生成された列を定義し、パーティション列を定義します:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

デフォルトでは、スキーマを指定しない場合、Delta Live Tablesはtable定義からスキーマを推測します。

ソースストリーミングテーブルの変更を無視するようにストリーミングテーブルを構成する

  • skipChangeCommitsフラグは、option()関数を使用するspark.readStreamでのみ機能します。このフラグは、dlt.read_stream()関数では使用できません。

  • ソース・ストリーミング・テーブルが apply_changes() 関数のターゲットとして定義されている場合、skipChangeCommits フラグを使用することはできません。

既定では、ストリーミング テーブルには追加専用のソースが必要です。 ストリーミング テーブルが別のストリーミング テーブルをソースとして使用し、ソース ストリーミング テーブルで更新または削除 (GDPR の "忘れられる権利" 処理など) が必要な場合、ソース ストリーミング テーブルを読み取るときに skipChangeCommits フラグを設定して、それらの変更を無視できます。 このフラグの詳細については、「 更新と削除を無視する」を参照してください。

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")

例: テーブル制約の定義

プレビュー

テーブル制約は パブリック プレビュー段階です。

スキーマを指定するときに、プライマリ・キーと外部キーを定義できます。 制約は情報提供を目的としており、強制されません。 SQL 言語リファレンスのCONSTRAINT 句を参照してください。

次の例では、プライマリ・キー制約と外部キー制約を持つテーブルを定義しています。

@dlt.table(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """
def sales():
   return ("...")

例: 行フィルターと列マスクの定義

プレビュー

行フィルターと列マスクは パブリック プレビュー段階です。

行フィルターと列マスクを使用してマテリアライズド ビューまたはストリーミング テーブルを作成するには、 ROW FILTER 句MASK 句を使用します。 次の例は、行フィルターと列マスクの両方を使用してマテリアライズド ビューとストリーミング テーブルを定義する方法を示しています。

@dlt.table(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
   return ("...")

行フィルターと列マスクの詳細については、 「行フィルターと列マスクを使用してテーブルを公開する」を参照してください。

Python Delta Live Tablesのプロパティ

以下のテーブルでは、Delta Live Tablesでテーブルやビューを定義する際に指定できるオプションやプロパティについて説明します:

cluster_by引数を使用してリキッドクラスタリングを有効にするには、プレビューチャンネルを使用するようにパイプラインを設定する必要があります。

@tableまたは@view

name

タイプ: str

テーブルまたはビューのオプションの名前。定義されていない場合、関数名がテーブル名またはビュー名として使用されます。

comment

タイプ: str

テーブルの説明(オプション)。

spark_conf

タイプ: dict

このクエリーを実行するためのSpark構成のオプションのリスト。

table_properties

タイプ: dict

テーブルのテーブルプロパティのオプションのリスト。

path

タイプ: str

テーブル・データのオプションのストレージ・ロケーション。 設定されていない場合は、パイプラインの保存場所がデフォルトになります。

partition_cols

タイプ: a collection of str

オプションのコレクション (テーブルのパーティション分割に使用する 1 つ以上の列の list など)。

cluster_by

タイプ: array

オプションで、テーブルでリキッドクラスタリングを有効にし、クラスタリングキーとして使用する列を定義します。

Delta テーブルに リキッドクラスタリングを使用する」を参照してください。

schema

タイプ:strまたは StructType

テーブルのオプションのスキーマ定義。 スキーマは、SQL DDL 文字列または Python StructTypeを使用して定義できます。

temporary

タイプ: bool

テーブルを作成しますが、テーブルのメタデータは公開しません。 temporaryキーワードは、パイプラインで使用できるがパイプラインの外部からはアクセスできないテーブルを作成するように Delta Live Tables に指示します。 処理時間を短縮するために、一時テーブルは、単一の更新だけでなく、それを作成したパイプラインの有効期間中は保持されます。

デフォルトは「False」です。

row_filter (パブリック プレビュー)

タイプ: str

テーブルのオプションの行フィルタ句。 行フィルターと列マスクを使用したテーブルのパブリッシュを参照してください。

テーブルまたはビューの定義

def <function-name>()

データセットを定義するPython関数。nameパラメーターが設定されていない場合は、<function-name>がターゲットデータセット名として使用されます。

query

Spark DatasetまたはKoalas DataFrameを返すSpark SQL文。

同じパイプラインで定義されたデータセットから完全な読み取りを実行するには、dlt.read() またはspark.table()を使用します。spark.table()関数を使用して同じパイプラインで定義されたデータセットから読み取る場合は、関数の引数のデータセット名の前にLIVEキーワードを追加します。たとえば、customersという名前のデータセットから読み取るには、次のようにします:

spark.table("LIVE.customers")

spark.table()関数を使用して、LIVEキーワードを省略し、オプションでテーブル名をデータベース名で修飾することで、Metastoreに登録されているテーブルから読み取ることもできます:

spark.table("sales.customers")

同じパイプラインで定義されたデータセットからストリーミング読み取りを実行するには、dlt.read_stream()を使用します。

spark.sql関数を使用してSQLクエリーを定義し、返されるデータセットを作成します。

PySpark構文を使用して、PythonでDelta Live Tablesクエリーを定義します。

エクスペクテーション

@expect("description", "constraint")

descriptionで識別されるデータ品質制約を宣言します。行が期待に反する場合は、その行をターゲットデータセットに含めます。

@expect_or_drop("description", "constraint")

descriptionで識別されるデータ品質制約を宣言します。行が期待に反する場合は、ターゲットデータセットから行を削除します。

@expect_or_fail("description", "constraint")

descriptionで識別されるデータ品質制約を宣言します。行が期待値に違反した場合は、ただちに実行を停止してください。

@expect_all(expectations)

1つ以上のデータ品質制約を宣言します。expectationsはPython辞書で、キーはエクスペクテーションの説明、値はエクスペクテーションの制約です。行が期待に違反する場合は、その行をターゲットデータセットに含めます。

@expect_all_or_drop(expectations)

1つ以上のデータ品質制約を宣言します。expectationsはPython辞書で、キーはエクスペクテーションの説明、値はエクスペクテーションの制約です。行が期待に違反する場合は、ターゲットデータセットから行を削除します。

@expect_all_or_fail(expectations)

1つ以上のデータ品質制約を宣言します。expectationsはPython辞書で、キーはエクスペクテーションの説明、値はエクスペクテーションの制約です。行がいずれかの期待に反した場合、直ちに実行を停止します。

Delta Live TablesでPythonを使用した変更フィードからのチェンジデータキャプチャ

apply_changes()PythonAPIDelta Live Tablesキャプチャ (CDC ) 機能を使用してチェンジデータ フィード ( CDF ) からのソース データを処理するには、 の 関数を使用します。

重要

変更を適用するターゲット ストリーミング テーブルを宣言する必要があります。 オプションで、ターゲットテーブルのスキーマを指定できます。 apply_changes()ターゲットテーブルのスキーマを指定するときは、sequence_byフィールドと同じデータ型の__START_AT列と__END_AT列を含める必要があります。

必要なターゲット テーブルを作成するには、Delta Live Tables Python インターフェイスのcreate_streaming_table()関数を使用できます。

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

APPLY CHANGES処理の場合、 INSERTおよびUPDATEイベントのデフォルトの動作は、ソースから CDC イベントをアップサートすることです。つまり、指定されたキーに一致するターゲット テーブル内の行を更新するか、一致するレコードがターゲット テーブルに存在しない場合は新しい行を挿入します。 DELETEイベントの処理は、APPLY AS DELETE WHEN条件で指定できます。

変更フィードを使用したCDC処理の詳細については、 「APPLY CHANGES APIs : Delta Live Tablesによるチェンジデータキャプチャの簡素化」を参照してください。 apply_changes()関数の使用例については、 「例: CDF ソース データを使用した SCD タイプ 1 および SCD タイプ 2 の処理」を参照してください。

重要

変更を適用するターゲット ストリーミング テーブルを宣言する必要があります。 オプションで、ターゲットテーブルのスキーマを指定できます。 apply_changesターゲット表スキーマを指定する場合は、sequence_byフィールドと同じデータ・タイプの__START_AT列と__END_AT列を含める必要があります。

APPLY CHANGES APIs参照してください: Delta Live Tablesを使用してチェンジデータキャプチャを簡素化します

引数

target

タイプ: str

更新するテーブルの名前。 create_streaming_table() 関数を使用して、 apply_changes() 関数を実行する前にターゲット表を作成できます。

このパラメーターは必須です。

source

タイプ: str

CDCレコードを含むデータソース。

このパラメーターは必須です。

keys

タイプ: list

ソースデータ内の行を一意に識別する列または列の組み合わせ。これは、どのCDCイベントがターゲットテーブル内の特定のレコードに適用されるかを識別するために使用されます。

次のいずれかを指定できます:

  • 文字列のリスト: ["userId", "orderId"]

  • Spark SQL col()関数の一覧を次に示します: [col("userId"), col("orderId"]

col()関数の引数に修飾子を含めることはできません。例えば、col(userId)は使えますが、col(source.userId)は使えません。

このパラメーターは必須です。

sequence_by

タイプ:strまたは col()

ソースデータ内のCDCイベントの論理順序を指定する列名。Delta Live Tablesは、このシーケンスを使用して、順序どおりに到着しない変更イベントを処理します。

次のいずれかを指定できます:

  • 文字列: "sequenceNum"

  • Spark SQL col()関数は次のとおりです: col("sequenceNum")

col()関数の引数に修飾子を含めることはできません。例えば、col(userId)は使えますが、col(source.userId)は使えません。

指定する列は、ソート可能なデータ・タイプでなければなりません。

このパラメーターは必須です。

ignore_null_updates

タイプ: bool

ターゲットカラムのサブセットを含む更新の取り込みを許可します。 CDC イベントが既存の行と一致し、 ignore_null_updatesTrueの場合、 nullの列はターゲット内の既存の値を保持します。 これは、値が nullのネストされた列にも適用されます。 ignore_null_updatesFalseの場合、既存の値はnull値で上書きされます。

このパラメーターはオプションです。

デフォルトはFalseです。

apply_as_deletes

タイプ:strまたは expr()

CDC イベントをアップサートではなく DELETE として扱うタイミングを指定します。 順不同のデータを処理するために、削除された行は基になる Delta テーブルに廃棄石として一時的に保持され、これらの廃棄石を除外するビューがメタストアに作成されます。 保持間隔は、 pipelines.cdc.tombstoneGCThresholdInSeconds テーブル プロパティを使用して構成できます。

次のいずれかを指定できます:

  • 文字列: "Operation = 'DELETE'"

  • Spark SQL expr()関数は次のとおりです: expr("Operation = 'DELETE'")

このパラメーターはオプションです。

apply_as_truncates

タイプ:strまたは expr()

CDCイベントを完全なテーブルTRUNCATEとして扱う必要がある場合を指定します。この句はターゲットテーブルの完全な切り捨てをトリガーするため、この機能が必要な特定のユースケースにのみ使用してください。

apply_as_truncates 引数はSCDタイプ 1 でのみサポートされます。SCD タイプ 2 SCD切り捨て操作をサポートしていません。

次のいずれかを指定できます:

  • 文字列: "Operation = 'TRUNCATE'"

  • Spark SQL expr()関数は次のとおりです: expr("Operation = 'TRUNCATE'")

このパラメーターはオプションです。

column_list

except_column_list

タイプ: list

ターゲットテーブルに含める列のサブセット。column_listを使用して、含める列の完全なリストを指定します。except_column_listを使用して、除外する列を指定します。いずれかの値を文字列のリストとして宣言することも、Spark SQL col()関数として宣言することもできます:

  • column_list = ["userId", "name", "city"].

  • column_list = [col("userId"), col("name"), col("city")]

  • except_column_list = ["operation", "sequenceNum"]

  • except_column_list = [col("operation"), col("sequenceNum")

col()関数の引数に修飾子を含めることはできません。例えば、col(userId)は使えますが、col(source.userId)は使えません。

このパラメーターはオプションです。

デフォルトでは、column_listまたはexcept_column_list引数が関数に渡されない場合、ターゲットテーブル内のすべての列が含まれます。

stored_as_scd_type

タイプ:strまたは int

レコードをSCDタイプ1として保存するか、SCDタイプ2として保存するか。

SCDタイプ1の場合は1、SCDタイプ2の場合は2に設定します。

この句はオプションです。

デフォルトはSCDタイプ1です。

track_history_column_list

track_history_except_column_list

タイプ: list

ターゲット表のヒストリーについて追跡される出力列のサブセット。 track_history_column_list を使用して、追跡する列の完全なリストを指定します。track_history_except_column_list を使用して、追跡から除外する列を指定します。いずれかの値を文字列のリストとして、または Spark SQL col() 関数として宣言できます: - track_history_column_list = ["userId", "name", "city"].- track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum")

col()関数の引数に修飾子を含めることはできません。例えば、col(userId)は使えますが、col(source.userId)は使えません。

このパラメーターはオプションです。

デフォルトでは、track_history_column_listまたはtrack_history_except_column_list引数が関数に渡されない場合、ターゲットテーブル内のすべての列が含まれます。

Delta Live TablesでPythonを使用したデータベースからのチェンジデータキャプチャ

プレビュー

APPLY CHANGES FROM SNAPSHOT API はパブリック プレビュー段階です。

apply_changes_from_snapshot()PythonAPIDelta Live Tablesチェンジデータキャプチャ (CDC )機能を使用してデータベース トークンからソース データを処理するには、 の 関数を使用します。

重要

変更を適用するターゲットストリーミングテーブルを宣言する必要があります。オプションでターゲットテーブルのスキーマを指定できます。apply_changes_from_snapshot()ターゲットテーブルのスキーマを指定する場合は、sequence_byフィールドと同じデータ型の__START_AT列と__END_AT列も含める必要があります。

必要なターゲット テーブルを作成するには、Delta Live Tables Python インターフェイスのcreate_streaming_table()関数を使用できます。

apply_changes_from_snapshot(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
) -> None

APPLY CHANGES FROM SNAPSHOT処理の場合、デフォルトの動作では、同じキーを持つ一致するレコードがターゲットに存在しない場合に新しい行を挿入します。 一致するレコードが存在する場合は、行のいずれかの値が変更された場合にのみ更新されます。 ターゲットには存在するがソースには存在しないキーを持つ行は削除されます。

サインインを使用したCDC処理の詳細については、 「APPLY CHANGES APIs : Delta Live Tablesによるチェンジデータキャプチャの簡素化」を参照してください。 apply_changes_from_snapshot()関数の使用例については、 定期的なスナップショットの取り込み履歴スナップショットの取り込みの例を参照してください。

引数

target

タイプ: str

更新するテーブルの名前。 create_streaming_table() 関数を使用して、apply_changes() 関数を実行する前にターゲット テーブルを作成できます。

このパラメーターは必須です。

source

タイプ:strまたは lambda function

定期的にスナップショットを作成するテーブルまたはビューの名前、または処理するスナップショット DataFrame とスナップショットのバージョンを返す Python ラムダ関数のいずれか。 ソース引数の実装を参照してください。

このパラメーターは必須です。

keys

タイプ: list

ソースデータ内の行を一意に識別する列または列の組み合わせ。これは、どのCDCイベントがターゲットテーブル内の特定のレコードに適用されるかを識別するために使用されます。

次のいずれかを指定できます:

  • 文字列のリスト: ["userId", "orderId"]

  • Spark SQL col()関数の一覧を次に示します: [col("userId"), col("orderId"]

col()関数の引数に修飾子を含めることはできません。例えば、col(userId)は使えますが、col(source.userId)は使えません。

このパラメーターは必須です。

stored_as_scd_type

タイプ:strまたは int

レコードをSCDタイプ1として保存するか、SCDタイプ2として保存するか。

SCDタイプ1の場合は1、SCDタイプ2の場合は2に設定します。

この句はオプションです。

デフォルトはSCDタイプ1です。

track_history_column_list

track_history_except_column_list

タイプ: list

ターゲット表のヒストリーについて追跡される出力列のサブセット。 track_history_column_list を使用して、追跡する列の完全なリストを指定します。track_history_except_column_list を使用して、追跡から除外する列を指定します。いずれかの値を文字列のリストとして、または Spark SQL col() 関数として宣言できます: - track_history_column_list = ["userId", "name", "city"].- track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum")

col()関数の引数に修飾子を含めることはできません。例えば、col(userId)は使えますが、col(source.userId)は使えません。

このパラメーターはオプションです。

デフォルトでは、track_history_column_listまたはtrack_history_except_column_list引数が関数に渡されない場合、ターゲットテーブル内のすべての列が含まれます。

source引数を実装する

apply_changes_from_snapshot() 関数には source 引数が含まれています。過去のサインインを処理する場合、 引数は、処理するサインイン データを含む とサインイン バージョンの sourcePython2 つの値を 関数に返す ラムダ関数である必要があります。apply_changes_from_snapshot()PythonDataFrame

以下は、ラムダ関数のシグネチャです。

lambda Any => Optional[(DataFrame, Any)]
  • ラムダ関数への引数は、最後に処理されたスナップショット バージョンです。

  • ラムダ関数の戻り値はNoneまたは 2 つの値のタプルです。タプルの最初の値は、処理されるスナップショットを含む DataFrame です。 タプルの 2 番目の値は、スナップショットの論理的な順序を表すスナップショット バージョンです。

ラムダ関数を実装して呼び出す例を次に示します。

def next_snapshot_and_version(latest_snapshot_version):
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

apply_changes_from_snapshot(
  # ...
  source = next_snapshot_and_version,
  # ...
)

Delta Live Tables apply_changes_from_snapshot() 関数を含む Pipeline がトリガーされるたびに、次の手順を実行します。

  1. next_snapshot_and_version関数を実行して、次のスナップショット DataFrame と対応するスナップショット バージョンを読み込みます。

  2. DataFrame が返されない場合、実行は終了し、パイプラインの更新は完了としてマークされます。

  3. 新しいスナップショットの変更を検出し、それをターゲット テーブルに段階的に適用します。

  4. ステップ 1 に戻り、次のエンドポイントとそのバージョンを読み込みます。

制限事項

Delta Live Tables Python インターフェースには次の制限があります。

pivot()関数はサポートされていません。Sparkのpivotオペレーションでは、出力のスキーマを計算するために入力データの積極的な読み込みが必要です。この機能は、Delta Live Tablesではサポートされていません。