Pular para o conteúdo principal

Acionar o Job quando novos arquivos chegarem

O senhor pode usar acionadores de chegada de arquivos para acionar a execução do seu trabalho quando novos arquivos chegarem a um local externo, como Amazon S3, Azure storage ou Google Cloud Storage. Esse recurso é útil quando a eficiência de um trabalho programado é comprometida pela chegada irregular de novos dados.

Como os acionadores de chegada de arquivos funcionam

Os acionadores de chegada de arquivos se esforçam ao máximo para verificar se há novos arquivos a cada minuto, embora isso possa ser afetado pelo desempenho do armazenamento em nuvem subjacente. Os gatilhos de chegada de arquivos não geram custos adicionais além dos custos do provedor de nuvem associados à listagem de arquivos no local de armazenamento.

Um acionador de chegada de arquivo pode ser configurado para monitorar a raiz de um local ou volume externo do Unity Catalog ou um subcaminho de um local ou volume externo. Por exemplo, para o volume do Unity Catalog /Volumes/mycatalog/myschema/myvolume/, os caminhos a seguir são válidos para um acionador de chegada de arquivo:

/Volumes/mycatalog/myschema/myvolume/
/Volumes/mycatalog/myschema/myvolume/mydirectory/

Um gatilho de chegada de arquivo verifica recursivamente novos arquivos em todos os subdiretórios do local configurado. Por exemplo, você cria um gatilho de chegada de arquivo para o local /Volumes/mycatalog/myschema/myvolume/mydirectory/ e esse local tem os seguintes subdiretórios:

/Volumes/mycatalog/myschema/myvolume/mydirectory/subdirA
/Volumes/mycatalog/myschema/myvolume/mydirectory/subdirB
/Volumes/mycatalog/myschema/myvolume/mydirectory/subdirC/subdirD

O acionador verifica se há novos arquivos em mydirectory, subdirA, subdirB, subdirC e subdirC/subdirD.

A chegada de arquivos é acionada com eventos de arquivo

Para obter o melhor desempenho, o local externo deve ser ativado para eventos de arquivo. Quando os eventos de arquivo são ativados para um local externo, a Databricks usa um serviço interno para rastrear metadados de ingestão processando notificações de alteração de provedores de nuvem. Esse serviço retém os metadados dos arquivos mais recentes criados ou atualizados durante um período de retenção contínuo determinado pelo serviço, aumentando a eficiência do processamento de arquivos.

Poucos minutos após a ativação de eventos de arquivo em um local externo, os acionadores de chegada de arquivo existentes que monitoram os caminhos cobertos por esse local externo começam a se beneficiar da ativação de eventos de arquivo, e os novos acionadores se beneficiam em segundos.

Para obter mais informações sobre o desempenho e as vantagens de capacidade de eventos de arquivo em locais externos, consulte Limitações. Para perguntas frequentes sobre eventos de arquivo, consulte as Perguntas frequentes sobre eventos de arquivo.

Antes de começar

O seguinte é necessário para usar os gatilhos de chegada de arquivos:

Adicionar um gatilho de chegada de arquivo

Para adicionar um acionador de chegada de arquivo a um trabalho:

  1. Na barra lateral do site Databricks workspace, clique em Jobs & pipeline .
  2. Opcionalmente, selecione os filtros Empregos e de minha propriedade .
  3. Clique no link Nome do seu trabalho.
  4. No painel Detalhes do Job à direita, clique em Adicionar acionador .
  5. Em Tipo de acionador , selecione Chegada do arquivo .
  6. Em Storage location (Local de armazenamento) , digite a URL da raiz ou um subcaminho de um local externo do Unity Catalog ou a raiz ou um subcaminho de um volume do Unity Catalog a ser monitorado.
  7. (Opcional) Configure opções avançadas ( Tempo mínimo entre acionadores em segundos e Esperar após a última alteração em segundos ) para controlar a frequência com que as execuções são acionadas. Para exemplos de configuração, consulte Controlar a frequência com que as execuções são acionadas.
  8. Para validar a configuração, clique em Testar conexão .
  9. Clique em Salvar .

Para editar, pausar ou remover este acionador posteriormente, use a seção Agendamentos e Acionadores do painel Detalhes do Job . Consulte Gerenciar um acionador existente.

Controlar a frequência com que as execuções são acionadas

