Pular para o conteúdo principal

Use o modo em tempo real em Lakeflow Spark Pipelines Declarativos

info

Pré-lançamento público

O modo em tempo real no LakeFlow Spark Declarative Pipelines está em Pré-visualização Pública na versão Databricks Runtime 18.1.2. no canal de pré-visualização.

O modo de tempo real permite o processamento de dados de latência ultrabaixa, com latência de ponta a ponta tão baixa quanto cinco milissegundos. Use o modo em tempo real para cargas de trabalho operacionais que exigem resposta imediata a dados de transmissão, como detecção de fraudes e personalização em tempo real.

O modo em tempo real também está disponível diretamente na Transmissão Estructurada fora de pipelines. Consulte modo em tempo real em transmissão estructurada.

Como o modo em tempo real consegue baixa latência

O modo em tempo real difere do processamento contínuo padrão de três maneiras principais:

  • Lotes de longa duração: O sistema processa os dados à medida que ficam disponíveis na origem em lotes de longa duração (o default é de cinco minutos).
  • Agendamento simultâneo de etapas : todas as etapas da query são agendadas ao mesmo tempo. O recurso de compute deve ter slots de tarefa disponíveis suficientes para cobrir todos os estágios simultaneamente. Consulte Dimensionamento da computação.
  • Embaralhamento de transmissão: Os dados são transmitidos entre estágios assim que são produzidos, em vez de esperar que um estágio upstream seja concluído antes de iniciar o estágio downstream.

O intervalo do ponto de verificação (configurado via pipelines.trigger.interval) controla com que frequência o estado e os deslocamentos da origem são persistidos em armazenamento durável. Intervalos mais longos reduzem a sobrecarga de checkpointing, mas aumentam o tempo de recuperação após uma falha e atrasam a comunicação de métricas. Intervalos menores melhoram a durabilidade, mas adicionam sobrecarga.

Modo em tempo real e pipelines contínuos

O modo em tempo real é um tipo especializado de trigger contínuo. O modo contínuo ainda é necessário; o modo em tempo real acrescenta otimizações de latência no nível do fluxo. Para usar o modo em tempo real, o pipeline deve primeiro ser executado no modo contínuo. O modo em tempo real então aplica otimizações adicionais no nível do fluxo para atingir latência de sub-segundo além do que o processamento contínuo padrão oferece.

Habilitar o modo em tempo real requer três os passos de configuração:

  1. Usar o pipeline no modo contínuo.
  2. Habilitar o modo em tempo real no nível do pipeline.
  3. Defina um fluxo de atualização em tempo real.

Requisitos

Requisito

Valor

Runtime do Databricks

18.1.2 no canal de pré-visualização do SDP

Tipo de Compute

compute Classic ou serverless

Configurar modo em tempo real

O passo 1: Defina o pipeline para o modo contínuo

Nas configurações do seu pipeline, defina o **modo do Pipeline** como **Contínuo**, ou defina-o no JSON do pipeline:

JSON
{
"continuous": true
}

O passo 2: Habilitar o modo em tempo real no nível do pipeline

Nas configurações do seu pipeline, adicione a seguinte key à configuração do Spark em Avançado > Configuração do Spark :

ini
spark.databricks.streaming.realTimeMode.enabled = true

Você também pode definir isso no pipeline JSON:

JSON
{
"continuous": true,
"spark_conf": {
"spark.databricks.streaming.realTimeMode.enabled": "true"
}
}

O passo 3: Defina um fluxo de atualização em tempo real

O modo em tempo real exige um fluxo de atualização. Use dp.create_sink() para definir o destino da saída, e então use o decorador @dp.update_flow com pipelines.trigger definido como "RealTime" e target apontando para o coletor.

Python
from pyspark import pipelines as dp

# Define the output sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "<bootstrap-servers>",
"topic": "<output-topic>",
}
)

# Define the real-time update flow targeting the sink
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
&quot;pipelines.trigger&quot;: &quot;RealTime&quot;,
&quot;pipelines.trigger.interval&quot;: &quot;5 minutes&quot;, # optional; defaults to 5 minutes
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-servers>")
.option("subscribe", "<input-topic>")
.load()
)

Parâmetros de configuração de fluxo:

Parâmetro

Obrigatório

Padrão

Descrição

pipelines.trigger

Sim

Defina como "RealTime" para habilitar o modo em tempo real para este fluxo.

pipelines.trigger.interval

Não

"5 minutes"

Intervalo de ponto de verificação. Controla a frequência com que o estado e os deslocamentos são confirmados. Valores mais curtos melhoram a recuperabilidade; valores mais longos reduzem a sobrecarga.

Exemplos de código

Kafka para Kafka

Ler de um tópico Kafka e gravar em um destino de saída Kafka:

Python
from pyspark import pipelines as dp

dp.create_sink("kafka_output_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})

@dp.update_flow(
name="kafka_rtm_flow",
target="kafka_output_sink",
spark_conf={
&quot;pipelines.trigger&quot;: &quot;RealTime&quot;,
&quot;pipelines.trigger.interval&quot;: &quot;5 minutes&quot;,
}
)
def kafka_rtm_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "latest")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
)

Enriquecer com um join de transmissão

