Executar várias consultas de transmissão estructurada no mesmo clusters
Muitos clientes executam várias consultas de transmissão estructurada no mesmo cluster Databricks. Embora esse padrão seja suportado, o Databricks recomenda limitar o número de consultas por cluster para evitar problemas de dimensionamento e gargalos de desempenho. Na computação serverless, a Databricks gerencia o dimensionamento automaticamente, assim, essas considerações são tratadas para você. Caso o compute clássico esteja sendo utilizado, onde o dimensionamento de driver e executor é controlado, esta página descreve os key gargalos a serem considerados e as formas de solucioná-los.
A Databricks recomenda o uso de LakeFlow Spark Declarative Pipelines para novas cargas de trabalho de transmissão, que gerencia automaticamente a complexidade da infraestrutura. Consulte Lakeflow Spark Declarative Pipelines.
Quando usar várias consultas no mesmo cluster
A execução de múltiplas consultas de transmissão no mesmo cluster reduz os custos de infraestrutura, especialmente quando há muitas transmissões pequenas que individualmente não exigem compute dedicado. A principal desvantagem é a falha compartilhada: se o cluster falhar, toda transmissão nele falha. Para pipelines de missão crítica, esse modo de falha compartilhado muitas vezes é inaceitável.
Para cargas de trabalho que misturam transmissões críticas e não críticas, a Databricks recomenda o seguinte:
- Uma prioridade deve ser atribuída a cada transmissão com base em seu impacto nos negócios.
- Mantenha transmissões de missão crítica em clusters dedicados, mesmo a um custo mais alto.
- Colocalizar transmissões de menor prioridade para compartilhar compute e reduzir o custo.
Dimensionamento do Driver
O driver é um recurso compartilhado. Múltiplas consultas compartilham o mesmo CPU, memória, programador DAG, programador de tarefas e execução de UDF do lado do driver (por exemplo, foreachBatch). Ao executar muitas transmissões concorrentes, observe estes gargalos específicos além do provisionamento padrão de CPU e memória:
- Sobrecarga do Auto Loader : Se suas transmissões usarem o Auto Loader, a descoberta de arquivos e a listagem de diretórios aumentam a pressão no driver.
- Limites de recurso em nível de SO (arquivos abertos) : executar um alto volume de transmissões baseadas em arquivo (como
FileStreamSourceou Auto Loader) simultaneamente em um único driver pode esgotar os limites de descritores de arquivo abertos em nível de usuário, o que pode causar falhas aleatórias de transmissão. - Contrapressão do barramento do ouvinte: Um grande número de consultas de transmissão concorrentes pode causar contrapressão no barramento da única sessão do
StreamingQueryListenerSpark. Todos os eventos (incluindoonQueryIdle) são enviados para este único barramento, e um grande acúmulo de eventos pode atrasar severamente os manipuladores assíncronosonQueryProgresse afetar a estabilidade do cluster. - Operações de driver caras: Evite chamar
collect()ou outras operações de DataFrame custosas no driver, a menos que seja absolutamente necessário, para evitar a materialização de grandes conjuntos de resultados e causar erros de falta de memória (OOM).
Solucionar conflito de driver
Se você estiver enfrentando falhas no driver devido a OOM ou problemas de contenção:
- Monitorar métricas do driver no Spark UI. Caso observe alto uso de CPU, memória ou disco, ajuste o dimensionamento do driver nas configurações de compute do cluster.
- Se os problemas persistirem, verifique se o seu código não está executando operações intensivas em memória ou UDFs no driver.
- Se você não conseguir dimensionar o driver verticalmente ainda mais, a Databricks recomenda fortemente que você distribua seus jobs entre vários clusters para contornar esses gargalos de dimensionamento de nó compartilhado.
Dimensionamento do executor
Com múltiplas consultas sendo executadas no mesmo cluster, todas as consultas compartilham slots de tarefa nos executores. Estágios de uma consulta podem ocupar slots disponíveis, resultando em atrasos e privação de recursos para outras consultas. O Spark usa um mapeamento 1:1 entre slots de tarefas e núcleos disponíveis. Certifique-se de que núcleos suficientes estejam disponíveis se as consultas precisarem ser executadas simultaneamente.
Em geral, executores podem realizar operações mais intensivas em memória do que o nó do driver. Ajuste os parâmetros de alocação de memória off-heap e da JVM do executor, se necessário, para lidar com a carga do seu aplicativo. Garanta que os nós do executor sejam dimensionados de forma apropriada em termos de CPU, memória e espaço em disco e escalem verticalmente se necessário. Se o escalonamento vertical não for possível, considere adicionar nós worker adicionais ao cluster.
Algumas dessas alterações podem exigir o reinício do cluster para serem efetivadas.
Usar pools do programador
É possível configurar pools do programador para atribuir capacidade de compute a consultas ao executar várias consultas de transmissão a partir do mesmo código-fonte.
Por default, todas as queries iniciadas em um Notebook são executadas no mesmo pool de agendamento justo. Os jobs do Apache Spark gerados por gatilhos de todas as consultas de transmissão em um Notebook são executados um após o outro na ordem "primeiro a entrar, primeiro a sair" (FIFO). Isso pode causar atrasos desnecessários nas consultas, porque não estão compartilhando os recursos do cluster de forma eficiente.
Os pools do programador permitem declarar quais consultas de transmissão estructurada compartilham recursos de compute.
O exemplo a seguir atribui query1 a um pool dedicado, enquanto query2 e query3 compartilham um pool de programadores.
# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")
# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")
# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")
A configuração da propriedade local deve estar na mesma célula de Notebook onde você começa sua consulta de transmissão.
Para obter mais informações sobre pools do programador justo, consulte a documentação do programador justo do Apache Spark.
Considerações sobre consultas com estado
Para consultas com estado sendo executadas no mesmo cluster, tenha em mente o seguinte:
- Use o RocksDB como o provedor de armazenamento do estado para evitar problemas de OOM e pausas de GC. RocksDB é o provedor de armazenamento do estado default no Databricks Runtime 17.3 e acima. Consulte Configurar armazenamento do estado do RocksDB no Databricks.
- Ajuste as partições aleatórias para os requisitos da sua aplicação. Para estágios com estado, o Spark programa tarefas proporcional ao número de partições aleatórias.
- Limite o uso da memória do RocksDB por nó para evitar erros de OOM devido ao uso de memória off-heap. Isso é tratado automaticamente no Databricks Runtime 17.3 e acima, mas exige configuração manual em versões anteriores. Ver Limitação do uso de memória do RocksDB.
- Evite agrupar muitas partições no mesmo nó executor. Operações de manutenção no armazenamento do estado, incluindo upload de Snapshot e limpeza, são executadas por nó. Atribuir muitas partições a um nó executor pode causar interrupções na manutenção e tempos de recuperação mais longos devido à menor disponibilidade de snapshots completos.