Duas opções avançadas em um gatilho de chegada de arquivo controlam como as chegadas de arquivo se traduzem em execuções de Job. Essas opções aplicam dois padrões comuns de controle de taxa: **cooldown** e **debouncing**:

  • **Tempo mínimo entre gatilhos em segundos**: Limita o Job a, no máximo, uma execução por este intervalo (um período de resfriamento entre execuções). Após a conclusão de uma execução, os arquivos que chegam durante o período de resfriamento não começam uma nova execução até que o intervalo termine. Use esta opção para limitar a frequência com que as execuções são criadas, para que chegadas frequentes não criem execuções consecutivas.
  • Tempo de espera após a última alteração em segundos : Espera este período após a chegada do arquivo mais recente antes de iniciar uma execução, e cada nova chegada aciona um Reset no temporizador (debouncing). Use esta opção quando os arquivos chegarem em lotes e você quiser processar o lote inteiro em uma única execução depois que todos os arquivos tiverem chegado.

É possível definir cada opção individualmente ou definir ambas em conjunto. Veja os exemplos a seguir.

Executar no máximo a cada 15 minutos

Para criar execuções à medida que os arquivos chegam, mas não com mais frequência do que a cada 15 minutos, defina a seguinte opção avançada:

  • Tempo mínimo entre triggers em segundos : 900

Após cada execução ser concluída, o acionador aguarda 900 segundos (15 minutos) antes de começar outra execução, mesmo que os arquivos continuem chegando. Isso limita a criação de execuções a no máximo uma execução a cada 15 minutos.

Aguardar a chegada de um lote completo

Quando os arquivos chegam em lotes e você deseja processar cada lote em uma única execução, defina Tempo de espera após a última alteração em segundos para um valor menor que o intervalo entre os lotes, mas maior que o intervalo entre os arquivos dentro de um lote. Por exemplo, se um novo lote começa a cada 5 minutos, defina a seguinte opção avançada:

  • Esperar após a última alteração em segundos : 60

Cada novo arquivo Reset o temporizador, então o gatilho começa uma execução somente depois que 60 segundos se passam sem novas chegadas. Esta configuração assume que os arquivos dentro de um lote chegam com 60 segundos de diferença entre si, para que o temporizador não expire no meio de um lote, e que os lotes estão separados por mais de 60 segundos, para que lotes consecutivos não façam merge em uma única execução.

Limite a frequência e aguarde por lotes completos

Você pode combinar as duas opções quando quiser limitar a frequência com que as execuções são criadas e também evitar iniciar uma execução no meio de um lote. Por exemplo:

  • Tempo mínimo entre triggers em segundos : 900
  • Esperar após a última alteração em segundos : 60

Com esta configuração, o gatilho aguarda até que lotes concluam o envio (60 segundos sem novos arquivos) antes de começar uma execução, e não começa mais de uma execução a cada 15 minutos.

Descubra e processe os arquivos assim que chegarem.

Para processar arquivos que acionaram gatilhos de chegada de arquivos, você pode usar o Auto Loader. O Auto Loader processa novos arquivos de forma incremental e eficiente, com garantia de processamento exatamente uma vez. Por exemplo, use o trecho de código abaixo para carregar arquivos em uma tabela Delta.

Para usar esta solução, crie um Job com um gatilho de chegada de arquivo e adicione um Notebook contendo o código abaixo. Substitua cada marcador [REPLACE] pelo valor apropriado.

Python
# Configuration
file_location = "[REPLACE]" # The same URL configured for the file arrival trigger.
checkpoint_location = "[REPLACE]" # a separate URL (outside `file_location`) used to store the Auto Loader checkpoint, which enables exactly-once processing.
sink_table = "[REPLACE]" # Delta table to write to

# Use Auto Loader to discover new files.
# Do not modify code below this line
streamingQuery = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.schemaLocation", checkpoint_location) \
.option("cloudFiles.useManagedFileEvents","true") \
.load(file_location) \
.writeStream \
.option("checkpointLocation", checkpoint_location) \
.trigger(availableNow = True) \
.toTable(sink_table)

Se você precisar processar novos arquivos com lógica personalizada e quiser apenas descobrir a URL dos novos arquivos, poderá usar foreachBatch em vez disso, conforme mostrado no trecho de código abaixo. Note que foreachBatch fornece apenas garantias de processamento pelo menos uma vez. Para obter mais informações sobre como usar foreachBatch, consulte Usar foreachBatch para gravar em destinos de dados arbitrários.

