Delta Tables - Streaming Reads and Writes

Delta is deeply integrated with Spark Structured Streaming through the readStream and writeStream APIs (Scala/Java/Python). It overcomes many of the limitations typically associated with streaming systems and files, including:

  • Coalescing the small files produced by low-latency ingest using OPTIMIZE.
  • Maintaining “exactly-once” processing with more than one stream (or concurrent batch jobs).
  • Efficiently discovering which files are new when using files as the source for a stream.

As a Source

When you load a Delta table as a stream source and use it in a streaming query, the query processes all of the data present in the table as well as any new data that arrives after the stream is started. You can load both tables and paths as a stream.

spark.readStream.format("delta").load("/delta/events") // as a path
spark.readStream.table("events")                        // as a table

You can also control the maximum size of any micro-batch that Delta gives to streaming by setting the maxFilesPerTrigger option. This specifies the maximum number of new files to be considered in every trigger. The default is 1000.
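
For example, the option can be set on the stream reader like this (the limit and path below are illustrative):

spark.readStream
  .format("delta")
  .option("maxFilesPerTrigger", 100) // consider at most 100 new files per trigger
  .load("/delta/events")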

Ignoring Updates / Deletes

Structured Streaming currently does not handle input that is not an append, and throws an exception if any modifications occur on the table being used as a source. There are two main strategies for dealing with changes that cannot be pushed downstream automatically:

  • Restart the Stream - Since Delta tables retain all history by default, in many cases you can simply delete the output and checkpoint and restart the stream from the beginning.

Attention

Future support will include:

  • Block Changes - By setting the appendOnly table property, you can block non-append operations on the source table. This is also useful for meeting data governance requirements.
  • Ignore Changes - If your application does not need to push corrections downstream, you can specify either of the following options for the query:

Set ignoreDeletes to true as a read option to ignore commits that contain only deletes. This allows you to ignore cases when data is being dropped from a table, for example due to a retention policy.

Set ignoreChanges to true as a read option to re-process updates when files in the source table are rewritten, for example by an UPDATE, MERGE INTO, DELETE, or OVERWRITE operation; downstream consumers should be prepared to handle the resulting duplicates.
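
As a sketch of how such an option might be supplied to the reader once available (the path below is illustrative):

spark.readStream
  .format("delta")
  .option("ignoreDeletes", "true") // skip commits that only delete data
  .load("/delta/events")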

As a Sink

You can also write data into a Delta table using structured streaming. By using the transaction log, Delta can guarantee exactly-once processing, even when there are other streams or batch queries running concurrently against the table.

Append Mode

By default, streams run in append mode, which simply adds new records to the table:

events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
  .save("/delta/events") // as a path

Complete Mode

You can also use structured streaming to replace the entire table with every batch. One example use case is to compute a summary using aggregation:

stream("events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/delta/eventsByCustomer/_checkpoints/streaming-agg")
  .save("/delta/eventsByCustomer")

The example above continuously updates a table that contains the aggregate number of events by customer.

For applications with more lenient latency requirements, you can save computing resources with one-time triggers. Use these to update summary aggregation tables on a given schedule, processing only the new data that has arrived since the last update.
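
For example, a scheduled job could run the aggregation above with a one-time trigger (a sketch; the checkpoint and output paths are the same illustrative ones used above):

import org.apache.spark.sql.streaming.Trigger

spark.readStream.format("delta").load("/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/delta/eventsByCustomer/_checkpoints/streaming-agg")
  .trigger(Trigger.Once()) // process whatever has arrived since the last run, then stop
  .start("/delta/eventsByCustomer")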