DLT Python 言語リファレンス
この記事では、DLT Python プログラミング インターフェイスについて詳しく説明します。
SQL APIの情報については、DLT SQL language reference を参照してください。
Auto Loaderの構成の詳細については、「Auto Loaderとは」を参照してください。
始める前に
DLT Python インターフェイスを使用してパイプラインを実装する場合の重要な考慮事項を次に示します。
- Python
table()
関数とview()
関数は、パイプライン更新の計画および実行時に複数回呼び出されるため、これらの関数のいずれかに副作用のあるコード (データを変更するコードや電子メールを送信するコードなど) を含めないでください。予期しない動作を避けるために、データセットを定義する Python 関数には、テーブルまたはビューの定義に必要なコードのみを含める必要があります。 - Eメール の送信や外部モニタリング サービスとの統合などの操作を行うには、特にデータセットを定義する関数でイベント フックを使用します。 データセットを定義する関数にこれらの操作を実装すると、予期しない動作が発生します。
- Python
table
関数とview
関数はデータフレームを返す必要があります。データフレームを操作する一部の関数はデータフレームを返さないため、使用しないでください。これらの操作には、collect()
、count()
、toPandas()
、save()
、saveAsTable()
などの関数が含まれます。データフレーム変換は完全なデータフローグラフが解決 された後に 実行されるため、このような操作を使用すると、意図しない副作用が発生する可能性があります。
dlt
Python モジュールをインポートする
DLT Python 関数は dlt
モジュールで定義されています。Python API で実装されたパイプラインでは、次のモジュールをインポートする必要があります。
import dlt
DLT マテリアライズドビュー または ストリーミングテーブルを作成する
Python では、DLT は、定義クエリに基づいて、データセットをマテリアライズドビューとして更新するかストリーミングテーブルとして更新するかを決定します。@table
デコレータを使用して、マテリアライズドビューとストリーミングテーブルの両方を定義できます。
Python でマテリアライズドビューを定義するには、データソースに対して静的読み取りを実行するクエリに @table
を適用します。 ストリーミングテーブルを定義するには、データソースに対してストリーミング読み取りを実行するクエリに @table
を適用するか、 create_streaming_table() 関数を使用します。 どちらのデータセットタイプも、次のように同じ構文仕様を持っています。
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
DLT ビューの作成
Python でビューを定義するには、 @view
デコレータを適用します。@table
デコレーターと同様に、DLT のビューは静的データセットまたはストリーミングデータセットに使用できます。以下は、Python でビューを定義するための構文です。
import dlt
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
例: テーブルとビューの定義
Pythonでテーブルまたはビューを定義するには、@dlt.view
または@dlt.table
デコレータを関数に適用します。関数名またはname
パラメーターを使用して、テーブルまたはビューの名前を割り当てることができます。次の例では、2つの異なるデータセットを定義しています。JSONファイルを入力ソースとして受け取るtaxi_raw
というビューと、入力としてtaxi_raw
ビューを受け取るfiltered_data
というテーブルです:
import dlt
@dlt.view
def taxi_raw():
return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")
# Use the function name as the table name
@dlt.table
def filtered_data():
return spark.read.table("taxi_raw").where(...)
# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return spark.read.table("taxi_raw").where(...)
例: 同じパイプラインで定義されたデータセットにアクセスする
dlt.read()
関数と dlt.read_stream()
関数は引き続き使用でき、DLT Python インターフェイスで完全にサポートされていますが、Databricks では、次の理由から、常に spark.read.table()
関数と spark.readStream.table()
関数を使用することをお勧めします。
spark
関数は、外部ストレージ内のデータセットや他のパイプラインで定義されたデータセットなど、内部データセットと外部データセットの読み取りをサポートします。dlt
関数は、内部データセットの読み取りのみをサポートします。spark
関数は、読み取り操作に対するオプション (skipChangeCommits
など) の指定をサポートしています。オプションの指定は、dlt
関数ではサポートされていません。
同じパイプラインで定義されたデータセットにアクセスするには、 spark.read.table()
関数または spark.readStream.table()
関数を使用します。
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredA():
return spark.read.table("customers_raw").where(...)
パイプライン内のビューまたはテーブルに対してクエリを実行する場合は、カタログとスキーマを直接指定することも、パイプラインで設定されたデフォルトを使用することもできます。 この例では、 customers
テーブルは、パイプライン用に構成されたデフォルト カタログとスキーマから読み取られます。
例: メタストアに登録されているテーブルからの読み取り
Hive metastoreに登録されているテーブルからデータを読み取るには、関数引数でテーブル名をデータベース名で修飾できます。
@dlt.table
def customers():
return spark.read.table("sales.customers").where(...)
Unity Catalog テーブルから読み取る例については、「 Unity Catalog パイプラインにデータを取り込む」を参照してください。
例: 次を使用してデータセットにアクセスする spark.sql
クエリ関数で spark.sql
式を使用してデータセットを返すこともできます。 内部データセットから読み取るには、デフォルトのカタログとスキーマを使用するために名前を修飾しないままにするか、それらを先頭に付けることができます。
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM catalog_name.schema_name.customers_cleaned WHERE city = 'Chicago'")
マテリアライズドビューまたはストリーミングテーブルからのレコードの完全削除
GDPR コンプライアンスなど、削除ベクトルが有効になっているマテリアライズドビューまたはストリーミングテーブルからレコードを完全に削除するには、オブジェクトの基になる Delta テーブルに対して追加の操作を実行する必要があります。 マテリアライズドビューからのレコードの削除を確実にするには、 削除ベクトルが有効になっているマテリアライズドビューからのレコードの完全削除を参照してください。 ストリーミングテーブルからレコードを確実に削除するには、「 ストリーミングテーブルからレコードを完全に削除する」を参照してください。
DLT sink
API を使用して外部イベント ストリーミング サービスまたは Delta テーブルに書き込む
プレビュー
DLT sink
API は パブリック プレビュー段階です。
- 完全な更新更新を実行しても、シンクからデータはクリアされません。再処理されたデータはシンクに追加され、既存のデータは変更されません。
- DLT のエクスペクテーションは、
sink
API ではサポートされていません。
Apache Kafka や Azure Event Hubs などのイベント ストリーミング サービスに書き込むか、DLT パイプラインから Delta テーブルに書き込むには、dlt
Python モジュールに含まれている create_sink()
関数を使用します。create_sink()
関数を使用してシンクを作成した後、追加フローでシンクを使用してデータをシンクに書き込みます。追加フローは、 create_sink()
関数でサポートされている唯一のフロー タイプです。 その他のフロー タイプ( apply_changes
など)はサポートされていません。
次に、 create_sink()
関数を使用してシンクを作成するための構文を示します。
create_sink(<sink_name>, <format>, <options>)
引数 |
---|
|
|
|
例: create_sink()
関数を使用した Kafka シンクの作成
create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
例: create_sink()
関数とファイル・システム・パスを持つ Delta シンクの作成
次の例では、ファイル システム パスをテーブルに渡すことで Delta テーブルに書き込むシンクを作成します。
create_sink(
"my_delta_sink",
"delta",
{ "path": "//path/to/my/delta/table" }
)
例: create_sink()
関数と Unity Catalog テーブル名を使用して Delta シンクを作成する
Deltaシンクは、Unity Catalog外部およびマネージドテーブルとHive metastoreマネージドテーブルをサポートします。テーブル名は完全修飾名である必要があります。 たとえば、 Unity Catalog テーブルでは、<catalog>.<schema>.<table>
という 3 層の識別子を使用する必要があります。 Hive metastore テーブルでは<schema>.<table>
を使用する必要があります。
次の例では、Unity Catalog のテーブルの名前を渡すことで Delta テーブルに書き込むシンクを作成します。
create_sink(
"my_delta_sink",
"delta",
{ "tableName": "my_catalog.my_schema.my_table" }
)
例: 追加フローを使用して Delta シンクに書き込む
次の例では、Delta テーブルに書き込むシンクを作成し、そのシンクに書き込む追加フローを作成します。
create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})
@append_flow(name = "flow", target = "my_sink")
def flowFunc():
return <streaming-query>
例: 追加フローを使用して Kafka シンクに書き込む
次の例では、Kafka トピックに書き込むシンクを作成し、そのシンクに書き込む追加フローを作成します。
create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
@append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))
Kafka に書き込まれる データフレーム のスキーマには、「 Kafka 構造化ストリーミング ライターの構成」で指定されている列が含まれている必要があります。
ストリーミング操作のターゲットとして使用するテーブルを作成します
create_streaming_table()
関数を使用して、apply_changes()、apply_changes_from_snapshot()、@append_flow 出力レコードなど、ストリーミング操作によって出力されるレコードのターゲット テーブルを作成します。
create_target_table()
関数とcreate_streaming_live_table()
関数は非推奨です。Databricksでは、create_streaming_table()
関数を使用するように既存のコードを更新することをお勧めします。
create_streaming_table(
name = "<table-name>",
comment = "<comment>",
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
path="<storage-location-path>",
schema="schema-definition",
expect_all = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
row_filter = "row-filter-clause"
)
引数 |
---|
|
|
|
|
|
|
|
|
|
|
テーブルの具体化方法を制御する
テーブルはまた、その実体化をさらにコントロールすることもできる:
cluster_by
を使用してテーブルをクラスタリングする方法を指定します。リキッドクラスタリングを使用して、クエリを高速化できます。 「Deltaテーブルにリキッドクラスタリングを使用する」を参照してください。- テーブルを パーティション分割 する方法を
partition_cols
で指定します。 - テーブル・プロパティは、ビューまたはテーブルを定義するときに設定できます。 DLT テーブルのプロパティを参照してください。
path
設定を使用して、テーブルデータの保存場所を設定します。デフォルトでは、path
が設定されていない場合、テーブルデータはパイプラインの保存場所に保存されます。- 生成された列をスキーマ定義で使用できます。例: スキーマとクラスター列の指定を参照してください。
サイズが 1 TB 未満のテーブルの場合、Databricks では DLT でデータ編成を制御できるようにすることをお勧めします。パーティション列は、テーブルがテラバイトを超えて大きくなることが予想される場合を除き、指定しないでください。
例: スキーマとクラスター列を指定する
オプションで、Python StructType
または SQL DDL 文字列を使用してテーブルスキーマを指定できます。 DDL 文字列で指定すると、 定義に生成列を含めることができます。
次の例では、Python StructType
を使用して指定されたスキーマを持つ sales
というテーブルを作成します。
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
次の例では、DDL 文字列を使用してテーブルのスキーマを指定し、生成されたカラムを定義し、クラスタリングカラムを定義します。
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
cluster_by = ["order_day_of_week", "customer_id"])
def sales():
return ("...")
デフォルトでは、スキーマを指定しない場合、DLT は table
定義からスキーマを推測します。
例: パーティション列の指定
次の例では、DDL文字列を使用してテーブルのスキーマを指定し、生成された列を定義し、パーティション列を定義します:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
例: テーブル制約の定義
プレビュー
テーブル制約は パブリック プレビュー段階です。
スキーマを指定するときに、プライマリ・キーと外部キーを定義できます。 制約は情報提供を目的としており、強制されません。 SQL 言語リファレンスの CONSTRAINT 句 を参照してください。
次の例では、プライマリ・キー制約と外部キー制約を持つテーブルを定義しています。
@dlt.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
"""
def sales():
return ("...")
例: 行フィルターと列マスクの定義
プレビュー
行フィルターと列マスクは パブリック プレビュー段階です。
ロー・フィルターとカラム・マスクを使用してマテリアライズドビュー またはストリーミング・テーブルを作成するには、 ROW FILTER 句 と MASK 句を使用します。 次の例は、マテリアライズドビューとストリーミングテーブルを行フィルタと列マスクの両方を使用して定義する方法を示しています。
@dlt.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
return ("...")
行フィルターと列マスクの詳細については、「 行フィルターと列マスクを使用してテーブルを発行する」を参照してください。
ソース ストリーミングテーブルの変更を無視するようにストリーミングテーブルを構成する
skipChangeCommits
フラグは、option()
関数を使用するspark.readStream
でのみ機能します。このフラグは、dlt.read_stream()
関数では使用できません。- ソース ストリーミングテーブルが apply_changes() 関数のターゲットとして定義されている場合は、
skipChangeCommits
フラグを使用できません。
デフォルトでは、ストリーミングテーブルには追加専用ソースが必要です。 ストリーミングテーブルが別のストリーミングテーブルをソースとして使用し、ソース ストリーミングテーブルが更新または削除 ( GDPR "忘れられる権利" 処理など) を必要とする場合、ソース ストリーミングテーブルを読み取るときにskipChangeCommits
フラグを設定して、それらの変更を無視できます。 このフラグの詳細については、「 更新と削除を無視する」を参照してください。
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")
Python DLT プロパティ
次の表では、DLT を使用してテーブルとビューを定義する際に指定できるオプションとプロパティについて説明します。
@tableまたは@view |
---|
|
|
|
|
|
|
|
|
|
|
テーブルまたはビューの定義 |
---|
|
|
エクスペクテーション |
---|
|
|
|
|
|
|
DLT の Python を使用した変更フィードからのチェンジデータキャプチャ
Python APIの apply_changes()
関数を使用して、DLT チェンジデータキャプチャ (CDC) 機能を使用して、チェンジデータフィード (CDF) からのソース データを処理します。
変更を適用するターゲット ストリーミングテーブルを宣言する必要があります。 オプションで、ターゲットテーブルのスキーマを指定できます。 apply_changes()
ターゲットテーブルのスキーマを指定するときは、sequence_by
フィールドと同じデータ型の__START_AT
列と__END_AT
列を含める必要があります。
必要なターゲットテーブルを作成するには、DLT Pythonインターフェースの create_streaming_table() 関数を使用できます。
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
APPLY CHANGES
処理の場合、INSERT
イベントとUPDATE
イベントのデフォルトの動作は、ソースから CDC イベント を更新する ことです。指定したキーに一致するターゲットテーブルの行を更新するか、一致するレコードがターゲットテーブルに存在しない場合は新しい行を挿入します。DELETE
イベントの処理は、APPLY AS DELETE WHEN
条件で指定できます。
チェンジ フィードを使用した CDC 処理の詳細については、「 APPLY CHANGES APIs: DLT によるチェンジデータキャプチャの簡略化」を参照してください。 apply_changes()
関数の使用例については、例: CDF ソース・データを使用した SCD タイプ 1 および SCD タイプ 2 の処理を参照してください。
変更を適用するターゲット ストリーミングテーブルを宣言する必要があります。 オプションで、ターゲットテーブルのスキーマを指定できます。 apply_changes
ターゲット表スキーマを指定する場合は、sequence_by
フィールドと同じデータ・タイプの__START_AT
列と__END_AT
列を含める必要があります。
「 APPLY CHANGES APIs: DLTによるチェンジデータキャプチャの簡素化」を参照してください。
引数 |
---|
|
|
|
|
|
|
|
|
|
|
DLT の Python を使用したデータベース スナップショットからのチェンジデータキャプチャ
プレビュー
APPLY CHANGES FROM SNAPSHOT
API はパブリック プレビュー段階です。
Python API の apply_changes_from_snapshot()
関数を使用して、DLT チェンジデータキャプチャ (CDC) 機能を使用して、データベース スナップショットからソース データを処理します。
変更を適用するターゲットストリーミングテーブルを宣言する必要があります。オプションでターゲットテーブルのスキーマを指定できます。apply_changes_from_snapshot()
ターゲットテーブルのスキーマを指定する場合は、sequence_by
フィールドと同じデータ型の__START_AT
列と__END_AT
列も含める必要があります。
必要なターゲットテーブルを作成するには、DLT Pythonインターフェースの create_streaming_table() 関数を使用できます。
apply_changes_from_snapshot(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
) -> None
APPLY CHANGES FROM SNAPSHOT
処理の場合、デフォルトの動作では、同じキーを持つ一致するレコードがターゲットに存在しない場合に新しいローが挿入されます。一致するレコードが存在する場合は、行のいずれかの値が変更された場合にのみ更新されます。 ターゲットに存在し、ソースに存在しなくなったキーを持つ行は削除されます。
スナップショットを使用した CDC 処理の詳細については、「 APPLY CHANGES APIs: DLT を使用したチェンジデータキャプチャの簡素化」を参照してください。 apply_changes_from_snapshot()
関数の使用例については、定期的なスナップショットの取り込みとスナップショットの履歴取り込みの例を参照してください。
引数 |
---|
|
|
|
|
|
source
引数を実装する
apply_changes_from_snapshot()
関数には source
引数が含まれています。過去のスナップショットを処理する場合、 source
引数は、処理するスナップショットデータを含むPython データフレーム とスナップショットバージョンの2 つの値を apply_changes_from_snapshot()
関数に返すPythonラムダ関数である必要があります。
以下は、ラムダ関数のシグネチャです。
lambda Any => Optional[(DataFrame, Any)]
- lambda 関数の引数は、最後に処理されたスナップショット バージョンです。
- ラムダ関数の戻り値が
None
または 2 つの値のタプルである: タプルの最初の値は、処理するスナップショットを含む データフレーム です。 タプルの 2 番目の値は、スナップショットの論理順序を表すスナップショット バージョンです。
ラムダ関数を実装して呼び出す例を次に示します。
def next_snapshot_and_version(latest_snapshot_version):
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
apply_changes_from_snapshot(
# ...
source = next_snapshot_and_version,
# ...
)
DLT ランタイムは、 apply_changes_from_snapshot()
関数を含むパイプラインがトリガーされるたびに、次の手順を実行します。
next_snapshot_and_version
関数を実行して、次のスナップショット データフレーム と対応するスナップショット バージョンを読み込みます。- データフレーム が返されない場合、実行は終了し、パイプラインの更新は完了としてマークされます。
- 新しいスナップショットの変更を検出し、それらをターゲット・テーブルに段階的に適用します。
- ステップ #1 に戻り、次のスナップショットとそのバージョンをロードします。
制限
DLT Python インターフェースには、以下の制限があります。
pivot()
機能はサポートされていません。Spark の pivot
操作では、入力データを出力スキーマのコンピュートに一括して読み込む必要があります。この機能は DLT ではサポートされていません。