チェンジデータキャプチャ (CDC)とは?

チェンジデータキャプチャ (CDC) は、ソース システム内のデータに加えられた変更 (挿入、更新、削除など) をキャプチャするデータ統合パターンです。 これらの変更はリストとして表され、一般に CDC フィードと呼ばれます。 ソースデータセット全体を読み取るのではなく、CDC フィードを操作すると、データをはるかに高速に処理できます。 SQL Server、MySQL、Oracle などのトランザクション データベースは、CDC フィードを生成します。 Delta テーブルは、チェンジデータフィード(CDF)と呼ばれる独自の CDC フィードを生成します。

次の図は、従業員データを含むソース テーブルの行が更新されると、変更 のみ を含む CDC フィードに新しい行セットが生成されることを示しています。 CDC フィードの各行には、通常、 UPDATE などの操作や、CDC フィードの各行を確定的に順序付けするために使用できる列など、追加のメタデータが含まれており、順不同の更新を処理できます。 たとえば、次の図の sequenceNum 列によって、CDC フィードの行の順序が決まります。

チェンジデータキャプチャ overview.

チェンジデータフィードの処理:最新のデータのみを保持するか、データの履歴バージョンを保持するか

チェンジデータフィードの処理は、 slowly changing dimensions (SCD) と呼ばれます。 CDC フィードを処理する際には、次の選択肢があります。

  • 最新のデータのみを保持しますか (つまり、既存のデータを上書きしますか)? これは SCD タイプ 1 として知られています。

  • それとも、データに対する変更の履歴を保持しますか? これは SCDタイプ2として知られています。

SCD Type 1 処理では、変更が発生するたびに古いデータを新しいデータで上書きします。 これは、変更の履歴が保持されないことを意味します。 最新バージョンのデータのみが利用可能です。 これは簡単なアプローチであり、エラーの修正や顧客Eメールアドレスなどの重要でないフィールドの更新など、変更の履歴が重要でない場合によく使用されます。

チェンジデータキャプチャ SCD Type 1 overview.

SCD Type 2 処理では、時間の経過と共に異なるバージョンのデータをキャプチャするための追加のレコードを作成することにより、データ変更の履歴レコードが保持されます。 データの各バージョンにはタイムスタンプが付けられるか、メタデータでタグ付けされるため、ユーザーは変更がいつ発生したかを追跡できます。 これは、分析目的で顧客の住所の経時的な変化を追跡するなど、データの進化を追跡することが重要な場合に便利です。

チェンジデータキャプチャ SCD Type 2 overview.

Delta Live Tables を使用した SCD Type 1 および Type 2 処理の例

このセクションの例では、SCD Type 1 と Type 2 の使用方法を示します。

ステップ 1: サンプル データを準備する

この例では、サンプルの CDC フィードを生成します。 まず、ノートブックを作成し、次のコードを貼り付けます。 コード ブロックの先頭にある変数を、テーブルとビューを作成するアクセス許可があるカタログとスキーマに更新します。

このコードでは、複数の変更レコードを含む新しい Delta テーブルを作成します。 スキーマは次のとおりです。

  • id - 整数、この従業員の一意の識別子

  • name - 文字列、従業員の名前

  • age - 整数、従業員の年齢

  • operation - 変更タイプ( INSERTUPDATEDELETEなど)

  • sequenceNum - Integer は、ソース データ内の CDC イベントの論理的な順序を識別します。 Delta Live Tables では、このシーケンス処理を使用して、順不同で到着した変更イベントが処理されます。

# update these to the catalog and schema where you have permissions
# to create tables and views.

catalog = "mycatalog"
schema = "myschema"
employees_cdf_table = "employees_cdf"

def write_employees_cdf_to_delta():
 data = [
   (1, "Alex", "chef", "FR", "INSERT", 1),
   (2, "Jessica", "owner", "US", "INSERT", 2),
   (3, "Mikhail", "security", "UK", "INSERT", 3),
   (4, "Gary", "cleaner", "UK", "INSERT", 4),
   (5, "Chris", "owner", "NL", "INSERT", 6),
   # out of order update, this should be dropped from SCD Type 1
   (5, "Chris", "manager", "NL", "UPDATE", 5)
   (6, "Pat", "mechanic", "NL", "DELETE", 8),
   (6, "Pat", "mechanic", "NL", "INSERT", 7)
 ]
 columns = ["id", "name", "role", "country", "operation", "sequenceNum"]
  df = spark.createDataFrame(data, columns)
 df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{schema}.{employees_cdf_table}")

write_employees_cdf_to_delta()

このデータは、次の SQL コマンドを使用してプレビューできます。

