Pular para o conteúdo principal

Fonte Amazon S3 com Amazon SQS (legado)

important

Essa documentação foi descontinuada e pode não estar atualizada. O produto, serviço ou tecnologia mencionados neste conteúdo não são mais suportados. Consulte O que é o Auto Loader?

O conector S3-SQS usa o Amazon Simple Queue Service (SQS) para fornecer uma fonte Amazon S3 otimizada que permite localizar novos arquivos gravados em um bucket S3 sem listar repetidamente todos os arquivos. Isso oferece duas vantagens:

  • Menor latência: não há necessidade de listar grandes buckets em S3, o que é lento e exige muitos recursos.
  • Custos mais baixos: não há mais solicitações caras de LIST API feitas ao S3.
nota

A origem do S3-SQS exclui mensagens da fila SQS à medida que consome eventos. Se o senhor quiser que outros pipelines consumam da mesma fila, configure uma fila SQS separada para o leitor otimizado. Você pode usar o SNS para publicar mensagens em várias filas SQS.

Use a fonte de arquivo S3-SQS

Para usar a fonte de arquivo S3-SQS, você deve:

  • Configure notificações de eventos e encaminhe-as para o SQS. Consulte Configuração das notificações de eventos do Amazon S3.

  • Especifique as opções fileFormat e queueUrl e um esquema. Por exemplo:

    Python
    spark.readStream \
    .format("s3-sqs") \
    .option("fileFormat", "json") \
    .option("queueUrl", ...) \
    .schema(...) \
    .load()

Autenticação com o Amazon SQS e o S3

Databricks usa a cadeia de provedores de credenciaisAmazon's default para autenticação no SQS. Recomendamos que o senhor inicie o clustering Databricks com um instance profile que possa acessar o SQS e o bucket S3.

Essa fonte requer as permissões sqs:ReceiveMessage, sqs:DeleteMessage e s3:GetObject. Se você tiver exceções Amazon: Access Denied, verifique se seu usuário ou perfil tem essas permissões. Consulte Usando políticas baseadas em identidade (IAM) para o Amazon SQS e Exemplos de políticas de bucket para obter detalhes.

Configuração

Opção

Tipo

Padrão

Descrição

Permitir substituições

Booleana

true

Se um blob que é sobrescrito deve ser reprocessado.

Excluir Regex

String

Nenhuma

Exclua os arquivos com base no caminho.

Buscar paralelismo

Integer

1

Número de segmentos a serem usados ao buscar mensagens do serviço de enfileiramento.

Formato de arquivo

String

Nenhum (parâmetro obrigatório)

O formato dos arquivos, como parquet, json, csv, text e assim por diante.

Idade máxima do arquivo

Integer

604800

Determina por quanto tempo (em segundos) as notificações de arquivos são armazenadas como estado para evitar o processamento duplicado.

Máximo de arquivos por gatilho

Integer

1000

Número máximo de novos arquivos a serem considerados em cada acionador.

O caminho reescreve

A JSON strings.

"{}"

Se você usar pontos de montagem, poderá reescrever o prefixo do caminho bucket/key com o ponto de montagem. Somente prefixos podem ser reescritos. Por exemplo, para a configuração {"<databricks-mounted-bucket>/path": "/mnt/data-warehouse"}, o caminho <databricks-mounted-bucket>/path/2017/08/fileA.json é reescrito para /mnt/data-warehouse/2017/08/fileA.json.

URL da fila

String

Nenhum (parâmetro obrigatório)

O URL da fila SQS.

região

String

Região resolvida localmente

A região em que a fila está definida.

Intervalo de busca do SQS

A duração das cordas, por exemplo, 2m por 2 minutos.

"5s"

Quanto tempo esperar entre as buscas se a fila estiver vazia. O AWS cobra por solicitação de API ao SQS. Portanto, se os dados não estiverem chegando com frequência, esse valor pode ser definido como uma duração longa. O SQS suporta polling longo com duração máxima de 20 segundos. Se esse valor for definido para mais de 20 segundos, dormiremos pela duração restante. Desde que a fila não esteja vazia, buscamos continuamente. Se novos arquivos forem criados a cada 5 minutos, como no caso do Kinesis Firehose ou do CloudTrail logs, para reduzir os custos do SQS, talvez o senhor queira definir um valor alto sqsFetchInterval.

Se o senhor observar muitas mensagens no driver logs que se parecem com Fetched 0 new events and 3 old events., onde tende a observar muito mais eventos antigos do que novos eventos, poderá reduzir o intervalo de acionamento da transmissão ou aumentar o tempo limite de visibilidade na fila do SQS.

Se estiver consumindo arquivos de um local no S3 onde espera que alguns arquivos sejam excluídos antes de serem processados, o senhor pode definir a seguinte configuração para ignorar o erro e continuar o processamento:

Python
spark.sql("SET spark.sql.files.ignoreMissingFiles=true")

Perguntas frequentes (FAQ)

Uma URL de fila SQS já tem um endpoint de região, portanto, o campo region não precisa ser definido, correto?

O senhor deve definir explicitamente a região se a fila do SQS não estiver na mesma região do clustering Spark.

sqsFetchInterval

  • Se o valor for menor que 20 segundos, definimos o tempo limite de sondagem longa do SQS como esse valor específico? Sim
  • Se o valor for maior que 20 segundos e houver dados na fila, continuaremos criando solicitações de sondagem longas do SQS com tempo limite de 20 segundos? Faremos solicitações com uma sondagem longa definida para 20 segundos, mas o SQS retornará imediatamente. Você não vai esperar 20 segundos.
  • Se o valor for maior que 20 segundos e a fila estiver vazia, criaremos solicitações de pesquisa longas com um tempo limite de 20 segundos após o intervalo especificado? Faremos solicitações com uma longa votação definida para 20 segundos. Se o SQS não retornar nada, dormiremos o resto do intervalo. Não faremos mais solicitações ao SQS para a duração do intervalo, pois o SQS cobra por chamada REST API .

Se ignoreFileDeletion for False (default) e o objeto tiver sido excluído, haverá falha em todo o pipeline?

Sim, se recebermos um evento informando que o arquivo foi excluído, haverá falha em todo o pipeline.

Como devo definir maxFileAge?

O SQS fornece semântica de entrega de mensagens pelo menos uma vez, portanto, precisamos manter o estado para desduplicação. A configuração default para maxFileAge é de 7 dias, o que é maior do que o default TTL de uma mensagem no SQS, que é de 4 dias. Se o senhor definir que a duração da retenção de uma mensagem na fila é maior, defina essa configuração de acordo.