Best practices: Delta Lake Structured Streaming with Amazon Kinesis

This article describes best practices when using Kinesis as a streaming source with Delta Lake and Apache Spark Structured Streaming.

Amazon Kinesis Data Streams (KDS) is a massively scalable and durable real-time data streaming service. KDS continuously captures gigabytes of data per second from hundreds of thousands of sources such as website clickstreams, database event streams, financial transactions, social media feeds, IT logs, and location-tracking events.

KDS is a popular choice for streaming data services on AWS because of its ease of use and serverless setup. A Kinesis Data Stream consists of individual throughput units, known as shards, and is billed based on shard-hours as well as PUT payload units. Each shard can ingest up to 1,000 records/sec, up to a maximum of 1 MB/sec, and supports an output rate of 2 MB/sec.
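As a rough illustration of these limits, the following hypothetical helper (not part of any AWS API) estimates how many shards a workload needs, taking whichever per-shard limit is hit first:

```scala
// Hypothetical sizing helper: estimates the shard count needed for a workload
// from the per-shard ingest limits quoted above
// (1,000 records/sec or 1 MB/sec, whichever is hit first).
def shardsNeeded(recordsPerSec: Long, mbPerSec: Double): Int = {
  val byRecords = math.ceil(recordsPerSec / 1000.0).toInt
  val byBytes = math.ceil(mbPerSec).toInt
  math.max(byRecords, byBytes)
}
```

For example, a workload of 12,000 records/sec at 5 MB/sec total is record-bound and needs 12 shards.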

After collecting data in KDS, you can use the deep integration of Apache Spark Structured Streaming with Delta Lake for applications such as log analytics, clickstream analytics, and real time metrics. You can continuously process the data and store it into Delta tables. The following diagram depicts a typical architecture for these use cases:

Kinesis Delta architecture diagram

Databricks Kinesis Structured Streaming source

Databricks Runtime includes an out-of-the-box Kinesis source. This is a proprietary connector for KDS that is not available in open source. This connector is not based on the Kinesis Client Library (KCL). The Kinesis source architecture is shown in the diagram:

Kinesis source architecture diagram

Key technical considerations and best practices

This section includes best practices and troubleshooting information for using Kinesis with Delta Lake.

Databricks Runtime version

Databricks recommends using Databricks Runtime 6.5 or above for best performance.

Optimize prefetching

The Kinesis source runs Spark jobs in a background thread to prefetch Kinesis data periodically and cache it in the memory of the Spark executors. Once a prefetch step completes and makes the data available for processing, the streaming query processes the cached data. The prefetch step significantly affects the observed end-to-end latency and throughput. You can control performance using the options described in this section.

The default setting for the shardsPerTask configuration parameter is 5. At scale, however, this default can require a very large number of CPU cores, so a setting of 10 might be a better starting point. From there, depending on the complexity of the streaming workload and the data volume, you can adjust this value based on the cluster’s Ganglia metrics (CPU, memory, network, and so on). For example, a compute-intensive workload may require a smaller shardsPerTask value, with more cores in the cluster to compensate.

To optimize for minimal query latency and maximum resource usage, use the following calculation:

total number of CPU cores in the cluster (across all executors) >= total number of Kinesis shards / shardsPerTask.
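The rule above can be sketched as a small helper (the function name is illustrative, not part of any API):

```scala
// Minimal sketch of the sizing rule above: the cluster needs at least
// totalShards / shardsPerTask CPU cores across all executors.
def minCores(totalShards: Int, shardsPerTask: Int): Int =
  math.ceil(totalShards.toDouble / shardsPerTask).toInt
```

With 64 shards and the default shardsPerTask of 5, the cluster needs at least 13 cores; raising shardsPerTask to 10 drops the requirement to 7 cores.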

The parameters used to determine the amount of data read per prefetch from Kinesis are described in this table.

| Option | Value | Default | Description |
| --- | --- | --- | --- |
| maxRecordsPerFetch | Integer | 10000 | Number of records to fetch per getRecords API call. |
| shardFetchInterval | Duration string (for example, 2m for 2 minutes) | 1s | How long to wait before updating the list of shards. This is how the source detects that a stream has been resized. |
| minFetchPeriod | Duration string | 400ms | How long to wait between consecutive fetch attempts. This setting helps avoid Kinesis throttling. The minimum is 200ms, because the Kinesis service limit is 5 fetches/sec per shard. |
| maxFetchRate | Decimal | 1.0 | Maximum rate of prefetch data per shard, in MB/sec. This rate-limits fetches and avoids Kinesis throttling. The maximum rate allowed by Kinesis is 2.0 MB/sec per shard. |
| maxFetchDuration | Duration string | 10s | How long to buffer prefetched new data before making it available for processing. |
| fetchBufferSize | Byte string | 20gb | How much data to buffer for the next trigger. This is a soft limit: it is used as a stopping condition, so more data may be buffered. |
| shardsPerTask | Integer | 5 | Number of shards to prefetch in parallel per task. |
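As a sketch of how these options are applied, they are passed as reader options to the Kinesis source. The stream name and region below are placeholders, and the option values are illustrative, not recommendations:

```scala
// Illustrative only: reading from Kinesis with tuned prefetch options.
// streamName and region are placeholders for your own values.
val kinesisDF = spark.readStream
  .format("kinesis")
  .option("streamName", "my-stream")
  .option("region", "us-west-2")
  .option("shardsPerTask", "10")     // needs fewer cores than the default of 5
  .option("maxFetchRate", "1.5")     // MB/sec per shard, below the 2.0 MB/sec limit
  .option("minFetchPeriod", "400ms") // back off between fetches to avoid throttling
  .load()
```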

Avoid slowdowns caused by too many rate limit errors

The connector reduces the amount of data read from Kinesis by half each time it encounters a rate limiting error and records this event in the log with a message: "Hit rate limit. Sleeping for 5 seconds."

In Databricks Runtime 6.4 and below, sustained Kinesis rate limiting can cause a stream to be consumed at as little as 1 KB/sec, which increases IteratorAge and can prevent the application from catching up. If you encounter these issues, consider upgrading to Databricks Runtime 6.5 or above. It is common to see these errors while a stream is catching up; once it has caught up, they should stop. If they persist, either increase capacity on the Kinesis side or adjust the prefetching options.

Too many tasks

In Databricks Runtime 6.4 and below, too many small fetch blocks—typically caused by a stream that is over-provisioned with too many shards—can cause an explosion of map tasks from the Kinesis stream source. For example, one Databricks customer provisioned a 10,000 shard stream but was only pushing a few thousand records per second as part of a test harness. This setup triggered jobs that launched over 300,000 tasks. The workaround was to add a coalesce to the query.
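On those older runtimes, the workaround can be sketched as a coalesce applied to the source DataFrame before writing. This assumes kinesisDF is the Kinesis source DataFrame; the target of 100 partitions and the paths are illustrative:

```scala
// Illustrative: cap the number of tasks produced by an over-provisioned stream.
val compacted = kinesisDF.coalesce(100)

compacted.writeStream
  .format("delta")
  .option("checkpointLocation", "/tmp/checkpoint") // placeholder path
  .start("/tmp/delta-table")                       // placeholder path
```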

In Databricks Runtime 6.5 and above, the Kinesis source implements a bin-packing algorithm, so this coalesce step happens automatically.

Too much data causing writes to disk

If there is a sudden spike in your Kinesis streams, the assigned buffer capacity can fill up, and the buffer might not be emptied fast enough for new data to be added.

In such cases, Spark spills blocks from the buffer to disk and slows down processing, which affects stream performance. This event appears in the log with a message like this:

./log4j.txt:879546:20/03/02 17:15:04 INFO BlockManagerInfo: Updated kinesis_49290928_1_ef24cc00-abda-4acd-bb73-cb135aed175c on disk on 10.0.208.13:43458 (current size: 88.4 MB, original size: 0.0 B)

To address this problem, try increasing the cluster memory capacity (either add more nodes or increase the memory per node), or adjust the configuration parameter fetchBufferSize.

Enable S3 VPC endpoints

To ensure that all S3 traffic is routed over the AWS network, Databricks recommends that you enable S3 VPC endpoints.

Hanging S3 write tasks

Hanging tasks can cause long streaming batch durations, which can result in streams having trouble keeping up with the input. In this case, Databricks recommends enabling Spark speculation. To ensure that tasks are not terminated too aggressively, tune the quantile and multiplier for this setting carefully. A good starting point is to set spark.speculation.multiplier to 3 and spark.speculation.quantile to 0.95.
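Because spark.speculation is a core Spark setting, it is typically set in the cluster’s Spark configuration rather than at runtime. A sketch using the starting values above:

```
spark.speculation true
spark.speculation.multiplier 3
spark.speculation.quantile 0.95
```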

Latency issues while managing state with RocksDB due to slow S3 writes

A common problem when maintaining stateful operations in a streaming query is long garbage collection pauses, which introduce latency and extend batch execution times. This typically happens when millions of keys are maintained in state. In these cases, instead of maintaining state in JVM memory, consider using RocksDB as a state store in native memory or on disk. State changes are propagated automatically to the Structured Streaming checkpoint. However, you may observe latency when RocksDB writes these checkpoints to S3, due to potential S3 throttling. Try reducing spark.sql.shuffle.partitions (default 200) to minimize the number of files written. You can also try tuning the multipart upload threshold (spark.hadoop.fs.s3a.multipart.size, default 1048576000 bytes) to reduce the number of concurrent S3 writes.
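As a sketch, enabling the RocksDB-backed state store on Databricks and reducing shuffle partitions before starting the query might look like the following. The provider class shown is the Databricks-provided one, and 40 partitions is an illustrative value, not a recommendation:

```scala
// Use RocksDB for state instead of the JVM heap (Databricks-provided provider).
spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

// Fewer shuffle partitions means fewer state files written per checkpoint.
spark.conf.set("spark.sql.shuffle.partitions", "40")
```

Set these before starting the streaming query; the state store provider cannot be changed between restarts from the same checkpoint.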

Monitor streaming applications

To monitor streaming applications, Databricks recommends using Spark’s Streaming Query Listener implementation.

Observable metrics are named arbitrary aggregate functions that can be defined on a query (DataFrame). As soon as the execution of a DataFrame reaches a completion point (that is, it finishes a batch query or reaches a streaming epoch), a named event is emitted that contains the metrics for the data processed since the last completion point.

You can observe these metrics by attaching a listener to the Spark session. The listener depends on the execution mode:

  • Batch mode: Use QueryExecutionListener.

    QueryExecutionListener is called when the query completes. Access the metrics using the QueryExecution.observedMetrics map.

  • Streaming, or micro-batch: Use StreamingQueryListener.

    StreamingQueryListener is called when the streaming query completes an epoch. Access the metrics using the StreamingQueryProgress.observedMetrics map. Databricks does not support continuous execution streaming.

For example:

// Observe row count (rc) and error row count (erc) in the streaming Dataset
import org.apache.spark.sql.functions.{count, lit}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
import scala.collection.JavaConverters._

val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // observedMetrics is a java.util.Map, so convert it to use Option semantics
    event.progress.observedMetrics.asScala.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

You can also monitor metrics through the UI. If you are using Databricks Runtime 7.0 or above, use the Streaming tab in the Spark UI. If you are using Databricks Runtime 6.x, you can use Spark’s observable APIs.

Deleting and recreating a stream

If you delete and then recreate a stream, you must use a new checkpoint location and directory.
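For example, when restarting the query against the recreated stream, point the writer at a fresh checkpoint directory. The paths below are placeholders:

```scala
// Illustrative: use a new checkpoint directory for the recreated stream.
df.writeStream
  .format("delta")
  .option("checkpointLocation", "/delta/events/_checkpoints/stream_v2")
  .start("/delta/events")
```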

Resharding

Structured Streaming with the Databricks Kinesis source supports resharding. If a stream needs more capacity, increasing the number of shards is sufficient. You do not need to switch streams or create temporary streams to divert traffic.

Learn more

Amazon Kinesis