SELECT *
FROM mycatalog.myschema.employees_cdf

ステップ2:SCD Type 1を使用して最新のデータのみを保持する

APPLY CHANGESAPIDelta Live Tablesパイプラインの を使用して、チェンジデータフィードをSCD Type 1テーブルに処理することをお勧めします。

  1. 新しいノートブックを作成します。

  2. 次のコードを貼り付けます。

  3. パイプラインを作成して接続します

employees_cdf 関数は、チェンジデータキャプチャ処理に使用する APPLY CHANGES APIが入力として変更のストリームを期待しているため、上記で作成したテーブルをストリームとして読み取ります。 このストリームをテーブルに具体化したくないため、 decorator @dlt.view でラップします。

次に、 dlt.create_target_table を使用して、このチェンジデータフィードの処理結果を含むストリーミングテーブルを作成します。

最後に、 dlt.apply_changes を使用してチェンジデータフィードを処理します。 各引数を見てみましょう。

  • target - 前に定義したターゲット ストリーミング テーブル。

  • source - 以前に定義した変更レコードのストリームに対するビュー。

  • keys - 変更フィード内の一意の行を識別します。 id を一意の識別子として使用しているため、唯一の識別列として id を指定するだけです。

  • sequence_by - ソース データ内の CDC イベントの論理順序を指定する列名。 この順序付けは、順不同で到着する変更イベントを処理するために必要です。 シーケンシングカラムとして sequenceNum を指定します。

  • apply_as_deletes - サンプル データには削除操作が含まれているため、 apply_as_deletes を使用して、CDC イベントをアップサートではなく DELETE として扱うタイミングを示します。

  • except_column_list - ターゲット テーブルに含めない列の一覧が含まれています。 この例では、この引数を使用して、 sequenceNumoperationを除外します。

  • stored_as_scd_type - 使用する SCD タイプを示します。

import dlt
from pyspark.sql.functions import col, expr, lit, when
from pyspark.sql.types import StringType, ArrayType

catalog = "mycatalog"
schema = "myschema"
employees_cdf_table = "employees_cdf"
employees_table_current = "employees_current"
employees_table_historical = "employees_historical"

@dlt.view
def employees_cdf():
 return spark.readStream.format("delta").table(f"{catalog}.{schema}.{employees_cdf_table}")

dlt.create_target_table(f"{catalog}.{schema}.{employees_table_current}")

dlt.apply_changes(
 target=f"{catalog}.{schema}.{employees_table_current}",
 source=employees_cdf_table,
 keys=["id"],
 sequence_by=col("sequenceNum"),
 apply_as_deletes=expr("operation = 'DELETE'"),
 except_column_list = ["operation", "sequenceNum"],
 stored_as_scd_type = 1
)

このパイプラインを実行するには、[ 開始] をクリックします。

次に、SQL エディターで次のクエリを実行して、変更レコードが正しく処理されたことを確認します。

SELECT *
FROM mycatalog.myschema.employees_current

注:

従業員 Chris の順不同の更新は、ロールがマネージャーではなく所有者に設定されたままであるため、正しく削除されました。

チェンジデータキャプチャ SCD Type 1 の例。

ステップ3:SCDタイプ2を使用してヒストリカルデータを保持する

この例では、従業員レコードに対する変更の完全な履歴を含む employees_historicalという 2 番目のターゲットテーブルを作成します。

このコードをパイプラインに追加します。 ここでの唯一の違いは、 stored_as_scd_type が 1 ではなく 2 に設定されていることです。

dlt.create_target_table(f"{catalog}.{schema}.{employees_table_historical}")

dlt.apply_changes(
 target=f"{catalog}.{schema}.{employees_table_historical}",
 source=employees_cdf_table,
 keys=["id"],
 sequence_by=col("sequenceNum"),
 apply_as_deletes=expr("operation = 'DELETE'"),
 except_column_list = ["operation", "sequenceNum"],
 stored_as_scd_type = 2
)

このパイプラインを実行するには、[ 開始] をクリックします。

次に、SQL エディターで次のクエリを実行して、変更レコードが正しく処理されたことを確認します。

SELECT *
FROM mycatalog.myschema.employees_historical

削除された従業員 (Pat など) を含む、従業員に対するすべての変更が表示されます。

チェンジデータキャプチャ SCD Type 2 の例。

ステップ 4: リソースをクリーンアップする

完了したら、次の手順に従ってリソースをクリーンアップします。

  1. パイプラインを削除します。note:: このパイプラインを削除すると、 employees テーブルと employees_historical テーブルが自動的に削除されます。 #. [ パイプライン] をクリックします。 #. ケバブメニューをクリックし、[ 削除]をクリックします。

  2. ノートブックを削除します。

  3. チェンジデータフィードを含むテーブルを削除します。

    1. [新しい>クエリ] をクリックします。

    2. 次の SQL コードを貼り付けて実行し、カタログとスキーマを必要に応じて調整します。

