Pular para o conteúdo principal

modo tempo real em transmissão estruturada

info

Visualização

Esse recurso está em Public Preview.

O modo tempo real é um tipo de gatilho para transmissão estruturada que permite o processamento de dados com latência ultrabaixa, com latência de ponta a ponta de apenas cinco milissegundos. Utilize o modo tempo real para cargas de trabalho operacionais que exigem resposta imediata aos dados transmitidos, como detecção de fraudes, personalização em tempo real e sistemas de tomada de decisão instantânea.

O modo tempo real está disponível no Databricks Runtime 16.4 LTS e versões superiores. Para obter instruções de configuração passo a passo, consulte Começar com o modo tempo real. Para exemplos de código, consulte Exemplos do modo em tempo real.

O que é o modo em tempo real?

Cargas de trabalho operacionais versus analíticas

As cargas de trabalho de transmissão podem ser amplamente divididas em cargas de trabalho analíticas e cargas de trabalho operacionais:

  • As cargas de trabalho analíticas utilizam a ingestão de dados e transformações, seguindo normalmente a arquitetura medalhão (por exemplo, ingestão de dados nas tabelas bronze, prata e ouro).
  • As cargas de trabalho operacionais consomem dados em tempo real, aplicam lógica de negócios e acionam ações ou decisões posteriores.

Alguns exemplos de cargas de trabalho operacionais são:

  • Bloquear ou sinalizar uma transação com cartão de crédito em tempo real se uma pontuação de fraude exceder um limite, com base em fatores como localização incomum, grande valor da transação ou padrões de gastos rápidos.
  • Entregar uma mensagem promocional quando os dados do clickstream mostram que um usuário está procurando jeans há cinco minutos, oferecendo um desconto de 25% se ele comprar nos próximos 15 minutos.

De forma geral, as cargas de trabalho operacionais são caracterizadas pela necessidade de latência de ponta a ponta inferior a um segundo. Isso pode ser alcançado com o modo tempo real no Apache Spark transmissão estruturada.

Como o modo em tempo real alcança baixa latência

O modo em tempo real aprimora a arquitetura de execução ao:

  • Executar lotes de longa duração (o default é de cinco minutos), nos quais o sistema processa os dados à medida que ficam disponíveis na fonte.
  • programar todas as etapas da consulta simultaneamente. Isso exige que o número de vagas de tarefas disponíveis seja igual ou maior que o número de tarefas de todos os estágios em um lote.
  • A transmissão de dados entre estágios ocorre assim que os dados são produzidos, utilizando um mecanismo de embaralhamento de transmissão.

Ao final do processamento de um lote, e antes do início dos próximos lotes, a transmissão estruturada verifica o andamento e publica métricas. A duração dos lotes afeta a frequência dos checkpoints:

  • Lotes mais longos : Pontos de verificação menos frequentes, o que significa repetições mais longas em caso de falha e disponibilidade de métricas atrasada.
  • Lotes mais curtos : Pontos de verificação mais frequentes, o que pode afetar a latência.

A Databricks recomenda comparar o modo em tempo real com sua carga de trabalho alvo para encontrar o intervalo de ativação apropriado.

Quando usar o modo em tempo real

Selecione o modo em tempo real quando o seu caso de uso assim o exigir:

  • Latência inferior a um segundo : Aplicações que precisam responder a dados em milissegundos, como sistemas de detecção de fraudes que devem bloquear transações em tempo real.
  • Tomada de decisões operacionais : Sistemas que acionam ações imediatas com base em dados recebidos, como ofertas em tempo real, alertas ou notificações.
  • Processamento contínuo : cargas de trabalho em que os dados devem ser processados assim que chegam, em vez de em lotes periódicos.

Utilize o modo microlotes (acionador de transmissão estruturada default ) quando:

  • Processamento analítico : pipeline ETL , transformações de dados e implementações de arquitetura medallion, onde os requisitos de latência são medidos em segundos ou minutos.
  • Otimização de custos : cargas de trabalho onde a latência inferior a um segundo não é necessária, visto que o modo tempo real requer recursos compute dedicados.
  • A frequência dos pontos de verificação é importante : Aplicações que se beneficiam de pontos de verificação mais frequentes para uma recuperação mais rápida.

Requisitos e configuração

O modo tempo real possui requisitos específicos para configuração compute e configuração de consulta. Esta seção descreve os pré-requisitos e a configuração dos passos necessários para usar o modo tempo real.

Pré-requisitos

