Execução de consulta adaptável
A execução query adaptável (AQE) é a reotimização query que ocorre durante a execução query .
A motivação para a reotimização Runtime é que o Databricks tem as estatísticas precisas mais atualizadas no final de uma troca aleatória e de transmissão (referida como um estágio query no AQE). Como resultado, o Databricks pode optar por uma estratégia física melhor, escolher um tamanho e número de partição pós-embaralhamento ideal ou fazer otimizações que costumavam exigir dicas, por exemplo, manipulação join inclinação.
Isso pode ser muito útil quando a coleta de estatísticas não está ativada ou quando as estatísticas estão desatualizadas. Também é útil em locais onde as estatísticas derivadas estaticamente são imprecisas, como no meio de uma query complicada ou após a ocorrência de distorção de dados.
Capacidades
AQE está habilitado por default. Possui 4 recursos principais:
Altera dinamicamente a join merge de classificação para join de hash de transmissão.
Aglutina dinamicamente as partições (combina pequenas partições em partições de tamanho razoável) após a troca aleatória. Tarefas muito pequenas têm uma taxa de transferência de E/S pior e tendem a sofrer mais com a sobrecarga do programa e com a configuração da tarefa. A combinação de pequenas tarefas economiza recursos e melhora clusters Taxa de transferência.
Manipula dinamicamente a join de merge de classificação e join aleatória, dividindo (e replicando, se necessário) tarefas distorcidas em tarefas de tamanho aproximadamente igual.
Detecta e propaga dinamicamente relações vazias.
Aplicativo
AQE se aplica a todas query que são:
Não transmissão
Conter pelo menos uma troca (geralmente quando há uma join, agregação ou janela), umaquery ou ambas.
Nem todas query aplicadas por AQE são necessariamente otimizadas novamente. A reotimização pode ou não apresentar um plano query diferente daquele compilado estaticamente. Para determinar se o plano de uma consulta foi alterado pelo AQE, consulte a seção a seguir, Planos de consulta.
Planos de consulta
Esta seção discute como você pode examinar os planos query de diferentes maneiras.
Nesta secção:
Spark UI
AdaptiveSparkPlan
nó
query aplicada por AQE contém um ou mais nós AdaptiveSparkPlan
, geralmente como o nó raiz de cada query principal ouquery. Antes da execução query ou durante a execução, o sinalizador isFinalPlan
do nó AdaptiveSparkPlan
correspondente é exibido como false
; após a conclusão da execução query , o sinalizador isFinalPlan
muda para true.
Plano em evolução
O diagrama do plano query evolui à medida que a execução progride e reflete o plano mais atual que está sendo executado. Os nós que já foram executados (nos quais as métricas estão disponíveis) não serão alterados, mas aqueles que não foram podem mudar ao longo do tempo como resultado de reotimizações.
Veja a seguir um exemplo de diagrama de plano query :
DataFrame.explain()
AdaptiveSparkPlan
nó
query aplicada por AQE contém um ou mais nós AdaptiveSparkPlan
, geralmente como o nó raiz de cada query principal ouquery. Antes da execução query ou durante a execução, o sinalizador isFinalPlan
do nó AdaptiveSparkPlan
correspondente é exibido como false
; após a conclusão da execução query , o sinalizador isFinalPlan
muda para true
.
Plano atual e inicial
Em cada nó AdaptiveSparkPlan
haverá o plano inicial (o plano antes de aplicar qualquer otimização AQE) e o plano atual ou final, dependendo se a execução foi concluída. O plano atual evoluirá à medida que a execução avança.
Estatísticas Runtime
Cada estágio de shuffle e broadcast contém estatísticas de dados.
Antes da execução do estágio ou quando o estágio está em execução, as estatísticas são estimativas de tempo de compilação e o sinalizador isRuntime
é false
, por exemplo: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);
Após a conclusão da execução do estágio, as estatísticas são aquelas coletadas em Runtime e o sinalizador isRuntime
se tornará true
, por exemplo: Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)
A seguir está um exemplo de DataFrame.explain
:
Antes da execução
Durante a execução
Após a execução
SQL EXPLAIN
Eficácia
O plano query mudará se uma ou mais otimizações AQE entrarem em vigor. O efeito dessas otimizações AQE é demonstrado pela diferença entre os planos atual e final e o plano inicial e os nós do plano específico nos planos atual e final.
Altere dinamicamente join merge de classificação para join de hash de transmissão: diferentes nós join física entre o plano atual/final e o plano inicial
Unir partições dinamicamente: nó
CustomShuffleReader
com propriedadeCoalesced
Trate dinamicamente join de inclinação: nó
SortMergeJoin
com campoisSkew
como verdadeiro.Detectar e propagar relações vazias dinamicamente: parte (ou todo) do plano é substituído pelo nó LocalTableScan com o campo de relação como vazio.
Configuração
Ativar e desativar a execução de consulta adaptável
Propriedade |
---|
spark.databricks.optimizer.adaptive.enabled Tipo: Se deve habilitar ou desabilitar a execução query adaptável. valor default : |
Ativar reprodução aleatória otimizada automaticamente
Propriedade |
---|
spark.sql.shuffle.partitions Tipo: O número default de partições a serem usadas ao embaralhar dados para join ou agregações. A configuração do valor Nota: Para a transmissão estruturada, esta configuração não pode ser alterada entre as reinicializações query a partir do mesmo ponto de verificação. valor default : 200 |
Altere dinamicamente a junção de mesclagem de classificação para junção de hash de transmissão
Propriedade |
---|
spark.databricks.adaptive.autoBroadcastJoinThreshold Tipo: O limite para acionar a alternância para join de transmissão em Runtime. valor default : |
Unir partições dinamicamente
Propriedade |
---|
spark.sql.adaptive.coalescePartitions.enabled Tipo: Se deve ativar ou desativar a união de partições. valor default : |
spark.sql.adaptive.advisoryPartitionSizeInBytes Tipo: O tamanho de destino após a coalescência. Os tamanhos de partição aglutinados serão próximos, mas não maiores que esse tamanho de destino. valor default : |
spark.sql.adaptive.coalescePartitions.minPartitionSize Tipo: O tamanho mínimo das partições após a união. Os tamanhos das partições combinadas não serão menores que esse tamanho. valor default : |
spark.sql.adaptive.coalescePartitions.minPartitionNum Tipo: O número mínimo de partições após a união. Não recomendado, porque a configuração substitui explicitamente valor default : 2x não. de núcleos clusters |
Lidar dinamicamente com junção de inclinação
Propriedade |
---|
spark.sql.adaptive.skewJoin.enabled Tipo: Se deve habilitar ou desabilitar o tratamento join de inclinação. valor default : |
spark.sql.adaptive.skewJoin.skewedPartitionFactor Tipo: Um fator que, quando multiplicado pelo tamanho médio da partição, contribui para determinar se uma partição está distorcida. valor default : |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes Tipo: Um limite que contribui para determinar se uma partição está distorcida. valor default : |
Uma partição é considerada distorcida quando (partition size > skewedPartitionFactor * median partition size)
e (partition size > skewedPartitionThresholdInBytes)
são true
.
Detecte e propague dinamicamente relações vazias
Propriedade |
---|
spark.databricks.adaptive.emptyRelationPropagation.enabled Tipo: Se deve habilitar ou desabilitar a propagação dinâmica de relação vazia. valor default : |
Perguntas frequentes (FAQ)
Nesta secção:
Por que o AQE não transmitiu uma pequena tabela de junção?
Se o tamanho da relação que se espera ser transmitido cair abaixo desse limite, mas ainda não for transmitido:
Verifique o tipo join . A transmissão não é compatível com determinados tipos join , por exemplo, a relação à esquerda de um
LEFT OUTER JOIN
não pode ser transmitida.Também pode ser que a relação contenha muitas partições vazias, caso em que a maioria das tarefas pode ser concluída rapidamente com join merge de classificação ou pode ser potencialmente otimizada com manipulação join de inclinação. O AQE evita alterar essa join merge de classificação para join hash de transmissão se a porcentagem de partições não vazias for menor que
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin
.
Ainda devo usar uma dica de estratégia de junção de transmissão com AQE ativado?
Sim. Uma join de transmissão planejada estaticamente geralmente tem mais desempenho do que uma planejada dinamicamente por AQE, pois AQE pode não alternar para join de transmissão até depois de realizar o shuffle para ambos os lados da join (quando os tamanhos reais da relação são obtidos). Portanto, usar uma dica de transmissão ainda pode ser uma boa escolha se você conhecer bem sua query . O AQE respeitará as dicas de query da mesma forma que a otimização estática, mas ainda poderá aplicar otimizações dinâmicas que não são afetadas pelas dicas.
Qual é a diferença entre dica de junção de inclinação e otimização de junção de inclinação AQE? Qual devo usar?
Recomenda-se confiar na manipulação join de inclinação AQE em vez de usar a dica join inclinação, porque join de inclinação AQE é totalmente automática e, em geral, tem um desempenho melhor do que a contraparte da dica.
Por que o AQE não ajustou minha ordem de junção automaticamente?
A reordenação join dinâmica não faz parte do AQE.
Por que o AQE não detectou minha distorção de dados?
Há duas condições de tamanho que devem ser satisfeitas para que o AQE detecte uma partição como uma partição distorcida:
O tamanho da partição é maior que
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
(default 256 MB)O tamanho da partição é maior que o tamanho médio de todas as partições vezes o fator de partição distorcida
spark.sql.adaptive.skewJoin.skewedPartitionFactor
(default 5)
Além disso, o suporte à manipulação de distorção é limitado para determinados tipos join , por exemplo, em LEFT OUTER JOIN
, somente a distorção no lado esquerdo pode ser otimizada.
Legado
O termo “Adaptive Execution” existe desde o Spark 1.6, mas o novo AQE no Spark 3.0 é fundamentalmente diferente. Em termos de funcionalidade, o Spark 1.6 faz apenas a parte “unir partições dinamicamente”. Em termos de arquitetura técnica, o novo AQE é uma estrutura de planejamento dinâmico e replanejamento de query com base em estatísticas Runtime , que suporta uma variedade de otimizações como as que descrevemos neste artigo e pode ser estendida para permitir otimizações mais potenciais.