modo tempo real em transmissão estruturada
Visualização
Esse recurso está em Public Preview.
O modo tempo real é um tipo de gatilho para transmissão estruturada que permite o processamento de dados com latência ultrabaixa, com latência de ponta a ponta de apenas cinco milissegundos. Utilize o modo tempo real para cargas de trabalho operacionais que exigem resposta imediata aos dados transmitidos, como detecção de fraudes, personalização em tempo real e sistemas de tomada de decisão instantânea.
O modo tempo real está disponível no Databricks Runtime 16.4 LTS e versões superiores. Para obter instruções de configuração passo a passo, consulte Começar com o modo tempo real. Para exemplos de código, consulte Exemplos do modo em tempo real.
O que é o modo em tempo real?
Cargas de trabalho operacionais versus analíticas
As cargas de trabalho de transmissão podem ser amplamente divididas em cargas de trabalho analíticas e cargas de trabalho operacionais:
- As cargas de trabalho analíticas utilizam a ingestão de dados e transformações, seguindo normalmente a arquitetura medalhão (por exemplo, ingestão de dados nas tabelas bronze, prata e ouro).
- As cargas de trabalho operacionais consomem dados em tempo real, aplicam lógica de negócios e acionam ações ou decisões posteriores.
Alguns exemplos de cargas de trabalho operacionais são:
- Bloquear ou sinalizar uma transação com cartão de crédito em tempo real se uma pontuação de fraude exceder um limite, com base em fatores como localização incomum, grande valor da transação ou padrões de gastos rápidos.
- Entregar uma mensagem promocional quando os dados do clickstream mostram que um usuário está procurando jeans há cinco minutos, oferecendo um desconto de 25% se ele comprar nos próximos 15 minutos.
De forma geral, as cargas de trabalho operacionais são caracterizadas pela necessidade de latência de ponta a ponta inferior a um segundo. Isso pode ser alcançado com o modo tempo real no Apache Spark transmissão estruturada.
Como o modo em tempo real alcança baixa latência
O modo em tempo real aprimora a arquitetura de execução ao:
- Executar lotes de longa duração (o default é de cinco minutos), nos quais o sistema processa os dados à medida que ficam disponíveis na fonte.
- programar todas as etapas da consulta simultaneamente. Isso exige que o número de vagas de tarefas disponíveis seja igual ou maior que o número de tarefas de todos os estágios em um lote.
- A transmissão de dados entre estágios ocorre assim que os dados são produzidos, utilizando um mecanismo de embaralhamento de transmissão.
Ao final do processamento de um lote, e antes do início dos próximos lotes, a transmissão estruturada verifica o andamento e publica métricas. A duração dos lotes afeta a frequência dos checkpoints:
- Lotes mais longos : Pontos de verificação menos frequentes, o que significa repetições mais longas em caso de falha e disponibilidade de métricas atrasada.
- Lotes mais curtos : Pontos de verificação mais frequentes, o que pode afetar a latência.
A Databricks recomenda comparar o modo em tempo real com sua carga de trabalho alvo para encontrar o intervalo de ativação apropriado.
Quando usar o modo em tempo real
Selecione o modo em tempo real quando o seu caso de uso assim o exigir:
- Latência inferior a um segundo : Aplicações que precisam responder a dados em milissegundos, como sistemas de detecção de fraudes que devem bloquear transações em tempo real.
- Tomada de decisões operacionais : Sistemas que acionam ações imediatas com base em dados recebidos, como ofertas em tempo real, alertas ou notificações.
- Processamento contínuo : cargas de trabalho em que os dados devem ser processados assim que chegam, em vez de em lotes periódicos.
Utilize o modo microlotes (acionador de transmissão estruturada default ) quando:
- Processamento analítico : pipeline ETL , transformações de dados e implementações de arquitetura medallion, onde os requisitos de latência são medidos em segundos ou minutos.
- Otimização de custos : cargas de trabalho onde a latência inferior a um segundo não é necessária, visto que o modo tempo real requer recursos compute dedicados.
- A frequência dos pontos de verificação é importante : Aplicações que se beneficiam de pontos de verificação mais frequentes para uma recuperação mais rápida.
Requisitos e configuração
O modo tempo real possui requisitos específicos para configuração compute e configuração de consulta. Esta seção descreve os pré-requisitos e a configuração dos passos necessários para usar o modo tempo real.
Pré-requisitos
Para usar o modo em tempo real, você deve atender aos seguintes requisitos:
- Databricks Runtime 16.4 LTS ou superior : o modo tempo real está disponível apenas no DBR 16.4 LTS e versões posteriores.
- computededicada : Você deve usar um compute dedicado (anteriormente, de usuário único). Os pipelines Standard (anteriormente compartilhados), LakeFlow Spark Declarative e clusters serverless não são suportados.
- Sem escala automática : a escala automática deve estar desativada.
- Sem Photon : A aceleração Photon não é compatível com o modo Tempo Real.
- Configuração do Spark : Você deve definir
spark.databricks.streaming.realTimeMode.enabledparatrue.
configuração de computação
Configure seu compute com as seguintes configurações:
- Defina
spark.databricks.streaming.realTimeMode.enabledcomotruena configuração do Spark. - Desative a escala automática.
- Desativar aceleração Photon .
- Certifique-se de que o compute esteja configurado como um cluster dedicado (não padrão, pipeline declarativo LakeFlow Spark ou serverless).
Para obter instruções passo a passo sobre como criar e configurar o modo compute para tempo real, consulte Começar com o modo tempo real.
Configuração de consulta
Para executar uma consulta no modo tempo real, é necessário habilitar o trigger tempo real. Os gatilhos em tempo real são suportados apenas no modo de atualização.
- Python
- Scala
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# In PySpark, the realTime trigger requires specifying the interval.
.trigger(realTime="5 minutes")
.start()
)
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
// RealTimeTrigger can also accept an argument specifying the checkpoint interval.
// For example, this code indicates a checkpoint interval of 5 minutes:
// .trigger(RealTimeTrigger.apply("5 minutes"))
.start()
dimensionamento de cálculo
Você pode executar um Job de tempo real por recurso compute se a compute tiver slots de tarefa suficientes.
Para execução no modo de baixa latência, o número total de slots de tarefa disponíveis deve ser maior ou igual ao número de tarefas em todas as etapas da consulta.
Exemplos de cálculo de slots
tipo de tubulação | Configuração | Vagas necessárias |
|---|---|---|
Sem estado em estágio único (fonte + destino Kafka) |
| 8 vagas |
Com estado em dois estágios (fonte Kafka + embaralhamento) |
| 28 vagas (8 + 20) |
Três estágios (fonte Kafka + shuffle + repartição) |
| 48 vagas (8 + 20 + 20) |
Se você não definir maxPartitions, use o número de partições no tópico do Kafka.
considerações importantes
Ao configurar seu compute, considere o seguinte:
-
Ao contrário do modo micro-lotes, a tarefa em tempo real pode permanecer parada enquanto aguarda dados, portanto, o dimensionamento correto é essencial para evitar desperdício de recursos.
-
Defina como meta um nível de utilização (por exemplo, 50%) através dos seguintes ajustes:
maxPartitions(para Kafka)spark.sql.shuffle.partitions(para fases aleatórias)
-
A Databricks recomenda definir
maxPartitionspara que cada tarefa lide com várias partições do Kafka para reduzir a sobrecarga. -
Ajuste os slots de tarefas por worker para corresponder à carga de trabalho para tarefas simples de uma etapa.
-
Para trabalhos com muitas operações de embaralhamento (shuffle), experimente para encontrar o número mínimo de partições de embaralhamento que evitam acúmulos e ajuste a partir daí. O compute não irá programar a tarefa se não tiver slots suficientes.
A partir de Databricks Runtime 16.4 LTS e acima, todos os pipelines tempo real utilizam o checkpoint v2, que permite a alternância perfeita entre os modos tempo real e micro-lotes.
Técnicas de otimização
Técnica | Ativado por default |
|---|---|
Acompanhamento de progresso assíncrono: Move a escrita no log de offset e log commit para uma thread assíncrona, reduzindo o tempo entre dois microlotes. Isso pode ajudar a reduzir a latência das consultas de transmissão sem estado. | Não |
Checkpointing de estado assíncrono: Ajuda a reduzir a latência de consultas de transmissão com estado, iniciando o processamento dos próximos microlotes assim que o cálculo dos microlotes anteriores for concluído, sem esperar pelo checkpointing de estado. | Não |
monitoramento e observabilidade
Medir o desempenho das consultas é essencial para cargas de trabalho em tempo real. No modo tempo real, os lotes de medição tradicionais não refletem a latência real, portanto, você precisa de abordagens alternativas.
A latência de ponta a ponta é específica para cada carga de trabalho e, às vezes, só pode ser medida com precisão por meio da lógica de negócios. Por exemplo, se o timestamp de origem for enviado para o Kafka, você pode calcular a latência como a diferença entre o timestamp de saída do Kafka e o timestamp de origem.
Você também pode estimar a latência de ponta a ponta usando as métricas e APIs integradas descritas abaixo.
integração de métricas com StreamingQueryProgress
As seguintes métricas estão incluídas no evento “ StreamingQueryProgress ”, que é automaticamente registrado no driver “ logs”. Você também pode acessá-los por meio da função de retorno de chamada onQueryProgress() do StreamingQueryListener. QueryProgressEvent.json() ou toString() incluem métricas adicionais do modo em tempo real.
- Latência de processamento (processingLatencyMs) . O tempo decorrido entre o momento em que a consulta no modo em tempo real lê um registro e o momento em que a consulta o grava no próximo estágio ou em um processo subsequente. Para consultas de estágio único, essa medida tem a mesma duração que a latência de ponta a ponta. O sistema reporta essas métricas por tarefa.
- Latência de enfileiramento de origem (sourceQueuingLatencyMs) . O tempo decorrido entre o momento em que o sistema grava um registro em um barramento de mensagens, por exemplo, o tempo de acréscimo de log no Kafka, e o momento em que a consulta em modo de tempo real lê o registro pela primeira vez. O sistema reporta essas métricas por tarefa.
- Latência E2E (e2eLatencyMs) . O intervalo de tempo entre o momento em que o sistema grava o registro em um barramento de mensagens e o momento em que a consulta em modo de tempo real grava o registro posteriormente. O sistema agrega essas métricas por lote em todos os registros processados por todas as tarefas.
Por exemplo:
"rtmMetrics" : {
"processingLatencyMs" : {
"P0" : 0,
"P50" : 0,
"P90" : 0,
"P95" : 0,
"P99" : 0
},
"sourceQueuingLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 3
},
"e2eLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 4
},
Medição de latência personalizada com a API Observe
A API Observe ajuda a medir a latência sem iniciar outra tarefa. Se você tiver um registro de data e hora da origem que se aproxime do horário de chegada dos dados da origem, poderá estimar a latência de cada lote usando a API Observe. Registre o horário antes de chegar à pia:
- Python
- Scala
from datetime import datetime
from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType
@udf(returnType=TimestampType())
def current_timestamp():
return datetime.now()
# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
"latency",
unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
"observedLatency",
avg(col("latency")).alias("avg"),
max(col("latency")).alias("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.
import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}
val currentTimestampUDF = udf(() => System.currentTimeMillis())
// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
"latency",
col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
name = "observedLatency",
avg(col("latency")).as("avg"),
max(col("latency")).as("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.
Neste exemplo, um timestamp atual é registrado antes da saída da entrada, e a latência é estimada calculando a diferença entre esse timestamp e o timestamp de origem do registro. Os resultados são incluídos nos relatórios de progresso e disponibilizados aos ouvintes. Aqui está um exemplo de saída:
"observedMetrics" : {
"observedLatency" : {
"avg" : 63.8369765176552,
"max" : 219,
"p99" : 154,
"p50" : 49
}
}
Suporte e limitações de recursos
Esta seção descreve os recursos suportados e as limitações atuais do modo real do Tempo, incluindo ambientes, linguagens, fontes, destinos, operadores compatíveis e considerações especiais para recursos específicos.
Ambientes, idiomas e modos suportados
Tipo de Compute | Suportado |
|---|---|
Dedicado (anteriormente: usuário único) | Sim |
Padrão (anteriormente: compartilhado) | Não |
Pipeline declarativo LakeFlow Spark Clássico | Não |
LakeFlow Spark Declarative pipeline serverless | Não |
Serverless | Não |
Idiomas suportados:
Idioma | Suportado |
|---|---|
Scala | Sim |
Java | Sim |
Python | Sim |
Modos de execução suportados:
Execução Mode | Suportado |
|---|---|
Modo de atualização | Sim |
Modo de anexação | Não |
Modo completo | Não |
Fontes e destinos suportados
Fontes:
Fontes | Suportado |
|---|---|
Apache Kafka | Sim |
AWS MSK | Sim |
Hubs de eventos (usando o conector Kafka) | Sim |
Kinesis | Sim (somente modo EFO) |
Google Pub/Sub | Não |
Apache Pulsar | Não |
Pias:
Pias | Suportado |
|---|---|
Apache Kafka | Sim |
Hubs de eventos (usando o conector Kafka) | Sim |
Kinesis | Não |
Google Pub/Sub | Não |
Apache Pulsar | Não |
Dissipadores arbitrários (usando ForEachWriter) | Sim |
Operadores suportados
Operadores | Suportado |
|---|---|
Operações sem estado | |
Seleção | Sim |
Projeção | Sim |
UDFs | |
UDF Scala | Sim (com algumas limitações) |
UDF em Python | Sim (com algumas limitações) |
Agregação | |
soma | Sim |
contar | Sim |
Máx | Sim |
Mín | Sim |
média | Sim |
Sim | |
Janelamento | |
cambalhotas | Sim |
Deslizando | Sim |
Sessão | Não |
Desduplicação | |
remover duplicados | Sim (o estado é ilimitado) |
removerDuplicadosDentroDaMarcaD'água | Não |
transmissão - Junção de tabelas | |
Tabela de transmissão (deve ser pequena) | Sim |
transmissão - transmissão join | Não |
Grupos de mapas (planos) com estado | Não |
Transformar com estado | Sim (com algumas diferenças) |
união | Sim (com algumas limitações) |
Para cada | Sim |
Para cada lote | Não |
Partições do mapa | Não (ver limitação) |
Considerações especiais
Alguns operadores e recursos têm considerações ou diferenças específicas quando usados no modo tempo real.
transformWithState em modo de tempo real
Para a criação de aplicativos personalizados com estado, o Databricks suporta transformWithState, um API em Apache Spark transmissão estruturada. Consulte Criar um aplicativo com estado personalizado para obter mais informações sobre o API e trechos de código.
No entanto, existem algumas diferenças entre o comportamento do API no modo tempo real e as consultas de transmissão tradicionais que utilizam a arquitetura de micro-lotes.
-
O modo em tempo real chama o método
handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues)para cada linha.- O iterador
inputRowsretorna um único valor. O modo micro-lotes chama-o uma vez para cada key, e o iteradorinputRowsretorna todos os valores para uma key nos micro-lotes. - Você deve estar ciente dessa diferença ao escrever seu código.
- O iterador
-
Os temporizadores de eventos não são suportados no modo em tempo real.
-
No modo em tempo real, os temporizadores têm seu disparo atrasado dependendo da chegada dos dados:
- Se um temporizador estiver agendado para 10:00:00, mas nenhum dado chegar, o temporizador não será acionado imediatamente.
- Se os dados chegarem às 10:00:10, o temporizador será acionado com um atraso de 10 segundos.
- Caso nenhum dado chegue e o lote de longa duração esteja sendo encerrado, o temporizador é acionado antes do término do lote.
UDFs em Python no modo de tempo real
O Databricks suporta a maioria das funções definidas pelo usuário (UDFs) do Python no modo em tempo real:
Tipo de UDF | Suportado |
|---|---|
UDF sem estado | |
UDF escalar em Python (link) | Sim |
UDF escalar de seta | Sim |
UDF escalar do Pandas (link) | Sim |
Função de seta ( | Sim |
Função Pandas (link) | Sim |
UDF de agrupamento com estado (UDAF) | |
transformWithState (somente interface | Sim |
aplicarEmPandasComEstado | Não |
UDF de agrupamento não fatídico (UDAF) | |
aplicar | Não |
aplicarNaSeta | Não |
applyInPandas | Não |
Função de tabela | |
UDTF (link) | Não |
UC UDF | Não |
Há vários pontos a serem considerados ao usar UDFs do Python no modo de tempo real:
-
Para minimizar a latência, configure o tamanho dos lotes Arrow (
spark.sql.execution.arrow.maxRecordsPerBatch) para 1.- Compensação: essa configuração otimiza a latência em detrimento da taxa de transferência. Para a maioria das cargas de trabalho, essa configuração é recomendada.
- Aumente o tamanho dos lotes somente se for necessária uma taxa de transferência maior para acomodar o volume de entrada, aceitando o possível aumento na latência.
-
Pandas Os UDFs e as funções não funcionam bem com um tamanho de lote Arrow de 1.
- Se o senhor usar Pandas UDFs ou funções, defina o tamanho dos lotes Arrow para um valor mais alto (por exemplo, 100 ou mais).
- Observe que isso implica maior latência. A Databricks recomenda o uso do Arrow UDF ou da função, se possível.
-
Devido ao problema de desempenho com Pandas, transformWithState só é compatível com a interface
Row.
Limitações
Limitações da fonte
Para o Kinesis, o modo em tempo real não suporta o modo de polling. Além disso, repartições frequentes podem afetar negativamente a latência.
Limitações sindicais
O operador Union apresenta algumas limitações:
-
O modo em tempo real não suporta auto-união:
- Kafka : Não é possível usar o mesmo objeto de dataframe de origem e unir dataframes derivados dele. Solução alternativa: Utilize DataFrames diferentes que leiam da mesma fonte.
- Kinesis : Não é possível unir quadros de dados derivados da mesma fonte Kinesis com a mesma configuração. Solução alternativa: além de usar DataFrames diferentes, você pode atribuir uma opção 'consumerName' diferente a cada DataFrame.
-
O modo em tempo real não suporta operadores com estado (por exemplo,
aggregate,deduplicate,transformWithState) definidos antes da União. -
O modo tempo real não suporta união com várias fontes.
Limitação do MapPartitions
mapPartitions Em Scala e APIs Python semelhantes (mapInPandas, mapInArrow) recebem um iterador de toda a partição de entrada e produzem um iterador de toda a saída com mapeamento arbitrário entre entrada e saída. Essas APIs podem causar problemas de desempenho no Mode de transmissão em tempo real, bloqueando toda a saída, o que aumenta a latência. A semântica dessas APIs não oferece bom suporte à propagação de marcas d'água.
Use UDFs escalares combinadas com Transform tipos de dados complexos ou filter em vez disso para obter funcionalidade semelhante.
Próximos passos
Agora que você entende o que é o modo tempo real e como configurá-lo, explore estes recursos para começar a implementar aplicações de transmissão em tempo real:
- Comece com o modo tempo real - Siga as instruções passo a passo para configurar compute e a execução de sua primeira consulta de transmissão tempo real.
- Exemplos de código em modo de tempo real - Explore exemplos práticos, incluindo fontes e destinos Kafka, consultas com estado, agregações e destinos personalizados.
- Conceitos de transmissão estruturada - Aprenda os conceitos básicos de transmissão estruturada no Databricks.