Real-time mode examples

Preview

This feature is in Public Preview.

This page provides working code examples for real-time mode queries in Structured Streaming, from simple stateless transformations to complex stateful processing with custom state management. See Real-time mode in Structured Streaming for concepts and configuration, and Get started with real-time mode for a hands-on tutorial.

Prerequisites

To run the examples on this page, you need:

  • A real-time mode cluster configured and running. See Get started with real-time mode for step-by-step setup instructions.
  • Databricks Runtime 16.4 LTS or above.
  • Access to supported streaming sources and sinks:
    • For Kafka examples: A Kafka broker with input/output topics configured
    • For Kinesis examples: AWS credentials and a Kinesis stream configured for Enhanced Fan-Out (EFO) mode
    • For custom sink examples: Target database or service configured (PostgreSQL for the example provided)
  • Basic familiarity with Structured Streaming concepts. See Structured Streaming concepts if you're new to streaming.
note

The examples use placeholder values like broker_address, input_topic, and checkpoint_location. Replace these with your actual configuration values before running the code.
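For instance, you might define the placeholders at the top of your notebook. The values below are hypothetical:

```python
# Hypothetical configuration values -- replace with your own before running.
broker_address = "kafka-broker.example.com:9092"
input_topic = "events_in"
output_topic = "events_out"
checkpoint_location = "/Volumes/main/default/checkpoints/rtm_example"
```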

Stateless query examples

Stateless queries process each record independently without maintaining any state between records. These queries are typically simpler and have lower latency than stateful queries because they don't need to manage state storage or perform lookups. Use stateless queries for transformations, filtering, joins with static data, and routing operations.
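As a conceptual illustration (plain Python rather than the Spark API), a stateless transformation is a pure function of one record at a time; the record and field names here are hypothetical:

```python
# Conceptual sketch: a stateless transformation is a pure function of a
# single record -- no state is kept between calls, which is what keeps
# latency low.
def transform(record):
    if record.get("value") is None:
        return None  # filtering: drop the record entirely
    # transformation: normalize the value field
    return {**record, "value": record["value"].strip().lower()}

records = [{"key": "a", "value": " HELLO "}, {"key": "b", "value": None}]
results = [out for out in map(transform, records) if out is not None]
# results == [{"key": "a", "value": "hello"}]
```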

Kafka source to Kafka sink

In this example, you read from a Kafka source and write to a Kafka sink.

Python
query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic)
    .load()
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)

Repartition

In this example, you read from a Kafka source, repartition the data into 20 partitions, and write to a Kafka sink.

Due to a current implementation limitation, you must set the Spark configuration spark.sql.execution.sortBeforeRepartition to false before calling repartition.

Python
# Sorting is not supported in repartition with real-time mode, so you must set this to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")

query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("subscribe", input_topic)
    .option("startingOffsets", "earliest")
    .load()
    .repartition(20)
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)

Stream-snapshot join (broadcast only)

In this example, you read from Kafka, join the data with a static table, and write to a Kafka sink. Only stream-static joins that broadcast the static table are supported, which means the static table should fit in memory.

Python
from pyspark.sql.functions import broadcast, expr

# We assume the static table in the path `static_table_location` has a column 'lookupKey'.

query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("subscribe", input_topic)
    .option("startingOffsets", "earliest")
    .load()
    .withColumn("joinKey", expr("CAST(value AS STRING)"))
    .join(
        broadcast(spark.read.format("parquet").load(static_table_location)),
        expr("joinKey = lookupKey")
    )
    .selectExpr("value AS key", "value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
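Conceptually, a broadcast stream-static join is an in-memory hash lookup: every worker holds a copy of the static table and each streaming record probes it by join key. A plain-Python sketch with hypothetical data:

```python
# Static side: small enough to fit in memory, keyed by lookupKey.
static_table = {"user1": "gold", "user2": "silver"}  # hypothetical lookup data

# Streaming side: each record probes the in-memory map by its join key.
stream = [{"joinKey": "user1", "value": 10}, {"joinKey": "user3", "value": 7}]
joined = [
    {**rec, "tier": static_table[rec["joinKey"]]}
    for rec in stream
    if rec["joinKey"] in static_table  # inner join: unmatched rows are dropped
]
# joined == [{"joinKey": "user1", "value": 10, "tier": "gold"}]
```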

Kinesis source to Kafka sink

In this example, you read from a Kinesis source and write to a Kafka sink.

Python
query = (
    spark.readStream
    .format("kinesis")
    .option("streamName", stream_name)
    .option("region", region_name)
    .option("awsAccessKey", aws_access_key_id)
    .option("awsSecretKey", aws_secret_access_key)
    .option("consumerMode", "efo")
    .option("consumerName", consumer_name)
    .load()
    .selectExpr("partitionKey AS key", "CAST(data AS STRING) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)

Union

In this example, you union two Kafka DataFrames from two different topics and write to a Kafka sink.

Python
df1 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_1)
    .load()
)

df2 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_2)
    .load()
)

query = (
    df1.union(df2)
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)

Stateful query examples

Stateful queries maintain state information across records, enabling operations such as deduplication, aggregation, and windowing. These queries are essential for use cases that require tracking information over time or across multiple events. Real-time mode supports stateful operations with the same semantics as micro-batch mode, but processes data continuously for lower latency. Stateful queries require more memory and compute resources than stateless queries because they must maintain and update state.
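Windowing, for example, buckets events by time before aggregating. As a conceptual plain-Python sketch (not the Spark API), a tumbling window assigns each event to exactly one fixed-width time bucket; the events here are hypothetical:

```python
from collections import Counter

WINDOW_SECONDS = 10

def window_start(event_time):
    # Tumbling windows: each event falls into exactly one fixed-width bucket,
    # identified by the bucket's start time.
    return event_time - (event_time % WINDOW_SECONDS)

events = [(3.0, "a"), (7.5, "a"), (12.0, "b")]  # (timestamp, key) pairs
counts = Counter((window_start(ts), key) for ts, key in events)
# counts == {(0.0, "a"): 2, (10.0, "b"): 1}
```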

Deduplication

In this example, you deduplicate records based on the timestamp and value columns.

Python
query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic)
    .load()
    .dropDuplicates(["timestamp", "value"])
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
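Conceptually, dropDuplicates keeps the first record seen for each distinct (timestamp, value) pair; the state it maintains is the set of pairs already emitted. A plain-Python sketch with hypothetical records:

```python
def deduplicate(records):
    # State: the set of (timestamp, value) pairs already emitted.
    seen = set()
    for rec in records:
        dedup_key = (rec["timestamp"], rec["value"])
        if dedup_key not in seen:
            seen.add(dedup_key)
            yield rec

records = [
    {"timestamp": 1, "value": "a"},
    {"timestamp": 1, "value": "a"},  # duplicate: dropped
    {"timestamp": 2, "value": "a"},  # new timestamp: kept
]
unique = list(deduplicate(records))
# unique contains the first and third records only
```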

Aggregation

In this example, you group records by timestamp and value, then count the occurrences.

Python
from pyspark.sql.functions import col

query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic)
    .load()
    .groupBy(col("timestamp"), col("value"))
    .count()
    .selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
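In update output mode, every incoming record bumps the running count for its group, and the group's new count is emitted downstream. Conceptually (plain Python, hypothetical records):

```python
from collections import defaultdict

def running_counts(records):
    # State: one running count per (timestamp, value) group.
    counts = defaultdict(int)
    for rec in records:
        group = (rec["timestamp"], rec["value"])
        counts[group] += 1
        # Update mode: emit the group's new count on every change.
        yield group, counts[group]

emitted = list(running_counts([
    {"timestamp": 1, "value": "a"},
    {"timestamp": 1, "value": "a"},
]))
# emitted == [((1, "a"), 1), ((1, "a"), 2)]
```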

Union with aggregation

In this example, you first union two Kafka DataFrames from two different topics, then aggregate, and finally write the result to a Kafka sink.

Python
from pyspark.sql.functions import col

df1 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_1)
    .load()
)

df2 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_2)
    .load()
)

query = (
    df1.union(df2)
    .groupBy(col("timestamp"), col("value"))
    .count()
    .selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)

transformWithState

In this example, you use transformWithState to maintain custom state with TTL (time-to-live). The processor counts the number of records seen for each key.

Python
from typing import Iterator

from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import LongType, StringType, TimestampType, StructField, StructType


class RTMStatefulProcessor(StatefulProcessor):
    """
    This processor counts the number of records it has seen for each key using state variables
    with TTLs. It redundantly maintains this count with a value, list, and map state to put load
    on the state variable cleanup mechanism. (In practice, only one value state is needed to
    maintain the count for a given grouping key.)

    The input rows are (key, value, timestamp) tuples. The source timestamp is passed through
    so that end-to-end latency can be calculated. The output schema is (String, Long, Timestamp),
    which represents a (key, count, source-timestamp) 3-tuple.
    """

    def init(self, handle: StatefulProcessorHandle) -> None:
        state_schema = StructType([StructField("value", LongType(), True)])
        self.value_state = handle.getValueState("value", state_schema, 30000)
        map_key_schema = StructType([StructField("key", LongType(), True)])
        map_value_schema = StructType([StructField("value", StringType(), True)])
        self.map_state = handle.getMapState("map", map_key_schema, map_value_schema, 30000)
        list_schema = StructType([StructField("value", StringType(), True)])
        self.list_state = handle.getListState("list", list_schema, 30000)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[Row]:
        for row in rows:
            # Each row carries the key, value, and timestamp columns selected below.
            key_str = row["key"]
            source_timestamp = row["timestamp"]
            state = self.value_state.get()
            old_value = state[0] if state is not None else 0
            self.value_state.update((old_value + 1,))
            self.map_state.updateValue((old_value,), (key_str,))
            self.list_state.appendValue((key_str,))
            yield Row(key=key_str, value=old_value + 1, timestamp=source_timestamp)

    def close(self) -> None:
        pass


output_schema = StructType(
    [
        StructField("key", StringType(), True),
        StructField("value", LongType(), True),
        StructField("timestamp", TimestampType(), True),
    ]
)

query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("subscribe", input_topic)
    .load()
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
    .groupBy("key")
    .transformWithState(
        statefulProcessor=RTMStatefulProcessor(),
        outputStructType=output_schema,
        outputMode="Update",
        timeMode="processingTime",
    )
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
note

There is a difference between how real-time mode and other execution modes in Structured Streaming run the StatefulProcessor in transformWithState. See Use transformWithState in real-time mode.
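The 30000 passed to each state variable in the example is a TTL in milliseconds: state that has not been updated within that duration is treated as absent and becomes eligible for cleanup. A conceptual plain-Python sketch of a value state with TTL (not the Spark API; the class and its clock parameter are illustrative only):

```python
import time

class ValueStateWithTTL:
    """Conceptual sketch (not the Spark API): a per-key value that expires
    ttl_ms milliseconds after its last update, like the 30000 ms TTL above."""

    def __init__(self, ttl_ms, clock=time.monotonic):
        self.ttl_s = ttl_ms / 1000.0
        self.clock = clock
        self.store = {}  # key -> (value, expiry time)

    def update(self, key, value):
        # Every update refreshes the value's expiry time.
        self.store[key] = (value, self.clock() + self.ttl_s)

    def get(self, key):
        entry = self.store.get(key)
        if entry is None:
            return None
        value, expiry = entry
        if self.clock() >= expiry:
            del self.store[key]  # expired: behave as if the key was never set
            return None
        return value
```

Passing a controllable clock makes the expiry behavior easy to exercise: update a key, advance the clock past the TTL, and the value reads back as absent.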

Development and testing

Using display for interactive development

You can use the display function to visualize real-time streaming data directly in a notebook. This is useful for interactive development, testing, and debugging real-time mode queries without setting up external sinks or production infrastructure.

The display function with realTime trigger is available in Databricks Runtime 17.1 and above. Use display during development to verify your query logic and data transformations before deploying to production with Kafka or custom sinks. For a complete example using the rate source with display, see Get started with real-time mode.

Display rate source

In this example, you read from a rate source and display the streaming DataFrame in a notebook.

Python
inputDF = (
    spark.readStream
    .format("rate")
    .option("numPartitions", 2)
    .option("rowsPerSecond", 1)
    .load()
)

display(inputDF, realTime="5 minutes", outputMode="update")

Custom sink examples

When you need to write streaming data to destinations that don't have built-in Structured Streaming support, use foreachSink to implement custom write logic. Custom sinks give you full control over how data is written, allowing you to integrate with any database, API, or storage system. The following example demonstrates writing to a PostgreSQL database using JDBC.

Write to PostgreSQL using foreachSink

Scala
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.{ForeachWriter, Row}

/**
 * Groups connection properties for the JDBC writers.
 *
 * @param url JDBC URL of the form jdbc:subprotocol:subname to connect to
 * @param dbtable database table that should be written into
 * @param username username for authentication
 * @param password password for authentication
 */
class JdbcWriterConfig(
    val url: String,
    val dbtable: String,
    val username: String,
    val password: String
) extends Serializable

/**
* Handles streaming data writes to a database sink via JDBC, by:
* - connecting to the database
* - buffering incoming data rows in batches to reduce write overhead
*
* @param config connection parameters and configuration knobs for the writer
*/
class JdbcStreamingDataWriter(config: JdbcWriterConfig)
  extends ForeachWriter[Row] with Serializable {

  // The writer currently only supports this hard-coded schema.
  private val UPSERT_STATEMENT_SQL =
    s"""MERGE INTO "${config.dbtable}" AS "target"
       |USING (
       |  SELECT
       |    CAST(? AS INTEGER) AS "id",
       |    CAST(? AS CHARACTER VARYING) AS "data"
       |) AS "source"
       |ON "target"."id" = "source"."id"
       |WHEN MATCHED THEN
       |  UPDATE SET "data" = "source"."data"
       |WHEN NOT MATCHED THEN
       |  INSERT ("id", "data") VALUES ("source"."id", "source"."data")
       |""".stripMargin

  private val MAX_BUFFER_SIZE = 3
  private val buffer = new Array[Row](MAX_BUFFER_SIZE)
  private var bufferSize = 0

  private var connection: Connection = _

  /**
   * Flushes the [[buffer]] by writing all rows in the buffer to the database.
   */
  private def flushBuffer(): Unit = {
    require(connection != null)

    if (bufferSize == 0) {
      return
    }

    var upsertStatement: PreparedStatement = null
    try {
      upsertStatement = connection.prepareStatement(UPSERT_STATEMENT_SQL)

      for (i <- 0 until bufferSize) {
        val row = buffer(i)
        upsertStatement.setInt(1, row.getAs[String]("key").toInt)
        upsertStatement.setString(2, row.getAs[String]("value"))
        upsertStatement.addBatch()
      }

      upsertStatement.executeBatch()
      connection.commit()

      bufferSize = 0
    } catch { case e: Exception =>
      if (connection != null) {
        connection.rollback()
      }
      throw e
    } finally {
      if (upsertStatement != null) {
        upsertStatement.close()
      }
    }
  }

  override def open(partitionId: Long, epochId: Long): Boolean = {
    connection = DriverManager.getConnection(config.url, config.username, config.password)
    // Disable auto-commit so each flushed batch commits as a single transaction.
    connection.setAutoCommit(false)
    true
  }

  override def process(row: Row): Unit = {
    buffer(bufferSize) = row
    bufferSize += 1
    if (bufferSize >= MAX_BUFFER_SIZE) {
      flushBuffer()
    }
  }

  override def close(errorOrNull: Throwable): Unit = {
    flushBuffer()
    if (connection != null) {
      connection.close()
      connection = null
    }
  }
}


import org.apache.spark.sql.streaming.{OutputMode, Trigger}

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerAddress)
  .option("subscribe", inputTopic)
  .load()
  .writeStream
  .outputMode(OutputMode.Update())
  .option("checkpointLocation", checkpointLocation)
  .trigger(Trigger.RealTime("5 minutes"))
  .foreach(new JdbcStreamingDataWriter(new JdbcWriterConfig(jdbcUrl, tableName, jdbcUsername, jdbcPassword)))
  .start()
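The Scala writer above follows a buffer-and-flush pattern: rows accumulate until the buffer is full, then are written as one batch, with a final flush on close so nothing buffered is lost. The same pattern in a plain-Python sketch, where the write_batch callback stands in for the JDBC upsert:

```python
class BufferedWriter:
    """Sketch of the buffer-and-flush pattern used by the JDBC writer above:
    collect rows until the buffer is full, then write them as one batch."""

    def __init__(self, write_batch, max_buffer_size=3):
        self.write_batch = write_batch  # callback standing in for the JDBC upsert
        self.max_buffer_size = max_buffer_size
        self.buffer = []

    def process(self, row):
        self.buffer.append(row)
        if len(self.buffer) >= self.max_buffer_size:
            self.flush()

    def flush(self):
        if self.buffer:
            self.write_batch(list(self.buffer))
            self.buffer.clear()

    def close(self):
        # Final flush so partially filled buffers are not lost.
        self.flush()

batches = []
writer = BufferedWriter(batches.append, max_buffer_size=3)
for row in range(5):
    writer.process(row)
writer.close()
# batches == [[0, 1, 2], [3, 4]]
```

Batching amortizes per-write overhead (network round trips, statement execution) across several rows, which matters for keeping end-to-end latency low in a real-time sink.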

Next steps

Now that you've explored these real-time mode examples, here are resources to deepen your knowledge and build production-ready streaming applications: