Connect to Amazon Kinesis

This page describes how to use Structured Streaming to read and write data to Amazon Kinesis.

Databricks recommends that you enable S3 VPC endpoints so that all S3 traffic is routed on the AWS network.

note

If you delete and recreate a Kinesis stream, you cannot reuse any existing checkpoint directories to restart a streaming query. You must delete the checkpoint directories and start those queries from scratch. You can reshard with Structured Streaming by increasing the number of shards without interrupting or restarting the stream.

See Recommendations for working with Kinesis.

Authenticate with Amazon Kinesis

Databricks recommends managing your connection to Kinesis using a Databricks service credential. See Create service credentials. Databricks service credentials require Databricks Runtime 16.1 and above.

To use a service credential, do the following:

  1. Create a Databricks service credential using an IAM role with the necessary permissions to access Kinesis. See Step 1: Create an IAM role.
  2. Provide the name of the service credential using the serviceCredential option when defining a streaming read.

note

The Kinesis source requires ListShards, GetRecords, and GetShardIterator permissions. If you encounter Amazon: Access Denied exceptions, check that your IAM role has these permissions. See Controlling Access to Amazon Kinesis Data Streams Resources Using IAM.
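As an illustration of the service credential steps, the following is a minimal sketch of a streaming read. The function name and its parameters are hypothetical; only the kinesis format and the streamName, region, and serviceCredential options come from this page.

```python
def read_kinesis_with_service_credential(spark, stream_name, credential_name, region):
    """Build a streaming DataFrame over one Kinesis stream.

    `spark` is assumed to be the active SparkSession on Databricks.
    `credential_name` is the name of an existing Databricks service credential.
    """
    return (
        spark.readStream
        .format("kinesis")
        .option("streamName", stream_name)
        .option("region", region)
        .option("serviceCredential", credential_name)
        .load()
    )
```

In a notebook, this might be called as read_kinesis_with_service_credential(spark, "my-stream", "my-credential", "us-west-2"), where all three values are placeholders.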

Alternate authentication methods

If Databricks service credentials are not available, Databricks supports the following alternate authentication methods.

Instance profile

Attach an instance profile during compute configuration. See Instance profiles.

Instance profiles are not supported in standard access mode (formerly shared access mode). See Standard compute requirements and limitations.

Use keys directly

Set the awsAccessKey and awsSecretKey options.

If using keys, store them using Databricks secrets. See Secret management.
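A minimal sketch of reading the keys from Databricks secrets rather than hard-coding them. The helper name, the secret scope, and the secret key names are hypothetical; dbutils is assumed to be the object that Databricks notebooks provide.

```python
def kinesis_key_options(dbutils, scope, access_key_name, secret_key_name):
    """Return Kinesis source options populated from a Databricks secret scope.

    `scope`, `access_key_name`, and `secret_key_name` are placeholders for
    names that you chose when storing the secrets.
    """
    return {
        "awsAccessKey": dbutils.secrets.get(scope=scope, key=access_key_name),
        "awsSecretKey": dbutils.secrets.get(scope=scope, key=secret_key_name),
    }
```

The resulting dictionary can be passed to the reader with spark.readStream.format("kinesis").options(**opts).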

Assume IAM role

Some compute configurations allow you to assume an IAM role using the roleArn option. To assume a role, either launch your cluster with permissions to assume the role or provide access keys through awsAccessKey and awsSecretKey.

This method supports cross-account authentication. For more information, see Delegate Access Across AWS Accounts Using IAM Roles.

You can optionally specify the external ID with roleExternalId and a session name with roleSessionName.
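The role options can be assembled as follows. This is a sketch only: the helper name is hypothetical, and the optional values are included only when provided.

```python
def assume_role_options(role_arn, external_id=None, session_name=None):
    """Return the Kinesis source options for assuming an IAM role."""
    options = {"roleArn": role_arn}
    if external_id is not None:
        options["roleExternalId"] = external_id
    if session_name is not None:
        options["roleSessionName"] = session_name
    return options
```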

Schema

Kinesis returns records with the following schema:

| Column | Type |
| --- | --- |
| partitionKey | string |
| data | binary |
| stream | string |
| shardId | string |
| sequenceNumber | string |
| approximateArrivalTimestamp | timestamp |

To deserialize the data in the data column, cast the field to a string.
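For example, the cast might look like the following sketch, which assumes the payload is UTF-8 text. The helper name and the payload column alias are hypothetical.

```python
def decode_kinesis_payload(df):
    """Cast the binary `data` column of a Kinesis DataFrame to a string.

    Keeps the partition key and arrival timestamp alongside the decoded text.
    """
    return df.selectExpr(
        "partitionKey",
        "CAST(data AS STRING) AS payload",
        "approximateArrivalTimestamp",
    )
```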

Quickstart

The following notebook demonstrates how to run WordCount using Structured Streaming with Kinesis.

Kinesis WordCount with Structured Streaming notebook

Configure Kinesis options

important

In Databricks Runtime 13.3 LTS and above, you can use Trigger.AvailableNow with Kinesis. See Ingest Kinesis records as an incremental batch.

note

In Databricks Runtime 16.1 and above, you can use streamARN to identify Kinesis sources.

For all Databricks Runtime versions, you must specify either streamName or streamARN. You can only provide one of these options.
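The exactly-one rule can be checked before the query starts. The following sketch uses a hypothetical helper name and fails fast if both options or neither option is supplied.

```python
def stream_identifier_option(stream_name=None, stream_arn=None):
    """Return the single stream-identifying option for a Kinesis read.

    Raises ValueError unless exactly one of the two arguments is given.
    """
    if (stream_name is None) == (stream_arn is None):
        raise ValueError("Specify exactly one of streamName or streamARN")
    if stream_name is not None:
        return {"streamName": stream_name}
    return {"streamARN": stream_arn}
```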

warning

Do not switch between streamName and streamARN for an active streaming query. Databricks does not support changing these options mid-stream, and restarting the query can result in duplicate records or data loss. If you need to change from streamName to streamARN, start a new streaming query with a fresh checkpoint directory.

The following are common configurations for Kinesis data sources:

| Option | Value | Default | Description |
| --- | --- | --- | --- |
| streamName | A comma-separated list of stream names. | None | The stream names to subscribe to. |
| streamARN | A comma-separated list of Kinesis stream ARNs. For example, "arn:aws:kinesis:myarn1,arn:aws:kinesis:myarn2". | None | The ARNs of streams to subscribe to. Available in Databricks Runtime 16.1 and above. |
| region | Region for the streams to be specified. | Locally resolved region | The region the streams are defined in. |
| endpoint | Region for the Kinesis data stream. | Locally resolved region | The regional endpoint for Kinesis Data Streams. |
| initialPosition | latest, trim_horizon, earliest (alias for trim_horizon), at_timestamp. | latest | Where to start reading from in the stream. Specify at_timestamp as a JSON string using Java default format for timestamps, such as {"at_timestamp": "06/25/2020 10:23:45 PDT"}. The streaming query reads all changes at or after the given timestamp (inclusive). You can explicitly specify formats by providing an additional field in the JSON string, such as {"at_timestamp": "06/25/2020 10:23:45 PDT", "format": "MM/dd/yyyy HH:mm:ss ZZZ"}. |
| maxRecordsPerFetch | A positive integer. | 10,000 | The number of records to read per API request to Kinesis. The number of records returned might be higher depending on whether sub-records were aggregated into a single record using the Kinesis Producer Library. |
| maxFetchRate | A positive decimal representing data rate in MB/s. | 1.0 (max = 2.0) | How fast to prefetch data per shard. This option rate limits fetches and avoids Kinesis throttling. 2.0 MB/s is the maximum rate that Kinesis allows. |
| minFetchPeriod | A duration string, for example, 1s for 1 second. | 400ms (min = 200ms) | How long to wait between consecutive prefetch attempts. This option limits the frequency of fetches and avoids Kinesis throttling. 200ms is the minimum because Kinesis allows a maximum of 5 fetches/sec. |
| maxFetchDuration | A duration string, for example, 1m for 1 minute. | 10s | How long to buffer prefetched new data before making it available for processing. |
| fetchBufferSize | A byte string, for example, 2gb or 10mb. | 20gb | How much data to buffer for the next trigger. This option is a stopping condition, not a strict upper bound. More data might be buffered than what is specified for this value. |
| shardsPerTask | A positive integer. | 5 | How many Kinesis shards to prefetch from in parallel per Spark task. Ideally, # cores in cluster >= # Kinesis shards / shardsPerTask for minimum query latency and maximum resource usage. |
| shardFetchInterval | A duration string, for example, 2m for 2 minutes. | 1s | How often to poll Kinesis for resharding. |
| serviceCredential | String | No default. | The name of your Databricks service credential. See Create service credentials. |
| awsAccessKey | String | No default. | AWS access key. |
| awsSecretKey | String | No default. | AWS secret access key corresponding to the access key. |
| roleArn | String | No default. | The Amazon Resource Name (ARN) of the role to assume when accessing Kinesis. |
| roleExternalId | String | No default. | An optional value that can be used when delegating access to the AWS account. See How to Use an External ID. |
| roleSessionName | String | No default. | An identifier for the assumed role session that uniquely identifies a session when the same role is assumed by different principals or for different reasons. |
| coalesceThresholdBlockSize | A positive integer. | 10,000,000 | The threshold at which the automatic coalesce occurs. If the average block size is less than this value, pre-fetched blocks are coalesced toward the coalesceBinSize. |
| coalesceBinSize | A positive integer. | 128,000,000 | The approximate block size after coalescing. |
| consumerMode | polling or efo. | polling | Consumer type to run the streaming query with. See Configure Kinesis enhanced fan-out (EFO) for streaming query reads. Available in Databricks Runtime 11.3 LTS and above. |
| requireConsumerDeregistration | true or false. | false | Whether to de-register enhanced fan-out consumer on query termination. Requires efo for consumerMode. Available in Databricks Runtime 11.3 LTS and above. |
| maxShardsPerDescribe | A positive integer (max 10000). | 100 | The maximum number of shards to read per API call while listing shards. |
| consumerName | Single consumer name to use with all streams or comma-separated list of consumer names. | Streaming query ID | Consumer name used to register the query with Kinesis service in EFO mode. Available in Databricks Runtime 11.3 LTS and above. |
| consumerNamePrefix | Empty string or custom prefix string. | databricks_ | Prefix used along with consumerName to register consumers with the Kinesis service in EFO mode. Available in Databricks Runtime 16.0 and above. |
| consumerRefreshInterval | A duration string, for example "1s" for 1 second (max 3600s). | 300s | The interval at which the Kinesis EFO consumer registration is checked and refreshed. Available in Databricks Runtime 11.3 LTS and above. |
| registeredConsumerId | A comma-separated list of consumer names or ARNs. | None | Identifiers for existing consumers in EFO mode. Available in Databricks Runtime 16.1 and above. |
| registeredConsumerIdType | name or ARN. | None | Specifies whether consumer IDs are names or ARNs. Available in Databricks Runtime 16.1 and above. |
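Because the at_timestamp value for initialPosition is a JSON string, building it with a JSON serializer avoids quoting mistakes. The following is a minimal sketch; the helper name is hypothetical, and the timestamp and format values are the ones used in the option description.

```python
import json


def at_timestamp_position(timestamp, fmt=None):
    """Return the JSON string for the initialPosition option.

    `fmt` is the optional explicit timestamp format.
    """
    position = {"at_timestamp": timestamp}
    if fmt is not None:
        position["format"] = fmt
    return json.dumps(position)


# Value to pass as .option("initialPosition", initial_position)
initial_position = at_timestamp_position(
    "06/25/2020 10:23:45 PDT", "MM/dd/yyyy HH:mm:ss ZZZ"
)
```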

note

The default option values are configured so that two readers (Spark or otherwise) can simultaneously consume a Kinesis stream without hitting Kinesis rate limits. If you have more consumers, you must adjust the options accordingly. For example, you might need to reduce maxFetchRate and increase minFetchPeriod.
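One way to reason about the adjustment is to split the per-shard Kinesis limits cited above (2.0 MB/s and 5 fetches per second) evenly across consumers. The even split is an assumption made for illustration, not a documented rule, and the helper name is hypothetical. With two consumers the calculation reproduces the defaults of 1.0 MB/s and 400ms.

```python
KINESIS_MAX_MB_PER_SEC = 2.0      # per-shard read rate limit
KINESIS_MAX_FETCHES_PER_SEC = 5   # per-shard fetch frequency limit


def per_consumer_fetch_options(num_consumers):
    """Split the per-shard limits evenly across `num_consumers` readers."""
    if num_consumers < 1:
        raise ValueError("num_consumers must be at least 1")
    max_fetch_rate = KINESIS_MAX_MB_PER_SEC / num_consumers
    min_fetch_period_ms = 1000 * num_consumers // KINESIS_MAX_FETCHES_PER_SEC
    return {
        "maxFetchRate": str(max_fetch_rate),
        "minFetchPeriod": f"{min_fetch_period_ms}ms",
    }
```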

Low latency monitoring and alerting

Alerting use cases require low latency. To minimize latency:

  • Verify that your streaming query is the only consumer of the Kinesis stream to optimize fetch performance and avoid Kinesis rate limits.
  • Set the option maxFetchDuration to a small value (for example, 200ms) to process fetched data as fast as possible. This option is a trade-off: it prioritizes faster processing speed per batch over a guarantee that the most recent records are consumed in each batch. For example, if you use Trigger.AvailableNow, a small value might cause your query to lag behind the newest records in the Kinesis stream.
  • Set the option minFetchPeriod to 210ms to fetch as frequently as possible.
  • Set the option shardsPerTask or configure the cluster such that # cores in cluster >= 2 * (# Kinesis shards) / shardsPerTask. This guarantees that the background prefetching tasks and the streaming query tasks run concurrently.
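The settings in the list above can be sketched as follows. The helper names are hypothetical; the option values and the core-count inequality are taken from the list.

```python
import math


def low_latency_options():
    """Option values suggested above for low-latency alerting."""
    return {"maxFetchDuration": "200ms", "minFetchPeriod": "210ms"}


def min_cores_for_low_latency(num_shards, shards_per_task=5):
    """Smallest core count satisfying cores >= 2 * shards / shardsPerTask."""
    return math.ceil(2 * num_shards / shards_per_task)
```

For example, a stream with 20 shards and the default shardsPerTask of 5 needs at least 8 cores under this inequality.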

If your query receives data only about every 5 seconds, it is likely hitting Kinesis rate limits, because the connector sleeps for 5 seconds after each rate limit error. Review your configurations.

What metrics does Kinesis report?

Kinesis reports the number of milliseconds a consumer has fallen behind the latest record (the tip) of a stream for each workspace. The avgMsBehindLatest, maxMsBehindLatest, and minMsBehindLatest metrics provide the average, maximum, and minimum milliseconds across all workspaces in the streaming query process. See Monitoring Structured Streaming queries on Databricks.

If you are running the stream in a notebook, see metrics under the Raw Data tab in the streaming query progress dashboard. Here is an example:

JSON
{
  "sources": [
    {
      "description": "KinesisV2[stream]",
      "metrics": {
        "avgMsBehindLatest": "32000.0",
        "maxMsBehindLatest": "32000",
        "minMsBehindLatest": "32000"
      }
    }
  ]
}
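To monitor lag programmatically, the metrics can be pulled out of a progress record shaped like the example above. This sketch assumes the progress is available as a Python dictionary (for example, parsed from the JSON); the helper name is hypothetical. The metric values arrive as strings, so they are converted to floats.

```python
LAG_METRICS = ("avgMsBehindLatest", "maxMsBehindLatest", "minMsBehindLatest")


def lag_metrics(progress):
    """Map each source description to its MsBehindLatest metrics as floats."""
    result = {}
    for source in progress.get("sources", []):
        metrics = source.get("metrics", {})
        lag = {name: float(metrics[name]) for name in LAG_METRICS if name in metrics}
        if lag:
            result[source["description"]] = lag
    return result
```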

Ingest Kinesis records as an incremental batch

In Databricks Runtime 13.3 LTS and above, Databricks supports using Trigger.AvailableNow with Kinesis data sources for incremental batch semantics. The following describes the basic configuration:

  1. When a micro-batch read triggers in available now mode, the current time is recorded by the Databricks client.
  2. Databricks polls the source system for all records with timestamps between this recorded time and the previous checkpoint.
  3. Databricks loads these records using Trigger.AvailableNow semantics.

Databricks uses a best-effort mechanism to consume all records that exist in the Kinesis streams when the streaming query is executed. Because of small potential differences in timestamps and the lack of ordering guarantees in data sources, some records might not be included in a triggered batch. Omitted records are processed as part of the next triggered micro-batch.
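A minimal sketch of an incremental batch write follows. The helper name, the target table name, and the checkpoint path are hypothetical; df is assumed to be a streaming DataFrame read from the kinesis format.

```python
def run_incremental_batch(df, table_name, checkpoint_path):
    """Process the records available now, then stop.

    The checkpoint location lets the next run resume where this one ended.
    """
    return (
        df.writeStream
        .option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)
        .toTable(table_name)
    )
```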

note

If the query repeatedly fails to fetch records from the Kinesis stream even though records exist, try increasing the maxFetchDuration value.

See AvailableNow: Incremental batch processing.

Write to Kinesis

The following code snippet can be used as a ForeachSink to write data to Kinesis. It requires a Dataset[(String, Array[Byte])].

note

The following code snippet provides at-least-once semantics, not exactly-once.

Kinesis Foreach Sink notebook
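Separately from the notebook, the following Python sketch shows a different technique: writing each micro-batch with foreachBatch and the Kinesis PutRecords API through an injected boto3-style client. It is not the notebook's ForeachSink. The helper names are hypothetical, the batch is assumed to have partitionKey and data columns, and collecting rows to the driver is a simplification that suits only small batches. Raising on failed records causes the batch to be retried, which is consistent with at-least-once delivery.

```python
def put_rows(client, stream_name, rows, chunk_size=500):
    """Send (partition_key, data_bytes) pairs; return the failed record count.

    PutRecords accepts at most 500 records per request, so rows are chunked.
    """
    rows = list(rows)
    failed = 0
    for start in range(0, len(rows), chunk_size):
        chunk = rows[start:start + chunk_size]
        response = client.put_records(
            StreamName=stream_name,
            Records=[{"PartitionKey": key, "Data": data} for key, data in chunk],
        )
        failed += response.get("FailedRecordCount", 0)
    return failed


def make_batch_writer(client_factory, stream_name):
    """Return a function suitable for DataStreamWriter.foreachBatch."""
    def write_batch(batch_df, batch_id):
        rows = [(r["partitionKey"], bytes(r["data"])) for r in batch_df.collect()]
        failed = put_rows(client_factory(), stream_name, rows)
        if failed:
            # Failing the batch makes Structured Streaming retry it.
            raise RuntimeError(f"{failed} records failed in batch {batch_id}")
    return write_batch


# Example wiring (assumes boto3 is installed and AWS credentials are configured):
# factory = lambda: boto3.client("kinesis", region_name="us-west-2")
# df.writeStream.foreachBatch(make_batch_writer(factory, "my-stream")).start()
```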

Recommendations for working with Kinesis

Kinesis queries might experience latency for a number of reasons. This section provides recommendations for troubleshooting latency.

The Kinesis source runs Spark jobs in a background thread to prefetch Kinesis data periodically and cache it in the memory of the Spark executors. The streaming query processes the cached data after each prefetch step completes and makes the data available for processing. The prefetch step significantly affects the observed end-to-end latency and throughput.

Reduce prefetch latency

To optimize for minimal query latency and maximum resource usage, use the following calculation:

total number of CPU cores in the cluster (across all executors) >= total number of Kinesis shards / shardsPerTask.
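Rearranged, the inequality gives a lower bound for shardsPerTask on a cluster of fixed size: shardsPerTask >= shards / cores. The following sketch computes that bound; the helper name is hypothetical.

```python
import math


def min_shards_per_task(num_shards, total_cores):
    """Smallest shardsPerTask such that total_cores >= num_shards / shardsPerTask."""
    if total_cores < 1:
        raise ValueError("total_cores must be at least 1")
    return max(1, math.ceil(num_shards / total_cores))
```

For example, 100 shards on a 16-core cluster gives a bound of 7, because 100 / 7 is about 14.3, which fits within 16 cores.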

important

Depending on minFetchPeriod, the connector can make multiple GetRecords API calls to a Kinesis shard until it hits ReadProvisionedThroughputExceeded. If this exception occurs, it does not indicate a problem, because the connector is maximizing utilization of the Kinesis shard.

Avoid slowdowns caused by too many rate limit errors

The connector reduces the amount of data read from Kinesis by half each time it encounters a rate limiting error and records this event in the log with a message: "Hit rate limit. Sleeping for 5 seconds."

It is common to see these errors while a stream is catching up, but they should stop after it has caught up. If they continue, you might need to either increase capacity on the Kinesis side or adjust the prefetching options.

Avoid spilling data to disk

If you have a sudden spike in your Kinesis streams, the assigned buffer capacity might fill up, and the buffer might not be emptied fast enough for new data to be added.

In such cases, Spark spills blocks from the buffer to disk and slows down processing, which affects stream performance. This event appears in the log with a message like this:

Bash
./log4j.txt:879546:20/03/02 17:15:04 INFO BlockManagerInfo: Updated kinesis_49290928_1_ef24cc00-abda-4acd-bb73-cb135aed175c on disk on 10.0.208.13:43458 (current size: 88.4 MB, original size: 0.0 B)

To address this problem, try increasing the cluster memory capacity (either add more nodes or increase the memory per node), or adjust the configuration parameter fetchBufferSize.
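To check how often spilling occurs, driver logs can be scanned for lines shaped like the sample above. This sketch is based only on that one sample line, so the pattern is an assumption and might need adjusting for other log layouts; the helper name is hypothetical.

```python
import re

# Matches BlockManagerInfo updates that place a kinesis_* block on disk.
SPILL_PATTERN = re.compile(
    r"BlockManagerInfo: Updated (kinesis_\S+) on disk on \S+ "
    r"\(current size: ([\d.]+ \w+)"
)


def find_kinesis_spills(log_lines):
    """Return (block_id, current_size) for each spill event found."""
    spills = []
    for line in log_lines:
        match = SPILL_PATTERN.search(line)
        if match:
            spills.append((match.group(1), match.group(2)))
    return spills
```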

Hanging S3 write tasks

You can enable Spark speculation to terminate hanging tasks that would prevent stream processing from proceeding. To ensure that tasks are not terminated too aggressively, tune the quantile and multiplier for this setting carefully. A good starting point is to set spark.speculation.multiplier to 3 and spark.speculation.quantile to 0.95.
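The suggested starting point can be written as Spark configuration entries. The sketch below holds them in a dictionary and renders them as one "key value" pair per line, the layout used when entering Spark configuration for a cluster. The helper name is hypothetical; spark.speculation is the standard Spark property that turns speculation on.

```python
SPECULATION_CONF = {
    "spark.speculation": "true",
    "spark.speculation.multiplier": "3",
    "spark.speculation.quantile": "0.95",
}


def to_spark_config_lines(conf):
    """Render a configuration dictionary as 'key value' lines."""
    return "\n".join(f"{key} {value}" for key, value in conf.items())
```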

Reduce latency associated with checkpointing in stateful streams

Databricks recommends using RocksDB with changelog checkpointing for stateful streaming queries. See Enable changelog checkpointing.

Configure Kinesis enhanced fan-out (EFO) for streaming query reads

In Databricks Runtime 11.3 LTS and above, the Databricks Runtime Kinesis connector supports the Amazon Kinesis enhanced fan-out (EFO) feature.

Kinesis enhanced fan-out provides stream consumers with a dedicated throughput of 2 MB/s per shard, per consumer (maximum of 20 consumers per Kinesis stream), and delivers records in push mode instead of pull mode.

By default, a Structured Streaming query configured with EFO mode registers itself as a consumer with dedicated throughput and a unique consumer name and consumer ARN (Amazon Resource Name) in Kinesis Data Streams.

By default, Databricks uses the streaming query ID with the databricks_ prefix to name the new consumer. You can optionally specify the consumerNamePrefix or consumerName options to override this behavior. The consumerName must be a string consisting of letters, numbers, and the special characters _ . -.

On query restart, the Kinesis source uses polling mode to replay the latest uncommitted batch if one exists. After the stream replays the uncommitted batch, the source switches back to EFO mode for subsequent reads.

important

A registered EFO consumer incurs additional charges on Amazon Kinesis. To deregister the consumer automatically on query teardown, set the requireConsumerDeregistration option to true. Databricks cannot guarantee de-registration on events such as driver crashes or node failures. In case of job failure, Databricks recommends managing registered consumers directly to prevent excess Kinesis charges.
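The EFO options described in this section can be assembled as in the following sketch. The helper name is hypothetical, and the name check covers a single consumer name only, not a comma-separated list.

```python
import re

# Letters, numbers, and the special characters _ . - as described above.
_CONSUMER_NAME = re.compile(r"[A-Za-z0-9_.-]+")


def efo_options(consumer_name=None, deregister_on_stop=True):
    """Return Kinesis source options for reading in EFO mode."""
    options = {
        "consumerMode": "efo",
        "requireConsumerDeregistration": str(deregister_on_stop).lower(),
    }
    if consumer_name is not None:
        if not _CONSUMER_NAME.fullmatch(consumer_name):
            raise ValueError(f"Invalid consumerName: {consumer_name!r}")
        options["consumerName"] = consumer_name
    return options
```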

Advanced consumer configuration options

In Databricks Runtime 16.1 and above, you can configure reads in EFO mode using existing consumers by using the options registeredConsumerId and registeredConsumerIdType, as in the following example:

Python
df = (spark.readStream
    .format("kinesis")
    .option("streamName", "mystreamname1,mystreamname2")
    .option("registeredConsumerId", "consumer1,consumer2")
    .option("registeredConsumerIdType", "name")
    .load()
)

You can specify existing consumers using either name or ARN. You must provide one consumer ID for each Kinesis stream source specified in the configuration.
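The one-consumer-per-stream requirement can be validated before the options are passed to the reader. This is a sketch with a hypothetical helper name; the stream and consumer names in the usage note are the placeholders from the example above.

```python
def registered_consumer_options(stream_names, consumer_ids, id_type="name"):
    """Return options that pair each stream with an existing EFO consumer."""
    if id_type not in ("name", "ARN"):
        raise ValueError("id_type must be 'name' or 'ARN'")
    if len(stream_names) != len(consumer_ids):
        raise ValueError("Provide exactly one consumer ID per stream")
    return {
        "streamName": ",".join(stream_names),
        "registeredConsumerId": ",".join(consumer_ids),
        "registeredConsumerIdType": id_type,
    }
```

Calling registered_consumer_options(["mystreamname1", "mystreamname2"], ["consumer1", "consumer2"]) yields the same three option values as the example above.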

Offline consumer management using a Databricks notebook

Databricks provides a consumer management utility to register, list, or deregister consumers associated with Kinesis data streams. The following steps demonstrate using this utility in a Databricks notebook:

  1. In a new Databricks notebook attached to an active cluster, create an AWSKinesisConsumerManager by providing the necessary authentication information.

    Scala
    import com.databricks.sql.kinesis.AWSKinesisConsumerManager

    // serviceCredentialName and kinesisRegion are placeholders for your own values.
    val manager = AWSKinesisConsumerManager.newManager()
      .option("serviceCredential", serviceCredentialName)
      .option("region", kinesisRegion)
      .create()

  2. List and display consumers.

    Scala
    val consumers = manager.listConsumers("<stream name>")
    display(consumers)

  3. Register a consumer for a given stream.

    Scala
    val consumerARN = manager.registerConsumer("<stream name>", "<consumer name>")

  4. Deregister a consumer for a given stream.

    Scala
    manager.deregisterConsumer("<stream name>", "<consumer name>")