The Kinesis connector for Structured Streaming is packaged in Databricks Runtime 3.0 and above (Spark 2.1.1-db5 and above).
The schema of the records is:

| Column | Type |
| --- | --- |
| partitionKey | string |
| data | binary |
| stream | string |
| shardId | string |
| sequenceNumber | string |
| approximateArrivalTimestamp | timestamp |

Use DataFrame operations (`cast("string")`, UDFs) to explicitly deserialize the `data` column, which is delivered as bytes.
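As a sketch of how this fits together (the stream name and region below are placeholder values), a minimal Scala read might look like:

```scala
// Read from a Kinesis stream; "myStream" and "us-west-2" are placeholders.
val kinesisDF = spark.readStream
  .format("kinesis")
  .option("streamName", "myStream")      // required: comma-separated stream names
  .option("region", "us-west-2")         // region the streams are defined in
  .option("initialPosition", "latest")   // or trim_horizon / earliest
  .load()

// The data column arrives as bytes; cast it to a string to deserialize.
val values = kinesisDF.selectExpr("CAST(data AS STRING) AS value")
```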
Let’s start with a quick example: WordCount. The following notebook demonstrates how to run WordCount using Structured Streaming with Kinesis.
Due to rate limiting performed by Kinesis and limitations in the Kinesis API, the run-once trigger (`Trigger.Once()`) is not supported with Kinesis.
Here are the most important configurations for specifying what data to read.
| Option | Value | Default | Description |
| --- | --- | --- | --- |
| streamName | A comma-separated list of stream names. | None (required) | The stream names to subscribe to. |
| region | Region for the streams to be specified. | Locally resolved region | The region the streams are defined in. |
| initialPosition | `latest`, `trim_horizon`, `earliest` (alias for `trim_horizon`) | `latest` | Where to start reading from in the stream. |
In addition, there are configurations for controlling the throughput and latency of reading from Kinesis. The Kinesis source runs Spark jobs in a background thread to periodically prefetch Kinesis data and cache it in the memory of the Spark executors. The streaming query processes the cached data only after each prefetch step completes and makes the data available for processing. Hence, this prefetching step determines a lot of the observed end-to-end latency and throughput. You can control the performance using the following options.
| Option | Value | Default | Description |
| --- | --- | --- | --- |
| maxRecordsPerFetch | A positive integer. | 10,000 | How many records to read per API request to Kinesis. The number of records returned may actually be higher if sub-records were aggregated into a single record using the Kinesis Producer Library. |
| maxFetchRate | A positive decimal representing a data rate in MB/s. | 1.0 (max = 2.0) | How fast to prefetch data per shard. This rate-limits fetches to avoid Kinesis throttling; 2.0 MB/s is the maximum rate that Kinesis allows. |
| minFetchPeriod | A duration string, for example, `1s` or `250ms`. | 400ms (min = 200ms) | How long to wait between consecutive prefetch attempts. This limits the frequency of fetches to avoid Kinesis throttling; Kinesis allows a maximum of 5 fetches/sec, so 200ms is the minimum. |
| maxFetchDuration | A duration string, for example, `10s`. | 10s | How long to buffer prefetched new data before making it available for processing. |
| fetchBufferSize | A byte string, for example, `2gb`. | 20gb | How much data to buffer for the next trigger. This is used as a stopping condition, not a strict upper bound, so more data may be buffered than what's specified for this value. |
| shardsPerTask | A positive integer. | 5 | How many Kinesis shards to prefetch from in parallel per Spark task. Ideally, `# cores in cluster >= 2 * (# Kinesis shards) / shardsPerTask`. |
| shardFetchInterval | A duration string, for example, `1s`. | 1s | How often to poll Kinesis for resharding. |
The default values of these options have been chosen so that two readers (Spark or otherwise) can simultaneously consume a Kinesis stream without hitting Kinesis rate limits. If you have more consumers, adjust the options accordingly: for example, reduce `maxFetchRate` and increase `minFetchPeriod`.
Here are a few suggested configurations for specific use cases.
- ETL from Kinesis to S3
- When you’re performing ETL into long-term storage, you would prefer to have a small number of large files. In this case, you may want to set a large stream trigger interval, for example, 5-10 minutes. In addition, you may want to increase `maxFetchDuration` so that you buffer large blocks that will be written out during processing, and increase `fetchBufferSize` so that you don’t stop fetching too early between triggers and start falling behind in your stream.
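A sketch of such an ETL configuration follows; the stream name, region, paths, and the specific values chosen are illustrative assumptions, not recommendations from this guide:

```scala
// Hypothetical ETL job: buffer large blocks, trigger infrequently.
import org.apache.spark.sql.streaming.Trigger

spark.readStream
  .format("kinesis")
  .option("streamName", "myStream")        // placeholder stream name
  .option("region", "us-west-2")           // placeholder region
  .option("maxFetchDuration", "30s")       // buffer larger blocks per prefetch
  .option("fetchBufferSize", "40gb")       // keep fetching between long triggers
  .load()
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "/checkpoints/kinesis-etl")  // placeholder path
  .trigger(Trigger.ProcessingTime("10 minutes"))             // large trigger interval
  .start("s3://my-bucket/kinesis-etl")                       // placeholder bucket
```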
- Low latency monitoring and alerting
When you have an alerting use case, you would want lower latency. To achieve that:
- Ensure that there is only one consumer (that is, only your streaming query and no one else) of the Kinesis stream, so that we can optimize your only streaming query to fetch as fast as possible without running into Kinesis rate limits.
- Set the option `maxFetchDuration` to a small value (say, `200ms`) to start processing fetched data as fast as possible.
- Set the option `minFetchPeriod` to `210ms` to fetch as frequently as possible.
- Set the option `shardsPerTask` or configure the cluster such that `# cores in cluster >= 2 * (# Kinesis shards) / shardsPerTask`. This ensures that the background prefetching tasks and the streaming query tasks can execute concurrently.
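Putting the settings above together, a low-latency configuration might be sketched as follows (the stream name and region are placeholders):

```scala
// Hypothetical low-latency alerting configuration.
spark.readStream
  .format("kinesis")
  .option("streamName", "alertStream")   // placeholder stream name
  .option("region", "us-west-2")         // placeholder region
  .option("maxFetchDuration", "200ms")   // make fetched data available quickly
  .option("minFetchPeriod", "210ms")     // fetch as frequently as Kinesis allows
  .option("shardsPerTask", "5")          // pair with cluster sizing as above
  .load()
```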
If you see that your query is receiving data every 5 seconds, then it is likely that you are hitting Kinesis rate limits. Review your configurations.
For authentication with Kinesis, we use Amazon’s default credential provider chain by default. We recommend launching your Databricks clusters with an IAM role that can access Kinesis. If you want to use keys for access, you can provide them using the options `awsAccessKey` and `awsSecretKey`.
You can also assume an IAM role using the `roleArn` option. You can optionally specify the external ID with `roleExternalId` and a session name with `roleSessionName`. In order to assume a role, you can either launch your cluster with permissions to assume the role or provide access keys through `awsAccessKey` and `awsSecretKey`. For cross-account authentication, we recommend using `roleArn` to hold the assumed role, which can then be assumed through your Databricks AWS account. For more information about cross-account authentication, see Delegate Access Across AWS Accounts Using IAM Roles. The ability to assume roles requires Databricks Runtime 3.5 and above.
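A sketch of a cross-account read via an assumed role follows; every value shown (account ID, role name, external ID, session name, stream, region) is a placeholder:

```scala
// Hypothetical cross-account read using an assumed IAM role.
spark.readStream
  .format("kinesis")
  .option("streamName", "myStream")                                    // placeholder
  .option("region", "us-west-2")                                       // placeholder
  .option("roleArn", "arn:aws:iam::123456789012:role/kinesis-reader")  // role to assume
  .option("roleExternalId", "my-external-id")                          // optional
  .option("roleSessionName", "spark-kinesis")                          // optional
  .load()
```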
The Kinesis source requires `GetShardIterator` permissions. If you hit `Amazon: Access Denied` exceptions, double-check that your user or profile has these permissions. See Controlling Access to Amazon Kinesis Data Streams Resources Using IAM for more details.
To write data back to Kinesis, you can implement a `ForeachSink`. It requires a `Dataset[(String, Array[Byte])]` and provides at-least-once semantics, not exactly-once.
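The original snippet is not present in this copy; the following is a minimal sketch of such a sink, assuming the AWS SDK for Java v1 `AmazonKinesis` client and a stream of (partition key, payload) tuples. The class name and constructor parameters are illustrative:

```scala
import java.nio.ByteBuffer
import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClientBuilder}
import com.amazonaws.services.kinesis.model.PutRecordRequest
import org.apache.spark.sql.ForeachWriter

// Writes each (partitionKey, data) record to Kinesis.
// At-least-once semantics: records may be re-sent when a task is retried.
class KinesisSink(streamName: String, region: String)
    extends ForeachWriter[(String, Array[Byte])] {

  @transient private var client: AmazonKinesis = _

  override def open(partitionId: Long, version: Long): Boolean = {
    // One client per partition/epoch; created on the executor.
    client = AmazonKinesisClientBuilder.standard().withRegion(region).build()
    true
  }

  override def process(value: (String, Array[Byte])): Unit = {
    val request = new PutRecordRequest()
      .withStreamName(streamName)
      .withPartitionKey(value._1)
      .withData(ByteBuffer.wrap(value._2))
    client.putRecord(request)
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (client != null) client.shutdown()
  }
}
```

You would then attach it with something like `ds.writeStream.foreach(new KinesisSink("myStream", "us-west-2")).start()`, where `ds` is a `Dataset[(String, Array[Byte])]`.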