Table streaming reads and writes

Delta Lake is deeply integrated with Spark Structured Streaming through readStream and writeStream. Delta Lake overcomes many of the limitations typically associated with streaming systems and files, including:

  • Coalescing small files produced by low latency ingest
  • Maintaining “exactly-once” processing with more than one stream (or concurrent batch jobs)
  • Efficiently discovering which files are new when using files as the source for a stream

Delta table as a source

When you load a Delta table as a stream source and use it in a streaming query, the query processes all of the data present in the table as well as any new data that arrives after the stream is started.

You can load both paths and tables as a stream.

spark.readStream.format("delta")
  .load("/mnt/delta/events")

import io.delta.implicits._
spark.readStream.delta("/mnt/delta/events")

or

import io.delta.implicits._

spark.readStream.format("delta").table("events")

Limit input rate

The following options are available to control micro-batches:

  • maxFilesPerTrigger: The maximum number of new files considered in each micro-batch. The default is 1000.
  • maxBytesPerTrigger: How much data gets processed in each micro-batch. This option sets a “soft max”, meaning that a batch processes approximately this amount of data and may process more than the limit. If you use Trigger.Once for your streaming, this option is ignored. This is not set by default.
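As a rough sketch, both limits can be set as options on the stream reader. The values below (100 files, about 1 GiB) are illustrative assumptions rather than recommendations, and the path reuses /mnt/delta/events from the earlier examples.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()

// Illustrative values only: tune them to your cluster and latency needs.
val events = spark.readStream.format("delta")
  .option("maxFilesPerTrigger", 100L)                   // at most 100 new files per micro-batch
  .option("maxBytesPerTrigger", 1024L * 1024L * 1024L)  // soft limit of roughly 1 GiB per micro-batch
  .load("/mnt/delta/events")
```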

If you use maxBytesPerTrigger in conjunction with maxFilesPerTrigger, the micro-batch processes data until either the maxFilesPerTrigger or maxBytesPerTrigger limit is reached.
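The interaction of the two limits can be pictured with a small, self-contained model. This is a simplified sketch and not Delta Lake's actual code: the function name admitFiles and the file sizes are hypothetical, and it assumes files are admitted in order while both budgets still have room. Under that assumption the file that crosses the byte budget is still admitted, which is what makes maxBytesPerTrigger a "soft max".

```scala
// Simplified model (an assumption for illustration, not Delta Lake's code):
// a file is admitted if, before it is added, fewer than maxFiles files and
// fewer than maxBytes bytes have been admitted.
def admitFiles(fileSizes: Seq[Long], maxFiles: Int, maxBytes: Long): Seq[Long] = {
  // bytesBefore(i) is the total size of the files that precede file i
  val bytesBefore = fileSizes.scanLeft(0L)(_ + _)
  fileSizes.zip(bytesBefore).zipWithIndex.takeWhile {
    case ((_, before), index) => index < maxFiles && before < maxBytes
  }.map { case ((size, _), _) => size }
}

val sizes = Seq(400L, 400L, 400L, 400L)

// The byte budget ends the batch: three files (1200 bytes) exceed the 1000-byte soft max.
val byBytes = admitFiles(sizes, maxFiles = 1000, maxBytes = 1000L)

// The file budget ends the batch first: two files (800 bytes).
val byFiles = admitFiles(sizes, maxFiles = 2, maxBytes = 1000L)
```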

Note

If the stream lags so far behind that the source table transactions it still needs have been cleaned up under the logRetentionDuration configuration, Delta Lake does not fail the stream. Instead, it processes the data corresponding to the latest available transaction history of the source table. This can result in data being dropped.

Ignore updates and deletes

Structured Streaming does not handle input that is not an append and throws an exception if any modifications occur on the table being used as a source. There are two main strategies for dealing with changes that cannot be automatically propagated downstream:

  • You can delete the output and checkpoint and restart the stream from the beginning.
  • You can set either of these two options:
    • ignoreDeletes: ignore transactions that delete data at partition boundaries.
    • ignoreChanges: re-process updates if files had to be rewritten in the source table due to a data changing operation such as UPDATE, MERGE INTO, DELETE (within partitions), or OVERWRITE. Unchanged rows may still be emitted, therefore your downstream consumers should be able to handle duplicates. Deletes are not propagated downstream. ignoreChanges subsumes ignoreDeletes. Therefore if you use ignoreChanges, your stream will not be disrupted by either deletions or updates to the source table.

Example

For example, suppose you have a table user_events with date, user_email, and action columns that is partitioned by date. You stream out of the user_events table and you need to delete data from it due to GDPR.

When you delete at partition boundaries (that is, the WHERE is on a partition column), the files are already segmented by value so the delete just drops those files from the metadata. Thus, if you just want to delete data from some partitions, you can use:

spark.readStream.format("delta")
  .option("ignoreDeletes", "true")
  .load("/mnt/delta/user_events")

However, if you have to delete data based on user_email, then you will need to use:

spark.readStream.format("delta")
  .option("ignoreChanges", "true")
  .load("/mnt/delta/user_events")

If you update a user_email with the UPDATE statement, the file containing the user_email in question is rewritten. When you use ignoreChanges, the new record is propagated downstream with all other unchanged records that were in the same file. Your logic should be able to handle these incoming duplicate records.
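One possible way to absorb these duplicates is to upsert each micro-batch into the downstream table instead of appending it. The sketch below assumes things that are not part of the example above: a unique event_id column on every row, and an existing target Delta table at the hypothetical path /mnt/delta/user_events_clean. Because deletes are not propagated, rows deleted from the source remain in the target.

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder.getOrCreate()

// Hypothetical target table; assumed to exist already.
val targetPath = "/mnt/delta/user_events_clean"

// Upsert one micro-batch keyed on the assumed unique event_id column, so a
// re-emitted unchanged row overwrites its earlier copy instead of duplicating it.
def upsertBatch(batchDF: DataFrame, batchId: Long): Unit = {
  DeltaTable.forPath(spark, targetPath).as("t")
    .merge(batchDF.dropDuplicates("event_id").as("s"), "t.event_id = s.event_id")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

spark.readStream.format("delta")
  .option("ignoreChanges", "true")
  .load("/mnt/delta/user_events")
  .writeStream
  .foreachBatch(upsertBatch _)
  .option("checkpointLocation", "/mnt/delta/user_events_clean/_checkpoints/upsert")
  .start()
```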

Specify initial position

Note

This feature is available on Databricks Runtime 7.3 LTS and above.

You can use the following options to specify the starting point of the Delta Lake streaming source without processing the entire table.

  • startingVersion: The Delta Lake version to start from. All table changes starting from this version (inclusive) will be read by the streaming source. You can obtain the commit versions from the version column of the DESCRIBE HISTORY command output.

    In Databricks Runtime 7.4 and above, to return only the latest changes, specify latest.

  • startingTimestamp: The timestamp to start from. All table changes committed at or after the timestamp (inclusive) will be read by the streaming source. One of:

    • A timestamp string. For example, "2019-01-01T00:00:00.000Z".
    • A date string. For example, "2019-01-01".

You can set only one of these options, not both. They take effect only when starting a new streaming query. If a streaming query has started and the progress has been recorded in its checkpoint, these options are ignored.

Important

Although you can start the streaming source from a specified version or timestamp, the schema of the streaming source is always the latest schema of the Delta table. You must ensure there is no incompatible schema change to the Delta table after the specified version or timestamp. Otherwise, the streaming source may return incorrect results when reading the data with an incorrect schema.

Example

For example, suppose you have a table user_events. If you want to read changes since version 5, use:

spark.readStream.format("delta")
  .option("startingVersion", "5")
  .load("/mnt/delta/user_events")

If you want to read changes since 2018-10-18, use:

spark.readStream.format("delta")
  .option("startingTimestamp", "2018-10-18")
  .load("/mnt/delta/user_events")
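As a further sketch, the commit versions can be looked up with DESCRIBE HISTORY before choosing a starting point, and on Databricks Runtime 7.4 and above the value latest skips the existing data. The selected history columns are a choice made for this illustration.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()

// List commit versions and timestamps to pick a starting point.
spark.sql("DESCRIBE HISTORY delta.`/mnt/delta/user_events`")
  .select("version", "timestamp", "operation")
  .show(truncate = false)

// Databricks Runtime 7.4 and above: read only the changes that arrive
// after the stream starts.
val latestOnly = spark.readStream.format("delta")
  .option("startingVersion", "latest")
  .load("/mnt/delta/user_events")
```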

Delta table as a sink

You can also write data into a Delta table using Structured Streaming. The transaction log enables Delta Lake to guarantee exactly-once processing, even when there are other streams or batch queries running concurrently against the table.

Metrics

Note

Available in Databricks Runtime 8.1 and above.

You can find the number of bytes and the number of files yet to be processed by a streaming query in the numBytesOutstanding and numFilesOutstanding metrics. If you are running the stream in a notebook, you can see these metrics under the Raw Data tab in the streaming query progress dashboard:

{
  "sources" : [
    {
      "description" : "DeltaSource[file:/path/to/source]",
      "metrics" : {
        "numBytesOutstanding" : "3456",
        "numFilesOutstanding" : "8"
      }
    }
  ]
}

Append mode

By default, streams run in append mode, which adds new records to the table.

You can use the path method:

events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/mnt/delta/events/_checkpoints/etl-from-json")
  .start("/mnt/delta/events")

import io.delta.implicits._
events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/mnt/delta/events/_checkpoints/etl-from-json")
  .delta("/mnt/delta/events")

or the table method:

events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/mnt/delta/events/_checkpoints/etl-from-json")
  .table("events")
Complete mode

You can also use Structured Streaming to replace the entire table with every batch. One example use case is to compute a summary using aggregation:

spark.readStream
  .format("delta")
  .load("/mnt/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/mnt/delta/eventsByCustomer/_checkpoints/streaming-agg")
  .start("/mnt/delta/eventsByCustomer")

The preceding example continuously updates a table that contains the aggregate number of events by customer.

For applications with more lenient latency requirements, you can save computing resources with one-time triggers. Use these to update summary aggregation tables on a given schedule, processing only new data that has arrived since the last update.
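A minimal sketch of such a scheduled update, reusing the aggregation above with a one-time trigger. It assumes that an external scheduler (not shown) runs this code at the desired interval.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder.getOrCreate()

val query = spark.readStream
  .format("delta")
  .load("/mnt/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/mnt/delta/eventsByCustomer/_checkpoints/streaming-agg")
  .trigger(Trigger.Once())  // process the data available now, then stop
  .start("/mnt/delta/eventsByCustomer")

query.awaitTermination()  // returns once the single run has finished
```

Because the checkpoint records progress, each scheduled run picks up only the data that arrived since the previous run.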

Idempotent multi-table writes

Note

Available in Databricks Runtime 8.4 and above.

This section describes how to write to multiple tables using the foreachBatch command. foreachBatch allows the output of every micro-batch in the streaming query to be written to multiple target destinations. However, foreachBatch does not make those writes idempotent, because a write attempt has no information about whether the batch is being re-executed. For example, rerunning a failed batch could result in duplicate data writes.

To address this, Delta tables support the following DataFrameWriter options to make the writes idempotent:

  • txnAppId: A unique string that you can pass on each DataFrame write. For example, you can use the StreamingQuery ID as txnAppId.
  • txnVersion: A monotonically increasing number that acts as transaction version.

Delta Lake uses the combination of txnAppId and txnVersion to identify duplicate writes and ignore them.

If a batch write is interrupted by a failure, rerunning the batch uses the same application ID and batch ID, which lets the runtime identify duplicate writes and ignore them. The application ID (txnAppId) can be any user-generated unique string and does not have to be related to the stream ID.

Warning

If you delete the streaming checkpoint and restart the query, you must provide a different txnAppId; otherwise, writes from the restarted query are ignored because they carry the same txnAppId and the batch ID restarts from 0.

Example

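import org.apache.spark.sql.DataFrame

val appId = ... // A unique string that is used as application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  // Both writes pass the same txnAppId and use the batch ID as txnVersion,
  // so a rerun of this batch is recognized as a duplicate and ignored.
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 1
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 2
}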
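The deduplication rule, and the reason the warning above asks for a new txnAppId after a checkpoint is deleted, can be pictured with a small in-memory model. This is a simplified sketch and not Delta Lake's actual code: TableState, idempotentWrite, and the app-1 and app-2 identifiers are hypothetical, and the model assumes a write is skipped when its txnVersion is not greater than the last version recorded for its txnAppId.

```scala
// Simplified model (an assumption for illustration): the table remembers the
// highest txnVersion committed per txnAppId.
final case class TableState(rows: Vector[String], lastVersion: Map[String, Long])

def idempotentWrite(state: TableState, appId: String, version: Long, batch: Seq[String]): TableState =
  if (state.lastVersion.get(appId).exists(_ >= version)) state // duplicate write: ignore
  else TableState(state.rows ++ batch, state.lastVersion.updated(appId, version))

val empty = TableState(Vector.empty, Map.empty)
val afterBatch0 = idempotentWrite(empty, "app-1", 0L, Seq("a", "b"))
val afterRetry = idempotentWrite(afterBatch0, "app-1", 0L, Seq("a", "b")) // rerun of batch 0: no new rows
val afterBatch1 = idempotentWrite(afterRetry, "app-1", 1L, Seq("c"))

// Restart after deleting the checkpoint: batch IDs begin at 0 again.
val sameAppId = idempotentWrite(afterBatch1, "app-1", 0L, Seq("d")) // ignored, row "d" is lost
val newAppId = idempotentWrite(afterBatch1, "app-2", 0L, Seq("d"))  // written
```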