メインコンテンツまでスキップ

DLT SQL 言語リファレンス

この記事では、DLT SQL プログラミング・インターフェースについて詳しく説明します。

SQL クエリで Python ユーザー定義関数 (UDF) を使用できますが、これらの UDF を SQL ソース ファイルで呼び出す前に、Python ファイルで定義する必要があります。 ユーザー定義のスカラー関数 - Pythonを参照してください。

制限

PIVOT句はサポートされていません。Spark の pivot 操作では、入力データを出力スキーマのコンピュートに一括して読み込む必要があります。この機能は DLT ではサポートされていません。

DLT マテリアライズドビュー または ストリーミングテーブルを作成する

注記

マテリアライズドビューを作成するための CREATE OR REFRESH LIVE TABLE 構文は非推奨です。 代わりに、 CREATE OR REFRESH MATERIALIZED VIEW.

ストリーミングテーブルまたはマテリアライズドビューを宣言するときにも、同じ基本的な SQL 構文を使用します。

SQL を使用した DLT マテリアライズドビューの宣言

次に、 SQLを使用して DLT でマテリアライズドビューを宣言するための構文について説明します。

CREATE OR REFRESH MATERIALIZED VIEW view_name
[(
[
col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
[ table_constraint ] [, ...]
)]
[USING DELTA]
[PARTITIONED BY (col_name1, col_name2, ... )]
[CLUSTER BY clause]
[LOCATION path]
[COMMENT table_comment]
[TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
[ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
AS select_statement

DLT ストリーミングテーブルを SQL で宣言する

ストリーミングテーブルを宣言できるのは、ストリーミングソースに対して読み取るクエリのみを使用します。 Databricks では、クラウド オブジェクト ストレージからのファイルのストリーミング インジェストに Auto Loader を使用することをお勧めします。 Auto Loader SQL構文を参照してください。

パイプライン内の他のテーブルまたはビューをストリーミング ソースとして指定する場合は、データセット名の周囲に STREAM() 関数を含める必要があります。

次に、SQL を使用して DLT でストリーミングテーブルを宣言するための構文について説明します。

CREATE OR REFRESH [TEMPORARY] STREAMING TABLE table_name
[(
[
col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
[ table_constraint ] [, ...]
)]
[USING DELTA]
[PARTITIONED BY (col_name1, col_name2, ... )]
[CLUSTER BY clause]
[LOCATION path]
[COMMENT table_comment]
[TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
[ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
AS select_statement

DLT ビューの作成

次に、SQL でビューを宣言するための構文について説明します。

CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
[(
[
col_name1 [ COMMENT col_comment1 ],
col_name2 [ COMMENT col_comment2 ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
)]
[COMMENT view_comment]
AS select_statement

Auto Loader SQL 構文

以下は、SQLでAuto Loaderを操作するための構文です。

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM STREAM read_files(
"<file-path>",
[<option-key> => <option_value>, ...]
)

Auto Loaderのオプションはキーと値のペアです。サポートされている形式とオプションの詳細については、「 オプション」を参照してください。

例えば:

CREATE OR REFRESH STREAMING TABLE my_table
AS SELECT *
FROM STREAM read_files(
"/Volumes/my_volume/path/to/files/*",
format => "json",
inferColumnTypes => true,
maxFilesPerTrigger => 100,
schemaEvolutionMode => "addNewColumns",
modifiedAfter => "2025-03-11T23:59:59.999+00:00"
)

例: テーブルの定義

データセットは、外部データソースから読み取るか、パイプラインで定義されたデータセットから読み取ることで作成できます。 内部データセットから読み取るには、カタログとスキーマに設定されたパイプラインのデフォルトを使用するテーブル名を指定します。 次の例では、JSON ファイルを入力ソースとして受け取る taxi_raw というテーブルと、taxi_raw テーブルを入力として受け取る filtered_data というテーブルの 2 つの異なるデータセットを定義しています。

CREATE OR REFRESH MATERIALIZED VIEW taxi_raw
AS SELECT * FROM read_files("/databricks-datasets/nyctaxi/sample/json/")

CREATE OR REFRESH MATERIALIZED VIEW filtered_data
AS SELECT
...
FROM taxi_raw

例: ストリーミング ソースからの読み取り

ストリーミング ソース ( Auto Loader や内部データセットなど) からデータを読み取るには、STREAMING テーブルを定義します。

CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/customers/", format => "csv")

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

ストリーミング データの詳細については、「 パイプラインを使用したデータの変換」を参照してください。

マテリアライズドビューまたはストリーミングテーブルからのレコードの完全削除

GDPR コンプライアンスなど、削除ベクトルが有効になっているマテリアライズドビューまたはストリーミングテーブルからレコードを完全に削除するには、オブジェクトの基になる Delta テーブルに対して追加の操作を実行する必要があります。 マテリアライズドビューからのレコードの削除を確実にするには、 削除ベクトルが有効になっているマテリアライズドビューからのレコードの完全削除を参照してください。 ストリーミングテーブルからレコードを確実に削除するには、「 ストリーミングテーブルからレコードを完全に削除する」を参照してください。

テーブルの具体化方法を制御する

テーブルはまた、その実体化をさらにコントロールすることもできる:

注記

サイズが 1 TB 未満のテーブルの場合、Databricks では DLT でデータ編成を制御できるようにすることをお勧めします。テーブルがテラバイトを超えることが予想されない限り、Databricks ではパーティション列を指定しないことをお勧めします。

例: スキーマとクラスター列を指定する

オプションで、テーブルを定義するときにスキーマを指定できます。 次の例では、Delta Lake で生成された列 の使用を含むターゲット テーブルのスキーマを指定し、テーブルのクラスタリング列を定義します。

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) CLUSTER BY (order_day_of_week, customer_id)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

デフォルトでは、スキーマを指定しない場合、DLT は table 定義からスキーマを推測します。

例: パーティション列の指定

オプションで、テーブルのパーティション列を指定できます。

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

リキッドクラスタリングは、クラスタリングのための柔軟で最適化されたソリューションを提供します。 DLT には PARTITIONED BY の代わりに CLUSTER BY を使用することを検討してください。

例: テーブル制約の定義

注記

テーブル制約の DLT サポートは パブリック プレビュー段階です。テーブル制約を定義するには、パイプラインが Unity Catalog 対応のパイプラインであり、 preview チャンネルを使用するように構成されている必要があります。

スキーマを指定するときに、プライマリ・キーと外部キーを定義できます。 制約は情報提供を目的としており、強制されません。 SQL 言語リファレンスの CONSTRAINT 句 を参照してください。

次の例では、プライマリ・キー制約と外部キー制約を持つテーブルを定義しています。

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

SQL でテーブルまたはビューを宣言するときに使用する値をパラメータ化します

SET を使用して、テーブルまたはビューを宣言するクエリで設定値 (Spark 設定を含む) を指定します。ノートブックで SET ステートメントの後に定義するテーブルまたはビューは、定義された値にアクセスできます。 SET ステートメントを使用して指定された Spark 設定は、SET ステートメントに続くテーブルまたはビューに対して Spark クエリを実行するときに使用されます。クエリの設定値を読み取るには、文字列補間構文 ${}を使用します。 次の例では、 startDate という名前の Spark 設定値を設定し、その値をクエリで使用します。

SET startDate='2020-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

複数の設定値を指定するには、値ごとに個別の SET ステートメントを使用します。

例: 行フィルターと列マスクの定義

備考

プレビュー

行フィルターと列マスクは パブリック プレビュー段階です。

ロー・フィルターとカラム・マスクを使用してマテリアライズドビュー またはストリーミング・テーブルを作成するには、 ROW FILTER 句MASK 句を使用します。 次の例は、マテリアライズドビューとストリーミングテーブルを行フィルタと列マスクの両方を使用して定義する方法を示しています。

SQL
CREATE OR REFRESH STREAMING TABLE customers_silver (
id int COMMENT 'This is the customer ID',
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(customers_bronze)

CREATE OR REFRESH MATERIALIZED VIEW sales (
customer_id STRING MASK catalog.schema.customer_id_mask_fn,
customer_name STRING,
number_of_line_items STRING COMMENT 'Number of items in the order',
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
)
COMMENT "Raw data on sales"
WITH ROW FILTER catalog.schema.order_number_filter_fn ON (order_number)
AS SELECT * FROM sales_bronze

行フィルターと列マスクの詳細については、「 行フィルターと列マスクを使用してテーブルを発行する」を参照してください。

SQL プロパティ

CREATE TABLE または VIEW

TEMPORARY テーブルを作成しますが、テーブルのメタデータは公開しません。 TEMPORARY 句は、パイプラインで使用できるが、パイプラインの外部からアクセスしてはならないテーブルを作成するように DLT に指示します。処理時間を短縮するために、一時テーブルは、1 回の更新だけでなく、それを作成するパイプラインの有効期間中保持されます。

STREAMING 入力データセットをストリームとして読み取るテーブルを作成します。 入力データセットは、 Auto Loader や STREAMING テーブルなどのストリーミング データソースである必要があります。

CLUSTER BY テーブルでリキッドクラスタリングを有効にし、クラスタリングキーとして使用する列を定義します。 「Deltaテーブルにリキッドクラスタリングを使用する」を参照してください。

PARTITIONED BY テーブルのパーティション化に使用する1つ以上の列のオプションのリスト。

LOCATION テーブルデータのオプションの格納場所。設定されていない場合、システムはデフォルトでパイプラインの保存場所を使用します。

COMMENT テーブルの説明(オプション)。

column_constraint カラムに対する情報プライマリ・キー制約または外部キー制約 (オプション)。

MASK clause (パブリック プレビュー) 機密データを匿名化するための列マスク機能を追加します。 その列に対する今後のクエリでは、列の元の値ではなく、評価された関数の結果が返されます。 これは、関数がユーザーの ID とグループメンバーシップをチェックして、値を編集するかどうかを決定できるため、きめ細かなアクセス制御に役立ちます。 Column maskを参照してください。

table_constraint テーブル上の情報プライマリ・キー制約または外部キー制約(オプション)。

TBLPROPERTIES テーブルの テーブルプロパティ のオプションのリスト。

WITH ROW FILTER clause (パブリック プレビュー) テーブルに行フィルター関数を追加します。 そのテーブルに対する今後のクエリは、関数が TRUE と評価される行のサブセットを受け取ります。 これは、関数が呼び出し元のユーザーの ID とグループメンバーシップを検査して、特定の行をフィルタリングするかどうかを決定できるため、きめ細かなアクセス制御に役立ちます。 ROW FILTERを参照してください。

select_statement テーブルのデータセットを定義する DLT クエリ。

CONSTRAINT 句

EXPECT expectation_name データ品質制約 expectation_nameを定義します。 ON VIOLATION制約が定義されていない場合は、制約に違反する行をターゲット データセットに追加します。

ON VIOLATION 失敗した行に対して実行するオプションのアクション: - FAIL UPDATE: パイプラインの実行を直ちに停止します。 - DROP ROW: レコードをドロップして処理を続行します。

DLTの SQL を持つチェンジデータキャプチャ

APPLY CHANGES INTO ステートメントを使用して、以下で説明するように DLT CDC 機能を使用します。

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

APPLY CHANGESターゲットのデータ品質制約を定義するには、非APPLY CHANGESクエリと同じ CONSTRAINT 句を使用します。「パイプラインのエクスペクテーションを使用してデータ品質を管理する」を参照してください。

注記

INSERTUPDATE イベントのデフォルトの動作は、ソースからCDCイベントを アップサート することです。指定されたキーに一致するターゲットテーブルの行を更新するか、ターゲットテーブルに一致するレコードが存在しない場合に新しい行を挿入します。DELETEイベントの処理はAPPLY AS DELETE WHEN条件で指定できます。

important

変更を適用するターゲットストリーミングテーブルを宣言する必要があります。オプションでターゲットテーブルのスキーマを指定できます。APPLY CHANGESターゲットテーブルのスキーマを指定する場合は、sequence_byフィールドと同じデータ型の__START_AT列と__END_AT列も含める必要があります。

APPLY CHANGES APIs: DLTによるチェンジデータキャプチャの簡素化」を参照してください。

KEYS ソースデータ内の行を一意に識別する列または列の組み合わせ。これは、どのCDCイベントがターゲットテーブル内の特定のレコードに適用されるかを識別するために使用されます。 列の組み合わせを定義するには、カンマ区切りの列のリストを使用します。 この句は必須です。

IGNORE NULL UPDATES ターゲットカラムのサブセットを含む更新の取り込みを許可します。 CDC イベントが既存の行と一致し、IGNORE NULL UPDATES が指定されている場合、 null を持つ列はターゲット内の既存の値を保持します。 これは、値が nullのネストされた列にも適用されます。 この句はオプションです。 デフォルトでは、既存の列を null 値で上書きします。

APPLY AS DELETE WHEN CDC イベントをアップサートではなく DELETE として扱うタイミングを指定します。 順不同のデータを処理するために、削除された行は基になる Delta テーブルに廃棄石として一時的に保持され、これらの廃棄石を除外するビューがメタストアに作成されます。 保持間隔は、 pipelines.cdc.tombstoneGCThresholdInSeconds table プロパティ。 この句はオプションです。

APPLY AS TRUNCATE WHEN CDC イベントをフル・テーブル・ TRUNCATEとして扱うタイミングを指定します。 この句はターゲットテーブルの完全な切り捨てをトリガーするため、この機能を必要とする特定のユースケースにのみ使用してください。 APPLY AS TRUNCATE WHEN 句は、SCD タイプ 1 でのみサポートされます。SCD タイプ 2 は、切り捨て操作をサポートしていません。 この句はオプションです。

SEQUENCE BY ソース・データ内の CDC イベントの論理的な順序を指定する列名。同順位を分割するために複数の列が必要な場合は、 STRUCT 式を使用します: 最初に最初の構造体フィールドによって順序付け、次に同点の場合は 2 番目のフィールドの順に並べます。DLT は、このシーケンスを使用して、順不同で到着した変更イベントを処理します。 指定する列は、ソート可能なデータ型である必要があります。 この句は必須です。

COLUMNS ターゲット・テーブルに含める列のサブセットを指定します。 次のいずれかを実行できます。 - 含める列の完全なリストを指定します: COLUMNS (userId, name, city)。 - 除外する列のリストを指定します。 COLUMNS * EXCEPT (operation, sequenceNum) この句はオプションです。 デフォルトでは、 COLUMNS 句が指定されていない場合、ターゲット・テーブルのすべてのカラムが含まれます。

STORED AS レコードをSCDタイプ1として保存するか、SCDタイプ2として保存するか。 この句はオプションです。 デフォルトはSCDタイプ1です。

TRACK HISTORY ON 出力列のサブセットを指定して、指定した列に変更があった場合に履歴レコードを生成します。 次のいずれかを実行できます。 - 追跡する列の完全なリストを指定します: COLUMNS (userId, name, city)。 - トラッキングから除外する列のリストを指定します。 COLUMNS * EXCEPT (operation, sequenceNum) この句はオプションです。 デフォルトでは、変更があった場合にすべての出力カラムの履歴を追跡します。これは TRACK HISTORY ON *に相当します。