Começar: Prepare seus dados para GDPR compliance

O Regulamento Geral de Proteção de Dados (GDPR) (GDPR) e a Lei de Privacidade do Consumidor da Califórnia (CCPA) são regulamentações de privacidade e segurança de dados que exigem que as empresas excluam de forma permanente e completa todas as informações de identificação pessoal (PII) coletadas sobre um cliente mediante solicitação explícita. Também conhecidas como “direito de ser esquecido” (RTBF) ou “direito à exclusão de dados”, as solicitações de exclusão devem ser executadas durante um período específico (por exemplo, dentro de um mês civil).

Este artigo orienta o senhor sobre como implementar o RTBF em dados armazenados em Databricks. O exemplo incluído neste artigo modela o conjunto de dados de uma empresa de comércio eletrônico e mostra como excluir dados em tabelas de origem e propagar essas alterações para tabelas posteriores.

Plano para implementar o “direito de ser esquecido”

O diagrama a seguir ilustra como implementar o “direito de ser esquecido”.

Diagrama que ilustra como implementar o site GDPR compliance.

Excluir dados em fontes upstream

O GDPR e a CCPA se aplicam a todos os dados, inclusive dados em fontes fora do Delta Lake, como Kafka, arquivos e bancos de dados. Além de excluir dados no Databricks, o senhor também deve se lembrar de excluir dados em fontes upstream, como filas e armazenamento em nuvem.

A exclusão completa é preferível à ofuscação

Você precisa escolher entre excluir dados e ofuscá-los. A ofuscação pode ser implementada usando pseudonimização, mascaramento de dados, etc. No entanto, a opção mais segura é a exclusão completa porque, na prática, eliminar o risco de reidentificação geralmente exige a exclusão completa dos dados de PII.

Excluir dados na camada bronze e, em seguida, propagar as exclusões para as camadas prata e ouro

Recomendamos que o senhor comece GDPR e CCPA compliance excluindo primeiro os dados na camada de bronze, com um trabalho agendado que consulte uma tabela de controle contendo solicitações de exclusão. Depois que os dados são excluídos da camada de bronze, as alterações podem ser propagadas para as camadas de prata e ouro.

Mantenha tabelas regularmente para remover dados de arquivos históricos

Em default, o Delta Lake retém o histórico da tabela, inclusive os registros excluídos, por 30 dias, e o disponibiliza para viagens do tempo e reversões. Mas mesmo que as versões anteriores dos dados sejam removidas, os dados ainda serão retidos no armazenamento em nuvem. Portanto, o senhor deve atualizar regularmente as tabelas e visualizações para remover versões anteriores dos dados. A maneira recomendada é a otimização preditiva para Unity Catalog gerenciar tabelas, que mantém de forma inteligente as tabelas de transmissão e a visualização materializada. Delta Live Tables executa automaticamente a tarefa de manutenção dentro de 24 horas após a atualização das tabelas de transmissão e da visualização materializada.

Se não estiver usando otimização preditiva ou Delta Live Tables, o senhor deve executar um comando VACUUM nas tabelas Delta para remover permanentemente as versões anteriores dos dados. Em default, isso reduzirá os recursos de viagem do tempo para 7 dias, que é uma configuração, e também removerá as versões históricas dos dados em questão do armazenamento em nuvem.

Exclua dados de PII da camada de bronze

Dependendo do design do seu site lakehouse, talvez seja possível cortar o vínculo entre os dados de usuário PII e não PII. Por exemplo, se o senhor estiver usando um key não natural, como user_id, em vez de um key natural, como email, poderá excluir os dados de PII, deixando os dados não PII no lugar.

O restante deste artigo lida com o RTBF excluindo completamente os registros de usuários de todas as tabelas bronze. O senhor pode excluir dados executando um comando DELETE, conforme mostrado no código a seguir:

spark.sql("DELETE FROM bronze.users WHERE user_id = 5")

Ao excluir um grande número de registros de uma só vez, recomendamos o uso do comando MERGE. O código abaixo pressupõe que o senhor tenha uma tabela de controle chamada gdpr_control_table que contém uma coluna user_id. Você insere um registro nessa tabela para cada usuário que solicitou o “direito de ser esquecido” nessa tabela.

