Pular para o conteúdo principal

Otimize e monitore o desempenho das consultas em modo de tempo real.

Esta página aborda otimização compute , técnicas para reduzir a latência de ponta a ponta e abordagens para medir o desempenho de consultas no modo tempo real.

ajuste computacional

Ao configurar seu compute, considere o seguinte:

  • Ao contrário do modo micro-lotes, o modo tempo real tarefa pode permanecer parado enquanto se aguarda dados, portanto, o dimensionamento correto é essencial para evitar o desperdício de recursos.

  • Defina uma meta de utilização do cluster, como 50%, ajustando as seguintes configurações:

    • maxPartitions (para Kafka)
    • spark.sql.shuffle.partitions (para fases aleatórias)
  • A Databricks recomenda definir maxPartitions para que cada tarefa lide com várias partições do Kafka para reduzir a sobrecarga.

  • Ajuste os slots de tarefas por worker para corresponder à carga de trabalho de um Job simples de uma única etapa.

  • Para trabalhos com muitas operações de embaralhamento (shuffle), experimente para encontrar o número mínimo de partições de embaralhamento que evitam acúmulos e ajuste a partir daí. O compute não irá programar a tarefa se não tiver slots suficientes.

nota

A partir do Databricks Runtime 16.4 LTS e versões superiores, todos os pipelines Tempo Real utilizam o checkpoint v2 para permitir transições perfeitas entre os modos Tempo Real e Micro-Lotes.

Otimização de latência

O modo de transmissão estruturada tempo real possui técnicas opcionais para redução da latência ponta a ponta. Nenhuma das duas está ativada por default. Você precisa habilitá-los separadamente.

  • Acompanhamento de progresso assíncrono: Move as gravações nos logs de deslocamento e commit para uma thread assíncrona, reduzindo o tempo de interpolação para consultas sem estado.
  • Checkpoint de estado assíncrono: Inicia o processamento dos próximos microlotes assim que a computação for concluída, sem esperar pelo checkpoint de estado, reduzindo a latência para consultas com estado.

monitoramento e observabilidade

No modo tempo real, os lotes de medição tradicionais não refletem a latência real de ponta a ponta. Utilize as abordagens abaixo para medir a latência com precisão e identificar gargalos em suas consultas.

A latência de ponta a ponta é específica para cada carga de trabalho e, às vezes, só pode ser medida com precisão por meio da lógica de negócios. Por exemplo, se o timestamp de origem for enviado para o Kafka, você pode calcular a latência como a diferença entre o timestamp de saída do Kafka e o timestamp de origem.

redes integradas com StreamingQueryProgress

O evento StreamingQueryProgress é registrado automaticamente nos logs do driver e acessível através da função de retorno de chamada onQueryProgress() de StreamingQueryListener. Isso permite que você reaja a eventos de progresso de forma programática, por exemplo, se quiser publicar métricas em um sistema de monitoramento externo. QueryProgressEvent.json() ou toString() incluem estas métricas de modo em tempo real:

  1. Latência de processamento (processingLatencyMs). O tempo decorrido entre o momento em que a consulta no modo em tempo real lê um registro e o momento em que a consulta o grava no próximo estágio ou em um processo subsequente. Para consultas de estágio único, essa medida corresponde à mesma duração que a latência de ponta a ponta. O sistema reporta essas métricas por tarefa.
  2. Latência de enfileiramento de origem (sourceQueuingLatencyMs). O tempo decorrido entre o momento em que o sistema grava um registro em um barramento de mensagens — por exemplo, o tempo de acréscimo de log no Kafka — e o momento em que a consulta no modo em tempo real lê o registro pela primeira vez. O sistema reporta essas métricas por tarefa.
  3. Latência de ponta a ponta (e2eLatencyMs). O intervalo de tempo entre o momento em que o sistema grava o registro em um barramento de mensagens e o momento em que a consulta em modo de tempo real grava o registro posteriormente. O sistema agrega essas métricas por lote em todos os registros processados por todas as tarefas.

Por exemplo:

JSON
"rtmMetrics" : {
"processingLatencyMs" : {
"P0" : 0,
"P50" : 0,
"P90" : 0,
"P95" : 0,
"P99" : 0
},
"sourceQueuingLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 3
},
"e2eLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 4
}
}

Medição de latência personalizada com a API Observe

A API Observe permite medir a latência em linha, sem a necessidade de executar uma tarefa separada. Se você tiver um registro de data e hora da origem que se aproxime do horário de chegada dos dados da origem, poderá estimar a latência por lote registrando um registro de data e hora antes do destino e calculando a diferença. Os resultados aparecem em relatórios de progresso e ficam disponíveis para os ouvintes.

Python
from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
"latency",
unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
"observedLatency",
avg(col("latency")).alias("avg"),
max(col("latency")).alias("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.

Exemplo de saída:

JSON
"observedMetrics" : {
"observedLatency" : {
"avg" : 63.8369765176552,
"max" : 219,
"p99" : 154,
"p50" : 49
}
}

Recursos adicionais