メインコンテンツまでスキップ

チュートリアル: LakeFlow 宣言型パイプラインとチェンジデータキャプチャを用いたETLパイプラインの構築

Auto Loaderとデータ オーケストレーションのためのLakeFlow宣言型パイプラインを使用して、チェンジデータキャプチャ (CDC) を使用してETL(抽出、変換、読み込み) パイプラインを作成およびデプロイする方法 学習します。ETL パイプラインは、ソース システムからデータを読み取り、データ品質チェックやレコード重複排除などの要件に基づいてそのデータを変換し、データ ウェアハウスやデータレイクなどのターゲット システムにデータを書き込む手順を実装します。

このチュートリアルでは、MySQL データベースの customers テーブルのデータを使用して、次の操作を行います。

  • Debezium またはその他のツールを使用してトランザクションデータベースから変更を抽出し、クラウドオブジェクトストレージ (S3 フォルダー、ADLS、GCS) に保存します。チュートリアルを簡略化するために、外部 CDC システムの設定をスキップします。
  • Auto Loader を使用して、クラウド・オブジェクト・ストレージからメッセージを増分的にロードし、未加工のメッセージを customers_cdc テーブルに保管します。Auto Loaderはスキーマを推論し、スキーマの進化を処理します。
  • ビュー customers_cdc_clean を追加して、エクスペクテーションを使用してデータ品質を確認します。たとえば、 id は、アップサート操作を実行するために使用するため、 null しないでください。
  • クリーニングされた CDC データに対して AUTO CDC ... INTO (アップサートの実行) を実行して、最終的な customers テーブルに変更を適用します
  • LakeFlow宣言型パイプラインは、タイプ2の緩やかに変化するディメンション(SCD2)を作成して、すべての変更を追跡する方法を示します。

目標は、生データをほぼリアルタイムで取り込み、データ品質を確保しながらアナリストチーム用のテーブルを構築することです。

このチュートリアルでは、メダリオン レイクハウス アーキテクチャを使用して、ブロンズレイヤーを介して生データを取り込み、シルバーレイヤーでデータのクリーニングと検証を行い、ゴールドレイヤーを使用してディメンション モデリングと集計を適用します。 詳細は、メダリオンレイクハウスアーキテクチャとは何ですか?をご覧ください。

実装するフローは、次のようになります。

CDCを使用したLDP

LakeFlow 宣言型パイプライン、Auto Loader、CDCの詳細については、LakeFlow 宣言型パイプラインAuto Loaderとは、およびチェンジデータキャプチャとは (CDC)を参照してください。

必要条件

このチュートリアルを完了するには、以下の条件を満たす必要があります。

ETLパイプラインにおけるチェンジデータキャプチャ

チェンジデータキャプチャ (CDC) は、トランザクションデータベース ( MySQL や PostgreSQLなど) またはデータウェアハウスに対して行われたレコードの変更をキャプチャするプロセスです。 CDC は、データの削除、追加、更新などの操作を、通常は外部システムのテーブルを再具体化するためのストリームとしてキャプチャします。CDC は、一括読み込みの更新を不要にしながら、増分読み込みを可能にします。

注記

チュートリアルを簡略化するために、外部 CDC システムのセットアップはスキップします。CDC データを JSON ファイルとして BLOB ストレージ (S3、ADLS、GCS) に保存して、稼働中と見なすことができます。

CDCのキャプチャ

さまざまなCDCツールが利用可能です。オープンソースのリーダーソリューションの1つはDebeziumですが、Fivetran、Qlik Replicate、Streamset、Talend、Oracle GoldenGate、AWS DMSなど、データソースを簡素化する他の実装も存在します。

このチュートリアルでは、Debezium や DMS などの外部システムからの CDC データを使用します。Debezium は、変更されたすべての行をキャプチャします。通常、データ変更の履歴を Kafka ログに送信するか、ファイルとして保存します。

customers テーブル (JSON 形式) から CDC 情報を取り込み、それが正しいことを確認してから、レイクハウスで顧客テーブルを具体化する必要があります。

Debezium からの CDC 入力

