Delta Live Tables SQL 言語リファレンス

この記事では、 Delta Live Tables SQL プログラミング インターフェイスの詳細について説明します。

SQL で Python ユーザー定義関数 (UDF) を使用できますが、SQL ソース ファイルで呼び出す前に、Python ファイルでこれらの UDF を定義する必要があります。 ユーザー定義スカラー関数 - Python を参照してください。

制限

PIVOT 句はサポートされていません。Spark での pivot 操作では、出力のスキーマをコンピュートするために入力データを一括して読み込む必要があります。 この機能は Delta Live Tablesではサポートされていません。

Delta Live Tables マテリアライズドビューまたはストリーミングテーブルを作成する

ストリーミングテーブルまたはマテリアライズドビュー ( LIVE TABLEとも呼ばれます) を宣言するときには、同じ基本的な SQL 構文を使用します。

ストリーミング テーブルは、ストリーミング ソースに対して読み取るクエリー を使用してのみ宣言できます。 Databricks では、クラウド オブジェクト ストレージからのファイルのストリーミング インジェストに Auto Loader を使用することをお勧めします。 SQL 構文Auto Loader を参照してください。

パイプライン内の他のテーブルまたはビューをストリーミングソースとして指定する場合は、データセット名を STREAM() 関数で囲む必要があります。

次に、SQL を使用してマテリアライズドビューとストリーミングテーブルを宣言するための構文について説明します。

CREATE OR REFRESH [TEMPORARY] { STREAMING TABLE | LIVE TABLE } table_name
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  AS select_statement

Delta Live Tables ビューを作成する

次に、SQL を使用してビューを宣言するための構文について説明します。

CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement

Auto Loader SQL 構文

次に、SQL で Auto Loader を操作するための構文について説明します。

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM cloud_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

サポートされている形式オプションは、 Auto Loaderで使用できます。 map() 関数を使用すると、任意の数のオプションを cloud_files() メソッドに渡すことができます。オプションはキーと値のペアで、キーと値は文字列です。 サポート形式とオプションの詳細については、「 ファイル形式のオプション」を参照してください。

例: テーブルの定義

データセットを作成するには、外部 データソースから読み取るか、パイプラインで定義されたデータセットから読み取ります。 内部データセットから読み取るには、データセット名の前に LIVE キーワードを追加します。 次の例では、JSON ファイルを入力ソースとして受け取る taxi_raw というテーブルと、 taxi_raw テーブルを入力として受け取る filtered_data というテーブルの 2 つの異なるデータセットを定義しています。

