メインコンテンツまでスキップ

DLT Python 言語リファレンス

この記事では、DLT Python プログラミング インターフェイスについて詳しく説明します。

SQL APIの情報については、DLT SQL language reference を参照してください。

Auto Loaderの構成の詳細については、「Auto Loaderとは」を参照してください。

始める前に

DLT Python インターフェイスを使用してパイプラインを実装する場合の重要な考慮事項を次に示します。

  • Python table() 関数と view() 関数は、パイプライン更新の計画および実行時に複数回呼び出されるため、これらの関数のいずれかに副作用のあるコード (データを変更するコードや電子メールを送信するコードなど) を含めないでください。予期しない動作を避けるために、データセットを定義する Python 関数には、テーブルまたはビューの定義に必要なコードのみを含める必要があります。
  • Eメール の送信や外部モニタリング サービスとの統合などの操作を行うには、特にデータセットを定義する関数でイベント フックを使用します。 データセットを定義する関数にこれらの操作を実装すると、予期しない動作が発生します。
  • Python table関数とview関数はデータフレームを返す必要があります。データフレームを操作する一部の関数はデータフレームを返さないため、使用しないでください。これらの操作には、collect()count()toPandas()save()saveAsTable()などの関数が含まれます。データフレーム変換は完全なデータフローグラフが解決 された後に 実行されるため、このような操作を使用すると、意図しない副作用が発生する可能性があります。

dlt Python モジュールをインポートする

DLT Python 関数は dlt モジュールで定義されています。Python API で実装されたパイプラインでは、次のモジュールをインポートする必要があります。

Python
import dlt

DLT マテリアライズドビュー または ストリーミングテーブルを作成する

Python では、DLT は、定義クエリに基づいて、データセットをマテリアライズドビューとして更新するかストリーミングテーブルとして更新するかを決定します。@tableデコレータを使用して、マテリアライズドビューとストリーミングテーブルの両方を定義できます。

Python でマテリアライズドビューを定義するには、データソースに対して静的読み取りを実行するクエリに @table を適用します。 ストリーミングテーブルを定義するには、データソースに対してストリーミング読み取りを実行するクエリに @table を適用するか、 create_streaming_table() 関数を使用します。 どちらのデータセットタイプも、次のように同じ構文仕様を持っています。

Python
import dlt

@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)

DLT ビューの作成

Python でビューを定義するには、 @view デコレータを適用します。@table デコレーターと同様に、DLT のビューは静的データセットまたはストリーミングデータセットに使用できます。以下は、Python でビューを定義するための構文です。

Python
import dlt

@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)

例: テーブルとビューの定義

Pythonでテーブルまたはビューを定義するには、@dlt.viewまたは@dlt.tableデコレータを関数に適用します。関数名またはnameパラメーターを使用して、テーブルまたはビューの名前を割り当てることができます。次の例では、2つの異なるデータセットを定義しています。JSONファイルを入力ソースとして受け取るtaxi_rawというビューと、入力としてtaxi_rawビューを受け取るfiltered_dataというテーブルです:

Python
import dlt

