メインコンテンツまでスキップ

チュートリアル: Lakeflow 宣言型パイプラインとチェンジデータキャプチャを用いたETLパイプラインの構築

Auto Loaderとデータ オーケストレーションのためのLakeflow宣言型パイプラインを使用して、チェンジデータキャプチャ (CDC) を使用してETL(抽出、変換、読み込み) パイプラインを作成およびデプロイする方法 学習します。ETL パイプラインは、ソース システムからデータを読み取り、データ品質チェックやレコード重複排除などの要件に基づいてそのデータを変換し、データ ウェアハウスやデータレイクなどのターゲット システムにデータを書き込む手順を実装します。

このチュートリアルでは、MySQL データベースのcustomersテーブルのデータを使用して次の操作を行います。

  • Debezium またはその他のツールを使用してトランザクション データベースから変更を抽出し、クラウド オブジェクト ストレージ (S3 フォルダー、ADLS、GCS) に保存します。このチュートリアルでは、外部 CDC システムの設定を省略し、代わりに偽造データを生成することでチュートリアルを簡素化します。
  • Auto Loaderを使用して、クラウド オブジェクト ストレージからメッセージを段階的に読み込み、生のメッセージをcustomers_cdcテーブルに保存します。 Auto Loaderスキーマを推測し、スキーマ進化を処理します。
  • 期待値を使用してデータの品質を確認するためのビューcustomers_cdc_cleanを追加します。たとえば、 id 、upsert 操作を実行するために使用されるため、 nullにしないでください。
  • クリーンアップされた CDC データに対してAUTO CDC ... INTO (アップサートの実行) を実行し、最終的なcustomersテーブルに変更を適用します。
  • Lakeflow宣言型パイプラインは、タイプ2の緩やかに変化するディメンション(SCD2)を作成して、すべての変更を追跡する方法を示します。

目標は、生データをほぼリアルタイムで取り込み、データの品質を確保しながらアナリスト チーム向けのテーブルを構築することです。

このチュートリアルでは、メダリオン レイクハウス アーキテクチャを使用して、ブロンズレイヤーを介して生データを取り込み、シルバーレイヤーでデータのクリーニングと検証を行い、ゴールドレイヤーを使用してディメンション モデリングと集計を適用します。 詳細は、メダリオンレイクハウスアーキテクチャとは何ですか?をご覧ください。

実装されたフローは次のようになります。

CDC付きLakeFlow宣言型パイプライン

Lakeflow 宣言型パイプライン、Auto Loader、CDCの詳細については、Lakeflow 宣言型パイプラインAuto Loaderとは、およびチェンジデータキャプチャとは (CDC)を参照してください。

要件

このチュートリアルを完了するには、以下の条件を満たす必要があります。

ETLパイプラインにおけるチェンジデータキャプチャ

変更データキャプチャ ( CDC ) は、トランザクション データベース ( MySQLやPostgreSQLなど) またはデータウェアハウスに対して行われたレコードの変更をキャプチャするプロセスです。 CDC は、データの削除、追加、更新などの操作を、通常は外部システムでテーブルを再マテリアライズするためのストリームとしてキャプチャします。CDC を使用すると、一括ロード更新の必要性を排除しながら増分ロードが可能になります。

注記

チュートリアルを簡略化するために、外部 CDC システムの設定をスキップします。これを実行して、CDC データを JSON ファイルとして BLOB ストレージ (S3、ADLS、GCS) に保存することを検討してください。このチュートリアルでは、 fakerライブラリを使用して、チュートリアルで使用するデータを生成します。

CDCを捕獲する

さまざまな CDC ツールが利用可能です。オープンソースのリーダーソリューションの 1 つは Debezium ですが、Fivetran、Qlik Replicate、Streamset、Talend、Oracle GoldenGate、AWS DMS など、データソースを簡素化する他の実装も存在します。

このチュートリアルでは、Debezium や DMS などの外部システムからの CDC データを使用します。Debezium は変更されたすべての行をキャプチャします。通常、データの変更履歴は Kafka ログに送信され、ファイルとして保存されます。

