SQL を使用したパイプライン コードの開発

Delta Live Tables では、パイプラインで具体化されたビューとストリーミング テーブルを定義するための新しい SQL キーワードと関数がいくつか導入されています。 パイプライン開発の SQL サポートは、Spark SQL の基本に基づいて構築され、構造化ストリーミング機能のサポートが追加されています。

PySpark DataFrames に精通しているユーザーは、Python を使用してパイプライン コードを開発することを好むかもしれません。 Python は、メタプログラミング操作など、SQL での実装が困難な、より広範なテストと操作をサポートしています。 「Python を使用したパイプライン コードの開発」を参照してください。

Delta Live Tables SQL 構文の完全なリファレンスについては、「 Delta Live Tables SQL 言語リファレンス」を参照してください。

パイプライン開発のためのSQLの基本

Delta Live Tables データセットを作成する SQL コードでは、 CREATE OR REFRESH 構文を使用して、クエリ結果に対する具体化されたビューとストリーミング テーブルを定義します。

STREAM キーワードは、SELECT 句で参照されるデータソースをストリーミング セマンティクスで読み取る必要があるかどうかを示します。

Delta Live Tables のソース コードは SQL スクリプトとは大きく異なります: Delta Live Tables は、パイプラインで構成されたすべてのソース コード ファイルのすべてのデータセット定義を評価し、クエリが実行される前にデータフロー グラフを構築します。 ノートブックまたはスクリプトに表示されるクエリの順序によって、実行の順序が定義されるわけではありません。

SQL を使用したマテリアライズドビューの作成

次のコード例は、SQL を使用してマテリアライズドビューを作成するための基本的な構文を示しています。

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

SQL を使用してストリーミング テーブルを作成する

次のコード例は、SQL を使用してストリーミング テーブルを作成するための基本的な構文を示しています。

注:

すべてのデータソースがストリーミング読み取りをサポートしているわけではなく、一部のデータソースは常にストリーミングセマンティクスで処理する必要があります。

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

オブジェクトストレージからのデータの読み込み

Delta Live Tables は、Databricks でサポートされているすべての形式からのデータの読み込みをサポートしています。 データ形式のオプションを参照してください。

注:

これらの例では、ワークスペースに自動的にマウントされる /databricks-datasets で使用可能なデータを使用します。 Databricks では、ボリューム パスまたはクラウド URI を使用して、クラウド オブジェクト ストレージに格納されているデータを参照することをお勧めします。 Unity Catalogボリュームとはを参照してください

Databricks では、クラウド オブジェクト ストレージに格納されているデータに対して増分インジェスト ワークロードを構成する場合は、 Auto Loader テーブルとストリーミング テーブルを使用することをお勧めします。 「Auto Loaderとは」を参照してください。

SQL は read_files 関数を使用して Auto Loader 機能を呼び出します。 また、 STREAM キーワードを使用して、 read_filesでストリーミング読み取りを設定する必要があります。

次の例では、 を使用して ファイルからストリーミングテーブルを作成します。JSONAuto Loader

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

read_files 関数は、マテリアライズドビューを作成するためのバッチセマンティクスもサポートしています。次の例では、バッチセマンティクスを使用して JSON ディレクトリを読み取り、具体化されたビューを作成します。

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");

期待値でデータを検証

期待値を使用して、データ品質の制約を設定および適用できます。 「Delta Live Tables を使用したデータ品質の管理」を参照してください。

次のコードは、データ取り込み中に null のレコードを削除する valid_data という名前のエクスペクテーションを定義しています。

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

パイプラインで定義されたマテリアライズドビューとストリーミングテーブルのクエリ

LIVE スキーマを使用して、パイプラインで定義されている他の具体化されたビューとストリーミング テーブルに対してクエリを実行します。

次の例では、4 つのデータセットを定義しています。

  • JSON データをロードする orders という名前のストリーミングテーブル。

  • CSV データをロードする customers という名前のマテリアライズドビュー。

  • orders データセットと customers データセットのレコードを結合し、注文タイムスタンプを日付にキャストし、customer_idorder_numberstateorder_date の各フィールドを選択する customer_orders という名前の実体化ビュー

  • 各州の日次注文数を集計する daily_orders_by_state という名前の具体化ビュー

CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM LIVE.orders o
INNER JOIN LIVE.customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM LIVE.customer_orders
GROUP BY state, order_date;