@dlt.view
def taxi_raw():
return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
return spark.read.table("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return spark.read.table("taxi_raw").where(...)

例: 同じパイプラインで定義されたデータセットにアクセスする

注記

dlt.read() 関数と dlt.read_stream() 関数は引き続き使用でき、DLT Python インターフェイスで完全にサポートされていますが、Databricks では、次の理由から、常に spark.read.table() 関数と spark.readStream.table() 関数を使用することをお勧めします。

  • spark 関数は、外部ストレージ内のデータセットや他のパイプラインで定義されたデータセットなど、内部データセットと外部データセットの読み取りをサポートします。dlt 関数は、内部データセットの読み取りのみをサポートします。
  • spark 関数は、読み取り操作に対するオプション (skipChangeCommitsなど) の指定をサポートしています。オプションの指定は、 dlt 関数ではサポートされていません。

同じパイプラインで定義されたデータセットにアクセスするには、 spark.read.table() 関数または spark.readStream.table() 関数を使用します。

Python
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
return spark.read.table("customers_raw").where(...)
注記

パイプライン内のビューまたはテーブルに対してクエリを実行する場合は、カタログとスキーマを直接指定することも、パイプラインで設定されたデフォルトを使用することもできます。 この例では、 customersテーブルは、パイプライン用に構成されたデフォルト カタログとスキーマから読み取られます。

例: メタストアに登録されているテーブルからの読み取り

Hive metastoreに登録されているテーブルからデータを読み取るには、関数引数でテーブル名をデータベース名で修飾できます。

Python
@dlt.table
def customers():
return spark.read.table("sales.customers").where(...)

Unity Catalog テーブルから読み取る例については、「 Unity Catalog パイプラインにデータを取り込む」を参照してください。

例: 次を使用してデータセットにアクセスする spark.sql

クエリ関数で spark.sql 式を使用してデータセットを返すこともできます。 内部データセットから読み取るには、デフォルトのカタログとスキーマを使用するために名前を修飾しないままにするか、それらを先頭に付けることができます。

Python
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM catalog_name.schema_name.customers_cleaned WHERE city = 'Chicago'")

マテリアライズドビューまたはストリーミングテーブルからのレコードの完全削除

GDPR コンプライアンスなど、削除ベクトルが有効になっているマテリアライズドビューまたはストリーミングテーブルからレコードを完全に削除するには、オブジェクトの基になる Delta テーブルに対して追加の操作を実行する必要があります。 マテリアライズドビューからのレコードの削除を確実にするには、 削除ベクトルが有効になっているマテリアライズドビューからのレコードの完全削除を参照してください。 ストリーミングテーブルからレコードを確実に削除するには、「 ストリーミングテーブルからレコードを完全に削除する」を参照してください。

DLT sink API を使用して外部イベント ストリーミング サービスまたは Delta テーブルに書き込む

備考

プレビュー

DLT sink API は パブリック プレビュー段階です。

注記
  • 完全な更新更新を実行しても、シンクからデータはクリアされません。再処理されたデータはシンクに追加され、既存のデータは変更されません。
  • DLT のエクスペクテーションは、 sink API ではサポートされていません。

Apache Kafka や Azure Event Hubs などのイベント ストリーミング サービスに書き込むか、DLT パイプラインから Delta テーブルに書き込むには、dlt Python モジュールに含まれている create_sink() 関数を使用します。create_sink() 関数を使用してシンクを作成した後、追加フローでシンクを使用してデータをシンクに書き込みます。追加フローは、 create_sink() 関数でサポートされている唯一のフロー タイプです。 その他のフロー タイプ( apply_changesなど)はサポートされていません。

次に、 create_sink() 関数を使用してシンクを作成するための構文を示します。

Python
create_sink(<sink_name>, <format>, <options>)

引数

name タイプ: str シンクを識別し、シンクの参照と管理に使用される文字列。 シンク名は、パイプラインの一部であるノートブックやモジュールなど、すべてのソース コードを含め、パイプラインに対して一意である必要があります。 このパラメーターは必須です。

format タイプ: str 出力形式 ( kafka または delta) を定義する文字列。 このパラメーターは必須です。

options タイプ: dict シンクオプションのオプションのリストで、 {"key": "value"}形式で、キーと値は両方とも文字列です。 Kafka シンクと Delta シンクでサポートされているすべての Databricks Runtime オプションがサポートされています。 Kafka のオプションについては、「 Kafka 構造化ストリーミング ライターの構成」を参照してください。 Delta のオプションについては、「 シンクとしての Delta テーブル」を参照してください。

例: create_sink() 関数を使用した Kafka シンクの作成

Python
create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)

例: create_sink() 関数とファイル・システム・パスを持つ Delta シンクの作成

次の例では、ファイル システム パスをテーブルに渡すことで Delta テーブルに書き込むシンクを作成します。

Python
create_sink(
"my_delta_sink",
"delta",
{ "path": "//path/to/my/delta/table" }
)