Python
# Configuration
file_location = "[REPLACE]" # The same URL configured for the file arrival trigger.
checkpoint_location = "[REPLACE]" # a separate URL (outside `file_location`) used to store the Auto Loader checkpoint, which enables exactly-once processing.

def process_batch(batch_df, batch_id):
file_url = batch_df.select("path").collect()[0].path
# [REPLACE] Your custom function for processing newly arrived files


# Use Auto Loader to discover new files.
# Do not modify code below this line
streamingQuery = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.option("cloudFiles.useManagedFileEvents","true") \
.load(file_location) \
.drop("content") \
.writeStream \
.foreachBatch(process_batch) \
.option("checkpointLocation", checkpoint_location) \
.trigger(availableNow = True) \
.start()

Receba notificações de gatilhos de falha na chegada de arquivos

Para ser notificado se um acionador de chegada de arquivo falhar na avaliação, configure o site email ou as notificações de destino do sistema sobre falha de trabalho. Consulte Adicionar notificações em um trabalho.

Limitações

  • Somente novos arquivos acionam a execução. A substituição de um arquivo existente por um arquivo com o mesmo nome não aciona uma execução.

  • O caminho usado para um acionador de chegada de arquivo não deve conter tabelas externas ou locais gerenciais de catálogos e esquemas.

  • O caminho usado para um acionador de chegada de arquivo não pode conter curingas, por exemplo, * ou ?.

  • Se o local de armazenamento estiver configurado como um local externo no Unity Catalog e esse local externo estiver ativado para eventos de arquivo:

    • Não há limites para o número de arquivos no local de armazenamento.

    • Os gatilhos podem apresentar erros de tempo limite quando há muitas atualizações de arquivos desnecessárias.

      Quando um gatilho de chegada de arquivo é definido em um subcaminho de um local externo ou volume Unity Catalog , alterações fora desse subcaminho, como na raiz do local externo, podem aumentar os metadados que o gatilho precisa processar. Em ambientes com alta volatilidade, isso pode fazer com que o gatilho exceda seu limite de tempo de processamento e entre em estado de erro.

      Para evitar isso, crie um volume Unity Catalog que mapeie especificamente o subdiretório que você deseja monitorar e defina o gatilho de chegada de arquivos na raiz desse volume. Essa abordagem isola o caminho de destino como a raiz efetiva do gatilho, reduzindo alterações não relacionadas no nível raiz e impedindo que o gatilho entre em estado de erro.

    • Se um arquivo existente for modificado e seus metadados estiverem fora do período de retenção contínua, essa modificação é tratada como uma nova chegada de arquivo, acionando uma execução de Job. Você pode evitar isso ingerindo apenas arquivos imutáveis, ou você pode usar gatilhos de chegada de arquivo com o Auto Loader para rastrear o progresso da ingestão.

  • Se o local de armazenamento não estiver habilitado para eventos de arquivo:

    • Um máximo de 50 trabalhos podem ser configurados com um acionador de chegada de arquivos nesses locais em um Databricks workspace.
    • O local de armazenamento pode conter até 10.000 arquivos. Se o local de armazenamento configurado for um subcaminho de um local ou volume externo do Unity Catalog, o limite de 10.000 arquivos se aplicará ao subcaminho e não à raiz do local de armazenamento. Por exemplo, a raiz do local de armazenamento pode conter mais de 10.000 arquivos em seus subdiretórios, mas o subdiretório configurado não deve exceder o limite de 10.000 arquivos.

Consulte também Limitações de eventos de arquivo.

A chegada de arquivos é acionada por caminhos inexistentes em locais externos do S3 e do GCS.

Quando o diretório configurado não existe ou é excluído do Amazon S3 ou do Google Cloud Storage, os gatilhos de chegada de arquivos continuam sendo avaliados sem erros. Esse comportamento ocorre porque tanto o S3 quanto o GCS não diferenciam entre diretórios inexistentes, excluídos e vazios.

Consequentemente, um gatilho de chegada de arquivo que monitora um caminho de diretório inexistente ou excluído não falha nem gera uma notificação de erro. O gatilho continua avaliando, não encontra arquivos e não aciona a execução de nenhuma tarefa até que arquivos sejam adicionados novamente a esse caminho. Este é o comportamento esperado e não uma condição de erro.