Para usar o modo em tempo real, você deve atender aos seguintes requisitos:

  • Databricks Runtime 16.4 LTS ou superior : o modo tempo real está disponível apenas no DBR 16.4 LTS e versões posteriores.
  • computededicada : Você deve usar um compute dedicado (anteriormente, de usuário único). Os pipelines Standard (anteriormente compartilhados), LakeFlow Spark Declarative e clusters serverless não são suportados.
  • Sem escala automática : a escala automática deve estar desativada.
  • Sem Photon : A aceleração Photon não é compatível com o modo Tempo Real.
  • Configuração do Spark : Você deve definir spark.databricks.streaming.realTimeMode.enabled para true.

configuração de computação

Configure seu compute com as seguintes configurações:

  • Defina spark.databricks.streaming.realTimeMode.enabled como true na configuração do Spark.
  • Desative a escala automática.
  • Desativar aceleração Photon .
  • Certifique-se de que o compute esteja configurado como um cluster dedicado (não padrão, pipeline declarativo LakeFlow Spark ou serverless).

Para obter instruções passo a passo sobre como criar e configurar o modo compute para tempo real, consulte Começar com o modo tempo real.

Configuração de consulta

Para executar uma consulta no modo tempo real, é necessário habilitar o trigger tempo real. Os gatilhos em tempo real são suportados apenas no modo de atualização.

Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# In PySpark, the realTime trigger requires specifying the interval.
.trigger(realTime="5 minutes")
.start()
)

dimensionamento de cálculo

Você pode executar um Job de tempo real por recurso compute se a compute tiver slots de tarefa suficientes.

Para execução no modo de baixa latência, o número total de slots de tarefa disponíveis deve ser maior ou igual ao número de tarefas em todas as etapas da consulta.

Exemplos de cálculo de slots

tipo de tubulação

Configuração

Vagas necessárias

Sem estado em estágio único (fonte + destino Kafka)

maxPartitions = 8

8 vagas

Com estado em dois estágios (fonte Kafka + embaralhamento)

maxPartitions = 8, embaralhar partições = 20

28 vagas (8 + 20)

Três estágios (fonte Kafka + shuffle + repartição)

maxPartitions = 8, duas fases de embaralhamento de 20 cada

48 vagas (8 + 20 + 20)

Se você não definir maxPartitions, use o número de partições no tópico do Kafka.

considerações importantes

Ao configurar seu compute, considere o seguinte:

  • Ao contrário do modo micro-lotes, a tarefa em tempo real pode permanecer parada enquanto aguarda dados, portanto, o dimensionamento correto é essencial para evitar desperdício de recursos.

  • Defina como meta um nível de utilização (por exemplo, 50%) através dos seguintes ajustes:

    • maxPartitions (para Kafka)
    • spark.sql.shuffle.partitions (para fases aleatórias)
  • A Databricks recomenda definir maxPartitions para que cada tarefa lide com várias partições do Kafka para reduzir a sobrecarga.

  • Ajuste os slots de tarefas por worker para corresponder à carga de trabalho para tarefas simples de uma etapa.

  • Para trabalhos com muitas operações de embaralhamento (shuffle), experimente para encontrar o número mínimo de partições de embaralhamento que evitam acúmulos e ajuste a partir daí. O compute não irá programar a tarefa se não tiver slots suficientes.

nota

A partir de Databricks Runtime 16.4 LTS e acima, todos os pipelines tempo real utilizam o checkpoint v2, que permite a alternância perfeita entre os modos tempo real e micro-lotes.

Técnicas de otimização

Técnica

Ativado por default

Acompanhamento de progresso assíncrono: Move a escrita no log de offset e log commit para uma thread assíncrona, reduzindo o tempo entre dois microlotes. Isso pode ajudar a reduzir a latência das consultas de transmissão sem estado.

Não

Checkpointing de estado assíncrono: Ajuda a reduzir a latência de consultas de transmissão com estado, iniciando o processamento dos próximos microlotes assim que o cálculo dos microlotes anteriores for concluído, sem esperar pelo checkpointing de estado.

Não

monitoramento e observabilidade

Medir o desempenho das consultas é essencial para cargas de trabalho em tempo real. No modo tempo real, os lotes de medição tradicionais não refletem a latência real, portanto, você precisa de abordagens alternativas.

A latência de ponta a ponta é específica para cada carga de trabalho e, às vezes, só pode ser medida com precisão por meio da lógica de negócios. Por exemplo, se o timestamp de origem for enviado para o Kafka, você pode calcular a latência como a diferença entre o timestamp de saída do Kafka e o timestamp de origem.

Você também pode estimar a latência de ponta a ponta usando as métricas e APIs integradas descritas abaixo.

integração de métricas com StreamingQueryProgress

