Kinesis Structured Streaming source with enhanced fan-out (EFO) support

In Databricks Runtime 11.3 and above, the Databricks Runtime Kinesis connector supports the Amazon Kinesis enhanced fan-out (EFO) feature.

Kinesis enhanced fan-out gives each registered stream consumer a dedicated read throughput of 2 MB/s per shard (up to a maximum of 20 consumers per Kinesis stream) and delivers records in push mode instead of pull mode.
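Because the EFO throughput is dedicated per shard and per consumer, the capacity available to a query scales with the shard count. The following back-of-the-envelope sketch uses only the 2 MB/s and 20-consumer figures stated above; EfoThroughput is a hypothetical helper, not part of any Databricks or AWS API, and it ignores other Kinesis quotas.

```scala
object EfoThroughput {
  // Dedicated read throughput per shard, per EFO consumer, in MB/s (figure from the text above).
  val MbPerSecPerShard: Double = 2.0
  // Maximum number of EFO consumers per Kinesis stream (figure from the text above).
  val MaxConsumersPerStream: Int = 20

  /** Read throughput (MB/s) that a single EFO consumer receives across all shards. */
  def perConsumer(shards: Int): Double = {
    require(shards > 0, "a stream has at least one shard")
    shards * MbPerSecPerShard
  }

  /** Combined read throughput (MB/s) for several EFO consumers on the same stream.
    * Each consumer has its own dedicated throughput, so the per-consumer figures add up. */
  def allConsumers(shards: Int, consumers: Int): Double = {
    require(consumers >= 1 && consumers <= MaxConsumersPerStream,
      s"consumers must be between 1 and $MaxConsumersPerStream")
    perConsumer(shards) * consumers
  }
}
```

For example, with 4 shards each EFO consumer can receive 8 MB/s, and three consumers on the same stream can together receive 24 MB/s because they do not share throughput. These are capacity ceilings; the actual volume depends on how much data is written to the stream.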

Note

Start position configuration and behavior with enhanced fan-out are the same as in polling mode; see Configuration. For recommendations, see Best practices: Structured Streaming with Kinesis.

Configure enhanced fan-out streaming query options

The following options are available for streaming queries that use the Kinesis source. Only streamName is required.

| Option | Value | Default | Description |
|---|---|---|---|
| streamName | A comma-separated list of stream names. | None (required) | The stream names to subscribe to. |
| region | An AWS region name. | Locally resolved region | The region the streams are defined in. |
| initialPosition | latest, trim_horizon, earliest (alias for trim_horizon), at_timestamp | latest | Where to start reading from in the stream. |
| consumerMode | polling or efo | polling | The consumer type to run the streaming query with. |
| requireConsumerDeregistration | true or false | false | Whether to deregister the EFO consumer when the query terminates. |
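The options above are plain string key/value pairs on the streaming reader. As a sketch, the hypothetical KinesisEfoOptions helper below (not a Databricks API) collects the EFO-related settings from the table into a Map[String, String]. It only assembles strings using the option names listed above; it does not check values against what the connector accepts.

```scala
object KinesisEfoOptions {
  /** Builds Kinesis source options for a query in EFO mode, using the option
    * names from the table above. Optional settings left as None are omitted,
    * so the connector's defaults apply to them. */
  def build(
      streamNames: Seq[String],
      region: Option[String] = None,
      initialPosition: Option[String] = None,
      requireConsumerDeregistration: Boolean = false
  ): Map[String, String] = {
    // streamName is the only required option.
    require(streamNames.nonEmpty, "streamName is required")
    val base = Map(
      "streamName" -> streamNames.mkString(","), // comma-separated list of streams
      "consumerMode" -> "efo",                    // the default would be "polling"
      "requireConsumerDeregistration" -> requireConsumerDeregistration.toString
    )
    // Add optional settings only when they were provided.
    base ++ region.map("region" -> _) ++ initialPosition.map("initialPosition" -> _)
  }
}
```

In a Databricks notebook, the resulting map could then be passed to spark.readStream.format("kinesis").options(...) before calling load().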

Perform consumer registration with Kinesis Data Streams

A Structured Streaming query running in EFO mode acts as a consumer with dedicated throughput and registers itself with Kinesis Data Streams. To register, the query must provide a unique consumer name so that it can use the generated consumer ARN (Amazon Resource Name) for subsequent operations. You can either provide an explicit consumer name or reuse the streaming query ID as the consumer name. All consumers registered by the Databricks source have the “databricks_” prefix. Structured Streaming queries that reference previously registered consumers use the consumerARN returned by describeStreamConsumer.

The consumerName field lets you provide a unique consumer name for your streaming query. If you don't provide a name, the streaming query ID is used. The consumerName value can contain only letters, numbers, and the special characters _ (underscore), . (dot), and - (hyphen).
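The naming rule and the query-ID fallback can be checked before a query starts. The following is a minimal sketch using a hypothetical ConsumerNames helper (not part of the Databricks API) that mirrors the behavior described above; it makes no claim about how the connector itself validates names. The 128-character limit comes from the Kinesis RegisterStreamConsumer API, and the sketch does not account for the “databricks_” prefix.

```scala
import java.util.UUID

object ConsumerNames {
  // Letters, digits, underscore, dot, and hyphen only, 1 to 128 characters
  // (Kinesis caps consumer names at 128 characters).
  private val Allowed = "^[A-Za-z0-9_.-]{1,128}$".r

  def isValid(name: String): Boolean = Allowed.pattern.matcher(name).matches()

  /** Mirrors the documented fallback: use the explicit consumer name if one is
    * given, otherwise the streaming query ID (a UUID) rendered as a string. */
  def resolve(consumerName: Option[String], queryId: UUID): String = {
    val name = consumerName.getOrElse(queryId.toString)
    require(isValid(name), s"invalid consumer name: $name")
    name
  }
}
```

A UUID string consists of hexadecimal digits and hyphens, so the query-ID fallback always satisfies the naming rule.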

Deregister the consumer with Kinesis Data Streams

A registered EFO consumer incurs additional charges on Amazon Kinesis, so you can optionally deregister the consumer when the query terminates. To deregister the consumer automatically on query teardown, set the requireConsumerDeregistration option to true. Databricks cannot guarantee deregistration after events such as driver crashes or node failures. If a job fails, Databricks recommends managing registered consumers directly to prevent excess Kinesis charges.

Offline consumer management using a Databricks notebook

Databricks provides a consumer management utility to register, list, or deregister consumers associated with Kinesis data streams. The following steps demonstrate how to use this utility in a Databricks notebook:

  1. In a new Databricks notebook attached to an active cluster, create an AWSKinesisConsumerManager by providing the necessary authentication information.

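    import com.databricks.sql.kinesis.AWSKinesisConsumerManager
    
    // awsAccessKeyId, awsSecretKey, and kinesisRegion must already be defined in the
    // notebook, for example by reading them from a Databricks secret scope.
    val manager = AWSKinesisConsumerManager.newManager()
      .option("awsAccessKey", awsAccessKeyId)
      .option("awsSecretKey", awsSecretKey)
      .option("region", kinesisRegion)
      .create()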
    
  2. List and display consumers.

    val consumers = manager.listConsumers("<stream name>")
    display(consumers)
    
  3. Register a consumer for a given stream.

    val consumerARN = manager.registerConsumer("<stream name>", "<consumer name>")
    
  4. Deregister a consumer from a given stream.

    manager.deregisterConsumer("<stream name>", "<consumer name>")
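Because deregistration isn't guaranteed after driver crashes or node failures, it can help to look for leftover consumers periodically. The following is a minimal sketch using a hypothetical ConsumerCleanup helper that is not part of AWSKinesisConsumerManager. Given the registered consumer names and the names your running queries still use, it selects the consumers with the “databricks_” prefix that nothing uses anymore.

```scala
object ConsumerCleanup {
  // Consumers registered by the Databricks source carry this prefix (per the text above).
  val DatabricksPrefix = "databricks_"

  /** Returns consumers registered by the Databricks source that are not in use
    * by any query you know to be running. Names in `inUse` must be in the same
    * form as the names in `registered`. */
  def stale(registered: Seq[String], inUse: Set[String]): Seq[String] =
    registered.filter(name => name.startsWith(DatabricksPrefix) && !inUse.contains(name))
}
```

Assuming you have collected the consumer names from the listConsumers output into a Seq[String] (the exact shape of that output isn't described here), each stale name could then be passed to manager.deregisterConsumer("<stream name>", name). Before running this against a real stream, confirm whether deregisterConsumer expects the name with or without the “databricks_” prefix.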