Use o modo em tempo real em Lakeflow Spark Pipelines Declarativos
Pré-lançamento público
O modo em tempo real no LakeFlow Spark Declarative Pipelines está em Pré-visualização Pública na versão Databricks Runtime 18.1.2. no canal de pré-visualização.
O modo de tempo real permite o processamento de dados de latência ultrabaixa, com latência de ponta a ponta tão baixa quanto cinco milissegundos. Use o modo em tempo real para cargas de trabalho operacionais que exigem resposta imediata a dados de transmissão, como detecção de fraudes e personalização em tempo real.
O modo em tempo real também está disponível diretamente na Transmissão Estructurada fora de pipelines. Consulte modo em tempo real em transmissão estructurada.
Como o modo em tempo real consegue baixa latência
O modo em tempo real difere do processamento contínuo padrão de três maneiras principais:
- Lotes de longa duração: O sistema processa os dados à medida que ficam disponíveis na origem em lotes de longa duração (o default é de cinco minutos).
- Agendamento simultâneo de etapas : todas as etapas da query são agendadas ao mesmo tempo. O recurso de compute deve ter slots de tarefa disponíveis suficientes para cobrir todos os estágios simultaneamente. Consulte Dimensionamento da computação.
- Embaralhamento de transmissão: Os dados são transmitidos entre estágios assim que são produzidos, em vez de esperar que um estágio upstream seja concluído antes de iniciar o estágio downstream.
O intervalo do ponto de verificação (configurado via pipelines.trigger.interval) controla com que frequência o estado e os deslocamentos da origem são persistidos em armazenamento durável. Intervalos mais longos reduzem a sobrecarga de checkpointing, mas aumentam o tempo de recuperação após uma falha e atrasam a comunicação de métricas. Intervalos menores melhoram a durabilidade, mas adicionam sobrecarga.
Modo em tempo real e pipelines contínuos
O modo em tempo real é um tipo especializado de trigger contínuo. O modo contínuo ainda é necessário; o modo em tempo real acrescenta otimizações de latência no nível do fluxo. Para usar o modo em tempo real, o pipeline deve primeiro ser executado no modo contínuo. O modo em tempo real então aplica otimizações adicionais no nível do fluxo para atingir latência de sub-segundo além do que o processamento contínuo padrão oferece.
Habilitar o modo em tempo real requer três os passos de configuração:
- Usar o pipeline no modo contínuo.
- Habilitar o modo em tempo real no nível do pipeline.
- Defina um fluxo de atualização em tempo real.
Requisitos
Requisito | Valor |
|---|---|
Runtime do Databricks | 18.1.2 no canal de pré-visualização do SDP |
Tipo de Compute | compute Classic ou serverless |
Configurar modo em tempo real
O passo 1: Defina o pipeline para o modo contínuo
Nas configurações do seu pipeline, defina o **modo do Pipeline** como **Contínuo**, ou defina-o no JSON do pipeline:
{
"continuous": true
}
O passo 2: Habilitar o modo em tempo real no nível do pipeline
Nas configurações do seu pipeline, adicione a seguinte key à configuração do Spark em Avançado > Configuração do Spark :
spark.databricks.streaming.realTimeMode.enabled = true
Você também pode definir isso no pipeline JSON:
{
"continuous": true,
"spark_conf": {
"spark.databricks.streaming.realTimeMode.enabled": "true"
}
}
O passo 3: Defina um fluxo de atualização em tempo real
O modo em tempo real exige um fluxo de atualização. Use dp.create_sink() para definir o destino da saída, e então use o decorador @dp.update_flow com pipelines.trigger definido como "RealTime" e target apontando para o coletor.
from pyspark import pipelines as dp
# Define the output sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "<bootstrap-servers>",
"topic": "<output-topic>",
}
)
# Define the real-time update flow targeting the sink
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes", # optional; defaults to 5 minutes
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-servers>")
.option("subscribe", "<input-topic>")
.load()
)
Parâmetros de configuração de fluxo:
Parâmetro | Obrigatório | Padrão | Descrição |
|---|---|---|---|
| Sim | — | Defina como |
| Não |
| Intervalo de ponto de verificação. Controla a frequência com que o estado e os deslocamentos são confirmados. Valores mais curtos melhoram a recuperabilidade; valores mais longos reduzem a sobrecarga. |
Exemplos de código
Kafka para Kafka
Ler de um tópico Kafka e gravar em um destino de saída Kafka:
from pyspark import pipelines as dp
dp.create_sink("kafka_output_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="kafka_rtm_flow",
target="kafka_output_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def kafka_rtm_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "latest")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
)
Enriquecer com um join de transmissão
Join uma transmissão Kafka a uma tabela de pesquisa estática. Apenas junções de transmissão (de fluxo para estático) são compatíveis. Joins de transmissão para transmissão não são compatíveis no modo de tempo real.
from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast, expr
dp.create_sink("enriched_output_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": enriched_output_topic,
})
@dp.update_flow(
name="enriched_events_flow",
target="enriched_output_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def enriched_events():
lookup = spark.read.table("catalog.schema.lookup_table")
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.withColumn("event_key", expr("CAST(value AS STRING)"))
.join(broadcast(lookup), expr("event_key = lookup_key"))
.select("event_key", "lookup_value", "timestamp")
)
Agregação
Contar eventos por key usando um groupBy com estado. Defina spark.sql.shuffle.partitions para corresponder à contagem de partições de entrada para operações com estado:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
"spark.sql.shuffle.partitions": "8",
}
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type", "timestamp")
.groupBy(col("event_type"))
.count()
)
Fontes e destinos compatíveis
Conector | Como fonte | Como pia | Notas |
|---|---|---|---|
Apache Kafka | ✓ | ✓ | — |
AWS MSK | ✓ | ✓ | Usa a interface compatível com Kafka. |
Azure Event Hubs (conector Kafka) | ✓ | ✓ | Usa a interface compatível com Kafka. |
Amazon Kinesis | ✓ | Não suportado | Usar apenas para o modo EFO (Fan-Out Aprimorado). |
Delta | Não suportado | Não suportado | — |
compute sizing
É possível executar um pipeline em tempo real por recurso de compute, se o compute tiver slots de tarefa suficientes. Os slots de tarefa disponíveis devem cobrir todas as tarefas em todas as etapas de consulta.
Tipo de pipeline | Configuração | Espaços de tarefas obrigatórios |
|---|---|---|
Sem estado de estágio único (origem + destino Kafka) |
| 8 |
Com estado de duas etapas (fonte Kafka + shuffle) |
| 28 (8 + 20) |
Três estágios (fonte Kafka + dois embaralhamentos) |
| 48 (8 + 20 + 20) |
Caso não defina maxPartitions, use o número de partições no tópico Kafka.
Suporte do operador
Categoria | Operador | Compatível |
|---|---|---|
Sem estado | Seleção, Projeção | ✓ |
UDFs | UDF Scala | ✓ (com limitações) |
UDFs | UDF Python | ✓ (com limitações) |
Agregação | soma, contagem, máx., mín., média | ✓ |
Janelamento | Tombamento, deslizamento | ✓ |
Janelamento | Sessão | Não suportado |
Desduplicação |
| ✓ (estado ilimitado) |
Desduplicação |
| Não suportado |
join | Join de tabela de broadcast | ✓ |
join | Join entre transmissões | Não suportado |
Personalizada |
| ✓ (com diferenças comportamentais) |
Personalizada |
| ✓ (com limitações) |
Personalizada |
| Não suportado |
Personalizada |
| Não suportado |
Personalizada |
| Não suportado |
Personalizada |
| Não suportado |
transformWithState em modo de tempo real
transformWithState é suportado em tempo real com as seguintes diferenças em relação ao processamento de micro-lotes:
handleInputRowsé invocado uma vez por linha em vez de uma vez por key por lote. OinputRowsiterador produz um único valor por invocação.- Cronômetros de tempo de evento não são compatíveis. Temporizadores de tempo de processamento disparam quando um lote de longa duração é encerrado se nenhum dado tiver chegado.
transformWithStateInPandasNão é compatível.
Pandas UDFs em tempo real
Para minimizar a latência com UDFs pandas, defina spark.sql.execution.arrow.maxRecordsPerBatch para 1. Isso otimiza a latência em detrimento da Taxa de transferência. Se a taxa de transferência também for importante, defina este valor como 100 ou superior.
Monitorar o desempenho do modo em tempo real
O modo em tempo real expõe as métricas de latência em StreamingQueryProgress no campo latencies. Acesse essas métricas por meio de um StreamingQueryListener ou inspecionando a propriedade lastProgress na consulta de transmissão.
Métrica | Descrição |
|---|---|
| Tempo entre a leitura de um registro pelo fluxo e o processamento completo pelo fluxo |
| Tempo entre quando um registro é gravado com sucesso no barramento de mensagens (por exemplo, tempo de acréscimo do log no Kafka) e quando ele é lido pela primeira vez pelo fluxo |
| Latência total de ponta a ponta desde a produção do registro na origem até o processamento completo pelo fluxo |
Cada métrica é relatada como percentis p50, p90, p95 e p99.
Limitações
Um fluxo em tempo real por pipeline é recomendado. Vários fluxos são permitidos, mas a contenção de slot de tarefa entre fluxos aumenta a latência.
Para obter uma lista completa de limitações do operador e da fonte, consulte Limitações do modo em tempo real.