メインコンテンツまでスキップ

GDPR コンプライアンスのためのデータの準備

EU 一般データ保護規則(GDPR)およびカリフォルニア州消費者プライバシー法(CCPA)は、プライバシーおよびデータセキュリティに関する規制であり、企業に対し、顧客に関するすべての個人を特定できる情報(PII)を、顧客からの明示的な要求に応じて永久的かつ完全に削除することを義務付けています。「忘れられる権利」(RTBF)または「データ消去権」としても知られる削除要求は、指定された期間内(例:1暦月以内)に実行される必要があります。

この記事では、Databricksに格納されているデータにRTBFを実装する方法について説明します。この記事に含まれる例は、Eコマース企業のデータセットをモデル化し、ソーステーブルのデータを削除して、これらの変更をダウンストリームテーブルに伝播する方法を示しています。

「忘れられる権利」を実装するための設計図

次の図は、「忘れられる権利」を実装する方法を示しています。

GDPRコンプライアンスを実装する方法を示す図。

Delta Lake でポイント削除

Delta Lake は、ACIDトランザクションにより、大規模なデータレイクでのポイント削除を高速化し、消費者のGDPRやCCPAのリクエストに応じた個人識別情報(PII)の特定と削除を可能にします。

Delta Lake はテーブル履歴を保持し、特定の時点のクエリーとロールバックに使用できるようにします。VACUUM 関数は、Delta テーブルによって参照されなくなり、指定した保有期間のしきい値よりも古いデータ ファイルを削除し、データを完全に削除します。デフォルトと推奨事項の詳細については、「テーブル履歴の操作」を参照してください。

削除ベクトルを使用する際は、データが削除されることを確認してください。

削除ベクトルが有効になっているテーブルの場合、レコードを削除した後、基になるレコードを完全に削除するには、REORG TABLE ... APPLY (PURGE)を実行する必要もあります。これには、Delta Lake テーブル、マテリアライズドビュー、およびストリーミングテーブルが含まれます。Parquet データファイルへの変更の適用を参照してください。

上流ソースのデータを削除

GDPR および CCPA は、Delta Lake 以外のソース(Kafka、ファイル、データベースなど)のデータを含む、すべてのデータに適用されます。Databricksでデータを削除するだけでなく、キューやクラウドストレージなどの上流のソースにあるデータも削除する必要があります。

注記

データ削除ワークフローを実装する前に、コンプライアンスまたはバックアップ目的でワークスペースデータをエクスポートする必要がある場合があります。ワークスペースデータをエクスポートを参照してください。

難読化よりも完全な削除が望ましい

データを削除するか、難読化するかのどちらかを選択してください。難読化は、仮名化、データマスキングなどを使用して実装できます。ただし、最も安全な選択肢は完全な消去です。なぜなら、実際には、再識別化のリスクを排除するためには、PIIデータの完全な削除が必要となることが多いためです。

ブロンズレイヤーのデータを削除し、その後シルバーおよびゴールドレイヤーに削除を伝播します。

削除リクエストのテーブルをクエリするスケジュール済みジョブにより、まずブロンズレイヤーのデータを削除してGDPR および CCPA のコンプライアンスを開始することをお勧めします。ブロンズレイヤーからデータが削除された後、変更はシルバーレイヤーとゴールドレイヤーに反映される可能性があります。

履歴ファイルからデータを削除するため、テーブルを定期的に管理します

デフォルトでは、Delta Lake は削除されたレコードを含むテーブル履歴を30日間保持し、タイムトラベルとロールバックに使用できるようにします。ただし、以前のバージョンのデータを削除しても、データはクラウドストレージに保持されます。したがって、データセットを定期的に管理して、以前のバージョンのデータを削除する必要があります。推奨される方法は、Unity Catalog マネージドテーブルの予測的最適化であり、ストリーミングテーブルとマテリアライズドビューの両方をインテリジェントに維持します。

  • 予測的最適化によって管理されるテーブルの場合、 Lakeflow Spark宣言型パイプラインは、使用パターンに基づいてストリーミング テーブルとマテリアライズドビューの両方をインテリジェントに維持します。
  • 予測的最適化が有効になっていないテーブルの場合、 Lakeflow Spark宣言型パイプラインは、ストリーミング テーブルとマテリアライズドビューが更新されてから 24 時間以内にメンテナンス タスクを自動的に実行します。

予測的最適化またはLakeflow Spark宣言型パイプラインを使用していない場合は、 DeltaテーブルでVACUUMコマンドを実行して、以前のバージョンのデータを完全に削除する必要があります。 デフォルトでは、タイムトラベル機能は 7 日間に短縮されます (これは構成可能な設定です)。また、問題となるデータの履歴バージョンもクラウド ストレージから削除されます。

