SQL を使用したパイプライン コードの開発
Delta Live Tables では、パイプラインで具体化されたビューとストリーミング テーブルを定義するための新しい SQL キーワードと関数がいくつか導入されています。 パイプライン開発の SQL サポートは、Spark SQL の基本に基づいて構築され、構造化ストリーミング機能のサポートが追加されています。
PySpark DataFrames に精通しているユーザーは、Python を使用してパイプライン コードを開発することを好むかもしれません。 Python は、メタプログラミング操作など、SQL での実装が困難な、より広範なテストと操作をサポートしています。 「Python を使用したパイプライン コードの開発」を参照してください。
Delta Live Tables SQL 構文の完全なリファレンスについては、「 Delta Live Tables SQL 言語リファレンス」を参照してください。
パイプライン開発のためのSQLの基本
Delta Live Tables データセットを作成する SQL コードでは、 CREATE OR REFRESH
構文を使用して、クエリ結果に対する具体化されたビューとストリーミング テーブルを定義します。
STREAM
キーワードは、SELECT
句で参照されるデータソースをストリーミング セマンティクスで読み取る必要があるかどうかを示します。
Delta Live Tables のソース コードは SQL スクリプトとは大きく異なります: Delta Live Tables は、パイプラインで構成されたすべてのソース コード ファイルのすべてのデータセット定義を評価し、クエリが実行される前にデータフロー グラフを構築します。 ノートブックまたはスクリプトに表示されるクエリの順序によって、実行の順序が定義されるわけではありません。
SQL を使用したマテリアライズドビューの作成
次のコード例は、SQL を使用してマテリアライズドビューを作成するための基本的な構文を示しています。
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
SQL を使用してストリーミング テーブルを作成する
次のコード例は、SQL を使用してストリーミング テーブルを作成するための基本的な構文を示しています。
注:
すべてのデータソースがストリーミング読み取りをサポートしているわけではなく、一部のデータソースは常にストリーミングセマンティクスで処理する必要があります。
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
オブジェクトストレージからのデータの読み込み
Delta Live Tables は、Databricks でサポートされているすべての形式からのデータの読み込みをサポートしています。 データ形式のオプションを参照してください。
注:
これらの例では、ワークスペースに自動的にマウントされる /databricks-datasets
で使用可能なデータを使用します。 Databricks では、ボリューム パスまたはクラウド URI を使用して、クラウド オブジェクト ストレージに格納されているデータを参照することをお勧めします。 Unity Catalogボリュームとはを参照してください。
Databricks では、クラウド オブジェクト ストレージに格納されているデータに対して増分インジェスト ワークロードを構成する場合は、 Auto Loader テーブルとストリーミング テーブルを使用することをお勧めします。 「Auto Loaderとは」を参照してください。
SQL は read_files
関数を使用して Auto Loader 機能を呼び出します。 また、 STREAM
キーワードを使用して、 read_files
でストリーミング読み取りを設定する必要があります。
次の例では、 を使用して ファイルからストリーミングテーブルを作成します。JSONAuto Loader
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
read_files
関数は、マテリアライズドビューを作成するためのバッチセマンティクスもサポートしています。次の例では、バッチセマンティクスを使用して JSON ディレクトリを読み取り、具体化されたビューを作成します。
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");
期待値でデータを検証
期待値を使用して、データ品質の制約を設定および適用できます。 「Delta Live Tables を使用したデータ品質の管理」を参照してください。
次のコードは、データ取り込み中に null のレコードを削除する valid_data
という名前のエクスペクテーションを定義しています。
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
パイプラインで定義されたマテリアライズドビューとストリーミングテーブルのクエリ
LIVE
スキーマを使用して、パイプラインで定義されている他の具体化されたビューとストリーミング テーブルに対してクエリを実行します。
次の例では、4 つのデータセットを定義しています。
JSON データをロードする
orders
という名前のストリーミングテーブル。CSV データをロードする
customers
という名前のマテリアライズドビュー。orders
データセットとcustomers
データセットのレコードを結合し、注文タイムスタンプを日付にキャストし、customer_id
、order_number
、state
、order_date
の各フィールドを選択するcustomer_orders
という名前の実体化ビュー各州の日次注文数を集計する
daily_orders_by_state
という名前の具体化ビュー
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM LIVE.orders o
INNER JOIN LIVE.customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM LIVE.customer_orders
GROUP BY state, order_date;