Prepare seus dados para compliance com o GDPR
O Regulamento Geral de Proteção de Dados (GDPR) e a Lei de Privacidade do Consumidor da Califórnia (CCPA) são regulamentos de privacidade e segurança de dados que exigem que as empresas excluam permanente e completamente todas as informações de identificação pessoal (PII) coletadas sobre um cliente mediante solicitação explícita. Também conhecido como “direito ao esquecimento” (RTBF) ou “direito ao apagamento de dados”, as solicitações de exclusão devem ser executadas em um prazo determinado (por exemplo, dentro de um mês corrido).
Este artigo orienta sobre como implementar o RTBF em dados armazenados no Databricks. O exemplo incluído neste artigo modela datasets para uma empresa de comércio eletrônico e demonstra como excluir dados em tabelas de origem e propagar essas alterações para tabelas downstream.
Diretrizes para a implementação do “direito de ser esquecido”
The following diagram illustrates how to implement the “right to be forgotten.”

Point deletes with Delta Lake
Delta Lake acelera exclusões pontuais em grandes data lakes com transações ACID, permitindo localizar e remover informação pessoalmente identificável (PII) em resposta a solicitações de consumidores relacionadas ao GDPR ou ao CCPA.
Delta Lake retains table history and makes it available for point-in-time queries and rollbacks. The VACUUM function removes data files that are no longer referenced by a Delta table and are older than a specified retention threshold, permanently deleting the data. To learn more about defaults and recommendations, see Work with table history.
Garanta que os dados sejam excluídos ao usar vetores de deleção
Para tabelas com vetores de exclusão habilitados, após excluir registros, você deve também executar REORG TABLE ... APPLY (PURGE) para excluir permanentemente os registros subjacentes. Isso inclui tabelas Delta Lake, exibições materializadas e tabelas de transmissão. Consulte Aplicar alterações aos arquivos de dados Parquet.
Excluir dados em origens upstream
O GDPR e o CCPA se aplicam a todos os dados, incluindo dados em fontes fora do Delta Lake, como Kafka, arquivos e bancos de dados. Além da exclusão de dados no Databricks, também é preciso excluir dados em origens upstream, como filas e armazenamento em cloud.
Antes de implementar fluxos de trabalho de exclusão de dados, talvez seja necessário exportar os dados do workspace para fins de compliance ou backup. Consulte Exportar dados do workspace.
A exclusão completa é preferível à ofuscação.
É preciso escolher entre excluir dados e ofuscá-los. Ofuscação pode ser implementada usando pseudonimização, mascaramento de dados, etc. No entanto, a opção mais segura é a eliminação completa, porque, na prática, eliminar o risco de reidentificação geralmente exige a exclusão completa dos dados de PII.
Excluir dados na camada bronze e propagar exclusões para as camadas prata e ouro.
Recomendamos que você comece a compliance com GDPR e CCPA excluindo dados primeiro na camada Bronze, impulsionado por um Job programado que consulta uma tabela de solicitações de exclusão. Após a exclusão dos dados da camada bronze, as alterações podem ser propagadas para as camadas prata e ouro.
Manter as tabelas regularmente para remover dados de arquivos históricos
Por default, o Delta Lake mantém o histórico da tabela, incluindo registros excluídos, por 30 dias, e o torna disponível para viagem do tempo e reversões. Mas mesmo que versões anteriores dos dados sejam removidas, os dados ainda são retidos no armazenamento em cloud. Portanto, os datasets devem ser mantidos regularmente para remover versões anteriores dos dados. A forma recomendada é Otimização preditiva para tabelas gerenciadas do Unity Catalog, que mantém de forma inteligente tanto tabelas de transmissão quanto views materializadas.
- Para tabelas gerenciadas por otimização preditiva, os Pipelines Declarativos do LakeFlow Spark mantêm de forma inteligente tanto as tabelas de transmissão quanto as views materializadas, com base nos padrões de uso.
- Para tabelas sem otimização preditiva habilitada, os pipelines declarativos do Lakeflow Spark executam automaticamente tarefas de manutenção dentro de 24 horas após a atualização das tabelas de transmissão e das views materializadas.
Se você não estiver usando otimização preditiva ou Lakeflow Spark Declarative Pipelines, você deve executar um comando VACUUM em tabelas Delta para remover permanentemente versões anteriores dos dados. Por default, isso reduzirá as capacidades de viagem do tempo para 7 dias, que é uma configuração configurável, e removerá também versões históricas dos dados em questão do armazenamento em cloud.
Excluir dados PII da camada bronze
Dependendo do design do seu lakehouse, poderá ser possível romper o vínculo entre PII e dados de usuário não PII. Por exemplo, caso se utilize uma chave não natural, como user_id, em vez de uma chave natural, como o email, é possível excluir dados de PII, mantendo os dados não PII no local.
O restante deste artigo aborda a RTBF com a exclusão completa de registros de usuário de todas as tabelas Bronze. Você pode excluir dados executando um comando DELETE, como mostrado no código a seguir.
spark.sql("DELETE FROM bronze.users WHERE user_id = 5")
Ao excluir um grande número de registros de uma só vez, recomendamos usar o comando MERGE. O código abaixo assume que você tem uma tabela de controle chamada gdpr_control_table que contém uma coluna user_id. É inserido um registro nesta tabela para cada usuário que tenha solicitado o "direito de ser esquecido".
The MERGE command specifies the condition for matching rows. In this example, it matches records from target_table with records in gdpr_control_table based on the user_id. If there is a match (for example, a user_id in both the target_table and the gdpr_control_table), the row in the target_table is deleted. After this MERGE command is successful, update the control table to confirm that the request has been processed.
spark.sql("""
MERGE INTO target
USING (
SELECT user_id
FROM gdpr_control_table
) AS source
ON target.user_id = source.user_id
WHEN MATCHED THEN DELETE
""")
Propagar alterações do bronze para as camadas prata e ouro
Após a exclusão dos dados na camada bronze, é necessário propagar as alterações para as tabelas nas camadas prata e ouro.
Visualizações materializadas: Lidar automaticamente com exclusões
As visualizações materializadas lidam automaticamente com exclusões nas fontes. Portanto, não é necessário realizar qualquer ação específica para garantir que uma visualização materializada não contenha dados que foram excluídos de uma origem. É preciso refresh uma visualização materializada e executar manutenção para garantir que as exclusões sejam completamente processadas.
Uma view materializada sempre retorna o resultado correto porque utiliza computação incremental se for mais econômica do que a recomputação completa, mas nunca à custa da correção. Em outras palavras, a exclusão de dados de uma origem poderia fazer com que uma visualização materializada fosse totalmente recalculada.

