Pular para o conteúdo principal

Recuperar um pipeline declarativo LakeFlow de uma falha no ponto de verificação da transmissão

Esta página descreve como recuperar um pipeline no pipeline LakeFlow Declarative quando um ponto de verificação de transmissão se torna inválido ou corrompido.

O que é um ponto de controle de transmissão?

Em Apache Spark transmissão estruturada, um ponto de verificação é um mecanismo usado para manter o estado de uma consulta de transmissão. Esse estado inclui:

  • Informações sobre o progresso : Quais offsets da fonte foram processados.
  • Estado intermediário : Dados que precisam ser mantidos em micro-lotes para operações com estado (por exemplo, agregações, mapGroupsWithState).
  • Metadados : informações sobre a execução da consulta de transmissão.

Os pontos de verificação são essenciais para garantir a tolerância a falhas e a consistência dos dados em aplicativos de transmissão:

  • Tolerância a falhas : Se um aplicativo de transmissão falhar (por exemplo, devido a uma falha de nó, falha do aplicativo), o ponto de verificação permite que o aplicativo reinicie a partir do último estado de ponto de verificação bem-sucedido, em vez de reprocessar todos os dados desde o início. Isso evita a perda de dados e garante o processamento incremental.
  • Processamento exato : Para muitas fontes de transmissão, os pontos de verificação, em conjunto com sinks idempotentes, permitem que o processamento exatamente único garanta que cada registro seja processado exatamente uma vez, mesmo diante de falhas, evitando duplicatas ou omissões.
  • Gerenciamento de estado : Para transformações com estado, os pontos de verificação mantêm o estado interno dessas operações, permitindo que a consulta de transmissão continue processando corretamente os novos dados com base no estado histórico acumulado.

Pontos de verificação em LakeFlow Pipeline declarativo

LakeFlow O pipeline declarativo se baseia na transmissão estruturada e abstrai grande parte do gerenciamento de pontos de verificação subjacentes, oferecendo uma abordagem mais declarativa. Quando o senhor define uma tabela de transmissão em seu site pipeline, há um estado de ponto de verificação para cada fluxo que escreve na tabela de transmissão. Esses locais de ponto de verificação são internos ao pipeline e não são acessíveis aos usuários.

Normalmente, o senhor não precisa gerenciar ou entender os pontos de verificação subjacentes das tabelas de transmissão, exceto nos casos a seguir:

  • Retroceder e reproduzir : Se quiser reprocessar os dados a partir de um ponto específico no tempo, preservando o estado atual da tabela, o senhor deve redefinir o ponto de verificação da tabela de transmissão.
  • Recuperação de uma falha ou corrupção de ponto de verificação : Se uma consulta que estiver gravando na tabela de transmissão tiver falhado devido a erros relacionados ao ponto de verificação, isso causará uma falha grave e a consulta não poderá continuar. Há três abordagens que você pode usar para se recuperar dessa classe de falha:
    • Tabela completa refresh : Redefine a tabela e apaga os dados existentes.
    • Tabela completa refresh com backup e backfill : O senhor faz um backup da tabela antes de executar uma tabela completa refresh e preencher novamente os dados antigos, mas isso é muito caro e deve ser o último recurso.
    • Reset O senhor pode fazer o checkpoint e continuar de forma incremental: Se o senhor não puder se dar ao luxo de perder os dados existentes, deverá executar um checkpoint Reset seletivo para os fluxos de transmissão afetados.

Exemplo: Falha no pipeline devido à mudança de código

Considere um cenário em que o senhor tem um pipeline declarativo LakeFlow que processa um feed de dados de alteração junto com a tabela inicial Snapshot de um sistema de armazenamento cloud, como Amazon S3, e grava em uma tabela de transmissão SCD-1.

O site pipeline tem dois fluxos de transmissão:

  • customers_incremental_flow: Lê de forma incremental o feed CDC da tabela de origem customer, filtra os registros duplicados e os insere na tabela de destino.
  • customers_snapshot_flow: Leitura única do Snapshot inicial da tabela de origem customers e insere os registros na tabela de destino.

LakeFlow Pipeline declarativo CDC exemplo

Python
@dlt.view(name="customers_incremental_view")
def query():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.option("cloudFiles.includeExistingFiles", "true")
.load(customers_incremental_path)
.dropDuplicates(["customer_id"])
)

@dlt.view(name="customers_snapshot_view")
def full_orders_snapshot():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(customers_snapshot_path)
.select("*")
)

