メインコンテンツまでスキップ

Prepare your data for GDPR compliance

EU 一般データ保護規則(GDPR)およびカリフォルニア州消費者プライバシー法(CCPA)は、プライバシーおよびデータセキュリティに関する規制であり、企業に対し、顧客に関するすべての個人を特定できる情報(PII)を、顧客からの明示的な要求に応じて永久的かつ完全に削除することを義務付けています。「忘れられる権利」(RTBF)または「データ消去権」としても知られる削除要求は、指定された期間内(例:1暦月以内)に実行される必要があります。

この記事では、Databricksに格納されているデータにRTBFを実装する方法について説明します。この記事に含まれる例は、Eコマース企業のデータセットをモデル化し、ソーステーブルのデータを削除して、これらの変更をダウンストリームテーブルに伝播する方法を示しています。

Blueprint for implementing the “right to be forgotten”

次の図は、「忘れられる権利」を実装する方法を示しています。

Diagram that illustrates how to implement GDPR compliance.

Delta Lake でポイント削除

Delta Lake は、ACIDトランザクションにより、大規模なデータレイクでのポイント削除を高速化し、消費者のGDPRやCCPAのリクエストに応じた個人識別情報(PII)の特定と削除を可能にします。

Delta Lake はテーブル履歴を保持し、特定の時点のクエリーとロールバックに使用できるようにします。VACUUM 関数は、Delta テーブルによって参照されなくなり、指定した保有期間のしきい値よりも古いデータ ファイルを削除し、データを完全に削除します。デフォルトと推奨事項の詳細については、「テーブル履歴の操作」を参照してください。

Ensure data is deleted when using deletion vectors

削除ベクトルが有効になっているテーブルの場合、レコードを削除した後、基になるレコードを完全に削除するには、REORG TABLE ... APPLY (PURGE)を実行する必要もあります。これには、Delta Lake テーブル、マテリアライズドビュー、およびストリーミングテーブルが含まれます。Parquet データファイルへの変更の適用を参照してください。

上流ソースのデータを削除

GDPR および CCPA は、Delta Lake 以外のソース(Kafka、ファイル、データベースなど)のデータを含む、すべてのデータに適用されます。Databricksでデータを削除するだけでなく、キューやクラウドストレージなどの上流のソースにあるデータも削除する必要があります。

注記

データ削除ワークフローを実装する前に、コンプライアンスまたはバックアップ目的でワークスペースデータをエクスポートする必要がある場合があります。ワークスペースデータをエクスポートを参照してください。

難読化よりも完全な削除が望ましい

データを削除するか、難読化するかのどちらかを選択してください。難読化は、仮名化、データマスキングなどを使用して実装できます。ただし、最も安全な選択肢は完全な消去です。なぜなら、実際には、再識別化のリスクを排除するためには、PIIデータの完全な削除が必要となることが多いためです。

ブロンズレイヤーのデータを削除し、その後シルバーおよびゴールドレイヤーに削除を伝播します。

削除リクエストのテーブルをクエリするスケジュール済みジョブにより、まずブロンズレイヤーのデータを削除してGDPR および CCPA のコンプライアンスを開始することをお勧めします。ブロンズレイヤーからデータが削除された後、変更はシルバーレイヤーとゴールドレイヤーに反映される可能性があります。

Regularly maintain tables to remove data from historical files

デフォルトでは、Delta Lake は削除されたレコードを含むテーブル履歴を30日間保持し、タイムトラベルとロールバックに使用できるようにします。ただし、以前のバージョンのデータを削除しても、データはクラウドストレージに保持されます。したがって、データセットを定期的に管理して、以前のバージョンのデータを削除する必要があります。推奨される方法は、Unity Catalog マネージドテーブルの予測的最適化であり、ストリーミングテーブルとマテリアライズドビューの両方をインテリジェントに維持します。

  • 予測的最適化によって管理されるテーブルの場合、 Lakeflow Spark宣言型パイプラインは、使用パターンに基づいてストリーミング テーブルとマテリアライズドビューの両方をインテリジェントに維持します。
  • 予測的最適化が有効になっていないテーブルの場合、 Lakeflow Spark宣言型パイプラインは、ストリーミング テーブルとマテリアライズドビューが更新されてから 24 時間以内にメンテナンス タスクを自動的に実行します。

予測的最適化またはLakeflow Spark宣言型パイプラインを使用していない場合は、 DeltaテーブルでVACUUMコマンドを実行して、以前のバージョンのデータを完全に削除する必要があります。 デフォルトでは、タイムトラベル機能は 7 日間に短縮されます (これは構成可能な設定です)。また、問題となるデータの履歴バージョンもクラウド ストレージから削除されます。