DROP TABLE mycatalog.myschema.employees_cdf

チェンジデータキャプチャMERGE INTOforeachBatchを使用することの欠点

Databricks には、foreachBatch API と共に行を Delta テーブルにアップサートするために使用できる MERGE INTO SQL コマンドが用意されています。このセクションでは、この手法を単純なユース ケースに使用する方法について説明しますが、この手法を実際のシナリオに適用すると、ますます複雑で脆弱になります。

この例では、前の例で使用したのと同じサンプル チェンジデータフィードを使用します。

MERGE INTOforeachBatch による素朴な実装

ノートブックを作成し、次のコードをコピーします。 必要に応じて、 catalogschema、および employees_table 変数を変更します。 catalog 変数と schema 変数は、テーブルを作成できる Unity Catalog 内の場所に設定する必要があります。

ノートブックを実行すると、次の処理が行われます。

  • create_tableにターゲット テーブルを作成します。この手順を自動的に処理する apply_changesとは異なり、スキーマを指定する必要があります。

  • チェンジデータフィードをストリームとして読み取ります。 各マイクロバッチは、MERGE INTOコマンドを実行するupsertToDeltaメソッドを使用して処理されます。

catalog = "jobs"
schema = "myschema"
employees_cdf_table = "employees_cdf"
employees_table = "employees_merge"


def upsertToDelta(microBatchDF, batchId):
 microBatchDF.createOrReplaceTempView("updates")
 microBatchDF.sparkSession.sql(f"""
   MERGE INTO {catalog}.{schema}.{employees_table} t
   USING updates s
   ON s.id = t.id
   WHEN MATCHED THEN UPDATE SET *
   WHEN NOT MATCHED THEN INSERT *
 """)


def create_table():
 spark.sql(f"DROP TABLE IF EXISTS {catalog}.{schema}.{employees_table}")
  spark.sql(f"""
   CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{employees_table}
   (id INT, name STRING, age INT, country STRING)
 """)


create_table()


cdcData = spark.readStream.table(f"{catalog}.{schema}.{employees_cdf_table}")


cdcData.writeStream \
.foreachBatch(upsertToDelta) \
.outputMode("append") \
.start()

結果を表示するには、次の SQL クエリを実行します。

SELECT *
FROM mycatalog.myschema.employees_merge

残念ながら、次に示すように、結果は正しくありません。

チェンジデータキャプチャ MERGE INTO 例です。

同じマイクロバッチ内の同じキーに対する複数の更新

最初の問題は、コードが同じマイクロバッチ内の同じキーに対する複数の更新を処理しないことです。 たとえば、 INSERT を使用して従業員 Chris を挿入し、そのロールを [オーナー] から [マネージャー] に更新したとします。 これにより、1 つの行になるはずですが、実際には 2 つの行があります。

マイクロバッチで同じキーに複数の更新がある場合、どちらの変更が優先されますか?

チェンジデータキャプチャ 同じマイクロバッチの例で同じキーに対する複数の更新。

ロジックはより複雑になります。 次のコード例では、最新の行を sequenceNum 取得し、そのデータのみを次のようにターゲット テーブルにマージします。

  • グループ を主キーで指定し、 id.

  • そのキーのバッチで最大 sequenceNum を持つ行のすべての列を取ります。

  • 行を分解して戻します。

次に示すように upsertToDelta メソッドを更新し、コードを実行します。

def upsertToDelta(microBatchDF, batchId):
 microBatchDF = microBatchDF.groupBy("id").agg(
   max_by(struct("*"), "sequenceNum").alias("row")
 ).select("row.*").createOrReplaceTempView("updates")


 spark.sql(f"""
   MERGE INTO {catalog}.{schema}.{employees_table} t
   USING updates s
   ON s.id = t.id
   WHEN MATCHED THEN UPDATE SET *
   WHEN NOT MATCHED THEN INSERT *
 """)

ターゲット テーブルをクエリすると、Chris という名前の従業員が正しいロールを持っていることがわかりますが、ターゲット テーブルにはまだ削除済みレコードが表示されているため、解決すべき他の問題があります。

チェンジデータキャプチャ 同じマイクロバッチの例の結果で同じキーに複数の更新をしました。

マイクロバッチ間での順不同の更新

このセクションでは、マイクロバッチ間での順不同の更新の問題について説明します。 次の図は、Chris の行の最初のマイクロバッチに UPDATE 操作があり、その後に続くマイクロバッチに INSERT がある場合はどうなるかという問題を示しています。 コードはこれを正しく処理しません。

