Pular para o conteúdo principal

Exemplos de modo em tempo real

info

Visualização

Este recurso está em Pré-visualização Pública.

Esta página fornece exemplos de código funcionais para consultas em modo tempo real na transmissão estruturada, desde transformações simples sem estado até processamento complexo com estado e gerenciamento de estado personalizado. Veja o modo tempo real em transmissão estruturada para conceitos e configuração, e comece com o modo tempo real para um tutorial prático.

Pré-requisitos

Para executar os exemplos desta página, você precisa de:

  • Um cluster em modo de tempo real configurado e em execução. Consulte a seção "Comece com o modo Tempo Real" para obter instruções de configuração passo a passo.

  • Databricks Runtime 16.4 LTS ou superior.

  • Acesso a fontes e destinos de transmissão suportados:

    • Por exemplo, para o Kafka: um broker Kafka com tópicos de entrada/saída configurados.
    • Para exemplos Kinesis : credenciais AWS e uma transmissão Kinesis configurada para o modo Enhanced Fan-Out (EFO).
    • Para exemplos de coletores personalizados: Banco de dados ou serviço de destino configurado (PostgreSQL no exemplo fornecido).
  • Familiaridade básica com conceitos de transmissão estruturada. Veja conceitos de transmissão estruturada se você é novo em transmissão.

nota

Os exemplos usam valores de espaço reservado como broker_address, input_topic e checkpoint_location. Substitua esses valores pelos seus valores de configuração reais antes de executar o código.

Exemplos de consultas sem estado

Consultas sem estado processam cada registro de forma independente, sem manter qualquer estado entre os registros. Essas consultas são normalmente mais simples e têm menor latência do que as consultas com estado, porque não precisam gerenciar o armazenamento de estado nem realizar pesquisas. Utilize consultas sem estado para transformações, filtragem, junção com dados estáticos e operações de roteamento.

Fonte Kafka para destino Kafka

Neste exemplo, você lê de uma fonte Kafka e escreve em um destino Kafka.

Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)

Repartição

Neste exemplo, você lê de uma fonte Kafka, reparticiona os dados em 20 partições e grava em um coletor Kafka.

Devido a uma limitação de implementação atual, você deve definir a configuração do Spark de spark.sql.execution.sortBeforeRepartition para false antes de usar o reparticionamento.

Python
# Sorting is not supported in repartition with real-time mode, so you must set this to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")

query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "earliest")
.load()
.repartition(20)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)

-Snapshot join (somente transmissão)

Neste exemplo, você lê dados do Kafka, join os dados a uma tabela estática e grava em um coletor Kafka . Somente junções estáticas de transmissão que transmitem a tabela estática são suportadas, o que significa que a tabela estática deve caber na memória.

Python
from pyspark.sql.functions import broadcast, expr

# We assume the static table in the path `static_table_location` has a column 'lookupKey'.

query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "earliest")
.load()
.withColumn("joinKey", expr("CAST(value AS STRING)"))
.join(
broadcast(spark.read.format("parquet").load(static_table_location)),
expr("joinKey = lookupKey")
)
.selectExpr("value AS key", "value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)

Fonte Kinesis para destino Kafka

Neste exemplo, você lê de uma fonte Kinesis e grava em um destino Kafka.

Python
query = (
spark.readStream
.format("kinesis")
.option("region", region_name)
.option("awsAccessKey", aws_access_key_id)
.option("awsSecretKey", aws_secret_access_key)
.option("consumerMode", "efo")
.option("consumerName", consumer_name)
.load()
.selectExpr("partitionKey AS key", "CAST(data AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)

União

Neste exemplo, você une dois DataFrames do Kafka provenientes de dois tópicos diferentes e os grava em um coletor do Kafka.

Python
df1 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_1)
.load()
)

df2 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_2)
.load()
)

query = (
df1.union(df2)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)

Exemplos de consultas com estado

