メインコンテンツまでスキップ

Databricks SQL での ETL

大量のデータを扱う場合、データセット全体を再処理するのではなく、新規レコードと変更されたレコードのみを処理できるパイプラインが必要です。これは増分ETLと呼ばれます。Databricks SQLでは、プロシージャル コードを記述したり手動更新をスケジュールしたりせずに、ストリーミング テーブルとマテリアライズドビューを使用して増分ETLパイプラインを構築できます。

このチュートリアルでは、よくあるパターンである、製品の経時的な変化を追跡する方法について説明します。ソーステーブルを作成し、変更イベントをキャプチャし、各製品の完全な履歴を保持するディメンションテーブルを構築し、その上に集計レポートレイヤーを追加します。

このチュートリアルの重要な機能はAUTO CDCです。従来のデータウェアハウスでは、ターゲットテーブルへの挿入、更新、削除イベントを調整するために複雑MERGE INTOステートメントを記述します。この方法は、特にイベントが順不同で発生する場合に、エラーが発生しやすい。AUTO CDCこれを処理してくれます。ビジネスキー、シーケンス列、そしてSCDタイプ1(最新の値のみ)かSCDタイプ2(完全な履歴)かを指定すると、Databricksが適切なマージロジックを自動的に適用します。CDCの概要については、 「AUTO CDC APIs : パイプラインを使用した変更データ キャプチャの簡素化」を参照してください。

このチュートリアルを終えると、以下のことができるようになります。

  1. 変更データフィードを使用して変更を追跡するソーステーブルを作成しました。
  2. CDCイベントストリームを理解するために、生の変更データを検査した。
  3. これらのイベントからSCDタイプ2ディメンションテーブルを作成するためにAUTO CDCを使用しました。
  4. 削除イベントをパイプラインを通して段階的に処理しました。
  5. 集計レポートを段階的に維持するマテリアライズドビューを作成しました。
  6. SCHEDULE REFRESH EVERY 1 DAYを設定することで、変更がパイプライン全体に自動的に反映されるようになります。

要件

このチュートリアルを完了するには、以下の条件を満たす必要があります。

ステップ 1: カタログとスキーマをセットアップする

Databricks SQLエディタを開き、作業用カタログとスキーマを設定します。選択したカタログとスキーマに対してUSE権限を持っている必要があります。

SQL
USE CATALOG <your-catalog>;
USE SCHEMA <your-schema>;

ステップ 2: ソーステーブルを作成してデータをロードする

DatabricksでのDelta Lake変更データフィードの使用(CDF) を有効にしてproductsテーブルを作成します。 CDFはDelta Lakeの機能の一つで、挿入、更新、削除といったすべての操作をクエリ可能な変更ログとして記録します。これはトランザクション処理システムのCDCストリームに似ていますが、変更内容が外部ログからではなく、 Deltaテーブル内に直接キャプチャされる点が異なります。 ここでは、下流のパイプラインが利用する変更イベントを生成するためにCDFを使用します。

  1. テーブルを作成し、初期レコードをロードします。

    SQL
    CREATE OR REPLACE TABLE products (
    product_id INT,
    product_name STRING,
    category STRING,
    warehouse STRING
    )
    TBLPROPERTIES (delta.enableChangeDataFeed = true);

    INSERT INTO products VALUES
    (1, 'Spoon', 'Cutlery', 'Seattle'),
    (2, 'Fork', 'Cutlery', 'Portland'),
    (3, 'Knife', 'Cutlery', 'Denver'),
    (4, 'Chair', 'Furniture', 'Austin'),
    (5, 'Table', 'Furniture', 'Chicago'),
    (6, 'Lamp', 'Lighting', 'Boston'),
    (7, 'Mug', 'Kitchenware', 'Seattle'),
    (8, 'Plate', 'Kitchenware', 'Atlanta'),
    (9, 'Bowl', 'Kitchenware', 'Dallas'),
    (10, 'Glass', 'Kitchenware', 'Phoenix');
  2. 新しい製品、ウェアハウスの移動、カテゴリの再割り当てなどの上流の変更をシミュレートします。

    SQL
    INSERT INTO products VALUES
    (11, 'Napkin', 'Dining', 'San Francisco'),
    (12, 'Coaster', 'Dining', 'New York');

    UPDATE products SET warehouse = 'Los Angeles' WHERE product_id = 1;
    UPDATE products SET category = 'Dining' WHERE product_id = 2;

ステップ 3: 変更データフィードをクエリする

下流パイプラインを構築する前に、生の変更イベントを確認して、 AUTO CDCが何を処理するのかを理解しておくと役立ちます。table_changes()関数はCDFログを読み込み、キャプチャされたすべての操作とメタデータ列を返します。

SQL
SELECT
product_id, product_name, warehouse,
_change_type, _commit_version
FROM table_changes('products', 1)
ORDER BY _commit_version, product_id;

例えば、スプーンには3つのイベントがあります。 insert (シアトル)、 update_preimage (シアトル)、 update_postimage (ロサンゼルス)です。

一つの論理的な変更(例えば、スプーンを別の場所に移動させる)によって、複数のイベント、つまり前像と後像が生じることに注目してください。 従来のデータウェアハウスでは、 MERGEステートメントを記述して、これらのすべてのイベントをターゲットテーブルに統合し、挿入、更新、削除を個別のロジックで処理し、イベントが正しい順序で適用されるようにします。これはまさに、次のステップでAUTO CDCが排除する複雑さです。

ステップ 4: SCDタイプ 2 ディメンションを構築します AUTO CDC

備考

ベータ版

AUTO CDC ベータ版です。Databricks Runtime 17.3以降が必要です。

ストリーミングテーブルはデータを段階的に処理します。更新のたびに、前回の実行以降に追加された行のみを読み込むため、データセット全体を再処理する必要はありません。そのため、大量のデータソースや頻繁に変化するデータソースに最適です。

AUTO CDC 、ストリーミング テーブルに変更データ キャプチャ処理を追加します。 挿入、更新、削除を手動で処理するMERGE INTOステートメントを記述する代わりに、ビジネスキーとシーケンス列を宣言し、Databricks に適切なロジックを適用させます。AUTO CDC順不同のイベントも自動的に処理します。これは、 MERGE INTOを使用して分散システムや重複タイムスタンプを持つバッチロードから到着するイベントを処理する場合によくある問題です。

以下のステートメントは、各製品の完全なバージョン履歴を保持するSCDタイプ2テーブルを作成します。各バージョンには__START_AT__END_ATタイムスタンプが付与されます。__END_AT内のNULLは現在のバージョンを示します。

SQL
CREATE OR REFRESH STREAMING TABLE products_history
SCHEDULE REFRESH EVERY 1 DAY
FLOW AUTO CDC
FROM STREAM products WITH (readChangeFeed = true)
KEYS (product_id)
APPLY AS DELETE WHEN _change_type = 'delete'
SEQUENCE BY _commit_timestamp
COLUMNS * EXCEPT (_change_type, _commit_version, _commit_timestamp)
STORED AS SCD TYPE 2;
  • SCHEDULE REFRESH EVERY 1 DAY: 毎日定期的にテーブルを更新します。
  • FLOW AUTO CDC: これは CDC フローとして宣言します。Databricksは、挿入、更新、削除のセマンティクスを自動的に適用します。
  • KEYS (product_id): ビジネスの鍵。同じキーを持つイベントは、バージョン管理された行に統合されます。
  • APPLY AS DELETE WHEN _change_type = 'delete': 削除イベントが発生したときに、現在のバージョンを閉じます。これにより、削除イベントを識別する条件を定義できます。
  • SEQUENCE BY _commit_timestampイベントの順序を確立します。順不同の到着にも正しく対応します。
  • STORED AS SCD TYPE 2: 完全な履歴を保持します。AUTO CDC SCDタイプ1とSCDタイプ2の両方をサポートしています。

ディメンションテーブルを照会します。

SQL
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
ORDER BY product_id, __START_AT;
  • スプーン:2種類。シアトル(閉鎖、 __END_ATセット)とロサンゼルス(現在、 __END_AT = NULL )。
  • フォーク:2つのバージョン。カトラリーカテゴリー(販売終了)とダイニングカテゴリー(販売中)。
  • ナプキンとコースター:それぞれ1種類(新規追加、 __END_AT = NULL )。
  • その他の製品:各1バージョン( __END_AT = NULL )。

ステップ 5: パイプラインを介して削除を処理します

次に、ソーステーブルから2つの販売終了製品を削除することで、それらをシミュレートします。

SQL
DELETE FROM products WHERE product_id = 9;
DELETE FROM products WHERE product_id = 10;

これらの削除イベントはCDFログに記録されますが、ストリーミングテーブルにはまだ反映されていません。新しいイベントを処理するために、ストリーミングテーブルを更新します。

SQL
REFRESH STREAMING TABLE products_history;

ディメンションテーブルを照会して、削除が適用されたことを確認します。

SQL
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
ORDER BY product_id, __START_AT;

ボウルとグラスは現在__END_ATが設定されており、販売終了としてマークされています。その他の既存製品はすべて変更ありません。ストリーミングテーブルは、前回の更新時の挿入および更新を再処理することなく、新しい削除イベントのみを処理しました。

ステップ 6: 集約マテリアライズドビューを作成する

ソースの変更に合わせて常に最新の状態に保たれるディメンションテーブルができたので、その上にレポートレイヤーを追加できます。

AMATERIALIZEDビューは、コンピュート前のクエリ結果を物理テーブルとして保存します。 通常のビューは、読み取りのたびにクエリを再実行しますが、マテリアライズドビューは結果を永続化し、更新のたびに上流の変更によって影響を受ける行のみを再計算します。そのため、クエリのパフォーマンスが重要なダッシュボードやレポートに最適です。

SQL
CREATE OR REPLACE MATERIALIZED VIEW products_by_category
SCHEDULE REFRESH EVERY 1 DAY
AS
SELECT
category,
COUNT(*) AS active_products
FROM products_history
WHERE __END_AT IS NULL
GROUP BY category;

SCHEDULE REFRESH EVERY 1 DAY これは、このビューが毎日定期的に更新されることを意味します。ストリーミングテーブルにも同じスケジュールを適用することで、ソーステーブルへの変更が各更新サイクルでディメンションを経て集計テーブルに連鎖的に反映される、3段階のパイプラインが構築されます。手動で更新して実行する必要はありません。

SQL
SELECT * FROM products_by_category ORDER BY active_products DESC;

ステップ 7: エンドツーエンドのカスケードを確認する

パイプライン全体のカスケードを確認するには、ソーステーブルに変更を加えます。

SQL
UPDATE products SET warehouse = 'Seattle' WHERE product_id = 3;

ザ・ナイフはデンバーからシアトルへ移転する。この単一のDML変更によってパイプライン全体の連鎖反応が引き起こされ、3つのステージがどのように連携して動作するかが示されます。

  1. products 変更イベントをCDF経由で記録します。
  2. products_history イベントを処理し、ナイフの新しいバージョンを追加します。
  3. products_by_category 影響を受けるカトラリーの行のみを再計算します。

確認する:

SQL
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
WHERE product_id = 3
ORDER BY __START_AT;

SELECT * FROM products_by_category ORDER BY active_products DESC;

掃除

このチュートリアルで作成されたリソースをクリーンアップするには、次のSQLを使用します。

SQL
DROP MATERIALIZED VIEW IF EXISTS products_by_category;
DROP STREAMING TABLE IF EXISTS products_history;
DROP TABLE IF EXISTS products;

その他のリソース