Connect to Amazon Kinesis
Use Structured Streaming to read and write data to Amazon Kinesis.
Databricks recommends that you enable S3 VPC endpoints so that all S3 traffic is routed on the AWS network.
If you delete and recreate a Kinesis stream, you can't reuse any existing checkpoint directories to restart a streaming query. You must delete the checkpoint directories and start those queries from scratch. You can reshard with Structured Streaming by increasing the number of shards without interrupting or restarting the stream.
For recommendations on troubleshooting query latency, see Recommendations for reducing latency with Kinesis.
Authenticate with Amazon Kinesis
In Databricks Runtime 16.1 and above, Databricks recommends that you manage connections to Kinesis with a Databricks service credential. See Create service credentials.
To use a service credential, do the following:
- Create a Databricks service credential using an IAM role with the necessary permissions to access Kinesis. See Step 1: Create an IAM role.
- Provide the name of the service credential using the `serviceCredential` option when defining a streaming read.
The Kinesis source requires ListShards, GetRecords, and GetShardIterator permissions. If you encounter Amazon: Access Denied exceptions, verify that your IAM role has these permissions. See Controlling Access to Amazon Kinesis Data Streams Resources Using IAM.
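As a sketch, an IAM policy granting those three permissions might look like the following (the resource ARN placeholders are illustrative; scope the policy to your own stream):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:ListShards",
        "kinesis:GetRecords",
        "kinesis:GetShardIterator"
      ],
      "Resource": "arn:aws:kinesis:<region>:<account-id>:stream/<stream-name>"
    }
  ]
}
```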
Alternate authentication methods
In Databricks Runtime 16.0 and below, Databricks service credentials aren't available. Databricks has the following alternate authentication methods:
Instance profile
Attach an instance profile during compute configuration. See Instance profiles.
Instance profiles aren't supported in standard access mode (formerly shared access mode). See Standard compute requirements and limitations.
Use access keys directly
Set the awsAccessKey and awsSecretKey options.
If using keys, store them using Databricks secrets. See Secret management.
Assume IAM role
Some compute configurations allow you to assume an IAM role using the roleArn option. To assume a role, either launch your cluster with permissions to assume the role or provide access keys through awsAccessKey and awsSecretKey.
This method supports cross-account authentication. For more information, see Delegate Access Across AWS Accounts Using IAM Roles.
You can optionally specify the external ID with roleExternalId and a session name with roleSessionName.
Schema
Kinesis returns records with the following schema:
| Column | Type |
|---|---|
| partitionKey | string |
| data | binary |
| stream | string |
| shardId | string |
| sequenceNumber | string |
| approximateArrivalTimestamp | timestamp |
To deserialize the data in the data column, cast the field to a string.
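The same casting step can be illustrated outside of Spark: the `data` field arrives as raw bytes, which you decode to a string before parsing. A minimal pure-Python sketch (the sample payload is hypothetical):

```python
import json

# The `data` column is binary; a record payload arrives as raw bytes.
# This sample payload is hypothetical.
payload = b'{"device": "sensor-1", "temp": 21.5}'

# Equivalent of casting the binary `data` column to a string, then parsing it.
text = payload.decode("utf-8")
record = json.loads(text)
```

In a streaming DataFrame, the equivalent step is a column cast, for example `col("data").cast("string")` in PySpark.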
Quickstart
The following notebook demonstrates how to run WordCount using Structured Streaming with Kinesis.
Kinesis WordCount with Structured Streaming notebook
Configure Kinesis options
In Databricks Runtime 13.3 LTS and above, you can use Trigger.AvailableNow with Kinesis. See Ingest Kinesis records as an incremental batch.
In Databricks Runtime 16.1 and above, you can use streamARN to identify Kinesis sources. For all Databricks Runtime versions, you must specify either streamName or streamARN, but not both.
Do not switch between streamName and streamARN for an active streaming query. Databricks doesn't support changing these options mid-stream, and restarting the query after changing them can result in duplicate records or data loss. To switch from streamName to streamARN, start a new streaming query with a fresh checkpoint directory.
The following are common configurations for Kinesis data sources:
| Option | Value | Default | Description |
|---|---|---|---|
| `streamName` | A comma-separated list of stream names. | None | The stream names to subscribe to. |
| `streamARN` | A comma-separated list of Kinesis stream ARNs. | None | The ARNs of the streams to subscribe to. Available in Databricks Runtime 16.1 and above. |
| `region` | Region for the streams to be specified. | Locally resolved region | The region the streams are defined in. |
| `endpoint` | Region for the Kinesis data stream. | Locally resolved region | The regional endpoint for Kinesis Data Streams. |
| `initialPosition` | `latest`, `trim_horizon`, `earliest`, or `at_timestamp`. | `latest` | Where to start reading from in the stream. Specify `at_timestamp` as a JSON string to start from a specific timestamp. |
| `maxRecordsPerFetch` | A positive integer. | 10,000 | The number of records to read per API request to Kinesis. The number of records returned might be higher depending on whether sub-records were aggregated into a single record using the Kinesis Producer Library. |
| `maxFetchRate` | A positive decimal representing data rate in MB/s. | 1.0 (max = 2.0) | How fast to prefetch data per shard. This option rate limits fetches and avoids Kinesis throttling. 2.0 MB/s is the maximum rate that Kinesis allows. |
| `minFetchPeriod` | A duration string, for example, `400ms`. | 400ms (min = 200ms) | The duration to wait between consecutive prefetch attempts. This option limits the frequency of fetches and avoids Kinesis throttling. 200ms is the minimum because Kinesis allows a maximum of 5 fetches/sec. |
| `maxFetchDuration` | A duration string, for example, `10s`. | 10s | The duration to buffer prefetched new data before making it available for processing. |
| `fetchBufferSize` | A byte string, for example, `20gb`. | 20gb | How much data to buffer for the next trigger. This option is a stopping condition, not a strict upper bound. More data might be buffered than what is specified for this value. |
| `shardsPerTask` | A positive integer. | 5 | The number of Kinesis shards to prefetch in parallel per Spark task. Ideally, `# cores in cluster >= # Kinesis shards / shardsPerTask`. |
| `shardFetchInterval` | A duration string, for example, `1s`. | 1s | The interval at which to poll Kinesis for resharding. |
| `serviceCredential` | String | No default. | The name of your Databricks service credential. See Create service credentials. |
| `awsAccessKey` | String | No default. | AWS access key. |
| `awsSecretKey` | String | No default. | AWS secret access key corresponding to the access key. |
| `roleArn` | String | No default. | The Amazon Resource Name (ARN) of the role to assume when accessing Kinesis. |
| `roleExternalId` | String | No default. | An optional value that can be used when delegating access to the AWS account. See How to Use an External ID. |
| `roleSessionName` | String | No default. | An identifier for the assumed role session that uniquely identifies a session when the same role is assumed by different principals or for different reasons. |
| `coalesceThresholdBlockSize` | A positive integer. | 10,000,000 | The threshold at which the automatic coalesce occurs. If the average block size is less than this value, pre-fetched blocks are coalesced toward the `coalesceBinSize`. |
| `coalesceBinSize` | A positive integer. | 128,000,000 | The approximate block size after coalescing. |
| `consumerMode` | `polling` or `efo`. | `polling` | Consumer type to run the streaming query with. See Configure Kinesis enhanced fan-out (EFO) for streaming query reads. Available in Databricks Runtime 11.3 LTS and above. |
| `requireConsumerDeregistration` | `true` or `false`. | `false` | Whether to de-register the enhanced fan-out consumer on query termination. Requires `consumerMode` set to `efo`. |
| | A positive integer (max 10000). | 100 | The maximum number of shards to read per API call while listing shards. |
| `consumerName` | Single consumer name to use with all streams or a comma-separated list of consumer names. | Streaming query ID | Consumer name used to register the query with the Kinesis service in EFO mode. Available in Databricks Runtime 11.3 LTS and above. |
| `consumerNamePrefix` | Empty string or custom prefix string. | `databricks_` | Prefix used along with `consumerName` to register consumers with the Kinesis service in EFO mode. Available in Databricks Runtime 16.0 and above. |
| `consumerRefreshInterval` | A duration string, for example, `1s` for 1 second (max 3600s). | 300s | The interval at which the Kinesis EFO consumer registration is checked and refreshed. Available in Databricks Runtime 11.3 LTS and above. |
| `registeredConsumerId` | A comma-separated list of consumer names or ARNs. | None | Identifiers for existing consumers in EFO mode. Available in Databricks Runtime 16.1 and above. |
| `registeredConsumerIdType` | `name` or `arn`. | None | Specifies whether consumer IDs are names or ARNs. Available in Databricks Runtime 16.1 and above. |
The default option values allow two readers (Spark or otherwise) to simultaneously consume a Kinesis stream without hitting Kinesis rate limits. If you have more consumers, adjust the options accordingly. For example, you might need to reduce maxFetchRate and increase minFetchPeriod.
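As a back-of-the-envelope illustration of that adjustment (the arithmetic follows from the 2 MB/s-per-shard and 5-fetches-per-second Kinesis limits noted in the table; the helper below is illustrative, not a connector API):

```python
# Split the per-shard Kinesis polling budget across N consumers.
# Kinesis allows at most 2 MB/s and 5 GetRecords calls/s per shard,
# so each of N consumers gets roughly 1/N of each budget.
def per_consumer_options(num_consumers: int) -> dict:
    max_fetch_rate = round(2.0 / num_consumers, 3)       # MB/s per consumer
    min_fetch_period_ms = int(1000 * num_consumers / 5)  # ms between fetches
    return {
        "maxFetchRate": str(max_fetch_rate),
        "minFetchPeriod": f"{min_fetch_period_ms}ms",
    }

# With two consumers, this reproduces the documented defaults:
print(per_consumer_options(2))  # {'maxFetchRate': '1.0', 'minFetchPeriod': '400ms'}
print(per_consumer_options(4))  # {'maxFetchRate': '0.5', 'minFetchPeriod': '800ms'}
```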
Low latency monitoring and alerting
Alerting use cases require low latency. To minimize latency:
- Verify that your streaming query is the only consumer of the Kinesis stream to optimize fetch performance and avoid Kinesis rate limits.
- Set the option `maxFetchDuration` to a small value, such as 200ms, to process fetched data as fast as possible. This option is a trade-off: it prioritizes faster processing speed per batch instead of a guarantee that the most recent records are consumed in each batch. For example, if you use `Trigger.AvailableNow`, a small value might cause your query to lag behind the newest records in the Kinesis stream.
- Set the option `minFetchPeriod` to 210ms to fetch as frequently as possible.
- Set the option `shardsPerTask` or configure the cluster such that `# cores in cluster >= 2 * (# Kinesis shards) / shardsPerTask`. This guarantees that the background prefetching tasks and the streaming query tasks run concurrently.
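The sizing rule in the last bullet can be sketched as a quick check (an illustrative helper, not part of the connector):

```python
import math

# Check the guideline: # cores in cluster >= 2 * (# Kinesis shards) / shardsPerTask,
# so that background prefetch tasks and query tasks can run at the same time.
def cores_needed(num_shards: int, shards_per_task: int) -> int:
    return math.ceil(2 * num_shards / shards_per_task)

def cluster_is_sized_for_low_latency(total_cores: int, num_shards: int,
                                     shards_per_task: int = 5) -> bool:
    return total_cores >= cores_needed(num_shards, shards_per_task)

print(cores_needed(32, 5))                       # 13 cores for 32 shards
print(cluster_is_sized_for_low_latency(16, 32))  # True
print(cluster_is_sized_for_low_latency(8, 32))   # False
```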
If your query receives data only every 5 seconds, you might be hitting Kinesis rate limits. Review your configurations.
Monitor Kinesis metrics
Kinesis reports the number of milliseconds a consumer lags behind the head of a stream for each shard. The avgMsBehindLatest, maxMsBehindLatest, and minMsBehindLatest metrics provide the average, maximum, and minimum milliseconds across all shards consumed by the streaming query. See Monitoring Structured Streaming queries on Databricks.
If you are running the stream in a notebook, see metrics under the Raw Data tab in the streaming query progress dashboard. Here is an example:
{
"sources": [
{
"description": "KinesisV2[stream]",
"metrics": {
"avgMsBehindLatest": "32000.0",
"maxMsBehindLatest": "32000",
"minMsBehindLatest": "32000"
}
}
]
}
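For programmatic alerting, you can read the same metrics from the query progress JSON. A sketch that flags sources lagging beyond a threshold (the threshold and the JSON literal below are illustrative):

```python
import json

# Sample streaming query progress, shaped like the Raw Data example above.
progress_json = """
{
  "sources": [
    {
      "description": "KinesisV2[stream]",
      "metrics": {
        "avgMsBehindLatest": "32000.0",
        "maxMsBehindLatest": "32000",
        "minMsBehindLatest": "32000"
      }
    }
  ]
}
"""

# Flag any source whose maximum lag behind the stream head exceeds the threshold.
def lagging_sources(progress: dict, threshold_ms: float = 30_000.0) -> list:
    return [
        src["description"]
        for src in progress.get("sources", [])
        if float(src.get("metrics", {}).get("maxMsBehindLatest", 0)) > threshold_ms
    ]

progress = json.loads(progress_json)
print(lagging_sources(progress))  # ['KinesisV2[stream]']
```

In a running query you would pass `StreamingQuery.lastProgress` (already a dict in PySpark) instead of parsing a JSON literal.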
Ingest Kinesis records as an incremental batch
In Databricks Runtime 13.3 LTS and above, Databricks supports using Trigger.AvailableNow with Kinesis data sources for incremental batch semantics. The following describes the basic configuration:
- When a micro-batch read triggers in available now mode, the current time is recorded by the Databricks client.
- Databricks polls the source system for all records with timestamps between this recorded time and the previous checkpoint.
- Databricks loads these records using `Trigger.AvailableNow` semantics.
Databricks uses a best-effort mechanism to consume all records that exist in Kinesis streams when the streaming query runs. Because of potential small differences in timestamps and the lack of ordering guarantees in data sources, a triggered batch might not include some records. Omitted records are processed in the next triggered micro-batch.
If the query repeatedly fails to fetch records from the Kinesis stream even though records exist, try increasing the maxFetchDuration value.
See AvailableNow: Incremental batch processing.
Write to Kinesis
The following code snippet can be used as a ForeachSink to write data to Kinesis. It requires a Dataset[(String, Array[Byte])].
The following code snippet provides at-least-once semantics, not exactly-once.
Kinesis Foreach Sink notebook
Recommendations for reducing latency with Kinesis
This section has recommendations for troubleshooting various causes of latency for Kinesis streams.
The Kinesis source runs Spark jobs in a background thread to periodically prefetch Kinesis data and then cache the data in Spark executor memory. After each prefetch step completes, the streaming query can process the cached data. The prefetch step significantly affects the observed end-to-end latency and throughput.
Reduce prefetch latency
To optimize for minimal query latency and maximum resource usage, use the following calculation:
total number of CPU cores in the cluster (across all executors) >= total number of Kinesis shards / shardsPerTask.
A low minFetchPeriod can generate multiple GetRecords API calls to a Kinesis shard until Kinesis returns a ReadProvisionedThroughputExceeded exception. This exception does not necessarily indicate a problem: it means the connector is maximizing the utilization of the Kinesis shard.
Avoid slowdowns caused by too many rate limit errors
The connector reduces the amount of data read from Kinesis by half each time it encounters a rate limiting error and records this event in the log with a message: "Hit rate limit. Sleeping for 5 seconds."
You might see these errors while a stream is catching up. If you see these errors after a stream is caught up, you might need to tune the workload by either increasing Kinesis capacity in AWS or by adjusting the prefetching options in Spark.
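To see why repeated rate-limit errors slow a stream down, consider the halving behavior described above: after k consecutive errors the fetch size shrinks by a factor of 2^k (a toy model for intuition, not connector code):

```python
# Toy model of the backoff: each rate-limit error halves the amount
# of data requested from Kinesis on the next fetch.
def fetch_size_after_errors(initial_records: int, num_errors: int) -> int:
    size = initial_records
    for _ in range(num_errors):
        size //= 2
    return size

# Starting from the default maxRecordsPerFetch of 10,000:
print(fetch_size_after_errors(10_000, 1))  # 5000
print(fetch_size_after_errors(10_000, 3))  # 1250
```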
Avoid spilling data to disk
If you have a sudden increase of data volume in your Kinesis streams, the assigned buffer capacity might fill up and not empty fast enough to add new data. Spark spills data from the buffer to disk, which slows down stream processing, and an event appears in the log with a message like the following:
./log4j.txt:879546:20/03/02 17:15:04 INFO BlockManagerInfo: Updated kinesis_49290928_1_ef24cc00-abda-4acd-bb73-cb135aed175c on disk on 10.0.208.13:43458 (current size: 88.4 MB, original size: 0.0 B)
To avoid spill, increase the cluster memory capacity by adding more nodes or increasing the memory per node, or reduce the configuration parameter fetchBufferSize.
Suspended S3 write tasks
Enable Spark speculation to terminate suspended tasks that would prevent stream processing from proceeding. To ensure that tasks are not terminated too aggressively, tune the quantile and multiplier for this setting carefully. Databricks recommends that you set spark.speculation.multiplier to 3 and spark.speculation.quantile to 0.95 and adjust as needed.
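A sketch of those settings as Spark configuration, set on the cluster or in the session (starting values to tune for your workload, not universal defaults):

```
spark.speculation true
spark.speculation.multiplier 3
spark.speculation.quantile 0.95
```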
Reduce latency from checkpointing in stateful streams
Databricks recommends using RocksDB with changelog checkpointing for stateful streaming queries. See Enable changelog checkpointing.
Configure Kinesis enhanced fan-out (EFO) for streaming query reads
In Databricks Runtime 11.3 LTS and above, the Databricks Runtime Kinesis connector supports the Amazon Kinesis enhanced fan-out (EFO) feature.
Kinesis enhanced fan-out provides dedicated throughput of 2 MB/s per shard per consumer (maximum of 20 consumers per stream), and delivers records in push mode instead of pull mode.
By default, a Structured Streaming query configured with EFO mode registers itself as a consumer with dedicated throughput and a unique consumer name and consumer ARN (Amazon Resource Name) in Kinesis Data Streams.
By default, Databricks uses the streaming query ID with the databricks_ prefix to name the new consumer. You can optionally specify the consumerNamePrefix or consumerName options to override this behavior. The consumerName must be a string that contains only letters, numbers, and the special characters _ . -.
On query restart, the Kinesis source uses polling mode to replay the latest uncommitted batch if one exists. After the stream replays the uncommitted batch, the source switches back to EFO mode for subsequent reads.
A registered EFO consumer incurs additional charges on Amazon Kinesis. To deregister the consumer automatically on query teardown, set the requireConsumerDeregistration option to true. Databricks cannot guarantee de-registration on events such as driver crashes or node failures. In case of job failure, Databricks recommends managing registered consumers directly to prevent excess Kinesis charges.
Advanced consumer configuration options
In Databricks Runtime 16.1 and above, configure reads in EFO mode for existing consumers by using the options registeredConsumerId and registeredConsumerIdType:
Python:

df = (spark.readStream
  .format("kinesis")
  .option("streamName", "mystreamname1,mystreamname2")
  .option("registeredConsumerId", "consumer1,consumer2")
  .option("registeredConsumerIdType", "name")
  .load()
)

Scala:

val kinesis = spark.readStream
  .format("kinesis")
  .option("streamName", "mystreamname1,mystreamname2")
  .option("registeredConsumerId", "consumer1,consumer2")
  .option("registeredConsumerIdType", "name")
  .load()
You can specify existing consumers using either name or ARN. You must provide one consumer ID for each Kinesis stream source specified in the configuration.
Offline consumer management using a Databricks notebook
Use the AWSKinesisConsumerManager utility to programmatically register, list, or deregister consumers for Kinesis data streams, instead of manually configuring consumers in your AWS account console. For example, use the utility to create a consumer for a new stream, or, if you plan to permanently stop a stream, use the utility to delete the consumer in AWS.
The consumer manager utility is only available in Scala with compute set to dedicated access mode. See Access modes.
To use this utility in a Databricks notebook:
1. In a new Databricks notebook attached to an active cluster, create an `AWSKinesisConsumerManager` with the required authentication information.

   import com.databricks.sql.kinesis.AWSKinesisConsumerManager

   val manager = AWSKinesisConsumerManager.newManager()
     .option("serviceCredential", serviceCredentialName)
     .option("region", kinesisRegion)
     .create()

2. List and display consumers.

   val consumers = manager.listConsumers("<stream name>")
   display(consumers)

3. Register a consumer for a given stream.

   val consumerARN = manager.registerConsumer("<stream name>", "<consumer name>")

4. Deregister a consumer for a given stream.

   manager.deregisterConsumer("<stream name>", "<consumer name>")