Delta Live Tables Python言語リファレンス
この記事には、Delta Live Tables Python プログラミング インターフェイスの詳細が記載されています。
SQL APIの詳細については、「Delta Live Tables SQL言語リファレンス」を参照してください。
Auto Loaderの設定に関する詳細は、「Auto Loaderとは?」を参照してください。
始める前に
Delta Live Tables Python インターフェースを使用してパイプラインを実装する場合の重要な考慮事項は次のとおりです。
Python
table()
関数とview()
関数は、パイプライン更新の計画と実行中に複数回呼び出されるため、これらの関数のいずれかに副作用が発生する可能性のあるコード (たとえば、データを変更したり電子メールを送信したりするコード) を含めないでください。 予期しない動作を回避するには、データセットを定義する Python 関数に、テーブルまたはビューの定義に必要なコードのみを含める必要があります。特に、パラメータを定義する関数内で、メールの送信や外部モニタリング サービスとの統合などの操作を実行するには、イベント フックを使用します。 データセットを定義する関数にこれらの操作を実装すると、予期しない動作が発生します。
Python
table
関数とview
関数はDataFrameを返す必要があります。DataFrameを操作する一部の関数はDataFrameを返さないため、使用しないでください。これらの操作には、collect()
、count()
、toPandas()
、save()
、saveAsTable()
などの関数が含まれます。DataFrame変換は完全なデータフローグラフが解決された後に実行されるため、このような操作を使用すると、意図しない副作用が発生する可能性があります。
dlt
Pythonモジュールをインポートする
Delta Live TablesのPython関数は、dlt
モジュールで定義されています。Python APIを使用して実装されたパイプラインは、このモジュールをインポートする必要があります:
import dlt
Delta Live Tablesマテリアライズドビューまたはストリーミングテーブルを作成する
Python では、Delta Live Tables は定義クエリに基づいて、データセットをマテリアライズド ビューとして更新するか、ストリーミング テーブルとして更新するかを決定します。 @table
デコレータは、マテリアライズド ビューとストリーミング テーブルの両方を定義するために使用できます。
Pythonでマテリアライズド ビューを定義するには、データソースに対して静的読み取りを実行するクエリに @table
を適用します。 ストリーミング テーブルを定義するには、データ ソースに対してストリーミング読み取りを実行するクエリに@table
を適用するか、 create_streaming_table() 関数を使用します。 両方のデータセット タイプには、次の同じ構文仕様があります。
注
cluster_by
引数を使用してリキッドクラスタリングを有効にするには、プレビューチャンネルを使用するようにパイプラインを設定する必要があります。
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Delta Live Tablesビューの作成
Pythonでビューを定義するには、@view
デコレータを適用します。@table
デコレータと同様に、Delta Live Tableのビューは、静的データセットまたはストリーミングデータセットに使用できます。Pythonでビューを定義するための構文を次に示します:
import dlt
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
例:テーブルとビューを定義する
Pythonでテーブルまたはビューを定義するには、@dlt.view
または@dlt.table
デコレータを関数に適用します。関数名またはname
パラメーターを使用して、テーブルまたはビューの名前を割り当てることができます。次の例では、2つの異なるデータセットを定義しています。JSONファイルを入力ソースとして受け取るtaxi_raw
というビューと、入力としてtaxi_raw
ビューを受け取るfiltered_data
というテーブルです:
import dlt
@dlt.view
def taxi_raw():
return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")
# Use the function name as the table name
@dlt.table
def filtered_data():
return spark.read.table("LIVE.taxi_raw").where(...)
# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return spark.read.table("LIVE.taxi_raw").where(...)
例:同じパイプラインで定義されたデータセットにアクセスする
注
dlt.read()
関数と dlt.read_stream()
関数は引き続き使用でき、Delta Live Tables Python インターフェイスで完全にサポートされていますが、Databricks では、次の理由から、常に spark.read.table()
関数と spark.readStream.table()
関数を使用することをお勧めします。
spark
関数は、外部ストレージ内のデータセットや他のパイプラインで定義されたデータセットなど、内部データセットと外部データセットの読み取りをサポートします。dlt
関数は、内部データセットの読み取りのみをサポートします。spark
関数は、読み取り操作に対するオプション (skipChangeCommits
など) の指定をサポートしています。オプションの指定は、dlt
関数ではサポートされていません。
同じパイプラインで定義されているデータセットにアクセスするには、 spark.read.table()
関数または spark.readStream.table()
関数を使用し、データセット名の先頭に LIVE
キーワードを付加します。
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredA():
return spark.read.table("LIVE.customers_raw").where(...)
例:Metastoreに登録されたテーブルからの読み取り
Hive metastoreに登録されたテーブルからデータを読み取るには、関数の引数で LIVE
キーワードを省略し、オプションでテーブル名をデータベース名で修飾します。
@dlt.table
def customers():
return spark.read.table("sales.customers").where(...)
Unity Catalog テーブルからの読み取りの例については、「 Unity Catalog パイプラインへのデータの取り込み」を参照してください。
例:spark.sql
を使ってデータセットにアクセスする
クエリー関数でspark.sql
式を使用してデータセットを返すこともできます。内部データセットから読み取るには、データセット名の前にLIVE.
を追加します:
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")
ストリーミング操作のターゲットとして使用するテーブルを作成する
create_streaming_table()
関数を使用して、 apply_changes() 、 apply_changes_from_snapshot() 、 @append_flow出力レコードなどのストリーミング操作によって出力されるレコードのターゲット テーブルを作成します。
注
create_target_table()
関数とcreate_streaming_live_table()
関数は非推奨です。Databricksでは、create_streaming_table()
関数を使用するように既存のコードを更新することをお勧めします。
注
cluster_by
引数を使用してリキッドクラスタリングを有効にするには、プレビューチャンネルを使用するようにパイプラインを設定する必要があります。
create_streaming_table(
name = "<table-name>",
comment = "<comment>"
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
path="<storage-location-path>",
schema="schema-definition",
expect_all = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
row_filter = "row-filter-clause"
)
引数 |
---|
タイプ: テーブル名。 このパラメーターは必須です。 |
タイプ: テーブルの説明(オプション)。 |
タイプ: このクエリーを実行するためのSpark構成のオプションのリスト。 |
タイプ: テーブルのテーブルプロパティのオプションのリスト。 |
タイプ: テーブルのパーティション化に使用する1つ以上の列のオプションのリスト。 |
タイプ: オプションで、テーブルでリキッドクラスタリングを有効にし、クラスタリングキーとして使用する列を定義します。 「 Delta テーブルに リキッドクラスタリングを使用する」を参照してください。 |
タイプ: テーブル・データのオプションのストレージ・ロケーション。 設定されていない場合は、パイプラインの保存場所がデフォルトになります。 |
タイプ: テーブルのオプションのスキーマ定義。 スキーマは、SQL DDL 文字列または Python |
タイプ: テーブルのデータ品質制約 (省略可能)。 複数のエクスペクテーションを表示します。 |
タイプ: テーブルのオプションの行フィルタ句。 「行フィルターと列マスクを使用したテーブルのパブリッシュ」を参照してください。 |
テーブルの実体化方法を制御する
テーブルはまた、その実体化をさらにコントロールすることもできる:
partition_cols
を使用してテーブルを パーティション分割 する方法を指定します。パーティショニングを使用すると、クエリーを高速化できます。テーブルのプロパティは、ビューまたはテーブルを定義するときに設定できます。 Delta Live Tables テーブルのプロパティを参照してください。
path
設定を使用して、テーブルデータの保存場所を設定します。デフォルトでは、path
が設定されていない場合、テーブルデータはパイプラインの保存場所に保存されます。生成された列 は、スキーマ定義で使用できます。例 : スキーマ列とパーティション列の指定を参照してください。
注
テーブルのサイズが 1 TB 未満の場合、Databricks では Delta Live Tables でデータ編成を制御することを推奨しています。 パーティション列は、テーブルがテラバイトを超えて大きくなることが予想される場合を除き、指定しないでください。
例:スキーマとパーティション列を指定する
オプションで、Python StructType
またはSQL DDL文字列を使用してテーブルスキーマを指定できます。DDL文字列で指定すると、定義には生成された列を含めることができます。
次の例では、Python StructType
を使用してスキーマを指定した sales
というテーブルを作成します。
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
次の例では、DDL文字列を使用してテーブルのスキーマを指定し、生成された列を定義し、パーティション列を定義します:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
デフォルトでは、スキーマを指定しない場合、Delta Live Tablesはtable
定義からスキーマを推測します。
ソースストリーミングテーブルの変更を無視するようにストリーミングテーブルを構成する
注
skipChangeCommits
フラグは、option()
関数を使用するspark.readStream
でのみ機能します。このフラグは、dlt.read_stream()
関数では使用できません。ソース・ストリーミング・テーブルが apply_changes() 関数のターゲットとして定義されている場合、
skipChangeCommits
フラグを使用することはできません。
既定では、ストリーミング テーブルには追加専用のソースが必要です。 ストリーミング テーブルが別のストリーミング テーブルをソースとして使用し、ソース ストリーミング テーブルで更新または削除 (GDPR の "忘れられる権利" 処理など) が必要な場合、ソース ストリーミング テーブルを読み取るときに skipChangeCommits
フラグを設定して、それらの変更を無視できます。 このフラグの詳細については、「 更新と削除を無視する」を参照してください。
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")
例: テーブル制約の定義
プレビュー
テーブル制約は パブリック プレビュー段階です。
スキーマを指定するときに、プライマリ・キーと外部キーを定義できます。 制約は情報提供を目的としており、強制されません。 SQL 言語リファレンスのCONSTRAINT 句を参照してください。
次の例では、プライマリ・キー制約と外部キー制約を持つテーブルを定義しています。
@dlt.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
"""
def sales():
return ("...")
例: 行フィルターと列マスクの定義
プレビュー
行フィルターと列マスクは パブリック プレビュー段階です。
マテリアライズド・ビューまたはロー・フィルタとカラム・マスクを持つストリーミング・テーブルを作成するには、 ROW FILTER 句 と MASK 句を使用します。 次の例は、マテリアライズドビューとストリーミングテーブルを行フィルタと列マスクの両方を使用して定義する方法を示しています。
@dlt.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
return ("...")
行フィルターと列マスクの詳細については、 「行フィルターと列マスクを使用してテーブルを公開する」を参照してください。
Python Delta Live Tablesのプロパティ
以下のテーブルでは、Delta Live Tablesでテーブルやビューを定義する際に指定できるオプションやプロパティについて説明します:
注
cluster_by
引数を使用してリキッドクラスタリングを有効にするには、プレビューチャンネルを使用するようにパイプラインを設定する必要があります。
@tableまたは@view |
---|
タイプ: テーブルまたはビューのオプションの名前。定義されていない場合、関数名がテーブル名またはビュー名として使用されます。 |
タイプ: テーブルの説明(オプション)。 |
タイプ: このクエリーを実行するためのSpark構成のオプションのリスト。 |
タイプ: テーブルのテーブルプロパティのオプションのリスト。 |
タイプ: テーブル・データのオプションのストレージ・ロケーション。 設定されていない場合は、パイプラインの保存場所がデフォルトになります。 |
タイプ: オプションのコレクション (テーブルのパーティション分割に使用する 1 つ以上の列の |
タイプ: オプションで、テーブルでリキッドクラスタリングを有効にし、クラスタリングキーとして使用する列を定義します。 「 Delta テーブルに リキッドクラスタリングを使用する」を参照してください。 |
タイプ: テーブルのオプションのスキーマ定義。 スキーマは、SQL DDL 文字列または Python |
タイプ: テーブルを作成しますが、テーブルのメタデータは公開しません。 デフォルトは「False」です。 |
タイプ: テーブルのオプションの行フィルタ句。 「行フィルターと列マスクを使用したテーブルのパブリッシュ」を参照してください。 |
テーブルまたはビューの定義 |
---|
データセットを定義するPython関数。 |
Spark DatasetまたはKoalas DataFrameを返すSpark SQL文。
SQL 構文を使用して Delta Live Tables |
エクスペクテーション |
---|
|
|
|
1つ以上のデータ品質制約を宣言します。 |
1つ以上のデータ品質制約を宣言します。 |
1つ以上のデータ品質制約を宣言します。 |
Delta Live TablesでPythonを使用した変更フィードからのチェンジデータキャプチャ
PythonAPIDelta Live Tablesキャプチャ (CDC ) 機能を使用してチェンジデータ フィード ( CDF ) からのソース データを処理するには、 apply_changes()
の関数を使用します。
重要
変更を適用するターゲット ストリーミング テーブルを宣言する必要があります。 オプションで、ターゲットテーブルのスキーマを指定できます。 apply_changes()
ターゲットテーブルのスキーマを指定するときは、sequence_by
フィールドと同じデータ型の__START_AT
列と__END_AT
列を含める必要があります。
必要なターゲット テーブルを作成するには、Delta Live Tables Python インターフェイスのcreate_streaming_table()関数を使用できます。
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
注
APPLY CHANGES
処理の場合、 INSERT
およびUPDATE
イベントのデフォルトの動作は、ソースから CDC イベントをアップサートすることです。つまり、指定されたキーに一致するターゲット テーブル内の行を更新するか、一致するレコードがターゲット テーブルに存在しない場合は新しい行を挿入します。 DELETE
イベントの処理は、APPLY AS DELETE WHEN
条件で指定できます。
変更フィードを使用したCDC処理の詳細については、「 APPLY CHANGES APIs : Delta Live Tablesによるチェンジデータキャプチャの簡素化」を参照してください。 apply_changes()
関数の使用例については、「 例: CDF ソース データを使用した SCD タイプ 1 および SCD タイプ 2 の処理」を参照してください。
重要
変更を適用するターゲット ストリーミング テーブルを宣言する必要があります。 オプションで、ターゲットテーブルのスキーマを指定できます。 apply_changes
ターゲット表スキーマを指定する場合は、sequence_by
フィールドと同じデータ・タイプの__START_AT
列と__END_AT
列を含める必要があります。
APPLY CHANGES APIs: Delta Live Tablesを使用してチェンジデータキャプチャを簡素化するを参照してください。
引数 |
---|
タイプ: 更新するテーブルの名前。 create_streaming_table() 関数を使用して、 このパラメーターは必須です。 |
タイプ: CDCレコードを含むデータソース。 このパラメーターは必須です。 |
タイプ: ソースデータ内の行を一意に識別する列または列の組み合わせ。これは、どのCDCイベントがターゲットテーブル内の特定のレコードに適用されるかを識別するために使用されます。 次のいずれかを指定できます:
このパラメーターは必須です。 |
タイプ: ソースデータ内のCDCイベントの論理順序を指定する列名。Delta Live Tablesは、このシーケンスを使用して、順序どおりに到着しない変更イベントを処理します。 次のいずれかを指定できます:
指定する列は、ソート可能なデータ・タイプでなければなりません。 このパラメーターは必須です。 |
タイプ: ターゲットカラムのサブセットを含む更新の取り込みを許可します。 CDC イベントが既存の行と一致し、 このパラメーターはオプションです。 デフォルトは |
タイプ: CDC イベントをアップサートではなく 次のいずれかを指定できます:
このパラメーターはオプションです。 |
タイプ: CDCイベントを完全なテーブル
次のいずれかを指定できます:
このパラメーターはオプションです。 |
タイプ: ターゲットテーブルに含める列のサブセット。
このパラメーターはオプションです。 デフォルトでは、 |
タイプ: レコードをSCDタイプ1として保存するか、SCDタイプ2として保存するか。 SCDタイプ1の場合は この句はオプションです。 デフォルトはSCDタイプ1です。 |
タイプ: ターゲット表のヒストリーについて追跡される出力列のサブセット。
このパラメーターはオプションです。 デフォルトでは、 |
Delta Live TablesでPythonを使用したデータベースからのチェンジデータキャプチャ
プレビュー
APPLY CHANGES FROM SNAPSHOT
API はパブリック プレビュー段階です。
PythonAPIDelta Live Tablesチェンジデータキャプチャ (CDC )機能を使用してデータベース トークンからソース データを処理するには、 apply_changes_from_snapshot()
の 関数を使用します。
重要
変更を適用するターゲットストリーミングテーブルを宣言する必要があります。オプションでターゲットテーブルのスキーマを指定できます。apply_changes_from_snapshot()
ターゲットテーブルのスキーマを指定する場合は、sequence_by
フィールドと同じデータ型の__START_AT
列と__END_AT
列も含める必要があります。
必要なターゲット テーブルを作成するには、Delta Live Tables Python インターフェイスのcreate_streaming_table()関数を使用できます。
apply_changes_from_snapshot(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
) -> None
注
APPLY CHANGES FROM SNAPSHOT
処理の場合、デフォルトの動作では、同じキーを持つ一致するレコードがターゲットに存在しない場合に新しい行を挿入します。 一致するレコードが存在する場合は、行のいずれかの値が変更された場合にのみ更新されます。 ターゲットには存在するがソースには存在しないキーを持つ行は削除されます。
サインインを使用したCDC処理の詳細については、「 APPLY CHANGES APIs : Delta Live Tablesによるチェンジデータキャプチャの簡素化」を参照してください。 apply_changes_from_snapshot()
関数の使用例については、 定期的なスナップショットの取り込みと履歴スナップショットの取り込みの例を参照してください。
引数 |
---|
タイプ: 更新するテーブルの名前。 create_streaming_table() 関数を使用して、 このパラメーターは必須です。 |
タイプ: 定期的にスナップショットを作成するテーブルまたはビューの名前、または処理するスナップショット DataFrame とスナップショットのバージョンを返す Python ラムダ関数のいずれか。 ソース引数の実装を参照してください。 このパラメーターは必須です。 |
タイプ: ソースデータ内の行を一意に識別する列または列の組み合わせ。これは、どのCDCイベントがターゲットテーブル内の特定のレコードに適用されるかを識別するために使用されます。 次のいずれかを指定できます:
このパラメーターは必須です。 |
タイプ: レコードをSCDタイプ1として保存するか、SCDタイプ2として保存するか。 SCDタイプ1の場合は この句はオプションです。 デフォルトはSCDタイプ1です。 |
タイプ: ターゲット表のヒストリーについて追跡される出力列のサブセット。
このパラメーターはオプションです。 デフォルトでは、 |
source
引数を実装する
apply_changes_from_snapshot()
関数には source
引数が含まれています。過去のスナップショットを処理する場合、 source
引数は、処理するスナップショットデータを含むPython DataFrame とスナップショットバージョンの2 つの値を apply_changes_from_snapshot()
関数に返すPythonラムダ関数である必要があります。
以下は、ラムダ関数のシグネチャです。
lambda Any => Optional[(DataFrame, Any)]
ラムダ関数への引数は、最後に処理されたスナップショット バージョンです。
ラムダ関数の戻り値は
None
または 2 つの値のタプルです。タプルの最初の値は、処理されるスナップショットを含む DataFrame です。 タプルの 2 番目の値は、スナップショットの論理的な順序を表すスナップショット バージョンです。
ラムダ関数を実装して呼び出す例を次に示します。
def next_snapshot_and_version(latest_snapshot_version):
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
apply_changes_from_snapshot(
# ...
source = next_snapshot_and_version,
# ...
)
Delta Live Tables apply_changes_from_snapshot()
関数を含む Pipeline がトリガーされるたびに、次の手順を実行します。
next_snapshot_and_version
関数を実行して、次のスナップショット DataFrame と対応するスナップショット バージョンを読み込みます。DataFrame が返されない場合、実行は終了し、パイプラインの更新は完了としてマークされます。
新しいスナップショットの変更を検出し、それをターゲット テーブルに段階的に適用します。
ステップ 1 に戻り、次のエンドポイントとそのバージョンを読み込みます。