read_kinesis streaming table-valued function

Applies to: check marked yes Databricks SQL check marked yes Databricks Runtime 13.3 LTS and above

Returns a table with records read from Kinesis from one or more streams.

Syntax

read_kinesis ( { parameter => value } [, ...] )

Arguments

read_kinesis requires named parameter invocation.

The only required argument is streamName. All other arguments are optional.

The descriptions of the arguments are brief here. For more details, see the Amazon Kinesis documentation.

There are various connection options to connect and authenticate with AWS. awsAccessKey, and awsSecretKey can either be specified in the function arguments using the secret function, manually set in the arguments, or configured as environment variables as indicated below. roleArn, roleExternalID, roleSessionName can also be used to authenticate with AWS by using instance profiles. If none of these are specified, it will use the default AWS provider chain.

For more information regarding authentication, visit Amazon Kinesis.

Parameter

Type

Description

streamName

STRING

Required, comma-separated list of one or more kinesis streams.

awsAccessKey

STRING

The AWS Access key, if any. Can also be specified through the various options supported through the AWS default credential provider chain including environment variables (AWS_ACCESS_KEY_ID) and a credential profiles file.

awsSecretKey

STRING

The secret key which corresponds to the access key. Can be specified either in the arguments or through the various options supported through the AWS default credential provider chain including environment variables (AWS_SECRET_KEY or AWS_SECRET_ACCESS_KEY) and a credentials profiles file.

roleArn

STRING

Amazon resource name of the role to assume when accessing Kinesis.

roleExternalId

STRING

Used when delegating access to the AWS account.

roleSessionName

STRING

AWS role session name.

stsEndpoint

STRING

An endpoint for requesting temporary access credentials.

region

STRING

Region for the streams to be specified. The default is the locally resolved region.

endpoint

STRING

regional endpoint for Kinesis data streams. The default is the locally resolved region.

initialPosition

STRING

Starting position for reading from in the stream. One of: ‘latest’ (default), ‘trim_horizon’, ‘earliest’, ‘at_timestamp’.

consumerMode

STRING

One of: ‘polling’ (default), or ‘EFO’ (enhanced-fan-out).

consumerName

STRING

The name of the consumer. All consumers are prefixed with ‘databricks_’. The default is an empty string.

registerConsumerTimeoutInterval

STRING

the max timeout to wait for the Kinesis EFO consumer to be registered with the Kinesis stream before throwing an error. The default is ‘300s’.

requireConsumerDeregistration

BOOLEAN

true to de-register the EFO consumer on query termination. Default is false.

deregisterConsumerTimeoutInterval

STRING

The max timeout to wait for the Kinesis EFO consumer to be deregistered with the Kinesis stream before throwing an error. The default is ‘300s’.

consumerRefreshInterval

STRING

The interval at which the consumer is checked and refreshed. The default is ‘300s’.

The following arguments are used for controlling the read throughput and latency for Kinesis:

Parameter

Type

Description

maxRecordsPerFetch

INTEGER (>0)

Optional, with a default of 10,000 records to be read per API request to Kinesis.

maxFetchRate

STRING

How fast to prefetch data per shard. A value between ‘1.0’ and ‘2.0’ that’s measured in MB/s. The default is ‘1.0’.

minFetchPeriod

STRING

The maximum wait time between consecutive prefetch attempts. The default is ‘400ms’.

maxFetchDuration

STRING

The maximum duration to buffer prefetched new data. The default is ’10s’.

fetchBufferSize

STRING

The amount of data for the next trigger. The default is ‘20gb’.

shardsPerTask

INTEGER (>0)

The number of Kinesis shards to prefetch from in parallel per spark task. The default is 5.

shardFetchinterval

STRING

How often to poll for resharding. The default is ‘1s’.

coalesceThresholdBlockSize

INTEGER (>0)

The threshold at which automatic coalesce occurs. The default is 10,000,000.

coalesce

BOOLEAN

true to coalesce prefetched requests. The default is true.

coalesceBinSize

INTEGER (>0)

The approximate block size after coalescing. The default is 128,000,000.

reuseKinesisClient

BOOLEAN

true to reuse the Kinesis client stored in the cache. The default is true except on a PE cluster.

clientRetries

INTEGER (>0)

The number of retries in the retry scenario. The default is 5.

Returns

A table of Kinesis records with the following schema:

Name

Data type

Nullable

Standard

Description

partitionKey

STRING

No

A key that is used to distribute data among the shards of a stream. All data records with the same partition key will be read from the same shard.

data

BINARY

No

The kinesis data payload, base-64 encoded.

stream

STRING

No

The name of the stream where the data was read from.

shardId

STRING

No

A unique identifier for the shard where the data was read from.

sequenceNumber

BIGINT

No

The unique identifier of the record within its shard.

approximateArrivalTimestamp

TIMESTAMP

No

The approximate time that the record was inserted into the stream.

The columns (stream, shardId, sequenceNumber) constitute a primary key.

Examples

-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        awsAccessKey => secret(test-databricks, awsAccessKey),
        awsSecretKey => secret(test-databricks, awsSecretKey),
        initialPosition => 'earliest');

-- The data would now need to be queried from the testing.streaming_table

-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest');

-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest',
        roleArn => 'arn:aws:iam::123456789012:role/MyRole',
        roleSessionName => 'testing@databricks.com');