はじめに: GDPR コンプライアンスのためのデータの準備
EU 一般データ保護規則(GDPR)およびカリフォルニア州消費者プライバシー法(CCPA)は、プライバシーおよびデータセキュリティに関する規制であり、企業は、顧客について収集されたすべての個人を特定できる情報(PII)を明示的な要求に応じて永久的かつ完全に削除することを義務付けています。 「忘れられる権利」(RTBF)または「データ消去の権利」とも呼ばれ、削除リクエストは指定された期間(たとえば、1暦月以内)に実行する必要があります。
この記事では、Databricks に格納されているデータに RTBF を実装する方法について説明します。 この記事に含まれる例では、電子商取引会社のデータセットをモデル化し、ソース テーブルのデータを削除し、これらの変更をダウンストリーム テーブルに反映する方法を示します。
アップストリームソースのデータの削除
GDPR と CCPA は、Kafka やファイル、データベースなど、Delta Lake 以外のソースのデータを含む、すべてのデータに適用されます。 Databricks でデータを削除するだけでなく、キューやクラウド ストレージなどのアップストリーム ソースのデータも削除する必要があります。
難読化よりも完全な削除が望ましい
データを削除するか、難読化するかを選択する必要があります。 難読化は、仮名化、データマスキングなどを使用して実装できます。 ただし、実際には、再識別のリスクを排除するにはPIIデータを完全に削除する必要があるため、最も安全なオプションは完全な消去です。
ブロンズレイヤーのデータを削除し、削除をシルバーレイヤーとゴールドレイヤーに反映します
GDPR と CCPA のコンプライアンスを開始するには、まずブロンズレイヤーのデータを削除し、削除リクエストを含む制御テーブルをクエリするスケジュールされたジョブを実行することをお勧めします。 ブロンズレイヤーからデータを削除した後、変更をシルバーレイヤーとゴールドレイヤーに反映できます。
テーブルを定期的に管理して、履歴ファイルからデータを削除する
デフォルトでは、Delta Lake は削除されたレコードを含むテーブル履歴を 30 日間保持し、タイムトラベルとロールバックに使用できるようにします。 ただし、以前のバージョンのデータを削除しても、データはクラウドストレージに保持されます。 したがって、テーブルとビューを定期的に管理して、以前のバージョンのデータを削除する必要があります。 推奨される方法は、Unity Catalog マネージドテーブルの予測的最適化であり、ストリーミングテーブルとマテリアライズドビューの両方をインテリジェントに維持します。Delta Live Tables は、ストリーミング テーブルと具体化されたビューが更新されてから 24 時間以内にメンテナンス タスクを自動的に実行します。
予測的最適化 または Delta Live Tablesを使用していない場合は、 Delta テーブルで VACUUM
コマンドを実行して、以前のバージョンのデータを完全に削除する必要があります。 デフォルトでは、これにより、タイムトラベル機能が構成 可能な設定である7日間に短縮され、問題のデータの履歴バージョンもクラウドストレージから削除されます。
ブロンズレイヤーからPIIデータを削除する
レイクハウスの設計によっては、PII ユーザー データと非 PII ユーザー データの間のリンクを切断できる場合があります。 たとえば、Eメールのような自然キーの代わりに user_id
などの非自然キーを使用している場合は、PIIデータを削除して、非PIIデータをそのままにすることができます。
この記事の残りの部分では、すべてのブロンズ テーブルからユーザー レコードを完全に削除することで RTBF を処理します。 データを削除するには、次のコードに示すように、 DELETE
コマンドを実行します。
spark.sql("DELETE FROM bronze.users WHERE user_id = 5")
一度に多数のレコードをまとめて削除する場合は、 MERGE
コマンドを使用することをお勧めします。 次のコードでは、user_id
列を含む gdpr_control_table
という制御テーブルがあることを前提としています。このテーブルに「忘れられる権利」を要求したすべてのユーザーのレコードをこのテーブルに挿入します。
MERGE
コマンドは、行を一致させるための条件を指定します。この例では、user_id
に基づいて、target_table
のレコードと gdpr_control_table
のレコードを照合します。一致するものがある場合 (たとえば、target_table
とgdpr_control_table
の両方でuser_id
がある場合)、target_table
の行は削除されます。この MERGE
コマンドが成功したら、制御テーブルを更新して、要求が処理されたことを確認します。
spark.sql("""
MERGE INTO target
USING (
SELECT user_id
FROM gdpr_control_table
) AS source
ON target.user_id = source.user_id
WHEN MATCHED THEN DELETE
""")
ブロンズからシルバー、ゴールドレイヤーへの変更の伝播
ブロンズレイヤーでデータを削除した後、シルバーレイヤーとゴールドレイヤーのテーブルに変更を反映させる必要があります。
マテリアライズドビュー:削除を自動的に処理
マテリアライズドビューは、ソースの削除を自動的に処理します。 したがって、ソースから削除されたデータがマテリアライズドビューに含まれないようにするために、特別な操作を行う必要はありません。 マテリアライズドビューを更新し、メンテナンスを実行して、削除が完全に処理されるようにする必要があります。
マテリアライズドビューは、完全な再計算よりもコストがかからない場合はインクリメンタル計算を使用しますが、正確性を犠牲にすることはありませんので、常に正しい結果を返します。 つまり、ソースからデータを削除すると、マテリアライズドビューが完全に再計算される可能性があります。
ストリーミング tables: skipChangeCommits を使用してデータを削除し、ストリーミング ソースを読み取ります
ストリーミング テーブルは、追加専用データのみを処理できます。 つまり、ストリーミング テーブルでは、新しいデータ行のみがストリーミング ソースに表示されることが想定されています。 ストリーミングに使用されるソーステーブルのレコードの更新や削除など、その他の操作はサポートされておらず、ストリームが中断されます。
ストリーミングは新しいデータのみを処理するため、データの変更を自分で処理する必要があります。 推奨される方法は、(1) ストリーミング ソースのデータを削除し、(2) ストリーミング テーブルからデータを削除してから、(3) ストリーミング読み取りを更新して skipChangeCommits
を使用することです。 このフラグは、ストリーミング テーブルが挿入以外のもの (更新や削除など) をスキップする必要があることを Databricks に示します。
または、(1) ソースからデータを削除し、(2) ストリーミング テーブルから削除してから、(3) ストリーミング テーブルを完全に更新することもできます。 ストリーミングテーブルを完全に更新すると、テーブルのストリーミング状態がクリアされ、すべてのデータが再処理されます。 保持期間を過ぎたアップストリームデータソース (たとえば、7 日後にデータを期限切れにする Kafka トピック) は再度処理されないため、データが失われる可能性があります。 このオプションは、ヒストリカルデータが使用可能で、再度処理してもコストがかからないシナリオでのみ、ストリーミングテーブルにお勧めします。
例: eコマース企業の GDPR と CCPA のコンプライアンス
次の図は、 GDPR & CCPA コンプライアンスを実装する必要がある電子商取引会社のメダリオンアーキテクチャを示しています。 ユーザーのデータが削除されても、ダウンストリーム集計でユーザーのアクティビティをカウントしたい場合があります。
ブロンズレイヤー
users
- ユーザーディメンション。 PII が含まれています (例: Eメール アドレス)。clickstream
- [イベント]をクリックします。 PII (IP アドレスなど) が含まれています。gdpr_requests
- 「忘れられる権利」の対象となるユーザーIDを含むコントロールテーブル。
シルバーレイヤー
clicks_hourly
- 1時間あたりの合計クリック数。 ユーザーを削除しても、そのユーザーのクリック数はカウントされます。clicks_by_user
- ユーザーあたりの合計クリック数。 ユーザーを削除する場合、そのユーザーのクリック数はカウントされません。
ゴールドレイヤー
revenue_by_user
- 各ユーザーの合計支出。
ステップ 1: テーブルにサンプル データを設定する
次のコードは、これら 2 つのテーブルを作成します。
source_users
ユーザーに関するディメンション データが含まれます。 このテーブルには、email
という PII 列が含まれています。source_clicks
ユーザーが実行したアクティビティに関するイベント データが含まれます。 これには、ip_address
という PII 列が含まれています。
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, MapType, DateType
catalog = "users"
schema = "name"
# Create table containing sample users
users_schema = StructType([
StructField('user_id', IntegerType(), False),
StructField('username', StringType(), True),
StructField('email', StringType(), True),
StructField('registration_date', StringType(), True),
StructField('user_preferences', MapType(StringType(), StringType()), True)
])
users_data = [
(1, 'alice', 'alice@example.com', '2021-01-01', {'theme': 'dark', 'language': 'en'}),
(2, 'bob', 'bob@example.com', '2021-02-15', {'theme': 'light', 'language': 'fr'}),
(3, 'charlie', 'charlie@example.com', '2021-03-10', {'theme': 'dark', 'language': 'es'}),
(4, 'david', 'david@example.com', '2021-04-20', {'theme': 'light', 'language': 'de'}),
(5, 'eve', 'eve@example.com', '2021-05-25', {'theme': 'dark', 'language': 'it'})
]
users_df = spark.createDataFrame(users_data, schema=users_schema)
users_df.write..mode("overwrite").saveAsTable(f"{catalog}.{schema}.source_users")
# Create table containing clickstream (i.e. user activities)
from pyspark.sql.types import TimestampType
clicks_schema = StructType([
StructField('click_id', IntegerType(), False),
StructField('user_id', IntegerType(), True),
StructField('url_clicked', StringType(), True),
StructField('click_timestamp', StringType(), True),
StructField('device_type', StringType(), True),
StructField('ip_address', StringType(), True)
])
clicks_data = [
(1001, 1, 'https://example.com/home', '2021-06-01T12:00:00', 'mobile', '192.168.1.1'),
(1002, 1, 'https://example.com/about', '2021-06-01T12:05:00', 'desktop', '192.168.1.1'),
(1003, 2, 'https://example.com/contact', '2021-06-02T14:00:00', 'tablet', '192.168.1.2'),
(1004, 3, 'https://example.com/products', '2021-06-03T16:30:00', 'mobile', '192.168.1.3'),
(1005, 4, 'https://example.com/services', '2021-06-04T10:15:00', 'desktop', '192.168.1.4'),
(1006, 5, 'https://example.com/blog', '2021-06-05T09:45:00', 'tablet', '192.168.1.5')
]
clicks_df = spark.createDataFrame(clicks_data, schema=clicks_schema)
clicks_df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{schema}.source_clicks")
ステップ 2: PII データを処理するパイプラインを作成する
次のコードは、上記のメダリオンアーキテクチャの bronze、silver、および goldlayer を作成します。
import dlt
from pyspark.sql.functions import col, concat_ws, count, countDistinct, avg, when, expr
catalog = "users"
schema = "name"
# ----------------------------
# Bronze Layer - Raw Data Ingestion
# ----------------------------
@dlt.table(
name=f"{catalog}.{schema}.users_bronze",
comment='Raw users data loaded from source'
)
def users_bronze():
return (
spark.readStream.table(f"{catalog}.{schema}.source_users")
)
@dlt.table(
name=f"{catalog}.{schema}.clicks_bronze",
comment='Raw clicks data loaded from source'
)
def clicks_bronze():
return (
spark.readStream.table(f"{catalog}.{schema}.source_clicks")
)
# ----------------------------
# Silver Layer - Data Cleaning and Enrichment
# ----------------------------
@dlt.table(
name=f"{catalog}.{schema}.users_silver",
comment='Cleaned and standardized users data'
)
@dlt.expect_or_drop('valid_email', "email IS NOT NULL")
def users_silver():
return (
spark.readStream
.table(f"{catalog}.{schema}.users_bronze")
.withColumn('registration_date', col('registration_date').cast('timestamp'))
.dropDuplicates(['user_id', 'registration_date'])
.select('user_id', 'username', 'email', 'registration_date', 'user_preferences')
)
@dlt.table(
name=f"{catalog}.{schema}.clicks_silver",
comment='Cleaned and standardized clicks data'
)
@dlt.expect_or_drop('valid_click_timestamp', "click_timestamp IS NOT NULL")
def clicks_silver():
return (
spark.readStream
.table(f"{catalog}.{schema}.clicks_bronze")
.withColumn('click_timestamp', col('click_timestamp').cast('timestamp'))
.withWatermark('click_timestamp', '10 minutes')
.dropDuplicates(['click_id'])
.select('click_id', 'user_id', 'url_clicked', 'click_timestamp', 'device_type', 'ip_address')
)
@dlt.table(
name=f"{catalog}.{schema}.user_clicks_silver",
comment='Joined users and clicks data on user_id'
)
def user_clicks_silver():
# Read users_silver as a static DataFrame
users = spark.read.table(f"{catalog}.{schema}.users_silver")
# Read clicks_silver as a streaming DataFrame
clicks = spark.readStream \
.table('clicks_silver')
# Perform the join
joined_df = clicks.join(users, on='user_id', how='inner')
return joined_df
# ----------------------------
# Gold Layer - Aggregated and Business-Level Data
# ----------------------------
@dlt.table(
name=f"{catalog}.{schema}.user_behavior_gold",
comment='Aggregated user behavior metrics'
)
def user_behavior_gold():
df = spark.read.table(f"{catalog}.{schema}.user_clicks_silver")
return (
df.groupBy('user_id')
.agg(
count('click_id').alias('total_clicks'),
countDistinct('url_clicked').alias('unique_urls')
)
)
@dlt.table(
name=f"{catalog}.{schema}.marketing_insights_gold",
comment='User segments for marketing insights'
)
def marketing_insights_gold():
df = spark.read.table(f"{catalog}.{schema}.user_behavior_gold")
return (
df.withColumn(
'engagement_segment',
when(col('total_clicks') >= 100, 'High Engagement')
.when((col('total_clicks') >= 50) & (col('total_clicks') < 100), 'Medium Engagement')
.otherwise('Low Engagement')
)
)
ステップ 3: ソース テーブルのデータを削除する
このステップでは、PII が見つかったすべてのテーブルのデータを削除します。
catalog = "users"
schema = "name"
def apply_gdpr_delete(user_id):
tables_with_pii = ["clicks_bronze", "users_bronze", "clicks_silver", "users_silver", "user_clicks_silver"]
for table in tables_with_pii:
print(f"Deleting user_id {user_id} from table {table}")
spark.sql(f"""
DELETE FROM {catalog}.{schema}.{table}
WHERE user_id = {user_id}
""")
ステップ 4: 影響を受けるストリーミング テーブルの定義に skipChangeCommits を追加する
この手順では、追加されていない行をスキップするように Delta Live Tables に指示する必要があります。 skipChangeCommits オプションを次のメソッドに追加します。 具体化されたビューの定義は、更新と削除を自動的に処理するため、更新する必要はありません。
users_bronze
users_silver
clicks_bronze
clicks_silver
user_clicks_silver
次のコードは、 users_bronze
メソッドを更新する方法を示しています。
def users_bronze():
return (
spark.readStream.option('skipChangeCommits', 'true').table(f"{catalog}.{schema}.source_users")
)
パイプラインを再度実行すると、パイプラインは正常に更新されます。