Configurar modo em tempo real
Esta página descreve os pré-requisitos e configurações necessárias para execução de consultas em modo real de tempo em transmissão estruturada. Para obter um tutorial passo a passo, consulte tutorial: execução a tempo real transmissão workload. Para informações conceituais sobre modo tempo real, veja modo tempo real em transmissão estruturada.
Pré-requisitos
Para usar o modo tempo real, você deve configurar seu compute para atender aos seguintes requisitos:
- Use o modo de acesso dedicado em compute clássica. O modo de acesso padrão, o pipeline declarativo LakeFlow Spark e clusters serverless não são suportados.
- Utilize Databricks Runtime 16.4 LTS ou superior.
- Desative a escala automática.
- Desligue o Photon.
- Defina
spark.databricks.streaming.realTimeMode.enabledcomotrue. - Desative as instâncias spot para evitar interrupções.
Para obter instruções sobre como criar e configurar compute clássica, consulte a referência de configuração de computação.
Configuração da consulta
Para executar uma consulta no modo tempo real, é necessário habilitar o trigger tempo real. Os gatilhos em tempo real são suportados apenas no modo de atualização.
- Python
- Scala
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# In PySpark, the realTime trigger requires specifying the interval.
.trigger(realTime="5 minutes")
.start()
)
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
// RealTimeTrigger can also accept an argument specifying the checkpoint interval.
// For example, this code indicates a checkpoint interval of 5 minutes:
// .trigger(RealTimeTrigger.apply("5 minutes"))
.start()
dimensionamento de cálculo
Você pode executar um Job de tempo real por recurso compute se a compute tiver slots de tarefa suficientes.
Para execução em modo de baixa latência, o número total de slots de tarefas disponíveis deve ser maior ou igual ao número de tarefas em todos os estágios da consulta.
Exemplos de cálculo de slots
tipo de tubulação | Configuração | Vagas necessárias |
|---|---|---|
Sem estado em estágio único (fonte + destino Kafka) |
| 8 vagas |
Com estado em dois estágios (fonte Kafka + embaralhamento) |
| 28 vagas (8 + 20) |
Três estágios (fonte Kafka + shuffle + repartição) |
| 48 vagas (8 + 20 + 20) |
Se você não definir maxPartitions, use o número de partições no tópico do Kafka.