Tabelas de transmissão: excluir dados e ler fonte de transmissão usando skipChangeCommits
As tabelas de transmissão processam dados somente anexados quando transmitem de fontes de tabelas Delta. Qualquer outra operação, como a atualização ou exclusão de um registro de uma fonte de transmissão, não é suportada e interrompe a transmissão.
For a more robust streaming implementation, stream from the change feeds of Delta tables instead, and handle updates and deletes in your processing code. See Handle changes to source Delta tables.

Como a transmissão das tabelas Delta lida apenas com novos dados, é preciso gerenciar as alterações nos dados. O método recomendado é: (1) excluir dados nas tabelas Delta de origem usando DML, (2) excluir dados da tabela de transmissão usando DML e, em seguida, (3) atualizar a leitura de transmissão para usar skipChangeCommits. Este sinalizador indica que a tabela de transmissão deve ignorar tudo o que não seja inserções, como atualizações ou exclusões.

Alternativamente, você pode (1) excluir dados da origem e, depois, (2) fazer um refresh completo da tabela de transmissão. Ao realizar um refresh completo em uma tabela de transmissão, o estado de transmissão da tabela é limpo e todos os dados são reprocessados novamente. Qualquer fonte de dados a montante que esteja além do seu período de retenção (por exemplo, um tópico Kafka que elimina os dados após 7 dias) não será processada novamente, o que poderia causar perda de dados. Recomendamos esta opção somente para tabelas de transmissão no cenário em que data histórica está disponível e processá-los novamente não será custoso.

Exemplo: compliance com GDPR e CCPA para uma empresa de comércio eletrônico
O diagrama a seguir mostra uma arquitetura medallion para uma empresa de e-commerce onde a compliance com GDPR e CCPA precisa ser implementada. Mesmo que os dados de um usuário sejam excluídos, poderá ser útil contabilizar suas atividades em agregações downstream.