O comando MERGE especifica a condição para a correspondência de linhas. Neste exemplo, ele combina registros de target_table com registros em gdpr_control_table com base no user_id. Se houver uma correspondência (por exemplo, user_id em target_table e gdpr_control_table), a linha em target_table será excluída. Depois que esse comando MERGE for bem-sucedido, atualize a tabela de controle para confirmar que a solicitação foi processada.

spark.sql("""
  MERGE INTO target
  USING (
    SELECT user_id
    FROM gdpr_control_table
  ) AS source
  ON target.user_id = source.user_id
  WHEN MATCHED THEN DELETE
""")

Propagar mudanças das camadas de bronze para prata e ouro

Depois que os dados são excluídos na camada bronze, o senhor deve propagar as alterações para as tabelas nas camadas prata e ouro.

Visualização materializada: Manipular automaticamente as exclusões

A visualização materializada trata automaticamente as exclusões nas fontes. Portanto, o senhor não precisa fazer nada de especial para garantir que um site materializado view não contenha dados que tenham sido excluídos de uma fonte. O senhor deve refresh a materialized view e executar a manutenção para garantir que as exclusões sejam completamente processadas.

Um view materializado sempre retorna o resultado correto porque usa computação incremental se for mais barato do que a recomputação completa, mas nunca ao custo da correção. Em outras palavras, a exclusão de dados de uma fonte pode fazer com que um view materializado seja totalmente recomputado.

Diagrama que ilustra como lidar automaticamente com exclusões.

tabelas de transmissão: Excluir dados e ler a fonte de transmissão usando skipChangeCommits

As tabelas de transmissão só podem processar dados somente de anexos. Ou seja, as tabelas de transmissão esperam que apenas novas linhas de dados apareçam na fonte de transmissão. Quaisquer outras operações, como atualização ou exclusão de qualquer registro de uma tabela de origem usada para transmissão, não são suportadas e interrompem a transmissão.

Diagrama que ilustra como lidar com exclusões em tabelas de transmissão.

Como a transmissão lida apenas com novos dados, o senhor mesmo deve lidar com as alterações nos dados. O método recomendado é: (1) excluir os dados na origem da transmissão, (2) excluir os dados da tabela de transmissão e, em seguida, (3) atualizar a leitura da transmissão para usar skipChangeCommits. Esse sinalizador indica para Databricks que a tabela de transmissão deve ignorar qualquer coisa que não seja inserção, como atualizações ou exclusões.

Diagrama que ilustra um método GDPR compliance que usa skipChangeCommits.

Como alternativa, o senhor pode (1) excluir os dados da origem, (2) excluí-los da tabela de transmissão e, em seguida, (3) refresh totalmente a tabela de transmissão. Quando o senhor acessar totalmente refresh uma tabela de transmissão, ele limpará o estado de transmissão da tabela e reprocessará todos os dados novamente. Qualquer fonte de dados upstream que esteja além de seu período de retenção (por exemplo, um tópico do site Kafka que envelhece os dados após 7 dias) não será processado novamente, o que pode causar perda de dados. Recomendamos essa opção para as tabelas de transmissão somente nos casos em que os dados históricos estiverem disponíveis e o processamento de novo não for dispendioso.

Diagrama que ilustra um método GDPR compliance que executa um refresh completo na tabela de transmissão.

Delta tabelas: Lidar com exclusões usando readChangeFeed

As tabelas Delta regulares não contêm nenhum tratamento especial de exclusões upstream. Em vez disso, você precisa escrever seu próprio código para propagar as exclusões para eles (por exemplo, spark.readStream.option("readChangeFeed", true).table("source_table")).

Exemplo: Conformidade com o GDPR e a CCPA para uma empresa de comércio eletrônico

O diagrama a seguir mostra uma arquitetura de medalhão para uma empresa de comércio eletrônico em que GDPR e CCPA compliance precisam ser implementados. Mesmo que os dados de um usuário sejam excluídos, talvez você queira contar suas atividades em agregações posteriores.