複数のマイクロバッチ間で同じキーに対する順不同の更新がある場合、どちらの変更が優先されますか?

チェンジデータキャプチャ マイクロバッチ間での順不同の更新の例。

これを修正するには、次のようにコードを展開して、各行にバージョンを格納します。

  • 行が最後に更新されたときの sequenceNum を格納します。

  • 新しい行ごとに、タイムスタンプが格納されているタイムスタンプよりも大きいかどうかを確認し、次のロジックを適用します。

    • これより大きい場合は、ターゲットからの新しいデータを使用します。

    • それ以外の場合は、データをソースに保持します。

まず、 createTable メソッドを更新して sequenceNum を格納します。これは、各行のバージョン管理に使用するためです。

def create_table():
 spark.sql(f"DROP TABLE IF EXISTS {catalog}.{schema}.{employees_table}")
  spark.sql(f"""
   CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{employees_table}
   (id INT, name STRING, age INT, country STRING, sequenceNum INT)
 """)

次に、行バージョンを処理するように upsertToDelta を更新します。 MERGE INTOUPDATE SET 句は、すべての列を個別に処理する必要があります。

def upsertToDelta(microBatchDF, batchId):
 microBatchDF = microBatchDF.groupBy("id").agg(
   max_by(struct("*"), "sequenceNum").alias("row")
 ).select("row.*").createOrReplaceTempView("updates")


 spark.sql(f"""
   MERGE INTO {catalog}.{schema}.{employees_table} t
   USING updates s
   ON s.id = t.id
   WHEN MATCHED THEN UPDATE SET
     name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
     age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
     country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
   WHEN NOT MATCHED THEN INSERT *
 """)

削除の処理

残念ながら、コードにはまだ問題があります。 DELETE操作は処理しません。これは、従業員の Pat がまだターゲット テーブルに存在するという事実からも明らかです。

削除が同じマイクロバッチで到着すると仮定しましょう。 これらを処理するには、次に示すように、 upsertToDelta メソッドを再度更新して、変更データ レコードが削除を示しているときに行を削除します。

def upsertToDelta(microBatchDF, batchId):
 microBatchDF = microBatchDF.groupBy("id").agg(
   max_by(struct("*"), "sequenceNum").alias("row")
 ).select("row.*").createOrReplaceTempView("updates")


 spark.sql(f"""
   MERGE INTO {catalog}.{schema}.{employees_table} t
   USING updates s
   ON s.id = t.id
   WHEN MATCHED AND s.operation = 'DELETE' THEN DELETE
   WHEN MATCHED THEN UPDATE SET
     name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
     age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
     country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
   WHEN NOT MATCHED THEN INSERT *
 """)

削除後に順不同で到着する更新の処理

残念ながら、上記のコードは、 DELETE の後にマイクロバッチ間で順不同の UPDATE が続くケースを処理しないため、まだ完全には正しくありません。

チェンジデータキャプチャは、削除後に順不同で到着する更新を処理します。

このケースを処理するアルゴリズムは、後続の順不同の更新を処理できるように、削除を記憶する必要があります。 これを行うには、次の手順を実行します。

  • 行をすぐに削除する代わりに、タイムスタンプまたは sequenceNumを使用して行を論理的に削除します。 論理的に削除された行は 廃棄されます

  • すべてのユーザーを、トゥームストーンを除外するビューにリダイレクトします。

  • 時間の経過とともに廃棄石を削除するクリーンアップ ジョブを作成します。

次のコードを使用します。

def upsertToDelta(microBatchDF, batchId):
 microBatchDF = microBatchDF.groupBy("id").agg(
   max_by(struct("*"), "sequenceNum").alias("row")
 ).select("row.*").createOrReplaceTempView("updates")


 spark.sql(f"""
   MERGE INTO {catalog}.{schema}.{employees_table} t
   USING updates s
   ON s.id = t.id
   WHEN MATCHED AND s.operation = 'DELETE' THEN UPDATE SET DELETED_AT=now()
   WHEN MATCHED THEN UPDATE SET
     name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
     age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
     country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
   WHEN NOT MATCHED THEN INSERT *
 """)

ユーザーはターゲット テーブルを直接使用できないため、クエリを実行できるビューを作成します。

CREATE VIEW employees_v AS
SELECT * FROM employees_merge
WHERE DELETED_AT = NULL

最後に、廃棄された行を定期的に削除するクリーンアップ ジョブを作成します。

DELETE FROM employees_merge
WHERE DELETED_AT < now() - INTERVAL 1 DAY