ブロンズレイヤーからPIIデータを削除する

レイクハウスの設計によっては、PIIと非PIIのユーザーデータ間のリンクを切断できる可能性があります。例えば、Eメールのような自然キーではなく、「user_id」といった非自然キーを使用している場合、PIIデータを削除することができ、非PIIデータは保持されます。

この記事の残りの部分では、すべてのブロンズテーブルからユーザーレコードを完全に削除することで、RTBFに対処します。次のコードに示すように、DELETEコマンドを実行してデータを削除できます。

Python
spark.sql("DELETE FROM bronze.users WHERE user_id = 5")

一度に多数のレコードを削除する場合、MERGEコマンドを使用することをお勧めします。以下のコードは、gdpr_control_table という制御テーブルに user_id 列が含まれていることを前提としています。このテーブルには、「忘れられる権利」を要求したユーザーごとにレコードが挿入されます。

MERGE コマンドは、一致する行の条件を指定します。この例では、target_tableからのレコードをgdpr_control_tableのレコードとuser_idに基づいて照合しています。一致する項目がある場合 (たとえば、target_tablegdpr_control_tableの両方にuser_idがある場合など)、target_table内の行は削除されます。このMERGEコマンドが成功したら、リクエストが処理されたことを確認するために制御テーブルを更新してください。

spark.sql("""
MERGE INTO target
USING (
SELECT user_id
FROM gdpr_control_table
) AS source
ON target.user_id = source.user_id
WHEN MATCHED THEN DELETE
""")

ブロンズ、シルバー、およびゴールドレイヤーへの変更を反映する

ブロンズレイヤーでデータが削除された後は、シルバー層とゴールドレイヤーのテーブルにも変更内容を反映させる必要があります。

マテリアライズドビュー: 自動的に削除を処理します

マテリアライズドビューはソースでの削除を自動的に処理します。そのため、マテリアライズドビューにソースから削除されたデータが含まれないように、特別な対応をする必要はありません。削除が完全に処理されるように、マテリアライズドビューを更新し、メンテナンスを実行する必要があります。

マテリアライズドビューは、完全再計算よりもコストが低い場合は増分計算を使用するため、常に正しい結果を返しますが、正しさを犠牲にすることはありません。つまり、ソースからデータを削除すると、マテリアライズドビューが完全に再計算される可能性があります。

自動的な削除処理の仕組みを示す図

ストリーミングテーブル: skipChangeCommits を使用してデータを削除し、ストリーミング ソースを読み取ります

ストリーミングテーブルは、Delta テーブルソースからストリームする際に、追記専用データを処理します。その他の操作、たとえばストリーミングソースからのレコードの更新や削除などはサポートされておらず、ストリームが中断されます。

注記

より堅牢なストリーミング実装のために、Delta テーブルの変更フィードからストリームし、処理コードで更新と削除を処理します。「ソース Delta テーブルへの変更の処理」を参照してください。

stsにおける削除の処理の仕組みを示す図です。

Deltaテーブルからのストリーミングは新しいデータのみを処理するため、データの変更はご自身で処理する必要があります。推奨される方法は次のとおりです。(1)DML を使用してソース Delta テーブルのデータを削除する、(2)DML を使用してストリーミングテーブルからデータを削除する、その後、(3)ストリーミング読み取りを skipChangeCommits を使用するように更新することです。このフラグは、ストリーミングテーブルが、更新や削除などの挿入以外のすべてをスキップすることを示します。

skipChangeCommits を使用する GDPRコンプライアンス方法を示す図。

あるいは、(1)ソースからデータを削除してから、(2)ストリーミングテーブルを完全に更新することもできます。ストリーミングテーブルを完全に更新すると、テーブルのストリーミング状態がクリアされ、すべてのデータが再処理されます。保持期間を過ぎたアップストリームのデータソース(たとえば、7日後にデータが期限切れになるKafkaトピック)は再処理されず、これによりデータ損失が発生する可能性があります。ヒストリカルデータが利用可能で、その再処理に費用がかからない場合に限り、このオプションはストリーミングテーブルでのみ推奨されます。

st.に対して完全更新を実行するGDPRコンプライアンス方法を示す図

例: EC企業向けの GDPR および CCPA のコンプライアンス

次の図は、GDPR および CCPA のコンプライアンスが求められるeコマース企業のメダリオンアーキテクチャを示しています。ユーザーのデータが削除されても、ダウンストリーム集計でそのアクティビティをカウントすることが望ましい場合があります。