customersテーブル ( JSON形式) からCDC情報を取り込み、それが正しいことを確認してから、顧客テーブルをレイクハウスに具体化する必要があります。

DebeziumからのCDCの入力

変更ごとに、更新される行のすべてのフィールド ( idfirstnamelastnameemailaddress ) を含む JSON メッセージを受け取ります。メッセージには追加のメタデータも含まれます:

  • operation: 操作コード。通常は ( DELETEAPPENDUPDATE )。
  • operation_date: 各操作アクションの記録の日付とタイムスタンプ。

Debezium などのツールは、変更前の行の値など、より高度な出力を生成できますが、このチュートリアルでは簡潔にするために省略しています。

ステップ 1: パイプラインを作成する

LakeFlow宣言型パイプラインに新しいETLパイプラインを作成して、 CDCデータ ソースをクエリし、ワークスペースにテーブルを生成します。

  1. ワークスペースで、 プラスアイコン。 左上隅に新しいものが表示されます。

  2. ETL パイプライン をクリックします。

  3. パイプラインのタイトルをPipelines with CDC tutorialまたは任意の名前に変更します。

  4. タイトルの下で、書き込み権限を持つカタログとスキーマを選択します。

    コード内でカタログまたはスキーマを指定しない場合は、このカタログとスキーマがデフォルトで使用されます。完全なパスを指定することで、コードは任意のカタログまたはスキーマに書き込むことができます。このチュートリアルでは、ここで指定したデフォルトを使用します。

  5. 詳細オプション から、 空のファイルで開始を 選択します。

  6. コード用のフォルダーを選択します。 [参照] を選択すると、ワークスペース内のフォルダーのリストを参照できます。書き込み権限を持つ任意のフォルダーを選択できます。

    バージョン管理を使用するには、Git フォルダーを選択します。新しいフォルダを作成する必要がある場合は、プラスアイコン。ボタン。

  7. チュートリアルで使用する言語に基づいて、ファイルの言語として Python または SQL を選択します。

  8. [選択] をクリックしてこれらの設定でパイプラインを作成し、 LakeFlow Pipelinesエディターを開きます。

これで、デフォルトのカタログとスキーマを持つ空のパイプラインが作成されました。次に、チュートリアルでインポートするサンプルデータを設定します。

ステップ 2: このチュートリアルにインポートするサンプルデータを作成します

既存のソースから独自のデータをインポートする場合、このステップは必要ありません。 このチュートリアルでは、チュートリアルの例として偽のデータを生成します。Python データ生成スクリプトを実行するためのノートブックを作成します。このコードはサンプル データを生成するために 1 回だけ実行する必要があるため、パイプラインの更新の一部として実行されないパイプラインのexplorationsフォルダー内に作成します。

注記

