Run your first Structured Streaming workload
This article provides code examples and explanation of basic concepts necessary to run your first Structured Streaming queries on Databricks. You can use Structured Streaming for near real-time and incremental processing workloads.
Structured Streaming is one of several technologies that power streaming tables in Delta Live Tables. Databricks recommends using Delta Live Tables for all new ETL, ingestion, and Structured Streaming workloads. See What is Delta Live Tables?.
Note
While Delta Live Tables provides a slightly modified syntax for declaring streaming tables, the general syntax for configuring streaming reads and transformations applies to all streaming use cases on Databricks. Delta Live Tables also simplifies streaming by managing state information, metadata, and numerous configurations.
Use Auto Loader to read streaming data from object storage
The following example demonstrates loading JSON data with Auto Loader, which uses cloudFiles
to denote format and options. The schemaLocation
option enables schema inference and evolution. Paste the following code in a Databricks notebook cell and run the cell to create a streaming DataFrame named raw_df
:
file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
raw_df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
)
Like other read operations on Databricks, configuring a streaming read does not actually load data. You must trigger an action on the data before the stream begins.
Note
Calling display()
on a streaming DataFrame starts a streaming job. For most Structured Streaming use cases, the action that triggers a stream should be writing data to a sink. See Production considerations for Structured Streaming.
Perform a streaming transformation
Structured Streaming supports most transformations that are available in Databricks and Spark SQL. You can even load MLflow models as UDFs and make streaming predictions as a transformation.
The following code example completes a simple transformation to enrich the ingested JSON data with additional information using Spark SQL functions:
from pyspark.sql.functions import col, current_timestamp
transformed_df = (raw_df.select(
"*",
col("_metadata.file_path").alias("source_file"),
current_timestamp().alias("processing_time")
)
)
The resulting transformed_df
contains query instructions to load and transform each record as it arrives in the data source.
Note
Structured Streaming treats data sources as unbounded or infinite datasets. As such, some transformations are not supported in Structured Streaming workloads because they would require sorting an infinite number of items.
Most aggregations and many joins require managing state information with watermarks, windows, and output mode. See Apply watermarks to control data processing thresholds.
Perform an incremental batch write to Delta Lake
The following example writes to Delta Lake using a specified file path and checkpoint.
Important
Always make sure you specify a unique checkpoint location for each streaming writer you configure. The checkpoint provides the unique identity for your stream, tracking all records processed and state information associated with your streaming query.
The availableNow
setting for the trigger instructs Structured Streaming to process all previously unprocessed records from the source dataset and then shut down, so you can safely execute the following code without worrying about leaving a stream running:
target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
transformed_df.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.option("path", target_path)
.start()
In this example, no new records arrive in our data source, so repeat execution of this code does not ingest new records.
Warning
Structured Streaming execution can prevent auto termination from shutting down compute resources. To avoid unexpected costs, be sure to terminate streaming queries.
Read data from Delta Lake, transform, and write to Delta Lake
Delta Lake has extensive support for working with Structured Streaming as both a source and a sink. See Delta table streaming reads and writes.
The following example shows example syntax to incrementally load all new records from a Delta table, join them with a snapshot of another Delta table, and write them to a Delta table:
(spark.readStream
.table("<table-name1>")
.join(spark.read.table("<table-name2>"), on="<id>", how="left")
.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", "<checkpoint-path>")
.toTable("<table-name3>")
)
You must have proper permissions configured to read source tables and write to target tables and the specified checkpoint location. Fill in all parameters denoted with angle brackets (<>
) using the relevant values for your data sources and sinks.
Note
Delta Live Tables provides a fully declarative syntax for creating Delta Lake pipelines and manages properties like triggers and checkpoints automatically. See What is Delta Live Tables?.
Read data from Kafka, transform, and write to Kafka
Apache Kafka and other messaging buses provide some of the lowest latency available for large datasets. You can use Databricks to apply transformations to data ingested from Kafka and then write data back to Kafka.
Note
Writing data to cloud object storage adds additional latency overhead. If you wish to store data from a messaging bus in Delta Lake but require the lowest latency possible for streaming workloads, Databricks recommends configuring separate streaming jobs to ingest data to the lakehouse and apply near real-time transformations for downstream messaging bus sinks.
The following code example demonstrates a simple pattern to enrich data from Kafka by joining it with data in a Delta table and then writing back to Kafka:
(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.join(spark.read.table("<table-name>"), on="<id>", how="left")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.option("checkpointLocation", "<checkpoint-path>")
.start()
)
You must have proper permissions configured for access to your Kafka service. Fill in all parameters denoted with angle brackets (<>
) using the relevant values for your data sources and sinks. See Stream processing with Apache Kafka and Databricks.