メインコンテンツまでスキップ

SQL を使用したパイプライン コードの開発

Lakeflow 宣言型パイプラインでは、パイプラインでマテリアライズドビューとストリーミングテーブルを定義するための新しい SQL キーワードと関数がいくつか導入されています。 パイプライン開発の SQL サポートは、Spark SQL の基本に基づいて構築され、構造化ストリーミング機能のサポートが追加されています。

PySpark データフレーム に精通しているユーザーは、Python を使用してパイプライン コードを開発することを好むかもしれません。 Python は、メタプログラミング操作など、SQL での実装が困難な、より広範なテストと操作をサポートしています。 Python を使用したパイプライン コードの開発を参照してください。

Lakeflow 宣言型パイプライン SQL構文の完全なリファレンスについては、「Lakeflow 宣言型パイプライン SQL 言語リファレンス」を参照してください。

パイプライン開発のためのSQLの基本

SQL 宣言型パイプライン データセットを作成するコード Lakeflow 、 CREATE OR REFRESH 構文を使用して、クエリ結果に対してマテリアライズドビュー とストリーミングテーブルを定義します。

STREAM キーワードは、SELECT 句で参照されるデータソースをストリーミング セマンティクスで読み取る必要があるかどうかを示します。

パイプラインの構成中に指定されたカタログとスキーマに対してデフォルトを読み書きします。 ターゲット・カタログとスキーマの設定を参照してください。

Lakeflow 宣言型パイプライン ソース コードは SQL スクリプトと大きく異なります: Lakeflow 宣言型パイプラインは、パイプラインで構成されたすべてのソース コード ファイルのすべてのデータセット定義を評価し、クエリが実行される前にデータフロー グラフを構築します。 ノートブックまたはスクリプトに表示されるクエリの順序は、コード評価の順序を定義しますが、クエリの実行順序は定義しません。

SQL を使用したマテリアライズドビューの作成

次のコード例は、SQL を使用してマテリアライズドビューを作成するための基本的な構文を示しています。

SQL
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

SQLを使用してストリーミングテーブルを作成する

次のコード例は、 SQLを使用してストリーミングテーブルを作成するための基本的な構文を示しています。 ストリーミングテーブルのソースを読み取る場合、 STREAM キーワードは、ソースにストリーミングセマンティクスを使用することを示します。マテリアライズドビューを作成するときは、 STREAM キーワードを使用しないでください。

SQL
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
注記

ストリーム キーワードを使用して、ストリーミング セマンティクスを使用してソースから読み取ります。 読み取りで既存のレコードに対する変更または削除が検出されると、エラーがスローされます。静的なソースまたは追加専用のソースから読み取るのが最も安全です。変更コミットのあるデータを取り込むには、Python と SkipChangeCommits オプションを使用してエラーを処理できます。

オブジェクトストレージからのデータの読み込み

Lakeflow 宣言型パイプラインは、 Databricksでサポートされているすべての形式からのデータの読み込みをサポートしています。 データ形式のオプションを参照してください。

注記

これらの例では、ワークスペースに自動的にマウントされる /databricks-datasets で使用可能なデータを使用します。 Databricks では、ボリューム パスまたはクラウド URI を使用して、クラウド オブジェクト ストレージに格納されているデータを参照することをお勧めします。 Unity Catalogボリュームとはを参照してください。

Databricks は、クラウド オブジェクト ストレージに格納されているデータに対して増分インジェスト ワークロードを構成する場合は、 Auto Loader とストリーミング テーブルを使用することをお勧めします。 「Auto Loaderとは」を参照してください。

SQL は read_files 関数を使用して Auto Loader 機能を呼び出します。 また、 STREAM キーワードを使用して、 read_filesでストリーミング読み取りを設定する必要があります。

次に、SQL の read_files の構文について説明します。

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM STREAM read_files(
"<file-path>",
[<option-key> => <option_value>, ...]
)

Auto Loaderのオプションはキーと値のペアです。サポートされている形式とオプションの詳細については、「 オプション」を参照してください。

次の例では、Auto Loaderを使用してJSONファイルからストリーミングテーブルを作成します。

SQL
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT *
FROM STREAM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");

read_files 関数は、マテリアライズドビューを作成するためのバッチセマンティクスもサポートしています。次の例では、バッチセマンティクスを使用して JSON ディレクトリを読み取り、マテリアライズドビューを作成します。