このコードはFakerを使用してサンプル CDC データを生成します。Faker は自動的にインストールできるため、チュートリアルでは%pip install fakerを使用します。ノートブックに faker への依存関係を設定することもできます。「ノートブックに依存関係を追加する」を参照してください。

  1. LakeFlow Pipelinesエディター内のエディターの左側にあるアセットブラウザサイドバーで、プラスアイコン。 追加 を選択し探索 を選択します。

  2. Setup dataなどの 名前 を付け、 Python を選択します。デフォルトの宛先フォルダ(新しいexplorationsフォルダ)をそのまま使用できます。

  3. [作成]を クリックします。これにより、新しいフォルダーにノートブックが作成されます。

  4. 最初のセルに次のコードを入力します。前の手順で選択したデフォルトのカタログとスキーマと一致するように、 <my_catalog><my_schema>の定義を変更する必要があります。

    Python
    %pip install faker
    # Update these to match the catalog and schema
    # that you used for the pipeline in step 1.
    catalog = "<my_catalog>"
    schema = dbName = db = "<my_schema>"

    spark.sql(f'USE CATALOG `{catalog}`')
    spark.sql(f'USE SCHEMA `{schema}`')
    spark.sql(f'CREATE VOLUME IF NOT EXISTS `{catalog}`.`{db}`.`{volume_name}`')
    volume_folder = f"/Volumes/{catalog}/{db}/raw_data"

    try:
    dbutils.fs.ls(volume_folder+"/customers")
    except:
    print(f"folder doesn't exist, generating the data under {volume_folder}...")
    from pyspark.sql import functions as F
    from faker import Faker
    from collections import OrderedDict
    import uuid
    fake = Faker()
    import random

    fake_firstname = F.udf(fake.first_name)
    fake_lastname = F.udf(fake.last_name)
    fake_email = F.udf(fake.ascii_company_email)
    fake_date = F.udf(lambda:fake.date_time_this_month().strftime("%m-%d-%Y %H:%M:%S"))
    fake_address = F.udf(fake.address)
    operations = OrderedDict([("APPEND", 0.5),("DELETE", 0.1),("UPDATE", 0.3),(None, 0.01)])
    fake_operation = F.udf(lambda:fake.random_elements(elements=operations, length=1)[0])
    fake_id = F.udf(lambda: str(uuid.uuid4()) if random.uniform(0, 1) < 0.98 else None)

    df = spark.range(0, 100000).repartition(100)
    df = df.withColumn("id", fake_id())
    df = df.withColumn("firstname", fake_firstname())
    df = df.withColumn("lastname", fake_lastname())
    df = df.withColumn("email", fake_email())
    df = df.withColumn("address", fake_address())
    df = df.withColumn("operation", fake_operation())
    df_customers = df.withColumn("operation_date", fake_date())
    df_customers.repartition(100).write.format("json").mode("overwrite").save(volume_folder+"/customers")
  5. チュートリアルで使用するデータセットを生成するには、 Shift + Enter キー を押してコードを実行します。

  6. オプション。このチュートリアルで使用されるデータをプレビューするには、次のセルに次のコードを入力してコードを実行します。前のコードからのパスと一致するようにカタログとスキーマを更新する必要があります。

    Python
    # Update these to match the catalog and schema
    # that you used for the pipeline in step 1.
    catalog = "<my_catalog>"
    schema = "<my_schema>"

    display(spark.read.json(f"/Volumes/{catalog}/{schema}/raw_data/customers"))

これにより、チュートリアルの残りの部分で使用できる大規模なデータ セット (偽の CDC データを含む) が生成されます。次のステップでは、Auto Loader を使用してデータを取り込みます。

ステップ 3: Auto Loaderを使用してデータを増分的に取り込む

次のステップは、(偽の) クラウド ストレージから生データをブロンズ レイヤーに取り込むことです。

これは、次のような複数の理由から難しい場合があります。

  • 大規模に運用し、数百万の小さなファイルを取り込む可能性があります。
  • スキーマと JSON 型を推測します。
  • 不正な JSON スキーマを持つ不良レコードを処理します。
  • スキーマ進化 (顧客テーブルの新しい列など) に注意してください。

Auto Loader は、スキーマ推論やスキーマ進化などのこの取り込みを簡素化し、数百万の受信ファイルに拡張します。Auto Loader は、Python ではcloudFiles 、SQL ではSELECT * FROM STREAM read_files(...)を使用して使用でき、さまざまな形式 (JSON、CSV、Apache Avro など) で使用できます。

テーブルをストリーミング テーブルとして定義すると、新しい受信データのみを使用することが保証されます。 ストリーミング テーブルとして定義しない場合、利用可能なすべてのデータがスキャンされ、取り込まれます。 詳細については、ストリーミング テーブルを参照してください。

  1. Auto Loaderを使用して受信CDCデータを取り込むには、次のコードをコピーして、パイプラインで作成されたコード ファイル ( my_transformation.py ) に貼り付けます。 パイプラインの作成時に選択した言語に基づいて、Python または SQL を使用できます。<catalog><schema> 、パイプラインのデフォルトとして設定したものに必ず置き換えてください。
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import *