eコマース企業向けのGDPRおよびCCPAコンプライアンスの例を示す図

  • ソーステーブル

    • source_users ユーザーのストリーミングソーステーブル (例として、ここで作成されています。)本番運用環境は一般的にKafka、Kinesis、または類似のストリーミングプラットフォームを使用します。
    • source_clicks クリックのストリーミングソーステーブル(例のためにここで作成)本番運用環境は一般的にKafka、Kinesis、または類似のストリーミングプラットフォームを使用します。
  • コントロールテーブル

    • gdpr_requests 「忘れられる権利」の対象となるユーザーIDを含む制御テーブルユーザーが削除をリクエストした場合、ここに追加します。
  • ブロンズレイヤー

    • users_bronze ユーザー ディメンションPIIが含まれています(例: Eメールアドレス)。
    • clicks_bronze イベントをクリックしてください。PIIが含まれています(例えば、IPアドレス)。
  • シルバーレイヤー

    • clicks_silver クリーニングされ、標準化されたクリックデータ
    • users_silver クリーニングされ、標準化されたユーザーデータ
    • user_clicks_silver clicks_silver(ストリーミング)とusers_silverのスナップショットを結合します。
  • ゴールドレイヤー

    • user_behavior_gold - ユーザー行動集計メトリクス
    • marketing_insights_gold 市場知見向けのユーザーセグメント

ステップ1: テーブルにサンプルデータを投入する

以下のコードは、この例のために2つのテーブルを作成し、サンプルデータを入力します。

  • source_users ユーザーに関するディメンションデータが含まれています。このテーブルには、emailというPII列が含まれています。
  • source_clicks ユーザーによって実行されたアクティビティに関するイベントデータを含みます。ip_address という PII 列を含んでいます。
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, MapType, DateType

catalog = "users"
schema = "name"

# Create table containing sample users
users_schema = StructType([
StructField('user_id', IntegerType(), False),
StructField('username', StringType(), True),
StructField('email', StringType(), True),
StructField('registration_date', StringType(), True),
StructField('user_preferences', MapType(StringType(), StringType()), True)
])

users_data = [
(1, 'alice', 'alice@example.com', '2021-01-01', {'theme': 'dark', 'language': 'en'}),
(2, 'bob', 'bob@example.com', '2021-02-15', {'theme': 'light', 'language': 'fr'}),
(3, 'charlie', 'charlie@example.com', '2021-03-10', {'theme': 'dark', 'language': 'es'}),
(4, 'david', 'david@example.com', '2021-04-20', {'theme': 'light', 'language': 'de'}),
(5, 'eve', 'eve@example.com', '2021-05-25', {'theme': 'dark', 'language': 'it'})
]

users_df = spark.createDataFrame(users_data, schema=users_schema)
users_df.write.mode("overwrite").saveAsTable(f"{catalog}.{schema}.source_users")

# Create table containing clickstream (i.e. user activities)
from pyspark.sql.types import TimestampType

clicks_schema = StructType([
StructField('click_id', IntegerType(), False),
StructField('user_id', IntegerType(), True),
StructField('url_clicked', StringType(), True),
StructField('click_timestamp', StringType(), True),
StructField('device_type', StringType(), True),
StructField('ip_address', StringType(), True)
])

clicks_data = [
(1001, 1, 'https://example.com/home', '2021-06-01T12:00:00', 'mobile', '192.168.1.1'),
(1002, 1, 'https://example.com/about', '2021-06-01T12:05:00', 'desktop', '192.168.1.1'),
(1003, 2, 'https://example.com/contact', '2021-06-02T14:00:00', 'tablet', '192.168.1.2'),
(1004, 3, 'https://example.com/products', '2021-06-03T16:30:00', 'mobile', '192.168.1.3'),
(1005, 4, 'https://example.com/services', '2021-06-04T10:15:00', 'desktop', '192.168.1.4'),
(1006, 5, 'https://example.com/blog', '2021-06-05T09:45:00', 'tablet', '192.168.1.5')
]

clicks_df = spark.createDataFrame(clicks_data, schema=clicks_schema)
clicks_df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{schema}.source_clicks")

ステップ 2: PIIデータを処理するパイプラインを作成する

次のコードは、上記のメダリオンアーキテクチャのブロンズ、シルバー、およびゴールドレイヤーを作成します。

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, concat_ws, count, countDistinct, avg, when, expr

catalog = "users"
schema = "name"

# ----------------------------
# Bronze Layer - Raw Data Ingestion
# ----------------------------

@dp.table(
name=f"{catalog}.{schema}.users_bronze",
comment='Raw users data loaded from source'
)
def users_bronze():
return (
spark.readStream.table(f"{catalog}.{schema}.source_users")
)

