メインコンテンツまでスキップ

SQLを使用してパイプラインコードを開発する

Lakeflow Spark宣言型パイプライン (SDP) では、パイプラインでマテリアライズドビューとストリーミング テーブルを定義するためのいくつかの新しいSQLキーワードと関数が導入されています。 パイプラインを開発するための SQL サポートは、Spark SQL の基礎に基づいて構築され、構造化ストリーミング機能のサポートが追加されています。

PySpark データフレーム に精通しているユーザーは、Python を使用してパイプライン コードを開発することを好むかもしれません。 Python は、メタプログラミング操作など、SQL での実装が困難な、より広範なテストと操作をサポートしています。 Python を使用したパイプライン コードの開発を参照してください。

パイプラインSQL構文の完全なリファレンスについては、 「パイプラインSQL言語リファレンス」を参照してください。

パイプライン開発のためのSQLの基礎

パイプライン データセットを作成するSQLコードは、 CREATE OR REFRESH構文を使用して、クエリ結果に対してマテリアライズドビューとストリーミング テーブルを定義します。

STREAMキーワードは、 SELECT句で参照されるデータソースをストリーミング セマンティクスで読み取る必要があるかどうかを示します。

読み取りと書き込みは、パイプライン構成時に指定されたカタログとスキーマにデフォルト設定されます。「ターゲット カタログとスキーマを設定する」を参照してください。

パイプライン ソース コードは、 SQLスクリプトとは大きく異なります。SDP は、パイプライン内で構成されたすべてのソース コード ファイルにわたってすべてのデータセット定義を評価し、クエリが実行される前にデータフロー グラフを構築します。 ソース ファイル内に表示されるクエリの順序はコードの評価順序を定義しますが、クエリの実行順序は定義しません。

SQLでマテリアライズドビューを作成する

次のコード例は、 SQLを使用してマテリアライズドビューを作成するための基本的な構文を示しています。

SQL
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

SQLでストリーミングテーブルを作成する

次のコード例は、 SQLを使用してストリーミング テーブルを作成するための基本的な構文を示しています。 ストリーミング テーブルのソースを読み取るとき、 STREAMキーワードは、ソースにストリーミング セマンティクスを使用することを示します。 マテリアライズドビューを作成するときは、 STREAMキーワードを使用しないでください。

SQL
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
注記

ストリーム キーワードを使用して、ストリーミング セマンティクスを使用してソースから読み取ります。 読み取り中に既存のレコードの変更または削除が検出されると、エラーがスローされます。静的ソースまたは追加専用のソースから読み取るのが最も安全です。変更コミットを含むデータを取り込むには、Python とSkipChangeCommitsオプションを使用してエラーを処理できます。

オブジェクトストレージからデータをロードする

パイプラインは、Databricks でサポートされているすべての形式からのデータの読み込みをサポートします。データ形式オプションを参照してください。

注記

これらの例では、ワークスペースに自動的にマウントされる /databricks-datasets で使用可能なデータを使用します。 Databricks では、ボリューム パスまたはクラウド URI を使用して、クラウド オブジェクト ストレージに格納されているデータを参照することをお勧めします。 Unity Catalogボリュームとはを参照してください。

Databricks 、クラウド オブジェクト ストレージに保存されているデータに対して増分取り込みワークロードを構成する場合、 Auto Loaderとストリーミング テーブルを使用することをお勧めします。 「Auto Loader とは何ですか?」を参照してください。

SQL read_files関数を使用してAuto Loader機能を呼び出します。 read_filesでストリーミング読み取りを構成するには、 STREAMキーワードも使用する必要があります。

以下は SQL のread_filesの構文について説明しています。

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM STREAM read_files(
"<file-path>",
[<option-key> => <option_value>, ...]
)

Auto Loaderのオプションはキーと値のペアです。 サポートされている形式とオプションの詳細については、 「オプション」を参照してください。

次の例では、Auto Loaderを使用してJSONファイルからストリーミングテーブルを作成します。

SQL
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT *
FROM STREAM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");

read_files 関数は、マテリアライズドビューを作成するためのバッチセマンティクスもサポートしています。次の例では、バッチセマンティクスを使用して JSON ディレクトリを読み取り、マテリアライズドビューを作成します。