As seguintes métricas estão incluídas no evento “ StreamingQueryProgress ”, que é automaticamente registrado no driver “ logs”. Você também pode acessá-los por meio da função de retorno de chamada onQueryProgress() do StreamingQueryListener. QueryProgressEvent.json() ou toString() incluem métricas adicionais do modo em tempo real.

  1. Latência de processamento (processingLatencyMs) . O tempo decorrido entre o momento em que a consulta no modo em tempo real lê um registro e o momento em que a consulta o grava no próximo estágio ou em um processo subsequente. Para consultas de estágio único, essa medida tem a mesma duração que a latência de ponta a ponta. O sistema reporta essas métricas por tarefa.
  2. Latência de enfileiramento de origem (sourceQueuingLatencyMs) . O tempo decorrido entre o momento em que o sistema grava um registro em um barramento de mensagens, por exemplo, o tempo de acréscimo de log no Kafka, e o momento em que a consulta em modo de tempo real lê o registro pela primeira vez. O sistema reporta essas métricas por tarefa.
  3. Latência E2E (e2eLatencyMs) . O intervalo de tempo entre o momento em que o sistema grava o registro em um barramento de mensagens e o momento em que a consulta em modo de tempo real grava o registro posteriormente. O sistema agrega essas métricas por lote em todos os registros processados por todas as tarefas.

Por exemplo:

JSON
"rtmMetrics" : {
"processingLatencyMs" : {
"P0" : 0,
"P50" : 0,
"P90" : 0,
"P95" : 0,
"P99" : 0
},
"sourceQueuingLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 3
},
"e2eLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 4
},

Medição de latência personalizada com a API Observe

A API Observe ajuda a medir a latência sem iniciar outra tarefa. Se você tiver um registro de data e hora da origem que se aproxime do horário de chegada dos dados da origem, poderá estimar a latência de cada lote usando a API Observe. Registre o horário antes de chegar à pia:

Python
from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
"latency",
unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
"observedLatency",
avg(col("latency")).alias("avg"),
max(col("latency")).alias("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.

Neste exemplo, um timestamp atual é registrado antes da saída da entrada, e a latência é estimada calculando a diferença entre esse timestamp e o timestamp de origem do registro. Os resultados são incluídos nos relatórios de progresso e disponibilizados aos ouvintes. Aqui está um exemplo de saída:

JSON
"observedMetrics" : {
"observedLatency" : {
"avg" : 63.8369765176552,
"max" : 219,
"p99" : 154,
"p50" : 49
}
}

Suporte e limitações de recursos

Esta seção descreve os recursos suportados e as limitações atuais do modo real do Tempo, incluindo ambientes, linguagens, fontes, destinos, operadores compatíveis e considerações especiais para recursos específicos.

Ambientes, idiomas e modos suportados

Tipo de Compute

Suportado

Dedicado (anteriormente: usuário único)

Sim

Padrão (anteriormente: compartilhado)

Não

Pipeline declarativo LakeFlow Spark Clássico

Não

LakeFlow Spark Declarative pipeline serverless

Não

Serverless

Não

Idiomas suportados:

Idioma

Suportado

Scala

Sim

Java

Sim

Python

Sim

Modos de execução suportados:

Execução Mode

Suportado

Modo de atualização

Sim

Modo de anexação

Não

Modo completo

Não

Fontes e destinos suportados

Fontes:

Fontes

Suportado

Apache Kafka

Sim

AWS MSK

Sim

Hubs de eventos (usando o conector Kafka)

Sim

Kinesis

Sim (somente modo EFO)

Google Pub/Sub

Não

Apache Pulsar

Não

Pias:

Pias

Suportado

Apache Kafka

Sim

Hubs de eventos (usando o conector Kafka)

Sim

Kinesis

Não

Google Pub/Sub

Não

Apache Pulsar

Não

Dissipadores arbitrários (usando ForEachWriter)

Sim

Operadores suportados

Operadores

Suportado

Operações sem estado

Seleção

Sim

Projeção

Sim

UDFs

UDF Scala

Sim (com algumas limitações)

UDF em Python

Sim (com algumas limitações)

Agregação

soma

Sim

contar

Sim

Máx

Sim

Mín

Sim

média

Sim

Funções de agregação

Sim

Janelamento

cambalhotas

Sim

Deslizando

Sim

Sessão

Não

Desduplicação

remover duplicados

Sim (o estado é ilimitado)

removerDuplicadosDentroDaMarcaD'água

Não

transmissão - Junção de tabelas

Tabela de transmissão (deve ser pequena)

Sim

transmissão - transmissão join

Não

Grupos de mapas (planos) com estado

Não

Transformar com estado

Sim (com algumas diferenças)

união

Sim (com algumas limitações)

Para cada

Sim

Para cada lote

Não

Partições do mapa

Não (ver limitação)

Considerações especiais

Alguns operadores e recursos têm considerações ou diferenças específicas quando usados no modo tempo real.

transformWithState em modo de tempo real

Para a criação de aplicativos personalizados com estado, o Databricks suporta transformWithState, um API em Apache Spark transmissão estruturada. Consulte Criar um aplicativo com estado personalizado para obter mais informações sobre o API e trechos de código.

No entanto, existem algumas diferenças entre o comportamento do API no modo tempo real e as consultas de transmissão tradicionais que utilizam a arquitetura de micro-lotes.

  • O modo em tempo real chama o método handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues) para cada linha.

    • O iterador inputRows retorna um único valor. O modo micro-lotes chama-o uma vez para cada key, e o iterador inputRows retorna todos os valores para uma key nos micro-lotes.
    • Você deve estar ciente dessa diferença ao escrever seu código.
  • Os temporizadores de eventos não são suportados no modo em tempo real.

  • No modo em tempo real, os temporizadores têm seu disparo atrasado dependendo da chegada dos dados:

    • Se um temporizador estiver agendado para 10:00:00, mas nenhum dado chegar, o temporizador não será acionado imediatamente.
    • Se os dados chegarem às 10:00:10, o temporizador será acionado com um atraso de 10 segundos.
    • Caso nenhum dado chegue e o lote de longa duração esteja sendo encerrado, o temporizador é acionado antes do término do lote.

