Amazon Kinesis

The Kinesis connector for Structured Streaming is included in Databricks Runtime.

Authenticate with Amazon Kinesis

For authentication with Kinesis, we use Amazon’s default credential provider chain by default. We recommend launching your Databricks clusters with an instance profile that can access Kinesis. If you want to use keys for access, you can provide them using the options awsAccessKey and awsSecretKey.

You can also assume an IAM role using the roleArn option. You can optionally specify the external ID with roleExternalId and a session name with roleSessionName. In order to assume a role, you can either launch your cluster with permissions to assume the role or provide access keys through awsAccessKey and awsSecretKey. For cross-account authentication, we recommend using roleArn to specify the role to assume; the role can then be assumed from your Databricks AWS account. For more information about cross-account authentication, see Delegate Access Across AWS Accounts Using IAM Roles.
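As a sketch, the assume-role options can be collected and passed to the Kinesis reader as follows. The stream name, region, role ARN, and external ID below are placeholders, not values from this document:

```python
# Placeholder values -- substitute your own stream, region, and role ARN.
kinesis_options = {
    "streamName": "my-kinesis-stream",                             # placeholder
    "region": "us-west-2",                                         # placeholder
    "roleArn": "arn:aws:iam::123456789012:role/KinesisReadRole",   # placeholder ARN
    "roleExternalId": "my-external-id",       # optional, for cross-account delegation
    "roleSessionName": "databricks-kinesis",  # optional session label
}

def read_kinesis(spark):
    """Attach the assume-role options to a Kinesis streaming read."""
    return (spark.readStream
        .format("kinesis")
        .options(**kinesis_options)
        .load())
```

The cluster (or the provided access keys) must have permission to call sts:AssumeRole on the target role for this to work.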

Note

The Kinesis source requires ListShards, GetRecords, and GetShardIterator permissions. If you encounter Amazon: Access Denied exceptions, check that your user or profile has these permissions. See Controlling Access to Amazon Kinesis Data Streams Resources Using IAM for more details.

Schema

The schema of the records is:

| Column | Type |
| --- | --- |
| partitionKey | string |
| data | binary |
| stream | string |
| shardId | string |
| sequenceNumber | string |
| approximateArrivalTimestamp | timestamp |

Use DataFrame operations (cast("string"), UDFs) to explicitly deserialize the data column.
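For example, if the payload is UTF-8 encoded JSON (an assumption for illustration; your payload format may differ), the binary data column can be decoded like this:

```python
import json

def decode_record(data: bytes) -> dict:
    """Deserialize a binary Kinesis `data` payload, assuming UTF-8 JSON."""
    return json.loads(data.decode("utf-8"))

# In Spark, the equivalent is typically a cast plus a parse, for example:
#   df.selectExpr("CAST(data AS STRING) AS json")
# or wrapping decode_record in a UDF.
record = decode_record(b'{"device": "sensor-1", "reading": 42}')
```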

Quickstart

Let’s start with a quick example: WordCount. The following notebook demonstrates how to run WordCount using Structured Streaming with Kinesis.

Kinesis WordCount with Structured Streaming notebook


Configuration

Important

In Databricks Runtime 13.1 and above, you can use Trigger.AvailableNow with Kinesis. See Ingest Kinesis records as an incremental batch.

Here are the most important configurations for specifying what data to read.

| Option | Value | Default | Description |
| --- | --- | --- | --- |
| streamName | A comma-separated list of stream names. | None (required param) | The stream names to subscribe to. |
| region | Region for the streams to be specified. | Locally resolved region | The region the streams are defined in. |
| endpoint | Endpoint for the Kinesis data stream, for example, https://kinesis.us-west-2.amazonaws.com. | Locally resolved region | The regional endpoint for Kinesis Data Streams. |
| initialPosition | latest, trim_horizon, earliest (alias for trim_horizon), at_timestamp. | latest | Where to start reading from in the stream. |

For production considerations, review Key technical considerations and best practices.

Start reading at a point in time

Note

This feature is available on Databricks Runtime 7.3 LTS and above.

To start reading at a point in time, you can use an at_timestamp value for the initialPosition option. You specify the value as a JSON string, such as {"at_timestamp": "06/25/2020 10:23:45 PDT"}. The streaming query will read all changes at or after the given timestamp (inclusive). This uses the Java default format for parsing timestamps. You can explicitly specify formats by providing an additional field in the JSON string, for example:

(spark.readStream
  .format("kinesis")
  .option("streamName", kinesisStreamName)
  .option("region", kinesisRegion)
  .option("initialPosition", '{"at_timestamp": "06/25/2020 10:23:45 PDT", "format": "MM/dd/yyyy HH:mm:ss ZZZ"}')
  .option("awsAccessKey", awsAccessKeyId)
  .option("awsSecretKey", awsSecretKey)
  .load()
)
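To avoid hand-writing the JSON string, you can build it programmatically. A small sketch (the helper name is an illustration, not part of the connector API):

```python
import json

def initial_position_at(timestamp_str: str, java_format: str) -> str:
    """Build the JSON value for the initialPosition option.

    `timestamp_str` must already be rendered in `java_format`, which uses
    Java date-format tokens because the connector parses it on the JVM side.
    """
    return json.dumps({"at_timestamp": timestamp_str, "format": java_format})

# Pass the result as .option("initialPosition", pos)
pos = initial_position_at("06/25/2020 10:23:45 PDT", "MM/dd/yyyy HH:mm:ss ZZZ")
```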

In addition, there are configurations for controlling the throughput and latency of reading from Kinesis. The Kinesis source runs Spark jobs in a background thread to periodically prefetch Kinesis data and cache it in the memory of the Spark executors. The streaming query processes the cached data only after each prefetch step completes and makes the data available for processing. Hence, this prefetching step determines a lot of the observed end-to-end latency and throughput. You can control the performance using the following options.

| Option | Value | Default | Description |
| --- | --- | --- | --- |
| maxRecordsPerFetch | A positive integer. | 10,000 | How many records to read per API request to Kinesis. The number of records returned may actually be higher if sub-records were aggregated into a single record using the Kinesis Producer Library. |
| maxFetchRate | A positive decimal representing data rate in MB/s. | 1.0 (max = 2.0) | How fast to prefetch data per shard. This rate-limits fetches to avoid Kinesis throttling. 2.0 MB/s is the maximum rate that Kinesis allows. |
| minFetchPeriod | A duration string, for example, 1s for 1 second. | 400ms (min = 200ms) | How long to wait between consecutive prefetch attempts. This limits the frequency of fetches to avoid Kinesis throttling. 200ms is the minimum because Kinesis allows a maximum of 5 fetches/sec. |
| maxFetchDuration | A duration string, for example, 1m for 1 minute. | 10s | How long to buffer prefetched new data before making it available for processing. |
| fetchBufferSize | A byte string, for example, 2gb or 10mb. | 20gb | How much data to buffer for the next trigger. This is used as a stopping condition and not a strict upper bound, so more data may be buffered than what's specified for this value. |
| shardsPerTask | A positive integer. | 5 | How many Kinesis shards to prefetch from in parallel per Spark task. Ideally # cores in cluster >= # Kinesis shards / shardsPerTask for minimum query latency and maximum resource usage. |
| shardFetchInterval | A duration string, for example, 2m for 2 minutes. | 1s | How often to poll Kinesis for resharding. |
| awsAccessKey | String | No default. | AWS access key. |
| awsSecretKey | String | No default. | AWS secret access key corresponding to the access key. |
| roleArn | String | No default. | The Amazon Resource Name (ARN) of the role to assume when accessing Kinesis. |
| roleExternalId | String | No default. | An optional value that can be used when delegating access to the AWS account. See How to Use an External ID. |
| roleSessionName | String | No default. | An identifier for the assumed role session that uniquely identifies a session when the same role is assumed by different principals or for different reasons. |
| coalesceThresholdBlockSize | A positive integer. | 10,000,000 | The threshold at which automatic coalescing occurs. If the average block size is less than this value, prefetched blocks are coalesced toward coalesceBinSize. |
| coalesceBinSize | A positive integer. | 128,000,000 | The approximate block size after coalescing. |

Note

The default values of the options have been chosen such that two readers (Spark or otherwise) can simultaneously consume a Kinesis stream without hitting Kinesis rate limits. If you have more consumers, you have to adjust the options accordingly. For example, you may have to reduce maxFetchRate and increase minFetchPeriod.
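This rationale can be made concrete with a little arithmetic. Kinesis allows 2 MB/s and 5 read transactions per second per shard; dividing that budget evenly across N consumers yields per-consumer limits. A sketch (the helper is illustrative, not a connector API):

```python
def per_consumer_limits(num_consumers: int):
    """Split the per-shard Kinesis limits (2 MB/s, 5 fetches/sec) evenly
    across consumers, returning (maxFetchRate in MB/s, minFetchPeriod in ms)."""
    shard_mb_per_sec = 2.0       # Kinesis per-shard read bandwidth limit
    shard_fetches_per_sec = 5    # Kinesis per-shard read transaction limit
    max_fetch_rate = shard_mb_per_sec / num_consumers
    min_fetch_period_ms = 1000 * num_consumers / shard_fetches_per_sec
    return max_fetch_rate, min_fetch_period_ms

# Two consumers -> 1.0 MB/s and 400 ms, matching the defaults above.
rate, period = per_consumer_limits(2)
```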

Here are a few suggested configurations for specific use cases.

ETL from Kinesis to S3

When you’re performing ETL into long-term storage, you want a small number of large files. In this case, you may want to set a large stream trigger interval, for example, 5-10 minutes. In addition, you may want to increase maxFetchDuration so that you buffer large blocks that are written out during processing, and increase fetchBufferSize so that you don’t stop fetching too early between triggers and start falling behind in your stream.
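Put together, an ETL-oriented configuration might look like the following sketch. The specific trigger interval and buffer values are illustrative, not prescriptive, and the function and path names are placeholders:

```python
# Illustrative values for a batch-friendly ETL read; tune for your stream.
etl_read_options = {
    "maxFetchDuration": "30s",   # buffer larger blocks between triggers
    "fetchBufferSize": "40gb",   # keep fetching between widely spaced triggers
}
etl_trigger_interval = "10 minutes"  # large trigger interval -> fewer, larger files

def start_etl(spark, stream_name, region, checkpoint_path, output_path):
    """Sketch of a Kinesis-to-storage ETL query using the options above."""
    df = (spark.readStream.format("kinesis")
          .option("streamName", stream_name)
          .option("region", region)
          .options(**etl_read_options)
          .load())
    return (df.writeStream
            .format("delta")
            .option("checkpointLocation", checkpoint_path)
            .trigger(processingTime=etl_trigger_interval)
            .start(output_path))
```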

Low latency monitoring and alerting

When you have an alerting use case, you would want lower latency. To achieve that:

  • Ensure that there is only one consumer of the Kinesis stream (that is, only your streaming query and no one else), so that your streaming query can be optimized to fetch as fast as possible without running into Kinesis rate limits.

  • Set the option maxFetchDuration to a small value (say, 200ms) to start processing fetched data as fast as possible.

  • Set the option minFetchPeriod to 210ms to fetch as frequently as possible.

  • Set the option shardsPerTask or configure the cluster such that # cores in cluster >= 2 * (# Kinesis shards) / shardsPerTask. This ensures that the background prefetching tasks and the streaming query tasks can execute concurrently.
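The sizing rule in the last step reduces to simple arithmetic. As a sketch (the 40-shard stream is hypothetical):

```python
import math

def min_cores_for_low_latency(num_shards: int, shards_per_task: int = 5) -> int:
    """Minimum cluster cores so that background prefetch tasks and streaming
    query tasks run concurrently: cores >= 2 * shards / shardsPerTask."""
    return math.ceil(2 * num_shards / shards_per_task)

# For a hypothetical 40-shard stream with the default shardsPerTask of 5:
cores = min_cores_for_low_latency(40)  # 16 cores
```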

If you see that your query is receiving data every 5 seconds, then it is likely that you are hitting Kinesis rate limits. Review your configurations.

Warning

If you delete and recreate a Kinesis stream, you cannot reuse any existing checkpoint directories to restart a streaming query. You must delete the checkpoint directories and start those queries from scratch.

Metrics

Note

Available in Databricks Runtime 8.1 and above.

Kinesis reports the number of milliseconds a consumer has fallen behind the head (latest record) of a stream for each shard. You can get the average, minimum, and maximum of this value across all the shards in the streaming query progress (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reading-metrics-interactively) as the avgMsBehindLatest, maxMsBehindLatest, and minMsBehindLatest metrics. If you are running the stream in a notebook, you can see these metrics under the Raw Data tab in the streaming query progress dashboard:

{
  "sources" : [ {
    "description" : "KinesisV2[stream]",
    "metrics" : {
      "avgMsBehindLatest" : "32000.0",
      "maxMsBehindLatest" : "32000",
      "minMsBehindLatest" : "32000"
    }
  } ]
}
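If you read these metrics programmatically, for example from a query's lastProgress, note that the values are reported as strings and need converting. A sketch against a payload shaped like the sample above:

```python
import json

# A progress payload shaped like the sample shown in this section.
progress = json.loads("""
{
  "sources": [ {
    "description": "KinesisV2[stream]",
    "metrics": {
      "avgMsBehindLatest": "32000.0",
      "maxMsBehindLatest": "32000",
      "minMsBehindLatest": "32000"
    }
  } ]
}
""")

def seconds_behind(progress: dict) -> float:
    """Extract maxMsBehindLatest from the first source, converted to seconds."""
    metrics = progress["sources"][0]["metrics"]
    return float(metrics["maxMsBehindLatest"]) / 1000.0

lag = seconds_behind(progress)  # 32.0 seconds behind the latest record
```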

Ingest Kinesis records as an incremental batch

In Databricks Runtime 13.1 and above, Databricks supports using Trigger.AvailableNow with Kinesis data sources for incremental batch semantics. The following describes the basic configuration:

  1. When a micro-batch read triggers in available now mode, the current time is recorded by the Databricks client.

  2. Databricks polls the source system for all records with timestamps between this recorded time and the previous checkpoint.

  3. Databricks loads these records using Trigger.AvailableNow semantics.

Note

Databricks makes a best-effort attempt to consume all records that exist in message queue sources when a read is triggered. Because of small potential differences in timestamps and a lack of ordering guarantees in data sources, some records might not be included in a triggered batch. Omitted records are processed as part of the next triggered micro-batch.

See Configuring incremental batch processing.
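A minimal sketch of an incremental batch read follows; the function and parameter names are placeholders, and this requires Databricks Runtime 13.1 or above:

```python
def run_incremental_batch(spark, stream_name, region, checkpoint_path, table_name):
    """Consume all currently available Kinesis records as one incremental batch,
    then stop; rerunning the query resumes from the checkpoint."""
    df = (spark.readStream.format("kinesis")
          .option("streamName", stream_name)
          .option("region", region)
          .load())
    return (df.writeStream
            .option("checkpointLocation", checkpoint_path)
            .trigger(availableNow=True)   # incremental batch semantics
            .toTable(table_name))
```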

Write to Kinesis

The following code snippet can be used as a ForeachSink to write data to Kinesis. It requires a Dataset[(String, Array[Byte])].

Note

The following code snippet provides at-least-once semantics, not exactly-once.
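As an illustration only (not the notebook's implementation, which uses a Scala ForeachSink), a Python foreachBatch writer built on boto3 might look like the following. The batching helper respects the 500-records-per-call limit of the Kinesis PutRecords API:

```python
def chunk(records, size=500):
    """Split records into batches of at most `size`, the Kinesis
    PutRecords per-call limit."""
    for i in range(0, len(records), size):
        yield records[i:i + size]

def write_batch_to_kinesis(rows, stream_name, region):
    """Write (partitionKey, data) pairs to Kinesis with at-least-once semantics.

    boto3 is imported lazily so this module loads without the AWS SDK;
    it is assumed to be available on the cluster.
    """
    import boto3
    client = boto3.client("kinesis", region_name=region)
    entries = [{"PartitionKey": key, "Data": data} for key, data in rows]
    for batch in chunk(entries):
        client.put_records(StreamName=stream_name, Records=batch)
```

Because PutRecords can partially fail, a production writer would also inspect FailedRecordCount in the response and retry failed entries; that retry loop is omitted here.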

Kinesis Foreach Sink notebook
