マテリアライズドビュー
@materialized_viewデコレータはマテリアライズドビューを定義するために使用できます。
マテリアライズドビューを定義するには、データ ソースに対してバッチ読み取りを実行するクエリに@materialized_view適用します。
構文
from pyspark import pipelines as dp
@dp.materialized_view(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by_auto = True,
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  private = False)
@dp.expect(...)
def <function-name>():
    return (<query>)
問題
@dp.expect() は省略可能な Lakeflow 宣言型パイプラインのエクスペクテーション句です。複数のエクスペクテーションを含めることができます。エクスペクテーションを参照してください。
| パラメーター | Type | 説明 | 
|---|---|---|
| function | 
 | 必須。ユーザー定義のクエリから Apache Spark バッチ DataFrame を返す関数。 | 
| 
 | 
 | テーブル名。指定されていない場合は、デフォルトで関数名になります。 | 
| 
 | 
 | テーブルの説明。 | 
| 
 | 
 | このクエリを実行するためのSpark構成のリスト | 
| 
 | 
 | テーブルのテーブル プロパティの | 
| 
 | 
 | テーブル データの保存場所。設定されていない場合は、テーブルを含むスキーマの管理されたストレージの場所を使用します。 | 
| 
 | 
 | テーブルのパーティション分割に使用する 1 つ以上の列のリスト。 | 
| 
 | 
 | テーブル上で自動液体クラスタリングを有効にします。 これを | 
| 
 | 
 | テーブルでリキッドクラスタリングを有効にし、クラスタリングキーとして使用する列を定義します。テーブルにリキッドクラスタリングを使用するを参照してください。 | 
| 
 | 
 | テーブルのスキーマ定義。スキーマは、SQL DDL 文字列または Python  | 
| 
 | 
 | テーブルを作成しますが、テーブルをメタストアに公開しません。そのテーブルはパイプラインでは使用できますが、パイプラインの外部からはアクセスできません。プライベート テーブルはパイプラインの有効期間中保持されます。 デフォルトは | 
| 
 | 
 | (パブリック プレビュー) テーブルの行フィルター句。「行フィルターと列マスクを使用してテーブルを公開する」を参照してください。 | 
スキーマの指定はオプションであり、PySpark StructTypeまたは SQL DDL を使用して実行できます。スキーマを指定するときに、生成された列、列マスク、主キー、外部キーをオプションで含めることができます。見る:
例
from pyspark import pipelines as dp
# Specify a schema
sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)
@dp.materialized_view(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")
# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dp.materialized_view(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  cluster_by = ["order_day_of_week", "customer_id"])
def sales():
  return ("...")
# Specify partition columns
@dp.materialized_view(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")
# Specify table constraints
@dp.materialized_view(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """)
def sales():
   return ("...")
# Specify a row filter and column mask
@dp.materialized_view(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
   return ("...")