テーブル
@table
デコレータを使用して、マテリアライズドビューとストリーミングテーブルの両方を定義できます。Python では、DLT は、定義クエリに基づいて、データセットをマテリアライズドビューとして更新するかストリーミングテーブルとして更新するかを決定します。
Python でマテリアライズドビューを定義するには、データソースに対して静的読み取りを実行するクエリに @table
を適用します。ストリーミングテーブルを定義するには、データソースに対してストリーミング読み取りを実行するクエリに @table
を適用するか、 create_streaming_table() 関数を使用します。
構文
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
temporary=False)
@dlt.expect(...)
def <function-name>():
return (<query>)
パラメーター
@dlt.expect()
はオプションの DLT 期待句です。複数の期待を含めることができます。「期待値」を参照してください。
パラメーター | タイプ | 説明 |
---|---|---|
関数 |
| 必須。Apache Spark DataFrame またはユーザー定義クエリからストリーミング DataFrame を返す関数。 |
|
| テーブル名。指定しない場合、デフォルトは関数名になります。 |
|
| テーブルの説明。 |
|
| このクエリを実行するための Spark 構成の一覧 |
|
| テーブルのテーブル プロパティの |
|
| テーブル・データのストレージ・ロケーション。設定されていない場合は、テーブルを含むスキーマの管理ストレージの場所を使用します。 |
|
| テーブルのパーティション分割に使用する 1 つ以上のカラムのリスト。 |
|
| テーブルでリキッドクラスタリングを有効にし、クラスタリングキーとして使用する列を定義します。 「Deltaテーブルにリキッドクラスタリングを使用する」を参照してください。 |
|
| テーブルのスキーマ定義。スキーマは、SQL DDL 文字列として、または Python |
|
| テーブルを作成しますが、テーブルをメタストアに公開しないでください。そのテーブルはパイプラインで使用できますが、パイプラインの外部からアクセスすることはできません。一時テーブルは、パイプラインの有効期間中保持されます。 デフォルトは「False」です。 |
|
| (パブリック プレビュー)テーブルの行フィルタ句。行フィルターと列マスクを使用したテーブルのパブリッシュを参照してください。 |
スキーマの指定はオプションであり、PySpark StructType
または SQL DDL を使用して行うことができます。スキーマを指定するときは、オプションで、生成されたカラム、カラム・マスク、プライマリ・キーと外部キーを含めることができます。見る:
例
import dlt
# Specify a schema
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
cluster_by = ["order_day_of_week", "customer_id"])
def sales():
return ("...")
# Specify partition columns
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
# Specify table constraints
@dlt.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
""")
def sales():
return ("...")
# Specify a row filter and column mask
@dlt.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
return ("...")