Best practices: Structured Streaming with Kinesis
This article describes best practices when using Kinesis as a streaming source with Delta Lake and Apache Spark Structured Streaming.
Amazon Kinesis Data Streams (KDS) is a massively scalable and durable real-time data streaming service. KDS continuously captures gigabytes of data per second from hundreds of thousands of sources such as website clickstreams, database event streams, financial transactions, social media feeds, IT logs, and location-tracking events.
KDS is a popular choice for streaming data services on AWS because of its ease of use and serverless setup. A Kinesis data stream consists of individual throughput units, known as shards, and is billed based on shard-hours as well as PUT payload units. Each shard has an ingestion capacity of 1,000 records/sec or 1 MB/sec, and an output rate of 2 MB/sec.
After collecting data in KDS, you can use the deep integration of Apache Spark Structured Streaming with Delta Lake for applications such as log analytics, clickstream analytics, and real-time metrics. You can continuously process the data and store it in Delta tables. The following diagram depicts a typical architecture for these use cases:

Databricks Kinesis Structured Streaming source
Databricks Runtime includes an out-of-the-box Kinesis source. It is a proprietary connector for KDS that is not available in open source and is not based on the Kinesis Client Library (KCL). The Kinesis source architecture is shown in the diagram:

Key technical considerations and best practices
This section includes best practices and troubleshooting information for using Kinesis with Delta Lake.
Optimize prefetching
The Kinesis source runs Spark jobs in a background thread to periodically prefetch Kinesis data and cache it in the memory of the Spark executors. The streaming query processes the cached data only after each prefetch step completes and makes it available for processing. The prefetch step therefore significantly affects the observed end-to-end latency and throughput. You can control its performance using the options described in this section.
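These options are set on the Kinesis source reader. For reference, here is a minimal sketch of such a reader, assuming a Databricks notebook where spark is already defined; the streamName, region, and initialPosition values are placeholders to replace with your own:
// Minimal Kinesis source reader; streamName and region values are placeholders
val kinesisDF = spark.readStream
  .format("kinesis")
  .option("streamName", "my-kinesis-stream")
  .option("region", "us-west-2")
  .option("initialPosition", "latest")
  .load()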
The default setting for the shardsPerTask configuration parameter is 5. At scale, however, this could require a very large number of CPU cores, so a setting of 10 might be a good starting point. Then, depending on the complexity of the streaming workload and data volume, you can adjust this value based on the cluster's Ganglia metrics (CPU, memory, network, and so on). For example, a CPU-bound cluster may require a smaller value along with more cores to compensate.
To optimize for minimal query latency and maximum resource usage, use the following calculation:
total number of CPU cores in the cluster (across all executors) >= total number of Kinesis shards / shardsPerTask
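For example, a hypothetical stream with 400 shards and shardsPerTask set to 10 would need at least 40 cores across all executors (both values are illustrative):
// Hypothetical sizing check: 400 shards, shardsPerTask = 10 (illustrative values)
val totalShards = 400
val shardsPerTask = 10
// Minimum CPU cores needed across all executors in the cluster
val minClusterCores = math.ceil(totalShards.toDouble / shardsPerTask).toInt  // 40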
The options that determine the amount of data read per prefetch from Kinesis are described in the following table.
Option | Value | Default | Description
---|---|---|---
maxRecordsPerFetch | Integer | 10000 | Number of records to fetch per API request to Kinesis.
shardFetchInterval | Duration string (2m = 2 minutes) | 1s | How long to wait before updating the list of shards (this is how the system knows that a stream has been resized).
minFetchPeriod | Duration string | 400ms | How long to wait between consecutive fetch attempts. This setting helps avoid Kinesis throttling. 200ms is the minimum, because the Kinesis service limit is 5 fetches/sec.
maxFetchRate | Decimal | 1.0 | Maximum rate of prefetched data per shard in MB/sec. This rate-limits fetches and avoids Kinesis throttling. The maximum rate allowed by Kinesis is 2.0 MB/sec.
maxFetchDuration | Duration string | 10s | How long to buffer prefetched new data before making it available for processing.
fetchBufferSize | Byte string | 20gb | How much data to buffer for the next trigger. This is a soft limit: it is used as a stopping condition, so more data may be buffered.
shardsPerTask | Integer | 5 | How many shards to prefetch from in parallel per Spark task.
Important
minFetchPeriod can create multiple GetRecords API calls to the Kinesis shard until it hits ReadProvisionedThroughputExceeded. If this exception occurs, it does not indicate a problem: the connector is maximizing the utilization of the Kinesis shard.
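As an illustration only, the following sketch applies several of the options from the table to the reader shown earlier. The values are examples to adapt, not recommendations, and streamName and region remain placeholders:
// Illustrative prefetch tuning; option values are examples only, not recommendations
val tunedKinesisDF = spark.readStream
  .format("kinesis")
  .option("streamName", "my-kinesis-stream")   // placeholder
  .option("region", "us-west-2")               // placeholder
  .option("initialPosition", "latest")
  .option("shardsPerTask", "10")       // size CPU cores using the calculation above
  .option("minFetchPeriod", "400ms")   // do not go below 200ms (5 fetches/sec per shard)
  .option("maxFetchRate", "1.5")       // MB/sec per shard; the Kinesis maximum is 2.0
  .option("maxFetchDuration", "10s")
  .option("fetchBufferSize", "20gb")
  .load()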
Avoid slowdowns caused by too many rate limit errors
The connector reduces the amount of data read from Kinesis by half each time it encounters a rate limiting error and records this event in the log with a message: "Hit rate limit. Sleeping for 5 seconds."
It is common to see these errors while a stream is catching up, but you should no longer see them once it has caught up. If you do, you might need to tune the stream on the Kinesis side (by increasing capacity) or adjust the prefetching options.
Too much data causing writes to disk
If there is a sudden spike in your Kinesis streams, the assigned buffer capacity can fill up because the buffer is not emptied fast enough to accommodate the new data.
In such cases, Spark spills blocks from the buffer to disk and slows down processing, which affects stream performance. This event appears in the log with a message like this:
./log4j.txt:879546:20/03/02 17:15:04 INFO BlockManagerInfo: Updated kinesis_49290928_1_ef24cc00-abda-4acd-bb73-cb135aed175c on disk on 10.0.208.13:43458 (current size: 88.4 MB, original size: 0.0 B)
To address this problem, try increasing the cluster memory capacity (either add more nodes or increase the memory per node), or adjust the fetchBufferSize configuration parameter.
Enable S3 VPC endpoints
To ensure that all S3 traffic is routed on the AWS network, Databricks recommends that you enable S3 VPC endpoints.
Hanging S3 write tasks
Hanging tasks can cause long streaming batch durations, which can result in streams having trouble keeping up with the input. In this case, Databricks recommends enabling Spark speculation. To ensure that tasks are not terminated too aggressively, tune the quantile and multiplier for this setting carefully. A good starting point is to set spark.speculation.multiplier to 3 and spark.speculation.quantile to 0.95.
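For example, in the cluster's Spark configuration (the values below are the starting points suggested above and should be tuned for your workload):
spark.speculation true
spark.speculation.multiplier 3
spark.speculation.quantile 0.95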
Latency issues while managing state with RocksDB due to slow S3 writes
A common issue when maintaining stateful operations in your streaming query is large garbage collection (GC) pauses, which introduce latency and cause extended batch execution times. This typically happens when you maintain millions of keys in state. In these cases, instead of maintaining state in JVM memory, consider using RocksDB as a state store in native memory or on disk. State changes are propagated automatically to the Structured Streaming checkpoint. However, you may observe latency when RocksDB writes these checkpoints to S3 because of potential S3 throttling. Try reducing spark.sql.shuffle.partitions (default 200) to minimize the number of files written. You can also try tuning the multipart upload threshold (spark.hadoop.fs.s3a.multipart.size, default 1048576000 bytes) to reduce the number of concurrent S3 writes.
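A sketch of these settings, assuming the RocksDB state store provider class documented by Databricks; set them before starting the streaming query, and note that the shuffle partition count shown is only illustrative:
// Use RocksDB to hold streaming state instead of JVM memory
spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

// Fewer shuffle partitions means fewer state and checkpoint files per batch (illustrative value)
spark.conf.set("spark.sql.shuffle.partitions", "40")

// spark.hadoop.fs.s3a.multipart.size is typically set in the cluster's Spark configuration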
Monitor streaming applications
To monitor streaming applications, Databricks recommends using Spark’s Streaming Query Listener implementation.
Observable metrics are named arbitrary aggregate functions that can be defined on a query (DataFrame). As soon as the execution of a DataFrame reaches a completion point (that is, it finishes a batch query or reaches a streaming epoch), a named event is emitted that contains the metrics for the data processed since the last completion point.
You can observe these metrics by attaching a listener to the Spark session. The listener depends on the execution mode:
- Batch mode: Use QueryExecutionListener. QueryExecutionListener is called when the query completes. Access the metrics using the QueryExecution.observedMetrics map.
- Streaming, or micro-batch: Use StreamingQueryListener. StreamingQueryListener is called when the streaming query completes an epoch. Access the metrics using the StreamingQueryProgress.observedMetrics map. Databricks does not support continuous execution streaming.
For example:
// Observe row count (rc) and error row count (erc) in the streaming Dataset
import org.apache.spark.sql.functions.{count, lit}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
import scala.collection.JavaConverters._
import spark.implicits._

val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  // onQueryStarted and onQueryTerminated must also be overridden
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.asScala.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})
You can also monitor metrics through the UI. If you are using Databricks Runtime 7.0 or above, use the Streaming tab in the Spark UI.