Streaming tables
A streaming table is a Delta table with additional support for streaming or incremental data processing. A streaming table can be targeted by one or more flows in an ETL pipeline.
Streaming tables are a good choice for data ingestion for the following reasons:
- Each input row is handled only once, which models the vast majority of ingestion workloads (that is, appending or upserting rows into a table).
- They can handle large volumes of append-only data.
Streaming tables are also a good choice for low-latency streaming transformations for the following reasons:
- They can reason over rows and windows of time.
- They can handle high volumes of data.
- They provide low latency.
The following diagram illustrates how streaming tables work.
On each update, the flows associated with a streaming table read the changed information in a streaming source, and append new information to that table.
Streaming tables are defined and updated by a single DLT pipeline. When you create a DLT pipeline, you explicitly define streaming tables in the source code of the pipeline. Tables defined by a pipeline can’t be changed or updated by any other pipeline. You can define multiple flows to append to a single streaming table.
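For example, the following minimal sketch uses the DLT Python API to define two flows that append to the same streaming table. The source table names (customers_east_raw, customers_west_raw) are hypothetical placeholders.

import dlt

# Create the target streaming table. Both flows below append to it.
dlt.create_streaming_table("customers_combined")

# Flow 1: stream rows from a hypothetical east-region source table.
@dlt.append_flow(target="customers_combined")
def customers_east():
    return spark.readStream.table("customers_east_raw")

# Flow 2: stream rows from a hypothetical west-region source table.
@dlt.append_flow(target="customers_combined")
def customers_west():
    return spark.readStream.table("customers_west_raw")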
When you create a streaming table outside of a pipeline in Databricks SQL, Databricks creates a hidden DLT pipeline that is used to update the table.
For more information about flows, see Load and process data incrementally with DLT flows.
Streaming tables for ingestion
Streaming tables are designed for append-only data sources and process inputs only once.
The following example shows how to use a streaming table to ingest new files from cloud storage.
Python:
import dlt

# create a streaming table
@dlt.table
def customers_bronze():
    return (
        spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", "true")
            .load("/Volumes/path/to/files")
    )
When you use the spark.readStream function in a dataset definition, DLT treats the dataset as a stream, and the table it creates is a streaming table.

SQL:
-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files(
  "/Volumes/path/to/files",
  format => "json"
);
For more details on loading data into a streaming table, see Load data with DLT.
The following diagram illustrates how append-only streaming tables work.
A row that has already been appended to a streaming table is not re-queried by later updates to the pipeline. If you modify the query (for example, from SELECT LOWER(name) to SELECT UPPER(name)), existing rows are not updated to uppercase, but new rows are. You can trigger a full refresh to re-query all previous data from the source table and update all rows in the streaming table.
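As an illustration, the following minimal sketch assumes a hypothetical source table named names_raw and a query that was changed from lowercasing to uppercasing a column:

import dlt
from pyspark.sql import functions as F

@dlt.table
def names_bronze():
    # This query originally used F.lower("name"). Rows processed under the
    # old query keep their lowercase values; only rows ingested after the
    # change are uppercased, unless you trigger a full refresh.
    return (
        spark.readStream.table("names_raw")
            .select(F.upper("name").alias("name"))
    )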
Streaming tables and low-latency streaming
Streaming tables are designed for low-latency streaming over bounded state. Streaming tables use checkpoint management, which makes them well-suited for low-latency streaming. However, they expect streams that are naturally bounded or bounded with a watermark.
A naturally bounded stream is produced by a streaming data source that has a well-defined start and end. An example of a naturally bounded stream is reading data from a directory of files where no new files are added after an initial batch is placed. The stream is considered bounded because the number of files is finite, and the stream ends after all of the files have been processed.
You can also use a watermark to bound a stream. A watermark in Spark Structured Streaming is a mechanism that helps handle late data by specifying how long the system should wait for delayed events before considering the window of time as complete. An unbounded stream that does not have a watermark can cause a DLT pipeline to fail due to memory pressure.
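For example, the following minimal sketch bounds a windowed aggregation with a watermark. It assumes a hypothetical source table clicks_raw with an event_time timestamp column:

import dlt
from pyspark.sql import functions as F

@dlt.table
def clicks_per_minute():
    return (
        spark.readStream.table("clicks_raw")
            # Wait up to 10 minutes for late events before considering a
            # one-minute window complete, which bounds the state the stream
            # must keep.
            .withWatermark("event_time", "10 minutes")
            .groupBy(F.window("event_time", "1 minute"))
            .count()
    )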
For more information about stateful stream processing, see Optimize stateful processing in DLT with watermarks.
Stream-snapshot joins
Stream-snapshot joins are joins between a stream and a dimension table that is snapshotted when the stream starts. These joins do not recompute if the dimension table changes after the stream has started: the dimension table is treated as a snapshot in time, and changes made to it after the stream starts are not reflected unless you reload or refresh the dimension table. This behavior is reasonable if you can accept small discrepancies in a join. For example, an approximate join is acceptable when the number of transactions is many orders of magnitude larger than the number of customers.
In the following code example, we join a two-row dimension table, customers, with an ever-growing dataset, transactions. We materialize the join between these two datasets in a table called sales_report. Note that if an outside process updates the customers table by adding a new row (customer_id=3, name=Zoya), the new row will NOT be present in the join, because the static dimension table was snapshotted when the stream started.
import dlt

# assume this table contains an append-only stream of rows about transactions
# (customer_id=1, value=100)
# (customer_id=2, value=150)
# (customer_id=3, value=299)
# ... <and so on> ...
@dlt.view
def v_transactions():
    return spark.readStream.table("transactions")

# assume this table contains only these two rows about customers
# (customer_id=1, name=Bilal)
# (customer_id=2, name=Olga)
@dlt.view
def v_customers():
    return spark.read.table("customers")

@dlt.table
def sales_report():
    facts = spark.readStream.table("v_transactions")
    dims = spark.read.table("v_customers")
    return facts.join(dims, on="customer_id", how="inner")
Streaming table limitations
Streaming tables have the following limitations:
- Limited evolution: You can change the query without recomputing the entire dataset. Because a streaming table only sees a row once, you can have different queries operating on different rows. This means you must be aware of all previous versions of the query that are running on your dataset. A full refresh is required to make the streaming table update data that has already been processed.
- State management: Streaming tables are optimized for low latency, so you need to ensure that the streams they operate over are naturally bounded or bounded with a watermark. For more information, see Optimize stateful processing in DLT with watermarks.
- Joins don’t recompute: Joins in streaming tables do not recompute when dimensions change. This characteristic can be good for “fast-but-wrong” scenarios. If you want your view to always be correct, use a materialized view instead. Materialized views are always correct because they automatically recompute joins when dimensions change, as shown in the sketch after this list. For more information, see Materialized views.
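For comparison, here is a minimal sketch of the earlier sales_report example written as a materialized view. Because both inputs are batch reads, DLT recomputes the join when either table changes:

import dlt

@dlt.table
def sales_report():
    # Batch reads of both tables: the join result is recomputed on each
    # update, so new customers rows are reflected in the report.
    facts = spark.read.table("transactions")
    dims = spark.read.table("customers")
    return facts.join(dims, on="customer_id", how="inner")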