Connect to Amazon Kinesis
This page describes how to use Structured Streaming to read and write data to Amazon Kinesis.
Databricks recommends that you enable S3 VPC endpoints so that all S3 traffic is routed on the AWS network.
If you delete and recreate a Kinesis stream, you cannot reuse any existing checkpoint directories to restart a streaming query. You must delete the checkpoint directories and start those queries from scratch. You can reshard with Structured Streaming by increasing the number of shards without interrupting or restarting the stream.
See Recommendations for working with Kinesis.
Authenticate with Amazon Kinesis
Databricks recommends managing your connection to Kinesis using a Databricks service credential. See Create service credentials. Databricks service credentials require Databricks Runtime 16.1 and above.
To use a service credential, do the following:
- Create a Databricks service credential using an IAM role with the necessary permissions to access Kinesis. See Step 1: Create an IAM role.
- Provide the name of the service credential using the serviceCredential option when defining a streaming read.
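As a minimal sketch, the options for a service-credential read can be collected in a dictionary before they are passed to the reader. The stream name, credential name, and region below are hypothetical placeholders, and the helper function is illustrative, not part of any Databricks API.

```python
def service_credential_options(stream_name: str, credential: str, region: str) -> dict:
    """Collect Kinesis reader options that authenticate with a service credential."""
    return {
        "streamName": stream_name,        # stream to subscribe to
        "region": region,                 # region the stream is defined in
        "serviceCredential": credential,  # name of the Databricks service credential
    }


# Hypothetical placeholder values; substitute your own.
opts = service_credential_options("my-stream", "my-kinesis-credential", "us-east-1")
print(opts)
```

In a notebook on Databricks Runtime 16.1 or above, you would pass the dictionary to the reader with spark.readStream.format("kinesis").options(**opts).load().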
The Kinesis source requires ListShards, GetRecords, and GetShardIterator permissions. If you encounter Amazon: Access Denied exceptions, check that your IAM role has these permissions. See Controlling Access to Amazon Kinesis Data Streams Resources Using IAM.
Alternate authentication methods
If Databricks service credentials are not available, Databricks has the following alternate authentication methods.
Instance profile
Attach an instance profile during compute configuration. See Instance profiles.
Instance profiles are not supported in standard access mode (formerly shared access mode). See Standard compute requirements and limitations.
Use keys directly
Set the awsAccessKey and awsSecretKey options.
If using keys, store them using Databricks secrets. See Secret management.
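A minimal sketch of reading the keys from a secret store instead of hard-coding them. It assumes a callable with the shape of dbutils.secrets.get(scope, key); the scope name "kinesis" and the key names are hypothetical, and a dictionary stands in for the secret store so the example runs outside Databricks.

```python
from typing import Callable


def access_key_options(stream_name: str, region: str,
                       get_secret: Callable[[str, str], str],
                       scope: str = "kinesis") -> dict:
    """Build Kinesis reader options that authenticate with access keys.

    get_secret is expected to behave like dbutils.secrets.get(scope, key).
    The scope and key names are hypothetical.
    """
    return {
        "streamName": stream_name,
        "region": region,
        "awsAccessKey": get_secret(scope, "aws-access-key"),
        "awsSecretKey": get_secret(scope, "aws-secret-key"),
    }


# Stand-in secret store for illustration; on Databricks, pass dbutils.secrets.get.
fake_secrets = {("kinesis", "aws-access-key"): "AKIA-EXAMPLE",
                ("kinesis", "aws-secret-key"): "example-secret"}
opts = access_key_options("my-stream", "us-east-1",
                          lambda scope, key: fake_secrets[(scope, key)])
```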
Assume IAM role
Some compute configurations allow you to assume an IAM role using the roleArn option. To assume a role, either launch your cluster with permissions to assume the role or provide access keys through awsAccessKey and awsSecretKey.
This method supports cross-account authentication. For more information, see Delegate Access Across AWS Accounts Using IAM Roles.
You can optionally specify the external ID with roleExternalId and a session name with roleSessionName.
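The following sketch assembles the assume-role options, adding roleExternalId and roleSessionName only when they are supplied. The role ARN, external ID, and stream name are hypothetical placeholders.

```python
from typing import Optional


def assume_role_options(stream_name: str, region: str, role_arn: str,
                        external_id: Optional[str] = None,
                        session_name: Optional[str] = None) -> dict:
    """Build Kinesis reader options that assume an IAM role.

    roleExternalId and roleSessionName are optional, so they are only
    included when a value is provided.
    """
    opts = {"streamName": stream_name, "region": region, "roleArn": role_arn}
    if external_id is not None:
        opts["roleExternalId"] = external_id
    if session_name is not None:
        opts["roleSessionName"] = session_name
    return opts


opts = assume_role_options(
    "my-stream", "us-east-1",
    "arn:aws:iam::123456789012:role/example-kinesis-reader",  # hypothetical role
    external_id="example-external-id",
)
```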
Schema
Kinesis returns records with the following schema:
| Column | Type |
|---|---|
| partitionKey | string |
| data | binary |
| stream | string |
| shardId | string |
| sequenceNumber | string |
| approximateArrivalTimestamp | timestamp |
To deserialize the data in the data column, cast the field to a string.
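In Spark, the cast is typically written as col("data").cast("string"), which interprets the bytes as UTF-8. The plain-Python sketch below shows the equivalent conversion for a single record, represented as a dictionary. It assumes the producer wrote UTF-8 encoded JSON, which is an assumption about your data, not something Kinesis enforces.

```python
import json


def decode_record(record: dict) -> dict:
    """Decode the binary data column of one Kinesis record.

    Assumes the payload is UTF-8 encoded JSON (a hypothetical producer format).
    """
    text = record["data"].decode("utf-8")  # same bytes-to-string step as the cast
    return json.loads(text)


event = decode_record({"data": b'{"sensor": "a1", "reading": 21.5}'})
print(event["reading"])
```

In PySpark, the column-level equivalent is df.select(col("data").cast("string")) after from pyspark.sql.functions import col.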
Quickstart
The following notebook demonstrates how to run WordCount using Structured Streaming with Kinesis.
Kinesis WordCount with Structured Streaming notebook
Configure Kinesis options
In Databricks Runtime 13.3 LTS and above, you can use Trigger.AvailableNow with Kinesis. See Ingest Kinesis records as an incremental batch.
In Databricks Runtime 16.1 and above, you can use streamARN to identify Kinesis sources.
For all Databricks Runtime versions, you must specify either streamName or streamARN. You can only provide one of these options.
Do not switch between streamName and streamARN for an active streaming query. Databricks does not support changing these options mid-stream, and restarting the query can result in duplicate records or data loss. If you need to change from streamName to streamARN, start a new streaming query with a fresh checkpoint directory.
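A small guard can enforce the rule that a source sets exactly one of streamName or streamARN. This helper is a hypothetical sketch; the connector performs its own validation.

```python
def stream_source_option(stream_names: str = "", stream_arns: str = "") -> dict:
    """Return exactly one of streamName or streamARN.

    Raises ValueError when both or neither are given, mirroring the rule
    that a Kinesis source accepts only one of the two options.
    """
    if bool(stream_names) == bool(stream_arns):
        raise ValueError("Specify exactly one of streamName or streamARN")
    if stream_names:
        return {"streamName": stream_names}
    return {"streamARN": stream_arns}
```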
The following are common configurations for Kinesis data sources:
Option | Value | Default | Description |
|---|---|---|---|
streamName | A comma-separated list of stream names. | None | The stream names to subscribe to. |
streamARN | A comma-separated list of Kinesis stream ARNs, in the form arn:aws:kinesis:<region>:<account-id>:stream/<stream-name>. | None | The ARNs of streams to subscribe to. Available in Databricks Runtime 16.1 and above. |
region | Region for the streams to be specified. | Locally resolved region | The region the streams are defined in. |
endpoint | Region for the Kinesis data stream. | Locally resolved region | The regional endpoint for Kinesis Data Streams. |
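initialPosition | latest, trim_horizon, earliest (alias for trim_horizon), or at_timestamp. | latest | Where to start reading from in the stream. Specify at_timestamp as a JSON string to read all records at or after a given timestamp. |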
maxRecordsPerFetch | A positive integer. | 10,000 | How many records to read per API request to Kinesis. The number of records returned might be higher depending on whether sub-records were aggregated into a single record using the Kinesis Producer Library. |
maxFetchRate | A positive decimal representing data rate in MB/s. | 1.0 (max = 2.0) | How fast to prefetch data per shard. This option rate limits fetches and avoids Kinesis throttling. 2.0 MB/s is the maximum rate that Kinesis allows. |
minFetchPeriod | A duration string, for example, 1s for 1 second. | 400ms (min = 200ms) | How long to wait between consecutive prefetch attempts. This option limits the frequency of fetches and avoids Kinesis throttling. 200ms is the minimum because Kinesis allows a maximum of 5 fetches/sec. |
maxFetchDuration | A duration string, for example, 1s for 1 second. | 10s | How long to buffer prefetched new data before making it available for processing. |
fetchBufferSize | A byte string, for example, 20gb. | 20gb | How much data to buffer for the next trigger. This option is a stopping condition, not a strict upper bound. More data might be buffered than what is specified for this value. |
shardsPerTask | A positive integer. | 5 | How many Kinesis shards to prefetch from in parallel per Spark task. Ideally, total number of CPU cores in the cluster >= number of Kinesis shards / shardsPerTask. |
shardFetchInterval | A duration string, for example, 1s for 1 second. | 1s | How often to poll Kinesis for resharding. |
serviceCredential | String | No default. | The name of your Databricks service credential. See Create service credentials. |
awsAccessKey | String | No default. | AWS access key. |
awsSecretKey | String | No default. | AWS secret access key corresponding to the access key. |
roleArn | String | No default. | The Amazon Resource Name (ARN) of the role to assume when accessing Kinesis. |
roleExternalId | String | No default. | An optional value that can be used when delegating access to the AWS account. See How to Use an External ID. |
roleSessionName | String | No default. | An identifier for the assumed role session that uniquely identifies a session when the same role is assumed by different principals or for different reasons. |
coalesceThresholdBlockSize | A positive integer. | 10,000,000 | The threshold at which the automatic coalesce occurs. If the average block size is less than this value, pre-fetched blocks are coalesced toward the coalesceBinSize. |
coalesceBinSize | A positive integer. | 128,000,000 | The approximate block size after coalescing. |
consumerMode | polling or efo. | polling | Consumer type to run the streaming query with. See Configure Kinesis enhanced fan-out (EFO) for streaming query reads. Available in Databricks Runtime 11.3 LTS and above. |
requireConsumerDeregistration | true or false. | false | Whether to de-register the enhanced fan-out consumer on query termination. Requires efo for consumerMode. |
maxShardsPerDescribe | A positive integer (max 10000). | 100 | The maximum number of shards to read per API call while listing shards. |
consumerName | Single consumer name to use with all streams or comma-separated list of consumer names. | Streaming query ID | Consumer name used to register the query with Kinesis service in EFO mode. Available in Databricks Runtime 11.3 LTS and above. |
consumerNamePrefix | Empty string or custom prefix string. | databricks_ | Prefix used along with consumerName to register consumers with the Kinesis service in EFO mode. Available in Databricks Runtime 16.0 and above. |
consumerRefreshInterval | A duration string, for example, 1s for 1 second (max 3600s). | 300s | The interval at which the Kinesis EFO consumer registration is checked and refreshed. Available in Databricks Runtime 11.3 LTS and above. |
registeredConsumerId | A comma-separated list of consumer names or ARNs. | None | Identifiers for existing consumers in EFO mode. Available in Databricks Runtime 16.1 and above. |
registeredConsumerIdType | name or arn. | None | Specifies whether consumer IDs are names or ARNs. Available in Databricks Runtime 16.1 and above. |
The default option values are configured so that two readers (Spark or otherwise) can simultaneously consume a Kinesis stream without hitting Kinesis rate limits. If you have more consumers, you must adjust the options accordingly. For example, you might need to reduce maxFetchRate and increase minFetchPeriod.
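One way to pick values for more than two polling consumers, assuming the per-shard read limits are split evenly among them: Kinesis allows 2.0 MB/s and 5 fetches per second per shard in total, so each of N readers gets 2.0 / N MB/s and must wait N × 200 ms between fetches. With N = 2 this reproduces the defaults (1.0 and 400ms). This is an illustrative heuristic, not a Databricks-provided formula.

```python
def shared_fetch_options(num_consumers: int) -> dict:
    """Split per-shard Kinesis read limits evenly across polling consumers.

    Assumes every consumer is a polling reader with equal priority.
    """
    if num_consumers < 1:
        raise ValueError("num_consumers must be at least 1")
    # 2.0 MB/s total per shard, rounded down to hundredths so the sum never exceeds it.
    hundredths = 200 // num_consumers
    rate = hundredths / 100
    # 5 fetches/sec total per shard means 200 ms between fetches for a single reader.
    period_ms = 200 * num_consumers
    return {
        "maxFetchRate": f"{rate:.2f}",       # MB/s per shard for this reader
        "minFetchPeriod": f"{period_ms}ms",  # minimum gap between fetches
    }


print(shared_fetch_options(2))
print(shared_fetch_options(3))
```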
Low latency monitoring and alerting
Alerting use cases require low latency. To minimize latency:
- Verify that your streaming query is the only consumer of the Kinesis stream to optimize fetch performance and avoid Kinesis rate limits.
- Set the option maxFetchDuration to a small value (say, 200ms) to process fetched data as fast as possible. This option is a trade-off: it prioritizes faster processing per batch over a guarantee that the most recent records are consumed in each batch. For example, if you use Trigger.AvailableNow, a small value might cause your query to lag behind the newest records in the Kinesis stream.
- Set the option minFetchPeriod to 210ms to fetch as frequently as possible.
- Set the option shardsPerTask or configure the cluster such that # cores in cluster >= 2 * (# Kinesis shards) / shardsPerTask. This ensures that the background prefetching tasks and the streaming query tasks can run concurrently.
If your query receives data only once every 5 seconds, it is likely hitting Kinesis rate limits, because the connector sleeps for 5 seconds after each rate limit error. Review your configurations.
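The recommendations above can be sketched as an options dictionary for a query that is the only consumer of its stream. The helper picks the smallest shardsPerTask that satisfies cores >= 2 × shards / shardsPerTask for a given cluster; the shard and core counts are hypothetical inputs.

```python
import math


def low_latency_options(num_shards: int, total_cores: int) -> dict:
    """Options for an alerting query that is the only consumer of its stream.

    shardsPerTask is the smallest value that satisfies
    total_cores >= 2 * num_shards / shardsPerTask.
    """
    if total_cores < 1:
        raise ValueError("total_cores must be at least 1")
    shards_per_task = max(1, math.ceil(2 * num_shards / total_cores))
    return {
        "maxFetchDuration": "200ms",  # hand fetched data to the query quickly
        "minFetchPeriod": "210ms",    # fetch about as often as Kinesis allows
        "shardsPerTask": str(shards_per_task),
    }


opts = low_latency_options(num_shards=40, total_cores=16)
print(opts)
```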
What metrics does Kinesis report?
Kinesis reports the number of milliseconds a consumer has fallen behind the latest record in a stream for each workspace. The avgMsBehindLatest, maxMsBehindLatest, and minMsBehindLatest metrics provide the average, maximum, and minimum milliseconds across all workspaces in the streaming query process. See Monitoring Structured Streaming queries on Databricks.
If you are running the stream in a notebook, see metrics under the Raw Data tab in the streaming query progress dashboard. Here is an example:
```json
{
  "sources": [
    {
      "description": "KinesisV2[stream]",
      "metrics": {
        "avgMsBehindLatest": "32000.0",
        "maxMsBehindLatest": "32000",
        "minMsBehindLatest": "32000"
      }
    }
  ]
}
```
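To track lag programmatically, you can read these metrics from a progress dictionary such as the one above. In PySpark, StreamingQuery.lastProgress returns the latest progress as a dictionary (or None before the first batch). The helper name is hypothetical; note that the metric values arrive as strings.

```python
import json

progress_json = """
{"sources": [{"description": "KinesisV2[stream]",
              "metrics": {"avgMsBehindLatest": "32000.0",
                          "maxMsBehindLatest": "32000",
                          "minMsBehindLatest": "32000"}}]}
"""

LAG_METRICS = ("avgMsBehindLatest", "maxMsBehindLatest", "minMsBehindLatest")


def ms_behind_latest(progress) -> dict:
    """Collect the MsBehindLatest metrics for each source, converted to float."""
    if not progress:  # lastProgress is None until the first batch completes
        return {}
    result = {}
    for source in progress.get("sources", []):
        metrics = source.get("metrics", {})
        lag = {name: float(metrics[name]) for name in LAG_METRICS if name in metrics}
        if lag:
            result[source["description"]] = lag
    return result


lag = ms_behind_latest(json.loads(progress_json))
print(lag)
```

In a notebook, you could call ms_behind_latest(query.lastProgress) on a running query and alert when maxMsBehindLatest exceeds your threshold.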
Ingest Kinesis records as an incremental batch
In Databricks Runtime 13.3 LTS and above, Databricks supports using Trigger.AvailableNow with Kinesis data sources for incremental batch semantics. The following describes the basic configuration:
- When a micro-batch read triggers in available now mode, the current time is recorded by the Databricks client.
- Databricks polls the source system for all records with timestamps between this recorded time and the previous checkpoint.
- Databricks loads these records using Trigger.AvailableNow semantics.
Databricks uses a best-effort mechanism to try to consume all records that exist in the Kinesis stream(s) when the streaming query is executed. Because of small potential differences in timestamps and a lack of ordering guarantees in data sources, some records might not be included in a triggered batch. Omitted records are processed as part of the next triggered micro-batch.
If the query repeatedly fails to fetch records from the Kinesis stream even though records are available, try increasing the maxFetchDuration value.
See AvailableNow: Incremental batch processing.
Write to Kinesis
The following notebook contains a code snippet that you can use as a ForeachSink to write data to Kinesis. It requires a Dataset[(String, Array[Byte])].
The snippet provides at-least-once semantics, not exactly-once.
Kinesis Foreach Sink notebook
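For Python users, a comparable at-least-once writer can be sketched with DataStreamWriter.foreach and boto3's Kinesis put_record call. This is not the notebook's code. It assumes boto3 is installed on the cluster, that the writer can authenticate to Kinesis, and that rows have a string column named partitionKey and a binary column named data (hypothetical column names).

```python
def put_record_args(stream_name: str, partition_key: str, data: bytes) -> dict:
    """Keyword arguments for boto3's Kinesis put_record call."""
    return {"StreamName": stream_name, "PartitionKey": partition_key, "Data": data}


class KinesisForeachWriter:
    """At-least-once row writer for DataStreamWriter.foreach (a sketch).

    Expects rows with a string "partitionKey" column and a binary "data"
    column; both names are hypothetical.
    """

    def __init__(self, stream_name: str, region: str):
        self.stream_name = stream_name
        self.region = region
        self.client = None

    def open(self, partition_id, epoch_id):
        # Create the client on the executor instead of serializing it from the driver.
        import boto3
        self.client = boto3.client("kinesis", region_name=self.region)
        return True  # process every row in this partition

    def process(self, row):
        self.client.put_record(
            **put_record_args(self.stream_name, row["partitionKey"], bytes(row["data"])))

    def close(self, error):
        # Spark retries failed tasks, which can resend rows already written:
        # this is why the writer is at-least-once rather than exactly-once.
        self.client = None
```

You would attach it with df.writeStream.foreach(KinesisForeachWriter("<output stream>", "<region>")).option("checkpointLocation", "<path>").start().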
Recommendations for working with Kinesis
Kinesis queries might experience latency for a number of reasons. This section provides recommendations for troubleshooting latency.
The Kinesis source runs Spark jobs in a background thread to prefetch Kinesis data periodically and cache it in the memory of the Spark executors. The streaming query processes the cached data after each prefetch step completes and makes the data available for processing. The prefetch step significantly affects the observed end-to-end latency and throughput.
Reduce prefetch latency
To optimize for minimal query latency and maximum resource usage, use the following calculation:
total number of CPU cores in the cluster (across all executors) >= total number of Kinesis shards / shardsPerTask.
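As a worked example of this calculation, the sketch below returns the smallest core count that satisfies the inequality. The default of 5 matches the documented shardsPerTask default; the shard counts are hypothetical.

```python
import math


def min_cluster_cores(num_shards: int, shards_per_task: int = 5) -> int:
    """Smallest core count satisfying cores >= shards / shardsPerTask."""
    return math.ceil(num_shards / shards_per_task)


print(min_cluster_cores(64))     # 64 shards with the default shardsPerTask
print(min_cluster_cores(64, 4))  # 64 shards with shardsPerTask = 4
```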
Depending on minFetchPeriod, the connector can issue multiple GetRecords API calls to a Kinesis shard until it hits ReadProvisionedThroughputExceeded. This exception does not indicate a problem: it occurs because the connector maximizes utilization of the Kinesis shard.
Avoid slowdowns caused by too many rate limit errors
The connector reduces the amount of data read from Kinesis by half each time it encounters a rate limiting error and records this event in the log with a message: "Hit rate limit. Sleeping for 5 seconds."
It is common to see these errors while a stream is catching up, but once it has caught up, you should no longer see them. If you still do, you might need to either increase capacity on the Kinesis side or adjust the prefetching options.
Avoid spilling data to disk
If you have a sudden spike in your Kinesis streams, the assigned buffer capacity might fill up, and the buffer might not empty fast enough to make room for new data.
In such cases, Spark spills blocks from the buffer to disk and slows down processing, which affects stream performance. This event appears in the log with a message like this:
./log4j.txt:879546:20/03/02 17:15:04 INFO BlockManagerInfo: Updated kinesis_49290928_1_ef24cc00-abda-4acd-bb73-cb135aed175c on disk on 10.0.208.13:43458 (current size: 88.4 MB, original size: 0.0 B)
To address this problem, try increasing the cluster memory capacity (either add more nodes or increase the memory per node), or adjust the configuration parameter fetchBufferSize.
Hanging S3 write tasks
You can enable Spark speculation to terminate hanging tasks that would prevent stream processing from proceeding. To ensure that tasks are not terminated too aggressively, tune the quantile and multiplier for this setting carefully. A good starting point is to set spark.speculation.multiplier to 3 and spark.speculation.quantile to 0.95.
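A sketch of these settings as a dictionary, rendered as the "key value" lines used in a cluster's Spark config field. spark.speculation, spark.speculation.multiplier, and spark.speculation.quantile are standard Spark properties; the rendering helper is hypothetical.

```python
SPECULATION_CONF = {
    "spark.speculation": "true",           # launch speculative copies of slow tasks
    "spark.speculation.multiplier": "3",   # a task must be 3x slower than the median
    "spark.speculation.quantile": "0.95",  # only after 95% of a stage's tasks finish
}


def to_cluster_spark_config(conf: dict) -> str:
    """Render settings as 'key value' lines for a cluster's Spark config field."""
    return "\n".join(f"{key} {value}" for key, value in conf.items())


print(to_cluster_spark_config(SPECULATION_CONF))
```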
Reduce latency associated with checkpointing in stateful streams
Databricks recommends using RocksDB with changelog checkpointing for stateful streaming queries. See Enable changelog checkpointing.
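A minimal sketch of enabling RocksDB with changelog checkpointing from a notebook, assuming these session configuration keys and the Databricks RocksDB provider class name; verify them against the linked page for your runtime. The apply_session_conf helper is hypothetical.

```python
ROCKSDB_CHANGELOG_CONF = {
    # Use the RocksDB state store provider bundled with Databricks Runtime.
    "spark.sql.streaming.stateStore.providerClass":
        "com.databricks.sql.streaming.state.RocksDBStateStoreProvider",
    # Record state updates as a changelog on each commit instead of a full snapshot.
    "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled": "true",
}


def apply_session_conf(spark, conf: dict) -> None:
    """Set each entry on an active SparkSession before starting the query."""
    for key, value in conf.items():
        spark.conf.set(key, value)
```

In a notebook, you would call apply_session_conf(spark, ROCKSDB_CHANGELOG_CONF) before starting the stateful streaming query.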
Configure Kinesis enhanced fan-out (EFO) for streaming query reads
In Databricks Runtime 11.3 LTS and above, the Databricks Runtime Kinesis connector supports the Amazon Kinesis enhanced fan-out (EFO) feature.
Enhanced fan-out gives each registered stream consumer a dedicated throughput of 2 MB/s per shard (up to 20 consumers per Kinesis stream) and delivers records in push mode instead of pull mode.
By default, a Structured Streaming query configured with EFO mode registers itself as a consumer with dedicated throughput and a unique consumer name and consumer ARN (Amazon Resource Name) in Kinesis Data Streams.
By default, Databricks names the new consumer using the streaming query ID with the databricks_ prefix. You can optionally specify the consumerNamePrefix or consumerName options to override this behavior. The consumerName must be a string composed of letters, numbers, and the special characters _ . -.
On query restart, the Kinesis source uses polling mode to replay the latest uncommitted batch if one exists. After the stream replays the uncommitted batch, the source switches back to EFO mode for subsequent reads.
A registered EFO consumer incurs additional charges on Amazon Kinesis. To deregister the consumer automatically on query teardown, set the requireConsumerDeregistration option to true. Databricks cannot guarantee de-registration on events such as driver crashes or node failures. In case of job failure, Databricks recommends managing registered consumers directly to prevent excess Kinesis charges.
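The EFO settings described above can be sketched as a reader options dictionary that selects EFO mode, requests deregistration on teardown, and checks a custom consumer name against the allowed characters. The stream and consumer names are hypothetical, and the helper is illustrative only.

```python
import re

# Letters, digits, underscore, period, and hyphen.
_CONSUMER_NAME = re.compile(r"^[A-Za-z0-9_.-]+$")


def efo_options(stream_name: str, consumer_name: str = "",
                deregister_on_stop: bool = True) -> dict:
    """Reader options for enhanced fan-out (EFO) mode.

    consumer_name is optional; when omitted, the connector names the
    consumer from the streaming query ID.
    """
    opts = {
        "streamName": stream_name,
        "consumerMode": "efo",
        "requireConsumerDeregistration": str(deregister_on_stop).lower(),
    }
    if consumer_name:
        if not _CONSUMER_NAME.match(consumer_name):
            raise ValueError(f"invalid consumer name: {consumer_name!r}")
        opts["consumerName"] = consumer_name
    return opts


opts = efo_options("my-stream", consumer_name="alerts_v1.primary")
```

Deregistration is still best-effort, so the consumer-management options described in this section remain useful after driver crashes.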
Advanced consumer configuration options
In Databricks Runtime 16.1 and above, you can configure reads in EFO mode using existing consumers by using the options registeredConsumerId and registeredConsumerIdType, as in the following example:
Python:

```python
df = (spark.readStream
  .format("kinesis")
  .option("streamName", "mystreamname1,mystreamname2")
  .option("registeredConsumerId", "consumer1,consumer2")
  .option("registeredConsumerIdType", "name")
  .load()
)
```

Scala:

```scala
val kinesis = spark.readStream
  .format("kinesis")
  .option("streamName", "mystreamname1,mystreamname2")
  .option("registeredConsumerId", "consumer1,consumer2")
  .option("registeredConsumerIdType", "name")
  .load()
```
You can specify existing consumers using either name or ARN. You must provide one consumer ID for each Kinesis stream source specified in the configuration.
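A small helper can enforce one consumer ID per stream before building the options. It assumes IDs are matched to streams by position and that the ARN type is spelled arn; both are assumptions, and the helper itself is hypothetical.

```python
def registered_consumer_options(stream_names: list, consumer_ids: list,
                                id_type: str = "name") -> dict:
    """Options for reading in EFO mode with consumers that already exist.

    Enforces one consumer ID per stream and a name or arn ID type.
    """
    if len(stream_names) != len(consumer_ids):
        raise ValueError("provide exactly one consumer ID per stream")
    if id_type not in ("name", "arn"):
        raise ValueError("id_type must be 'name' or 'arn'")
    return {
        "streamName": ",".join(stream_names),
        "registeredConsumerId": ",".join(consumer_ids),
        "registeredConsumerIdType": id_type,
    }


opts = registered_consumer_options(["mystreamname1", "mystreamname2"],
                                   ["consumer1", "consumer2"])
```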
Offline consumer management using a Databricks notebook
Databricks provides a consumer management utility to register, list, or deregister consumers associated with Kinesis data streams. The following code demonstrates using this utility in a Databricks notebook:
- In a new Databricks notebook attached to an active cluster, create an AWSKinesisConsumerManager by providing the necessary authentication information.

```scala
import com.databricks.sql.kinesis.AWSKinesisConsumerManager

// Placeholders: replace with your service credential name and the stream's region.
val serviceCredentialName = "<service-credential-name>"
val kinesisRegion = "<region>"

val manager = AWSKinesisConsumerManager.newManager()
  .option("serviceCredential", serviceCredentialName)
  .option("region", kinesisRegion)
  .create()
```

- List and display consumers.

```scala
val consumers = manager.listConsumers("<stream name>")
display(consumers)
```

- Register a consumer for a given stream.

```scala
val consumerARN = manager.registerConsumer("<stream name>", "<consumer name>")
```

- Deregister a consumer for a given stream.

```scala
manager.deregisterConsumer("<stream name>", "<consumer name>")
```