-
Source tables
source_users- Uma tabela de transmissão de origem de usuários (criada aqui, para o exemplo). Ambientes de produção normalmente usam Kafka, Kinesis ou plataformas de transmissão similares.source_clicks- Uma tabela de transmissão de origem de cliques (criada aqui, para o exemplo). Ambientes de produção normalmente usam Kafka, Kinesis ou plataformas de transmissão similares.
-
Controle a tabela
gdpr_requests- Tabela de controle contendo IDs de usuário sujeitos ao "direito de ser esquecido". Quando um usuário solicitar a remoção, adicione-o aqui.
-
Camada bronze
users_bronze- User dimensions. Contains PII (for example, email address).clicks_bronze- Clique em eventos. Contém PII (por exemplo, endereço IP).
-
Camada Prata
clicks_silver- Cleaned and standardized clicks data.users_silver- Cleaned and standardized user data.user_clicks_silver- joinclicks_silver(transmissão) com um Snapshot deusers_silver.
-
Camada Ouro
user_behavior_gold- Aggregated user behavior metrics.marketing_insights_goldSegmento de usuário para percepções de mercado.
Etapa 1: Preencha as tabelas com dados de exemplo.
O código a seguir cria estas duas tabelas para este exemplo e as preenche com dados de amostra.
source_userscontém dados dimensionais sobre usuários. Esta tabela contém uma coluna PII chamadaemail.source_clickscontém dados de eventos sobre atividades realizadas por usuários. Contém uma coluna PII chamadaip_address.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, MapType, DateType
catalog = "users"
schema = "name"
# Create table containing sample users
users_schema = StructType([
StructField('user_id', IntegerType(), False),
StructField('username', StringType(), True),
StructField('email', StringType(), True),
StructField('registration_date', StringType(), True),
StructField('user_preferences', MapType(StringType(), StringType()), True)
])
users_data = [
(1, 'alice', 'alice@example.com', '2021-01-01', {'theme': 'dark', 'language': 'en'}),
(2, 'bob', 'bob@example.com', '2021-02-15', {'theme': 'light', 'language': 'fr'}),
(3, 'charlie', 'charlie@example.com', '2021-03-10', {'theme': 'dark', 'language': 'es'}),
(4, 'david', 'david@example.com', '2021-04-20', {'theme': 'light', 'language': 'de'}),
(5, 'eve', 'eve@example.com', '2021-05-25', {'theme': 'dark', 'language': 'it'})
]
users_df = spark.createDataFrame(users_data, schema=users_schema)
users_df.write.mode("overwrite").saveAsTable(f"{catalog}.{schema}.source_users")
# Create table containing clickstream (i.e. user activities)
from pyspark.sql.types import TimestampType
clicks_schema = StructType([
StructField('click_id', IntegerType(), False),
StructField('user_id', IntegerType(), True),
StructField('url_clicked', StringType(), True),
StructField('click_timestamp', StringType(), True),
StructField('device_type', StringType(), True),
StructField('ip_address', StringType(), True)
])
clicks_data = [
(1001, 1, 'https://example.com/home', '2021-06-01T12:00:00', 'mobile', '192.168.1.1'),
(1002, 1, 'https://example.com/about', '2021-06-01T12:05:00', 'desktop', '192.168.1.1'),
(1003, 2, 'https://example.com/contact', '2021-06-02T14:00:00', 'tablet', '192.168.1.2'),
(1004, 3, 'https://example.com/products', '2021-06-03T16:30:00', 'mobile', '192.168.1.3'),
(1005, 4, 'https://example.com/services', '2021-06-04T10:15:00', 'desktop', '192.168.1.4'),
(1006, 5, 'https://example.com/blog', '2021-06-05T09:45:00', 'tablet', '192.168.1.5')
]
clicks_df = spark.createDataFrame(clicks_data, schema=clicks_schema)
clicks_df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{schema}.source_clicks")
O passo 2: criar um pipeline que processa dados PII
O código a seguir cria camadas bronze, prata e ouro da arquitetura medallion mostrada acima.
from pyspark import pipelines as dp
from pyspark.sql.functions import col, concat_ws, count, countDistinct, avg, when, expr
catalog = "users"
schema = "name"
# ----------------------------
# Bronze Layer - Raw Data Ingestion
# ----------------------------
@dp.table(
name=f"{catalog}.{schema}.users_bronze",
comment='Raw users data loaded from source'
)
def users_bronze():
return (
spark.readStream.table(f"{catalog}.{schema}.source_users")
)
@dp.table(
name=f"{catalog}.{schema}.clicks_bronze",
comment='Raw clicks data loaded from source'
)
def clicks_bronze():
return (
spark.readStream.table(f"{catalog}.{schema}.source_clicks")
)
# ----------------------------
# Silver Layer - Data Cleaning and Enrichment
# ----------------------------
@dp.create_streaming_table(
name=f"{catalog}.{schema}.users_silver",
comment='Cleaned and standardized users data'
)
@dp.view
@dp.expect_or_drop('valid_email', "email IS NOT NULL")
def users_bronze_view():
return (
spark.readStream
.table(f"{catalog}.{schema}.users_bronze")
.withColumn('registration_date', col('registration_date').cast('timestamp'))
.dropDuplicates(['user_id', 'registration_date'])
.select('user_id', 'username', 'email', 'registration_date', 'user_preferences')
)
@dp.create_auto_cdc_flow(
target=f"{catalog}.{schema}.users_silver",
source="users_bronze_view",
keys=["user_id"],
sequence_by="registration_date",
)
@dp.table(
name=f"{catalog}.{schema}.clicks_silver",
comment='Cleaned and standardized clicks data'
)
@dp.expect_or_drop('valid_click_timestamp', "click_timestamp IS NOT NULL")
def clicks_silver():
return (
spark.readStream
.table(f"{catalog}.{schema}.clicks_bronze")
.withColumn('click_timestamp', col('click_timestamp').cast('timestamp'))
.withWatermark('click_timestamp', '10 minutes')
.dropDuplicates(['click_id'])
.select('click_id', 'user_id', 'url_clicked', 'click_timestamp', 'device_type', 'ip_address')
)
@dp.table(
name=f"{catalog}.{schema}.user_clicks_silver",
comment='Joined users and clicks data on user_id'
)
def user_clicks_silver():
# Read users_silver as a static DataFrame - each refresh
# will use a snapshot of the users_silver table.
users = spark.read.table(f"{catalog}.{schema}.users_silver")
# Read clicks_silver as a streaming DataFrame.
clicks = spark.readStream \
.table('clicks_silver')
# Perform the join - join of a static dataset with a
# streaming dataset creates a streaming table.
joined_df = clicks.join(users, on='user_id', how='inner')
return joined_df
# ----------------------------
# Gold Layer - Aggregated and Business-Level Data
# ----------------------------
@dp.materialized_view(
name=f"{catalog}.{schema}.user_behavior_gold",
comment='Aggregated user behavior metrics'
)
def user_behavior_gold():
df = spark.read.table(f"{catalog}.{schema}.user_clicks_silver")
return (
df.groupBy('user_id')
.agg(
count('click_id').alias('total_clicks'),
countDistinct('url_clicked').alias('unique_urls')
)
)
@dp.materialized_view(
name=f"{catalog}.{schema}.marketing_insights_gold",
comment='User segments for marketing insights'
)
def marketing_insights_gold():
df = spark.read.table(f"{catalog}.{schema}.user_behavior_gold")
return (
df.withColumn(
'engagement_segment',
when(col('total_clicks') >= 100, 'High Engagement')
.when((col('total_clicks') >= 50) & (col('total_clicks') < 100), 'Medium Engagement')
.otherwise('Low Engagement')
)
)
Etapa 3: excluir dados em tabelas de origem
Neste passo, os dados são excluídos em todas as tabelas onde PII é encontrada. A função a seguir remove todas as instâncias de PII de um usuário de tabelas com PII.
catalog = "users"
schema = "name"
def apply_gdpr_delete(user_id):
tables_with_pii = ["clicks_bronze", "users_bronze", "clicks_silver", "users_silver", "user_clicks_silver"]
for table in tables_with_pii:
print(f"Deleting user_id {user_id} from table {table}")
spark.sql(f"""
DELETE FROM {catalog}.{schema}.{table}
WHERE user_id = {user_id}
""")
Passo 4: adicionar skipChangeCommits às definições de tabelas de transmissão afetadas
Neste o passo, é preciso instruir o Lakeflow Spark Declarative Pipelines a ignorar linhas de não anexação. Adicione a opção `skipChangeCommits` aos seguintes métodos. Não é necessário atualizar as definições das visualizações materializadas porque elas lidarão automaticamente com atualizações e exclusões.
users_bronzeusers_silverclicks_bronzeclicks_silveruser_clicks_silver
O código a seguir mostra como atualizar o método users_bronze:
def users_bronze():
return (
spark.readStream.option('skipChangeCommits', 'true').table(f"{catalog}.{schema}.source_users")
)
Quando o pipeline for executado novamente, ele será atualizado com êxito.