SQL を使用したパイプライン コードの開発
DLT では、パイプラインでマテリアライズドビューとストリーミングテーブルを定義するための新しい SQL キーワードと関数がいくつか導入されています。 パイプライン開発の SQL サポートは、Spark SQL の基本に基づいて構築され、構造化ストリーミング機能のサポートが追加されています。
PySpark データフレーム に精通しているユーザーは、Python を使用してパイプライン コードを開発することを好むかもしれません。 Python は、メタプログラミング操作など、SQL での実装が困難な、より広範なテストと操作をサポートしています。 Python を使用したパイプライン コードの開発を参照してください。
DLT SQL 構文の完全なリファレンスについては、「 DLT SQL 言語リファレンス」を参照してください。
パイプライン開発のためのSQLの基本
DLT データセットを作成するコードSQL、CREATE OR REFRESH
構文を使用して、クエリ結果に対してマテリアライズドビュー とストリーミングテーブルを定義します。
STREAM
キーワードは、SELECT
句で参照されるデータソースをストリーミング セマンティクスで読み取る必要があるかどうかを示します。
パイプラインの構成中に指定されたカタログとスキーマに対してデフォルトを読み書きします。 ターゲット・カタログとスキーマの設定を参照してください。
DLT のソースコードは SQL スクリプトと大きく異なります: DLT は、パイプラインで設定されたすべてのソースコードファイルのすべてのデータセット定義を評価し、クエリが実行される前にデータフローグラフを構築します。ノートブックまたはスクリプトに表示されるクエリの順序は、コード評価の順序を定義しますが、クエリの実行順序は定義しません。
SQL を使用したマテリアライズドビューの作成
次のコード例は、SQL を使用してマテリアライズドビューを作成するための基本的な構文を示しています。
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
SQLを使用してストリーミングテーブルを作成する
次のコード例は、 SQLを使用してストリーミングテーブルを作成するための基本的な構文を示しています。 ストリーミングテーブルのソースを読み取る場合、 STREAM
キーワードは、ソースにストリーミングセマンティクスを使用することを示します。マテリアライズドビューを作成するときは、 STREAM
キーワードを使用しないでください。
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
ストリーム キーワードを使用して、ストリーミング セマンティクスを使用してソースから読み取ります。 読み取りで既存のレコードに対する変更または削除が検出されると、エラーがスローされます。静的なソースまたは追加専用のソースから読み取るのが最も安全です。変更コミットのあるデータを取り込むには、Python と SkipChangeCommits
オプションを使用してエラーを処理できます。
オブジェクトストレージからのデータの読み込み
DLT は、Databricks でサポートされているすべての形式からのデータの読み込みをサポートしています。データ形式のオプションを参照してください。
これらの例では、ワークスペースに自動的にマウントされる /databricks-datasets
で使用可能なデータを使用します。 Databricks では、ボリューム パスまたはクラウド URI を使用して、クラウド オブジェクト ストレージに格納されているデータを参照することをお勧めします。 Unity Catalogボリュームとはを参照してください。
Databricks は、クラウド オブジェクト ストレージに格納されているデータに対して増分インジェスト ワークロードを構成する場合は、 Auto Loader とストリーミング テーブルを使用することをお勧めします。 「Auto Loaderとは」を参照してください。
SQL は read_files
関数を使用して Auto Loader 機能を呼び出します。 また、 STREAM
キーワードを使用して、 read_files
でストリーミング読み取りを設定する必要があります。
次に、SQL の read_files
の構文について説明します。
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM STREAM read_files(
"<file-path>",
[<option-key> => <option_value>, ...]
)
Auto Loaderのオプションはキーと値のペアです。サポートされている形式とオプションの詳細については、「 オプション」を参照してください。
次の例では、Auto Loaderを使用してJSONファイルからストリーミングテーブルを作成します。
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT *
FROM STREAM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
read_files
関数は、マテリアライズドビューを作成するためのバッチセマンティクスもサポートしています。次の例では、バッチセマンティクスを使用して JSON ディレクトリを読み取り、マテリアライズドビューを作成します。
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT *
FROM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
エクスペクテーションでデータを検証
エクスペクテーションを使用して、データ品質の制約を設定および適用できます。 パイプラインのエクスペクテーションを使用してデータ品質を管理するを参照してください。
次のコードは、データ取り込み中に null のレコードを削除する valid_data
という名前のエクスペクテーションを定義しています。
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
パイプラインで定義されたマテリアライズドビューとストリーミングテーブルのクエリ
次の例では、4 つのデータセットを定義しています。
- JSON データをロードする
orders
という名前のストリーミングテーブル。 - CSV データをロードする
customers
という名前のマテリアライズドビュー。 orders
データセットとcustomers
データセットのレコードを結合し、注文タイムスタンプを日付にキャストし、customer_id
、order_number
、state
、order_date
の各フィールドを選択するcustomer_orders
という名前のマテリアライズドビュー- 各州の日次注文数を集計する
daily_orders_by_state
という名前のマテリアライズドビュー
パイプライン内のビューまたはテーブルに対してクエリを実行する場合は、カタログとスキーマを直接指定することも、パイプラインで設定されたデフォルトを使用することもできます。 この例では、 orders
、 customers
、および customer_orders
テーブルは、パイプラインに設定されたデフォルトのカタログとスキーマから書き込みと読み取りが行われます。
従来のパブリッシング モードでは、 LIVE
スキーマを使用して、パイプラインで定義されている他のマテリアライズドビュー とストリーミング テーブルに対してクエリを実行します。 新しいパイプラインでは、 LIVE
スキーマ構文は警告なしで無視されます。 LIVE スキーマ (レガシー)を参照してください。
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;
プライベートテーブルを定義する
PRIVATE
句は、マテリアライズドビューまたはストリーミングテーブルを作成するときに使用できます。プライベートテーブルを作成するときは、テーブルを作成しますが、テーブルのメタデータは作成しません。PRIVATE
句は、パイプラインで使用できるが、パイプラインの外部からアクセスしてはならないテーブルを作成するように DLT に指示します。処理時間を短縮するために、プライベートテーブルは、1 回の更新だけでなく、それを作成するパイプラインの有効期間中保持されます。
プライベート・テーブルは、カタログ内のテーブルと同じ名前にすることができます。パイプライン内のテーブルに非修飾名を指定した場合、その名前のプライベート・テーブルとカタログ・テーブルの両方が存在する場合は、プライベート・テーブルが使用されます。
プライベートテーブルは、以前は一時テーブルとして参照されていました。
マテリアライズドビューまたはストリーミングテーブルからのレコードの完全削除
GDPR コンプライアンスなど、削除ベクトルが有効になっているストリーミングテーブルからレコードを完全に削除するには、オブジェクトの基になる Delta テーブルに対して追加の操作を実行する必要があります。ストリーミングテーブルからレコードを確実に削除するには、「 ストリーミングテーブルからレコードを完全に削除する」を参照してください。
マテリアライズドビュー は、基になるテーブルが更新されると、常にデータを反映します。 マテリアライズドビューのデータを削除するには、ソースからデータを削除し、マテリアライズドビューを更新する必要があります。
SQL でテーブルまたはビューを宣言するときに使用する値をパラメータ化します
SET
を使用して、テーブルまたはビューを宣言するクエリで設定値 (Spark 設定を含む) を指定します。ノートブックで SET
ステートメントの後に定義するテーブルまたはビューは、定義された値にアクセスできます。SET
ステートメントを使用して指定された Spark 設定は、SET ステートメントに続くテーブルまたはビューに対して Spark クエリを実行するときに使用されます。クエリの設定値を読み取るには、文字列補間構文 ${}
を使用します。次の例では、 startDate
という名前の Spark 設定値を設定し、その値をクエリで使用します。
SET startDate='2025-01-01';
CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}
複数の設定値を指定するには、値ごとに個別の SET
ステートメントを使用します。
制限
PIVOT
句はサポートされていません。Spark の pivot
操作では、入力データを出力スキーマのコンピュートに一括して読み込む必要があります。この機能は DLT ではサポートされていません。
マテリアライズドビューを作成するための CREATE OR REFRESH LIVE TABLE
構文は非推奨です。 代わりに、 CREATE OR REFRESH MATERIALIZED VIEW
.