Query streaming data

You can use Databricks to query streaming data sources using Structured Streaming. Databricks provides extensive support for streaming workloads in Python and Scala, and supports most Structured Streaming functionality with SQL.

The following examples demonstrate using a memory sink for manual inspection of streaming data during interactive development in notebooks. Because of row output limits in the notebook UI, you might not observe all data read by streaming queries. In production workloads, you should only trigger streaming queries by writing them to a target table or external system.
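As a minimal sketch of the production pattern described above, the following Python helper writes a streaming DataFrame to a target table instead of displaying it. The function names, the checkpoint naming convention, and the choice of an `availableNow` trigger are assumptions made for illustration, not requirements of the platform.

```python
def checkpoint_path_for(base_path: str, target_table: str) -> str:
    """Derive a per-target checkpoint directory.

    The layout is a hypothetical convention: one checkpoint per target table.
    """
    return f"{base_path.rstrip('/')}/_checkpoints/{target_table.replace('.', '/')}"


def write_stream_to_table(streaming_df, target_table: str, checkpoint_path: str):
    """Start a streaming write from `streaming_df` into `target_table`.

    `streaming_df` is any streaming DataFrame, for example one returned by
    spark.readStream. Returns the StreamingQuery handle.
    """
    return (
        streaming_df.writeStream
        # The checkpoint records progress so a restarted query resumes where it stopped.
        .option("checkpointLocation", checkpoint_path)
        # Assumption: process the data available now, then stop. Omit for a continuously running query.
        .trigger(availableNow=True)
        .toTable(target_table)
    )
```

In a notebook, you might call `write_stream_to_table(df, "catalog.schema.events", checkpoint_path_for("/Volumes/catalog/schema/volume", "catalog.schema.events"))`, where the table and volume names are placeholders.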

Note

SQL support for interactive queries on streaming data is limited to notebooks running on all-purpose compute. You can also use SQL when you declare streaming tables in Databricks SQL or Delta Live Tables. See Load data using streaming tables in Databricks SQL and What is Delta Live Tables?

Query data from streaming systems

Databricks provides streaming data readers for the following streaming systems:

  • Kafka

  • Kinesis

  • Pub/Sub

  • Pulsar

You must provide configuration details when you initialize queries against these systems. The required details vary depending on your configured environment and the system you read from. See Configure streaming data sources.

Common workloads that involve streaming systems include data ingestion to the lakehouse and stream processing to sink data to external systems. For more on streaming workloads, see Streaming on Databricks.

The following examples demonstrate an interactive streaming read from Kafka:

Python:

display(spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

SQL:

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'latest'
);
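The Kafka source returns the `key` and `value` columns as binary, so interactive inspection is usually easier after casting them to strings. The following sketch collects the reader configuration in one place and applies that cast. The helper names are hypothetical, and the cast assumes the payloads are text (for example, JSON) rather than a binary encoding such as Avro.

```python
def kafka_reader_options(bootstrap_servers: str, topic: str,
                         starting_offsets: str = "latest") -> dict:
    """Build the option map used by the Kafka reader shown above."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        "startingOffsets": starting_offsets,
    }


def read_kafka_as_strings(spark, options: dict):
    """Return a streaming DataFrame with key and value cast to strings.

    `spark` is the active SparkSession (predefined in Databricks notebooks).
    """
    raw = spark.readStream.format("kafka").options(**options).load()
    # Replace the binary key and value columns in place; the other source
    # columns (topic, partition, offset, timestamp, ...) are kept unchanged.
    return (
        raw.withColumn("key", raw["key"].cast("string"))
           .withColumn("value", raw["value"].cast("string"))
    )
```

For example, `display(read_kafka_as_strings(spark, kafka_reader_options("<server:ip>", "<topic>")))` shows the decoded records in a notebook.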

Query a table as a streaming read

Databricks creates all tables using Delta Lake by default. When you perform a streaming query against a Delta table, the query automatically picks up new records when a version of the table is committed. By default, streaming queries expect source tables to contain only appended records. If you need to work with streaming data that contains updates and deletes, Databricks recommends using Delta Live Tables and APPLY CHANGES INTO. See Simplified change data capture with the APPLY CHANGES API in Delta Live Tables.

The following examples demonstrate performing an interactive streaming read from a table:

Python:

display(spark.readStream.table("table_name"))

SQL:

SELECT * FROM STREAM table_name;
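If a source table occasionally receives updates or deletes and you only need the appended records, one option is the Delta `skipChangeCommits` reader option, which ignores commits that modify or delete existing rows. The sketch below is illustrative only: the helper names are hypothetical, and skipped changes are not propagated downstream, so this is not a substitute for change data capture.

```python
def delta_stream_options(skip_change_commits: bool = False) -> dict:
    """Build reader options for a streaming read from a Delta table."""
    options = {}
    if skip_change_commits:
        # Ignore commits that update or delete existing records
        # instead of failing the stream on them.
        options["skipChangeCommits"] = "true"
    return options


def read_table_stream(spark, table_name: str, skip_change_commits: bool = False):
    """Return a streaming DataFrame over `table_name`.

    `spark` is the active SparkSession (predefined in Databricks notebooks).
    """
    return (
        spark.readStream
        .options(**delta_stream_options(skip_change_commits))
        .table(table_name)
    )
```

With the default arguments, `read_table_stream(spark, "table_name")` behaves like the plain streaming read shown above.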

Query data in cloud object storage with Auto Loader

You can stream data from cloud object storage using Auto Loader, the Databricks cloud data connector. You can use the connector with files stored in Unity Catalog volumes or other cloud object storage locations. Databricks recommends using volumes to manage access to data in cloud object storage. See Connect to data sources.

Databricks optimizes this connector for streaming ingestion of data in cloud object storage that is stored in popular structured, semi-structured, and unstructured formats. Databricks recommends storing ingested data in a nearly raw format to maximize throughput and minimize potential data loss due to corrupt records or schema changes.

For more recommendations on ingesting data from cloud object storage, see Ingest data into a Databricks lakehouse.

The following examples demonstrate an interactive streaming read from a directory of JSON files in a volume:

Python:

display(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .load("/Volumes/catalog/schema/volumes/path/to/files")
)

SQL:

SELECT * FROM STREAM read_files(
  '/Volumes/catalog/schema/volumes/path/to/files',
  format => 'json'
);
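Outside of quick inspection, an Auto Loader read that infers its schema typically also sets `cloudFiles.schemaLocation`, so the inferred schema and later changes to it are persisted between runs. The following sketch shows one way to assemble those options. The helper names and the schema location argument are hypothetical placeholders.

```python
def auto_loader_options(file_format: str, schema_location: str) -> dict:
    """Build the option map for an Auto Loader (cloudFiles) read."""
    return {
        "cloudFiles.format": file_format,
        # Directory where Auto Loader stores the inferred schema and its evolution.
        "cloudFiles.schemaLocation": schema_location,
    }


def read_with_auto_loader(spark, source_path: str, options: dict):
    """Return a streaming DataFrame over the files under `source_path`.

    `spark` is the active SparkSession (predefined in Databricks notebooks).
    """
    return (
        spark.readStream
        .format("cloudFiles")
        .options(**options)
        .load(source_path)
    )
```

The returned DataFrame can be passed to `display` during development or written to a target table in a production workload.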