SQL
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT *
FROM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");

エクスペクテーションでデータを検証

エクスペクテーションを使用して、データ品質の制約を設定および適用できます。 パイプラインのエクスペクテーションを使用してデータ品質を管理するを参照してください。

次のコードは、データ取り込み中に null のレコードを削除する valid_data という名前のエクスペクテーションを定義しています。

SQL
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

パイプラインで定義されたマテリアライズドビューとストリーミングテーブルのクエリ

次の例では、4 つのデータセットを定義しています。

  • JSON データをロードする orders という名前のストリーミングテーブル。
  • CSV データをロードする customers という名前のマテリアライズドビュー。
  • orders データセットと customers データセットのレコードを結合し、注文タイムスタンプを日付にキャストし、customer_idorder_numberstateorder_date の各フィールドを選択する customer_orders という名前のマテリアライズドビュー
  • 各州の日次注文数を集計する daily_orders_by_state という名前のマテリアライズドビュー
注記

パイプライン内のビューまたはテーブルに対してクエリを実行する場合は、カタログとスキーマを直接指定することも、パイプラインで設定されたデフォルトを使用することもできます。 この例では、 orderscustomers、および customer_orders テーブルは、パイプラインに設定されたデフォルトのカタログとスキーマから書き込みと読み取りが行われます。

従来のパブリッシング モードでは、 LIVE スキーマを使用して、パイプラインで定義されている他のマテリアライズドビュー とストリーミング テーブルに対してクエリを実行します。 新しいパイプラインでは、 LIVE スキーマ構文は警告なしで無視されます。 LIVE スキーマ (レガシー)を参照してください。

SQL
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;

プライベートテーブルを定義する

PRIVATE 句は、マテリアライズドビューまたはストリーミングテーブルを作成するときに使用できます。プライベートテーブルを作成するときは、テーブルを作成しますが、テーブルのメタデータは作成しません。PRIVATE 句は、Lakeflow 宣言型パイプラインに対して、パイプラインで使用できるが、パイプラインの外部からアクセスしてはならないテーブルを作成するように指示します。処理時間を短縮するために、プライベートテーブルは、1 回の更新だけでなく、それを作成するパイプラインの有効期間中保持されます。

プライベート・テーブルは、カタログ内のテーブルと同じ名前にすることができます。パイプライン内のテーブルに非修飾名を指定した場合、その名前のプライベート・テーブルとカタログ・テーブルの両方が存在する場合は、プライベート・テーブルが使用されます。

プライベートテーブルは、以前は一時テーブルとして参照されていました。

マテリアライズドビューまたはストリーミングテーブルからのレコードの完全削除

GDPR コンプライアンスなど、削除ベクトルが有効になっているストリーミングテーブルからレコードを完全に削除するには、オブジェクトの基になる Delta テーブルに対して追加の操作を実行する必要があります。ストリーミングテーブルからレコードを確実に削除するには、「 ストリーミングテーブルからレコードを完全に削除する」を参照してください。

マテリアライズドビュー は、基になるテーブルが更新されると、常にデータを反映します。 マテリアライズドビューのデータを削除するには、ソースからデータを削除し、マテリアライズドビューを更新する必要があります。

SQL でテーブルまたはビューを宣言するときに使用する値をパラメータ化します

SET を使用して、テーブルまたはビューを宣言するクエリで設定値 (Spark 設定を含む) を指定します。ノートブックで SET ステートメントの後に定義するテーブルまたはビューは、定義された値にアクセスできます。SET ステートメントを使用して指定された Spark 設定は、SET ステートメントに続くテーブルまたはビューに対して Spark クエリを実行するときに使用されます。クエリの設定値を読み取るには、文字列補間構文 ${}を使用します。次の例では、 startDate という名前の Spark 設定値を設定し、その値をクエリで使用します。

SET startDate='2025-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

複数の設定値を指定するには、値ごとに個別の SET ステートメントを使用します。

制限

PIVOT句はサポートされていません。Spark の pivot 操作では、入力データを出力スキーマのコンピュートに一括して読み込む必要があります。この機能は、宣言型パイプラインではサポートされていません Lakeflow 。

注記

マテリアライズドビューを作成するための CREATE OR REFRESH LIVE TABLE 構文は非推奨です。 代わりに、 CREATE OR REFRESH MATERIALIZED VIEW.