dlt.create_streaming_table("customers")

dlt.create_auto_cdc_flow(
flow_name = "customers_incremental_flow",
target = "customers",
source = "customers_incremental_view",
keys = ["customer_id"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
dlt.create_auto_cdc_flow(
flow_name = "customers_snapshot_flow",
target = "customers",
source = "customers_snapshot_view",
keys = ["customer_id"],
sequence_by = lit(0),
stored_as_scd_type = 1,
once = True
)

Após a implantação do site pipeline, ele será executado com sucesso e começará a processar o feed de dados de alteração e o Snapshot inicial.

Mais tarde, o senhor percebe que a lógica de deduplicação na consulta customers_incremental_view é redundante e causa um gargalo no desempenho. O senhor remove o dropDuplicates() para melhorar o desempenho:

Python
@dlt.view(name="customers_raw_view")
def query():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.option("cloudFiles.includeExistingFiles", "true")
.load()
# .dropDuplicates()
)

Depois de remover a API dropDuplicates() e reimplantar o pipeline, a atualização falha com o seguinte erro:

Streaming stateful operator name does not match with the operator in state metadata.
This is likely to happen when a user adds/removes/changes stateful operators of existing streaming query.
Stateful operators in the metadata: [(OperatorId: 0 -> OperatorName: dedupe)];
Stateful operators in current batch: []. SQLSTATE: 42K03 SQLSTATE: XXKST

Esse erro indica que a alteração não é permitida devido a uma incompatibilidade entre o estado do ponto de verificação e a definição da consulta atual, impedindo o avanço do pipeline.

As falhas relacionadas ao ponto de verificação podem ocorrer por vários motivos além da simples remoção da API dropDuplicates. Os cenários comuns incluem:

  • Adicionar ou remover operadores com estado (por exemplo, introduzir ou eliminar dropDuplicates() ou agregações) em uma consulta de transmissão existente.
  • Adição, remoção ou combinação de fontes de transmissão em uma consulta com checkpoint anterior (por exemplo, união de uma consulta de transmissão existente com uma nova, ou adição/remoção de fontes de uma união de operações existente).
  • Modificar o esquema de estado das operações de transmissão com estado (como alterar as colunas usadas para deduplicação ou agregação).

Para obter uma lista abrangente de alterações suportadas e não suportadas, consulte o guiaSpark transmissão estructurada e Tipos de alterações nas consultas de transmissão estructurada.

Opções de recuperação

Há três estratégias de recuperação, dependendo dos requisitos de durabilidade dos dados e das restrições de recursos do senhor:

Métodos

Complexidade

Custo

Possível perda de dados

Possível duplicação de dados

Requer Snapshot inicial

Tabela completa Reiniciar

Tabela completa refresh

Baixa

Médio

Sim (se nenhum Snapshot inicial estiver disponível ou se os arquivos brutos tiverem sido excluídos na origem).

Não (Para aplicar alterações na tabela de metas.)

Sim

Sim

Mesa completa refresh com apoio e preenchimento

Médio

Alta

Não

Não (Para sumidouros idempotentes. Por exemplo, CDC automático).

Não

Não

Reset ponto de controle da tabela

Médio-alto (Médio para fontes somente de acréscimo que fornecem compensações imutáveis).

Baixa

Não (requer uma análise cuidadosa.)

Não (Para escritores idempotentes. Por exemplo, CDC automático somente para a tabela de destino).

Não

Não

A complexidade média-alta depende do tipo de fonte de transmissão e da complexidade da consulta.

Recomendações

  • Use uma tabela completa refresh se não quiser lidar com a complexidade de um checkpoint Reset e puder recomputar a tabela inteira. Isso também lhe dará a opção de fazer alterações no código.
  • Use a tabela completa refresh com backup e backfill se o senhor não quiser lidar com a complexidade da redefinição de pontos de verificação e não se importar com o custo adicional de fazer um backup e backfilling do histórico de dados.
  • Use a opção Reset table checkpoint se o senhor precisar preservar os dados existentes na tabela e continuar processando novos dados de forma incremental. No entanto, essa abordagem requer um tratamento cuidadoso do checkpoint Reset para verificar se os dados existentes na tabela não são perdidos e se o site pipeline pode continuar processando novos dados.

Reset ponto de controle e continuar de forma incremental

Para redefinir o ponto de verificação e continuar o processamento de forma incremental, siga estes passos:

  1. Pare o pipeline: Certifique-se de que o pipeline não tenha atualizações ativas em execução.

  2. Determine a posição inicial do novo ponto de verificação: identifique o último deslocamento ou carimbo de data/hora bem-sucedido a partir do qual você deseja continuar o processamento. Normalmente, esse é o último deslocamento processado com sucesso antes da ocorrência da falha.

    No exemplo acima, como o senhor está lendo os arquivos JSON usando o autoloader, é possível usar a opção modifiedAfter para especificar a posição inicial do novo ponto de verificação. Essa opção permite que o senhor defina um registro de data e hora para quando o carregador automático deve começar a processar novos arquivos.

    Para fontes Kafka, o senhor pode usar a opção startingOffsets para especificar os offsets a partir dos quais a consulta de transmissão deve começar a processar novos dados.

    Para fontes Delta Lake, o senhor pode usar a opção startingVersion para especificar a versão a partir da qual a consulta de transmissão deve começar a processar novos dados.

  3. Fazer alterações no código: O senhor pode modificar a consulta de transmissão para remover o dropDuplicates() API ou fazer outras alterações. Além disso, verifique se você adicionou a opção modifiedAfter ao caminho de leitura do carregador automático.

    Python
    @dlt.view(name="customers_incremental_view")
    def query():
    return (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.includeExistingFiles", "true")
    .option("modifiedAfter", "2025-04-09T06:15:00")
    .load(customers_incremental_path)
    # .dropDuplicates(["customer_id"])
    )
nota

Fornecer um timestamp modifiedAfter incorreto pode levar à perda ou duplicação de dados. Verifique se o carimbo de data/hora está definido corretamente para evitar o processamento de dados antigos novamente ou a perda de novos dados.

Se a sua consulta tiver uma transmissão-transmissão join ou transmissão-transmissão sindical, o senhor deverá aplicar a estratégia acima para todas as fontes de transmissão participantes. Por exemplo:

Python
cdc_1 = spark.readStream.format("cloudFiles")...
cdc_2 = spark.readStream.format("cloudFiles")...
cdc_source = cdc_1..union(cdc_2)
  1. Identifique o(s) nome(s) do fluxo associado(s) à tabela de transmissão para a qual o senhor deseja redefinir o ponto de verificação. No exemplo, é customers_incremental_flow. O senhor pode encontrar o nome do fluxo no código pipeline ou verificando a interface do usuário pipeline, ou o evento pipeline logs.

  2. Reset o ponto de controle: Crie um Notebook Python e anexe-o a um Databricks cluster.

    O senhor precisará das seguintes informações para poder reiniciar o ponto de controle:

    • Databricks workspace URL
    • ID do pipeline
    • Nome(s) do fluxo para o qual o senhor está redefinindo o ponto de verificação
    Python
    import requests
    import json

    # Define your Databricks instance and pipeline ID
    databricks_instance = "<DATABRICKS_URL>"
    token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
    pipeline_id = "<YOUR_PIPELINE_ID>"
    flows_to_reset = ["<YOUR_FLOW_NAME>"]
    # Set up the API endpoint
    endpoint = f"{databricks_instance}/api/2.0/pipelines/{pipeline_id}/updates"


    # Set up the request headers
    headers = {
    "Authorization": f"Bearer {token}",
    "Content-Type": "application/json"
    }

    # Define the payload
    payload = {
    "reset_checkpoint_selection": flows_to_reset
    }

    # Make the POST request
    response = requests.post(endpoint, headers=headers, data=json.dumps(payload))

    # Check the response
    if response.status_code == 200:
    print("Pipeline update started successfully.")
    else:
    print(f"Error: {response.status_code}, {response.text}")
  3. execução do pipeline: O pipeline começa a processar novos dados a partir da posição inicial especificada com um novo ponto de verificação, preservando os dados da tabela existente enquanto continua o processamento incremental.

Melhores práticas

  • Evite usar o recurso de visualização privada na produção.
  • Teste suas alterações antes de fazer alterações em seu ambiente de produção.
    • Crie um pipeline de teste (de preferência em um ambiente inferior). Se isso não for possível, tente usar um catálogo e um esquema diferentes para o teste.
    • Reproduza o erro.
    • Aplique as alterações.
    • Valide os resultados e tome uma decisão sobre go/no-go.
    • Implemente as alterações em seu pipeline de produção.