Melhores práticas do Auto Loader
Esta página descreve as práticas recomendadas que você pode aplicar para configurar o Auto Loader para execução de forma confiável, econômica e em escala para seu caso de uso.
Essas melhores práticas reduzem a sobrecarga operacional e evitam problemas comuns que são difíceis de diagnosticar em produção, tais como: custos desnecessários da API LIST devido a varreduras completas de diretório, perda de dados silenciosa devido ao desvio de esquema e reinícios de pipeline causados por configuração incorreta de checkpoint.
Para obter detalhes de configuração de produção, consulte Configurar o Auto Loader para cargas de trabalho de produção. Para monitoramento e observabilidade, consulte Monitorar e observar o Auto Loader.
Escolha a estrutura de execução certa
A melhor estrutura de execução para seu caso de uso depende de quanto controle você precisa sobre o pipeline e de quanta sobrecarga operacional você deseja gerenciar. Para a maioria dos usuários e pipelines de produção, o Auto Loader com Lakeflow Spark Declarative Pipelines é uma boa opção. No entanto, se você precisar de controle máximo e personalização, use o Auto Loader com Transmissão Estructurada. Para a configuração mais simples com uma experiência gerenciada, use um Conector LakeFlow gerenciado quando disponível.
O Lakeflow Spark Declarative Pipelines estende a transmissão estructurada com autoscale, verificações de qualidade de dados, tratamento de evolução do esquema e monitoramento através do log de eventos. A Databricks recomenda o Lakeflow Spark Declarative Pipelines para a maioria das cargas de trabalho de ingestão de produção.
Escolha o tipo certo de programar e acionador
A melhor forma de programar e o melhor tipo de acionador para seu caso de uso dependem dos seus requisitos de latência e dos padrões de chegada de arquivo. Para a maioria dos casos de uso, a Databricks recomenda um acionador de chegada de arquivo com eventos de arquivo ativados. Isso permite uma ingestão de baixa latência a baixo custo porque o compute só entra em execução quando novos arquivos chegam. Os três tipos de acionador diferem em quando e com que frequência o pipeline começa:
- **Contínuo**: O pipeline está em execução sem parar. Use somente quando a latência de submilisegundos for um requisito rigoroso, pois o compute contínuo custa mais. Parear com eventos de arquivo.
- **Gatilho de chegada de arquivo**: O pipeline começa quando novos arquivos chegam no local de origem. Melhor para baixa a média latência ou padrões de chegada de arquivo irregulares. Requer que os eventos de arquivo estejam ativados. Consulte Acionar jobs quando chegarem novos arquivos.
- Programado : O pipeline está em execução com base em um programar baseado em tempo (por exemplo, a cada hora). Use quando os requisitos de latência forem flexíveis (minutos a horas). Funciona com listagem de diretório, mas eventos de arquivo reduzem custos mesmo no modo programado, evitando varreduras completas de diretório.
Para obter detalhes sobre o uso de Trigger.AvailableNow para programar em lotes, consulte Usando Trigger.AvailableNow e limitação de taxa.
Escolha o modo certo de descoberta de arquivos
O Auto Loader suporta três modos de descoberta de arquivos com diferentes compensações em termos de complexidade de configuração, escalabilidade e custo.
Mode | Complexidade de configuração | Escalabilidade | Custo | Quando usar |
|---|---|---|---|---|
Eventos de arquivo (recomendado) | Baixo (configuração de permissão única) | Milhões de arquivos por hora | Mínimo | Default para a maioria das cargas de trabalho |
Notificação de arquivo clássica | Alto (21+ opções de configuração de cloud) | Milhões de arquivos por hora | Médio | Quando os eventos de arquivo estiverem indisponíveis |
Listagem de diretório | Nenhuma | Limitado por tamanho do diretório | Maiores (custos da LIST API) | Diretórios pequenos, preenchimentos únicos, ou quando as políticas de segurança impedem eventos de arquivo |
Eventos de arquivo consolidam os recursos de armazenamento em cloud usando uma inscrição e fila por local externo, em vez de uma por transmissão. A diferença de desempenho é significativa em escala: a listagem do diretório deve verificar o diretório de origem inteiro a cada gatilho, portanto, o tempo de ingestão aumenta com o tamanho do diretório. Eventos de arquivo entregam novas notificações de arquivo diretamente, então o tempo de ingestão permanece baixo independentemente de quantos objetos existam no diretório.
Ativar eventos de arquivo
Eventos de arquivo requerem uma concessão única de permissões de cloud e um local externo configurado para usar o serviço de eventos de arquivo gerenciado. Uma vez configurados, todas as transmissões do Auto Loader que leem desse local externo podem usar eventos de arquivo sem configuração adicional.
-
Conceda as permissões de cloud necessárias no lado do provedor de cloud. Os requisitos variam de acordo com o provedor de cloud. Consulte Configure eventos de arquivo para um local externo.
-
Defina
cloudFiles.useManagedFileEventscomotrueem sua consulta do Auto Loader.Pythondf = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.useManagedFileEvents", "true")
.load("/path/to/data/dir"))Para os passos de configuração completos, consulte Migrar para o Auto Loader com eventos de arquivo.
Quando você não pode usar eventos de arquivo
Pode ser que você não consiga usar eventos de arquivo quando:
- O local externo não está configurado com eventos de arquivo.
- As políticas de segurança da organização não permitem a ativação de eventos de arquivo em um local externo compartilhado.
Nestes casos, use o modo de notificação de arquivo clássica ou o modo de listagem de diretório. Para uma comparação completa dos modos de detecção de arquivo, consulte Comparar modos de detecção de arquivos do Auto Loader.
Gerenciar evolução do esquema
O Auto Loader infere o esquema automaticamente, mas como você configura a evolução do esquema afeta a integridade dos dados e a estabilidade do pipeline. Use a seguinte tabela para escolher uma estratégia.
Cenário | Recomendação |
|---|---|
O esquema é conhecido e fixo. | Forneça um esquema explícito com |
Esquema desconhecido, mudanças aditivas esperadas |
|
Esquema desconhecido, mudanças de tipo esperadas |
|
Contrato de esquema estrito exigido. |
|
Esquema arbitrário ou imprevisível | Ingestão como tipo |
Após escolher uma estratégia, aplique as seguintes práticas para ajustar o comportamento da evolução do esquema.
Use dicas de esquema para tipos de campo conhecidos
Use a opção cloudFiles.schemaHints para impor tipos para campos que você conhece antecipadamente, enquanto ainda permite a inferência de esquema para outros campos.
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "id long, amount double")
.load("/path/to/data/dir"))
Use a ampliação de tipo para mudanças de tipo compatíveis
O modo de evolução do esquema addNewColumnsWithTypeWidening amplia automaticamente os tipos compatíveis (por exemplo, int para long) em vez de direcionar dados para a coluna _rescued_data. Isso evita a necessidade de jobs de pós-processamento para lidar com promoções de tipo simples.
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "parquet")
.option("cloudFiles.schemaEvolutionMode", "addNewColumnsWithTypeWidening")
.load("/path/to/data/dir"))
Ingira como tipo Variant para esquemas imprevisíveis
Quando seus dados não estiverem em conformidade com nenhum esquema específico, ou o esquema for alterado continuamente, ingira os dados como um tipo Variant.
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "data")
.load("/path/to/data/dir"))
Variant oferece esquema sob demanda no momento da consulta, mas é menos eficiente do que consultar colunas estruturadas. Para a mecânica completa de inferência e evolução de esquema, consulte Configurar inferência e evolução de esquema no Auto Loader.
Lidar com dados inválidos e qualidade dos dados
As práticas a seguir ajudam você a detectar, capturar e isolar dados incorretos antes que se propaguem para as camadas downstream.
Ativar _rescued_data e _corrupt_record
O Auto Loader fornece duas colunas para capturar dados que não podem ser analisados corretamente.
_rescued_datacaptura campos que não correspondem ao esquema atual. É adicionado automaticamente pelo Auto Loader._corrupt_recordcaptura linhas que não podem ser analisadas de forma alguma. Habilite-o usandocolumnNameOfCorruptRecord:
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "_corrupt_record string")
.option("columnNameOfCorruptRecord", "_corrupt_record")
.load("/path/to/data/dir"))
O Databricks recomenda columnNameOfCorruptRecord em vez de badRecordsPath para evitar possíveis condições de corrida que podem perder registros corrompidos.
Use as expectativas de Lakeflow Spark Declarative Pipelines para monitoramento
Defina as expectativas dos Pipelines Declarativos do Lakeflow Spark para verificar se _rescued_data e _corrupt_record são NULL em condições normais. Valores não nulos sinalizam desvio de esquema ou corrupção de dados.
import dlt
@dlt.table
@dlt.expect("no rescued data", "_rescued_data IS NULL")
@dlt.expect("no corrupt records", "_corrupt_record IS NULL")
def bronze_table():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "_corrupt_record string")
.option("columnNameOfCorruptRecord", "_corrupt_record")
.load("/path/to/data/dir"))
Isolar dados corrompidos
Isolar linhas com dados não analisáveis em um coletor dedicado para investigação. Isso evita que dados corrompidos se propaguem para camadas downstream.
import dlt
@dlt.table
def corrupt_records_sink():
return dlt.read_stream("bronze_table").where("_corrupt_record IS NOT NULL")
@dlt.view
def clean_table():
return dlt.read_stream("bronze_table").where("_corrupt_record IS NULL")
Anotar dados com metadados de arquivo de origem
Inclua a coluna _metadata em suas consultas de ingestão do Auto Loader. No mínimo, capture file_path e file_modification_time. Isso permite rastrear problemas de dados até arquivos de origem específicos e fazer join com cloud_files_state() para o ciclo de vida completo do arquivo.
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/path/to/data/dir")
.select("*", "_metadata.file_path", "_metadata.file_modification_time"))
Para obter detalhes, consulte Coluna de metadados de arquivo.
Otimizar custo e desempenho
As seguintes práticas reduzem os três principais fatores de custo para o Auto Loader: chamadas de API LIST cloud, compute parado e crescimento de armazenamento de longo prazo.
-
Use eventos de arquivo para minimizar os custos da API LIST : Eventos de arquivo fornecem descoberta incremental de arquivos, eliminando a necessidade de listagens completas de diretórios em cada execução. Esta é a otimização de custo mais impactante para o Auto Loader.
-
**Utilize gatilhos de chegada de arquivo para processamento orientado a eventos**: Gatilhos de chegada de arquivo começam seu pipeline apenas quando novos arquivos chegam, assim você não paga por compute parado. Consulte Acionar jobs quando chegarem novos arquivos.
-
Arquivar arquivos processados com cloudFiles.cleanSource : Use
cloudFiles.cleanSourcepara excluir ou mover automaticamente arquivos processados. Isso reduz tanto os custos de armazenamento quanto os custos de listagem de diretório para transmissões de longa duração. Para obter todos os detalhes, consulte Arquivamento de arquivos no diretório de origem para reduzir custos.- Use o modo
deletepara remover arquivos após a ingestão. - Use o modo
movepara arquivar arquivos em um local diferente para compliance ou auditoria.
Pythondf = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.cleanSource", "delete")
.load("/path/to/data/dir")) - Use o modo
Não habilite cloudFiles.cleanSource se várias transmissões do Auto Loader ou outros clientes lerem do mesmo diretório de origem.
- Aproveite as melhorias de desempenho : Atualize para o Databricks Runtime mais recente ou use o compute serverless para se beneficiar das melhorias de desempenho recentes do Auto Loader.
Gerenciamento de pontos de verificação
O ponto de verificação armazena o progresso e o estado do arquivo da transmissão. Configurar incorretamente ou perder o ponto de verificação exige uma reinicialização completa, portanto, trate-o como infraestrutura crítica.
- Nunca aplique políticas de ciclo de vida de objetos de cloud a locais de ponto de verificação. Se os arquivos de ponto de verificação forem excluídos, o estado da transmissão estará corrompido e você deverá reiniciar do zero.
- Use pontos de verificação separados para cada transmissão e diretório de origem.
- Considere
cloudFiles.maxFileAgepara transmissões de longa duração e alto volume para limitar o crescimento do estado. Use uma configuração conservadora (90 dias mínimo recomendado). Definir esse valor de forma muito agressiva pode resultar no reprocessamento de arquivos que o Auto Loader já ingeriu, caso eles estejam fora da janela.
Para obter todos os detalhes, consulte Acompanhamento de eventos de arquivo.
Use volumes para descoberta otimizada de arquivos com eventos de arquivo
Para desempenho aprimorado com eventos de arquivo, crie um volume externo para cada caminho ou subdiretório do qual o Auto Loader carrega. Forneça caminhos de volume (por exemplo, /Volumes/catalog/schema/volume) para o Auto Loader em vez de caminhos da cloud (por exemplo, s3://bucket/path). Isso otimiza a descoberta de arquivos por meio de um padrão otimizado de acesso a dados.
Para mais práticas recomendadas sobre eventos de arquivo, consulte Práticas recomendadas para o Auto Loader com eventos de arquivo.