table

The @table decorator can be used to define both materialized views and streaming tables. In Python, DLT determines whether to update a dataset as a materialized view or streaming table based on the defining query.

To define a materialized view in Python, apply @table to a query that performs a static read against a data source. To define a streaming table, apply @table to a query that performs a streaming read against a data source or use the create_streaming_table() function.
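
For example, the following sketch shows the same source read two ways: a static read with spark.read defines a materialized view, while a streaming read with spark.readStream defines a streaming table. The source table name samples.tpch.customer is only an illustrative assumption.

Python

import dlt

# Static read: DLT updates this dataset as a materialized view
@dlt.table(comment="Customers, refreshed as a materialized view")
def customers_mv():
  return spark.read.table("samples.tpch.customer")  # assumed source table

# Streaming read: DLT updates this dataset as a streaming table
@dlt.table(comment="Customers, ingested as a streaming table")
def customers_st():
  return spark.readStream.table("samples.tpch.customer")  # assumed source table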

Syntax

Python
import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  temporary=False)
@dlt.expect(...)
def <function-name>():
  return (<query>)

Parameters

@dlt.expect() is an optional DLT expectation clause. You can include multiple expectations. See Expectations.
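For example, the following sketch stacks two expectations on a single table definition; the expectation names and constraint expressions are illustrative assumptions.

Python

import dlt

@dlt.table(comment="Raw data on sales")
@dlt.expect("valid_order_number", "order_number IS NOT NULL")  # assumed constraint
@dlt.expect("valid_customer_id", "customer_id IS NOT NULL")    # assumed constraint
def sales():
  return ("...")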

| Parameter | Type | Description |
| --- | --- | --- |
| function | function | Required. A function that returns an Apache Spark DataFrame or streaming DataFrame from a user-defined query. |
| name | str | The table name. If not provided, defaults to the function name. |
| comment | str | A description for the table. |
| spark_conf | dict | A dict of Spark configurations for the execution of this query. |
| table_properties | dict | A dict of table properties for the table. |
| path | str | A storage location for table data. If not set, the managed storage location for the schema containing the table is used. |
| partition_cols | list | A list of one or more columns to use for partitioning the table. |
| cluster_by | list | Enable liquid clustering on the table and define the columns to use as clustering keys. See Use liquid clustering for Delta tables. |
| schema | str or StructType | A schema definition for the table. Schemas can be defined as a SQL DDL string or with a Python StructType. |
| temporary | bool | Create a table, but do not publish it to the metastore. The table is available to the pipeline but not accessible outside the pipeline. Temporary tables persist for the lifetime of the pipeline. The default is False. |
| row_filter | str | (Public Preview) A row filter clause for the table. See Publish tables with row filters and column masks. |

Specifying a schema is optional and can be done with a PySpark StructType or SQL DDL. When you specify a schema, you can optionally include generated columns, column masks, and primary and foreign keys, as shown in the examples below.

Examples

Python
import dlt
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Specify a schema
sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  cluster_by = ["order_day_of_week", "customer_id"])
def sales():
  return ("...")

# Specify partition columns
@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

# Specify table constraints
@dlt.table(
  schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """)
def sales():
  return ("...")

# Specify a row filter and column mask
@dlt.table(
  schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
  return ("...")