Delete PII data from the bronze layer

レイクハウスの設計によっては、PIIと非PIIのユーザーデータ間のリンクを切断できる可能性があります。例えば、Eメールのような自然キーではなく、「user_id」といった非自然キーを使用している場合、PIIデータを削除することができ、非PIIデータは保持されます。

この記事の残りの部分では、すべてのブロンズテーブルからユーザーレコードを完全に削除することで、RTBFに対処します。次のコードに示すように、DELETEコマンドを実行してデータを削除できます。

Python
spark.sql("DELETE FROM bronze.users WHERE user_id = 5")

一度に多数のレコードを削除する場合、MERGEコマンドを使用することをお勧めします。以下のコードは、gdpr_control_table という制御テーブルに user_id 列が含まれていることを前提としています。このテーブルには、「忘れられる権利」を要求したユーザーごとにレコードが挿入されます。

MERGE コマンドは、一致する行の条件を指定します。この例では、target_tableからのレコードをgdpr_control_tableのレコードとuser_idに基づいて照合しています。一致する項目がある場合 (たとえば、target_tablegdpr_control_tableの両方にuser_idがある場合など)、target_table内の行は削除されます。このMERGEコマンドが成功したら、リクエストが処理されたことを確認するために制御テーブルを更新してください。

spark.sql("""
MERGE INTO target
USING (
SELECT user_id
FROM gdpr_control_table
) AS source
ON target.user_id = source.user_id
WHEN MATCHED THEN DELETE
""")

ブロンズ、シルバー、およびゴールドレイヤーへの変更を反映する

After data is deleted in the bronze layer, you must propagate the changes to tables in the silver and gold layers.

マテリアライズドビュー: 自動的に削除を処理します

マテリアライズドビューはソースでの削除を自動的に処理します。そのため、マテリアライズドビューにソースから削除されたデータが含まれないように、特別な対応をする必要はありません。削除が完全に処理されるように、マテリアライズドビューを更新し、メンテナンスを実行する必要があります。

A materialized view always returns the correct result because it uses incremental computation if it is cheaper than full recomputation, but never at the cost of correctness. In other words, deleting data from a source could cause a materialized view to fully recompute.

自動的な削除処理の仕組みを示す図

ストリーミングテーブル: skipChangeCommits を使用してデータを削除し、ストリーミング ソースを読み取ります

ストリーミングテーブルは、Delta テーブルソースからストリームする際に、追記専用データを処理します。その他の操作、たとえばストリーミングソースからのレコードの更新や削除などはサポートされておらず、ストリームが中断されます。

注記

より堅牢なストリーミング実装のために、Delta テーブルの変更フィードからストリームし、処理コードで更新と削除を処理します。「ソース Delta テーブルへの変更の処理」を参照してください。

stsにおける削除の処理の仕組みを示す図です。

Deltaテーブルからのストリーミングは新しいデータのみを処理するため、データの変更はご自身で処理する必要があります。推奨される方法は次のとおりです。(1)DML を使用してソース Delta テーブルのデータを削除する、(2)DML を使用してストリーミングテーブルからデータを削除する、その後、(3)ストリーミング読み取りを skipChangeCommits を使用するように更新することです。このフラグは、ストリーミングテーブルが、更新や削除などの挿入以外のすべてをスキップすることを示します。

skipChangeCommits を使用する GDPRコンプライアンス方法を示す図。

Alternatively, you can (1) delete data from the source, then (2) fully refresh the streaming table. When you fully refresh a streaming table, it clears the table's streaming state and reprocesses all data again. Any upstream data source that is beyond its retention period (for example, a Kafka topic that ages out data after 7 days) won't be processed again, which could cause data loss. We recommend this option for streaming tables only in the scenario where historical data is available and processing it again won't be costly.

st.に対して完全更新を実行するGDPRコンプライアンス方法を示す図

例: EC企業向けの GDPR および CCPA のコンプライアンス

次の図は、GDPR および CCPA のコンプライアンスが求められるeコマース企業のメダリオンアーキテクチャを示しています。ユーザーのデータが削除されても、ダウンストリーム集計でそのアクティビティをカウントすることが望ましい場合があります。

