Connect to Amazon Kinesis
This article describes how you can use Structured Streaming to read and write data to Amazon Kinesis.
Databricks recommends that you enable S3 VPC endpoints to ensure that all S3 traffic is routed over the AWS network.
Note
If you delete and recreate a Kinesis stream, you cannot reuse any existing checkpoint directories to restart a streaming query. You must delete the checkpoint directories and start those queries from scratch. You can reshard with Structured Streaming by increasing the number of shards without interrupting or restarting the stream.
See Recommendations for working with Kinesis.
Authenticate with Amazon Kinesis
Databricks recommends managing your connection to Kinesis using an instance profile. See Instance profiles.
Warning
Instance profiles are not supported in shared access mode. Use either single user access mode or an alternate authentication method with shared access mode. See Compute access mode limitations for Unity Catalog.
If you want to use keys for access, you can provide them using the options `awsAccessKey` and `awsSecretKey`.

You can also assume an IAM role using the `roleArn` option. You can optionally specify the external ID with `roleExternalId` and a session name with `roleSessionName`. In order to assume a role, you can either launch your cluster with permissions to assume the role or provide access keys through `awsAccessKey` and `awsSecretKey`. For cross-account authentication, Databricks recommends using `roleArn` to hold the assumed role, which can then be assumed through your Databricks AWS account. For more information about cross-account authentication, see Delegate Access Across AWS Accounts Using IAM Roles.
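As a hedged sketch of how these options fit together (the role ARN, stream name, and region below are hypothetical placeholders, not values from this page), the role-based options can be assembled and passed to the Kinesis reader:

```python
# Hypothetical values: replace the ARN, stream name, and region with your own.
kinesis_auth_options = {
    "streamName": "my-stream",
    "region": "us-west-2",
    "roleArn": "arn:aws:iam::123456789012:role/kinesis-reader",  # role to assume
    "roleSessionName": "databricks-kinesis",  # names the assumed-role session
    # "roleExternalId": "my-external-id",     # optional, for delegated access
}

# On a cluster launched with permissions to assume the role:
# df = (spark.readStream
#         .format("kinesis")
#         .options(**kinesis_auth_options)
#         .load())
```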
Note
The Kinesis source requires `ListShards`, `GetRecords`, and `GetShardIterator` permissions. If you encounter `Amazon: Access Denied` exceptions, check that your user or profile has these permissions. See Controlling Access to Amazon Kinesis Data Streams Resources Using IAM for more details.
Schema
Kinesis returns records with the following schema:
| Column | Type |
| --- | --- |
| partitionKey | string |
| data | binary |
| stream | string |
| shardId | string |
| sequenceNumber | string |
| approximateArrivalTimestamp | timestamp |
To deserialize the data in the `data` column, you can cast the field to a string.
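Casting the binary column to a string in Spark (`CAST(data AS STRING)`) is equivalent to decoding the payload bytes as UTF-8. A plain-Python illustration, with an example payload:

```python
# A Kinesis record payload arrives as raw bytes in the `data` column.
payload = b'{"event": "click", "user": 42}'  # example bytes, assuming a UTF-8 JSON payload
decoded = payload.decode("utf-8")
print(decoded)  # → {"event": "click", "user": 42}

# The Spark equivalent on a cluster:
# df.selectExpr("CAST(data AS STRING) AS json_payload")
```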
Quickstart
The following notebook demonstrates how to run WordCount using Structured Streaming with Kinesis.
Configure Kinesis options
Important
In Databricks Runtime 13.3 LTS and above, you can use `Trigger.AvailableNow` with Kinesis. See Ingest Kinesis records as an incremental batch.
Note
In Databricks Runtime 16.1 and above, you can use `streamARN` to identify Kinesis sources.

For all Databricks Runtime versions, you must specify either `streamName` or `streamARN`. You can only provide one of these options.
The following are common configurations for Kinesis data sources:
| Option | Value | Default | Description |
| --- | --- | --- | --- |
| `streamName` | A comma-separated list of stream names. | None | The stream names to subscribe to. |
| `streamARN` | A comma-separated list of Kinesis stream ARNs, for example, `arn:aws:kinesis:<region>:<account-id>:stream/<stream-name>`. | None | The ARNs of streams to subscribe to. |
| `region` | Region for the streams to be specified. | Locally resolved region | The region the streams are defined in. |
| `endpoint` | Regional endpoint for the Kinesis data stream. | Locally resolved region | The regional endpoint for Kinesis Data Streams. |
| `initialPosition` | `latest`, `trim_horizon`, `earliest`, or `at_timestamp`. | `latest` | Where to start reading from in the stream. |
| `maxRecordsPerFetch` | A positive integer. | 10,000 | How many records to read per API request to Kinesis. The number of records returned may actually be higher if sub-records were aggregated into a single record using the Kinesis Producer Library. |
| `maxFetchRate` | A positive decimal representing data rate in MB/s. | 1.0 (max = 2.0) | How fast to prefetch data per shard. This rate limits fetches to avoid Kinesis throttling. 2.0 MB/s is the maximum rate that Kinesis allows. |
| `minFetchPeriod` | A duration string, for example, `400ms` for 400 milliseconds. | 400ms (min = 200ms) | How long to wait between consecutive prefetch attempts. This limits the frequency of fetches to avoid Kinesis throttling. 200ms is the minimum because Kinesis allows a maximum of 5 fetches/sec. |
| `maxFetchDuration` | A duration string, for example, `10s` for 10 seconds. | 10s | How long to buffer prefetched new data before making it available for processing. |
| `fetchBufferSize` | A byte string, for example, `20gb`. | 20gb | How much data to buffer for the next trigger. This is used as a stopping condition and not a strict upper bound, therefore more data may be buffered than what's specified for this value. |
| `shardsPerTask` | A positive integer. | 5 | How many Kinesis shards to prefetch from in parallel per Spark task. Ideally `# cores in cluster >= # Kinesis shards / shardsPerTask`. |
| `shardFetchInterval` | A duration string, for example, `1s` for 1 second. | 1s | How often to poll Kinesis for resharding. |
| `awsAccessKey` | String | No default. | AWS access key. |
| `awsSecretKey` | String | No default. | AWS secret access key corresponding to the access key. |
| `roleArn` | String | No default. | The Amazon Resource Name (ARN) of the role to assume when accessing Kinesis. |
| `roleExternalId` | String | No default. | An optional value that can be used when delegating access to the AWS account. See How to Use an External ID. |
| `roleSessionName` | String | No default. | An identifier for the assumed role session that uniquely identifies a session when the same role is assumed by different principals or for different reasons. |
| `coalesceThresholdBlockSize` | A positive integer. | 10,000,000 | The threshold at which the automatic coalesce occurs. If the average block size is less than this value, prefetched blocks are coalesced toward the `coalesceBinSize`. |
| `coalesceBinSize` | A positive integer. | 128,000,000 | The approximate block size after coalescing. |
| `consumerMode` | `polling` or `efo`. | `polling` | Consumer type to run the streaming query with. See Configure Kinesis enhanced fan-out (EFO) for streaming query reads. |
| `requireConsumerDeregistration` | `true` or `false`. | `false` | Whether to deregister the enhanced fan-out consumer on query termination. Requires `consumerMode` to be `efo`. |
| `maxShardsPerDescribe` | A positive integer (max 10,000). | 100 | The maximum number of shards to read per API call while listing shards. |
| `consumerName` | A single consumer name to use with all streams, or a comma-separated list of consumer names. | Streaming query ID | Consumer name used to register the query with the Kinesis service in EFO mode. |
| `consumerNamePrefix` | Empty string or custom prefix string. | `databricks_` | Prefix used along with `consumerName` to register consumers with the Kinesis service in EFO mode. Available in Databricks Runtime 16.0 and above. |
| `consumerRefreshInterval` | A duration string, for example, `1s` for 1 second (max 3600s). | 300s | The interval at which the Kinesis EFO consumer registration is checked and refreshed. |
| `registeredConsumerId` | A comma-separated list of consumer names or ARNs. | None | Identifiers for existing consumers in EFO mode. |
| `registeredConsumerIdType` | `name` or `arn`. | None | Specifies whether consumer IDs are names or ARNs. |
Note
The default values of the options have been chosen such that two readers (Spark or otherwise) can simultaneously consume a Kinesis stream without hitting Kinesis rate limits. If you have more consumers, you have to adjust the options accordingly. For example, you may have to reduce `maxFetchRate` and increase `minFetchPeriod`.
Low latency monitoring and alerting
When you have an alerting use case, you want lower latency. To achieve that:

- Ensure that there is only one consumer (that is, only your streaming query and no one else) of the Kinesis stream, so that the connector can optimize your streaming query to fetch as fast as possible without running into Kinesis rate limits.
- Set the option `maxFetchDuration` to a small value (say, 200ms) to start processing fetched data as fast as possible. If you are using `Trigger.AvailableNow`, this increases the chances of not being able to keep up with the newest records in the Kinesis stream.
- Set the option `minFetchPeriod` to 210ms to fetch as frequently as possible.
- Set the option `shardsPerTask` or configure the cluster such that `# cores in cluster >= 2 * (# Kinesis shards) / shardsPerTask`. This ensures that the background prefetching tasks and the streaming query tasks can execute concurrently.
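The sizing rule in the last step can be checked with a small calculation (the shard count below is illustrative):

```python
import math

def min_cores_for_low_latency(num_shards: int, shards_per_task: int) -> int:
    """Cores needed so prefetch and query tasks run concurrently:
    # cores in cluster >= 2 * (# Kinesis shards) / shardsPerTask."""
    return math.ceil(2 * num_shards / shards_per_task)

# Example: 20 shards with the default shardsPerTask of 5
print(min_cores_for_low_latency(20, 5))  # → 8
```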
If you see that your query is receiving data every 5 seconds, then it is likely that you are hitting Kinesis rate limits. Review your configurations.
What metrics does Kinesis report?
Kinesis reports the number of milliseconds a consumer has fallen behind the tip of a stream for each shard. You can get the average, minimum, and maximum of this value across all shards in the streaming query process as the `avgMsBehindLatest`, `maxMsBehindLatest`, and `minMsBehindLatest` metrics. See sources metrics object (Kinesis).
If you are running the stream in a notebook, you can see metrics under the Raw Data tab in the streaming query progress dashboard, such as the following example:
```json
{
  "sources" : [ {
    "description" : "KinesisV2[stream]",
    "metrics" : {
      "avgMsBehindLatest" : "32000.0",
      "maxMsBehindLatest" : "32000",
      "minMsBehindLatest" : "32000"
    }
  } ]
}
```
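These metrics can also be read programmatically from the query progress JSON. A small sketch using the example payload (note that the metric values are strings):

```python
import json

# Progress JSON as emitted under the Raw Data tab (example values).
progress = json.loads("""
{
  "sources": [{
    "description": "KinesisV2[stream]",
    "metrics": {
      "avgMsBehindLatest": "32000.0",
      "maxMsBehindLatest": "32000",
      "minMsBehindLatest": "32000"
    }
  }]
}
""")

metrics = progress["sources"][0]["metrics"]
lag_seconds = float(metrics["avgMsBehindLatest"]) / 1000  # values are strings
print(lag_seconds)  # → 32.0
```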
Ingest Kinesis records as an incremental batch
In Databricks Runtime 13.3 LTS and above, Databricks supports using `Trigger.AvailableNow` with Kinesis data sources for incremental batch semantics. The following describes the basic configuration:

- When a micro-batch read triggers in available now mode, the current time is recorded by the Databricks client.
- Databricks polls the source system for all records with timestamps between this recorded time and the previous checkpoint.
- Databricks loads these records using `Trigger.AvailableNow` semantics.
Databricks uses a best-effort mechanism to try and consume all records that exist in Kinesis stream(s) when the streaming query is executed. Because of small potential differences in timestamps and a lack of guarantee in ordering in data sources, some records might not be included in a triggered batch. Omitted records are processed as part of the next triggered micro-batch.
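The timestamp-window behavior can be illustrated with a plain-Python sketch (this is not the connector's implementation, just the batch-boundary idea; the record values are illustrative):

```python
# Records carry approximate arrival timestamps (epoch ms, illustrative values).
records = [
    {"seq": 1, "arrival_ms": 1_000},
    {"seq": 2, "arrival_ms": 5_000},
    {"seq": 3, "arrival_ms": 9_000},  # arrives after the trigger time below
]

previous_checkpoint_ms = 0
trigger_time_ms = 8_000  # "current time" recorded when the batch triggers

# The batch takes records between the previous checkpoint and the trigger time;
# later records (seq 3) are left for the next triggered micro-batch.
batch = [r for r in records
         if previous_checkpoint_ms < r["arrival_ms"] <= trigger_time_ms]
print([r["seq"] for r in batch])  # → [1, 2]
```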
Note
If the query continues failing to fetch records from the Kinesis stream even though records exist, try increasing the `maxFetchDuration` value.
Write to Kinesis
The following code snippet can be used as a `ForeachSink` to write data to Kinesis. It requires a `Dataset[(String, Array[Byte])]`.
Note
The following code snippet provides at-least-once semantics, not exactly-once semantics.
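The original snippet (a Scala `ForeachWriter`) is not reproduced on this page. As a hedged stand-in, not the official snippet, a Python foreach writer with the same at-least-once behavior might look like the following; it assumes `boto3` is available on the cluster, and the stream name and region are hypothetical:

```python
# Hedged sketch: a Structured Streaming foreach writer that sends each
# (partitionKey, data) row to Kinesis with boto3. Stream name is hypothetical.
class KinesisForeachWriter:
    def __init__(self, stream_name, region):
        self.stream_name = stream_name
        self.region = region
        self.client = None

    def open(self, partition_id, epoch_id):
        import boto3  # create the client per partition so it is not pickled
        self.client = boto3.client("kinesis", region_name=self.region)
        return True

    def process(self, row):
        # row.partitionKey: str, row.data: bytes (matching the source schema)
        self.client.put_record(
            StreamName=self.stream_name,
            PartitionKey=row.partitionKey,
            Data=row.data,
        )

    def close(self, error):
        self.client = None

# Usage on a cluster (at-least-once semantics):
# df.writeStream.foreach(KinesisForeachWriter("my-stream", "us-west-2")).start()
```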
Recommendations for working with Kinesis
Kinesis queries might experience latency for a number of reasons. This section provides recommendations for troubleshooting latency.
The Kinesis source runs Spark jobs in a background thread to prefetch Kinesis data periodically and cache it in the memory of the Spark executors. After each prefetch step completes, the cached data is made available for the streaming query to process. The prefetch step significantly affects the observed end-to-end latency and throughput.
Reduce prefetch latency
To optimize for minimal query latency and maximum resource usage, use the following calculation:

`total number of CPU cores in the cluster (across all executors) >= total number of Kinesis shards / shardsPerTask`
Important
`minFetchPeriod` can create multiple GetRecords API calls to the Kinesis shard until it hits `ReadProvisionedThroughputExceeded`. If this exception occurs, it is not indicative of an issue, as the connector is maximizing the utilization of the Kinesis shard.
Avoid slowdowns caused by too many rate limit errors
The connector reduces the amount of data read from Kinesis by half each time it encounters a rate-limiting error and records this event in the log with the message: "Hit rate limit. Sleeping for 5 seconds."

It is common to see these errors while a stream is catching up, but after it has caught up, you should no longer see them. If you do, you might need to either increase capacity on the Kinesis side or adjust the prefetching options.
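The halving behavior can be sketched as follows (illustrative only, not connector source; the floor value is a hypothetical bound, not documented here):

```python
def next_fetch_rate(current_rate_mbps: float, hit_rate_limit: bool,
                    floor_mbps: float = 0.1) -> float:
    """Halve the per-shard read rate on a rate-limit error (floor is a
    hypothetical lower bound for this sketch)."""
    if hit_rate_limit:
        return max(current_rate_mbps / 2, floor_mbps)
    return current_rate_mbps

rate = 2.0
rate = next_fetch_rate(rate, hit_rate_limit=True)  # "Hit rate limit. Sleeping for 5 seconds."
print(rate)  # → 1.0
```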
Avoid spilling data to disk
If you have a sudden spike in your Kinesis streams, the assigned buffer capacity might fill up and not be emptied fast enough for new data to be added.

In such cases, Spark spills blocks from the buffer to disk, which slows down processing and affects stream performance. This event appears in the log with a message like this:

```
./log4j.txt:879546:20/03/02 17:15:04 INFO BlockManagerInfo: Updated kinesis_49290928_1_ef24cc00-abda-4acd-bb73-cb135aed175c on disk on 10.0.208.13:43458 (current size: 88.4 MB, original size: 0.0 B)
```
To address this problem, try increasing the cluster memory capacity (either add more nodes or increase the memory per node), or adjust the configuration parameter `fetchBufferSize`.
Hanging S3 write tasks
You can enable Spark speculation to terminate hanging tasks that would prevent stream processing from proceeding. To ensure that tasks are not terminated too aggressively, tune the quantile and multiplier for this setting carefully. A good starting point is to set `spark.speculation.multiplier` to `3` and `spark.speculation.quantile` to `0.95`.
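As a hedged starting point, these settings can be applied in a notebook before starting the stream (values taken from the recommendation above; tune them for your workload):

```python
# Spark speculation settings; requires an active Spark session on a cluster.
spark.conf.set("spark.speculation", "true")
spark.conf.set("spark.speculation.multiplier", "3")
spark.conf.set("spark.speculation.quantile", "0.95")
```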
Reduce latency associated with checkpointing in stateful streams
Databricks recommends using RocksDB with changelog checkpointing for stateful streaming queries. See Enable changelog checkpointing.
Configure Kinesis enhanced fan-out (EFO) for streaming query reads
In Databricks Runtime 11.3 and above, the Databricks Runtime Kinesis connector provides support for using the Amazon Kinesis enhanced fan-out (EFO) feature.
Kinesis enhanced fan-out is a feature that provides enhanced fan-out stream consumers with a dedicated throughput of 2 MB/s per shard, per consumer (maximum of 20 consumers per Kinesis stream), and delivers records in push mode instead of pull mode.
By default, a Structured Streaming query configured with EFO mode registers itself as a consumer with dedicated throughput and a unique consumer name and consumer ARN (Amazon Resource Name) in Kinesis Data Streams.

By default, Databricks uses the streaming query ID with the `databricks_` prefix to name the new consumer. You can optionally specify the `consumerNamePrefix` or `consumerName` options to override this behavior. The `consumerName` must be a string comprised of letters, numbers, and the special characters `_`, `.`, and `-`.
Important
A registered EFO consumer incurs additional charges on Amazon Kinesis. To deregister the consumer automatically on query teardown, set the `requireConsumerDeregistration` option to `true`. Databricks cannot guarantee deregistration on events such as driver crashes or node failures. In case of job failure, Databricks recommends managing registered consumers directly to prevent excess Kinesis charges.
Advanced consumer configuration options
In Databricks Runtime 16.1 and above, you can configure reads in EFO mode using existing consumers by using the options `registeredConsumerId` and `registeredConsumerIdType`, as in the following example:

```python
df = (spark.readStream
  .format("kinesis")
  .option("streamName", "mystreamname1,mystreamname2")
  .option("registeredConsumerId", "consumer1,consumer2")
  .option("registeredConsumerIdType", "name")
  .load()
)
```

```scala
val kinesis = spark.readStream
  .format("kinesis")
  .option("streamName", "mystreamname1,mystreamname2")
  .option("registeredConsumerId", "consumer1,consumer2")
  .option("registeredConsumerIdType", "name")
  .load()
```
You can specify existing consumers using either name or ARN. You must provide one consumer ID for each Kinesis stream source specified in the configuration.
Offline consumer management using a Databricks notebook
Databricks provides a consumer management utility to register, list, or deregister consumers associated with Kinesis data streams. The following code demonstrates using this utility in a Databricks notebook:

1. In a new Databricks notebook attached to an active cluster, create an `AWSKinesisConsumerManager` by providing the necessary authentication information.

   ```scala
   import com.databricks.sql.kinesis.AWSKinesisConsumerManager

   val manager = AWSKinesisConsumerManager.newManager()
     .option("awsAccessKey", awsAccessKeyId)
     .option("awsSecretKey", awsSecretKey)
     .option("region", kinesisRegion)
     .create()
   ```

2. List and display consumers.

   ```scala
   val consumers = manager.listConsumers("<stream name>")
   display(consumers)
   ```

3. Register a consumer for a given stream.

   ```scala
   val consumerARN = manager.registerConsumer("<stream name>", "<consumer name>")
   ```

4. Deregister a consumer for a given stream.

   ```scala
   manager.deregisterConsumer("<stream name>", "<consumer name>")
   ```