メインコンテンツまでスキップ

SQLを使用してパイプラインコードを開発する

Lakeflow 宣言型パイプラインでは、パイプラインでマテリアライズドビューとストリーミングテーブルを定義するための新しい SQL キーワードと関数がいくつか導入されています。 パイプライン開発の SQL サポートは、Spark SQL の基本に基づいて構築され、構造化ストリーミング機能のサポートが追加されています。

PySpark データフレーム に精通しているユーザーは、Python を使用してパイプライン コードを開発することを好むかもしれません。 Python は、メタプログラミング操作など、SQL での実装が困難な、より広範なテストと操作をサポートしています。 Python を使用したパイプライン コードの開発を参照してください。

Lakeflow 宣言型パイプライン SQL構文の完全なリファレンスについては、「Lakeflow 宣言型パイプライン SQL 言語リファレンス」を参照してください。

パイプライン開発のためのSQLの基礎

SQL 宣言型パイプライン データセットを作成するコード Lakeflow 、 CREATE OR REFRESH 構文を使用して、クエリ結果に対してマテリアライズドビュー とストリーミングテーブルを定義します。

STREAMキーワードは、 SELECT句で参照されるデータソースをストリーミング セマンティクスで読み取る必要があるかどうかを示します。

読み取りと書き込みは、パイプライン構成時に指定されたカタログとスキーマにデフォルト設定されます。「ターゲット カタログとスキーマを設定する」を参照してください。

LakeFlow宣言型パイプライン ソース コードは、 SQLスクリプトとは大きく異なります。LakeFlow LakeFlow型パイプラインは、パイプライン内で構成されたすべてのソース コード ファイルにわたってすべてのデータセット定義を評価し、クエリが実行される前にデータフロー グラフを構築します。 ソース ファイル内に表示されるクエリの順序はコードの評価順序を定義しますが、クエリの実行順序は定義しません。

SQLでマテリアライズドビューを作成する

次のコード例は、 SQLを使用してマテリアライズドビューを作成するための基本的な構文を示しています。

SQL
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

SQLでストリーミングテーブルを作成する

次のコード例は、 SQLを使用してストリーミング テーブルを作成するための基本的な構文を示しています。 ストリーミング テーブルのソースを読み取るとき、 STREAMキーワードは、ソースにストリーミング セマンティクスを使用することを示します。 マテリアライズドビューを作成するときは、 STREAMキーワードを使用しないでください。

SQL
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
注記

ストリーム キーワードを使用して、ストリーミング セマンティクスを使用してソースから読み取ります。 読み取り中に既存のレコードの変更または削除が検出されると、エラーがスローされます。静的ソースまたは追加専用のソースから読み取るのが最も安全です。変更コミットを含むデータを取り込むには、Python とSkipChangeCommitsオプションを使用してエラーを処理できます。

オブジェクトストレージからデータをロードする

Lakeflow 宣言型パイプラインは、 Databricksでサポートされているすべての形式からのデータの読み込みをサポートしています。 データ形式のオプションを参照してください。

注記

これらの例では、ワークスペースに自動的にマウントされる /databricks-datasets で使用可能なデータを使用します。 Databricks では、ボリューム パスまたはクラウド URI を使用して、クラウド オブジェクト ストレージに格納されているデータを参照することをお勧めします。 Unity Catalogボリュームとはを参照してください。

Databricks 、クラウド オブジェクト ストレージに保存されているデータに対して増分取り込みワークロードを構成する場合、 Auto Loaderとストリーミング テーブルを使用することをお勧めします。 「Auto Loader とは何ですか?」を参照してください。

SQL read_files関数を使用してAuto Loader機能を呼び出します。 read_filesでストリーミング読み取りを構成するには、 STREAMキーワードも使用する必要があります。

以下は SQL のread_filesの構文について説明しています。

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM STREAM read_files(
"<file-path>",
[<option-key> => <option_value>, ...]
)

Auto Loaderのオプションはキーと値のペアです。 サポートされている形式とオプションの詳細については、 「オプション」を参照してください。

次の例では、Auto Loaderを使用してJSONファイルからストリーミングテーブルを作成します。

SQL
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT *
FROM STREAM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");

read_files 関数は、マテリアライズドビューを作成するためのバッチセマンティクスもサポートしています。次の例では、バッチセマンティクスを使用して JSON ディレクトリを読み取り、マテリアライズドビューを作成します。

SQL
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT *
FROM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");

エクスペクテーションでデータを検証

エクスペクテーションを使用して、データ品質の制約を設定および適用できます。 パイプラインのエクスペクテーションを使用してデータ品質を管理するを参照してください。

次のコードは、データ取り込み中に null のレコードを削除するvalid_dataという名前の期待値を定義します。

SQL
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

パイプラインで定義されたマテリアライズドビューとストリーミングテーブルをクエリする

次の例では、4 つのデータセットを定義します。

  • JSONデータをロードするordersという名前のストリーミング テーブル。
  • CSVデータをロードするcustomersという名前のマテリアライズドビュー。
  • orders データセットと customers データセットのレコードを結合し、注文タイムスタンプを日付にキャストし、customer_idorder_numberstateorder_date の各フィールドを選択する customer_orders という名前のマテリアライズドビュー
  • 各州の日次注文数を集計する daily_orders_by_state という名前のマテリアライズドビュー
注記

パイプライン内のビューまたはテーブルをクエリする場合、カタログとスキーマを直接指定することも、パイプラインで構成されたデフォルトを使用することもできます。この例では、パイプラインに構成されたデフォルトのカタログとスキーマから、 orderscustomers 、およびcustomer_ordersテーブルが書き込まれ、読み取られます。

レガシー公開モードは、 LIVEスキーマを使用して、パイプラインに定義されている他のマテリアライズドビューとストリーミング テーブルをクエリします。 新しいパイプラインでは、 LIVEスキーマ構文は暗黙的に無視されます。LIVE スキーマ (レガシー)を参照してください。

SQL
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;

プライベートテーブルを定義する

PRIVATE 句は、マテリアライズドビューまたはストリーミングテーブルを作成するときに使用できます。プライベートテーブルを作成するときは、テーブルを作成しますが、テーブルのメタデータは作成しません。PRIVATE 句は、Lakeflow 宣言型パイプラインに対して、パイプラインで使用できるが、パイプラインの外部からアクセスしてはならないテーブルを作成するように指示します。処理時間を短縮するために、プライベートテーブルは、1 回の更新だけでなく、それを作成するパイプラインの有効期間中保持されます。

プライベート テーブルの名前は、カタログ内のテーブルと同じにすることができます。パイプライン内のテーブルに修飾されていない名前を指定した場合、その名前のプライベート テーブルとカタログ テーブルの両方が存在すると、プライベート テーブルが使用されます。

プライベート テーブルは以前は一時テーブルと呼ばれていました。

マテリアライズドビューまたはストリーミングテーブルからレコードを完全に削除する

GDPRコンプライアンスなどの削除を有効にしてストリーミング テーブルからレコードを永久に削除するには、オブジェクトの基礎となるDeltaテーブルに対して追加の操作を実行する必要があります。 ストリーミング テーブルからレコードを確実に削除するには、 「ストリーミング テーブルからレコードを完全に削除する」を参照してください。

マテリアライズドビューは、基になるテーブルが更新されると常にそのデータを反映します。 マテリアライズドビューのデータを削除するには、ソースからデータを削除し、マテリアライズドビューを更新する必要があります。

SQLでテーブルまたはビューを宣言するときに使用する値をパラメータ化する

SETを使用して、Spark 構成を含むテーブルまたはビューを宣言するクエリで構成値を指定します。SETステートメントの後のソース ファイルで定義したテーブルまたはビューは、定義された値にアクセスできます。SETステートメントを使用して指定された Spark 構成は、SET ステートメントに続く任意のテーブルまたはビューに対して Spark クエリを実行するときに使用されます。クエリ内の構成値を読み取るには、文字列補間構文${}を使用します。次の例では、 startDateという名前の Spark 構成値を設定し、その値をクエリで使用します。

SET startDate='2025-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

複数の構成値を指定するには、値ごとに個別のSETステートメントを使用します。

制限事項

PIVOT句はサポートされていません。Spark の pivot 操作では、入力データを出力スキーマのコンピュートに一括して読み込む必要があります。この機能は、宣言型パイプラインではサポートされていません Lakeflow 。

注記

マテリアライズドビューを作成するための CREATE OR REFRESH LIVE TABLE 構文は非推奨です。 代わりに、 CREATE OR REFRESH MATERIALIZED VIEW.