Diagrama que ilustra um exemplo de GDPR e CCPA compliance para uma empresa de comércio eletrônico.
  • Camada de bronze

    • users - Dimensões do usuário. Contém PII (por exemplo, endereço email ).

    • clickstream - Clique em eventos. Contém PII (por exemplo, endereço IP).

    • gdpr_requests - Tabela de controle contendo IDs de usuário sujeitas ao “direito de ser esquecido”.

  • Camada de prata

    • clicks_hourly - Total de cliques por hora. Se você excluir um usuário, ainda desejará contar seus cliques.

    • clicks_by_user - Total de cliques por usuário. Se você excluir um usuário, NÃO desejará contar seus cliques.

  • Camada de ouro

    • revenue_by_user - Gastos totais de cada usuário.

Etapa 1: preencher tabelas com dados de amostra

O código a seguir cria essas duas tabelas:

  • source_users contém dados dimensionais sobre os usuários. Essa tabela contém uma coluna de PII chamada email.

  • source_clicks contém dados de eventos sobre atividades realizadas pelos usuários. Ele contém uma coluna de PII chamada ip_address.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, MapType, DateType


catalog = "users"
schema = "name"


# Create table containing sample users
users_schema = StructType([
   StructField('user_id', IntegerType(), False),
   StructField('username', StringType(), True),
   StructField('email', StringType(), True),
   StructField('registration_date', StringType(), True),
   StructField('user_preferences', MapType(StringType(), StringType()), True)
])


users_data = [
   (1, 'alice', 'alice@example.com', '2021-01-01', {'theme': 'dark', 'language': 'en'}),
   (2, 'bob', 'bob@example.com', '2021-02-15', {'theme': 'light', 'language': 'fr'}),
   (3, 'charlie', 'charlie@example.com', '2021-03-10', {'theme': 'dark', 'language': 'es'}),
   (4, 'david', 'david@example.com', '2021-04-20', {'theme': 'light', 'language': 'de'}),
   (5, 'eve', 'eve@example.com', '2021-05-25', {'theme': 'dark', 'language': 'it'})
]


users_df = spark.createDataFrame(users_data, schema=users_schema)
users_df.write..mode("overwrite").saveAsTable(f"{catalog}.{schema}.source_users")


# Create table containing clickstream (i.e. user activities)
from pyspark.sql.types import TimestampType


clicks_schema = StructType([
   StructField('click_id', IntegerType(), False),
   StructField('user_id', IntegerType(), True),
   StructField('url_clicked', StringType(), True),
   StructField('click_timestamp', StringType(), True),
   StructField('device_type', StringType(), True),
   StructField('ip_address', StringType(), True)
])


clicks_data = [
   (1001, 1, 'https://example.com/home', '2021-06-01T12:00:00', 'mobile', '192.168.1.1'),
   (1002, 1, 'https://example.com/about', '2021-06-01T12:05:00', 'desktop', '192.168.1.1'),
   (1003, 2, 'https://example.com/contact', '2021-06-02T14:00:00', 'tablet', '192.168.1.2'),
   (1004, 3, 'https://example.com/products', '2021-06-03T16:30:00', 'mobile', '192.168.1.3'),
   (1005, 4, 'https://example.com/services', '2021-06-04T10:15:00', 'desktop', '192.168.1.4'),
   (1006, 5, 'https://example.com/blog', '2021-06-05T09:45:00', 'tablet', '192.168.1.5')
]


clicks_df = spark.createDataFrame(clicks_data, schema=clicks_schema)
clicks_df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{schema}.source_clicks")

Etapa 2: Criar um pipeline que processe dados de PII

O código a seguir cria camadas de bronze, prata e ouro da arquitetura do medalhão mostrada acima.

import dlt
from pyspark.sql.functions import col, concat_ws, count, countDistinct, avg, when, expr


catalog = "users"
schema = "name"


# ----------------------------
# Bronze Layer - Raw Data Ingestion
# ----------------------------


@dlt.table(
   name=f"{catalog}.{schema}.users_bronze",
   comment='Raw users data loaded from source'
)
def users_bronze():
   return (
     spark.readStream.table(f"{catalog}.{schema}.source_users")
   )


@dlt.table(
   name=f"{catalog}.{schema}.clicks_bronze",
   comment='Raw clicks data loaded from source'
)
def clicks_bronze():
   return (
       spark.readStream.table(f"{catalog}.{schema}.source_clicks")
   )