eコマース企業向けのGDPRおよびCCPAコンプライアンスの例を示す図

  • ソーステーブル

    • source_users - A streaming source table of users (created here, for the example). Production environments typically use Kafka, Kinesis, or similar streaming platforms.
    • source_clicks クリックのストリーミングソーステーブル(例のためにここで作成)本番運用環境は一般的にKafka、Kinesis、または類似のストリーミングプラットフォームを使用します。
  • コントロールテーブル

    • gdpr_requests 「忘れられる権利」の対象となるユーザーIDを含む制御テーブルユーザーが削除をリクエストした場合、ここに追加します。
  • ブロンズレイヤー

    • users_bronze ユーザー ディメンションPIIが含まれています(例: Eメールアドレス)。
    • clicks_bronze - Click events. Contains PII (for example, IP address).
  • シルバーレイヤー

    • clicks_silver クリーニングされ、標準化されたクリックデータ
    • users_silver クリーニングされ、標準化されたユーザーデータ
    • user_clicks_silver - Joins clicks_silver (streaming) with a snapshot of users_silver.
  • ゴールドレイヤー

    • user_behavior_gold - ユーザー行動集計メトリクス
    • marketing_insights_gold 市場知見向けのユーザーセグメント

ステップ1: テーブルにサンプルデータを投入する

以下のコードは、この例のために2つのテーブルを作成し、サンプルデータを入力します。

  • source_users ユーザーに関するディメンションデータが含まれています。このテーブルには、emailというPII列が含まれています。
  • source_clicks ユーザーによって実行されたアクティビティに関するイベントデータを含みます。ip_address という PII 列を含んでいます。
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, MapType, DateType

catalog = "users"
schema = "name"

# Create table containing sample users
users_schema = StructType([
StructField('user_id', IntegerType(), False),
StructField('username', StringType(), True),
StructField('email', StringType(), True),
StructField('registration_date', StringType(), True),
StructField('user_preferences', MapType(StringType(), StringType()), True)
])

users_data = [
(1, 'alice', 'alice@example.com', '2021-01-01', {'theme': 'dark', 'language': 'en'}),
(2, 'bob', 'bob@example.com', '2021-02-15', {'theme': 'light', 'language': 'fr'}),
(3, 'charlie', 'charlie@example.com', '2021-03-10', {'theme': 'dark', 'language': 'es'}),
(4, 'david', 'david@example.com', '2021-04-20', {'theme': 'light', 'language': 'de'}),
(5, 'eve', 'eve@example.com', '2021-05-25', {'theme': 'dark', 'language': 'it'})
]

users_df = spark.createDataFrame(users_data, schema=users_schema)
users_df.write.mode("overwrite").saveAsTable(f"{catalog}.{schema}.source_users")

# Create table containing clickstream (i.e. user activities)
from pyspark.sql.types import TimestampType

clicks_schema = StructType([
StructField('click_id', IntegerType(), False),
StructField('user_id', IntegerType(), True),
StructField('url_clicked', StringType(), True),
StructField('click_timestamp', StringType(), True),
StructField('device_type', StringType(), True),
StructField('ip_address', StringType(), True)
])

clicks_data = [
(1001, 1, 'https://example.com/home', '2021-06-01T12:00:00', 'mobile', '192.168.1.1'),
(1002, 1, 'https://example.com/about', '2021-06-01T12:05:00', 'desktop', '192.168.1.1'),
(1003, 2, 'https://example.com/contact', '2021-06-02T14:00:00', 'tablet', '192.168.1.2'),
(1004, 3, 'https://example.com/products', '2021-06-03T16:30:00', 'mobile', '192.168.1.3'),
(1005, 4, 'https://example.com/services', '2021-06-04T10:15:00', 'desktop', '192.168.1.4'),
(1006, 5, 'https://example.com/blog', '2021-06-05T09:45:00', 'tablet', '192.168.1.5')
]

clicks_df = spark.createDataFrame(clicks_data, schema=clicks_schema)
clicks_df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{schema}.source_clicks")

ステップ 2: PIIデータを処理するパイプラインを作成する

次のコードは、上記のメダリオンアーキテクチャのブロンズ、シルバー、およびゴールドレイヤーを作成します。

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, concat_ws, count, countDistinct, avg, when, expr

catalog = "users"
schema = "name"

# ----------------------------
# Bronze Layer - Raw Data Ingestion
# ----------------------------

@dp.table(
name=f"{catalog}.{schema}.users_bronze",
comment='Raw users data loaded from source'
)
def users_bronze():
return (
spark.readStream.table(f"{catalog}.{schema}.source_users")
)