# Replace with the catalog and schema name that
# you are using:
path = "/Volumes/<catalog>/<schema>/raw_data/customers"


# Create the target bronze table
dp.create_streaming_table("customers_cdc_bronze", comment="New customer data incrementally ingested from cloud object storage landing zone")

# Create an Append Flow to ingest the raw data into the bronze table
@dp.append_flow(
target = "customers_cdc_bronze",
name = "customers_bronze_ingest_flow"
)
def customers_bronze_ingest_flow():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.load(f"{path}")
)
  1. クリック再生アイコン。 ファイルを実行する か、 パイプラインを実行して、 接続されたパイプラインの更新を開始します。パイプラインにソース ファイルが 1 つしかない場合、これらは機能的に同等です。

更新が完了すると、エディターはパイプラインに関する情報で更新されます。

  • コードの右側のサイドバーにあるパイプライン グラフ (DAG) には、単一のテーブルcustomers_cdc_bronzeが表示されます。
  • 更新の概要は、パイプライン アセット ブラウザーの上部に表示されます。
  • 生成されたテーブルの詳細は下部のペインに表示され、テーブルを選択してデータを参照できます。

これは、クラウド データからインポートされた生のブロンズ レイヤー データです。 次のステップでは、データをクリーンアップしてシルバーレイヤー テーブルを作成します。

ステップ 4: データ品質を追跡するためのクリーンアップと期待

ブロンズ レイヤーを定義した後、データ品質を制御するための期待値を追加してシルバー レイヤーを作成します。 次の条件を確認してください。

  • ID はnullであってはなりません。
  • CDC 操作タイプは有効である必要があります。
  • json Auto Loader によって適切に読み取られている必要があります。

これらの条件を満たさない行は削除されます。

詳細については、「 パイプラインのエクスペクテーションを使用してデータ品質を管理する 」を参照してください。

  1. パイプラインアセットブラウザのサイドバーから、プラスアイコン。 追加し てから 変換します

  2. 名前 を入力し、ソース コード ファイルの言語 (Python または SQL) を選択します。パイプライン内では言語を組み合わせて使用できるため、このステップではどちらかの言語を選択できます。

  3. クレンジングされたテーブルを含むシルバーレイヤーを作成し、制約を課すには、次のコードをコピーして新しいファイルに貼り付けます (ファイルの言語に基づいてPythonまたはSQLを選択します)。

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import *

dp.create_streaming_table(
name = "customers_cdc_clean",
expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL","valid_id": "id IS NOT NULL","valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"}
)

@dp.append_flow(
target = "customers_cdc_clean",
name = "customers_cdc_clean_flow"
)
def customers_cdc_clean_flow():
return (
spark.read_stream("customers_cdc_bronze")
.select("address", "email", "id", "firstname", "lastname", "operation", "operation_date", "_rescued_data")
)
  1. クリック再生アイコン。 ファイルを実行する か、 パイプラインを実行して、 接続されたパイプラインの更新を開始します。

    ソース ファイルが 2 つあるため、同じことを行うわけではありませんが、この場合、出力は同じになります。

    • パイプライン実行 ステップ 3 のコードを含むパイプライン全体を実行します。 入力データが更新されている場合、そのソースからの変更がブロンズレイヤーに取り込まれます。 これは、データ設定ステップのコードは実行しません。これは、データ設定ステップのコードが探索フォルダ内にあり、パイプラインのソースの一部ではないためです。
    • 実行 file 現在のソース ファイルのみを実行します。 この場合、入力データは更新されずに、キャッシュされたブロンズ テーブルからシルバー データが生成されます。パイプライン コードを作成または編集するときに、このファイルだけを実行すると反復処理が高速化されます。

更新が完了すると、パイプライン グラフに 2 つのテーブル (ブロンズ レイヤーに応じてシルバー レイヤー) が表示され、下部のパネルに両方のテーブルの詳細が表示されることがわかります。 パイプライン アセット ブラウザーの上部には複数の実行時間が表示されますが、最新の実行の詳細のみが表示されます。

