SQL を使用したパイプライン コードの開発
Lakeflow 宣言型パイプラインでは、パイプラインでマテリアライズドビューとストリーミングテーブルを定義するための新しい SQL キーワードと関数がいくつか導入されています。 パイプライン開発の SQL サポートは、Spark SQL の基本に基づいて構築され、構造化ストリーミング機能のサポートが追加されています。
PySpark データフレーム に精通しているユーザーは、Python を使用してパイプライン コードを開発することを好むかもしれません。 Python は、メタプログラミング操作など、SQL での実装が困難な、より広範なテストと操作をサポートしています。 Python を使用したパイプライン コードの開発を参照してください。
Lakeflow 宣言型パイプライン SQL構文の完全なリファレンスについては、「Lakeflow 宣言型パイプライン SQL 言語リファレンス」を参照してください。
パイプライン開発のためのSQLの基本
SQL 宣言型パイプライン データセットを作成するコード Lakeflow 、 CREATE OR REFRESH
構文を使用して、クエリ結果に対してマテリアライズドビュー とストリーミングテーブルを定義します。
STREAM
キーワードは、SELECT
句で参照されるデータソースをストリーミング セマンティクスで読み取る必要があるかどうかを示します。
パイプラインの構成中に指定されたカタログとスキーマに対してデフォルトを読み書きします。 ターゲット・カタログとスキーマの設定を参照してください。
Lakeflow 宣言型パイプライン ソース コードは SQL スクリプトと大きく異なります: Lakeflow 宣言型パイプラインは、パイプラインで構成されたすべてのソース コード ファイルのすべてのデータセット定義を評価し、クエリが実行される前にデータフロー グラフを構築します。 ノートブックまたはスクリプトに表示されるクエリの順序は、コード評価の順序を定義しますが、クエリの実行順序は定義しません。
SQL を使用したマテリアライズドビューの作成
次のコード例は、SQL を使用してマテリアライズドビューを作成するための基本的な構文を示しています。
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
SQLを使用してストリーミングテーブルを作成する
次のコード例は、 SQLを使用してストリーミングテーブルを作成するための基本的な構文を示しています。 ストリーミングテーブルのソースを読み取る場合、 STREAM
キーワードは、ソースにストリーミングセマンティクスを使用することを示します。マテリアライズドビューを作成するときは、 STREAM
キーワードを使用しないでください。
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
ストリーム キーワードを使用して、ストリーミング セマンティクスを使用してソースから読み取ります。 読み取りで既存のレコードに対する変更または削除が検出されると、エラーがスローされます。静的なソースまたは追加専用のソースから読み取るのが最も安全です。変更コミットのあるデータを取り込むには、Python と SkipChangeCommits
オプションを使用してエラーを処理できます。
オブジェクトストレージからのデータの読み込み
Lakeflow 宣言型パイプラインは、 Databricksでサポートされているすべての形式からのデータの読み込みをサポートしています。 データ形式のオプションを参照してください。
これらの例では、ワークスペースに自動的にマウントされる /databricks-datasets
で使用可能なデータを使用します。 Databricks では、ボリューム パスまたはクラウド URI を使用して、クラウド オブジェクト ストレージに格納されているデータを参照することをお勧めします。 Unity Catalogボリュームとはを参照してください。
Databricks は、クラウド オブジェクト ストレージに格納されているデータに対して増分インジェスト ワークロードを構成する場合は、 Auto Loader とストリーミング テーブルを使用することをお勧めします。 「Auto Loaderとは」を参照してください。
SQL は read_files
関数を使用して Auto Loader 機能を呼び出します。 また、 STREAM
キーワードを使用して、 read_files
でストリーミング読み取りを設定する必要があります。
次に、SQL の read_files
の構文について説明します。
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM STREAM read_files(
"<file-path>",
[<option-key> => <option_value>, ...]
)
Auto Loaderのオプションはキーと値のペアです。サポートされている形式とオプションの詳細については、「 オプション」を参照してください。
次の例では、Auto Loaderを使用してJSONファイルからストリーミングテーブルを作成します。
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT *
FROM STREAM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
read_files
関数は、マテリアライズドビューを作成するためのバッチセマンティクスもサポートしています。次の例では、バッチセマンティクスを使用して JSON ディレクトリを読み取り、マテリアライズドビューを作成します。
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT *
FROM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
エクスペクテーションでデータを検証
エクスペクテーションを使用して、データ品質の制約を設定および適用できます。 パイプラインのエクスペクテーションを使用してデータ品質を管理するを参照してください。
次のコードは、データ取り込み中に null のレコードを削除する valid_data
という名前のエクスペクテーションを定義しています。
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
パイプラインで定義されたマテリアライズドビューとストリーミングテーブルのクエリ
次の例では、4 つのデータセットを定義しています。
- JSON データをロードする
orders
という名前のストリーミングテーブル。 - CSV データをロードする
customers
という名前のマテリアライズドビュー。 orders
データセットとcustomers
データセットのレコードを結合し、注文タイムスタンプを日付にキャストし、customer_id
、order_number
、state
、order_date
の各フィールドを選択するcustomer_orders
という名前のマテリアライズドビュー- 各州の日次注文数を集計する
daily_orders_by_state
という名前のマテリアライズドビュー
パイプライン内のビューまたはテーブルに対してクエリを実行する場合は、カタログとスキーマを直接指定することも、パイプラインで設定されたデフォルトを使用することもできます。 この例では、 orders
、 customers
、および customer_orders
テーブルは、パイプラインに設定されたデフォルトのカタログとスキーマから書き込みと読み取りが行われます。
従来のパブリッシング モードでは、 LIVE
スキーマを使用して、パイプラインで定義されている他のマテリアライズドビュー とストリーミング テーブルに対してクエリを実行します。 新しいパイプラインでは、 LIVE
スキーマ構文は警告なしで無視されます。 LIVE スキーマ (レガシー)を参照してください。
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;
プライベートテーブルを定義する
PRIVATE
句は、マテリアライズドビューまたはストリーミングテーブルを作成するときに使用できます。プライベートテーブルを作成するときは、テーブルを作成しますが、テーブルのメタデータは作成しません。PRIVATE
句は、Lakeflow 宣言型パイプラインに対して、パイプラインで使用できるが、パイプラインの外部からアクセスしてはならないテーブルを作成するように指示します。処理時間を短縮するために、プライベートテーブルは、1 回の更新だけでなく、それを作成するパイプラインの有効期間中保持されます。
プライベート・テーブルは、カタログ内のテーブルと同じ名前にすることができます。パイプライン内のテーブルに非修飾名を指定した場合、その名前のプライベート・テーブルとカタログ・テーブルの両方が存在する場合は、プライベート・テーブルが使用されます。
プライベートテーブルは、以前は一時テーブルとして参照されていました。
マテリアライズドビューまたはストリーミングテーブルからのレコードの完全削除
GDPR コンプライアンスなど、削除ベクトルが有効になっているストリーミングテーブルからレコードを完全に削除するには、オブジェクトの基になる Delta テーブルに対して追加の操作を実行する必要があります。ストリーミングテーブルからレコードを確実に削除するには、「 ストリーミングテーブルからレコードを完全に削除する」を参照してください。
マテリアライズドビュー は、基になるテーブルが更新されると、常にデータを反映します。 マテリアライズドビューのデータを削除するには、ソースからデータを削除し、マテリアライズドビューを更新する必要があります。
SQL でテーブルまたはビューを宣言するときに使用する値をパラメータ化します
SET
を使用して、テーブルまたはビューを宣言するクエリで設定値 (Spark 設定を含む) を指定します。ノートブックで SET
ステートメントの後に定義するテーブルまたはビューは、定義された値にアクセスできます。SET
ステートメントを使用して指定された Spark 設定は、SET ステートメントに続くテーブルまたはビューに対して Spark クエリを実行するときに使用されます。クエリの設定値を読み取るには、文字列補間構文 ${}
を使用します。次の例では、 startDate
という名前の Spark 設定値を設定し、その値をクエリで使用します。
SET startDate='2025-01-01';
CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}
複数の設定値を指定するには、値ごとに個別の SET
ステートメントを使用します。
制限
PIVOT
句はサポートされていません。Spark の pivot
操作では、入力データを出力スキーマのコンピュートに一括して読み込む必要があります。この機能は、宣言型パイプラインではサポートされていません Lakeflow 。
マテリアライズドビューを作成するための CREATE OR REFRESH LIVE TABLE
構文は非推奨です。 代わりに、 CREATE OR REFRESH MATERIALIZED VIEW
.