Recuperar-se de falhas de consulta de transmissão estruturada com fluxo de trabalho

a transmissão estruturada fornece tolerância a falhas e consistência de dados para query de transmissão; usando o fluxo de trabalho do Databricks, você pode configurar facilmente sua query estruturada transmitida para reiniciar automaticamente em caso de falha. Ao ativar o ponto de verificação para uma query transmitida, você pode reiniciar a query após uma falha. A query reiniciada continua de onde parou a que falhou.

Ativar ponto de verificação para queryestruturada transmitida

Databricks recomenda que você sempre especifique a opção checkpointLocation um caminho de armazenamento cloud antes de iniciar a query. Por exemplo:

streamingDataFrame.writeStream
  .format("parquet")
  .option("path", "/path/to/table")
  .option("checkpointLocation", "/path/to/table/_checkpoint")
  .start()

Esta localização do ponto de verificação preserva toda a informação essencial que identifica uma query. Cada query deve ter um local de ponto de verificação diferente. query múltiplas nunca devem ter o mesmo local. Para mais informações, consulte o guia de programação estruturada transmitida.

Observação

Embora checkpointLocation seja necessário para a maioria dos tipos de coletores de saída, alguns coletores, como o coletor de memória, podem gerar automaticamente um local de ponto de verificação temporário quando você não fornece checkpointLocation. Esses locais de ponto de verificação temporários não garantem nenhuma tolerância a falhas ou garantias de consistência de dados e podem não ser limpos adequadamente. Evite possíveis armadilhas sempre especificando um checkpointLocation.

Configure Job estruturado de transmissão para reiniciar query de transmissão em caso de falha

Você pode criar um Job Databricks com o Notebook ou JAR que possui sua query transmitida e configurá-lo para:

  • Sempre use novos clusters.

  • Sempre tente novamente em caso de falha.

Reiniciar automaticamente em caso de falha no Job é especialmente importante ao configurar cargas de trabalho de transmissão com evolução do esquema. A evolução do esquema funciona no Databricks gerando um erro esperado quando uma alteração de esquema é detectada e, em seguida, processando adequadamente o uso de dados do novo esquema quando o Job for reiniciado. O Databricks recomenda sempre configurar a tarefa de transmissão que contém query com evolução do esquema para reiniciar automaticamente no fluxo de trabalho do Databricks.

Os jobs possuem forte integração com APIs de transmissão estruturada e podem monitorar todas query de transmissão ativas em uma execução. Essa configuração garante que, se alguma parte da query falhar, Job encerrará automaticamente a execução (junto com todas as outras query) e iniciará uma nova execução em novos clusters. Isso reexecuta o código do Notebook ou JAR e reinicia toda a query novamente. Esta é a maneira mais segura de retornar ao bom estado.

Observação

  • A falha em qualquer query de transmissão ativa faz com que a execução ativa falhe e finalize todas as outras query de transmissão.

  • Você não precisa usar streamingQuery.awaitTermination() ou spark.streams.awaitAnyTermination() no final do seu Notebook. As tarefas impedem automaticamente que uma execução seja concluída quando uma query transmitida está ativa.

  • Databricks recomenda usar Job em vez de %run e dbutils.notebook.run() ao orquestrar a transmissão estructurada Notebook. Veja a execução de um Databricks Notebook de outro Notebook.

Veja a seguir um exemplo de uma configuração Job recomendada.

  • clusters: defina isso sempre para usar novos clusters e usar a versão mais recente do Spark (ou pelo menos a versão 2.1). query começar no Spark 2.1 e acima são recuperáveis após upgrades de versão query e Spark.

  • Notificações: defina isso se desejar notificação email sobre falhas.

  • programar: Não defina um programar.

  • Tempo limite: não defina um tempo limite. execução query de transmissão por um tempo indefinidamente longo.

  • Execução simultânea máxima: Definido como 1. Deve haver apenas uma instância de cada query simultaneamente ativa.

  • Novas tentativas: Defina como Ilimitado.

Consulte Criar e executar trabalhos do Databricks para entender essas configurações.

Recuperar após alterações em uma consulta estruturada transmitida

Existem limitações sobre quais alterações são permitidas em uma query de transmissão entre reinicializações do mesmo local de ponto de verificação. Aqui estão algumas alterações que não são permitidas ou o efeito da alteração não está bem definido. Para todos eles:

  • O termo permitido significa que você pode fazer a alteração especificada, mas se a semântica de seu efeito está bem definida depende da query e da alteração.

  • O termo não permitido significa que você não deve fazer a alteração especificada, pois a query reiniciada provavelmente falhará com erros imprevisíveis.

  • sdf representa um DataFrame/dataset gerado com sparkSession.readStream.

