DLT SQL 言語リファレンス
この記事では、DLT SQL プログラミング・インターフェースについて詳しく説明します。
- Python APIの情報については、DLT Python language reference を参照してください。
- コマンドの詳細についてはSQLSQL言語リファレンスを参照してください。
SQL クエリで Python ユーザー定義関数 (UDF) を使用できますが、これらの UDF を SQL ソース ファイルで呼び出す前に、Python ファイルで定義する必要があります。 ユーザー定義のスカラー関数 - Pythonを参照してください。
制限
PIVOT
句はサポートされていません。Spark の pivot
操作では、入力データを出力スキーマのコンピュートに一括して読み込む必要があります。この機能は DLT ではサポートされていません。
DLT マテリアライズドビュー または ストリーミングテーブルを作成する
マテリアライズドビューを作成するための CREATE OR REFRESH LIVE TABLE
構文は非推奨です。 代わりに、 CREATE OR REFRESH MATERIALIZED VIEW
.
ストリーミングテーブルまたはマテリアライズドビューを宣言するときにも、同じ基本的な SQL 構文を使用します。
SQL を使用した DLT マテリアライズドビューの宣言
次に、 SQLを使用して DLT でマテリアライズドビューを宣言するための構文について説明します。
CREATE OR REFRESH MATERIALIZED VIEW view_name
[(
[
col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
[ table_constraint ] [, ...]
)]
[USING DELTA]
[PARTITIONED BY (col_name1, col_name2, ... )]
[CLUSTER BY clause]
[LOCATION path]
[COMMENT table_comment]
[TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
[ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
AS select_statement
DLT ストリーミングテーブルを SQL で宣言する
ストリーミングテーブルを宣言できるのは、ストリーミングソースに対して読み取るクエリのみを使用します。 Databricks では、クラウド オブジェクト ストレージからのファイルのストリーミング インジェストに Auto Loader を使用することをお勧めします。 Auto Loader SQL構文を参照してください。
パイプライン内の他のテーブルまたはビューをストリーミング ソースとして指定する場合は、データセット名の周囲に STREAM()
関数を含める必要があります。
次に、SQL を使用して DLT でストリーミングテーブルを宣言するための構文について説明します。
CREATE OR REFRESH [TEMPORARY] STREAMING TABLE table_name
[(
[
col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
[ table_constraint ] [, ...]
)]
[USING DELTA]
[PARTITIONED BY (col_name1, col_name2, ... )]
[CLUSTER BY clause]
[LOCATION path]
[COMMENT table_comment]
[TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
[ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
AS select_statement
DLT ビューの作成
次に、SQL でビューを宣言するための構文について説明します。
CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
[(
[
col_name1 [ COMMENT col_comment1 ],
col_name2 [ COMMENT col_comment2 ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
)]
[COMMENT view_comment]
AS select_statement
Auto Loader SQL 構文
以下は、SQLでAuto Loaderを操作するための構文です。
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM STREAM read_files(
"<file-path>",
[<option-key> => <option_value>, ...]
)
Auto Loaderのオプションはキーと値のペアです。サポートされている形式とオプションの詳細については、「 オプション」を参照してください。
例えば:
CREATE OR REFRESH STREAMING TABLE my_table
AS SELECT *
FROM STREAM read_files(
"/Volumes/my_volume/path/to/files/*",
format => "json",
inferColumnTypes => true,
maxFilesPerTrigger => 100,
schemaEvolutionMode => "addNewColumns",
modifiedAfter => "2025-03-11T23:59:59.999+00:00"
)
例: テーブルの定義
データセットは、外部データソースから読み取るか、パイプラインで定義されたデータセットから読み取ることで作成できます。 内部データセットから読み取るには、カタログとスキーマに設定されたパイプラインのデフォルトを使用するテーブル名を指定します。 次の例では、JSON ファイルを入力ソースとして受け取る taxi_raw
というテーブルと、taxi_raw
テーブルを入力として受け取る filtered_data
というテーブルの 2 つの異なるデータセットを定義しています。
CREATE OR REFRESH MATERIALIZED VIEW taxi_raw
AS SELECT * FROM read_files("/databricks-datasets/nyctaxi/sample/json/")
CREATE OR REFRESH MATERIALIZED VIEW filtered_data
AS SELECT
...
FROM taxi_raw
例: ストリーミング ソースからの読み取り
ストリーミング ソース ( Auto Loader や内部データセットなど) からデータを読み取るには、STREAMING
テーブルを定義します。
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/customers/", format => "csv")
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)
ストリーミング データの詳細については、「 パイプラインを使用したデータの変換」を参照してください。
マテリアライズドビューまたはストリーミングテーブルからのレコードの完全削除
GDPR コンプライアンスなど、削除ベクトルが有効になっているマテリアライズドビューまたはストリーミングテーブルからレコードを完全に削除するには、オブジェクトの基になる Delta テーブルに対して追加の操作を実行する必要があります。 マテリアライズドビューからのレコードの削除を確実にするには、 削除ベクトルが有効になっているマテリアライズドビューからのレコードの完全削除を参照してください。 ストリーミングテーブルからレコードを確実に削除するには、「 ストリーミングテーブルからレコードを完全に削除する」を参照してください。
テーブルの具体化方法を制御する
テーブルはまた、その実体化をさらにコントロールすることもできる:
CLUSTER BY
を使用してテーブルをクラスタリングする方法を指定します。リキッドクラスタリングを使用して、クエリを高速化できます。 「Deltaテーブルにリキッドクラスタリングを使用する」を参照してください。- テーブルを パーティション分割 する方法を
PARTITIONED BY
で指定します。 - テーブル プロパティは、
TBLPROPERTIES
を使用して設定できます。 DLT テーブルのプロパティを参照してください。 LOCATION
設定を使用して保存場所を設定します。デフォルトでは、テーブル データはパイプラインのストレージの場所に保存されます (LOCATION
が設定されていない場合)。- 生成された列をスキーマ定義で使用できます。例: スキーマとクラスター列の指定を参照してください。
サイズが 1 TB 未満のテーブルの場合、Databricks では DLT でデータ編成を制御できるようにすることをお勧めします。テーブルがテラバイトを超えることが予想されない限り、Databricks ではパーティション列を指定しないことをお勧めします。
例: スキーマとクラスター列を指定する
オプションで、テーブルを定義するときにスキーマを指定できます。 次の例では、Delta Lake で生成された列 の使用を含むターゲット テーブルのスキーマを指定し、テーブルのクラスタリング列を定義します。
CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) CLUSTER BY (order_day_of_week, customer_id)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
デフォルトでは、スキーマを指定しない場合、DLT は table
定義からスキーマを推測します。
例: パーティション列の指定
オプションで、テーブルのパーティション列を指定できます。
CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
リキッドクラスタリングは、クラスタリングのための柔軟で最適化されたソリューションを提供します。 DLT には PARTITIONED BY
の代わりに CLUSTER BY
を使用することを検討してください。
例: テーブル制約の定義
テーブル制約の DLT サポートは パブリック プレビュー段階です。テーブル制約を定義するには、パイプラインが Unity Catalog 対応のパイプラインであり、 preview
チャンネルを使用するように構成されている必要があります。
スキーマを指定するときに、プライマリ・キーと外部キーを定義できます。 制約は情報提供を目的としており、強制されません。 SQL 言語リファレンスの CONSTRAINT 句 を参照してください。
次の例では、プライマリ・キー制約と外部キー制約を持つテーブルを定義しています。
CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
SQL でテーブルまたはビューを宣言するときに使用する値をパラメータ化します
SET
を使用して、テーブルまたはビューを宣言するクエリで設定値 (Spark 設定を含む) を指定します。ノートブックで SET
ステートメントの後に定義するテーブルまたはビューは、定義された値にアクセスできます。 SET
ステートメントを使用して指定された Spark 設定は、SET ステートメントに続くテーブルまたはビューに対して Spark クエリを実行するときに使用されます。クエリの設定値を読み取るには、文字列補間構文 ${}
を使用します。 次の例では、 startDate
という名前の Spark 設定値を設定し、その値をクエリで使用します。
SET startDate='2020-01-01';
CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}
複数の設定値を指定するには、値ごとに個別の SET
ステートメントを使用します。
例: 行フィルターと列マスクの定義
プレビュー
行フィルターと列マスクは パブリック プレビュー段階です。
ロー・フィルターとカラム・マスクを使用してマテリアライズドビュー またはストリーミング・テーブルを作成するには、 ROW FILTER 句 と MASK 句を使用します。 次の例は、マテリアライズドビューとストリーミングテーブルを行フィルタと列マスクの両方を使用して定義する方法を示しています。
CREATE OR REFRESH STREAMING TABLE customers_silver (
id int COMMENT 'This is the customer ID',
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(customers_bronze)
CREATE OR REFRESH MATERIALIZED VIEW sales (
customer_id STRING MASK catalog.schema.customer_id_mask_fn,
customer_name STRING,
number_of_line_items STRING COMMENT 'Number of items in the order',
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
)
COMMENT "Raw data on sales"
WITH ROW FILTER catalog.schema.order_number_filter_fn ON (order_number)
AS SELECT * FROM sales_bronze
行フィルターと列マスクの詳細については、「 行フィルターと列マスクを使用してテーブルを発行する」を参照してください。
SQL プロパティ
CREATE TABLE または VIEW |
---|
|
|
|
|
|
|
|
|
|
|
|
|
CONSTRAINT 句 |
---|
|
|
DLTの SQL を持つチェンジデータキャプチャ
APPLY CHANGES INTO
ステートメントを使用して、以下で説明するように DLT CDC 機能を使用します。
CREATE OR REFRESH STREAMING TABLE table_name;
APPLY CHANGES INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
APPLY CHANGES
ターゲットのデータ品質制約を定義するには、非APPLY CHANGES
クエリと同じ CONSTRAINT
句を使用します。「パイプラインのエクスペクテーションを使用してデータ品質を管理する」を参照してください。
INSERT
とUPDATE
イベントのデフォルトの動作は、ソースからCDCイベントを アップサート することです。指定されたキーに一致するターゲットテーブルの行を更新するか、ターゲットテーブルに一致するレコードが存在しない場合に新しい行を挿入します。DELETE
イベントの処理はAPPLY AS DELETE WHEN
条件で指定できます。
変更を適用するターゲットストリーミングテーブルを宣言する必要があります。オプションでターゲットテーブルのスキーマを指定できます。APPLY CHANGES
ターゲットテーブルのスキーマを指定する場合は、sequence_by
フィールドと同じデータ型の__START_AT
列と__END_AT
列も含める必要があります。
「 APPLY CHANGES APIs: DLTによるチェンジデータキャプチャの簡素化」を参照してください。
句 |
---|
|
|
|
|
|
|
|
|