# ----------------------------
# Silver Layer - Data Cleaning and Enrichment
# ----------------------------


@dlt.table(
   name=f"{catalog}.{schema}.users_silver",
   comment='Cleaned and standardized users data'
)
@dlt.expect_or_drop('valid_email', "email IS NOT NULL")
def users_silver():
   return (
       spark.readStream
           .table(f"{catalog}.{schema}.users_bronze")
           .withColumn('registration_date', col('registration_date').cast('timestamp'))
           .dropDuplicates(['user_id', 'registration_date'])
           .select('user_id', 'username', 'email', 'registration_date', 'user_preferences')
   )


@dlt.table(
   name=f"{catalog}.{schema}.clicks_silver",
   comment='Cleaned and standardized clicks data'
)
@dlt.expect_or_drop('valid_click_timestamp', "click_timestamp IS NOT NULL")
def clicks_silver():
   return (
       spark.readStream
           .table(f"{catalog}.{schema}.clicks_bronze")
           .withColumn('click_timestamp', col('click_timestamp').cast('timestamp'))
           .withWatermark('click_timestamp', '10 minutes')
           .dropDuplicates(['click_id'])
           .select('click_id', 'user_id', 'url_clicked', 'click_timestamp', 'device_type', 'ip_address')
   )


@dlt.table(
   name=f"{catalog}.{schema}.user_clicks_silver",
   comment='Joined users and clicks data on user_id'
)
def user_clicks_silver():
   # Read users_silver as a static DataFrame
   users = spark.read.table(f"{catalog}.{schema}.users_silver")

   # Read clicks_silver as a streaming DataFrame
   clicks = spark.readStream \
       .table('clicks_silver')

   # Perform the join
   joined_df = clicks.join(users, on='user_id', how='inner')

   return joined_df


# ----------------------------
# Gold Layer - Aggregated and Business-Level Data
# ----------------------------


@dlt.table(
   name=f"{catalog}.{schema}.user_behavior_gold",
   comment='Aggregated user behavior metrics'
)
def user_behavior_gold():
   df = spark.read.table(f"{catalog}.{schema}.user_clicks_silver")
   return (
       df.groupBy('user_id')
         .agg(
             count('click_id').alias('total_clicks'),
             countDistinct('url_clicked').alias('unique_urls')
         )
   )


@dlt.table(
   name=f"{catalog}.{schema}.marketing_insights_gold",
   comment='User segments for marketing insights'
)
def marketing_insights_gold():
   df = spark.read.table(f"{catalog}.{schema}.user_behavior_gold")
   return (
       df.withColumn(
           'engagement_segment',
           when(col('total_clicks') >= 100, 'High Engagement')
           .when((col('total_clicks') >= 50) & (col('total_clicks') < 100), 'Medium Engagement')
           .otherwise('Low Engagement')
       )
   )

Etapa 3: excluir dados nas tabelas de origem

Nesta etapa, você exclui dados em todas as tabelas em que as PII são encontradas.

catalog = "users"
schema = "name"


def apply_gdpr_delete(user_id):
 tables_with_pii = ["clicks_bronze", "users_bronze", "clicks_silver", "users_silver", "user_clicks_silver"]


 for table in tables_with_pii:
   print(f"Deleting user_id {user_id} from table {table}")
   spark.sql(f"""
     DELETE FROM {catalog}.{schema}.{table}
     WHERE user_id = {user_id}
   """)

Etapa 4: Adicionar skipChangeCommits às definições das tabelas de transmissão afetadas

Nesta etapa, o senhor deve informar ao Delta Live Tables para ignorar as linhas não anexadas. Adicione a opção skipChangeCommits aos métodos a seguir. O senhor não precisa atualizar as definições da visualização materializada, pois elas tratam automaticamente das atualizações e exclusões:

  • users_bronze

  • users_silver

  • clicks_bronze

  • clicks_silver

  • user_clicks_silver

O código a seguir mostra como atualizar o método users_bronze:

def users_bronze():
   return (
     spark.readStream.option('skipChangeCommits', 'true').table(f"{catalog}.{schema}.source_users")
   )

Quando o senhor executar o site pipeline novamente, ele será atualizado com sucesso.