メインコンテンツまでスキップ

SQL を使用したパイプライン コードの開発

DLT では、パイプラインでマテリアライズドビューとストリーミングテーブルを定義するための新しい SQL キーワードと関数がいくつか導入されています。 パイプライン開発の SQL サポートは、Spark SQL の基本に基づいて構築され、構造化ストリーミング機能のサポートが追加されています。

PySpark データフレーム に精通しているユーザーは、Python を使用してパイプライン コードを開発することを好むかもしれません。 Python は、メタプログラミング操作など、SQL での実装が困難な、より広範なテストと操作をサポートしています。 Python を使用したパイプライン コードの開発を参照してください。

DLT SQL 構文の完全なリファレンスについては、「 DLT SQL 言語リファレンス」を参照してください。

パイプライン開発のためのSQLの基本

DLT データセットを作成するコードSQL、CREATE OR REFRESH 構文を使用して、クエリ結果に対してマテリアライズドビュー とストリーミングテーブルを定義します。

STREAM キーワードは、SELECT 句で参照されるデータソースをストリーミング セマンティクスで読み取る必要があるかどうかを示します。

パイプラインの構成中に指定されたカタログとスキーマに対してデフォルトを読み書きします。 ターゲット・カタログとスキーマの設定を参照してください。

DLT のソースコードは SQL スクリプトと大きく異なります: DLT は、パイプラインで設定されたすべてのソースコードファイルのすべてのデータセット定義を評価し、クエリが実行される前にデータフローグラフを構築します。ノートブックまたはスクリプトに表示されるクエリの順序は、コード評価の順序を定義しますが、クエリの実行順序は定義しません。

SQL を使用したマテリアライズドビューの作成

次のコード例は、SQL を使用してマテリアライズドビューを作成するための基本的な構文を示しています。

SQL
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

SQLを使用してストリーミングテーブルを作成する

次のコード例は、 SQLを使用してストリーミングテーブルを作成するための基本的な構文を示しています。 ストリーミングテーブルのソースを読み取る場合、 STREAM キーワードは、ソースにストリーミングセマンティクスを使用することを示します。マテリアライズドビューを作成するときは、 STREAM キーワードを使用しないでください。

SQL
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
注記

ストリーム キーワードを使用して、ストリーミング セマンティクスを使用してソースから読み取ります。 読み取りで既存のレコードに対する変更または削除が検出されると、エラーがスローされます。静的なソースまたは追加専用のソースから読み取るのが最も安全です。変更コミットのあるデータを取り込むには、Python と SkipChangeCommits オプションを使用してエラーを処理できます。

オブジェクトストレージからのデータの読み込み

DLT は、Databricks でサポートされているすべての形式からのデータの読み込みをサポートしています。データ形式のオプションを参照してください。

注記

これらの例では、ワークスペースに自動的にマウントされる /databricks-datasets で使用可能なデータを使用します。 Databricks では、ボリューム パスまたはクラウド URI を使用して、クラウド オブジェクト ストレージに格納されているデータを参照することをお勧めします。 Unity Catalogボリュームとはを参照してください。

Databricks は、クラウド オブジェクト ストレージに格納されているデータに対して増分インジェスト ワークロードを構成する場合は、 Auto Loader とストリーミング テーブルを使用することをお勧めします。 「Auto Loaderとは」を参照してください。

SQL は read_files 関数を使用して Auto Loader 機能を呼び出します。 また、 STREAM キーワードを使用して、 read_filesでストリーミング読み取りを設定する必要があります。

次に、SQL の read_files の構文について説明します。

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM STREAM read_files(
"<file-path>",
[<option-key> => <option_value>, ...]
)

Auto Loaderのオプションはキーと値のペアです。サポートされている形式とオプションの詳細については、「 オプション」を参照してください。

次の例では、Auto Loaderを使用してJSONファイルからストリーミングテーブルを作成します。

SQL
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT *
FROM STREAM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");

read_files 関数は、マテリアライズドビューを作成するためのバッチセマンティクスもサポートしています。次の例では、バッチセマンティクスを使用して JSON ディレクトリを読み取り、マテリアライズドビューを作成します。

SQL
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT *
FROM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");

エクスペクテーションでデータを検証

エクスペクテーションを使用して、データ品質の制約を設定および適用できます。 パイプラインのエクスペクテーションを使用してデータ品質を管理するを参照してください。

次のコードは、データ取り込み中に null のレコードを削除する valid_data という名前のエクスペクテーションを定義しています。

SQL
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

パイプラインで定義されたマテリアライズドビューとストリーミングテーブルのクエリ

次の例では、4 つのデータセットを定義しています。

  • JSON データをロードする orders という名前のストリーミングテーブル。
  • CSV データをロードする customers という名前のマテリアライズドビュー。
  • orders データセットと customers データセットのレコードを結合し、注文タイムスタンプを日付にキャストし、customer_idorder_numberstateorder_date の各フィールドを選択する customer_orders という名前のマテリアライズドビュー
  • 各州の日次注文数を集計する daily_orders_by_state という名前のマテリアライズドビュー
注記

パイプライン内のビューまたはテーブルに対してクエリを実行する場合は、カタログとスキーマを直接指定することも、パイプラインで設定されたデフォルトを使用することもできます。 この例では、 orderscustomers、および customer_orders テーブルは、パイプラインに設定されたデフォルトのカタログとスキーマから書き込みと読み取りが行われます。

従来のパブリッシング モードでは、 LIVE スキーマを使用して、パイプラインで定義されている他のマテリアライズドビュー とストリーミング テーブルに対してクエリを実行します。 新しいパイプラインでは、 LIVE スキーマ構文は警告なしで無視されます。 LIVE スキーマ (レガシー)を参照してください。

SQL
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;

プライベートテーブルを定義する

PRIVATE 句は、マテリアライズドビューまたはストリーミングテーブルを作成するときに使用できます。プライベートテーブルを作成するときは、テーブルを作成しますが、テーブルのメタデータは作成しません。PRIVATE 句は、パイプラインで使用できるが、パイプラインの外部からアクセスしてはならないテーブルを作成するように DLT に指示します。処理時間を短縮するために、プライベートテーブルは、1 回の更新だけでなく、それを作成するパイプラインの有効期間中保持されます。

プライベート・テーブルは、カタログ内のテーブルと同じ名前にすることができます。パイプライン内のテーブルに非修飾名を指定した場合、その名前のプライベート・テーブルとカタログ・テーブルの両方が存在する場合は、プライベート・テーブルが使用されます。

プライベートテーブルは、以前は一時テーブルとして参照されていました。

マテリアライズドビューまたはストリーミングテーブルからのレコードの完全削除

GDPR コンプライアンスなど、削除ベクトルが有効になっているストリーミングテーブルからレコードを完全に削除するには、オブジェクトの基になる Delta テーブルに対して追加の操作を実行する必要があります。ストリーミングテーブルからレコードを確実に削除するには、「 ストリーミングテーブルからレコードを完全に削除する」を参照してください。

マテリアライズドビュー は、基になるテーブルが更新されると、常にデータを反映します。 マテリアライズドビューのデータを削除するには、ソースからデータを削除し、マテリアライズドビューを更新する必要があります。

SQL でテーブルまたはビューを宣言するときに使用する値をパラメータ化します

SET を使用して、テーブルまたはビューを宣言するクエリで設定値 (Spark 設定を含む) を指定します。ノートブックで SET ステートメントの後に定義するテーブルまたはビューは、定義された値にアクセスできます。SET ステートメントを使用して指定された Spark 設定は、SET ステートメントに続くテーブルまたはビューに対して Spark クエリを実行するときに使用されます。クエリの設定値を読み取るには、文字列補間構文 ${}を使用します。次の例では、 startDate という名前の Spark 設定値を設定し、その値をクエリで使用します。

SET startDate='2025-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

複数の設定値を指定するには、値ごとに個別の SET ステートメントを使用します。

制限

PIVOT句はサポートされていません。Spark の pivot 操作では、入力データを出力スキーマのコンピュートに一括して読み込む必要があります。この機能は DLT ではサポートされていません。

注記

マテリアライズドビューを作成するための CREATE OR REFRESH LIVE TABLE 構文は非推奨です。 代わりに、 CREATE OR REFRESH MATERIALIZED VIEW.