mesa
O decorador @table
pode ser usado para definir tanto a exibição materializada quanto as tabelas de transmissão. Em Python, DLT determina se deve atualizar uma dataset como uma tabela materializada view ou de transmissão com base na consulta de definição.
Para definir um view materializado em Python, aplique @table
a uma consulta que executa uma leitura estática em uma fonte de dados. Para definir uma tabela de transmissão, aplique @table
a uma consulta que execute uma leitura de transmissão em uma fonte de dados ou use a função create_streaming_table().
Sintaxe
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
temporary=False)
@dlt.expect(...)
def <function-name>():
return (<query>)
Parâmetros
@dlt.expect()
é uma cláusula opcional de expectativa de DLT. Você pode incluir várias expectativas. Veja as expectativas.
Parâmetro | Tipo | Descrição |
---|---|---|
função |
| Obrigatório. Uma função que retorna um Apache Spark DataFrame ou uma transmissão DataFrame de uma consulta definida pelo usuário. |
|
| O nome da tabela. Se não for fornecido, o padrão será o nome da função. |
|
| Uma descrição para a tabela. |
|
| Uma lista de configurações do Spark para a execução dessa consulta |
|
| Um |
|
| Um local de armazenamento para dados da tabela. Se não for definido, usará o local de armazenamento gerenciar do esquema que contém a tabela. |
|
| Uma lista de uma ou mais colunas a serem usadas para particionar a tabela. |
|
| Habilite o clustering líquido na tabela e defina as colunas a serem usadas como chave clustering. Consulte Usar clustering líquido para tabelas Delta. |
|
| Uma definição de esquema para a tabela. Os esquemas podem ser definidos como uma cadeia de caracteres SQL DDL ou com um Python |
|
| Crie uma tabela, mas não publique a tabela no metastore. Essa tabela está disponível para o pipeline, mas não pode ser acessada fora do pipeline. As tabelas temporárias persistem durante o tempo de vida do pipeline. O padrão é 'falso'. |
|
| (Visualização pública) Uma cláusula de filtro de linha para a tabela. Consulte Publicar tabelas com filtros de linha e máscaras de coluna. |
A especificação de um esquema é opcional e pode ser feita com o PySpark StructType
ou SQL DDL. Quando o senhor especifica um esquema, pode incluir opcionalmente colunas geradas, máscaras de coluna e chaves primária e externa. Veja:
- Delta Lake colunas geradas
- Restrições no Databricks
- Publique tabelas com filtros de linha e máscaras de coluna.
Exemplos
import dlt
# Specify a schema
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
cluster_by = ["order_day_of_week", "customer_id"])
def sales():
return ("...")
# Specify partition columns
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
# Specify table constraints
@dlt.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
""")
def sales():
return ("...")
# Specify a row filter and column mask
@dlt.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
return ("...")