@dp.table(
name=f"{catalog}.{schema}.clicks_bronze",
comment='Raw clicks data loaded from source'
)
def clicks_bronze():
return (
spark.readStream.table(f"{catalog}.{schema}.source_clicks")
)

# ----------------------------
# Silver Layer - Data Cleaning and Enrichment
# ----------------------------

@dp.create_streaming_table(
name=f"{catalog}.{schema}.users_silver",
comment='Cleaned and standardized users data'
)

@dp.view
@dp.expect_or_drop('valid_email', "email IS NOT NULL")
def users_bronze_view():
return (
spark.readStream
.table(f"{catalog}.{schema}.users_bronze")
.withColumn('registration_date', col('registration_date').cast('timestamp'))
.dropDuplicates(['user_id', 'registration_date'])
.select('user_id', 'username', 'email', 'registration_date', 'user_preferences')
)

@dp.create_auto_cdc_flow(
target=f"{catalog}.{schema}.users_silver",
source="users_bronze_view",
keys=["user_id"],
sequence_by="registration_date",
)

@dp.table(
name=f"{catalog}.{schema}.clicks_silver",
comment='Cleaned and standardized clicks data'
)
@dp.expect_or_drop('valid_click_timestamp', "click_timestamp IS NOT NULL")
def clicks_silver():
return (
spark.readStream
.table(f"{catalog}.{schema}.clicks_bronze")
.withColumn('click_timestamp', col('click_timestamp').cast('timestamp'))
.withWatermark('click_timestamp', '10 minutes')
.dropDuplicates(['click_id'])
.select('click_id', 'user_id', 'url_clicked', 'click_timestamp', 'device_type', 'ip_address')
)

@dp.table(
name=f"{catalog}.{schema}.user_clicks_silver",
comment='Joined users and clicks data on user_id'
)
def user_clicks_silver():
# Read users_silver as a static DataFrame - each refresh
# will use a snapshot of the users_silver table.
users = spark.read.table(f"{catalog}.{schema}.users_silver")

# Read clicks_silver as a streaming DataFrame.
clicks = spark.readStream \
.table('clicks_silver')

# Perform the join - join of a static dataset with a
# streaming dataset creates a streaming table.
joined_df = clicks.join(users, on='user_id', how='inner')

return joined_df

# ----------------------------
# Gold Layer - Aggregated and Business-Level Data
# ----------------------------

@dp.materialized_view(
name=f"{catalog}.{schema}.user_behavior_gold",
comment='Aggregated user behavior metrics'
)
def user_behavior_gold():
df = spark.read.table(f"{catalog}.{schema}.user_clicks_silver")
return (
df.groupBy('user_id')
.agg(
count('click_id').alias('total_clicks'),
countDistinct('url_clicked').alias('unique_urls')
)
)

@dp.materialized_view(
name=f"{catalog}.{schema}.marketing_insights_gold",
comment='User segments for marketing insights'
)
def marketing_insights_gold():
df = spark.read.table(f"{catalog}.{schema}.user_behavior_gold")
return (
df.withColumn(
'engagement_segment',
when(col('total_clicks') >= 100, 'High Engagement')
.when((col('total_clicks') >= 50) & (col('total_clicks') < 100), 'Medium Engagement')
.otherwise('Low Engagement')
)
)

ステップ3: ソーステーブルのデータを削除する

このステップでは、PIIが含まれるすべてのテーブルでデータを削除します。次の関数は、PIIを含むテーブルからユーザーのすべてのPIIを削除します。

Python
catalog = "users"
schema = "name"

def apply_gdpr_delete(user_id):
tables_with_pii = ["clicks_bronze", "users_bronze", "clicks_silver", "users_silver", "user_clicks_silver"]

for table in tables_with_pii:
print(f"Deleting user_id {user_id} from table {table}")
spark.sql(f"""
DELETE FROM {catalog}.{schema}.{table}
WHERE user_id = {user_id}
""")

Step 4: Add skipChangeCommits to definitions of affected streaming tables

このステップでは、追加以外の行をスキップするようにLakeflow Spark宣言型パイプラインに指示する必要があります。次のメソッドに skipChangeCommits オプションを追加します。マテリアライズドビューの定義は、更新と削除を自動的に処理するため、更新する必要はありません。

  • users_bronze
  • users_silver
  • clicks_bronze
  • clicks_silver
  • user_clicks_silver

以下のコードは、 users_bronzeメソッドを更新する方法を示しています。

Python
def users_bronze():
return (
spark.readStream.option('skipChangeCommits', 'true').table(f"{catalog}.{schema}.source_users")
)

When you run the pipeline again, it will successfully update.