Consultas com estado mantêm informações de estado entre registros, permitindo operações como desduplicação, agregação e janelamento. Essas consultas são essenciais para casos de uso que exigem acompanhamento de informações ao longo do tempo ou em vários eventos. O modo em tempo real suporta operações com estado com a mesma semântica do modo de micro-lotes, mas processa os dados continuamente para menor latência. Consultas com estado exigem mais memória e recursos compute do que consultas sem estado, pois precisam manter e atualizar o estado.

Desduplicação

Neste exemplo, você remove registros duplicados com base nas colunas timestamp e value .

Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.dropDuplicates(["timestamp", "value"])
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)

Agregação

Neste exemplo, você agrupa os registros por timestamp e value e, em seguida, conta as ocorrências.

Python
from pyspark.sql.functions import col

query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.groupBy(col("timestamp"), col("value"))
.count()
.selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)

União com agregação

Neste exemplo, você primeiro une dois DataFrames do Kafka de dois tópicos diferentes e depois realiza uma agregação. No final, você escreve para o coletor do Kafka.

Python
from pyspark.sql.functions import col

df1 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_1)
.load()
)

df2 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_2)
.load()
)

query = (
df1.union(df2)
.groupBy(col("timestamp"), col("value"))
.count()
.selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)

transformarComEstado

Neste exemplo, você usa transformWithState para manter um estado personalizado com TTL (tempo de vida). O processador conta o número de registros visualizados para cada key.

Python
from typing import Iterator, Tuple

from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import LongType, StringType, TimestampType, StructField, StructType


class RTMStatefulProcessor(StatefulProcessor):
"""
This processor counts the number of records it has seen for each key using state variables
with TTLs. It redundantly maintains this count with a value, list, and map state to put load
on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
the count for a given grouping key.)

The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
The source-timestamp is passed through so that we can calculate end-to-end latency. The output
schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
"""

def init(self, handle: StatefulProcessorHandle) -> None:
state_schema = StructType([StructField("value", LongType(), True)])
self.value_state = handle.getValueState("value", state_schema, 30000)
map_key_schema = StructType([StructField("key", LongType(), True)])
map_value_schema = StructType([StructField("value", StringType(), True)])
self.map_state = handle.getMapState("map", map_key_schema, map_value_schema, 30000)
list_schema = StructType([StructField("value", StringType(), True)])
self.list_state = handle.getListState("list", list_schema, 30000)

def handleInputRows(self, key, rows, timerValues) -> Iterator[Row]:
for row in rows:
# row is a tuple (key, source_timestamp)
key_str = row[0]
source_timestamp = row[1]
old_value = value.get()
if old_value is None:
old_value = 0
self.value_state.update((old_value + 1,))
self.map_state.update((old_value,), (key_str,))
self.list_state.appendValue((key_str,))
yield Row(key=key_str, value=old_value + 1, timestamp=source_timestamp)

def close(self) -> None:
pass


output_schema = StructType(
[
StructField("key", StringType(), True),
StructField("value", LongType(), True),
StructField("timestamp", TimestampType(), True),
]
)

query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
.groupBy("key")
.transformWithState(
statefulProcessor=RTMStatefulProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="processingTime",
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("Update")
.start()
)
nota

Há uma diferença entre o modo tempo real e outros modos de execução na transmissão estruturada execução do StatefulProcessor em transformWithState. Consulte Usar transformWithState no modo em tempo real.

Desenvolvimento e testes

Utilizando a tela para desenvolvimento interativo

Você pode usar a função display para visualizar dados de transmissão em tempo real diretamente em um Notebook. Isso é útil para desenvolvimento interativo, teste e depuração de consultas em modo de tempo real sem a necessidade de configurar coletores externos ou infraestrutura de produção.

A função display com gatilho realTime está disponível no Databricks Runtime 17.1 e versões superiores. Use display durante o desenvolvimento para verificar sua lógica de consulta e transformações de dados antes de implantar em produção com Kafka ou sinks personalizados. Para um exemplo completo usando a fonte de taxa com display, veja Get começar com o modo tempo real.

