Skip to main content

Connect to Amazon Kinesis

Use Structured Streaming to read and write data to Amazon Kinesis.

Databricks recommends that you enable S3 VPC endpoints so that all S3 traffic is routed on the AWS network.

note

If you delete and recreate a Kinesis stream, you can't reuse any existing checkpoint directories to restart a streaming query. You must delete the checkpoint directories and start those queries from scratch. You can reshard with Structured Streaming by increasing the number of shards without interrupting or restarting the stream.

For recommendations on troubleshooting query latency, see Recommendations for reducing latency with Kinesis.

Authenticate with Amazon Kinesis

In Databricks Runtime 16.1 and above, Databricks recommends that you manage connections to Kinesis with a Databricks service credential. See Create service credentials.

To use a service credential, do the following:

  1. Create a Databricks service credential using an IAM role with the necessary permissions to access Kinesis. See Step 1: Create an IAM role.
  2. Provide the name of the service credential using the serviceCredential option when defining a streaming read.
note

The Kinesis source requires ListShards, GetRecords, and GetShardIterator permissions. If you encounter Amazon: Access Denied exceptions, verify that your IAM role has these permissions. See Controlling Access to Amazon Kinesis Data Streams Resources Using IAM.

Alternate authentication methods

In Databricks Runtime 16.0 and below, Databricks service credentials aren't available. Databricks has the following alternate authentication methods:

Instance profile

Attach an instance profile during compute configuration. See Instance profiles.

Instance profiles aren't supported in standard access mode (formerly shared access mode). See Standard compute requirements and limitations.

Use access keys directly

Set the awsAccessKey and awsSecretKey options.

If using keys, store them using Databricks secrets. See Secret management.

Assume IAM role

Some compute configurations allow you to assume an IAM role using the roleArn option. To assume a role, either launch your cluster with permissions to assume the role or provide access keys through awsAccessKey and awsSecretKey.

This method supports cross-account authentication. For more information, see Delegate Access Across AWS Accounts Using IAM Roles.

You can optionally specify the external ID with roleExternalId and a session name with roleSessionName.

Schema

Kinesis returns records with the following schema:

Column

Type

partitionKey

string

data

binary

stream

string

shardId

string

sequenceNumber

string

approximateArrivalTimestamp

timestamp

To deserialize the data in the data column, cast the field to a string.

Quickstart

The following notebook demonstrates how to run WordCount using Structured Streaming with Kinesis.

Kinesis WordCount with Structured Streaming notebook

Open notebook in new tab

Configure Kinesis options

In Databricks Runtime 13.3 LTS and above, you can use Trigger.AvailableNow with Kinesis. See Ingest Kinesis records as an incremental batch.

In Databricks Runtime 16.1 and above, you can use streamARN to identify Kinesis sources. For all Databricks Runtime versions, you must specify either streamName or streamARN, but not both.

warning

Do not switch between streamName and streamARN for an active streaming query. Databricks doesn't support changing these options mid-stream. Restarting the query can result in duplicate records or data loss. To switch from streamName to streamARN, start a new streaming query with a fresh checkpoint directory.

The following are common configurations for Kinesis data sources:

Option

Value

Default

Description

streamName

A comma-separated list of stream names.

None

The stream names to subscribe to.

streamARN

A comma-separated list of Kinesis stream ARNs. For example, "arn:aws:kinesis:myarn1,arn:aws:kinesis:myarn2".

None

The ARNs of streams to subscribe to. Available in Databricks Runtime 16.1 and above.

region

Region for the streams to be specified.

Locally resolved region

The region the streams are defined in.

endpoint

Region for the Kinesis data stream.

Locally resolved region

The regional endpoint for Kinesis Data Streams.

initialPosition

latest, trim_horizon, earliest (alias for trim_horizon), at_timestamp.

latest

Where to start reading from in the stream.

Specify at_timestamp as a JSON string using Java default format for timestamps, such as {"at_timestamp": "06/25/2020 10:23:45 PDT"}. The streaming query reads all changes at or after the given timestamp (inclusive). You can explicitly specify formats by providing an additional field in the JSON string, such as {"at_timestamp": "06/25/2020 10:23:45 PDT", "format": "MM/dd/yyyy HH:mm:ss ZZZ"}.

maxRecordsPerFetch

A positive integer.

10,000

The number of records to read per API request to Kinesis. The number of records returned might be higher depending on whether sub-records were aggregated into a single record using the Kinesis Producer Library.

maxFetchRate

A positive decimal representing data rate in MB/s.

1.0 (max = 2.0)

How fast to prefetch data per shard. This option rate limits fetches and avoids Kinesis throttling. 2.0 MB/s is the maximum rate that Kinesis allows.

minFetchPeriod

A duration string, for example, 1s for 1 second.

400ms (min = 200ms)

The duration to wait between consecutive prefetch attempts. This option limits the frequency of fetches and avoids Kinesis throttling. 200ms is the minimum because Kinesis allows a maximum of 5 fetches/sec.

maxFetchDuration

A duration string, for example, 1m for 1 minute.

10s

The duration to buffer prefetched new data before making it available for processing.

fetchBufferSize

A byte string, for example, 2gb or 10mb.

20gb

How much data to buffer for the next trigger. This option is a stopping condition, not a strict upper bound. More data might be buffered than what is specified for this value.

shardsPerTask

A positive integer.

5

The number of Kinesis shards to prefetch in parallel per Spark task. Ideally, # cores in cluster >= # Kinesis shards / shardsPerTask for minimum query latency and maximum resource usage.

shardFetchInterval

A duration string, for example, 2m for 2 minutes.

1s

The interval at which to poll Kinesis for resharding.

serviceCredential

String

No default.

The name of your Databricks service credential. See Create service credentials.

awsAccessKey

String

No default.

AWS access key.

awsSecretKey

String

No default.

AWS secret access key corresponding to the access key.

roleArn

String

No default.

The Amazon Resource Name (ARN) of the role to assume when accessing Kinesis.

roleExternalId

String

No default.

An optional value that can be used when delegating access to the AWS account. See How to Use an External ID.

roleSessionName

String

No default.

An identifier for the assumed role session that uniquely identifies a session when the same role is assumed by different principals or for different reasons.

coalesceThresholdBlockSize

A positive integer.

10,000,000

The threshold at which the automatic coalesce occurs. If the average block size is less than this value, pre-fetched blocks are coalesced toward the coalesceBinSize.

coalesceBinSize

A positive integer.

128,000,000

The approximate block size after coalescing.

consumerMode

polling or efo.

polling

Consumer type to run the streaming query with. See Configure Kinesis enhanced fan-out (EFO) for streaming query reads. Available in Databricks Runtime 11.3 LTS and above.

requireConsumerDeregistration

true or false.

false

Whether to de-register enhanced fan-out consumer on query termination. Requires efo for consumerMode. Available in Databricks Runtime 11.3 LTS and above.

maxShardsPerDescribe

A positive integer (max 10000).

100

The maximum number of shards to read per API call while listing shards.

consumerName

Single consumer name to use with all streams or comma-separated list of consumer names.

Streaming query ID

Consumer name used to register the query with Kinesis service in EFO mode. Available in Databricks Runtime 11.3 LTS and above.

consumerNamePrefix

Empty string or custom prefix string.

databricks_

Prefix used along with consumerName to register consumers with the Kinesis service in EFO mode. Available in Databricks Runtime 16.0 and above.

consumerRefreshInterval

A duration string, for example “1s” for 1 second (max 3600s).

300s

The interval at which the Kinesis EFO consumer registration is checked and refreshed. Available in Databricks Runtime 11.3 LTS and above.

registeredConsumerId

A comma-separated list of consumer names or ARNs.

None

Identifiers for existing consumers in EFO mode. Available in Databricks Runtime 16.1 and above.

registeredConsumerIdType

name or ARN.

None

Specifies whether consumer IDs are names or ARNs. Available in Databricks Runtime 16.1 and above.

The default option values allow two readers (Spark or otherwise) to simultaneously consume a Kinesis stream without hitting Kinesis rate limits. If you have more consumers, adjust the options accordingly. For example, you might need to reduce maxFetchRate and increase minFetchPeriod.

Low latency monitoring and alerting

Alerting use cases require low latency. To minimize latency:

  • Verify that your streaming query is the only consumer of the Kinesis stream to optimize fetch performance and avoid Kinesis rate limits.
  • Set the option maxFetchDuration to a small value, such as 200ms, to process fetched data as fast as possible. This option is a trade-off: it prioritizes faster processing speed per batch instead of a guarantee that the most recent records are consumed in each batch. For example, if you use Trigger.AvailableNow, a small value might cause your query to lag behind the newest records in the Kinesis stream.
  • Set the option minFetchPeriod to 210ms to fetch as frequently as possible.
  • Set the option shardsPerTask or configure the cluster such that # cores in cluster >= 2 * (# Kinesis shards) / shardsPerTask. This guarantees that the background prefetching tasks and the streaming query tasks run concurrently.

If your query is receiving data every 5 seconds, you might exceed Kinesis rate limits. Review your configurations.

Monitor Kinesis metrics

Kinesis reports the number of milliseconds a consumer lags behind the beginning of a stream for each workspace. The avgMsBehindLatest, maxMsBehindLatest, and minMsBehindLatest metrics provide the average, minimum, and maximum milliseconds across all workspaces in the streaming query process. See Monitoring Structured Streaming queries on Databricks.

If you are running the stream in a notebook, see metrics under the Raw Data tab in the streaming query progress dashboard. Here is an example:

JSON
{
"sources": [
{
"description": "KinesisV2[stream]",
"metrics": {
"avgMsBehindLatest": "32000.0",
"maxMsBehindLatest": "32000",
"minMsBehindLatest": "32000"
}
}
]
}

Ingest Kinesis records as an incremental batch

In Databricks Runtime 13.3 LTS and above, Databricks supports using Trigger.AvailableNow with Kinesis data sources for incremental batch semantics. The following describes the basic configuration:

  1. When a micro-batch read triggers in available now mode, the current time is recorded by the Databricks client.
  2. Databricks polls the source system for all records with timestamps between this recorded time and the previous checkpoint.
  3. Databricks loads these records using Trigger.AvailableNow semantics.

Databricks uses a best-effort mechanism to try and consume all records that exist in Kinesis streams when the streaming query runs. Because of small potential differences in timestamps and a lack of guarantee in ordering in data sources, a triggered batch might not include some records. Omitted records are processed in the next triggered micro-batch.

note

If the query continues failing to fetch records from the Kinesis stream even if there are records, try increasing the maxFetchDuration value.

See AvailableNow: Incremental batch processing.

Write to Kinesis

The following code snippet can be used as a ForeachSink to write data to Kinesis. It requires a Dataset[(String, Array[Byte])].

note

The following code snippet provides at least once semantics, not exactly once.

Kinesis Foreach Sink notebook

Open notebook in new tab

Recommendations for reducing latency with Kinesis

This section has recommendations for troubleshooting various causes of latency for Kinesis streams.

The Kinesis source runs Spark jobs in a background thread to periodically prefetch Kinesis data and then cache the data in Spark executor memory. After each prefetch step completes, the streaming query can process the cached data. The prefetch step significantly affects the observed end-to-end latency and throughput.

Reduce prefetch latency

To optimize for minimal query latency and maximum resource usage, use the following calculation:

total number of CPU cores in the cluster (across all executors) >= total number of Kinesis shards / shardsPerTask.

important

minFetchPeriod can create multiple GetRecords API calls to the Kinesis shard until it reaches ReadProvisionedThroughputExceeded. If an exception occurs, it might not be an issue because the connector maximizes the utilization of the Kinesis shard.

Avoid slowdowns caused by too many rate limit errors

The connector reduces the amount of data read from Kinesis by half each time it encounters a rate limiting error and records this event in the log with a message: "Hit rate limit. Sleeping for 5 seconds."

You might see these errors while a stream is catching up. If you see these errors after a stream is caught up, you might need to tune the workload by either increasing Kinesis capacity in AWS or by adjusting the prefetching options in Spark.

Avoid spilling data to disk

If you have a sudden increase of data volume in your Kinesis streams, the assigned buffer capacity might fill up and not empty fast enough to add new data. Spark spills data from the buffer to disk, which slows down stream processing, and an event appears in the log with a message like the following:

Bash
./log4j.txt:879546:20/03/02 17:15:04 INFO BlockManagerInfo: Updated kinesis_49290928_1_ef24cc00-abda-4acd-bb73-cb135aed175c on disk on 10.0.208.13:43458 (current size: 88.4 MB, original size: 0.0 B)

To avoid spill, increase the cluster memory capacity by adding more nodes or increasing the memory per node, or reduce the configuration parameter fetchBufferSize.

Suspended S3 write tasks

Enable Spark speculation to terminate suspended tasks that would prevent stream processing from proceeding. To ensure that tasks are not terminated too aggressively, tune the quantile and multiplier for this setting carefully. Databricks recommends that you set spark.speculation.multiplier to 3 and spark.speculation.quantile to 0.95 and adjust as needed.

Reduce latency from checkpointing in stateful streams

Databricks recommends using RocksDB with changelog checkpointing for stateful streaming queries. See Enable changelog checkpointing.

Configure Kinesis enhanced fan-out (EFO) for streaming query reads

In Databricks Runtime 11.3 and above, the Databricks Runtime Kinesis connector provides support for using the Amazon Kinesis enhanced fan-out (EFO) feature.

Kinesis enhanced fan-out provides dedicated throughput of 2 MB/s per shard per consumer (maximum of 20 consumers per stream), and delivers records in push mode instead of pull mode.

By default, a Structured Streaming query configured with EFO mode registers itself as a consumer with dedicated throughput and a unique consumer name and consumer ARN (Amazon Resource Name) in Kinesis Data Streams.

By default, Databricks uses the streaming query ID with the databricks_ prefix to name the new consumer. You can optionally specify the consumerNamePrefix or consumerName options to override this behavior. The consumerName must be a string that contains only letters, numbers, and the special characters _ . -.

On query restart, the Kinesis source uses polling mode to replay the latest uncommitted batch if one exists. After the stream replays the uncommitted batch, the source switches back to EFO mode for subsequent reads.

important

A registered EFO consumer incurs additional charges on Amazon Kinesis. To deregister the consumer automatically on query teardown, set the requireConsumerDeregistration option to true. Databricks cannot guarantee de-registration on events such as driver crashes or node failures. In case of job failure, Databricks recommends managing registered consumers directly to prevent excess Kinesis charges.

Advanced consumer configuration options

In Databricks Runtime 16.1 and above, configure reads in EFO mode for existing consumers by using the options registeredConsumerId and registeredConsumerIdType:

Python
df = (spark.readStream
.format("kinesis")
.option("streamName", "mystreamname1,mystreamname2")
.option("registeredConsumerId", "consumer1,consumer2")
.option("registeredConsumerIdType", "name")
.load()
)

You can specify existing consumers using either name or ARN. You must provide one consumer ID for each Kinesis stream source specified in the configuration.

Offline consumer management using a Databricks notebook

Use the AWSKinesisConsumerManager utility to programmatically register, list, or deregister consumers for Kinesis data streams, instead of manually configuring consumers in your AWS account console. For example, use the utility to create a consumer for a new stream, or, if you plan to permanently stop a stream, use the utility to delete the consumer in AWS.

The consumer manager utility is only available in Scala with compute set to dedicated access mode. See Access modes.

To use this utility in a Databricks notebook:

  1. In a new Databricks notebook attached to an active cluster, create an AWSKinesisConsumerManager with required authentication information.

    Scala
    import com.databricks.sql.kinesis.AWSKinesisConsumerManager

    val manager = AWSKinesisConsumerManager.newManager()
    .option("serviceCredential", serviceCredentialName)
    .option("region", kinesisRegion)
    .create()
  2. List and display consumers.

    Scala
    val consumers = manager.listConsumers("<stream name>")
    display(consumers)
  3. Register a consumer for given stream.

    Scala
    val consumerARN = manager.registerConsumer("<stream name>", "<consumer name>")
  4. Deregister a consumer for given stream.

    Scala
    manager.deregisterConsumer("<stream name>", "<consumer name>")