次に、 customersテーブルの最終的なゴールドレイヤー バージョンを作成します。

ステップ 5: AUTO CDCフローを使用して顧客テーブルを具体化する

ここまで、テーブルは各ステップで CDC データを渡してきました。ここで、 customersテーブルを作成して、最新のビューを含み、元のテーブルを作成した CDC 操作のリストではなく、元のテーブルのレプリカとなるようにします。

これを手動で実装するのは簡単ではありません。最新の行を保持するには、データの重複排除などを考慮する必要があります。

ただし、 Lakeflow 宣言型パイプラインは、 AUTO CDC 操作でこれらの課題を解決します。

  1. パイプラインアセットブラウザのサイドバーから、プラスアイコン。 追加変換

  2. 名前 を入力し、新しいソース コード ファイルの言語 (Python または SQL) を選択します。このステップではどちらの言語も選択できますが、以下の正しいコードを使用してください。

  3. LakeFlow宣言型パイプラインでAUTO CDCを使用してCDCデータを処理するには、次のコードをコピーして新しいファイルに貼り付けます。

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import *

dp.create_streaming_table(name="customers", comment="Clean, materialized customers")

dp.create_auto_cdc_flow(
target="customers", # The customer table being materialized
source="customers_cdc_clean", # the incoming CDC
keys=["id"], # what we'll be using to match the rows to upsert
sequence_by=col("operation_date"), # de-duplicate by operation date, getting the most recent value
ignore_null_updates=False,
apply_as_deletes=expr("operation = 'DELETE'"), # DELETE condition
except_column_list=["operation", "operation_date", "_rescued_data"],
)
  1. クリック再生アイコン。 ファイルを実行して 、接続されたパイプラインの更新を開始します。

更新が完了すると、パイプライン グラフに、ブロンズ、シルバー、ゴールドの順に 3 つのテーブルが表示されることがわかります。

ステップ 6: タイプ 2 (SCD2) のゆっくりと変化する寸法による更新履歴の追跡

多くの場合、 APPENDUPDATEDELETEから生じるすべての変更を追跡するテーブルを作成する必要があります。

  • 履歴: テーブルに対するすべての変更の履歴を保持します。
  • トレーサビリティ: どの操作が発生したかを確認します。

SCD2 と Lakeflow 宣言型パイプライン

Delta は変更データフロー (CDF) をサポートしており、 table_change SQL および Python でテーブルの変更をクエリできます。ただし、CDF の主な使用例は、パイプライン内の変更をキャプチャすることであり、最初からテーブルの変更の完全なビューを作成することではありません。

順序どおりに実行されないイベントがある場合、実装は特に複雑になります。変更をタイムスタンプ順に並べ、過去に発生した変更を受け取る必要がある場合は、SCD テーブルに新しいエントリを追加し、以前のエントリを更新する必要があります。

LakeFlow宣言型パイプラインを使用すると、この複雑さが解消され、最初からのすべての変更を含む別のテーブルを作成できるようになります。 このテーブルは、必要に応じて特定のパーティション/zorder 列を使用して大規模に使用できます。順序が乱れたフィールドは_sequence_byに基づいてすぐに処理されます

SCD2 テーブルを作成するには、SQL ではオプションSTORED AS SCD TYPE 2 、Python ではオプションstored_as_scd_type="2"を使用する必要があります。

注記

このオプションを使用して、機能が追跡する列を制限することもできます。 TRACK HISTORY ON {columnList | EXCEPT(exceptColumnList)}

  1. パイプラインアセットブラウザのサイドバーから、プラスアイコン。 追加変換

  2. 名前 を入力し、新しいソース コード ファイルの言語 (Python または SQL) を選択します。

  3. 次のコードをコピーして新しいファイルに貼り付けます。

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import *

# create the table
dp.create_streaming_table(
name="customers_history", comment="Slowly Changing Dimension Type 2 for customers"
)

