This article provides code examples and explains the basic concepts you need to run your first Structured Streaming queries on Databricks. You can use Structured Streaming for near real-time and incremental processing workloads.
Structured Streaming is one of several technologies that power streaming tables in Delta Live Tables. Databricks recommends using Delta Live Tables for all new ETL, ingestion, and Structured Streaming workloads. See What is Delta Live Tables?.
While Delta Live Tables provides a slightly modified syntax for declaring streaming tables, the general syntax for configuring streaming reads and transformations applies to all streaming use cases on Databricks. Delta Live Tables also simplifies streaming by managing state information, metadata, and numerous configurations.
You can use Structured Streaming to incrementally ingest data from supported data sources. Some of the most common data sources used in Databricks Structured Streaming workloads include the following:
Data files in cloud object storage
Message buses and queues
Databricks recommends using Auto Loader for streaming ingestion from cloud object storage. Auto Loader supports most file formats supported by Structured Streaming. See What is Auto Loader?.
Each data source provides a number of options to specify how to load batches of data. During reader configuration, the main options you might need to set fall into the following categories:
Options that specify the data source or format (for example, file type, delimiters, and schema).
Options that configure access to source systems (for example, port settings and credentials).
Options that specify where to start in a stream (for example, Kafka offsets or reading all existing files).
Options that control how much data is processed in each batch (for example, max offsets, files, or bytes per batch).
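As a rough illustration of how these four categories map onto reader options, the following sketch configures a Kafka source. The helper names kafka_reader_options and read_kafka_stream, the default of 10,000 offsets per batch, and the choice of "earliest" as the starting point are assumptions made for this example. The option keys themselves are standard Kafka source options in Structured Streaming.

```python
def kafka_reader_options(bootstrap_servers, topic, max_offsets_per_batch=10000):
    """Group common Kafka reader options by the categories described above.

    The function name and default values are hypothetical.
    """
    return {
        # Access to the source system.
        "kafka.bootstrap.servers": bootstrap_servers,
        # What to read (the format itself is set with .format("kafka")).
        "subscribe": topic,
        # Where to start in the stream.
        "startingOffsets": "earliest",
        # How much data to process in each batch.
        "maxOffsetsPerTrigger": str(max_offsets_per_batch),
    }


def read_kafka_stream(spark, options):
    """Return a streaming DataFrame configured with the given options."""
    return spark.readStream.format("kafka").options(**options).load()
```

In a notebook, read_kafka_stream(spark, kafka_reader_options("<server:ip>", "<topic>")) would return a streaming DataFrame. Keeping the options in a plain dictionary makes it easy to review or reuse them across readers.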
The following example demonstrates loading JSON data with Auto Loader, which uses cloudFiles to denote format and options. The schemaLocation option enables schema inference and evolution. Paste the following code in a Databricks notebook cell and run the cell to create a streaming DataFrame named raw_df:
file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

raw_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
)
Like other read operations on Databricks, configuring a streaming read does not actually load data. You must trigger an action on the data before the stream begins.
Calling display() on a streaming DataFrame starts a streaming job. For most Structured Streaming use cases, the action that triggers a stream should be writing data to a sink. See Preparing your Structured Streaming code for production.
Structured Streaming supports most transformations that are available in Databricks and Spark SQL. You can even load MLflow models as UDFs and make streaming predictions as a transformation.
The following code example applies a simple transformation that enriches the ingested JSON data with additional information using Spark SQL functions:
from pyspark.sql.functions import col, current_timestamp

transformed_df = (raw_df.select(
    "*",
    col("_metadata.file_path").alias("source_file"),
    current_timestamp().alias("processing_time")
    )
)
transformed_df contains query instructions to load and transform each record as it arrives in the data source.
Structured Streaming treats data sources as unbounded or infinite datasets. As such, some transformations are not supported in Structured Streaming workloads because they would require sorting an infinite number of items.
Most aggregations and many joins require managing state information with watermarks, windows, and output mode. See Apply watermarks to control data processing thresholds.
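As a minimal sketch of how a watermark and a window fit together, the following function counts events per time window. It assumes the streaming DataFrame has a timestamp column, here given the hypothetical name event_time. The function name, the view name events_with_watermark, and the 10-minute and 5-minute thresholds are likewise illustrative assumptions.

```python
def windowed_event_counts(spark, events_df, time_col="event_time",
                          delay="10 minutes", window_size="5 minutes"):
    """Count events per time window, tolerating late data up to `delay`.

    Column, view, and threshold names are hypothetical.
    """
    # The watermark tells Structured Streaming how long to keep window state
    # while waiting for late records.
    (events_df
        .withWatermark(time_col, delay)
        .createOrReplaceTempView("events_with_watermark"))

    # window() assigns each record to a fixed-size time window to group on.
    return spark.sql(f"""
        SELECT window({time_col}, '{window_size}') AS time_window,
               count(*) AS event_count
        FROM events_with_watermark
        GROUP BY window({time_col}, '{window_size}')
    """)
```

With the default append output mode, the count for a window is emitted only after the watermark passes the end of that window.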
A data sink is the target of a streaming write operation. Common sinks used in Databricks streaming workloads include the following:
Message buses and queues
As with data sources, most data sinks provide a number of options to control how data is written to the target system. During writer configuration, the main options you might need to set fall into the following categories:
Output mode (append by default).
A checkpoint location (required for each writer).
Trigger intervals; see Configure Structured Streaming trigger intervals.
Options that specify the data sink or format (for example, file type, delimiters, and schema).
Options that configure access to target systems (for example, port settings and credentials).
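The writer option categories above can be sketched as follows. This is an illustration under stated assumptions rather than a recommended configuration: the helper names checkpoint_path_for and start_delta_append, the _checkpoints directory layout, and the one-minute trigger interval are all hypothetical.

```python
def checkpoint_path_for(base_path, query_name):
    """Derive a checkpoint directory that is unique to one streaming writer.

    The directory layout is a hypothetical convention.
    """
    return f"{base_path.rstrip('/')}/_checkpoints/{query_name}"


def start_delta_append(df, target_path, checkpoint_path):
    """Start a streaming write to a Delta table stored at `target_path`."""
    return (df.writeStream
        .format("delta")                                # data sink format
        .outputMode("append")                           # output mode (the default)
        .option("checkpointLocation", checkpoint_path)  # required for each writer
        .trigger(processingTime="1 minute")             # trigger interval
        .start(target_path))
```

A processingTime trigger keeps the query running until it is stopped. Deriving the checkpoint path from a query name is one simple way to avoid two writers sharing a checkpoint.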
The following example writes to Delta Lake using a specified file path and checkpoint.
Always make sure you specify a unique checkpoint location for each streaming writer you configure. The checkpoint provides the unique identity for your stream, tracking all records processed and state information associated with your streaming query.
The availableNow setting for the trigger instructs Structured Streaming to process all previously unprocessed records from the source dataset and then shut down, so you can safely execute the following code without worrying about leaving a stream running:
target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

# The parentheses let the method chain span multiple lines.
(transformed_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .option("path", target_path)
    .start()
)
In this example, no new records arrive in the data source, so running this code again does not ingest new records.
Structured Streaming execution can prevent auto termination from shutting down compute resources. To avoid unexpected costs, be sure to terminate streaming queries.
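One way to terminate streaming queries from a notebook is to iterate over the active queries on the session and stop each one. The following is a minimal sketch. The function name stop_active_streams is hypothetical, while spark.streams.active and StreamingQuery.stop() are part of the PySpark API.

```python
def stop_active_streams(spark):
    """Stop every streaming query that is active on this Spark session."""
    stopped = []
    for query in spark.streams.active:
        query.stop()
        stopped.append(query.name)  # name is None for unnamed queries
    return stopped
```

Calling stop_active_streams(spark) returns the names of the queries it stopped, which can help confirm that nothing was left running.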
Databricks recommends using Delta Live Tables for most Structured Streaming workloads. The following recommendations provide a starting point for preparing Structured Streaming workloads for production:
Remove unnecessary code from notebooks that would return results, such as display().
Do not run Structured Streaming workloads on interactive clusters; always schedule streams as jobs.
To help streaming jobs recover automatically, configure jobs with infinite retries.
Do not use auto-scaling for workloads with Structured Streaming.
For more recommendations, see Production considerations for Structured Streaming.
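Some of the recommendations above can be expressed in a job definition. The following sketch builds a settings dictionary with infinite retries and a fixed-size cluster. The function name, the task key, the one-minute retry interval, and the worker count are assumptions. The field names are intended to follow the Databricks Jobs API, where a max_retries value of -1 means retry indefinitely, and should be checked against the current API reference before use.

```python
def streaming_job_settings(job_name, notebook_path, num_workers=2):
    """Build job settings for a scheduled streaming notebook.

    Hypothetical helper; fill in the angle-bracket placeholders.
    """
    return {
        "name": job_name,
        "tasks": [{
            "task_key": "run_stream",
            "notebook_task": {"notebook_path": notebook_path},
            # -1 asks the job to retry indefinitely after a failure.
            "max_retries": -1,
            "min_retry_interval_millis": 60000,
            # A fixed worker count instead of an autoscale range.
            "new_cluster": {
                "spark_version": "<spark-version>",
                "node_type_id": "<node-type>",
                "num_workers": num_workers,
            },
        }],
    }
```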
Delta Lake has extensive support for working with Structured Streaming as both a source and a sink. See Delta table streaming reads and writes.
The following example shows the syntax to incrementally load all new records from a Delta table, join them with a snapshot of another Delta table, and write them to a Delta table:
(spark.readStream
    .table("<table-name1>")
    .join(spark.read.table("<table-name2>"), on="<id>", how="left")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "<checkpoint-path>")
    .toTable("<table-name3>")
)
You must have proper permissions configured to read source tables and write to target tables and the specified checkpoint location. Fill in all parameters denoted with angle brackets (<>) using the relevant values for your data sources and sinks.
Delta Live Tables provides a fully declarative syntax for creating Delta Lake pipelines and manages properties like triggers and checkpoints automatically. See What is Delta Live Tables?.
Apache Kafka and other messaging buses provide some of the lowest latency available for large datasets. You can use Databricks to apply transformations to data ingested from Kafka and then write data back to Kafka.
Writing data to cloud object storage adds latency. If you want to store data from a messaging bus in Delta Lake but require the lowest possible latency for streaming workloads, Databricks recommends configuring separate streaming jobs to ingest data to the lakehouse and apply near real-time transformations for downstream messaging bus sinks.
The following code example demonstrates a simple pattern to enrich data from Kafka by joining it with data in a Delta table and then writing back to Kafka:
(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "latest")
    .load()
    .join(spark.read.table("<table-name>"), on="<id>", how="left")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("topic", "<topic>")
    .option("checkpointLocation", "<checkpoint-path>")
    .start()
)
You must have proper permissions configured for access to your Kafka service. Fill in all parameters denoted with angle brackets (<>) using the relevant values for your data sources and sinks. See Stream processing with Apache Kafka and Databricks.
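The Kafka source delivers each record's key and value as binary columns, so joining on a field inside the payload usually means parsing the value first and serializing the result again before writing. The following sketch shows one way to do both steps with SQL expressions. The helper names and the schema string passed to value_schema (for example, "id STRING, action STRING") are hypothetical, and the sketch assumes the payload is JSON.

```python
def parse_kafka_json(kafka_df, value_schema):
    """Decode the binary Kafka `value` column and expand its JSON fields.

    `value_schema` is a DDL-style schema string such as "id STRING".
    """
    return (kafka_df
        .selectExpr(f"from_json(CAST(value AS STRING), '{value_schema}') AS data")
        .select("data.*"))


def to_kafka_records(df, key_col):
    """Build the `key` and `value` columns that the Kafka sink writes."""
    return df.selectExpr(
        f"CAST({key_col} AS STRING) AS key",
        "to_json(struct(*)) AS value",
    )
```

In the pattern above, parse_kafka_json would run after load() and before the join, and to_kafka_records would run after the join and before writeStream.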