例: create_sink() 関数と Unity Catalog テーブル名を使用して Delta シンクを作成する

注記

Deltaシンクは、Unity Catalog外部およびマネージドテーブルとHive metastoreマネージドテーブルをサポートします。テーブル名は完全修飾名である必要があります。 たとえば、 Unity Catalog テーブルでは、<catalog>.<schema>.<table> という 3 層の識別子を使用する必要があります。 Hive metastore テーブルでは<schema>.<table>を使用する必要があります。

次の例では、Unity Catalog のテーブルの名前を渡すことで Delta テーブルに書き込むシンクを作成します。

Python
create_sink(
"my_delta_sink",
"delta",
{ "tableName": "my_catalog.my_schema.my_table" }
)

例: 追加フローを使用して Delta シンクに書き込む

次の例では、Delta テーブルに書き込むシンクを作成し、そのシンクに書き込む追加フローを作成します。

Python
create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

@append_flow(name = "flow", target = "my_sink")
def flowFunc():
return <streaming-query>

例: 追加フローを使用して Kafka シンクに書き込む

次の例では、Kafka トピックに書き込むシンクを作成し、そのシンクに書き込む追加フローを作成します。

Python
create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)

@append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))

Kafka に書き込まれる データフレーム のスキーマには、「 Kafka 構造化ストリーミング ライターの構成」で指定されている列が含まれている必要があります。

ストリーミング操作のターゲットとして使用するテーブルを作成します

create_streaming_table() 関数を使用して、apply_changes()、apply_changes_from_snapshot()、@append_flow 出力レコードなど、ストリーミング操作によって出力されるレコードのターゲット テーブルを作成します。

注記

create_target_table()関数とcreate_streaming_live_table()関数は非推奨です。Databricksでは、create_streaming_table()関数を使用するように既存のコードを更新することをお勧めします。

Python
create_streaming_table(
name = "<table-name>",
comment = "<comment>",
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
path="<storage-location-path>",
schema="schema-definition",
expect_all = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
row_filter = "row-filter-clause"
)

引数

name タイプ: str テーブル名。 このパラメーターは必須です。

comment タイプ: str テーブルの説明(オプション)。

spark_conf タイプ: dict このクエリーを実行するためのSpark構成のオプションのリスト。

table_properties タイプ: dict テーブルの テーブルプロパティ のオプションのリスト。

partition_cols タイプ: array テーブルのパーティション化に使用する1つ以上の列のオプションのリスト。

cluster_by タイプ: array オプションで、テーブルでリキッドクラスタリングを有効にし、クラスタリングキーとして使用する列を定義します。 「Deltaテーブルにリキッドクラスタリングを使用する」を参照してください。

path タイプ: str テーブル・データのオプションのストレージ・ロケーション。 設定されていない場合、システムはデフォルトでパイプラインのストレージロケーションになります。

schema タイプ:strまたは StructType テーブルのオプションのスキーマ定義。 スキーマは、SQL DDL 文字列として定義することも、Python を使用して定義することもできます StructType.

expect_all expect_all_or_drop expect_all_or_fail タイプ: dict テーブルのデータ品質制約 (オプション)。 複数の期待値を表示します。

row_filter (パブリック プレビュー) タイプ: str テーブルのオプションの行フィルタ句。 行フィルターと列マスクを使用したテーブルのパブリッシュを参照してください。

テーブルの具体化方法を制御する

テーブルはまた、その実体化をさらにコントロールすることもできる:

注記

サイズが 1 TB 未満のテーブルの場合、Databricks では DLT でデータ編成を制御できるようにすることをお勧めします。パーティション列は、テーブルがテラバイトを超えて大きくなることが予想される場合を除き、指定しないでください。

例: スキーマとクラスター列を指定する

オプションで、Python StructType または SQL DDL 文字列を使用してテーブルスキーマを指定できます。 DDL 文字列で指定すると、 定義に生成列を含めることができます。

次の例では、Python StructTypeを使用して指定されたスキーマを持つ sales というテーブルを作成します。

