Delta Live Tables Python言語リファレンス
この記事では、Delta Live Tables Pythonプログラミングインターフェイスの詳細を説明します。
SQL APIの詳細については、「Delta Live Tables SQL言語リファレンス」を参照してください。
オートローダーの設定に関する詳細は、「オートローダーとは?」を参照してください。
制限事項
Delta Live Tables Pythonインターフェイスには次の制限があります:
Python
table
関数とview
関数はDataFrameを返す必要があります。DataFrameを操作する一部の関数はDataFrameを返さないため、使用しないでください。DataFrame変換は完全なデータフローグラフが解決された後に実行されるため、このような操作を使用すると、意図しない副作用が発生する可能性があります。これらの操作には、collect()
、count()
、toPandas()
、save()
、saveAsTable()
などの関数が含まれます。ただし、このコードはグラフの初期化フェーズ中に1回実行されるため、これらの関数をtable
またはview
関数定義の外に含めることができます。pivot()
関数はサポートされていません。Sparkのpivot
オペレーションでは、出力のスキーマを計算するために入力データの積極的な読み込みが必要です。この機能は、Delta Live Tablesではサポートされていません。
dlt
Pythonモジュールをインポートする
Delta Live TablesのPython関数は、dlt
モジュールで定義されています。Python APIを使用して実装されたパイプラインは、このモジュールをインポートする必要があります:
import dlt
Delta Live Tablesマテリアライズドビューまたはストリーミングテーブルを作成する
Pythonでは、Delta Live Tablesは、定義クエリーに基づいてデータセットをマテリアライズドビューとして更新するかストリーミングテーブルとして更新するかを決定します。@table
デコレータは、マテリアライズドビューとストリーミングテーブルの両方を定義するために使用されます。
Pythonでマテリアライズドビューを定義するには、データソースに対して静的読み取りを実行するクエリーに@table
を適用します。ストリーミングテーブルを定義するには、データソースに対してストリーミング読み取りを実行するクエリーに@table
を適用します。どちらのデータセットタイプも、次のように同じ構文仕様を持っています:
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
schema="schema-definition",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Delta Live Tablesビューの作成
Pythonでビューを定義するには、@view
デコレータを適用します。@table
デコレータと同様に、Delta Live Tableのビューは、静的データセットまたはストリーミングデータセットに使用できます。Pythonでビューを定義するための構文を次に示します:
import dlt
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
例:テーブルとビューを定義する
Pythonでテーブルまたはビューを定義するには、@dlt.view
または@dlt.table
デコレータを関数に適用します。関数名またはname
パラメーターを使用して、テーブルまたはビューの名前を割り当てることができます。次の例では、2つの異なるデータセットを定義しています。JSONファイルを入力ソースとして受け取るtaxi_raw
というビューと、入力としてtaxi_raw
ビューを受け取るfiltered_data
というテーブルです:
import dlt
@dlt.view
def taxi_raw():
return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")
# Use the function name as the table name
@dlt.table
def filtered_data():
return dlt.read("taxi_raw").where(...)
# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return dlt.read("taxi_raw").where(...)
例:同じパイプラインで定義されたデータセットにアクセスする
外部データソースからの読み取りに加えて、Delta Live Tables read()
関数を使用して、同じパイプラインで定義されているデータセットにアクセスできます。次の例は、read()
関数を使用してcustomers_filtered
データセットを作成する方法を示しています:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredA():
return dlt.read("customers_raw").where(...)
spark.table()
関数を使用して、同じパイプラインで定義されたデータセットにアクセスすることもできます。spark.table()
関数を使用してパイプラインで定義されたデータセットにアクセスするときは、LIVE
関数の引数でデータセット名の前にキーワードを追加します:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredB():
return spark.table("LIVE.customers_raw").where(...)
例:Metastoreに登録されたテーブルからの読み取り
Hive Metastoreに登録されているテーブルからデータを読み取るには、関数の引数でLIVE
キーワードを省略し、必要に応じてテーブル名をデータベース名で修飾します:
@dlt.table
def customers():
return spark.table("sales.customers").where(...)
Unity Catalog テーブルからの読み取りの例については、「 Unity Catalog パイプラインへのデータの取り込み」を参照してください。
例:spark.sql
を使ってデータセットにアクセスする
クエリー関数でspark.sql
式を使用してデータセットを返すこともできます。内部データセットから読み取るには、データセット名の前にLIVE.
を追加します:
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")
複数のソースストリームからストリーミングテーブルへの書き込み
プレビュー
Delta Live Tables での @append_flow
のサポートは パブリック プレビュー段階です。
@append_flow
デコレーターを使用して、複数のストリーミング ソースからストリーミング テーブルに書き込み、次の操作を行うことができます。
既存のストリーミング テーブルにデータを追加するストリーミング ソースを追加および削除し、完全な更新は必要ありません。 たとえば、事業を展開しているすべての地域の地域データを組み合わせたテーブルがあるとします。 新しいリージョンがロールアウトされると、完全な更新を実行せずに新しいリージョン データをテーブルに追加できます。
不足しているヒストリカルデータ (backfilling) を追加して、ストリーミング テーブルを更新します。 たとえば、Apache Kafka トピックによって書き込まれる既存のストリーミングテーブルがあるとします。 また、ストリーミング テーブルに 1 回だけ挿入する必要があるテーブルに格納されているヒストリカルデータがあり、データを挿入する前に複雑な集計を実行する必要があるため、データをストリームできません。
@append_flow
処理によって出力されるレコードのターゲット表を作成するには、create_streaming_table() 関数を使用します。
注
データ品質制約を EXPECTATIONS とともに定義する必要がある場合は、 create_streaming_table()
関数の一部としてターゲット表の EXPECTATIONS を定義します。 @append_flow
定義で期待値を定義することはできません。
@append_flow
の構文は次のとおりです。
import dlt
dlt.create_streaming_table("<target-table-name>")
@dlt.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment") # optional
def <function-name>():
return (<streaming query>)
例: 複数の Kafka トピックからストリーミングテーブルへの書き込み
次の例では、 kafka_target
という名前のストリーミングテーブルを作成し、2 つの Kafka トピックからそのストリーミングテーブルに書き込みます。
import dlt
dlt.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dlt.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
例: 1 回限りのデータバックフィルを実行する
次の例では、クエリーを実行して、ヒストリカルデータをストリーミングテーブルに追加します。
注
バックフィルクエリーがスケジュールされたベースまたは継続的に実行されるパイプラインの一部である場合に、真の 1 回限りのバックフィルを確保するには、パイプラインを一度実行した後にクエリーを削除します。 新しいデータがバックフィル ディレクトリに到着した場合に追加するには、クエリーをそのままにしておきます。
import dlt
@dlt.table()
def csv_target():
return spark.readStream.format("csv").load("path/to/sourceDir")
@dlt.append_flow(target = "csv_target")
def backfill():
return spark.readStream.format("csv").load("path/to/backfill/data/dir")
ストリーミング操作のターゲットとして使用するテーブルを作成する
create_streaming_table()
関数を使用して、 apply_changes()や@append_flow出力レコードなど、ストリーミング操作によって出力されるレコードのターゲット テーブルを作成します。
注
create_target_table()
関数とcreate_streaming_live_table()
関数は非推奨です。Databricksでは、create_streaming_table()
関数を使用するように既存のコードを更新することをお勧めします。
create_streaming_table(
name = "<table-name>",
comment = "<comment>"
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
path="<storage-location-path>",
schema="schema-definition",
expect_all = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"}
)
引数 |
---|
タイプ: テーブル名。 このパラメーターは必須です。 |
タイプ: テーブルの説明(オプション)。 |
タイプ: このクエリーを実行するためのSpark構成のオプションのリスト。 |
タイプ: テーブルのテーブルプロパティのオプションのリスト。 |
タイプ: テーブルのパーティション化に使用する1つ以上の列のオプションのリスト。 |
タイプ: テーブルデータのオプションの格納場所。設定されていない場合、システムはデフォルトでパイプラインの保存場所を使用します。 |
タイプ: テーブルのオプションのスキーマ定義。スキーマは、SQL DDL文字列として、またはPython |
タイプ: テーブルのデータ品質制約 (省略可能)。 複数のエクスペクテーションを表示します。 |
テーブルの実体化方法を制御する
テーブルはまた、その実体化をさらにコントロールすることもできる:
partition_cols
を使用してテーブル をパーティション分割 する方法を指定します。パーティショニングを使用すると、クエリーを高速化できます。テーブルのプロパティは、ビューまたはテーブルを定義するときに設定できます。 Delta Live Tables テーブルのプロパティを参照してください。
path
設定を使用して、テーブルデータの保存場所を設定します。デフォルトでは、path
が設定されていない場合、テーブルデータはパイプラインの保存場所に保存されます。生成された列 は、スキーマ定義で使用できます。例 : スキーマ列とパーティション列の指定を参照してください。
注
サイズが1 TB未満のテーブルの場合、Databricksでは、Delta Live Tablesにデータ構成を制御させることをお勧めします。テーブルがテラバイトを超えて大きくなることが予想されない限り、通常はパーティション列を指定しないでください。
例:スキーマとパーティション列を指定する
オプションで、Python StructType
またはSQL DDL文字列を使用してテーブルスキーマを指定できます。DDL文字列で指定すると、定義には生成された列を含めることができます。
次の例では、Python StructType
を使用してスキーマを指定した sales
というテーブルを作成します。
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
次の例では、DDL文字列を使用してテーブルのスキーマを指定し、生成された列を定義し、パーティション列を定義します:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
デフォルトでは、スキーマを指定しない場合、Delta Live Tablesはtable
定義からスキーマを推測します。
ソースストリーミングテーブルの変更を無視するようにストリーミングテーブルを構成する
注
skipChangeCommits
フラグは、option()
関数を使用するspark.readStream
でのみ機能します。このフラグは、dlt.read_stream()
関数では使用できません。ソース・ストリーミング・テーブルが apply_changes() 関数のターゲットとして定義されている場合、
skipChangeCommits
フラグを使用することはできません。
既定では、ストリーミング テーブルには追加専用のソースが必要です。 ストリーミング テーブルが別のストリーミング テーブルをソースとして使用し、ソース ストリーミング テーブルで更新または削除 (GDPR の "忘れられる権利" 処理など) が必要な場合、ソース ストリーミング テーブルを読み取るときに skipChangeCommits
フラグを設定して、それらの変更を無視できます。 このフラグの詳細については、「 更新と削除を無視する」を参照してください。
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")
Python Delta Live Tablesのプロパティ
以下のテーブルでは、Delta Live Tablesでテーブルやビューを定義する際に指定できるオプションやプロパティについて説明します:
@tableまたは@view |
---|
タイプ: テーブルまたはビューのオプションの名前。定義されていない場合、関数名がテーブル名またはビュー名として使用されます。 |
タイプ: テーブルの説明(オプション)。 |
タイプ: このクエリーを実行するためのSpark構成のオプションのリスト。 |
タイプ: テーブルのテーブルプロパティのオプションのリスト。 |
タイプ: テーブルデータのオプションの格納場所。設定されていない場合、システムはデフォルトでパイプラインの保存場所を使用します。 |
タイプ: テーブルのパーティション化に使用する1つ以上の列のオプションのコレクション(例: |
タイプ: テーブルのオプションのスキーマ定義。スキーマは、SQL DDL文字列として、またはPython |
タイプ: テーブルを作成しますが、テーブルのメタデータは公開しません。 デフォルトは「False」です。 |
テーブルまたはビューの定義 |
---|
データセットを定義するPython関数。 |
Spark DatasetまたはKoalas DataFrameを返すSpark SQL文。 同じパイプラインで定義されたデータセットから完全な読み取りを実行するには、
同じパイプラインで定義されたデータセットからストリーミング読み取りを実行するには、
PySpark構文を使用して、PythonでDelta Live Tablesクエリーを定義します。 |
エクスペクテーション |
---|
|
|
|
1つ以上のデータ品質制約を宣言します。 |
1つ以上のデータ品質制約を宣言します。 |
1つ以上のデータ品質制約を宣言します。 |
Delta Live TablesでのPythonを使用した変更データキャプチャ
Delta Live Tables CDC 機能を使用するには、Python API のapply_changes()
関数を使用します。 Delta Live Tables Python インターフェイスでは、 create_streaming_table()関数も提供されます。 この関数を使用して、 apply_changes()
関数に必要なターゲット表を作成できます。
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
注
INSERT
とUPDATE
イベントのデフォルトの動作は、ソースからCDCイベントをアップサートすることです。指定されたキーに一致するターゲットテーブルの行を更新するか、ターゲットテーブルに一致するレコードが存在しない場合に新しい行を挿入します。DELETE
イベントの処理はAPPLY AS DELETE WHEN
条件で指定できます。
重要
変更を適用するターゲットストリーミングテーブルを宣言する必要があります。オプションでターゲットテーブルのスキーマを指定できます。apply_changes
ターゲットテーブルのスキーマを指定する場合は、sequence_by
フィールドと同じデータ型の__START_AT
列と__END_AT
列も含める必要があります。
「Delta Live Tables の APPLY CHANGES API を使用した簡略化されたチェンジデータキャプチャ」を参照してください。
引数 |
---|
タイプ: 更新するテーブルの名前。 create_streaming_table() 関数を使用して、 このパラメーターは必須です。 |
タイプ: CDCレコードを含むデータソース。 このパラメーターは必須です。 |
タイプ: ソースデータ内の行を一意に識別する列または列の組み合わせ。これは、どのCDCイベントがターゲットテーブル内の特定のレコードに適用されるかを識別するために使用されます。 次のいずれかを指定できます:
このパラメーターは必須です。 |
タイプ: ソースデータ内のCDCイベントの論理順序を指定する列名。Delta Live Tablesは、このシーケンスを使用して、順序どおりに到着しない変更イベントを処理します。 次のいずれかを指定できます:
このパラメーターは必須です。 |
タイプ: ターゲット列のサブセットを含む更新の取り込みを許可する。CDCイベントが既存の行と一致し、 このパラメーターはオプションです。 デフォルトは |
タイプ: CDC イベントをアップサートではなく 次のいずれかを指定できます:
このパラメーターはオプションです。 |
タイプ: CDCイベントを完全なテーブル
次のいずれかを指定できます:
このパラメーターはオプションです。 |
タイプ: ターゲットテーブルに含める列のサブセット。
このパラメーターはオプションです。 デフォルトでは、 |
タイプ: レコードをSCDタイプ1として保存するか、SCDタイプ2として保存するか。 SCDタイプ1の場合は この句はオプションです。 デフォルトはSCDタイプ1です。 |
タイプ: ターゲット表のヒストリーについて追跡される出力列のサブセット。
このパラメーターはオプションです。 デフォルトでは、 |