UDFs em Python no modo de tempo real

O Databricks suporta a maioria das funções definidas pelo usuário (UDFs) do Python no modo em tempo real:

Tipo de UDF

Suportado

UDF sem estado

UDF escalar em Python (link)

Sim

UDF escalar de seta

Sim

UDF escalar do Pandas (link)

Sim

Função de seta (mapInArrow)

Sim

Função Pandas (link)

Sim

UDF de agrupamento com estado (UDAF)

transformWithState (somente interface Row )

Sim

aplicarEmPandasComEstado

Não

UDF de agrupamento não fatídico (UDAF)

aplicar

Não

aplicarNaSeta

Não

applyInPandas

Não

Função de tabela

UDTF (link)

Não

UC UDF

Não

Há vários pontos a serem considerados ao usar UDFs do Python no modo de tempo real:

  • Para minimizar a latência, configure o tamanho dos lotes Arrow (spark.sql.execution.arrow.maxRecordsPerBatch) para 1.

    • Compensação: essa configuração otimiza a latência em detrimento da taxa de transferência. Para a maioria das cargas de trabalho, essa configuração é recomendada.
    • Aumente o tamanho dos lotes somente se for necessária uma taxa de transferência maior para acomodar o volume de entrada, aceitando o possível aumento na latência.
  • Pandas Os UDFs e as funções não funcionam bem com um tamanho de lote Arrow de 1.

    • Se o senhor usar Pandas UDFs ou funções, defina o tamanho dos lotes Arrow para um valor mais alto (por exemplo, 100 ou mais).
    • Observe que isso implica maior latência. A Databricks recomenda o uso do Arrow UDF ou da função, se possível.
  • Devido ao problema de desempenho com Pandas, transformWithState só é compatível com a interface Row.

Limitações

Limitações da fonte

Para o Kinesis, o modo em tempo real não suporta o modo de polling. Além disso, repartições frequentes podem afetar negativamente a latência.

Limitações sindicais

O operador Union apresenta algumas limitações:

  • O modo em tempo real não suporta auto-união:

    • Kafka : Não é possível usar o mesmo objeto de dataframe de origem e unir dataframes derivados dele. Solução alternativa: Utilize DataFrames diferentes que leiam da mesma fonte.
    • Kinesis : Não é possível unir quadros de dados derivados da mesma fonte Kinesis com a mesma configuração. Solução alternativa: além de usar DataFrames diferentes, você pode atribuir uma opção 'consumerName' diferente a cada DataFrame.
  • O modo em tempo real não suporta operadores com estado (por exemplo, aggregate, deduplicate, transformWithState) definidos antes da União.

  • O modo tempo real não suporta união com várias fontes.

Limitação do MapPartitions

mapPartitions Em Scala e APIs Python semelhantes (mapInPandas, mapInArrow) recebem um iterador de toda a partição de entrada e produzem um iterador de toda a saída com mapeamento arbitrário entre entrada e saída. Essas APIs podem causar problemas de desempenho no Mode de transmissão em tempo real, bloqueando toda a saída, o que aumenta a latência. A semântica dessas APIs não oferece bom suporte à propagação de marcas d'água.

Use UDFs escalares combinadas com Transform tipos de dados complexos ou filter em vez disso para obter funcionalidade semelhante.

Próximos passos

Agora que você entende o que é o modo tempo real e como configurá-lo, explore estes recursos para começar a implementar aplicações de transmissão em tempo real: