SQLを使用してパイプラインコードを開発する
Lakeflow 宣言型パイプラインでは、パイプラインでマテリアライズドビューとストリーミングテーブルを定義するための新しい SQL キーワードと関数がいくつか導入されています。 パイプライン開発の SQL サポートは、Spark SQL の基本に基づいて構築され、構造化ストリーミング機能のサポートが追加されています。
PySpark データフレーム に精通しているユーザーは、Python を使用してパイプライン コードを開発することを好むかもしれません。 Python は、メタプログラミング操作など、SQL での実装が困難な、より広範なテストと操作をサポートしています。 Python を使用したパイプライン コードの開発を参照してください。
Lakeflow 宣言型パイプライン SQL構文の完全なリファレンスについては、「Lakeflow 宣言型パイプライン SQL 言語リファレンス」を参照してください。
パイプライン開発のためのSQLの基礎
SQL 宣言型パイプライン データセットを作成するコード Lakeflow 、 CREATE OR REFRESH
構文を使用して、クエリ結果に対してマテリアライズドビュー とストリーミングテーブルを定義します。
STREAM
キーワードは、 SELECT
句で参照されるデータソースをストリーミング セマンティクスで読み取る必要があるかどうかを示します。
読み取りと書き込みは、パイプライン構成時に指定されたカタログとスキーマにデフォルト設定されます。「ターゲット カタログとスキーマを設定する」を参照してください。
LakeFlow宣言型パイプライン ソース コードは、 SQLスクリプトとは大きく異なります。LakeFlow LakeFlow型パイプラインは、パイプライン内で構成されたすべてのソース コード ファイルにわたってすべてのデータセット定義を評価し、クエリが実行される前にデータフロー グラフを構築します。 ソース ファイル内に表示されるクエリの順序はコードの評価順序を定義しますが、クエリの実行順序は定義しません。
SQLでマテリアライズドビューを作成する
次のコード例は、 SQLを使用してマテリアライズドビューを作成するための基本的な構文を示しています。
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
SQLでストリーミングテーブルを作成する
次のコード例は、 SQLを使用してストリーミング テーブルを作成するための基本的な構文を示しています。 ストリーミング テーブルのソースを読み取るとき、 STREAM
キーワードは、ソースにストリーミング セマンティクスを使用することを示します。 マテリアライズドビューを作成するときは、 STREAM
キーワードを使用しないでください。
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
ストリーム キーワードを使用して、ストリーミング セマンティクスを使用してソースから読み取ります。 読み取り中に既存のレコードの変更または削除が検出されると、エラーがスローされます。静的ソースまたは追加専用のソースから読み取るのが最も安全です。変更コミットを含むデータを取り込むには、Python とSkipChangeCommits
オプションを使用してエラーを処理できます。
オブジェクトストレージからデータをロードする
Lakeflow 宣言型パイプラインは、 Databricksでサポートされているすべての形式からのデータの読み込みをサポートしています。 データ形式のオプションを参照してください。
これらの例では、ワークスペースに自動的にマウントされる /databricks-datasets
で使用可能なデータを使用します。 Databricks では、ボリューム パスまたはクラウド URI を使用して、クラウド オブジェクト ストレージに格納されているデータを参照することをお勧めします。 Unity Catalogボリュームとはを参照してください。
Databricks 、クラウド オブジェクト ストレージに保存されているデータに対して増分取り込みワークロードを構成する場合、 Auto Loaderとストリーミング テーブルを使用することをお勧めします。 「Auto Loader とは何ですか?」を参照してください。
SQL read_files
関数を使用してAuto Loader機能を呼び出します。 read_files
でストリーミング読み取りを構成するには、 STREAM
キーワードも使用する必要があります。
以下は SQL のread_files
の構文について説明しています。
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM STREAM read_files(
"<file-path>",
[<option-key> => <option_value>, ...]
)
Auto Loaderのオプションはキーと値のペアです。 サポートされている形式とオプションの詳細については、 「オプション」を参照してください。
次の例では、Auto Loaderを使用してJSONファイルからストリーミングテーブルを作成します。
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT *
FROM STREAM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
read_files
関数は、マテリアライズドビューを作成するためのバッチセマンティクスもサポートしています。次の例では、バッチセマンティクスを使用して JSON ディレクトリを読み取り、マテリアライズドビューを作成します。
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT *
FROM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
エクスペクテーションでデータを検証
エクスペクテーションを使用して、データ品質の制約を設定および適用できます。 パイプラインのエクスペクテーションを使用してデータ品質を管理するを参照してください。
次のコードは、データ取り込み中に null のレコードを削除するvalid_data
という名前の期待値を定義します。
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
パイプラインで定義されたマテリアライズドビューとストリーミングテーブルをクエリする
次の例では、4 つのデータセットを定義します。
- JSONデータをロードする
orders
という名前のストリーミング テーブル。 - CSVデータをロードする
customers
という名前のマテリアライズドビュー。 orders
データセットとcustomers
データセットのレコードを結合し、注文タイムスタンプを日付にキャストし、customer_id
、order_number
、state
、order_date
の各フィールドを選択するcustomer_orders
という名前のマテリアライズドビュー- 各州の日次注文数を集計する
daily_orders_by_state
という名前のマテリアライズドビュー
パイプライン内のビューまたはテーブルをクエリする場合、カタログとスキーマを直接指定することも、パイプラインで構成されたデフォルトを使用することもできます。この例では、パイプラインに構成されたデフォルトのカタログとスキーマから、 orders
、 customers
、およびcustomer_orders
テーブルが書き込まれ、読み取られます。
レガシー公開モードは、 LIVE
スキーマを使用して、パイプラインに定義されている他のマテリアライズドビューとストリーミング テーブルをクエリします。 新しいパイプラインでは、 LIVE
スキーマ構文は暗黙的に無視されます。LIVE スキーマ (レガシー)を参照してください。
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;
プライベートテーブルを定義する
PRIVATE
句は、マテリアライズドビューまたはストリーミングテーブルを作成するときに使用できます。プライベートテーブルを作成するときは、テーブルを作成しますが、テーブルのメタデータは作成しません。PRIVATE
句は、Lakeflow 宣言型パイプラインに対して、パイプラインで使用できるが、パイプラインの外部からアクセスしてはならないテーブルを作成するように指示します。処理時間を短縮するために、プライベートテーブルは、1 回の更新だけでなく、それを作成するパイプラインの有効期間中保持されます。
プライベート テーブルの名前は、カタログ内のテーブルと同じにすることができます。パイプライン内のテーブルに修飾されていない名前を指定した場合、その名前のプライベート テーブルとカタログ テーブルの両方が存在すると、プライベート テーブルが使用されます。
プライベート テーブルは以前は一時テーブルと呼ばれていました。
マテリアライズドビューまたはストリーミングテーブルからレコードを完全に削除する
GDPRコンプライアンスなどの削除を有効にしてストリーミング テーブルからレコードを永久に削除するには、オブジェクトの基礎となるDeltaテーブルに対して追加の操作を実行する必要があります。 ストリーミング テーブルからレコードを確実に削除するには、 「ストリーミング テーブルからレコードを完全に削除する」を参照してください。
マテリアライズドビューは、基になるテーブルが更新されると常にそのデータを反映します。 マテリアライズドビューのデータを削除するには、ソースからデータを削除し、マテリアライズドビューを更新する必要があります。
SQLでテーブルまたはビューを宣言するときに使用する値をパラメータ化する
SET
を使用して、Spark 構成を含むテーブルまたはビューを宣言するクエリで構成値を指定します。SET
ステートメントの後のソース ファイルで定義したテーブルまたはビューは、定義された値にアクセスできます。SET
ステートメントを使用して指定された Spark 構成は、SET ステートメントに続く任意のテーブルまたはビューに対して Spark クエリを実行するときに使用されます。クエリ内の構成値を読み取るには、文字列補間構文${}
を使用します。次の例では、 startDate
という名前の Spark 構成値を設定し、その値をクエリで使用します。
SET startDate='2025-01-01';
CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}
複数の構成値を指定するには、値ごとに個別のSET
ステートメントを使用します。
制限事項
PIVOT
句はサポートされていません。Spark の pivot
操作では、入力データを出力スキーマのコンピュートに一括して読み込む必要があります。この機能は、宣言型パイプラインではサポートされていません Lakeflow 。
マテリアライズドビューを作成するための CREATE OR REFRESH LIVE TABLE
構文は非推奨です。 代わりに、 CREATE OR REFRESH MATERIALIZED VIEW
.