SQL
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT *
FROM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");

エクスペクテーションでデータを検証

エクスペクテーションを使用して、データ品質の制約を設定および適用できます。 パイプラインのエクスペクテーションを使用してデータ品質を管理するを参照してください。

次のコードは、データ取り込み中に null のレコードを削除するvalid_dataという名前の期待値を定義します。

SQL
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

パイプラインで定義されたマテリアライズドビューとストリーミングテーブルをクエリする

次の例では、4 つのデータセットを定義します。

  • JSONデータをロードするordersという名前のストリーミング テーブル。
  • CSVデータをロードするcustomersという名前のマテリアライズドビュー。
  • orders データセットと customers データセットのレコードを結合し、注文タイムスタンプを日付にキャストし、customer_idorder_numberstateorder_date の各フィールドを選択する customer_orders という名前のマテリアライズドビュー
  • 各州の日次注文数を集計する daily_orders_by_state という名前のマテリアライズドビュー
注記

パイプライン内のビューまたはテーブルをクエリする場合、カタログとスキーマを直接指定することも、パイプラインで構成されたデフォルトを使用することもできます。この例では、パイプラインに構成されたデフォルトのカタログとスキーマから、 orderscustomers 、およびcustomer_ordersテーブルが書き込まれ、読み取られます。

レガシー公開モードは、 LIVEスキーマを使用して、パイプラインに定義されている他のマテリアライズドビューとストリーミング テーブルをクエリします。 新しいパイプラインでは、 LIVEスキーマ構文は暗黙的に無視されます。LIVE スキーマ (レガシー)を参照してください。

SQL
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;

プライベートテーブルを定義する

マテリアライズドビューまたはストリーミング テーブルを作成するときに、 PRIVATE句を使用できます。 プライベート テーブルを作成する場合、テーブルは作成されますが、テーブルのメタデータは作成されません。PRIVATE句は、パイプラインで使用できるがパイプラインの外部からアクセスできないテーブルを作成するように SDP に指示します。処理時間を短縮するために、プライベート テーブルは、1 回の更新だけでなく、それを作成したパイプラインの有効期間中は保持されます。

プライベート テーブルの名前は、カタログ内のテーブルと同じにすることができます。パイプライン内のテーブルに修飾されていない名前を指定した場合、その名前のプライベート テーブルとカタログ テーブルの両方が存在すると、プライベート テーブルが使用されます。

プライベート テーブルは以前は一時テーブルと呼ばれていました。

マテリアライズドビューまたはストリーミングテーブルからレコードを完全に削除する

GDPRコンプライアンスなどの削除を有効にしてストリーミング テーブルからレコードを永久に削除するには、オブジェクトの基礎となるDeltaテーブルに対して追加の操作を実行する必要があります。 ストリーミング テーブルからレコードを確実に削除するには、 「ストリーミング テーブルからレコードを完全に削除する」を参照してください。

マテリアライズドビューは、基になるテーブルが更新されると常にそのデータを反映します。 マテリアライズドビューのデータを削除するには、ソースからデータを削除し、マテリアライズドビューを更新する必要があります。

SQLでテーブルまたはビューを宣言するときに使用する値をパラメータ化する

SETを使用して、Spark 構成を含むテーブルまたはビューを宣言するクエリで構成値を指定します。SETステートメントの後のソース ファイルで定義したテーブルまたはビューは、定義された値にアクセスできます。SETステートメントを使用して指定された Spark 構成は、SET ステートメントに続く任意のテーブルまたはビューに対して Spark クエリを実行するときに使用されます。クエリ内の構成値を読み取るには、文字列補間構文${}を使用します。次の例では、 startDateという名前の Spark 構成値を設定し、その値をクエリで使用します。

SET startDate='2025-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

複数の構成値を指定するには、値ごとに個別のSETステートメントを使用します。

制限事項

PIVOT句はサポートされていません。Sparkのpivot操作では、出力スキーマをコンピュートするために入力データを積極的にロードする必要があります。 この機能はパイプラインではサポートされていません。

注記

マテリアライズドビューを作成するための CREATE OR REFRESH LIVE TABLE 構文は非推奨です。 代わりに、 CREATE OR REFRESH MATERIALIZED VIEW.