Exibir taxa de fonte

Neste exemplo, você lê de uma fonte de dados e exibe o DataFrame de transmissão em um Notebook.

Python
inputDF = (
spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
)
display(inputDF, realTime="5 minutes", outputMode="update")

Exemplos de pias personalizadas

Quando você precisar escrever dados de transmissão para destinos que não têm suporte integrado de transmissão estruturada, use foreachSink para implementar lógica de escrita personalizada. Os sinks personalizados oferecem controle total sobre como os dados são gravados, permitindo a integração com qualquer banco de dados, API ou sistema de armazenamento. O exemplo a seguir demonstra como escrever dados em um banco de dados PostgreSQL usando JDBC.

Escreva no PostgreSQL usando foreachSink

Scala
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.{ForeachWriter, Row}

/**
* Groups connection properties for
* the JDBC writers.
*
* @param url JDBC url of the form jdbc:subprotocol:subname to connect to
* @param dbtable database table that should be written into
* @param username username for authentication
* @param password password for authentication
*/
class JdbcWriterConfig(
val url: String,
val dbtable: String,
val username: String,
val password: String,
) extends Serializable

/**
* Handles streaming data writes to a database sink via JDBC, by:
* - connecting to the database
* - buffering incoming data rows in batches to reduce write overhead
*
* @param config connection parameters and configuration knobs for the writer
*/
class JdbcStreamingDataWriter(config: JdbcWriterConfig)
extends ForeachWriter[Row] with Serializable {
// The writer currently only supports this hard-coded schema
private val UPSERT_STATEMENT_SQL =
s"""MERGE INTO "${config.dbtable}"
|USING (
| SELECT
| CAST(? AS INTEGER) AS "id",
| CAST(? AS CHARACTER VARYING) AS "data"
|) AS "source"
|ON "test"."id" = "source"."id"
|WHEN MATCHED THEN
| UPDATE SET "data" = "source"."data"
|WHEN NOT MATCHED THEN
| INSERT ("id", "data") VALUES ("source"."id", "source"."data")
|""".stripMargin

private val MAX_BUFFER_SIZE = 3
private val buffer = new Array[Row](MAX_BUFFER_SIZE)
private var bufferSize = 0

private var connection: Connection = _

/**
* Flushes the [[buffer]] by writing all rows in the buffer to the database.
*/
private def flushBuffer(): Unit = {
require(connection != null)

if (bufferSize == 0) {
return
}

var upsertStatement: PreparedStatement = null

try {
upsertStatement = connection.prepareStatement(UPSERT_STATEMENT_SQL)

for (i <- 0 until bufferSize) {
val row = buffer(i)
upsertStatement.setInt(1, row.getAs[String]("key"))
upsertStatement.setString(2, row.getAs[String]("value"))
upsertStatement.addBatch()
}

upsertStatement.executeBatch()
connection.commit()

bufferSize = 0
} catch { case e: Exception =>
if (connection != null) {
connection.rollback()
}
throw e
} finally {
if (upsertStatement != null) {
upsertStatement.close()
}
}
}

override def open(partitionId: Long, epochId: Long): Boolean = {
connection = DriverManager.getConnection(config.url, config.username, config.password)
true
}

override def process(row: Row): Unit = {
buffer(bufferSize) = row
bufferSize += 1
if (bufferSize >= MAX_BUFFER_SIZE) {
flushBuffer()
}
}

override def close(errorOrNull: Throwable): Unit = {
flushBuffer()
if (connection != null) {
connection.close()
connection = null
}
}
}


spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.writeStream
.outputMode(OutputMode.Update())
.trigger(defaultTrigger)
.foreach(new JdbcStreamingDataWriter(new JdbcWriterConfig(jdbcUrl, tableName, jdbcUsername, jdbcPassword)))
.start()

Próximos passos

Agora que você explorou esses exemplos de modo tempo real, aqui estão alguns recursos para aprofundar seu conhecimento e criar aplicativos de transmissão prontos para produção: