メインコンテンツまでスキップ

チュートリアル: Build an ETL パイプライン using チェンジデータキャプチャ with DLT

for data オーケストレーションと ETLを使用して、チェンジデータキャプチャ () を使用して (抽出、変換、読み込み) パイプラインを作成およびデプロイする方法を学習します。CDCDLTAuto LoaderETL パイプラインは、ソース システムからデータを読み取り、データ品質チェックやレコード重複排除などの要件に基づいてそのデータを変換し、データ ウェアハウスやデータレイクなどのターゲット システムにデータを書き込む手順を実装します。

このチュートリアルでは、MySQL データベースの customers テーブルのデータを使用して、次の操作を行います。

  • Debezium またはその他のツールを使用してトランザクションデータベースから変更を抽出し、クラウドオブジェクトストレージ (S3 フォルダー、ADLS、GCS) に保存します。チュートリアルを簡略化するために、外部 CDC システムの設定をスキップします。
  • Auto Loader を使用して、クラウド・オブジェクト・ストレージからメッセージを増分的にロードし、未加工のメッセージを customers_cdc テーブルに保管します。Auto Loader スキーマを推論し、スキーマの進化を処理します。
  • ビュー customers_cdc_clean を追加して、期待値を使用してデータ品質を確認します。たとえば、 id は、アップサート操作を実行するために使用するため、 null しないでください。
  • クリーニングされた CDC データに対して APPLY CHANGES INTO (アップサートの実行) を実行して、最終的な customers テーブルに変更を適用します
  • DLT がタイプ 2 緩やかに変化するディメンション (SCD2) を作成して、すべての変更を追跡する方法を示します。

目標は、生データをほぼリアルタイムで取り込み、データ品質を確保しながらアナリストチーム用のテーブルを構築することです。

このチュートリアルでは、メダリオン レイクハウス アーキテクチャを使用して、ブロンズレイヤーを介して生データを取り込み、シルバーレイヤーでデータのクリーニングと検証を行い、ゴールドレイヤーを使用してディメンション モデリングと集計を適用します。 メ ダリオンレイクハウスの建築とは何ですか?詳細については。

実装するフローは、次のようになります。

CDC を使用した DLT パイプライン

DLT、Auto Loader、CDCの詳細については、「DLT」、「Auto Loaderとは」、「チェンジデータキャプチャとは」(CDC)を参照してください。

必要条件

このチュートリアルを完了するには、以下の条件を満たす必要があります。

チェンジデータキャプチャ in an ETL パイプライン

チェンジデータキャプチャ (CDC) は、トランザクションデータベース ( MySQL や PostgreSQLなど) またはデータウェアハウスに対して行われたレコードの変更をキャプチャするプロセスです。 CDC は、データの削除、追加、更新などの操作を、通常は外部システムのテーブルを再具体化するためのストリームとしてキャプチャします。CDC は、一括読み込みの更新を不要にしながら、増分読み込みを可能にします。

注記

チュートリアルを簡略化するために、外部 CDC システムのセットアップはスキップします。CDC データを JSON ファイルとして BLOB ストレージ (S3、ADLS、GCS) に保存して、稼働中と見なすことができます。

CDCのキャプチャ

さまざまなCDCツールが利用可能です。オープンソースのリーダーソリューションの1つはDebeziumですが、Fivetran、Qlik Replicate、Streamset、Talend、Oracle GoldenGate、AWS DMSなど、データソースを簡素化する他の実装も存在します。

このチュートリアルでは、Debezium や DMS などの外部システムからの CDC データを使用します。Debezium は、変更されたすべての行をキャプチャします。通常、データ変更の履歴を Kafka ログに送信するか、ファイルとして保存します。

customers テーブル (JSON 形式) から CDC 情報を取り込み、それが正しいことを確認してから、レイクハウスで顧客テーブルを具体化する必要があります。

Debezium からの CDC 入力

