メインコンテンツまでスキップ

テーブル

@tableデコレータを使用してストリーミング テーブルを定義できます。

ストリーミング テーブルを定義するには、データ ソースに対してストリーミング読み取りを実行するクエリに@table適用するか、 create_streaming_table() 関数を使用します。

注記

古いdltモジュールでは、ストリーミング テーブルとマテリアライズドビューの両方の作成に@table演算子が使用されていました。 pyspark.pipelinesモジュールの@table演算子は引き続きこの方法で機能しますが、 Databricks 、 @materialized_view演算子を使用してマテリアライズドビューを作成することをお勧めします。

構文

Python
from pyspark import pipelines as dp

@dp.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by_auto = True,
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
private = False)
@dp.expect(...)
def <function-name>():
return (<query>)

問題

@dp.expect() は省略可能な Lakeflow 宣言型パイプラインのエクスペクテーション句です。複数のエクスペクテーションを含めることができます。エクスペクテーションを参照してください。

パラメーター

Type

説明

function

function

必須。ユーザー定義のクエリから Apache Sparkストリーミング データフレーム を返す関数。

name

str

テーブル名。指定されていない場合は、デフォルトで関数名になります。

comment

str

テーブルの説明。

spark_conf

dict

このクエリを実行するためのSpark構成のリスト

table_properties

dict

テーブルのテーブル プロパティdict

path

str

テーブル データの保存場所。設定されていない場合は、テーブルを含むスキーマの管理されたストレージの場所を使用します。

partition_cols

list

テーブルのパーティション分割に使用する 1 つ以上の列のリスト。

cluster_by_auto

bool

テーブル上で自動液体クラスタリングを有効にします。 これをcluster_byと組み合わせて、初期クラスタリング キーとして使用する列を定義し、その後、ワークロードに基づいてモニタリングと自動キー選択の更新を行うことができます。 「自動リキッドクラスタリング」を参照してください。

cluster_by

list

テーブルでリキッドクラスタリングを有効にし、クラスタリングキーとして使用する列を定義します。テーブルにリキッドクラスタリングを使用するを参照してください。

schema

str または StructType

テーブルのスキーマ定義。スキーマは、SQL DDL 文字列または Python StructTypeを使用して定義できます。

private

bool

テーブルを作成しますが、テーブルをメタストアに公開しません。そのテーブルはパイプラインでは使用できますが、パイプラインの外部からはアクセスできません。プライベート テーブルはパイプラインの有効期間中保持されます。

デフォルトはFalseです。

プライベート テーブルは以前にtemporaryを使用して作成されました。

row_filter

str

(パブリック プレビュー) テーブルの行フィルター句。「行フィルターと列マスクを使用してテーブルを公開する」を参照してください。

スキーマの指定はオプションであり、PySpark StructTypeまたは SQL DDL を使用して実行できます。スキーマを指定するときに、生成された列、列マスク、主キー、外部キーをオプションで含めることができます。見る:

Python
from pyspark import pipelines as dp

# Specify a schema
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dp.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")

# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dp.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
cluster_by = ["order_day_of_week", "customer_id"])
def sales():
return ("...")

# Specify partition columns
@dp.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")

# Specify table constraints
@dp.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
""")
def sales():
return ("...")

# Specify a row filter and column mask
@dp.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
return ("...")