Entre em uma tabela do Delta Lake usando a mesclagem

Você pode atualizar dados de uma tabela de origem, exibição ou DataFrame em uma tabela Delta de destino usando a operação SQL MERGE. O Delta Lake oferece suporte a inserções, atualizações e exclusões em MERGEe oferece suporte à sintaxe estendida além dos padrões SQL para facilitar casos de uso avançados.

Suponha que você tenha uma tabela de origem denominada people10mupdates ou um caminho de origem no /tmp/delta/people-10m-updates que contém novos dados para uma tabela de destino denominada people10m ou um caminho de destino no /tmp/delta/people-10m. Alguns desses novos registros podem já estar presentes nos dados de destino. Para mesclar os novos dados, você deseja atualizar as linhas onde o id da pessoa já está presente e inserir as novas linhas onde nenhum id correspondente está presente. Você pode executar a seguinte consulta:

MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = people10mupdates.id,
    firstName = people10mupdates.firstName,
    middleName = people10mupdates.middleName,
    lastName = people10mupdates.lastName,
    gender = people10mupdates.gender,
    birthDate = people10mupdates.birthDate,
    ssn = people10mupdates.ssn,
    salary = people10mupdates.salary
WHEN NOT MATCHED
  THEN INSERT (
    id,
    firstName,
    middleName,
    lastName,
    gender,
    birthDate,
    ssn,
    salary
  )
  VALUES (
    people10mupdates.id,
    people10mupdates.firstName,
    people10mupdates.middleName,
    people10mupdates.lastName,
    people10mupdates.gender,
    people10mupdates.birthDate,
    people10mupdates.ssn,
    people10mupdates.salary
  )
from delta.tables import *

deltaTablePeople = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
deltaTablePeopleUpdates = DeltaTable.forPath(spark, '/tmp/delta/people-10m-updates')

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()
import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTablePeople = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
val deltaTablePeopleUpdates = DeltaTable.forPath(spark, "tmp/delta/people-10m-updates")
val dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched
  .updateExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .whenNotMatched
  .insertExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .execute()

Consulte a documentação da API Delta Lake para obter detalhes de sintaxe Scala e Python. Para obter detalhes de sintaxe SQL, consulte MERGE INTO

Modificar todas as linhas não correspondentes usando o merge

Em Databricks SQL e Databricks Runtime 12.2 LTS e acima, é possível usar a cláusula WHEN NOT MATCHED BY SOURCE para UPDATE ou DELETE registros na tabela de destino que não tenham registros correspondentes na tabela de origem. A Databricks recomenda adicionar uma cláusula condicional opcional para evitar a reescrita completa da tabela de destino.

O exemplo de código a seguir mostra a sintaxe básica de uso para exclusões, substituindo a tabela de destino pelo conteúdo da tabela de origem e excluindo registros não correspondentes na tabela de destino. Para obter um padrão mais escalável para tabelas em que as atualizações e exclusões de origem são vinculadas ao tempo, consulte Sincronizar tabela Delta de forma incremental com source.

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)
targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