変更ごとに、更新される行のすべてのフィールド (idfirstnamelastnameemailaddress) を含む JSON メッセージを受信します。さらに、次のような追加のメタデータ情報があります。

  • operation: 操作コード (通常は (DELETEAPPENDUPDATE) です。
  • operation_date: 各操作アクションのレコードの日付とタイムスタンプ。

Debezium のようなツールは、変更前の行値など、より高度な出力を生成できますが、このチュートリアルでは簡単にするために省略します。

ステップ0:チュートリアルデータのセットアップ

まず、新しいノートブックを作成し、このチュートリアルで使用するデモ ファイルをワークスペースにインストールする必要があります。

  1. 左上隅の 新規 をクリックします。

  2. [ノートブック] をクリックします。

  3. ノートブックのタイトルを 無題のノートブック... から パイプライン チュートリアルセットアップ に変更します。

  4. 上部にあるノートブックのタイトルの横にある、ノートブックのデフォルト言語を Python に設定します。

  5. チュートリアルで使用するデータセットを生成するには、最初のセルに次のコードを入力し、 Shift + Enter キーを押してコードを実行します。

    Python
    # You can change the catalog, schema, dbName, and db. If you do so, you must also
    # change the names in the rest of the tutorial.
    catalog = "main"
    schema = dbName = db = "dbdemos_dlt_cdc"
    volume_name = "raw_data"

    spark.sql(f'CREATE CATALOG IF NOT EXISTS `{catalog}`')
    spark.sql(f'USE CATALOG `{catalog}`')
    spark.sql(f'CREATE SCHEMA IF NOT EXISTS `{catalog}`.`{schema}`')
    spark.sql(f'USE SCHEMA `{schema}`')
    spark.sql(f'CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`{volume_name}`')
    volume_folder = f"/Volumes/{catalog}/{db}/{volume_name}"

    try:
    dbutils.fs.ls(volume_folder+"/customers")
    except:
    print(f"folder doesn't exists, generating the data under {volume_folder}...")
    from pyspark.sql import functions as F
    from faker import Faker
    from collections import OrderedDict
    import uuid
    fake = Faker()
    import random

    fake_firstname = F.udf(fake.first_name)
    fake_lastname = F.udf(fake.last_name)
    fake_email = F.udf(fake.ascii_company_email)
    fake_date = F.udf(lambda:fake.date_time_this_month().strftime("%m-%d-%Y %H:%M:%S"))
    fake_address = F.udf(fake.address)
    operations = OrderedDict([("APPEND", 0.5),("DELETE", 0.1),("UPDATE", 0.3),(None, 0.01)])
    fake_operation = F.udf(lambda:fake.random_elements(elements=operations, length=1)[0])
    fake_id = F.udf(lambda: str(uuid.uuid4()) if random.uniform(0, 1) < 0.98 else None)

    df = spark.range(0, 100000).repartition(100)
    df = df.withColumn("id", fake_id())
    df = df.withColumn("firstname", fake_firstname())
    df = df.withColumn("lastname", fake_lastname())
    df = df.withColumn("email", fake_email())
    df = df.withColumn("address", fake_address())
    df = df.withColumn("operation", fake_operation())
    df_customers = df.withColumn("operation_date", fake_date())
    df_customers.repartition(100).write.format("json").mode("overwrite").save(volume_folder+"/customers")
  6. このチュートリアルで使用するデータをプレビューするには、次のセルにコードを入力し、 Shift + Enter キーを押してコードを実行します。

    Python
    display(spark.read.json("/Volumes/main/dbdemos_dlt_cdc/raw_data/customers"))

ステップ 1: パイプラインを作成する

まず、LakeFlow宣言型パイプラインで ETL パイプラインを作成します。 LakeFlow 宣言型パイプラインは、宣言型パイプライン構文を使用してノートブックまたはファイル ( ソース コード と呼ばれます) で定義された依存関係を解決することで LakeFlow パイプラインを作成します。 各ソース コード ファイルには 1 つの言語のみを含めることができますが、パイプラインには複数の言語固有のノートブックまたはファイルを追加できます。詳細については、「LakeFlow 宣言型パイプライン」を参照してください。

important

ソース コード フィールドを空白のままにすると、ソース コード作成用のノートブックが自動的に作成および構成されます。

このチュートリアルでは、サーバレス コンピュート と Unity Catalogを使用します。 指定されていないすべての構成オプションについては、デフォルト設定を使用します。サーバレス コンピュートがワークスペースで有効になっていないか、サポートされていない場合は、デフォルト コンピュートの設定を使用して、記載されているとおりにチュートリアルを完了できます。 デフォルト コンピュート設定を使用する場合は、 パイプラインの作成 UIにおける 配信先 セクションの ストレージオプション 配下で Unity Catalog を手動で選択する必要があります。

宣言型パイプラインで新しい ETL パイプラインを作成するには LakeFlow 次の手順を実行します。

  1. ワークスペースで、サイドバーの ワークフローアイコン。 Jobs & パイプライン をクリックします。
  2. [ 新規 ] で、[ ETL パイプライン ] をクリックします。
  3. パイプライン名 に、一意のパイプライン名を入力します。
  4. サーバレス チェックボックスを選択します。
  5. パイプラインモードで トリガー を選択します。これにより、AvailableNow トリガーを使用してストリーミング フローが実行され、既存のすべてのデータが処理され、ストリームがシャットダウンされます。
  6. 配信先 で、テーブルが公開される Unity Catalog の場所を構成するには、既存の カタログ を選択し、 スキーマ に新しい名前を書き込み、カタログに新しいスキーマを作成します。
  7. 作成 をクリックします。

新しいパイプラインのパイプライン UI が表示されます。

空白のソース コード ノートブックが自動的に作成され、パイプライン用に構成されます。ノートブックは、ユーザー・ディレクトリー内の新しいディレクトリーに作成されます。新しいディレクトリとファイルの名前は、パイプラインの名前と一致します。たとえば、 /Users/someone@example.com/my_pipeline/my_pipelineです。

  1. このノートブックにアクセスするためのリンクは、 パイプラインの詳細 パネルの ソース コード フィールドの下にあります。リンクをクリックしてノートブックを開き、次のステップに進みます。
  2. 右上の 接続 をクリックして、コンピュート設定メニューを開きます。
  3. ステップ 1 で作成したパイプラインの名前にカーソルを合わせます。
  4. 接続 」をクリックします。
  5. 上部にあるノートブックのタイトルの横にあるノートブックのデフォルト言語(Python または SQL)を選択します。
important

ノートブックには、1 つのプログラミング言語のみを含めることができます。 パイプラインのソースコードノートブックで Python コードと SQL コードを混在させないでください。

LakeFlow 宣言型パイプラインを開発する場合、Python または SQLのいずれかを選択できます。このチュートリアルには、両方の言語の例が含まれています。選択した言語に基づいて、デフォルトのノートブック言語を選択していることを確認します。

LakeFlow宣言型パイプラインのコード開発に対するノートブックのサポートの詳細については、LakeFlow宣言型パイプラインのノートブックを使用して ETLパイプラインを開発およびデバッグするを参照してください。

ステップ 2: Auto Loader を使用してデータを段階的に取り込む

最初のステップは、クラウドストレージからブロンズレイヤーに生データを取り込むことです。

これは、次のような理由から、さまざまな理由で困難な場合があります。

  • 大規模に運用するため、数百万の小さなファイルを取り込む可能性があります。
  • スキーマと JSON 型を推論します。
  • 正しくない JSON スキーマで不良レコードを処理します。
  • スキーマの進化 (たとえば、顧客テーブルの新しい列) に注意してください。

Auto Loader 、スキーマの推論やスキーマの進化など、このインジェストを簡略化すると同時に、数百万の受信ファイルにスケーリングできます。 Auto Loaderは、cloudFilesを使用してPython、SELECT * FROM STREAM read_files(...)を使用してSQLで利用でき、さまざまな形式(JSON、CSV、Apache Avroなど)で使用できます。

テーブルをストリーミングテーブルとして定義すると、新しい受信データのみが消費されることが保証されます。 ストリーミングテーブルとして定義しない場合は、使用可能なすべてのデータをスキャンして取り込むことになります。詳細については、 ストリーミングテーブル を参照してください。

  1. Auto Loaderを使用して受信データを取り込むには、次のコードをコピーしてノートブックの最初のセルに貼り付けます。Python または SQLを使用できます。前の手順で選択したノートブックのデフォルト言語によって異なります。

    タブ :::タブ-item[Python]

    Python
    from dlt import *
    from pyspark.sql.functions import *

    # Create the target bronze table
    dlt.create_streaming_table("customers_cdc_bronze", comment="New customer data incrementally ingested from cloud object storage landing zone")

    # Create an Append Flow to ingest the raw data into the bronze table
    @append_flow(
    target = "customers_cdc_bronze",
    name = "customers_bronze_ingest_flow"
    )
    def customers_bronze_ingest_flow():
    return (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.inferColumnTypes", "true")
    .load("/Volumes/main/dbdemos_dlt_cdc/raw_data/customers")
    )

    :::

    タブ-item[sql]

    SQL
    CREATE OR REFRESH STREAMING TABLE customers_cdc_bronze
    COMMENT "New customer data incrementally ingested from cloud object storage landing zone";

    CREATE FLOW customers_bronze_ingest_flow AS
    INSERT INTO customers_cdc_bronze BY NAME
    SELECT *
    FROM STREAM read_files(
    "/Volumes/main/dbdemos_dlt_cdc/raw_data/customers",
    format => "json",
    inferColumnTypes => "true"
    )

    ::: ::::

  2. [ 開始 ] をクリックして、接続されたパイプラインの更新を開始します。

ステップ 3: クリーンアップとデータ品質を追跡するためのエクスペクテーション

ブロンズレイヤーを定義したら、次の条件を確認してデータ品質を制御するエクスペクテーションを追加して、シルバーレイヤーを作成します。

  • ID は nullしないでください。
  • CDC 操作タイプは有効である必要があります。
  • jsonはAuto Loaderによって適切に読まれている必要があります。

これらの条件のいずれかが守られていない場合、行は削除されます。

詳細については、「 パイプラインのエクスペクテーションを使用してデータ品質を管理する 」を参照してください。

  1. クリック 編集 して 下にセルを挿入 新しい空のセルを挿入します。

  2. クレンジングされたテーブルを使用してシルバーレイヤーを作成し、制約を適用するには、次のコードをコピーしてノートブックの新しいセルに貼り付けます。

    タブ :::タブ-item[Python]

    Python
    dlt.create_streaming_table(
    name = "customers_cdc_clean",
    expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL","valid_id": "id IS NOT NULL","valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"}
    )

    @append_flow(
    target = "customers_cdc_clean",
    name = "customers_cdc_clean_flow"
    )
    def customers_cdc_clean_flow():
    return (
    dlt.read_stream("customers_cdc_bronze")
    .select("address", "email", "id", "firstname", "lastname", "operation", "operation_date", "_rescued_data")
    )

    :::

    タブ-item[sql]

    SQL
    CREATE OR REFRESH STREAMING TABLE customers_cdc_clean (
    CONSTRAINT no_rescued_data EXPECT (_rescued_data IS NULL) ON VIOLATION DROP ROW,
    CONSTRAINT valid_id EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW,
    CONSTRAINT valid_operation EXPECT (operation IN ('APPEND', 'DELETE', 'UPDATE')) ON VIOLATION DROP ROW
    )
    COMMENT "New customer data incrementally ingested from cloud object storage landing zone";

    CREATE FLOW customers_cdc_clean_flow AS
    INSERT INTO customers_cdc_clean BY NAME
    SELECT * FROM STREAM customers_cdc_bronze;

    ::: ::::

  3. [ 開始 ] をクリックして、接続されたパイプラインの更新を開始します。

ステップ4:AUTO CDC フローによる顧客テーブルの具体化

customers テーブルには最新のビューが含まれ、元のテーブルのレプリカになります。

これは、手動で実装するのは簡単ではありません。最新の行を保持するために、データ重複排除などを考慮する必要があります。

ただし、 LakeFlow 宣言型パイプラインは、 AUTO CDC 操作でこれらの課題を解決します。

  1. クリック 編集 して 下にセルを挿入 新しい空のセルを挿入します。

  2. 宣言型パイプラインのAUTO CDCを使用してCDCデータを処理するにはLakeFlow次のコードをコピーしてノートブックの新しいセルに貼り付けます。

    タブ :::タブ-item[Python]

    Python
    dlt.create_streaming_table(name="customers", comment="Clean, materialized customers")

    dlt.create_auto_cdc_flow(
    target="customers", # The customer table being materialized
    source="customers_cdc_clean", # the incoming CDC
    keys=["id"], # what we'll be using to match the rows to upsert
    sequence_by=col("operation_date"), # de-duplicate by operation date, getting the most recent value
    ignore_null_updates=False,
    apply_as_deletes=expr("operation = 'DELETE'"), # DELETE condition
    except_column_list=["operation", "operation_date", "_rescued_data"],
    )

    :::

    タブ-item[sql]

    SQL
    CREATE OR REFRESH STREAMING TABLE customers;

    CREATE FLOW customers_cdc_flow
    AS AUTO CDC INTO customers
    FROM stream(customers_cdc_clean)
    KEYS (id)
    APPLY AS DELETE WHEN
    operation = "DELETE"
    SEQUENCE BY operation_date
    COLUMNS * EXCEPT (operation, operation_date, _rescued_data)
    STORED AS SCD TYPE 1;

    ::: ::::

  3. [ 開始 ] をクリックして、接続されたパイプラインの更新を開始します。

ステップ5:タイプ2のSlowly Changing Dimension(SCD2)

多くの場合、 APPENDUPDATE、および DELETEに起因するすべての変更を追跡するテーブルを作成する必要があります。

  • 履歴: テーブルに対するすべての変更の履歴を保持する必要があります。
  • トレーサビリティ: どの操作が発生したかを確認する必要があります。

SCD2 と LakeFlow 宣言型パイプライン

Delta は変更データ フロー (CDF) をサポートしており、 table_change SQL と Python でテーブルの変更をクエリできます。ただし、CDF の主な使用例は、パイプラインの変更をキャプチャすることであり、テーブルの変更の全体像を最初から作成することはありません。

順不同のイベントがある場合、実装は特に複雑になります。タイムスタンプで変更を順序付けし、過去に発生した変更を受け取る必要がある場合は、SCD テーブルに新しいエントリを追加し、以前のエントリを更新する必要があります。

LakeFlow宣言型パイプラインはこの複雑さを取り除き、時間の開始からのすべての変更を含む別のテーブルを作成できます。このテーブルは、必要に応じて特定のパーティション/zorder列を使用して、大規模に使用できます。順不同のフィールドは、_sequence_byに基づいてすぐに処理されます

SCD2 テーブルを作成するには、SQL の STORED AS SCD TYPE 2 または Python の stored_as_scd_type="2" のオプションを使用する必要があります。

注記

また、次のオプションを使用して、フィーチャが追跡する列を制限することもできます。 TRACK HISTORY ON {columnList | EXCEPT(exceptColumnList)}

  1. クリック 編集 して 下にセルを挿入 新しい空のセルを挿入します。

  2. 次のコードをコピーして、ノートブックの新しいセルに貼り付けます。

    タブ :::タブ-item[Python]

    Python
    # create the table
    dlt.create_streaming_table(
    name="customers_history", comment="Slowly Changing Dimension Type 2 for customers"
    )

    # store all changes as SCD2
    dlt.create_auto_cdc_flow(
    target="customers_history",
    source="customers_cdc_clean",
    keys=["id"],
    sequence_by=col("operation_date"),
    ignore_null_updates=False,
    apply_as_deletes=expr("operation = 'DELETE'"),
    except_column_list=["operation", "operation_date", "_rescued_data"],
    stored_as_scd_type="2",
    ) # Enable SCD2 and store individual updates

    :::

    タブ-item[sql]

    SQL
    CREATE OR REFRESH STREAMING TABLE customers_history;

    CREATE FLOW cusotmers_history_cdc
    AS AUTO CDC INTO
    customers_history
    FROM stream(customers_cdc_clean)
    KEYS (id)
    APPLY AS DELETE WHEN
    operation = "DELETE"
    SEQUENCE BY operation_date
    COLUMNS * EXCEPT (operation, operation_date, _rescued_data)
    STORED AS SCD TYPE 2;

    ::: ::::

  3. [ 開始 ] をクリックして、接続されたパイプラインの更新を開始します。

ステップ 6: マテリアライズドビューを作成し、誰が最も情報を変更したかを追跡する

customers_history表には、ユーザーが情報に対して行ったすべての履歴変更が含まれています。次に、ゴールドレイヤーに単純なマテリアライズドビューを作成し、誰が情報を最も多く変更したかを追跡します。 これは、実際のシナリオでの不正検出分析やユーザーの推奨事項に使用できます。さらに、SCD2 で変更を適用すると、すでに重複が削除されているため、ユーザー ID ごとに行を直接カウントできます。

  1. クリック 編集 して 下にセルを挿入 新しい空のセルを挿入します。

  2. 次のコードをコピーして、ノートブックの新しいセルに貼り付けます。

    タブ :::タブ-item[Python]

    Python
    @dlt.table(
    name = "customers_history_agg",
    comment = "Aggregated customer history"
    )
    def customers_history_agg():
    return (
    dlt.read("customers_history")
    .groupBy("id")
    .agg(
    count("address").alias("address_count"),
    count("email").alias("email_count"),
    count("firstname").alias("firstname_count"),
    count("lastname").alias("lastname_count")
    )
    )

    :::

    タブ-item[sql]

    SQL
    CREATE OR REPLACE MATERIALIZED VIEW customers_history_agg AS
    SELECT
    id,
    count("address") as address_count,
    count("email") AS email_count,
    count("firstname") AS firstname_count,
    count("lastname") AS lastname_count
    FROM customers_history
    GROUP BY id

    ::: ::::

  3. [ 開始 ] をクリックして、接続されたパイプラインの更新を開始します。

ステップ 7: ETL パイプラインを実行するジョブを作成する

次に、 Databricks ジョブを使用してデータ取り込み、処理、分析のステップを自動化するワークフローを作成します。

  1. ワークスペースで、サイドバーの ワークフローアイコン。 Jobs & パイプライン をクリックします。
  2. [新規 ] で、[ ジョブ ] をクリックします。
  3. タスク タイトル ボックスで、 新しいジョブ <date and time> をジョブ名に置き換えます。 たとえば、 CDC customers workflow
  4. [タスク名] に、最初のタスクの名前を入力します(例:ETL_customers_data)。
  5. 種類パイプライン を選択します。
  6. パイプライン で、ステップ 1 で作成したパイプラインを選択します。
  7. 作成 をクリックします。
  8. ワークフローを実行するには、 今すぐ実行 をクリックします。 実行の詳細を表示するには、 実行 タブをクリックします。 タスクをクリックして、タスク実行の詳細を表示します。
  9. ワークフローの完了時に結果を表示するには、 成功した最新の実行に移動する か、ジョブ実行の開始 時刻 をクリックします。 出力 ページが表示され、クエリ結果が表示されます。

ジョブの実行の詳細については、「LakeFlowジョブのモニタリングと可観測性」を参照してください。

ステップ 8: ジョブをスケジュールする

ETL パイプラインをスケジュールに従って実行するには、次の手順を実行します。

  1. ワークフローアイコン。サイドバーの「 ジョブとパイプライン 」をクリックします。
  2. 必要に応じて、[ ジョブ ] と [自分が所有] フィルターを選択します。
  3. [ 名前 ] 列で、ジョブ名をクリックします。サイドパネルが ジョブの詳細 として表示されます。
  4. スケジュールとトリガー パネルで トリガーの追加 をクリックし、 トリガー・タイプスケジュール済み を選択します。
  5. 期間、開始時刻、およびタイムゾーンを指定します。
  6. 保存 をクリックします。

追加のリソース