Python
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)

@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")

次の例では、DDL 文字列を使用してテーブルのスキーマを指定し、生成されたカラムを定義し、クラスタリングカラムを定義します。

Python
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
cluster_by = ["order_day_of_week", "customer_id"])
def sales():
return ("...")

デフォルトでは、スキーマを指定しない場合、DLT は table 定義からスキーマを推測します。

例: パーティション列の指定

次の例では、DDL文字列を使用してテーブルのスキーマを指定し、生成された列を定義し、パーティション列を定義します:

Python
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")

例: テーブル制約の定義

備考

プレビュー

テーブル制約は パブリック プレビュー段階です。

スキーマを指定するときに、プライマリ・キーと外部キーを定義できます。 制約は情報提供を目的としており、強制されません。 SQL 言語リファレンスの CONSTRAINT 句 を参照してください。

次の例では、プライマリ・キー制約と外部キー制約を持つテーブルを定義しています。

Python
@dlt.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
"""
def sales():
return ("...")

例: 行フィルターと列マスクの定義

備考

プレビュー

行フィルターと列マスクは パブリック プレビュー段階です。

ロー・フィルターとカラム・マスクを使用してマテリアライズドビュー またはストリーミング・テーブルを作成するには、 ROW FILTER 句MASK 句を使用します。 次の例は、マテリアライズドビューとストリーミングテーブルを行フィルタと列マスクの両方を使用して定義する方法を示しています。

Python
@dlt.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
return ("...")

行フィルターと列マスクの詳細については、「 行フィルターと列マスクを使用してテーブルを発行する」を参照してください。

ソース ストリーミングテーブルの変更を無視するようにストリーミングテーブルを構成する

注記
  • skipChangeCommitsフラグは、option()関数を使用するspark.readStreamでのみ機能します。このフラグは、dlt.read_stream()関数では使用できません。
  • ソース ストリーミングテーブルが apply_changes() 関数のターゲットとして定義されている場合は、skipChangeCommits フラグを使用できません。

デフォルトでは、ストリーミングテーブルには追加専用ソースが必要です。 ストリーミングテーブルが別のストリーミングテーブルをソースとして使用し、ソース ストリーミングテーブルが更新または削除 ( GDPR "忘れられる権利" 処理など) を必要とする場合、ソース ストリーミングテーブルを読み取るときにskipChangeCommitsフラグを設定して、それらの変更を無視できます。 このフラグの詳細については、「 更新と削除を無視する」を参照してください。

Python
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")

Python DLT プロパティ

次の表では、DLT を使用してテーブルとビューを定義する際に指定できるオプションとプロパティについて説明します。

@tableまたは@view

name タイプ: str テーブルまたはビューのオプションの名前。定義されていない場合、関数名がテーブル名またはビュー名として使用されます。

comment タイプ: str テーブルの説明(オプション)。

spark_conf タイプ: dict このクエリーを実行するためのSpark構成のオプションのリスト。

table_properties タイプ: dict テーブルの テーブルプロパティ のオプションのリスト。

path タイプ: str テーブル・データのオプションのストレージ・ロケーション。 設定されていない場合、システムはデフォルトでパイプラインのストレージロケーションになります。

partition_cols タイプ: a collection of str オプションのコレクション (テーブルのパーティション分割に使用する 1 つ以上の列の list など)。

cluster_by タイプ: array オプションで、テーブルでリキッドクラスタリングを有効にし、クラスタリングキーとして使用する列を定義します。 「Deltaテーブルにリキッドクラスタリングを使用する」を参照してください。

schema タイプ:strまたは StructType テーブルのオプションのスキーマ定義。 スキーマは、SQL DDL 文字列として、または Python StructTypeで定義できます。

temporary タイプ: bool テーブルを作成しますが、テーブルのメタデータは公開しません。 temporary キーワードは、パイプラインで使用できるが、パイプラインの外部からアクセスしてはならないテーブルを作成するように DLT に指示します。処理時間を短縮するために、一時テーブルは、1 回の更新だけでなく、それを作成するパイプラインの有効期間中保持されます。 デフォルトは「False」です。

row_filter (パブリック プレビュー) タイプ: str テーブルのオプションの行フィルタ句。 行フィルターと列マスクを使用したテーブルのパブリッシュを参照してください。

テーブルまたはビューの定義

def <function-name>() データセットを定義するPython関数。nameパラメーターが設定されていない場合は、<function-name>がターゲットデータセット名として使用されます。

query Spark DatasetまたはKoalas データフレームを返すSpark SQL文。 dlt.read() または spark.read.table() を使用して、同じパイプラインで定義されているデータセットから完全な読み取りを実行します。外部データセットを読み取るには、 spark.read.table() 関数を使用します。 dlt.read()を使用して外部データセットを読み取ることはできません。spark.read.table() を使用して内部データセット、現在のパイプラインの外部で定義されたデータセットを読み取ることができ、データの読み取りオプションを指定できるため、Databricks では dlt.read() 関数の代わりに Databricks を使用することをお勧めします。 パイプラインでデータセットを定義すると、デフォルトによって、パイプライン構成で定義されたカタログとスキーマが使用されます。 spark.read.table() 関数を使用すると、パイプラインで定義されたデータセットを修飾なしで読み取ることができます。たとえば、 customersという名前のデータセットから読み取るには、次のようにします。 spark.read.table("customers") また、 spark.read.table() 関数を使用して、メタストアに登録されているテーブルから読み取るには、必要に応じてテーブル名をデータベース名で修飾します。 spark.read.table("sales.customers") dlt.read_stream() または spark.readStream.table() を使用して、同じパイプラインで定義されたデータセットからストリーミング読み取りを実行します。外部データセットからストリーミング読み取りを実行するには、 spark.readStream.table() 機能。 spark.readStream.table() を使用して内部データセット、現在のパイプラインの外部で定義されたデータセットを読み取ることができ、データの読み取りオプションを指定できるため、Databricks では dlt.read_stream() 関数の代わりに Databricks を使用することをお勧めします。 SQL 構文を使用して DLT table 関数でクエリを定義するには、 spark.sql 関数を使用します。例: spark.sqlを使用したデータセットへのアクセスを参照してください。Python を使用して DLT table 関数でクエリを定義するには、 PySpark 構文を使用します。

エクスペクテーション

@expect("description", "constraint") 次で識別されるデータ品質制約を宣言します。 description.行がエクスペクテーションに違反している場合は、その行をターゲットデータセットに含めます。

@expect_or_drop("description", "constraint") 次で識別されるデータ品質制約を宣言します。 description.行がエクスペクテーションに違反している場合は、ターゲット データセットから行を削除します。

@expect_or_fail("description", "constraint") 次で識別されるデータ品質制約を宣言します。 description.行がエクスペクテーションに違反している場合は、すぐに実行を停止します。

@expect_all(expectations) 1つ以上のデータ品質制約を宣言します。 expectations は Python ディクショナリで、キーはエクスペクテーションの説明、値はエクスペクテーション制約です。 行が予想のいずれかに違反している場合は、ターゲットデータセットに行を含めます。

@expect_all_or_drop(expectations) 1つ以上のデータ品質制約を宣言します。 expectations は Python ディクショナリで、キーはエクスペクテーションの説明、値はエクスペクテーション制約です。 行が予想のいずれかに違反している場合は、ターゲットデータセットから行を削除します。

@expect_all_or_fail(expectations) 1つ以上のデータ品質制約を宣言します。 expectations は Python ディクショナリで、キーはエクスペクテーションの説明、値はエクスペクテーション制約です。 行が予想のいずれかに違反した場合は、すぐに実行を停止します。

DLT の Python を使用した変更フィードからのチェンジデータキャプチャ

Python APIの apply_changes() 関数を使用して、DLT チェンジデータキャプチャ (CDC) 機能を使用して、チェンジデータフィード (CDF) からのソース データを処理します。

important

変更を適用するターゲット ストリーミングテーブルを宣言する必要があります。 オプションで、ターゲットテーブルのスキーマを指定できます。 apply_changes()ターゲットテーブルのスキーマを指定するときは、sequence_byフィールドと同じデータ型の__START_AT列と__END_AT列を含める必要があります。

必要なターゲットテーブルを作成するには、DLT Pythonインターフェースの create_streaming_table() 関数を使用できます。

Python
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
注記

APPLY CHANGES処理の場合、INSERTイベントとUPDATEイベントのデフォルトの動作は、ソースから CDC イベント を更新する ことです。指定したキーに一致するターゲットテーブルの行を更新するか、一致するレコードがターゲットテーブルに存在しない場合は新しい行を挿入します。DELETEイベントの処理は、APPLY AS DELETE WHEN条件で指定できます。

チェンジ フィードを使用した CDC 処理の詳細については、「 APPLY CHANGES APIs: DLT によるチェンジデータキャプチャの簡略化」を参照してください。 apply_changes() 関数の使用例については、例: CDF ソース・データを使用した SCD タイプ 1 および SCD タイプ 2 の処理を参照してください。

important

変更を適用するターゲット ストリーミングテーブルを宣言する必要があります。 オプションで、ターゲットテーブルのスキーマを指定できます。 apply_changesターゲット表スキーマを指定する場合は、sequence_byフィールドと同じデータ・タイプの__START_AT列と__END_AT列を含める必要があります。

APPLY CHANGES APIs: DLTによるチェンジデータキャプチャの簡素化」を参照してください。

引数

target タイプ: str 更新するテーブルの名前。 create_streaming_table() 関数を使用して、apply_changes() 関数を実行する前にターゲット テーブルを作成できます。 このパラメーターは必須です。

source タイプ: str CDCレコードを含むデータソース。 このパラメーターは必須です。

keys タイプ: list ソースデータ内の行を一意に識別する列または列の組み合わせ。これは、どのCDCイベントがターゲットテーブル内の特定のレコードに適用されるかを識別するために使用されます。 次のいずれかを指定できます: - 文字列のリスト: ["userId", "orderId"] - Spark SQL col() 関数の一覧: [col("userId"), col("orderId"] col()関数の引数に修飾子を含めることはできません。例えば、col(userId)は使えますが、col(source.userId)は使えません。 このパラメーターは必須です。

sequence_by タイプ: strcol() 、または struct() ソース・データ内の CDC イベントの論理的な順序を指定する列名。DLT は、このシーケンスを使用して、順不同で到着した変更イベントを処理します。 次のいずれかを指定できます: - 文字列: "sequenceNum" - Spark SQL col() 関数: col("sequenceNum") - 複数の列を組み合わせて同順位を破る struct() : struct("timestamp_col", "id_col")、最初に最初の構造体フィールドで並べ替えられ、次に同点の場合は 2 番目のフィールドで並べ替えられます。 col()関数の引数に修飾子を含めることはできません。例えば、col(userId)は使えますが、col(source.userId)は使えません。 指定する列は、ソート可能なデータ・タイプでなければなりません。 このパラメーターは必須です。

ignore_null_updates タイプ: bool ターゲットカラムのサブセットを含む更新の取り込みを許可します。 CDC イベントが既存の行と一致し、 ignore_null_updatesTrueの場合、 null を持つ列はターゲット内の既存の値を保持します。 これは、値が nullのネストされた列にも適用されます。 ignore_null_updatesFalseの場合、既存の値はnull値で上書きされます。 このパラメーターはオプションです。 デフォルトはFalseです。

apply_as_deletes タイプ:strまたは expr() CDC イベントをアップサートではなく DELETE として扱うタイミングを指定します。 順不同のデータを処理するために、削除された行は基になる Delta テーブルに廃棄石として一時的に保持され、これらの廃棄石を除外するビューがメタストアに作成されます。 保持間隔は、 pipelines.cdc.tombstoneGCThresholdInSeconds table プロパティ。 次のいずれかを指定できます: - 文字列: "Operation = 'DELETE'" - Spark SQL expr() 関数: expr("Operation = 'DELETE'") このパラメーターはオプションです。

apply_as_truncates タイプ:strまたは expr() CDCイベントを完全なテーブルTRUNCATEとして扱う必要がある場合を指定します。この句はターゲットテーブルの完全な切り捨てをトリガーするため、この機能が必要な特定のユースケースにのみ使用してください。 apply_as_truncates パラメーターは、 SCD タイプ 1 でのみサポートされます。 SCD タイプ 2 では、切り捨て操作はサポートされていません。 次のいずれかを指定できます: - 文字列: "Operation = 'TRUNCATE'" - Spark SQL expr() 関数: expr("Operation = 'TRUNCATE'") このパラメーターはオプションです。

column_list except_column_list タイプ: list ターゲットテーブルに含める列のサブセット。column_listを使用して、含める列の完全なリストを指定します。except_column_listを使用して、除外する列を指定します。いずれかの値を文字列のリストとして宣言することも、Spark SQL col()関数として宣言することもできます: - column_list = ["userId", "name", "city"]. - column_list = [col("userId"), col("name"), col("city")] - except_column_list = ["operation", "sequenceNum"] - except_column_list = [col("operation"), col("sequenceNum") col()関数の引数に修飾子を含めることはできません。例えば、col(userId)は使えますが、col(source.userId)は使えません。 このパラメーターはオプションです。 デフォルトでは、column_listまたはexcept_column_list引数が関数に渡されない場合、ターゲットテーブル内のすべての列が含まれます。

stored_as_scd_type タイプ:strまたは int レコードをSCDタイプ1として保存するか、SCDタイプ2として保存するか。 SCDタイプ1の場合は1、SCDタイプ2の場合は2に設定します。 この句はオプションです。 デフォルトはSCDタイプ1です。

track_history_column_list track_history_except_column_list タイプ: list ターゲット・テーブル内のヒストリーについて追跡する出力列のサブセット。 track_history_column_list を使用して、追跡する列の完全なリストを指定します。使う track_history_except_column_list をクリックして、トラッキングから除外する列を指定します。 どちらの値も、文字列のリストとして、または Spark SQL col() 関数として宣言できます。 - track_history_column_list = ["userId", "name", "city"]. - track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") col()関数の引数に修飾子を含めることはできません。例えば、col(userId)は使えますが、col(source.userId)は使えません。 このパラメーターはオプションです。 デフォルトでは、ターゲット・テーブルにすべての列が含まれます。track_history_column_list track_history_except_column_list 引数が関数に渡されます。

DLT の Python を使用したデータベース スナップショットからのチェンジデータキャプチャ

備考

プレビュー

APPLY CHANGES FROM SNAPSHOT API はパブリック プレビュー段階です。

Python API の apply_changes_from_snapshot() 関数を使用して、DLT チェンジデータキャプチャ (CDC) 機能を使用して、データベース スナップショットからソース データを処理します。

important

変更を適用するターゲットストリーミングテーブルを宣言する必要があります。オプションでターゲットテーブルのスキーマを指定できます。apply_changes_from_snapshot()ターゲットテーブルのスキーマを指定する場合は、sequence_byフィールドと同じデータ型の__START_AT列と__END_AT列も含める必要があります。

必要なターゲットテーブルを作成するには、DLT Pythonインターフェースの create_streaming_table() 関数を使用できます。

Python
apply_changes_from_snapshot(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
) -> None
注記

APPLY CHANGES FROM SNAPSHOT処理の場合、デフォルトの動作では、同じキーを持つ一致するレコードがターゲットに存在しない場合に新しいローが挿入されます。一致するレコードが存在する場合は、行のいずれかの値が変更された場合にのみ更新されます。 ターゲットに存在し、ソースに存在しなくなったキーを持つ行は削除されます。

スナップショットを使用した CDC 処理の詳細については、「 APPLY CHANGES APIs: DLT を使用したチェンジデータキャプチャの簡素化」を参照してください。 apply_changes_from_snapshot() 関数の使用例については、定期的なスナップショットの取り込みスナップショットの履歴取り込みの例を参照してください。

引数

target タイプ: str 更新するテーブルの名前。 create_streaming_table() 関数を使用して、apply_changes() 関数を実行する前にターゲット テーブルを作成できます。 このパラメーターは必須です。

source タイプ:strまたは lambda function 定期的にスナップショットを作成するテーブルまたはビューの名前、または処理するスナップショット データフレーム とスナップショット バージョンを返す Python ラムダ関数。 source引数の実装を参照してください。 このパラメーターは必須です。

keys タイプ: list ソースデータ内の行を一意に識別する列または列の組み合わせ。これは、どのCDCイベントがターゲットテーブル内の特定のレコードに適用されるかを識別するために使用されます。 次のいずれかを指定できます: - 文字列のリスト: ["userId", "orderId"] - Spark SQL col() 関数の一覧: [col("userId"), col("orderId"] col()関数の引数に修飾子を含めることはできません。例えば、col(userId)は使えますが、col(source.userId)は使えません。 このパラメーターは必須です。

stored_as_scd_type タイプ:strまたは int レコードをSCDタイプ1として保存するか、SCDタイプ2として保存するか。 SCDタイプ1の場合は1、SCDタイプ2の場合は2に設定します。 この句はオプションです。 デフォルトはSCDタイプ1です。

track_history_column_list track_history_except_column_list タイプ: list ターゲット・テーブル内のヒストリーについて追跡する出力列のサブセット。 track_history_column_list を使用して、追跡する列の完全なリストを指定します。使う track_history_except_column_list をクリックして、トラッキングから除外する列を指定します。 どちらの値も、文字列のリストとして、または Spark SQL col() 関数として宣言できます。 - track_history_column_list = ["userId", "name", "city"]. - track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") col()関数の引数に修飾子を含めることはできません。例えば、col(userId)は使えますが、col(source.userId)は使えません。 このパラメーターはオプションです。 デフォルトでは、ターゲット・テーブルにすべての列が含まれます。track_history_column_list track_history_except_column_list 引数が関数に渡されます。

source引数を実装する

apply_changes_from_snapshot() 関数には source 引数が含まれています。過去のスナップショットを処理する場合、 source引数は、処理するスナップショットデータを含むPython データフレーム とスナップショットバージョンの2 つの値を apply_changes_from_snapshot()関数に返すPythonラムダ関数である必要があります。

以下は、ラムダ関数のシグネチャです。

Python
lambda Any => Optional[(DataFrame, Any)]
  • lambda 関数の引数は、最後に処理されたスナップショット バージョンです。
  • ラムダ関数の戻り値が None または 2 つの値のタプルである: タプルの最初の値は、処理するスナップショットを含む データフレーム です。 タプルの 2 番目の値は、スナップショットの論理順序を表すスナップショット バージョンです。

ラムダ関数を実装して呼び出す例を次に示します。

Python
def next_snapshot_and_version(latest_snapshot_version):
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None

apply_changes_from_snapshot(
# ...
source = next_snapshot_and_version,
# ...
)

DLT ランタイムは、 apply_changes_from_snapshot() 関数を含むパイプラインがトリガーされるたびに、次の手順を実行します。

  1. next_snapshot_and_version 関数を実行して、次のスナップショット データフレーム と対応するスナップショット バージョンを読み込みます。
  2. データフレーム が返されない場合、実行は終了し、パイプラインの更新は完了としてマークされます。
  3. 新しいスナップショットの変更を検出し、それらをターゲット・テーブルに段階的に適用します。
  4. ステップ #1 に戻り、次のスナップショットとそのバージョンをロードします。

制限

DLT Python インターフェースには、以下の制限があります。

pivot()機能はサポートされていません。Spark の pivot 操作では、入力データを出力スキーマのコンピュートに一括して読み込む必要があります。この機能は DLT ではサポートされていません。