Delta Live Tables SQL 言語リファレンス
この記事では、Delta Live Tables SQL プログラミング インターフェイスについて詳しく説明します。
Python API の情報については、 Delta Live Tables Python 言語リファレンスを参照してください。
SQL コマンドの詳細については、「 SQL 言語リファレンス」を参照してください。
SQL で Python ユーザー定義関数 (UDF) を使用できますが、SQL ソース ファイルで呼び出す前に、Python ファイルでこれらの UDF を定義する必要があります。 ユーザー定義スカラー関数 - Python を参照してください。
制限
PIVOT
句はサポートされていません。Spark の pivot
操作では、入力データを出力スキーマのコンピュートに一括して読み込む必要があります。この機能は、Delta Live Tables ではサポートされていません。
Delta Live Tables マテリアライズドビューまたはストリーミングテーブルを作成する
注
具体化されたビューを作成するための
CREATE OR REFRESH LIVE TABLE
構文は非推奨です。 代わりに、CREATE OR REFRESH MATERIALIZED VIEW
.CLUSTER BY
句を使用してリキッドクラスタリングを有効にするには、プレビュー チャンネルを使用するようにパイプラインを構成する必要があります。
ストリーミングテーブルまたはマテリアライズドビューを宣言するときにも、同じ基本的な SQL 構文を使用します。
SQL を使用した Delta Live Tables の具体化ビュー の宣言
次に、SQL を使用して Delta Live Tables で具体化されたビューを宣言するための構文について説明します。
CREATE OR REFRESH MATERIALIZED VIEW view_name [CLUSTER BY (col_name1, col_name2, ... )]
[(
[
col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
[ table_constraint ] [, ...]
)]
[USING DELTA]
[PARTITIONED BY (col_name1, col_name2, ... )]
[LOCATION path]
[COMMENT table_comment]
[TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
[ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
AS select_statement
SQL を使用して Delta Live Tables ストリーミング テーブルを宣言する
ストリーミング テーブルは、ストリーミング ソースに対して読み取るクエリー を使用してのみ宣言できます。 Databricks では、クラウドオブジェクトストレージからのファイルのストリーミング インジェストに Auto Loader を使用することをお勧めします。 SQL 構文Auto Loader を参照してください。
パイプライン内の他のテーブルまたはビューをストリーミング ソースとして指定する場合は、データセット名の周囲に STREAM()
関数を含める必要があります。
次に、SQL を使用して Delta Live Tables でストリーミング テーブルを宣言するための構文について説明します。
CREATE OR REFRESH [TEMPORARY] STREAMING TABLE table_name [CLUSTER BY (col_name1, col_name2, ... )]
[(
[
col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
[ table_constraint ] [, ...]
)]
[USING DELTA]
[PARTITIONED BY (col_name1, col_name2, ... )]
[LOCATION path]
[COMMENT table_comment]
[TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
[ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
AS select_statement
Delta Live Tables ビューを作成する
次に、SQL を使用してビューを宣言するための構文について説明します。
CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
[(
[
col_name1 [ COMMENT col_comment1 ],
col_name2 [ COMMENT col_comment2 ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
)]
[COMMENT view_comment]
AS select_statement
Auto Loader SQL 構文
次に、SQL で Auto Loader を操作するための構文について説明します。
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM read_files(
"<file-path>",
"<file-format>",
map(
"<option-key>", "<option_value",
"<option-key>", "<option_value",
...
)
)
サポートされている形式オプションは、 Auto Loaderで使用できます。 map()
関数を使用すると、オプションを read_files()
メソッドに渡すことができます。オプションはキーと値のペアで、キーと値は文字列です。 サポート形式とオプションの詳細については、「 ファイル形式のオプション」を参照してください。
例: テーブルの定義
データセットを作成するには、外部 データソースから読み取るか、パイプラインで定義されたデータセットから読み取ります。 内部データセットから読み取るには、データセット名の前に LIVE
キーワードを追加します。 次の例では、JSON ファイルを入力ソースとして受け取る taxi_raw
というテーブルと、 taxi_raw
テーブルを入力として受け取る filtered_data
というテーブルの 2 つの異なるデータセットを定義しています。
CREATE OR REFRESH MATERIALIZED VIEW taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`
CREATE OR REFRESH MATERIALIZED VIEW filtered_data
AS SELECT
...
FROM LIVE.taxi_raw
例: ストリーミングソースからの読み取り
ストリーミングソース ( Auto Loader や内部データセットなど) からデータを読み取るには、 STREAMING
テーブルを定義します。
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)
ストリーミング データの詳細については、「 Delta Live Tablesを使用したデータの変換」を参照してください。
テーブルの具体化方法を制御する
テーブルでは、具体化をさらに制御することもできます。
PARTITIONED BY
を使用してテーブル をパーティション分割 する方法を指定します。パーティショニングを使用すると、クエリーを高速化できます。テーブルのプロパティは、
TBLPROPERTIES
を使用して設定できます。 Delta Live Tables テーブルのプロパティを参照してください。LOCATION
設定を使用して保存場所を設定します。既定では、LOCATION
が設定されていない場合、テーブル データはパイプラインの格納場所に格納されます。生成された列 は、スキーマ定義で使用できます。例 : スキーマ列とパーティション列の指定を参照してください。
注
サイズが 1 TB 未満のテーブルの場合、Databricks では Delta Live Tables でデータ編成を制御することをお勧めします。 テーブルがテラバイトを超えることが予想されない限り、Databricks ではパーティション列を指定しないことをお勧めします。
例: スキーマとパーティション列を指定する
必要に応じて、テーブルを定義するときにスキーマを指定できます。 次の例では、Delta Lake で生成された 列の使用やテーブルのパーティション列の定義など、ターゲット表のスキーマを指定します。
CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
デフォルトで、スキーマを指定しない場合、 Delta Live Tables table
定義からスキーマを推論します。
例: テーブル制約の定義
注
Delta Live Tables のテーブル制約のサポートはパブリック プレビュー段階です。 テーブル制約を定義するには、パイプラインが Unity カタログ対応のパイプラインであり、 preview
チャンネルを使用するように構成されている必要があります。
スキーマを指定するときに、主キーと外部キーを定義できます。 制約は情報提供を目的としており、強制されません。 SQL 言語リファレンスのCONSTRAINT 句を参照してください。
次の例では、主キー制約と外部キー制約を持つテーブルを定義しています。
CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
SQLでテーブルやビューを宣言するときに使用する値をパラメータ化する
SET
を使用して、Spark 構成を含むテーブルまたはビューを宣言するクエリで構成値を指定します。 SET
ステートメントの後にノートブックで定義したテーブルまたはビューは、定義された値にアクセスできます。 SET
ステートメントを使用して指定された Spark 構成は、SET ステートメントに続く任意のテーブルまたはビューに対して Spark クエリを実行するときに使用されます。 クエリ内の構成値を読み取るには、文字列補間構文${}
を使用します。 次の例では、 startDate
という名前の Spark 構成値を設定し、その値をクエリで使用します。
SET startDate='2020-01-01';
CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}
複数の構成値を指定するには、値ごとに個別の SET
ステートメントを使用します。
例: 行フィルターと列マスクの定義
プレビュー
行フィルターと列マスクは パブリック プレビュー段階です。
行フィルターと列マスクを使用してマテリアライズド ビューまたはストリーミング テーブルを作成するには、 ROW FILTER 句とMASK 句を使用します。 次の例は、行フィルターと列マスクの両方を使用してマテリアライズド ビューとストリーミング テーブルを定義する方法を示しています。
CREATE OR REFRESH STREAMING TABLE customers_silver (
id int COMMENT 'This is the customer ID',
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(LIVE.customers_bronze)
CREATE OR REFRESH MATERIALIZED VIEW sales (
customer_id STRING MASK catalog.schema.customer_id_mask_fn,
customer_name STRING,
number_of_line_items STRING COMMENT 'Number of items in the order',
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
)
COMMENT "Raw data on sales"
WITH ROW FILTER catalog.schema.order_number_filter_fn ON (order_number)
AS SELECT * FROM LIVE.sales_bronze
行フィルターと列マスクの詳細については、 「行フィルターと列マスクを使用してテーブルを公開する」を参照してください。
SQL プロパティ
注
CLUSTER BY
句を使用してリキッドクラスタリングを有効にするには、プレビュー チャンネルを使用するようにパイプラインを構成する必要があります。
CREATE TABLE または VIEW |
---|
テーブルを作成しますが、テーブルのメタデータは公開しません。 |
入力データセットをストリームとして読み取るテーブルを作成します。 入力データセットは、ストリーミング Auto Loader や |
テーブルでリキッドクラスタリングを有効にし、クラスタリングキーとして使用する列を定義します。 「 Delta テーブルに リキッドクラスタリングを使用する」を参照してください。 |
テーブルのパーティション分割に使用する 1 つ以上の列のオプションのリスト。 |
テーブル データのオプションの格納場所。 設定されていない場合、システムはパイプラインの格納場所にデフォルトします。 |
テーブルの説明 (オプション)。 |
列に対するオプションの通知主キー制約または外部キー制約。 |
機密データを匿名化するための列マスク機能を追加します。 その列に対する今後のクエリでは、列の元の値ではなく、評価された関数の結果が返されます。 この関数は、ユーザーの ID とグループ メンバーシップをチェックして値を編集するかどうかを決定できるため、きめ細かいアクセス制御に役立ちます。 Column mask 句を参照してください。 |
テーブルに対するオプションの通知主キー制約または外部キー制約。 |
テーブルの テーブル プロパティ のオプションのリスト。 |
テーブルに行フィルター関数を追加します。 そのテーブルに対する今後のクエリは、関数が TRUE と評価される行のサブセットを受け取ります。 これは、関数が呼び出しユーザーの ID とグループ メンバーシップを検査して、特定の行をフィルター処理するかどうかを決定できるため、きめ細かいアクセス制御に役立ちます。 ROW FILTER 句を参照してください。 |
テーブルのデータセットを定義する Delta Live Tables クエリー。 |
制約句 |
---|
データ品質制約 |
失敗した行に対して実行するオプションのアクション:
|
チェンジデータキャプチャ with SQL in Delta Live Tables
APPLY CHANGES INTO
ステートメントを使用して Delta Live Tables CDC 機能を使用します。
CREATE OR REFRESH STREAMING TABLE table_name;
APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
APPLY CHANGES
ターゲットのデータ品質制約は、非APPLY CHANGES
クエリーと同じ CONSTRAINT
句を使用して定義します。「 Delta Live Tables を使用したデータ品質の管理」を参照してください。
注
INSERT
イベントと UPDATE
イベントのデフォルトの動作は、ソースから CDC イベントを アップサート することです: 指定されたキーに一致するターゲット テーブル内の行を更新するか、一致するレコードがターゲット テーブルに存在しない場合は新しい行を挿入します。DELETE
イベントの処理は、 APPLY AS DELETE WHEN
条件で指定できます。
重要
変更を適用するターゲットストリーミングテーブルを宣言する必要があります。 オプションで、ターゲット表のスキーマを指定できます。 APPLY CHANGES
ターゲット表のスキーマを指定する場合は、 sequence_by
フィールドと同じデータ・タイプの __START_AT
列と __END_AT
列も含める必要があります。
APPLY CHANGES APIs参照してください: Delta Live Tablesを使用してチェンジデータキャプチャを簡素化します。
句 |
---|
ソース データ内の行を一意に識別する列または列の組み合わせ。 これは、ターゲット表の特定のレコードに適用される CDC イベントを識別するために使用されます。 列の組み合わせを定義するには、カンマ区切りの列のリストを使用します。 この句は必須です。 |
ターゲットカラムのサブセットを含む更新の取り込みを許可します。 CDC イベントが既存の行と一致し、IGNORE NULL UPDATES が指定されている場合、 この句はオプションです。 デフォルトは、既存の列を |
CDC イベントをアップサートではなく この句はオプションです。 |
CDC イベントを完全なテーブル
この句はオプションです。 |
ソース データ内の CDC イベントの論理的な順序を指定する列名。 Delta Live Tables は、このシーケンス処理を使用して、順不同で到着した変更イベントを処理します。 指定する列は、ソート可能なデータ・タイプでなければなりません。 この句は必須です。 |
ターゲット表に含める列のサブセットを指定します。 次のいずれかを実行できます。
この句はオプションです。 デフォルトでは、 |
レコードを SCD タイプ 1 として保管するか、SCD タイプ 2 として保管するか。 この句はオプションです。 デフォルトは SCD タイプ 1 です。 |
出力列のサブセットを指定して、指定された列に変更があった場合にヒストリー・レコードを生成します。 次のいずれかを実行できます。
この句はオプションです。 デフォルトでは、変更があった場合にすべての出力カラムの履歴を追跡します。これは |