Pular para o conteúdo principal

mesa

O decorador @table pode ser usado para definir tabelas de transmissão.

Para definir uma tabela de transmissão, aplique @table a uma consulta que executa uma leitura de transmissão em uma fonte de dados ou use a função create_streaming_table().

nota

No módulo dlt mais antigo, o operador @table era usado para criar tabelas de transmissão e visualizações materializadas. O operador @table no módulo pyspark.pipelines ainda funciona dessa maneira, mas Databricks recomenda usar o operador @materialized_view para criar uma visualização materializada.

Sintaxe

Python
from pyspark import pipelines as dp

@dp.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by_auto = True,
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
private = False)
@dp.expect(...)
def <function-name>():
return (<query>)

Parâmetros

@dp.expect() é uma cláusula opcional de expectativa de pipeline declarativa LakeFlow . Você pode incluir múltiplas expectativas. Veja Expectativas.

Parâmetro

Tipo

Descrição

função

function

Obrigatório. Uma função que retorna um DataFrame de streaming Apache Spark streaming de uma consulta definida pelo usuário.

name

str

O nome da tabela. Se não for fornecido, o padrão será o nome da função.

comment

str

Uma descrição para a tabela.

spark_conf

dict

Uma lista de configurações do Spark para a execução desta consulta

table_properties

dict

Um dict de propriedades de tabela para a tabela.

path

str

Um local de armazenamento para dados da tabela. Se não for definido, use o local de armazenamento gerenciar para o esquema que contém a tabela.

partition_cols

list

Uma lista de uma ou mais colunas a serem usadas para particionar a tabela.

cluster_by_auto

bool

Habilitar clustering automático de líquidos na tabela. Isso pode ser combinado com cluster_by e definir as colunas a serem usadas como chave clustering inicial, seguidas de monitoramento e atualizações automáticas de seleção de key com base na carga de trabalho. Veja clusteringautomático de líquidos.

cluster_by

list

Habilite clustering líquido na tabela e defina as colunas a serem usadas como chave clustering . Consulte Usar clustering líquido para tabelas.

schema

str ou StructType

Uma definição de esquema para a tabela. Os esquemas podem ser definidos como strings DDL SQL ou com um Python StructType.

private

bool

Crie uma tabela, mas não a publique no metastore. Essa tabela está disponível para o pipeline , mas não pode ser acessada fora pipeline. Tabelas privadas persistem durante toda a vida útil do pipeline.

O padrão é False.

Tabelas privadas foram criadas anteriormente com o parâmetro temporary .

row_filter

str

(Visualização pública) Uma cláusula de filtro de linha para a tabela. Consulte Publicar tabelas com filtros de linha e máscaras de coluna.

Especificar um esquema é opcional e pode ser feito com PySpark StructType ou SQL DDL. Ao especificar um esquema, você pode incluir opcionalmente colunas geradas, máscaras de coluna e chaves primária e estrangeira. Ver:

Exemplos

Python
from pyspark import pipelines as dp

# Specify a schema
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dp.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")

# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dp.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
cluster_by = ["order_day_of_week", "customer_id"])
def sales():
return ("...")

# Specify partition columns
@dp.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")

# Specify table constraints
@dp.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
""")
def sales():
return ("...")

# Specify a row filter and column mask
@dp.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
return ("...")