Configurar modo em tempo real
Esta página descreve os pré-requisitos e configurações necessárias para execução de consultas em modo real de tempo em transmissão estruturada. Para obter um tutorial passo a passo, consulte tutorial: execução a tempo real transmissão workload. Para informações conceituais sobre modo tempo real, veja modo tempo real em transmissão estruturada.
Pré-requisitos
Para usar o modo tempo real, você deve configurar seu compute para atender aos seguintes requisitos:
- Use compute clássica. São suportados modos de acesso dedicado e padrão. O modo de acesso padrão é compatível apenas com Python. O pipeline declarativo LakeFlow Spark e clusters serverless não são suportados.
- Utilize Databricks Runtime 16.4 LTS ou superior.
- Desative a escala automática.
- Desligue o Photon.
- Defina
spark.databricks.streaming.realTimeMode.enabledcomotrue. - Desative as instâncias spot para evitar interrupções.
Para cargas de trabalho sensíveis à latência com UDFs (Funções Definidas pelo Usuário), a Databricks recomenda o uso do modo de acesso dedicado. Consulte as funções da tabela.
Para obter instruções sobre como criar e configurar compute clássica, consulte a referência de configuração de computação.
joins de transmissão para transmissão
Joins internos entre transmissões exigem configuração adicional para o modo de tempo real. Outer joins não são compatíveis. Consulte join entre transmissões.
Para executar um join de stream para transmissão no modo em tempo real com várias outras transmissões no mesmo cluster, você deve usar o Databricks Runtime 18 e superior.
No Databricks Runtime 18.2 e abaixo, a transmissão estructurada não oferece suporte para as seguintes configurações para outros modos de processamento, incluindo processingTime e availableNow.
Para habilitar joins de transmissão para transmissão para o modo de tempo real, defina as seguintes configurações do Spark:
- Python
- Scala
- SQL
spark.conf.set("spark.databricks.streaming.realTimeMode.streamStreamJoin.enabled", "true")
spark.conf.set("spark.sql.streaming.join.stateFormatVersion", "4")
spark.conf.set("spark.sql.streaming.join.stateFormatV4.enabled", "true")
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.mergeOperatorVersion", "2")
spark.conf.set("spark.sql.streaming.realTimeMode.controlMessage.enabled", "true")
spark.conf.set("spark.databricks.streaming.realTimeMode.streamStreamJoin.enabled", "true")
spark.conf.set("spark.sql.streaming.join.stateFormatVersion", "4")
spark.conf.set("spark.sql.streaming.join.stateFormatV4.enabled", "true")
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.mergeOperatorVersion", "2")
spark.conf.set("spark.sql.streaming.realTimeMode.controlMessage.enabled", "true")
SET spark.databricks.streaming.realTimeMode.streamStreamJoin.enabled = true;
SET spark.sql.streaming.join.stateFormatVersion = 4;
SET spark.sql.streaming.join.stateFormatV4.enabled = true;
SET spark.sql.streaming.stateStore.rocksdb.mergeOperatorVersion = 2;
SET spark.sql.streaming.realTimeMode.controlMessage.enabled = true;
Configuração da consulta
Para executar uma consulta no modo tempo real, é necessário habilitar o trigger tempo real. Os gatilhos em tempo real são suportados apenas no modo de atualização.
- Python
- Scala
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# In PySpark, the realTime trigger requires specifying the interval.
.trigger(realTime="5 minutes")
.start()
)
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
// RealTimeTrigger can also accept an argument specifying the checkpoint interval.
// For example, this code indicates a checkpoint interval of 5 minutes:
// .trigger(RealTimeTrigger.apply("5 minutes"))
.start()
dimensionamento de cálculo
Você pode executar um Job de tempo real por recurso compute se a compute tiver slots de tarefa suficientes.
Para execução em modo de baixa latência, o número total de slots de tarefas disponíveis deve ser maior ou igual ao número de tarefas em todos os estágios da consulta.
Exemplos de cálculo de slots
tipo de tubulação | Configuração | Vagas necessárias |
|---|---|---|
Sem estado em estágio único (fonte + destino Kafka) |
| 8 vagas |
Com estado em dois estágios (fonte Kafka + embaralhamento) |
| 28 vagas (8 + 20) |
Três estágios (fonte Kafka + shuffle + repartição) |
| 48 vagas (8 + 20 + 20) |
Se você não definir maxPartitions, use o número de partições no tópico do Kafka.