
Connect to Amazon Kinesis

Use Structured Streaming to read and write data to Amazon Kinesis.

Databricks recommends that you enable S3 VPC endpoints so that all S3 traffic is routed on the AWS network.

note

If you delete and recreate a Kinesis stream, you can't reuse any existing checkpoint directories to restart a streaming query. You must delete the checkpoint directories and start those queries from scratch. You can reshard with Structured Streaming by increasing the number of shards without interrupting or restarting the stream.

For recommendations on troubleshooting query latency, see Recommendations for reducing latency with Kinesis.

Authenticate with Amazon Kinesis

In Databricks Runtime 16.1 and above, Databricks recommends that you manage connections to Kinesis with a Databricks service credential. See Create service credentials.

To use a service credential, do the following:

  1. Create a Databricks service credential using an IAM role with the necessary permissions to access Kinesis. See Step 1: Create an IAM role.
  2. Provide the name of the service credential using the serviceCredential option when defining a streaming read.
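The two steps above can be sketched as follows. This is a minimal Scala sketch, assuming Databricks Runtime 16.1 or above. The helper object `KinesisReadOptions` is hypothetical, and the stream name, region, and credential name are placeholders. It only builds the reader options as a map, which you could then pass to `spark.readStream.format("kinesis").options(...)` in a notebook.

```scala
object KinesisReadOptions {
  // Hypothetical helper: builds the reader options for a Kinesis stream that
  // authenticates with a Databricks service credential. Values are placeholders.
  def withServiceCredential(
      streamName: String,
      region: String,
      serviceCredential: String): Map[String, String] =
    Map(
      "streamName" -> streamName,
      "region" -> region,
      "serviceCredential" -> serviceCredential
    )
}

// Possible notebook usage (not executed here):
//   val df = spark.readStream
//     .format("kinesis")
//     .options(KinesisReadOptions.withServiceCredential("<stream name>", "<region>", "<service credential name>"))
//     .load()
```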
note

The Kinesis source requires ListShards, GetRecords, and GetShardIterator permissions. If you encounter Amazon: Access Denied exceptions, verify that your IAM role has these permissions. See Controlling Access to Amazon Kinesis Data Streams Resources Using IAM.

Alternate authentication methods

In Databricks Runtime 16.0 and below, Databricks service credentials aren't available. Databricks supports the following alternative authentication methods:

Instance profile

Attach an instance profile during compute configuration. See Instance profiles.

Instance profiles aren't supported in standard access mode (formerly shared access mode). See Standard compute requirements and limitations.

Use access keys directly

Set the awsAccessKey and awsSecretKey options.

If you use keys, store them in Databricks secrets instead of in plain text. See Secret management.
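As a sketch of keeping keys out of your code, the following hypothetical helper builds the awsAccessKey and awsSecretKey options from a secret lookup function. In a notebook, that function would wrap `dbutils.secrets.get(scope = ..., key = ...)`; the scope name and the key names `aws-access-key` and `aws-secret-key` are assumptions for illustration only.

```scala
object KinesisAccessKeyOptions {
  // Hypothetical helper. getSecret(scope, key) stands in for
  // dbutils.secrets.get in a Databricks notebook; the default key names are
  // illustrative assumptions, not required names.
  def fromSecrets(
      getSecret: (String, String) => String,
      scope: String,
      accessKeyName: String = "aws-access-key",
      secretKeyName: String = "aws-secret-key"): Map[String, String] =
    Map(
      "awsAccessKey" -> getSecret(scope, accessKeyName),
      "awsSecretKey" -> getSecret(scope, secretKeyName)
    )
}

// Possible notebook usage (hypothetical scope name "kinesis"):
//   val keyOptions = KinesisAccessKeyOptions.fromSecrets(
//     (scope, key) => dbutils.secrets.get(scope = scope, key = key), "kinesis")
```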

Assume IAM role

Some compute configurations allow you to assume an IAM role using the roleArn option. To assume a role, either launch your cluster with permissions to assume the role or provide access keys through awsAccessKey and awsSecretKey.

This method supports cross-account authentication. For more information, see Delegate Access Across AWS Accounts Using IAM Roles.

You can optionally specify the external ID with roleExternalId and a session name with roleSessionName.
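The role options described above can be assembled as in the following minimal sketch. The helper name is hypothetical, and the role ARN in the test uses the AWS placeholder account ID 123456789012. The optional roleExternalId and roleSessionName entries are added only when a value is supplied.

```scala
object KinesisAssumeRoleOptions {
  // Hypothetical helper: roleArn is always set; roleExternalId and
  // roleSessionName are included only when provided.
  def apply(
      roleArn: String,
      externalId: Option[String] = None,
      sessionName: Option[String] = None): Map[String, String] =
    Map("roleArn" -> roleArn) ++
      externalId.map(id => "roleExternalId" -> id).toList ++
      sessionName.map(name => "roleSessionName" -> name).toList
}
```

To authenticate the role assumption with keys instead of cluster permissions, you would merge this map with the awsAccessKey and awsSecretKey options.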

Schema

Kinesis returns records with the following schema:

Column                         Type
partitionKey                   string
data                           binary
stream                         string
shardId                        string
sequenceNumber                 string
approximateArrivalTimestamp    timestamp

To deserialize the data in the data column, cast the field to a string.
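The cast treats the binary payload as UTF-8 text. The following sketch shows the same decoding on a plain byte array so it can run outside Spark; the object name is hypothetical, and the equivalent DataFrame expression is shown as a comment, assuming a streaming DataFrame named `df`.

```scala
import java.nio.charset.StandardCharsets

object KinesisPayload {
  // Decodes a Kinesis `data` payload as UTF-8 text. Spark's
  // CAST(data AS STRING) also interprets binary values as UTF-8.
  def decode(data: Array[Byte]): String =
    new String(data, StandardCharsets.UTF_8)
}

// Equivalent column transformation on a Kinesis DataFrame `df`:
//   val decoded = df.selectExpr("CAST(data AS STRING) AS data", "partitionKey")
```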

Quickstart

The following notebook demonstrates how to run WordCount using Structured Streaming with Kinesis.

Kinesis WordCount with Structured Streaming notebook


Configure Kinesis options

In Databricks Runtime 13.3 LTS and above, you can use Trigger.AvailableNow with Kinesis. See Ingest Kinesis records as an incremental batch.

In Databricks Runtime 16.1 and above, you can use streamARN to identify Kinesis sources. For all Databricks Runtime versions, you must specify either streamName or streamARN, but not both.

warning

Do not switch between streamName and streamARN for an active streaming query. Databricks doesn't support changing these options mid-stream, and restarting a query after changing them can result in duplicate records or data loss. To switch from streamName to streamARN, start a new streaming query with a fresh checkpoint directory.
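The rule that a query must specify exactly one stream identifier can be checked before the query starts. This is a minimal sketch with a hypothetical helper name; the ARN in the test is a placeholder following the `arn:aws:kinesis:<region>:<account>:stream/<name>` format.

```scala
object StreamIdentifier {
  // Hypothetical helper: returns the identifier option for the reader and
  // rejects configurations that set both streamName and streamARN, or neither.
  def options(streamName: Option[String], streamARN: Option[String]): Map[String, String] =
    (streamName, streamARN) match {
      case (Some(name), None) => Map("streamName" -> name)
      case (None, Some(arn))  => Map("streamARN" -> arn)
      case _ =>
        throw new IllegalArgumentException("Specify exactly one of streamName or streamARN")
    }
}
```

Choosing the identifier once, when the checkpoint is created, also avoids the mid-stream switch described in the warning above.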

For the full list of options, see Kinesis.

Low latency monitoring and alerting

Alerting use cases require low latency. To minimize latency:

  • Verify that your streaming query is the only consumer of the Kinesis stream to optimize fetch performance and avoid Kinesis rate limits.
  • Set the option maxFetchDuration to a small value, such as 200ms, to process fetched data as fast as possible. This option is a trade-off: it favors faster processing of each batch over a guarantee that each batch consumes the most recent records. For example, if you use Trigger.AvailableNow, a small value might cause your query to lag behind the newest records in the Kinesis stream.
  • Set the option minFetchPeriod to 210ms to fetch as frequently as possible.
  • Set the shardsPerTask option, or size the cluster, so that the number of cores in the cluster >= 2 * (number of Kinesis shards) / shardsPerTask. This ensures that the background prefetching tasks and the streaming query tasks can run concurrently.
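The settings above can be collected as in the following sketch. The object name is hypothetical, shardsPerTask is left as a parameter because no single value is recommended, and the core calculation is the rounded-up form of the inequality in the last bullet.

```scala
object LowLatencyConfig {
  // Reader options for low-latency alerting queries, using the values from
  // the list above. shardsPerTask is a caller-supplied choice.
  def options(shardsPerTask: Int): Map[String, String] =
    Map(
      "maxFetchDuration" -> "200ms",
      "minFetchPeriod" -> "210ms",
      "shardsPerTask" -> shardsPerTask.toString
    )

  // Smallest whole number of cores satisfying
  //   cores >= 2 * shards / shardsPerTask
  // computed with integer ceiling division.
  def minCores(shards: Int, shardsPerTask: Int): Int = {
    require(shards > 0 && shardsPerTask > 0, "shards and shardsPerTask must be positive")
    (2 * shards + shardsPerTask - 1) / shardsPerTask
  }
}
```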

If your query receives data only about every 5 seconds, you are likely hitting Kinesis rate limits, because the connector sleeps for 5 seconds after each rate limit error. Review your configurations.

Monitor Kinesis metrics

Kinesis reports how many milliseconds a consumer lags behind the most recent record (the tip) of the stream for each shard. The avgMsBehindLatest, maxMsBehindLatest, and minMsBehindLatest metrics report the average, maximum, and minimum of this lag across all shards read by the streaming query. See Monitoring Structured Streaming queries on Databricks.

If you are running the stream in a notebook, you can see these metrics under the Raw Data tab in the streaming query progress dashboard. For example:

JSON
{
  "sources": [
    {
      "description": "KinesisV2[stream]",
      "metrics": {
        "avgMsBehindLatest": "32000.0",
        "maxMsBehindLatest": "32000",
        "minMsBehindLatest": "32000"
      }
    }
  ]
}
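For alerting on lag, you could read these metrics programmatically. The following sketch checks maxMsBehindLatest against a threshold you choose. It takes a plain Scala map; in a notebook the map would come from the source's progress metrics (for example, `query.lastProgress.sources(0).metrics`, a Java map converted to Scala), assuming your Spark version exposes `SourceProgress.metrics`. The object name and threshold are hypothetical.

```scala
object KinesisLagCheck {
  // Parses maxMsBehindLatest from a source's metrics map. Values are strings
  // in the progress report, so parsing failures yield None.
  def maxLagMs(metrics: Map[String, String]): Option[Double] =
    metrics.get("maxMsBehindLatest").flatMap(v => scala.util.Try(v.toDouble).toOption)

  // True when the worst shard lag exceeds the caller's threshold.
  def isLagging(metrics: Map[String, String], thresholdMs: Double): Boolean =
    maxLagMs(metrics).exists(_ > thresholdMs)
}
```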

Ingest Kinesis records as an incremental batch

In Databricks Runtime 13.3 LTS and above, Databricks supports using Trigger.AvailableNow with Kinesis data sources for incremental batch semantics. The basic behavior is as follows:

  1. When a micro-batch read triggers in available now mode, Databricks records the current time.
  2. Databricks polls the source system for all records with timestamps between the previous checkpoint and this recorded time.
  3. Databricks loads these records using Trigger.AvailableNow semantics.

Databricks uses a best-effort mechanism to consume all records that exist in the Kinesis stream when the streaming query runs. Because timestamps can differ slightly and data sources don't guarantee ordering, a triggered batch might omit some records. Omitted records are processed in the next triggered micro-batch.
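The following is a simplified model of the time window described above, not the connector's implementation. It splits records by arrival time: records after the previous checkpoint and at or before the recorded trigger time go into the current batch, and later records are deferred to the next run. The object and record type are hypothetical; a real query would set `.trigger(Trigger.AvailableNow())` on its stream writer.

```scala
import java.time.Instant

object AvailableNowWindow {
  // Hypothetical record type carrying only what the model needs.
  final case class Rec(sequenceNumber: String, arrival: Instant)

  // Returns (records in this batch, records deferred to a later batch).
  def split(
      records: Seq[Rec],
      previousCheckpoint: Instant,
      triggerTime: Instant): (Seq[Rec], Seq[Rec]) = {
    val pending = records.filter(_.arrival.isAfter(previousCheckpoint))
    pending.partition(r => !r.arrival.isAfter(triggerTime))
  }
}
```

In this model, a record whose timestamp lands just after the recorded time, for example because of clock differences, is not lost; it falls into the next batch.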

note

If the query repeatedly fails to fetch records from the Kinesis stream even though records are available, try increasing the maxFetchDuration value.

See AvailableNow: Incremental batch processing.

Write to Kinesis

Use the code in the following notebook as a ForeachSink to write data to Kinesis. The sink requires a Dataset[(String, Array[Byte])].

note

The code in the following notebook provides at-least-once semantics, not exactly-once.

Kinesis Foreach Sink notebook

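If you adapt the sink to buffer rows and send them with the AWS PutRecords API, note that PutRecords accepts at most 500 records per request (it also enforces size limits). The following hypothetical helper chunks buffered rows accordingly, assuming the String in each pair is the partition key and the byte array is the payload. Retrying a failed chunk can resend records that were already written, which is one reason this style of sink is at-least-once.

```scala
object KinesisBatches {
  // PutRecords record-count limit per request.
  val MaxRecordsPerPutRecords = 500

  // Splits (partitionKey, data) pairs into request-sized chunks.
  def chunk(
      records: Seq[(String, Array[Byte])],
      maxPerBatch: Int = MaxRecordsPerPutRecords): Seq[Seq[(String, Array[Byte])]] = {
    require(maxPerBatch > 0, "maxPerBatch must be positive")
    records.grouped(maxPerBatch).toList
  }
}
```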

Recommendations for reducing latency with Kinesis

This section has recommendations for troubleshooting various causes of latency for Kinesis streams.

The Kinesis source runs Spark jobs in a background thread to periodically prefetch Kinesis data and then cache the data in Spark executor memory. After each prefetch step completes, the streaming query can process the cached data. The prefetch step significantly affects the observed end-to-end latency and throughput.

Reduce prefetch latency

To optimize for minimal query latency and maximum resource usage, use the following calculation:

total number of CPU cores in the cluster (across all executors) >= total number of Kinesis shards / shardsPerTask.
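As a worked example, assume a hypothetical stream with 40 shards and shardsPerTask set to 5. The calculation above gives the first bound; the low-latency alerting guideline, which doubles the shard term, gives the second:

```latex
\text{cores} \ge \frac{\text{shards}}{\text{shardsPerTask}} = \frac{40}{5} = 8
\qquad
\text{cores}_{\text{alerting}} \ge \frac{2 \times 40}{5} = 16
```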

important

A small minFetchPeriod can cause the connector to make repeated GetRecords API calls to a Kinesis shard until it receives a ReadProvisionedThroughputExceeded exception. This exception isn't necessarily a problem: it indicates that the connector is making full use of the shard's read throughput.

Avoid slowdowns caused by too many rate limit errors

The connector reduces the amount of data read from Kinesis by half each time it encounters a rate limiting error and records this event in the log with a message: "Hit rate limit. Sleeping for 5 seconds."
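To see why repeated rate limit errors slow a stream down, the following simplified model (not the connector's code) applies the halving described above and the 5-second sleep from the log message. The starting size and the floor of 1 are illustrative assumptions.

```scala
object RateLimitBackoff {
  // Amount of data requested after `errors` consecutive rate-limit errors,
  // halving each time. The shift is capped because the JVM masks Long shift
  // distances to 6 bits; the result never drops below 1.
  def sizeAfterErrors(initial: Long, errors: Int): Long = {
    require(initial > 0 && errors >= 0)
    math.max(1L, initial >> math.min(errors, 62))
  }

  // Total time spent sleeping, at 5 seconds per rate-limit error.
  def sleepSeconds(errors: Int): Long = 5L * errors
}
```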

You might see these errors while a stream is catching up. If you still see them after the stream has caught up, you might need to tune the workload, either by increasing Kinesis capacity in AWS or by adjusting the prefetching options in Spark.

Avoid disk spill

If the data volume in your Kinesis streams suddenly increases, the assigned buffer capacity might fill up faster than it can be drained. Spark then spills data from the buffer to disk, which slows down stream processing, and an event appears in the log with a message like the following:

Bash
./log4j.txt:879546:20/03/02 17:15:04 INFO BlockManagerInfo: Updated kinesis_49290928_1_ef24cc00-abda-4acd-bb73-cb135aed175c on disk on 10.0.208.13:43458 (current size: 88.4 MB, original size: 0.0 B)

To avoid spill, increase the cluster memory capacity by adding more nodes or increasing the memory per node, or reduce the value of the fetchBufferSize option.

Suspended S3 write tasks

Enable Spark speculation to terminate suspended tasks that would prevent stream processing from proceeding. To ensure that tasks are not terminated too aggressively, tune the quantile and multiplier for this setting carefully. Databricks recommends that you set spark.speculation.multiplier to 3 and spark.speculation.quantile to 0.95 and adjust as needed.
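The recommended starting values can be expressed as Spark properties. Because spark.speculation is a core Spark setting, it is typically set in the cluster's Spark config (one `key value` pair per line) rather than at runtime. The following sketch, with a hypothetical object name, renders the settings in that line format.

```scala
object SpeculationConf {
  // Starting values recommended above; adjust multiplier and quantile as needed.
  val settings: Map[String, String] = Map(
    "spark.speculation" -> "true",
    "spark.speculation.multiplier" -> "3",
    "spark.speculation.quantile" -> "0.95"
  )

  // Renders the settings as "key value" lines, sorted by key.
  def asClusterConfig: String =
    settings.toSeq.sortBy(_._1).map { case (k, v) => s"$k $v" }.mkString("\n")
}
```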

Reduce latency from checkpointing in stateful streams

Databricks recommends using RocksDB with changelog checkpointing for stateful streaming queries. See Enable changelog checkpointing.
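As a sketch, the following session settings select the RocksDB state store and enable changelog checkpointing. The property names follow the Databricks documentation for enabling changelog checkpointing, but verify them against that page for your runtime. Set them before starting the query, for example with `spark.conf.set(key, value)` for each entry.

```scala
object StatefulStreamingConf {
  // Session settings for RocksDB state with changelog checkpointing.
  val settings: Map[String, String] = Map(
    "spark.sql.streaming.stateStore.providerClass" ->
      "com.databricks.sql.streaming.state.RocksDBStateStoreProvider",
    "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled" -> "true"
  )
}

// Possible notebook usage (not executed here):
//   StatefulStreamingConf.settings.foreach { case (k, v) => spark.conf.set(k, v) }
```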

Configure Kinesis enhanced fan-out (EFO) for streaming query reads

In Databricks Runtime 11.3 LTS and above, the Databricks Runtime Kinesis connector supports the Amazon Kinesis enhanced fan-out (EFO) feature.

Kinesis enhanced fan-out provides dedicated throughput of 2 MB/s per shard per consumer (maximum of 20 consumers per stream), and delivers records in push mode instead of pull mode.
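The per-shard figures above translate directly into stream-level numbers. This small sketch, with a hypothetical object name, computes the dedicated throughput one EFO consumer receives across all shards and checks the 20-consumer limit before registering another consumer.

```scala
object EfoThroughput {
  val MbPerSecondPerShardPerConsumer = 2
  val MaxConsumersPerStream = 20

  // Dedicated read throughput (MB/s) for one EFO consumer across all shards.
  def perConsumerMBps(shards: Int): Int = {
    require(shards > 0, "shards must be positive")
    shards * MbPerSecondPerShardPerConsumer
  }

  // Whether the stream has room for one more registered consumer.
  def canRegisterAnother(existingConsumers: Int): Boolean =
    existingConsumers < MaxConsumersPerStream
}
```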

By default, a Structured Streaming query configured with EFO mode registers itself as a consumer with dedicated throughput and a unique consumer name and consumer ARN (Amazon Resource Name) in Kinesis Data Streams.

By default, Databricks uses the streaming query ID with the databricks_ prefix to name the new consumer. You can optionally specify the consumerNamePrefix or consumerName options to override this behavior. The consumerName must contain only letters, numbers, and the special characters _, ., and -.
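A custom consumerName can be validated against the character rule above before the query starts. This is a minimal sketch with a hypothetical object name; `defaultFor` only approximates the default naming (prefix plus query ID), and its exact format is an assumption.

```scala
object EfoConsumerName {
  // Letters, digits, underscore, period, and hyphen only.
  private val Allowed = "^[A-Za-z0-9_.-]+$".r

  def isValid(name: String): Boolean = Allowed.pattern.matcher(name).matches()

  // Approximation of the default name: "databricks_" followed by the query ID.
  def defaultFor(queryId: java.util.UUID): String = s"databricks_$queryId"
}
```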

On query restart, the Kinesis source uses polling mode to replay the latest uncommitted batch if one exists. After the stream replays the uncommitted batch, the source switches back to EFO mode for subsequent reads.

important

A registered EFO consumer incurs additional charges on Amazon Kinesis. To deregister the consumer automatically when the query shuts down, set the requireConsumerDeregistration option to true. Databricks can't guarantee deregistration on events such as driver crashes or node failures. If a job fails, Databricks recommends managing registered consumers directly to prevent excess Kinesis charges.
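The following sketch gathers the EFO-related reader options discussed in this section into one map. The helper name is hypothetical, and it assumes that the consumerMode option with the value efo is what switches a read to EFO mode; confirm this against the Kinesis options reference. consumerName is optional and falls back to the default naming when omitted.

```scala
object EfoReadOptions {
  // Hypothetical helper. Assumes consumerMode = "efo" enables EFO reads.
  // requireConsumerDeregistration = "true" asks for best-effort cleanup on shutdown.
  def apply(
      streamName: String,
      region: String,
      consumerName: Option[String] = None): Map[String, String] =
    Map(
      "streamName" -> streamName,
      "region" -> region,
      "consumerMode" -> "efo",
      "requireConsumerDeregistration" -> "true"
    ) ++ consumerName.map(n => "consumerName" -> n).toList
}
```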

Offline consumer management using a Databricks notebook

Use the AWSKinesisConsumerManager utility to programmatically register, list, or deregister consumers for Kinesis data streams, instead of manually configuring consumers in your AWS account console. For example, use the utility to create a consumer for a new stream, or, if you plan to permanently stop a stream, use the utility to delete the consumer in AWS.

The consumer manager utility is only available in Scala with compute set to dedicated access mode. See Access modes.

To use this utility in a Databricks notebook:

  1. In a new Databricks notebook attached to an active cluster, create an AWSKinesisConsumerManager with the required authentication information.

    Scala
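    import com.databricks.sql.kinesis.AWSKinesisConsumerManager

    // Placeholders: the name of your Databricks service credential and the
    // AWS region of the Kinesis stream.
    val serviceCredentialName = "<service credential name>"
    val kinesisRegion = "<region>"

    val manager = AWSKinesisConsumerManager.newManager()
      .option("serviceCredential", serviceCredentialName)
      .option("region", kinesisRegion)
      .create()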
  2. List and display consumers.

    Scala
    val consumers = manager.listConsumers("<stream name>")
    display(consumers)
  3. Register a consumer for a given stream.

    Scala
    val consumerARN = manager.registerConsumer("<stream name>", "<consumer name>")
  4. Deregister a consumer for a given stream.

    Scala
    manager.deregisterConsumer("<stream name>", "<consumer name>")