Amazon Kinesis Support

Requirements

The Kinesis connector for Structured Streaming is pre-packaged in Databricks Runtime 3.0 and above, and in Spark 2.1.1-db5+ on Databricks. Note that this integration is available exclusively to Databricks customers.

Schema

The schema of the records is:

Column                        Type
partitionKey                  string
data                          binary
stream                        string
shardId                       string
sequenceNumber                string
approximateArrivalTimestamp   timestamp

Use DataFrame operations (cast("string"), UDFs) to explicitly deserialize the data column.
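
For example, a minimal sketch in Scala (assuming the source DataFrame is named kinesisDF):

import org.apache.spark.sql.functions.col

// Deserialize the binary data column into a UTF-8 string
val decoded = kinesisDF.select(col("data").cast("string").alias("payload"))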

Configuring

The following options are supported:

streamName
  Value: a comma-separated list of stream names
  Default: none (required parameter)
  Description: the stream names to subscribe to.

region
  Value: the AWS region of the streams
  Default: the locally resolved region
  Description: the region the streams are defined in.

initialPosition
  Value: latest, trim_horizon, or earliest (alias for trim_horizon)
  Default: latest
  Description: where to start reading from in the stream.

maxRecordsPerFetch
  Value: a positive integer
  Default: 10,000
  Description: how many records to read per API request to Kinesis [1].

maxFetchRate
  Value: the maximum rate at which to fetch data from Kinesis, in MB/s per shard
  Default: 1.0
  Description: fetching is rate limited accordingly to avoid ProvisionedThroughputExceededExceptions.

maxFetchDuration
  Value: a duration string, e.g. 2m for 2 minutes
  Default: 10s
  Description: how long to fetch new data asynchronously per Spark task.

fetchBufferSize
  Value: a byte string, e.g. 2gb or 10mb
  Default: 20gb
  Description: how much data to buffer for the next trigger. This is used as a stopping condition rather than a strict upper bound, so more data may be buffered than the value specified.

shardsPerTask
  Value: a positive integer
  Default: 5
  Description: how many Kinesis shards to read from in parallel per Spark task.

[1] The number of records returned may actually be higher if sub-records were aggregated into a single record using the Kinesis Producer Library.
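
For example, a Kinesis source might be configured as follows in Scala (the stream name and region are placeholders):

val kinesisDF = spark.readStream
  .format("kinesis")
  .option("streamName", "my-stream")         // placeholder stream name
  .option("region", "us-east-1")             // placeholder region
  .option("initialPosition", "trim_horizon")
  .option("maxRecordsPerFetch", "10000")
  .load()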

Depending on your use case, here is how you might configure some of these parameters:

  1. ETL from Kinesis to S3: When you’re performing ETL into long-term storage, you generally prefer a small number of large files. In this case, you may want to set a large stream trigger interval, e.g. 5-10 minutes. In addition, you may want to increase maxFetchDuration so that you buffer large blocks that will be written out during processing, and increase fetchBufferSize so that you don’t stop fetching too early between triggers and start falling behind in your stream (see the sketch after this list).
  2. Monitoring and alerting: When you have an alerting use case, you want lower latency. To achieve that, you may set maxFetchDuration to a small value so that data is made available to your stream as quickly as possible.
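
As a rough sketch of the ETL case in Scala (the bucket, stream name, and exact option values are illustrative placeholders):

import org.apache.spark.sql.streaming.Trigger

val kinesisDF = spark.readStream
  .format("kinesis")
  .option("streamName", "events")            // placeholder stream name
  .option("region", "us-east-1")             // placeholder region
  .option("maxFetchDuration", "1m")          // prefetch larger blocks per task
  .option("fetchBufferSize", "40gb")         // keep fetching between long triggers
  .load()

kinesisDF.writeStream
  .format("parquet")
  .option("path", "s3://my-bucket/events/")                         // placeholder S3 path
  .option("checkpointLocation", "s3://my-bucket/checkpoints/events/")
  .trigger(Trigger.ProcessingTime("10 minutes"))                    // large trigger interval => fewer, larger files
  .start()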

If you have multiple consumers reading from Kinesis, be sure to adjust maxFetchRate accordingly. As you decrease maxFetchRate, you may increase shardsPerTask to increase the utilization of your resources.

For best performance, we recommend using a cluster where the total number of CPU cores is at least the total number of Kinesis shards divided by shardsPerTask. For example, with 100 shards and the default shardsPerTask of 5, you would want at least 20 cores.

Warning

The execute-once trigger (Trigger.Once()) is not supported with Kinesis due to rate limiting performed by Kinesis and limitations in the Kinesis API. If your data arrives only at infrequent intervals, you may use:

sq = ...  # your streaming query, i.e. the handle returned by writeStream.start()
sq.processAllAvailable()
sq.stop()

to process all available data and stop your stream to save on costs. If data is coming in continuously, your stream will run forever.

Authenticating with Amazon Kinesis

By default, authentication with Kinesis uses Amazon’s default credential provider chain. We recommend launching your Databricks clusters with an IAM role that can access Kinesis. If you would like to use keys for access, you can provide them using the awsAccessKey and awsSecretKey options.
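
A minimal sketch in Scala (the stream name and region are placeholders, and the key values are assumed to be loaded from a secure location rather than hard-coded):

// Credentials are assumed to be retrieved securely, e.g. from environment variables or a secret store
val awsAccessKeyId: String = sys.env("AWS_ACCESS_KEY_ID")
val awsSecretAccessKey: String = sys.env("AWS_SECRET_ACCESS_KEY")

val kinesisDF = spark.readStream
  .format("kinesis")
  .option("streamName", "my-stream")       // placeholder stream name
  .option("region", "us-east-1")           // placeholder region
  .option("awsAccessKey", awsAccessKeyId)
  .option("awsSecretKey", awsSecretAccessKey)
  .load()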

Note

The Kinesis source requires DescribeStream, GetRecords, and GetShardIterator permissions. If you hit Amazon: Access Denied exceptions, double-check that your user or profile has these permissions. Refer to the AWS documentation for more details.

Quickstart

Let’s start with a quick example: WordCount. The following notebook is all that it takes to run WordCount using Structured Streaming with Kinesis.
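
As a rough sketch of what such a WordCount might look like in Scala (the stream name and region are placeholders, and the actual notebook may differ):

import org.apache.spark.sql.functions._

val lines = spark.readStream
  .format("kinesis")
  .option("streamName", "words")           // placeholder stream name
  .option("region", "us-east-1")           // placeholder region
  .option("initialPosition", "latest")
  .load()
  .selectExpr("CAST(data AS STRING) AS line")

val wordCounts = lines
  .select(explode(split(col("line"), " ")).as("word"))
  .groupBy("word")
  .count()

val query = wordCounts.writeStream
  .outputMode("complete")
  .format("memory")                        // in-memory table for interactive inspection
  .queryName("word_counts")
  .start()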

Writing to Kinesis

To write data back to Kinesis, the following code snippet can be used as a ForeachSink. It requires a Dataset[(String, Array[Byte])].

Note

The following code snippet provides at-least-once semantics, not exactly-once.
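
A minimal sketch of such a sink in Scala, assuming the tuple carries (partitionKey, data) and using the AWS SDK Kinesis client (the KinesisSink class name and constructor arguments are placeholders):

import java.nio.ByteBuffer

import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClientBuilder}
import com.amazonaws.services.kinesis.model.PutRecordRequest
import org.apache.spark.sql.ForeachWriter

// Hypothetical sink: writes each (partitionKey, data) pair to a Kinesis stream.
class KinesisSink(streamName: String, region: String)
  extends ForeachWriter[(String, Array[Byte])] {

  @transient private var client: AmazonKinesis = _

  override def open(partitionId: Long, version: Long): Boolean = {
    // Build the client on the executor, not the driver, so the sink stays serializable.
    client = AmazonKinesisClientBuilder.standard().withRegion(region).build()
    true
  }

  override def process(value: (String, Array[Byte])): Unit = {
    val (partitionKey, data) = value
    val request = new PutRecordRequest()
      .withStreamName(streamName)
      .withPartitionKey(partitionKey)
      .withData(ByteBuffer.wrap(data))
    client.putRecord(request)  // retried writes can produce duplicates (at-least-once)
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (client != null) client.shutdown()
  }
}

Such a sink could then be attached to a query with something like dataset.writeStream.foreach(new KinesisSink("my-output-stream", "us-east-1")).start(), where the stream name and region are again placeholders.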