Pular para o conteúdo principal

visualização_materializada

O decorador @materialized_view pode ser usado para definir uma visualização materializada.

Para definir uma view materializada, aplique @materialized_view a uma consulta que executa uma leitura de lotes em uma fonte de dados.

Sintaxe

Python
from pyspark import pipelines as dp

@dp.materialized_view(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by_auto = True,
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
private = False)
@dp.expect(...)
def <function-name>():
return (<query>)

Parâmetros

@dp.expect() é uma cláusula opcional de expectativa de pipeline declarativa LakeFlow . Você pode incluir múltiplas expectativas. Veja Expectativas.

Parâmetro

Tipo

Descrição

função

function

Obrigatório. Uma função que retorna um Apache Spark lotes DataFrame de uma consulta definida pelo usuário.

name

str

O nome da tabela. Se não for fornecido, o padrão será o nome da função.

comment

str

Uma descrição para a tabela.

spark_conf

dict

Uma lista de configurações do Spark para a execução desta consulta

table_properties

dict

Um dict de propriedades de tabela para a tabela.

path

str

Um local de armazenamento para dados da tabela. Se não for definido, use o local de armazenamento gerenciar para o esquema que contém a tabela.

partition_cols

list

Uma lista de uma ou mais colunas a serem usadas para particionar a tabela.

cluster_by_auto

bool

Habilitar clustering automático de líquidos na tabela. Isso pode ser combinado com cluster_by e definir as colunas a serem usadas como chave clustering inicial, seguidas de monitoramento e atualizações automáticas de seleção de key com base na carga de trabalho. Veja clusteringautomático de líquidos.

cluster_by

list

Habilite clustering líquido na tabela e defina as colunas a serem usadas como chave clustering . Consulte Usar clustering líquido para tabelas.

schema

str ou StructType

Uma definição de esquema para a tabela. Os esquemas podem ser definidos como strings DDL SQL ou com um Python StructType.

private

bool

Crie uma tabela, mas não a publique no metastore. Essa tabela está disponível para o pipeline , mas não pode ser acessada fora pipeline. Tabelas privadas persistem durante toda a vida útil do pipeline.

O padrão é False.

row_filter

str

(Visualização pública) Uma cláusula de filtro de linha para a tabela. Consulte Publicar tabelas com filtros de linha e máscaras de coluna.

Especificar um esquema é opcional e pode ser feito com PySpark StructType ou SQL DDL. Ao especificar um esquema, você pode incluir opcionalmente colunas geradas, máscaras de coluna e chaves primária e estrangeira. Ver:

Exemplos

Python
from pyspark import pipelines as dp

# Specify a schema
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dp.materialized_view(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")

# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dp.materialized_view(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
cluster_by = ["order_day_of_week", "customer_id"])
def sales():
return ("...")

# Specify partition columns
@dp.materialized_view(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")

# Specify table constraints
@dp.materialized_view(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
""")
def sales():
return ("...")

# Specify a row filter and column mask
@dp.materialized_view(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
return ("...")