CREATE OR REFRESH LIVE TABLE taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`

CREATE OR REFRESH LIVE TABLE filtered_data
AS SELECT
  ...
FROM LIVE.taxi_raw

例: ストリーミングソースからの読み取り

ストリーミングソース ( Auto Loader や内部データセットなど) からデータを読み取るには、 STREAMING テーブルを定義します。

CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)

ストリーミング データの詳細については、「 Delta Live Tablesを使用したデータの変換」を参照してください。

テーブルの具体化方法を制御する

テーブルでは、具体化をさらに制御することもできます。

サイズが 1 TB 未満のテーブルの場合、Databricks では、 Delta Live Tables でデータ編成を制御できるようにすることをお勧めします。 テーブルがテラバイトを超えることが予想される場合を除き、通常はパーティション列を指定しないでください。

例: スキーマとパーティション列を指定する

必要に応じて、テーブルを定義するときにスキーマを指定できます。 次の例では、Delta Lake で生成された 列の使用やテーブルのパーティション列の定義など、ターゲット表のスキーマを指定します。

CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

デフォルトによって、スキーマを指定しない場合、 Delta Live Tables table 定義からスキーマを推論します。

例: テーブル制約の定義

テーブル制約のサポートは パブリック プレビュー段階です。 テーブル制約を定義するには、パイプラインが Unity カタログ対応のパイプラインであり、 previewチャンネルを使用するように構成されている必要があります。

スキーマを指定するときに、主キーと外部キーを定義できます。 制約は情報提供を目的としており、強制されません。 次の例では、主キー制約と外部キー制約を持つテーブルを定義しています。

CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING NOT NULL PRIMARY KEY,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
  CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

テーブルまたはビューの構成値を設定する

SET を使用して、Spark 構成を含むテーブルまたはビューの構成値を指定します。SET ステートメントの後にノートブックで定義したテーブルまたはビューは、定義された値にアクセスできます。SET ステートメントを使用して指定された Spark 構成は、SET ステートメントに続くテーブルまたはビューに対して Spark クエリーを実行するときに使用されます。クエリーの構成値を読み取るには、文字列補間構文 ${}を使用します。 次の例では、 startDate という名前の Spark 構成値を設定し、その値をクエリーで使用します。

SET startDate='2020-01-01';

CREATE OR REFRESH LIVE TABLE filtered
AS SELECT * FROM src
WHERE date > ${startDate}

複数の構成値を指定するには、値ごとに個別の SET ステートメントを使用します。

SQL プロパティ

CREATE TABLE または VIEW

TEMPORARY

テーブルを作成しますが、テーブルのメタデータは公開しません。 TEMPORARY 句は、パイプラインで使用できるがパイプラインの外部からはアクセスできないテーブルを作成するようにDelta Live Tablesに指示します。 処理時間を短縮するために、一時テーブルは 1 回の更新だけでなく、それを作成したパイプラインの存続期間中存続します。

STREAMING

入力データセットをストリームとして読み取るテーブルを作成します。 入力データセットは、ストリーミング Auto Loader や STREAMING テーブルなどのストリーミング である必要があります。

PARTITIONED BY

テーブルのパーティション分割に使用する 1 つ以上の列のオプションのリスト。

LOCATION

テーブル データのオプションの格納場所。 設定されていない場合、システムはパイプラインの格納場所にデフォルトします。

COMMENT

テーブルの説明 (オプション)。

column_constraint

列に対するオプションの通知主キー制約または外部キー 制約

table_constraint

テーブルに対するオプションの通知主キー制約または外部キー 制約

TBLPROPERTIES

テーブルの テーブル プロパティ のオプションのリスト。

select_statement

テーブルのデータセットを定義する Delta Live Tables クエリー。

制約句

EXPECT expectation_name

データ品質制約 expectation_nameを定義します。 制約が定義されていない場合は ON VIOLATION 制約に違反する行をターゲット データセットに追加します。

ON VIOLATION

失敗した行に対して実行するオプションのアクション:

  • FAIL UPDATE: パイプラインの実行を直ちに停止します。

  • DROP ROW: レコードをドロップして処理を続行します。

チェンジデータキャプチャ with SQL in Delta Live Tables

APPLY CHANGES INTO ステートメントを使用して Delta Live Tables CDC 機能を使用します。

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

APPLY CHANGESターゲットのデータ品質制約は、非APPLY CHANGESクエリーと同じ CONSTRAINT 句を使用して定義します。「 Delta Live Tables を使用したデータ品質の管理」を参照してください。

INSERT イベントと UPDATE イベントのデフォルトの動作は、ソースから CDC イベントを アップサート することです: 指定されたキーに一致するターゲット テーブル内の行を更新するか、一致するレコードがターゲット テーブルに存在しない場合は新しい行を挿入します。DELETE イベントの処理は、 APPLY AS DELETE WHEN 条件で指定できます。

重要

変更を適用するターゲットストリーミングテーブルを宣言する必要があります。 オプションで、ターゲット表のスキーマを指定できます。 APPLY CHANGES ターゲット表のスキーマを指定する場合は、 sequence_by フィールドと同じデータ・タイプの __START_AT 列と __END_AT 列も含める必要があります。

APPLY CHANGES API参照してください: Delta Live Tablesでのチェンジデータキャプチャの簡素化

KEYS

ソース データ内の行を一意に識別する列または列の組み合わせ。 これは、ターゲット表の特定のレコードに適用される CDC イベントを識別するために使用されます。

この句は必須です。

IGNORE NULL UPDATES

ターゲットカラムのサブセットを含む更新の取り込みを許可します。 CDC イベントが既存の行と一致し、IGNORE NULL UPDATES が指定されている場合、 null を持つ列はターゲット内の既存の値を保持します。 これは、値が nullのネストされた列にも適用されます。

この句はオプションです。

デフォルトは、既存の列を null 値で上書きすることです。

APPLY AS DELETE WHEN

CDC イベントをアップサートではなく DELETE として扱うタイミングを指定します。 順不同のデータを処理するために、削除された行は基になる Delta テーブルに廃棄標識として一時的に保持され、これらの廃棄標識を除外するビューがメタストアに作成されます。 保有期間は、 pipelines.cdc.tombstoneGCThresholdInSeconds テーブル プロパティを使用して構成できます。

この句はオプションです。

APPLY AS TRUNCATE WHEN

CDC イベントを完全なテーブル TRUNCATEとして扱うタイミングを指定します。 この句はターゲットテーブルの完全な切り捨てをトリガーするため、この機能を必要とする特定のユースケースにのみ使用する必要があります。

APPLY AS TRUNCATE WHEN 句は、SCD タイプ 1 でのみサポートされます。SCD タイプ 2 は切り捨てをサポートしていません。

この句はオプションです。

SEQUENCE BY

ソース データ内の CDC イベントの論理的な順序を指定する列名。 Delta Live Tables は、このシーケンス処理を使用して、順不同で到着した変更イベントを処理します。

この句は必須です。

COLUMNS

ターゲット表に含める列のサブセットを指定します。 次のいずれかを実行できます。

  • 含める列の完全なリストを指定します: COLUMNS (userId, name, city).

  • 除外する列のリストを指定します。 COLUMNS * EXCEPT (operation, sequenceNum)

この句はオプションです。

デフォルトでは、 COLUMNS 節が指定されていない場合、ターゲット表のすべての列が組み込まれます。

STORED AS

レコードを SCD タイプ 1 として保管するか、SCD タイプ 2 として保管するか。

この句はオプションです。

デフォルトは SCD タイプ 1 です。

TRACK HISTORY ON

出力列のサブセットを指定して、指定された列に変更があった場合にヒストリー・レコードを生成します。 次のいずれかを実行できます。

  • 追跡する列の完全なリストを指定します: COLUMNS (userId, name, city).

  • 追跡から除外する列のリストを指定します。 COLUMNS * EXCEPT (operation, sequenceNum)

この句はオプションです。 デフォルトは、変更がある場合のすべての出力列の履歴を追跡し、 TRACK HISTORY ON *に相当します。