変更ごとに、更新される行のすべてのフィールド (idfirstnamelastnameemailaddress) を含む JSON メッセージを受信します。さらに、次のような追加のメタデータ情報があります。

  • operation: 操作コード (通常は (DELETEAPPENDUPDATE) です。
  • operation_date: 各操作アクションのレコードの日付とタイムスタンプ。

Debezium のようなツールは、変更前の行値など、より高度な出力を生成できますが、このチュートリアルでは簡単にするために省略します。

ステップ0:チュートリアルデータのセットアップ

まず、新しいノートブックを作成し、このチュートリアルで使用するデモ ファイルをワークスペースにインストールする必要があります。

  1. 左上隅の [新規 ] をクリックします。

  2. [ノートブック] をクリックします。

  3. ノートブックのタイトルを Untitled ノートブック <date and time> から DLT tutorial setup . に変更します。

  4. 上部にあるノートブックのタイトルの横にある、ノートブックのデフォルト言語を Python に設定します。

  5. チュートリアルで使用するデータセットを生成するには、最初のセルに次のコードを入力し、 Shift + Enter キーを押してコードを実行します。

    Python
    # You can change the catalog, schema, dbName, and db. If you do so, you must also
    # change the names in the rest of the tutorial.
    catalog = "main"
    schema = dbName = db = "dbdemos_dlt_cdc"
    volume_name = "raw_data"

    spark.sql(f'CREATE CATALOG IF NOT EXISTS `{catalog}`')
    spark.sql(f'USE CATALOG `{catalog}`')
    spark.sql(f'CREATE SCHEMA IF NOT EXISTS `{catalog}`.`{schema}`')
    spark.sql(f'USE SCHEMA `{schema}`')
    spark.sql(f'CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`{volume_name}`')
    volume_folder = f"/Volumes/{catalog}/{db}/{volume_name}"

    try:
    dbutils.fs.ls(volume_folder+"/customers")
    except:
    print(f"folder doesn't exists, generating the data under {volume_folder}...")
    from pyspark.sql import functions as F
    from faker import Faker
    from collections import OrderedDict
    import uuid
    fake = Faker()
    import random

    fake_firstname = F.udf(fake.first_name)
    fake_lastname = F.udf(fake.last_name)
    fake_email = F.udf(fake.ascii_company_email)
    fake_date = F.udf(lambda:fake.date_time_this_month().strftime("%m-%d-%Y %H:%M:%S"))
    fake_address = F.udf(fake.address)
    operations = OrderedDict([("APPEND", 0.5),("DELETE", 0.1),("UPDATE", 0.3),(None, 0.01)])
    fake_operation = F.udf(lambda:fake.random_elements(elements=operations, length=1)[0])
    fake_id = F.udf(lambda: str(uuid.uuid4()) if random.uniform(0, 1) < 0.98 else None)

    df = spark.range(0, 100000).repartition(100)
    df = df.withColumn("id", fake_id())
    df = df.withColumn("firstname", fake_firstname())
    df = df.withColumn("lastname", fake_lastname())
    df = df.withColumn("email", fake_email())
    df = df.withColumn("address", fake_address())
    df = df.withColumn("operation", fake_operation())
    df_customers = df.withColumn("operation_date", fake_date())
    df_customers.repartition(100).write.format("json").mode("overwrite").save(volume_folder+"/customers")
  6. このチュートリアルで使用するデータをプレビューするには、次のセルにコードを入力し、 Shift + Enter キーを押してコードを実行します。

    Python
    display(spark.read.json("/Volumes/main/dbdemos_dlt_cdc/raw_data/customers"))

ステップ 1: パイプラインを作成する

まず、DLT で ETL パイプラインを作成します。DLT は、DLT 構文を使用してノートブックまたはファイル ( ソース コード と呼ばれます) で定義された依存関係を解決することでパイプラインを作成します。各ソース コード ファイルには 1 つの言語のみを含めることができますが、パイプラインには複数の言語固有のノートブックまたはファイルを追加できます。詳細については、DLTを参照してください。

important

ソース コード フィールドを空白のままにすると、ソース コード作成用のノートブックが自動的に作成および構成されます。

このチュートリアルでは、サーバレス コンピュート と Unity Catalogを使用します。 指定されていないすべての構成オプションについては、デフォルト設定を使用します。サーバレス コンピュートがワークスペースで有効になっていないか、サポートされていない場合は、デフォルト コンピュートの設定を使用して、記載されているとおりにチュートリアルを完了できます。 デフォルト コンピュート設定を使用する場合は、 Create パイプライン Unity Catalog UI の[Destination] セクションの[Storage options] で [Storage] を手動で選択する必要があります。

DLT で新しい ETL パイプラインを作成するには、次の手順に従います。

  1. サイドバーで、「 パイプライン」 をクリックします。
  2. パイプラインの作成ETL パイプライン をクリックします。
  3. パイプライン名 に、一意のパイプライン名を入力します。
  4. サーバレス チェックボックスを選択します。
  5. パイプラインモードで トリガー を選択します。これにより、AvailableNow トリガーを使用してストリーミング フローが実行され、既存のすべてのデータが処理され、ストリームがシャットダウンされます。
  6. 公開先 で、テーブルが公開される Unity Catalog の場所を構成するには、既存の カタログ を選択し、 スキーマ に新しい名前を書き込み、カタログに新しいスキーマを作成します。
  7. 作成 をクリックします。

新しいパイプラインのパイプライン UI が表示されます。

空白のソース コード ノートブックが自動的に作成され、パイプライン用に構成されます。ノートブックは、ユーザー・ディレクトリー内の新しいディレクトリーに作成されます。新しいディレクトリとファイルの名前は、パイプラインの名前と一致します。たとえば、 /Users/someone@example.com/my_pipeline/my_pipelineです。

  1. このノートブックにアクセスするためのリンクは、 パイプラインの詳細 パネルの ソース コード フィールドの下にあります。リンクをクリックしてノートブックを開き、次のステップに進みます。
  2. 右上の 接続 をクリックして、コンピュート設定メニューを開きます。
  3. ステップ 1 で作成したパイプラインの名前にカーソルを合わせます。
  4. 接続 」をクリックします。
  5. 上部にあるノートブックのタイトルの横にあるノートブックのデフォルト言語(Python または SQL)を選択します。
important

ノートブックには、1 つのプログラミング言語のみを含めることができます。 パイプラインのソースコードノートブックで Python コードと SQL コードを混在させないでください。

DLT パイプラインを開発する場合は、Python または SQL のいずれかを選択できます。このチュートリアルには、両方の言語の例が含まれています。選択した言語に基づいて、デフォルトのノートブック言語を選択していることを確認します。

DLTパイプラインコード開発におけるノートブック サポートの詳細については、DLTにおけるノートブックを用いたETLパイプラインの開発とデバッグ を参照してください。

ステップ 2: Auto Loader を使用してデータを段階的に取り込む

最初のステップは、クラウドストレージからブロンズレイヤーに生データを取り込むことです。

これは、次のような理由から、さまざまな理由で困難な場合があります。

  • 大規模に運用するため、数百万の小さなファイルを取り込む可能性があります。
  • スキーマと JSON 型を推論します。
  • 正しくない JSON スキーマで不良レコードを処理します。
  • スキーマの進化 (たとえば、顧客テーブルの新しい列) に注意してください。

Auto Loader 、スキーマの推論やスキーマの進化など、このインジェストを簡略化すると同時に、数百万の受信ファイルにスケーリングできます。 Auto Loaderは、cloudFilesを使用してPython、SELECT * FROM STREAM read_files(...)を使用してSQLで利用でき、さまざまな形式(JSON、CSV、Apache Avroなど)で使用できます。

テーブルをストリーミングテーブルとして定義すると、新しい受信データのみが消費されることが保証されます。 ストリーミングテーブルとして定義しない場合は、使用可能なすべてのデータをスキャンして取り込むことになります。詳細については、 ストリーミングテーブル を参照してください。

  1. Auto Loaderを使用して受信データを取り込むには、次のコードをコピーしてノートブックの最初のセルに貼り付けます。Python または SQLを使用できます。前の手順で選択したノートブックのデフォルト言語によって異なります。

    タブ :::タブ-item[Python]

    Python
    from dlt import *
    from pyspark.sql.functions import *

    # Create the target bronze table
    dlt.create_streaming_table("customers_cdc_bronze", comment="New customer data incrementally ingested from cloud object storage landing zone")

    # Create an Append Flow to ingest the raw data into the bronze table
    @append_flow(
    target = "customers_cdc_bronze",
    name = "customers_bronze_ingest_flow"
    )
    def customers_bronze_ingest_flow():
    return (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.inferColumnTypes", "true")
    .load("/Volumes/main/dbdemos_dlt_cdc/raw_data/customers")
    )

    :::

    タブ-item[sql]

    SQL
    CREATE OR REFRESH STREAMING TABLE customers_cdc_bronze
    COMMENT "New customer data incrementally ingested from cloud object storage landing zone";

    CREATE FLOW customers_bronze_ingest_flow AS
    INSERT INTO customers_cdc_bronze BY NAME
    SELECT *
    FROM STREAM read_files(
    "/Volumes/main/dbdemos_dlt_cdc/raw_data/customers",
    format => "json",
    inferColumnTypes => "true"
    )

    ::: ::::

  2. [ 開始 ] をクリックして、接続されたパイプラインの更新を開始します。

ステップ 3: クリーンアップとデータ品質を追跡するための期待値

ブロンズレイヤーを定義したら、次の条件を確認してデータ品質を制御する期待値を追加して、シルバーレイヤーを作成します。

  • ID は nullしないでください。
  • CDC 操作タイプは有効である必要があります。
  • jsonはAuto Loaderによって適切に読まれている必要があります。

これらの条件のいずれかが守られていない場合、行は削除されます。

詳細については、「 パイプラインの期待値を使用してデータ品質を管理する 」を参照してください。

  1. クリック 編集 して 下にセルを挿入 新しい空のセルを挿入します。

  2. クレンジングされたテーブルを使用してシルバーレイヤーを作成し、制約を適用するには、次のコードをコピーしてノートブックの新しいセルに貼り付けます。

    タブ :::タブ-item[Python]

    Python
    dlt.create_streaming_table(
    name = "customers_cdc_clean",
    expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL","valid_id": "id IS NOT NULL","valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"}
    )

    @append_flow(
    target = "customers_cdc_clean",
    name = "customers_cdc_clean_flow"
    )
    def customers_cdc_clean_flow():
    return (
    dlt.read_stream("customers_cdc_bronze")
    .select("address", "email", "id", "firstname", "lastname", "operation", "operation_date", "_rescued_data")
    )

    :::

    タブ-item[sql]

    SQL
    CREATE OR REFRESH STREAMING TABLE customers_cdc_clean (
    CONSTRAINT no_rescued_data EXPECT (_rescued_data IS NULL) ON VIOLATION DROP ROW,
    CONSTRAINT valid_id EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW,
    CONSTRAINT valid_operation EXPECT (operation IN ('APPEND', 'DELETE', 'UPDATE')) ON VIOLATION DROP ROW
    )
    COMMENT "New customer data incrementally ingested from cloud object storage landing zone";

    CREATE FLOW customers_cdc_clean_flow AS
    INSERT INTO customers_cdc_clean BY NAME
    SELECT * FROM STREAM customers_cdc_bronze;

    ::: ::::

  3. [ 開始 ] をクリックして、接続されたパイプラインの更新を開始します。

ステップ 4: 変更を適用して顧客テーブルを具体化する

customers テーブルには最新のビューが含まれ、元のテーブルのレプリカになります。

これは、手動で実装するのは簡単ではありません。最新の行を保持するために、データ重複排除などを考慮する必要があります。

ただし、DLT は 変更の適用 操作でこれらの課題を解決します。

  1. クリック 編集 して 下にセルを挿入 新しい空のセルを挿入します。

  2. DLT で 変更の適用 を使用して CDC データを処理するには、次のコードをコピーしてノートブックの新しいセルに貼り付けます。

    タブ :::タブ-item[Python]

    Python
    dlt.create_streaming_table(name="customers", comment="Clean, materialized customers")

    dlt.apply_changes(
    target="customers", # The customer table being materialized
    source="customers_cdc_clean", # the incoming CDC
    keys=["id"], # what we'll be using to match the rows to upsert
    sequence_by=col("operation_date"), # de-duplicate by operation date, getting the most recent value
    ignore_null_updates=False,
    apply_as_deletes=expr("operation = 'DELETE'"), # DELETE condition
    except_column_list=["operation", "operation_date", "_rescued_data"],
    )

    :::

    タブ-item[sql]

    SQL
    CREATE OR REFRESH STREAMING TABLE customers;

    APPLY CHANGES INTO customers
    FROM stream(customers_cdc_clean)
    KEYS (id)
    APPLY AS DELETE WHEN
    operation = "DELETE"
    SEQUENCE BY operation_date
    COLUMNS * EXCEPT (operation, operation_date, _rescued_data)
    STORED AS SCD TYPE 1;

    ::: ::::

  3. [ 開始 ] をクリックして、接続されたパイプラインの更新を開始します。

ステップ5:タイプ2(SCD2)の緩やかに変化する寸法

多くの場合、 APPENDUPDATE、および DELETEに起因するすべての変更を追跡するテーブルを作成する必要があります。

  • 履歴: テーブルに対するすべての変更の履歴を保持する必要があります。
  • トレーサビリティ: どの操作が発生したかを確認する必要があります。

DLT付きSCD2

Delta は変更データ フロー (CDF) をサポートしており、 table_change SQL と Python でテーブルの変更をクエリできます。ただし、CDF の主な使用例は、パイプラインの変更をキャプチャすることであり、テーブルの変更の全体像を最初から作成することはありません。

順不同のイベントがある場合、実装は特に複雑になります。タイムスタンプで変更を順序付けし、過去に発生した変更を受け取る必要がある場合は、SCD テーブルに新しいエントリを追加し、以前のエントリを更新する必要があります。

DLT は、この複雑さを取り除き、時間の初めからのすべての変更を含む別のテーブルを作成することができます。このテーブルは、必要に応じて特定のパーティション/zorder列を使用して、大規模に使用できます。順不同のフィールドは、_sequence_byに基づいてすぐに処理されます

SCD2テーブルを作成するには、SQLのSTORED AS SCD TYPE 2またはPythonのstored_as_scd_type="2"という追加オプションを指定してAPPLY CHANGESを使用する必要があります。

注記

また、次のオプションを使用して、フィーチャが追跡する列を制限することもできます。 TRACK HISTORY ON {columnList | EXCEPT(exceptColumnList)}

  1. クリック 編集 して 下にセルを挿入 新しい空のセルを挿入します。

  2. 次のコードをコピーして、ノートブックの新しいセルに貼り付けます。

    タブ :::タブ-item[Python]

    Python
    # create the table
    dlt.create_streaming_table(
    name="customers_history", comment="Slowly Changing Dimension Type 2 for customers"
    )

    # store all changes as SCD2
    dlt.apply_changes(
    target="customers_history",
    source="customers_cdc_clean",
    keys=["id"],
    sequence_by=col("operation_date"),
    ignore_null_updates=False,
    apply_as_deletes=expr("operation = 'DELETE'"),
    except_column_list=["operation", "operation_date", "_rescued_data"],
    stored_as_scd_type="2",
    ) # Enable SCD2 and store individual updates

    :::

    タブ-item[sql]

    SQL
    CREATE OR REFRESH STREAMING TABLE customers_history;

    APPLY CHANGES INTO customers_history
    FROM stream(customers_cdc_clean)
    KEYS (id)
    APPLY AS DELETE WHEN
    operation = "DELETE"
    SEQUENCE BY operation_date
    COLUMNS * EXCEPT (operation, operation_date, _rescued_data)
    STORED AS SCD TYPE 2;

    ::: ::::

  3. [ 開始 ] をクリックして、接続されたパイプラインの更新を開始します。

ステップ 6: マテリアライズドビューを作成し、誰が最も情報を変更したかを追跡する

customers_history表には、ユーザーが情報に対して行ったすべての履歴変更が含まれています。次に、ゴールドレイヤーに単純なマテリアライズドビューを作成し、誰が情報を最も多く変更したかを追跡します。 これは、実際のシナリオでの不正検出分析やユーザーの推奨事項に使用できます。さらに、SCD2 で変更を適用すると、すでに重複が削除されているため、ユーザー ID ごとに行を直接カウントできます。

  1. クリック 編集 して 下にセルを挿入 新しい空のセルを挿入します。

  2. 次のコードをコピーして、ノートブックの新しいセルに貼り付けます。

    タブ :::タブ-item[Python]

    Python
    @dlt.table(
    name = "customers_history_agg",
    comment = "Aggregated customer history"
    )
    def customers_history_agg():
    return (
    dlt.read("customers_history")
    .groupBy("id")
    .agg(
    count("address").alias("address_count"),
    count("email").alias("email_count"),
    count("firstname").alias("firstname_count"),
    count("lastname").alias("lastname_count")
    )
    )

    :::

    タブ-item[sql]

    SQL
    CREATE OR REPLACE MATERIALIZED VIEW customers_history_agg AS
    SELECT
    id,
    count("address") as address_count,
    count("email") AS email_count,
    count("firstname") AS firstname_count,
    count("lastname") AS lastname_count
    FROM customers_history
    GROUP BY id

    ::: ::::

  3. [ 開始 ] をクリックして、接続されたパイプラインの更新を開始します。

ステップ 7: DLT パイプラインを実行するジョブを作成する

次に、 Databricks ジョブを使用してデータ取り込み、処理、分析のステップを自動化するワークフローを作成します。

  1. ワークスペースで、ワークフローアイコン サイドバーの ワークフロー をクリックし、 ジョブを作成 をクリックします。
  2. タスク タイトル ボックスで、 新しいジョブ <date and time> をジョブ名に置き換えます。 たとえば、 CDC customers workflow.
  3. [タスク名] に、最初のタスクの名前を入力します(例:ETL_customers_data)。
  4. 種類パイプライン を選択します。
  5. パイプライン で、ステップ 1 で作成した DLT パイプラインを選択します。
  6. 作成 をクリックします。
  7. ワークフローを実行するには、 今すぐ実行 をクリックします。 実行の詳細を表示するには、 実行 タブをクリックします。 タスクをクリックして、タスク実行の詳細を表示します。
  8. ワークフローの完了時に結果を表示するには、 成功した最新の実行に移動する か、ジョブ実行の開始 時刻 をクリックします。 出力 ページが表示され、クエリ結果が表示されます。

ジョブの実行の詳細については、「Databricksジョブのモニタリングと可観測性」を参照してください。

ステップ 8: DLT パイプライン ジョブをスケジュールする

ETL パイプラインをスケジュールに従って実行するには、次の手順を実行します。

  1. サイドバーのワークフローアイコンワークフロー ]をクリックします。
  2. [ 名前 ] 列で、ジョブ名をクリックします。サイドパネルが ジョブの詳細 として表示されます。
  3. スケジュールとトリガー パネルで トリガーの追加 をクリックし、 トリガー・タイプスケジュール済み を選択します。
  4. 期間、開始時刻、およびタイムゾーンを指定します。
  5. 保存 をクリックします。

追加のリソース