メインコンテンツまでスキップ

SQL を使用したパイプライン コードの開発

DLT では、パイプラインでマテリアライズドビューとストリーミングテーブルを定義するための新しい SQL キーワードと関数がいくつか導入されています。 パイプライン開発の SQL サポートは、Spark SQL の基本に基づいて構築され、構造化ストリーミング機能のサポートが追加されています。

PySpark データフレーム に精通しているユーザーは、Python を使用してパイプライン コードを開発することを好むかもしれません。 Python は、メタプログラミング操作など、SQL での実装が困難な、より広範なテストと操作をサポートしています。 Python を使用したパイプライン コードの開発を参照してください。

DLT SQL 構文の完全なリファレンスについては、「 DLT SQL 言語リファレンス」を参照してください。

パイプライン開発のためのSQLの基本

DLT データセットを作成するコードSQL、CREATE OR REFRESH 構文を使用して、クエリ結果に対してマテリアライズドビュー とストリーミングテーブルを定義します。

STREAM キーワードは、SELECT 句で参照されるデータソースをストリーミング セマンティクスで読み取る必要があるかどうかを示します。

パイプラインの構成中に指定されたカタログとスキーマに対してデフォルトを読み書きします。 ターゲット・カタログとスキーマの設定を参照してください。

DLT のソースコードは SQL スクリプトと大きく異なります: DLT は、パイプラインで設定されたすべてのソースコードファイルのすべてのデータセット定義を評価し、クエリが実行される前にデータフローグラフを構築します。ノートブックまたはスクリプトに表示されるクエリの順序は、コード評価の順序を定義しますが、クエリの実行順序は定義しません。

SQL を使用したマテリアライズドビューの作成

次のコード例は、SQL を使用してマテリアライズドビューを作成するための基本的な構文を示しています。

SQL
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

SQLを使用してストリーミングテーブルを作成する

次のコード例は、 SQLを使用してストリーミングテーブルを作成するための基本的な構文を示しています。

注記

すべてのデータソースがストリーミング読み取りをサポートしているわけではなく、一部のデータソースは常にストリーミングセマンティクスで処理する必要があります。

SQL
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

オブジェクトストレージからのデータの読み込み

DLT は、Databricks でサポートされているすべての形式からのデータの読み込みをサポートしています。データ形式のオプションを参照してください。

注記

これらの例では、ワークスペースに自動的にマウントされる /databricks-datasets で使用可能なデータを使用します。 Databricks では、ボリューム パスまたはクラウド URI を使用して、クラウド オブジェクト ストレージに格納されているデータを参照することをお勧めします。 Unity Catalogボリュームとはを参照してください。

Databricks は、クラウド オブジェクト ストレージに格納されているデータに対して増分インジェスト ワークロードを構成する場合は、 Auto Loader とストリーミング テーブルを使用することをお勧めします。 「Auto Loaderとは」を参照してください。

SQL は read_files 関数を使用して Auto Loader 機能を呼び出します。 また、 STREAM キーワードを使用して、 read_filesでストリーミング読み取りを設定する必要があります。

次の例では、Auto Loaderを使用してJSONファイルからストリーミングテーブルを作成します。

SQL
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

read_files 関数は、マテリアライズドビューを作成するためのバッチセマンティクスもサポートしています。次の例では、バッチセマンティクスを使用して JSON ディレクトリを読み取り、マテリアライズドビューを作成します。

SQL
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");

エクスペクテーションでデータを検証

エクスペクテーションを使用して、データ品質の制約を設定および適用できます。 パイプラインのエクスペクテーションを使用してデータ品質を管理するを参照してください。

次のコードは、データ取り込み中に null のレコードを削除する valid_data という名前のエクスペクテーションを定義しています。

SQL
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

パイプラインで定義されたマテリアライズドビューとストリーミングテーブルのクエリ

次の例では、4 つのデータセットを定義しています。

  • JSON データをロードする orders という名前のストリーミングテーブル。
  • CSV データをロードする customers という名前のマテリアライズドビュー。
  • orders データセットと customers データセットのレコードを結合し、注文タイムスタンプを日付にキャストし、customer_idorder_numberstateorder_date の各フィールドを選択する customer_orders という名前のマテリアライズドビュー
  • 各州の日次注文数を集計する daily_orders_by_state という名前のマテリアライズドビュー
注記

パイプライン内のビューまたはテーブルに対してクエリを実行する場合は、カタログとスキーマを直接指定することも、パイプラインで設定されたデフォルトを使用することもできます。 この例では、 orderscustomers、および customer_orders テーブルは、パイプラインに設定されたデフォルトのカタログとスキーマから書き込みと読み取りが行われます。

従来のパブリッシング モードでは、 LIVE スキーマを使用して、パイプラインで定義されている他のマテリアライズドビュー とストリーミング テーブルに対してクエリを実行します。 新しいパイプラインでは、 LIVE スキーマ構文は警告なしで無視されます。 LIVE スキーマ (レガシー)を参照してください。

SQL
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;