O exemplo seguinte adiciona condições para a cláusula WHEN NOT MATCHED BY SOURCE e especifica valores para atualizar em linhas de destino sem correspondência.

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdate(
    set = {"target.lastSeen": "source.timestamp"}
  )
  .whenNotMatchedInsert(
    values = {
      "target.key": "source.key",
      "target.lastSeen": "source.timestamp",
      "target.status": "'active'"
    }
  )
  .whenNotMatchedBySourceUpdate(
    condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
    set = {"target.status": "'inactive'"}
  )
  .execute()
)
targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateExpr(Map("target.lastSeen" -> "source.timestamp"))
  .whenNotMatched()
  .insertExpr(Map(
    "target.key" -> "source.key",
    "target.lastSeen" -> "source.timestamp",
    "target.status" -> "'active'",
    )
  )
  .whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
  .updateExpr(Map("target.status" -> "'inactive'"))
  .execute()
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
  INSERT (key, lastSeen, status) VALUES (source.key,  source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
  UPDATE SET target.status = 'inactive'

Semântica de operação de fusão

A seguir está uma descrição detalhada da semântica da operação merge programática.

  • Pode haver qualquer número de cláusulas whenMatched e whenNotMatched .

  • whenMatched as cláusulas são acionadas quando uma linha da fonte coincide com uma linha da tabela de destino com base na condição de correspondência.Estas cláusulas têm a seguinte semântica.

    • whenMatched as cláusulas podem ter no máximo uma ação update e uma delete . A ação update em merge atualiza apenas as colunas especificadas (semelhantes às update operações) da linha de destino correspondente. A ação delete exclui a linha correspondente.

    • Cada cláusula whenMatched pode ter uma condição opcional. Se essa condição de cláusula existir, a ação update ou delete será executada para qualquer par de linhas origem-destino correspondente somente quando a condição de cláusula for verdadeira.

    • Se houver várias cláusulas whenMatched, elas serão avaliadas na ordem em que foram especificadas. Todas as whenMatched cláusulas, exceto a última, devem ter condições.

    • Se nenhuma das condições whenMatched for avaliada como verdadeira para um par de linhas de origem e destino que corresponda à condição de merge, a linha de destino será deixada inalterada.

    • Para atualizar todas as colunas da tabela Delta de destino com as colunas correspondentes do dataset de origem, utilize whenMatched(...).updateAll(). Isso é equivalente a:

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      para todas as colunas da tabela Delta de destino. Portanto, essa ação pressupõe que a tabela de origem tenha as mesmas colunas que as da tabela de destino, caso contrário, a consulta mostrará um erro de análise.

      Observação

      Esse comportamento muda quando a migração automática de esquema é habilitada. Consulte a evolução automática do esquema para obter detalhes.

  • whenNotMatched as cláusulas são executadas quando uma linha de origem não corresponde a nenhuma linha de destino com base na condição de correspondência. Estas cláusulas têm a seguinte semântica.

    • whenNotMatched as cláusulas podem ter apenas a ação insert . A nova linha é gerada com base na coluna especificada e expressões correspondentes. Você não precisa especificar todas as colunas na tabela de destino. Para colunas de destino não especificadas, NULL é inserido.

    • Cada cláusula whenNotMatched pode ter uma condição opcional. Se a condição da cláusula estiver presente, uma linha de origem será inserida somente se essa condição for verdadeira para essa linha. Caso contrário, a coluna de origem será ignorada.

    • Se houver várias cláusulas whenNotMatched, elas serão avaliadas na ordem em que foram especificadas. Todas as whenNotMatched cláusulas, exceto a última, devem ter condições.

    • Para inserir todas as colunas da tabela Delta de destino com as colunas correspondentes do dataset de origem, utilize whenNotMatched(...).insertAll(). Isso é equivalente a:

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      para todas as colunas da tabela Delta de destino. Portanto, essa ação pressupõe que a tabela de origem tenha as mesmas colunas que as da tabela de destino, caso contrário, a consulta mostrará um erro de análise.

      Observação

      Esse comportamento muda quando a migração automática de esquema é habilitada. Consulte a evolução automática do esquema para obter detalhes.

  • whenNotMatchedBySource as cláusulas são executadas quando uma linha de destino não corresponde a nenhuma linha de origem com base na condição de mesclagem. Estas cláusulas têm a seguinte semântica.

    • whenNotMatchedBySource as cláusulas podem especificar ações delete e update .

    • Cada cláusula whenNotMatchedBySource pode ter uma condição opcional. Se a condição da cláusula estiver presente, uma linha de destino será modificada somente se essa condição for verdadeira para essa linha. Caso contrário, a linha de destino permanece inalterada.

    • Se houver várias cláusulas whenNotMatchedBySource, elas serão avaliadas na ordem em que foram especificadas. Todas as whenNotMatchedBySource cláusulas, exceto a última, devem ter condições.

    • Por definição, whenNotMatchedBySource cláusulas não têm uma linha de origem da qual extrair valores de coluna e, portanto, as colunas de origem não podem ser referenciadas. Para cada coluna a ser modificada, você pode especificar um literal ou executar uma ação na coluna de destino, como SET target.deleted_count = target.deleted_count + 1.

Importante

  • Uma operação merge pode falhar se múltiplas linhas do dataset de origem corresponderem e a mesclagem tentar atualizar as mesmas linhas da tabela Delta de destino. De acordo com a semântica SQL de mesclagem, essa operação de atualização é ambígua, pois não está claro qual linha de origem deve ser usada para atualizar a linha de destino correspondente. É possível pré-processar a tabela de origem para eliminar a possibilidade de várias correspondências.

  • Você pode aplicar uma operação SQL MERGE em um SQL VIEW somente se a visualização tiver sido definida como CREATE VIEW viewName AS SELECT * FROM deltaTable.

Eliminação de duplicações de dados ao escrever em tabelas Delta

Um caso de uso comum de ETL é coletar logs na tabela Delta anexando-os a uma tabela. No entanto, muitas vezes as fontes podem gerar registros logs duplicados e etapas de duplicação downstream são necessárias para cuidar deles. Com merge, você pode evitar a inserção de registros duplicados.

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *
deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()
deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()
deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

Observação

O dataset que contém os novos logs precisa ser duplicação dentro de si mesmo. Pela semântica SQL de mesclagem, ele corresponde e elimina a duplicação dos novos dados com os dados existentes na tabela, mas se houver dados duplicados no novo dataset, eles serão inseridos. Portanto, elimine a duplicação dos novos dados antes de mesclar na tabela.

Se você sabe que poderá obter registros duplicados apenas por alguns dias, poderá otimizar ainda mais sua query particionando a tabela por data e, em seguida, especificando o intervalo de datas da tabela de destino para correspondência.

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *
deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()
deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()
deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

Isso é mais eficiente do que o comando anterior, pois procura duplicatas apenas nos últimos 7 dias de logs, não na tabela inteira. Além disso, você pode usar essa mesclagem somente inserção com a transmissão estruturada para executar a eliminação contínua de duplicação dos logs.

  • Em uma query transmitida, você pode usar operações merge em foreachBatch para gravar continuamente quaisquer dados transmitidos em uma tabela Delta com desduplicação. Veja o seguinte exemplo de transmissão para mais informações sobre foreachBatch.

  • Em outra consulta de streaming, você pode ler continuamente os dados duplicação dessa tabela Delta. Isso é possível porque um merge somente de inserção acrescenta apenas novos dados à tabela Delta.

Dados de alteração lenta (SCD) e captura de dados de alterações (CDC) (CDC) com Delta Lake

Delta Live Tables tem suporte nativo para acompanhamento e aplicação de SCD Tipo 1 e Tipo 2. Use APPLY CHANGES INTO com Delta Live Tables para garantir que os registros fora de ordem sejam tratados corretamente ao processar feeds CDC. Consulte Captura simplificada de dados de alterações (CDC) com a API APPLY CHANGES em Delta Live Tables.

Sincronizar incrementalmente a tabela Delta com a origem

Em Databricks SQL e Databricks Runtime 12.2 LTS e acima, o senhor pode usar WHEN NOT MATCHED BY SOURCE para criar condições arbitrárias para excluir e substituir atomicamente uma parte de uma tabela. Isso pode ser especialmente útil quando se tem uma tabela de origem em que os registros podem ser alterados ou excluídos por vários dias após a entrada inicial de dados, mas acabam chegando a um estado final.

A consulta a seguir mostra o uso desse padrão para selecionar 5 dias de registros da origem, atualizar os registros correspondentes no destino, inserir novos registros da origem no destino e excluir todos os registros não correspondentes dos últimos 5 dias no destino.

MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE

Ao fornecer o mesmo filtro booleano nas tabelas de origem e destino, você pode propagar dinamicamente as alterações de sua origem para as tabelas de destino, incluindo exclusões.

Observação

Embora esse padrão possa ser usado sem nenhuma cláusula condicional, isso levaria à reescrita completa da tabela de destino, o que pode ser caro.