Tipos de alterações na queryestruturada transmitida

  • Mudanças no número ou tipo (ou seja, fonte diferente) de fontes de entrada: Isso não é permitido.

  • Mudanças nos parâmetros das fontes de entrada: Se isso é permitido e se a semântica da mudança está bem definida depende da fonte e da query. Aqui estão alguns exemplos.

    • Adição, exclusão e modificação de limites de taxa são permitidos:

      spark.readStream.format("kafka").option("subscribe", "article")
      

      para

      spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
      
    • Alterações em artigos e arquivos assinados geralmente não são permitidas, pois os resultados são imprevisíveis: spark.readStream.format("kafka").option("subscribe", "article") para spark.readStream.format("kafka").option("subscribe", "newarticle")

  • Alterações no intervalo de acionamento: você pode alterar os acionadores entre lotes incrementais e intervalos de tempo. Consulte Alterando intervalos de acionamento entre execuções.

  • Alterações no tipo de coletor de saída: São permitidas alterações entre algumas combinações específicas de coletores. Isso precisa ser verificado caso a caso. Aqui estão alguns exemplos.

    • O coletor de arquivo para o coletor Kafka é permitido. Kafka verá apenas os novos dados.

    • Kafka coletor para coletor de arquivo não é permitido.

    • Kafka coletor alterado para foreach, ou vice-versa é permitido.

  • Alterações nos parâmetros do coletor de saída: Se isso é permitido e se a semântica da alteração está bem definida depende do coletor e da query. Aqui estão alguns exemplos.

    • Alterações no diretório de saída de um coletor de arquivo não são permitidas: sdf.writeStream.format("parquet").option("path", "/somePath") para sdf.writeStream.format("parquet").option("path", "/anotherPath")

    • Alterações no tópico de saída são permitidas: sdf.writeStream.format("kafka").option("topic", "topic1") para sdf.writeStream.format("kafka").option("topic", "topic2")

    • Alterações no coletor foreach definido pelo usuário (ou seja, o código ForeachWriter ) são permitidas, mas a semântica da alteração depende do código.

  • Mudanças nas operações de projeção/filtro/mapa: Alguns casos são permitidos. Por exemplo:

    • A adição/exclusão de filtros é permitida: sdf.selectExpr("a") a sdf.where(...).selectExpr("a").filter(...).

    • Alterações nas projeções com o mesmo esquema de saída são permitidas: sdf.selectExpr("stringColumn AS json").writeStream a sdf.select(to_json(...).as("json")).writeStream.

    • Mudanças em projeções com diferentes esquemas de saída são permitidas condicionalmente: sdf.selectExpr("a").writeStream para sdf.selectExpr("b").writeStream é permitido somente se o coletor de saída permitir que o esquema mude de "a" para "b".

  • Mudanças em operações de estado: Algumas operações na query transmitida precisam manter dados de estado para atualizar continuamente o resultado. a transmissão estruturada verifica automaticamente os dados de estado para armazenamento tolerante a falhas (por exemplo, DBFS, AWS S3, armazenamento Azure Blob ) e os restaura após a reinicialização. No entanto, isso pressupõe que o esquema dos dados de estado permaneça o mesmo nas reinicializações. Isso significa que quaisquer alterações (ou seja, adições, exclusões ou modificações de esquema) nas operações de estado de uma query de transmissão não são permitidas entre as reinicializações. Aqui está a lista de operações com estado cujo esquema não deve ser alterado entre as reinicializações para garantir a recuperação do estado:

    • agregação de transmissão: Por exemplo, sdf.groupBy("a").agg(...). Qualquer alteração no número ou tipo de key de agrupamento ou agregados não é permitida.

    • desduplicação transmitida: Por exemplo, sdf.dropDuplicates("a"). Qualquer alteração no número ou tipo de key de agrupamento ou agregados não é permitida.

    • transmissão-transmissão join: Por exemplo, sdf1.join(sdf2, ...) (ou seja, ambas as entradas são geradas com sparkSession.readStream). Não são permitidas alterações no esquema ou nas colunas de junção por equivalência. Mudanças no tipo join (externa ou interna) não são permitidas. Outras alterações na condição join são mal definidas.

    • Operações arbitrárias com estado: Por exemplo, sdf.groupByKey(...).mapGroupsWithState(...) ou sdf.groupByKey(...).flatMapGroupsWithState(...). Qualquer alteração no esquema do estado definido pelo usuário e no tipo de tempo limite não é permitida. Qualquer alteração dentro da função de mapeamento de estado definida pelo usuário é permitida, mas o efeito semântico da alteração depende da lógica definida pelo usuário. Se você realmente deseja oferecer suporte a alterações de esquema de estado, pode codificar/decodificar explicitamente suas estruturas de dados de estado complexas em bytes usando um esquema de codificação/decodificação que oferece suporte à migração de esquema. Por exemplo, se você salvar seu estado como bytes codificados em Avro, poderá alterar o esquema de estado Avro entre as reinicializações query , pois isso restaura o estado binário.