# store all changes as SCD2
dp.create_auto_cdc_flow(
target="customers_history",
source="customers_cdc_clean",
keys=["id"],
sequence_by=col("operation_date"),
ignore_null_updates=False,
apply_as_deletes=expr("operation = 'DELETE'"),
except_column_list=["operation", "operation_date", "_rescued_data"],
stored_as_scd_type="2",
) # Enable SCD2 and store individual updates
  1. クリック再生アイコン。 ファイルを実行して 、接続されたパイプラインの更新を開始します。

更新が完了すると、パイプライン グラフには、これもシルバーレイヤー テーブルに依存する新しいcustomers_historyテーブルが含まれ、下部パネルには 4 つのテーブルすべての詳細が表示されます。

ステップ 7: 誰の情報を最も多く変更したかを追跡するマテリアライズドビューを作成する

テーブルcustomers_historyには、ユーザーが自分の情報に対して行ったすべての変更履歴が含まれます。ゴールドレイヤーで簡単なマテリアライズドビューを作成し、誰の情報を最も多く変更したかを追跡します。 これは、実際のシナリオにおける不正行為検出分析やユーザー推奨に使用できます。さらに、SCD2 を使用して変更を適用すると重複がすでに削除されているため、ユーザー ID ごとに行を直接カウントできます。

  1. パイプラインアセットブラウザのサイドバーから、プラスアイコン。 追加変換

  2. 名前 を入力し、新しいソース コード ファイルの言語 (Python または SQL) を選択します。

  3. 次のコードをコピーして新しいソース ファイルに貼り付けます。

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import *

@dp.table(
name = "customers_history_agg",
comment = "Aggregated customer history"
)
def customers_history_agg():
return (
spark.read("customers_history")
.groupBy("id")
.agg(
count("address").alias("address_count"),
count("email").alias("email_count"),
count("firstname").alias("firstname_count"),
count("lastname").alias("lastname_count")
)
)
  1. クリック再生アイコン。 ファイルを実行して 、接続されたパイプラインの更新を開始します。

更新が完了すると、パイプライン グラフにcustomers_historyテーブルに依存する新しいテーブルが追加され、下部のパネルで確認できます。パイプラインが完成しました。完全な 実行パイプライン を実行してテストできます。残っているステップは、パイプラインを定期的に更新するようにスケジュールすることだけです。

ステップ 8: ETLパイプラインを実行するジョブを作成する

次に、 Databricksジョブを使用してパイプラインのデータ取り込み、処理、分析ステップを自動化するワークフローを作成します。

  1. エディターの上部にある [スケジュール] ボタンを選択します。
  2. [スケジュール] ダイアログが表示されたら、 [スケジュールの追加] を 選択します。
  3. これにより、[ 新しいスケジュール] ダイアログが開き、スケジュールに従ってパイプラインを実行するジョブを作成できます。
  4. 必要に応じて、ジョブに名前を付けます。
  5. デフォルトでは、スケジュールは 1 日に 1 回実行されるように設定されています。このデフォルトを受け入れるか、独自のスケジュールを設定することができます。 「詳細」 を選択すると、ジョブを実行する特定の時間を設定するオプションが提供されます。 その他のオプション を選択すると、ジョブの実行時に通知を作成できます。
  6. 変更を適用してジョブを作成するには、 [作成] を選択します。

これで、ジョブは毎日実行され、パイプラインが最新の状態に保たれます。スケジュールのリストを表示するには、もう一度 「スケジュール」 を選択します。このダイアログから、スケジュールの追加、編集、削除など、パイプラインのスケジュールを管理できます。

スケジュール (またはジョブ) の名前をクリックすると、 [ジョブとパイプライン] リストのジョブのページに移動します。そこから、実行の履歴を含むジョブ実行に関する詳細を表示したり、 今すぐ実行 ボタンを使用してジョブをすぐに実行したりできます。

ジョブの実行の詳細については、「Lakeflowジョブのモニタリングと可観測性」を参照してください。

その他のリソース