Delta Live Tables SQL 言語リファレンス
この記事では、 Delta Live Tables SQL プログラミング インターフェイスの詳細について説明します。
Python API の情報については、 Delta Live Tables Python 言語リファレンスを参照してください。
SQL コマンドの詳細については、「 SQL 言語リファレンス」を参照してください。
SQL で Python ユーザー定義関数 (UDF) を使用できますが、SQL ソース ファイルで呼び出す前に、Python ファイルでこれらの UDF を定義する必要があります。 ユーザー定義スカラー関数 - Python を参照してください。
制限
PIVOT
句はサポートされていません。Spark での pivot
操作では、出力のスキーマをコンピュートするために入力データを一括して読み込む必要があります。 この機能は Delta Live Tablesではサポートされていません。
Delta Live Tables マテリアライズドビューまたはストリーミングテーブルを作成する
ストリーミングテーブルまたはマテリアライズドビュー ( LIVE TABLE
とも呼ばれます) を宣言するときには、同じ基本的な SQL 構文を使用します。
ストリーミング テーブルは、ストリーミング ソースに対して読み取るクエリー を使用してのみ宣言できます。 Databricks では、クラウド オブジェクト ストレージからのファイルのストリーミング インジェストに Auto Loader を使用することをお勧めします。 SQL 構文Auto Loader を参照してください。
パイプライン内の他のテーブルまたはビューをストリーミングソースとして指定する場合は、データセット名を STREAM()
関数で囲む必要があります。
次に、SQL を使用してマテリアライズドビューとストリーミングテーブルを宣言するための構文について説明します。
CREATE OR REFRESH [TEMPORARY] { STREAMING TABLE | LIVE TABLE } table_name
[(
[
col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ],
col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
[ table_constraint ] [, ...]
)]
[USING DELTA]
[PARTITIONED BY (col_name1, col_name2, ... )]
[LOCATION path]
[COMMENT table_comment]
[TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
AS select_statement
Delta Live Tables ビューを作成する
次に、SQL を使用してビューを宣言するための構文について説明します。
CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
[(
[
col_name1 [ COMMENT col_comment1 ],
col_name2 [ COMMENT col_comment2 ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
)]
[COMMENT view_comment]
AS select_statement
Auto Loader SQL 構文
次に、SQL で Auto Loader を操作するための構文について説明します。
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM cloud_files(
"<file-path>",
"<file-format>",
map(
"<option-key>", "<option_value",
"<option-key>", "<option_value",
...
)
)
サポートされている形式オプションは、 Auto Loaderで使用できます。 map()
関数を使用すると、任意の数のオプションを cloud_files()
メソッドに渡すことができます。オプションはキーと値のペアで、キーと値は文字列です。 サポート形式とオプションの詳細については、「 ファイル形式のオプション」を参照してください。
例: テーブルの定義
データセットを作成するには、外部 データソースから読み取るか、パイプラインで定義されたデータセットから読み取ります。 内部データセットから読み取るには、データセット名の前に LIVE
キーワードを追加します。 次の例では、JSON ファイルを入力ソースとして受け取る taxi_raw
というテーブルと、 taxi_raw
テーブルを入力として受け取る filtered_data
というテーブルの 2 つの異なるデータセットを定義しています。
CREATE OR REFRESH LIVE TABLE taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`
CREATE OR REFRESH LIVE TABLE filtered_data
AS SELECT
...
FROM LIVE.taxi_raw
例: ストリーミングソースからの読み取り
ストリーミングソース ( Auto Loader や内部データセットなど) からデータを読み取るには、 STREAMING
テーブルを定義します。
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)
ストリーミング データの詳細については、「 Delta Live Tablesを使用したデータの変換」を参照してください。
テーブルの具体化方法を制御する
テーブルでは、具体化をさらに制御することもできます。
PARTITIONED BY
を使用してテーブル をパーティション分割 する方法を指定します。パーティショニングを使用すると、クエリーを高速化できます。テーブルのプロパティは、
TBLPROPERTIES
を使用して設定できます。 Delta Live Tables テーブルのプロパティを参照してください。LOCATION
設定を使用して保存場所を設定します。既定では、LOCATION
が設定されていない場合、テーブル データはパイプラインの格納場所に格納されます。生成された列 は、スキーマ定義で使用できます。例 : スキーマ列とパーティション列の指定を参照してください。
注
サイズが 1 TB 未満のテーブルの場合、Databricks では、 Delta Live Tables でデータ編成を制御できるようにすることをお勧めします。 テーブルがテラバイトを超えることが予想される場合を除き、通常はパーティション列を指定しないでください。
例: スキーマとパーティション列を指定する
必要に応じて、テーブルを定義するときにスキーマを指定できます。 次の例では、Delta Lake で生成された 列の使用やテーブルのパーティション列の定義など、ターゲット表のスキーマを指定します。
CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
デフォルトによって、スキーマを指定しない場合、 Delta Live Tables table
定義からスキーマを推論します。
例: テーブル制約の定義
注
テーブル制約のサポートは パブリック プレビュー段階です。 テーブル制約を定義するには、パイプラインが Unity カタログ対応のパイプラインであり、 preview
チャンネルを使用するように構成されている必要があります。
スキーマを指定するときに、主キーと外部キーを定義できます。 制約は情報提供を目的としており、強制されません。 次の例では、主キー制約と外部キー制約を持つテーブルを定義しています。
CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
テーブルまたはビューの構成値を設定する
SET
を使用して、Spark 構成を含むテーブルまたはビューの構成値を指定します。SET
ステートメントの後にノートブックで定義したテーブルまたはビューは、定義された値にアクセスできます。SET
ステートメントを使用して指定された Spark 構成は、SET ステートメントに続くテーブルまたはビューに対して Spark クエリーを実行するときに使用されます。クエリーの構成値を読み取るには、文字列補間構文 ${}
を使用します。 次の例では、 startDate
という名前の Spark 構成値を設定し、その値をクエリーで使用します。
SET startDate='2020-01-01';
CREATE OR REFRESH LIVE TABLE filtered
AS SELECT * FROM src
WHERE date > ${startDate}
複数の構成値を指定するには、値ごとに個別の SET
ステートメントを使用します。
SQL プロパティ
CREATE TABLE または VIEW |
---|
テーブルを作成しますが、テーブルのメタデータは公開しません。 |
入力データセットをストリームとして読み取るテーブルを作成します。 入力データセットは、ストリーミング Auto Loader や |
テーブルのパーティション分割に使用する 1 つ以上の列のオプションのリスト。 |
テーブル データのオプションの格納場所。 設定されていない場合、システムはパイプラインの格納場所にデフォルトします。 |
テーブルの説明 (オプション)。 |
列に対するオプションの通知主キー制約または外部キー 制約 。 |
テーブルに対するオプションの通知主キー制約または外部キー 制約 。 |
テーブルの テーブル プロパティ のオプションのリスト。 |
テーブルのデータセットを定義する Delta Live Tables クエリー。 |
制約句 |
---|
データ品質制約 |
失敗した行に対して実行するオプションのアクション:
|
チェンジデータキャプチャ with SQL in Delta Live Tables
APPLY CHANGES INTO
ステートメントを使用して Delta Live Tables CDC 機能を使用します。
CREATE OR REFRESH STREAMING TABLE table_name;
APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
APPLY CHANGES
ターゲットのデータ品質制約は、非APPLY CHANGES
クエリーと同じ CONSTRAINT
句を使用して定義します。「 Delta Live Tables を使用したデータ品質の管理」を参照してください。
注
INSERT
イベントと UPDATE
イベントのデフォルトの動作は、ソースから CDC イベントを アップサート することです: 指定されたキーに一致するターゲット テーブル内の行を更新するか、一致するレコードがターゲット テーブルに存在しない場合は新しい行を挿入します。DELETE
イベントの処理は、 APPLY AS DELETE WHEN
条件で指定できます。
重要
変更を適用するターゲットストリーミングテーブルを宣言する必要があります。 オプションで、ターゲット表のスキーマを指定できます。 APPLY CHANGES
ターゲット表のスキーマを指定する場合は、 sequence_by
フィールドと同じデータ・タイプの __START_AT
列と __END_AT
列も含める必要があります。
APPLY CHANGES API参照してください: Delta Live Tablesでのチェンジデータキャプチャの簡素化。
句 |
---|
ソース データ内の行を一意に識別する列または列の組み合わせ。 これは、ターゲット表の特定のレコードに適用される CDC イベントを識別するために使用されます。 この句は必須です。 |
ターゲットカラムのサブセットを含む更新の取り込みを許可します。 CDC イベントが既存の行と一致し、IGNORE NULL UPDATES が指定されている場合、 この句はオプションです。 デフォルトは、既存の列を |
CDC イベントをアップサートではなく この句はオプションです。 |
CDC イベントを完全なテーブル
この句はオプションです。 |
ソース データ内の CDC イベントの論理的な順序を指定する列名。 Delta Live Tables は、このシーケンス処理を使用して、順不同で到着した変更イベントを処理します。 この句は必須です。 |
ターゲット表に含める列のサブセットを指定します。 次のいずれかを実行できます。
この句はオプションです。 デフォルトでは、 |
レコードを SCD タイプ 1 として保管するか、SCD タイプ 2 として保管するか。 この句はオプションです。 デフォルトは SCD タイプ 1 です。 |
出力列のサブセットを指定して、指定された列に変更があった場合にヒストリー・レコードを生成します。 次のいずれかを実行できます。
この句はオプションです。 デフォルトは、変更がある場合のすべての出力列の履歴を追跡し、 |