@dp.table(
name=f"{catalog}.{schema}.clicks_bronze",
comment='Raw clicks data loaded from source'
)
def clicks_bronze():
return (
spark.readStream.table(f"{catalog}.{schema}.source_clicks")
)

# ----------------------------
# Silver Layer - Data Cleaning and Enrichment
# ----------------------------

@dp.create_streaming_table(
name=f"{catalog}.{schema}.users_silver",
comment='Cleaned and standardized users data'
)

@dp.view
@dp.expect_or_drop('valid_email', "email IS NOT NULL")
def users_bronze_view():
return (
spark.readStream
.table(f"{catalog}.{schema}.users_bronze")
.withColumn('registration_date', col('registration_date').cast('timestamp'))
.dropDuplicates(['user_id', 'registration_date'])
.select('user_id', 'username', 'email', 'registration_date', 'user_preferences')
)

@dp.create_auto_cdc_flow(
target=f"{catalog}.{schema}.users_silver",
source="users_bronze_view",
keys=["user_id"],
sequence_by="registration_date",
)

@dp.table(
name=f"{catalog}.{schema}.clicks_silver",
comment='Cleaned and standardized clicks data'
)
@dp.expect_or_drop('valid_click_timestamp', "click_timestamp IS NOT NULL")
def clicks_silver():
return (
spark.readStream
.table(f"{catalog}.{schema}.clicks_bronze")
.withColumn('click_timestamp', col('click_timestamp').cast('timestamp'))
.withWatermark('click_timestamp', '10 minutes')
.dropDuplicates(['click_id'])
.select('click_id', 'user_id', 'url_clicked', 'click_timestamp', 'device_type', 'ip_address')
)

@dp.table(
name=f"{catalog}.{schema}.user_clicks_silver",
comment='Joined users and clicks data on user_id'
)
def user_clicks_silver():
# Read users_silver as a static DataFrame - each refresh
# will use a snapshot of the users_silver table.
users = spark.read.table(f"{catalog}.{schema}.users_silver")

# Read clicks_silver as a streaming DataFrame.
clicks = spark.readStream \
.table('clicks_silver')

# Perform the join - join of a static dataset with a
# streaming dataset creates a streaming table.
joined_df = clicks.join(users, on='user_id', how='inner')

return joined_df

# ----------------------------
# Gold Layer - Aggregated and Business-Level Data
# ----------------------------

@dp.materialized_view(
name=f"{catalog}.{schema}.user_behavior_gold",
comment='Aggregated user behavior metrics'
)
def user_behavior_gold():
df = spark.read.table(f"{catalog}.{schema}.user_clicks_silver")
return (
df.groupBy('user_id')
.agg(
count('click_id').alias('total_clicks'),
countDistinct('url_clicked').alias('unique_urls')
)
)

@dp.materialized_view(
name=f"{catalog}.{schema}.marketing_insights_gold",
comment='User segments for marketing insights'
)
def marketing_insights_gold():
df = spark.read.table(f"{catalog}.{schema}.user_behavior_gold")
return (
df.withColumn(
'engagement_segment',
when(col('total_clicks') >= 100, 'High Engagement')
.when((col('total_clicks') >= 50) & (col('total_clicks') < 100), 'Medium Engagement')
.otherwise('Low Engagement')
)
)

ステップ3: ソーステーブルのデータを削除する

このステップでは、PIIが含まれるすべてのテーブルでデータを削除します。次の関数は、PIIを含むテーブルからユーザーのすべてのPIIを削除します。

Python
catalog = "users"
schema = "name"

def apply_gdpr_delete(user_id):
tables_with_pii = ["clicks_bronze", "users_bronze", "clicks_silver", "users_silver", "user_clicks_silver"]

for table in tables_with_pii:
print(f"Deleting user_id {user_id} from table {table}")
spark.sql(f"""
DELETE FROM {catalog}.{schema}.{table}
WHERE user_id = {user_id}
""")

ステップ4: skipChangeCommits を影響を受けるストリーミングテーブルの定義に追加します。

このステップでは、追加以外の行をスキップするようにLakeflow Spark宣言型パイプラインに指示する必要があります。次のメソッドに skipChangeCommits オプションを追加します。マテリアライズドビューの定義は、更新と削除を自動的に処理するため、更新する必要はありません。

  • users_bronze
  • users_silver
  • clicks_bronze
  • clicks_silver
  • user_clicks_silver

以下のコードは、 users_bronzeメソッドを更新する方法を示しています。

Python
def users_bronze():
return (
spark.readStream.option('skipChangeCommits', 'true').table(f"{catalog}.{schema}.source_users")
)

パイプラインを再度実行すると、正常に更新されます。