Azure Blob fonte de arquivo de armazenamento com Queue Storage (legado) Azure
Essa documentação foi descontinuada e pode não estar atualizada. O produto, serviço ou tecnologia mencionados neste conteúdo não são mais suportados. Consulte O que é o Auto Loader?
O conector ABS-AQS fornece uma fonte de arquivos otimizada que usa o Azure Queue Storage (AQS) para encontrar novos arquivos gravados em um contêiner do Azure Blob Storage (ABS) sem listar repetidamente todos os arquivos. Isso oferece duas vantagens:
- Menor latência: não há necessidade de listar estruturas de diretório aninhadas no ABS, o que é lento e consome muitos recursos.
- Custos mais baixos: não há mais solicitações caras de LIST API feitas ao ABS.
A fonte ABS-AQS exclui mensagens da fila do AQS à medida que consome eventos. Se o senhor quiser que outros pipelines consumam mensagens dessa fila, configure uma fila AQS separada para o leitor otimizado. O senhor pode configurar várias inscrições na Event Grid para publicar em filas diferentes.
Use a fonte de arquivo ABS-AQS
Para usar a fonte de arquivo ABS-AQS, você deve:
-
Configure as notificações de eventos do ABS aproveitando a inscrição no Azure Event Grid e encaminhe-as para o AQS. Consulte Reagir a eventos de armazenamento de blobs.
-
Especifique as opções
fileFormat
equeueUrl
e um esquema. Por exemplo:Pythonspark.readStream \
.format("abs-aqs") \
.option("fileFormat", "json") \
.option("queueName", ...) \
.option("connectionString", ...) \
.schema(...) \
.load()
Autenticar com o Armazenamento de Filas e Blob do Azure
Para autenticar com o Azure Queue Storage e o armazenamento Blob, use a chave tokens ou account do Shared Access Signature (SAS). O senhor deve fornecer uma string de conexão para o storage account onde a fila está implantada, que contém os tokens SAS ou a chave de acesso ao storage account. Para obter mais informações, consulte Configurar a conexão do Azure Storage strings.
O senhor também precisará fornecer acesso aos seus contêineres de armazenamento do Azure Blob. Consulte Connect to Azure data lake Storage and Blob Storage para obter informações sobre como configurar o acesso ao seu contêiner de armazenamento Azure Blob.
Databricks É altamente recomendável que o senhor use o gerenciar secrets para fornecer sua conexão strings.
Configuração
Opção | Tipo | Padrão | Descrição |
---|---|---|---|
Permitir substituições | Booleana |
| Se um blob que é sobrescrito deve ser reprocessado. |
Cadeia de conexão | String | Nenhum (parâmetro obrigatório) | As cadeias de conexão para acessar sua fila. |
Buscar paralelismo | Integer | 1 | Número de segmentos a serem usados ao buscar mensagens do serviço de enfileiramento. |
Formato de arquivo | String | Nenhum (parâmetro obrigatório) | O formato dos arquivos, como |
Ignorar exclusão de arquivo | Booleana |
| Se você tiver configurações de ciclo de vida ou excluir os arquivos de origem manualmente, deverá definir essa opção como |
Idade máxima do arquivo | Integer | 604800 | Determina por quanto tempo (em segundos) as notificações de arquivos são armazenadas como estado para evitar o processamento duplicado. |
O caminho reescreve | A JSON strings. |
| Se você usar pontos de montagem, poderá reescrever o prefixo do caminho |
Intervalo QueueFetch | Uma string de duração, por exemplo, |
| Quanto tempo esperar entre as buscas se a fila estiver vazia. O Azure cobra por solicitação de API ao AQS. Portanto, se os dados não estiverem chegando com frequência, esse valor pode ser definido como uma duração longa. Desde que a fila não esteja vazia, buscaremos continuamente. Se novos arquivos forem criados a cada 5 minutos, talvez você queira definir um |
Nome da fila | String | Nenhum (parâmetro obrigatório) | O nome da fila do AQS. |
Se o senhor observar muitas mensagens no driver logs que se parecem com Fetched 0 new events and 3 old events.
, onde tende a observar muito mais eventos antigos do que novos, deve reduzir o intervalo de acionamento da transmissão.
Se você estiver consumindo arquivos de um local no armazenamento Blob em que espera que alguns arquivos sejam excluídos antes de serem processados, defina a seguinte configuração para ignorar o erro e continuar o processamento:
spark.sql("SET spark.sql.files.ignoreMissingFiles=true")
Perguntas frequentes (FAQ)
Se ignoreFileDeletion
for False (default) e o objeto tiver sido excluído, haverá falha em todo o pipeline?
Sim, se recebermos um evento informando que o arquivo foi excluído, haverá falha em todo o pipeline.
Como devo definir maxFileAge
?
O Azure Queue Storage fornece a semântica de entrega de mensagens ao menos uma vez, portanto, precisamos manter o estado para a deduplicação. A configuração default para maxFileAge
é 7 dias, que é igual ao TTL máximo de uma mensagem na fila.