Join uma transmissão Kafka a uma tabela de pesquisa estática. Apenas junções de transmissão (de fluxo para estático) são compatíveis. Joins de transmissão para transmissão não são compatíveis no modo de tempo real.

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast, expr

dp.create_sink("enriched_output_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": enriched_output_topic,
})

@dp.update_flow(
name="enriched_events_flow",
target="enriched_output_sink",
spark_conf={
&quot;pipelines.trigger&quot;: &quot;RealTime&quot;,
&quot;pipelines.trigger.interval&quot;: &quot;5 minutes&quot;,
}
)
def enriched_events():
lookup = spark.read.table("catalog.schema.lookup_table")
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.withColumn("event_key", expr("CAST(value AS STRING)"))
.join(broadcast(lookup), expr("event_key = lookup_key"))
.select("event_key", "lookup_value", "timestamp")
)

Agregação

Contar eventos por key usando um groupBy com estado. Defina spark.sql.shuffle.partitions para corresponder à contagem de partições de entrada para operações com estado:

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})

@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
spark_conf={
&quot;pipelines.trigger&quot;: &quot;RealTime&quot;,
&quot;pipelines.trigger.interval&quot;: &quot;5 minutes&quot;,
&quot;spark.sql.shuffle.partitions&quot;: &quot;8&quot;,
}
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type", "timestamp")
.groupBy(col("event_type"))
.count()
)

Fontes e destinos compatíveis

Conector

Como fonte

Como pia

Notas

Apache Kafka

AWS MSK

Usa a interface compatível com Kafka.

Azure Event Hubs (conector Kafka)

Usa a interface compatível com Kafka.

Amazon Kinesis

Não suportado

Usar apenas para o modo EFO (Fan-Out Aprimorado).

Delta

Não suportado

Não suportado

compute sizing

É possível executar um pipeline em tempo real por recurso de compute, se o compute tiver slots de tarefa suficientes. Os slots de tarefa disponíveis devem cobrir todas as tarefas em todas as etapas de consulta.

Tipo de pipeline

Configuração

Espaços de tarefas obrigatórios

Sem estado de estágio único (origem + destino Kafka)

maxPartitions = 8

8

Com estado de duas etapas (fonte Kafka + shuffle)

maxPartitions = 8, partições de embaralhamento = 20

28 (8 + 20)

Três estágios (fonte Kafka + dois embaralhamentos)

maxPartitions = 8, dois estágios de shuffle de 20 cada

48 (8 + 20 + 20)

Caso não defina maxPartitions, use o número de partições no tópico Kafka.

Suporte do operador

Categoria

Operador

Compatível

Sem estado

Seleção, Projeção

UDFs

UDF Scala

✓ (com limitações)

UDFs

UDF Python

✓ (com limitações)

Agregação

soma, contagem, máx., mín., média

Janelamento

Tombamento, deslizamento

Janelamento

Sessão

Não suportado

Desduplicação

dropDuplicates

✓ (estado ilimitado)

Desduplicação

dropDuplicatesWithinWatermark

Não suportado

join

Join de tabela de broadcast

join

Join entre transmissões

Não suportado

Personalizada

transformWithState

✓ (com diferenças comportamentais)

Personalizada

union

✓ (com limitações)

Personalizada

forEach

Não suportado

Personalizada

flatMapGroupsWithState

Não suportado

Personalizada

mapPartitions

Não suportado

Personalizada

forEachBatch

Não suportado

transformWithState em modo de tempo real

transformWithState é suportado em tempo real com as seguintes diferenças em relação ao processamento de micro-lotes:

  • handleInputRows é invocado uma vez por linha em vez de uma vez por key por lote. O inputRows iterador produz um único valor por invocação.
  • Cronômetros de tempo de evento não são compatíveis. Temporizadores de tempo de processamento disparam quando um lote de longa duração é encerrado se nenhum dado tiver chegado.
  • transformWithStateInPandas Não é compatível.

Pandas UDFs em tempo real

Para minimizar a latência com UDFs pandas, defina spark.sql.execution.arrow.maxRecordsPerBatch para 1. Isso otimiza a latência em detrimento da Taxa de transferência. Se a taxa de transferência também for importante, defina este valor como 100 ou superior.

Monitorar o desempenho do modo em tempo real

O modo em tempo real expõe as métricas de latência em StreamingQueryProgress no campo latencies. Acesse essas métricas por meio de um StreamingQueryListener ou inspecionando a propriedade lastProgress na consulta de transmissão.

Métrica

Descrição

processingLatencyMs

Tempo entre a leitura de um registro pelo fluxo e o processamento completo pelo fluxo

sourceQueuingLatencyMs

Tempo entre quando um registro é gravado com sucesso no barramento de mensagens (por exemplo, tempo de acréscimo do log no Kafka) e quando ele é lido pela primeira vez pelo fluxo

e2eLatencyMs

Latência total de ponta a ponta desde a produção do registro na origem até o processamento completo pelo fluxo

Cada métrica é relatada como percentis p50, p90, p95 e p99.

Limitações

Um fluxo em tempo real por pipeline é recomendado. Vários fluxos são permitidos, mas a contenção de slot de tarefa entre fluxos aumenta a latência.

Para obter uma lista completa de limitações do operador e da fonte, consulte Limitações do modo em tempo real.

Recursos adicionais