read_kafka table-valued function
Applies to: Databricks SQL, Databricks Runtime 13.1 and later
Reads data from an Apache Kafka cluster and returns the data in tabular form.
The function can read data from one or more Kafka topics, and supports both batch queries and streaming ingestion.
Arguments
This function requires named parameter invocation.
option_key
: The name of the option to configure. You must use backticks (`) for options that contain dots (.).

option_value
: A constant expression to set the option. Accepts literals and scalar functions.
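For illustration, here is a minimal sketch of named parameter invocation. The broker address and topic name are placeholders (the same ones used in the examples below), and `kafka.session.timeout.ms` stands in for any Kafka consumer option whose key contains dots and therefore must be backticked.

-- A minimal sketch of named parameter invocation. The broker address and topic
-- are placeholders; `kafka.session.timeout.ms` is just one example of a dotted
-- option key that must be wrapped in backticks.
> SELECT * FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events',
    `kafka.session.timeout.ms` => '30000'
  ) LIMIT 5;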
Returns
Records read from an Apache Kafka cluster with the following schema:
key BINARY
: The key of the Kafka record.

value BINARY NOT NULL
: The value of the Kafka record.

topic STRING NOT NULL
: The name of the Kafka topic the record is read from.

partition INT NOT NULL
: The ID of the Kafka partition the record is read from.

offset BIGINT NOT NULL
: The offset number of the record in the Kafka TopicPartition.

timestamp TIMESTAMP NOT NULL
: A timestamp value for the record. The timestampType column defines what this timestamp corresponds to.

timestampType INTEGER NOT NULL
: The type of the timestamp specified in the timestamp column.

headers ARRAY<STRUCT<key: STRING, value: BINARY>>
: Header values provided as part of the record (if enabled).
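For example, a sketch of a batch query that casts the binary key and value to strings and projects the metadata columns; the broker address and topic name are placeholders, as in the examples below.

-- Cast the binary key/value to strings and inspect the record metadata columns.
> SELECT
    key::string AS key,
    value::string AS value,
    topic,
    partition,
    offset,
    timestamp,
    timestampType
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;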
Examples
-- A batch query to read from a topic.
> SELECT value::string as value
FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
) LIMIT 10;
-- A more advanced query with security credentials for Kafka.
> SELECT * FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events',
startingOffsets => 'earliest',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'PLAIN',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{USER_NAME}" password="{PASSWORD}";'
);
-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
SELECT
value::string:events, -- extract the field `events`
to_timestamp(value::string:ts) as ts -- extract the field `ts` and cast to timestamp
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
);
Options
You can find a detailed list of options in the Apache Spark documentation.
Required options
Provide the option below for connecting to your Kafka cluster.
| Option | Type | Description | Default value |
| --- | --- | --- | --- |
| `bootstrapServers` | `STRING` | A comma-separated list of host/port pairs pointing to the Kafka cluster. | None |
Provide only one of the options below to configure which Kafka topics to pull data from.
| Option | Type | Description | Default value |
| --- | --- | --- | --- |
| `assign` | `STRING` | A JSON string that contains the specific topic-partitions to consume from. For example, for `{"topicA":[0,1],"topicB":[2,4]}`, topicA's 0th and 1st partitions are consumed from. | None |
| `subscribe` | `STRING` | A comma-separated list of Kafka topics to read from. | None |
| `subscribePattern` | `STRING` | A regular expression matching topics to subscribe to. | None |
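For example, a hedged sketch that uses subscribePattern instead of subscribe; the pattern `events.*` and the broker address are placeholder values.

-- Subscribe to every topic whose name matches a regular expression.
-- The pattern and broker address are illustrative placeholders.
> SELECT value::string AS value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribePattern => 'events.*'
  ) LIMIT 10;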
Miscellaneous options
read_kafka can be used in batch queries as well as streaming queries. The options below specify which type of query they apply to.
| Option | Type | Description | Default value |
| --- | --- | --- | --- |
| `endingOffsets` | `STRING` | The offsets to read until for a batch query, either `latest` to read up to the latest records, or a JSON string specifying an ending offset for each TopicPartition. | `latest` |
| `endingOffsetsByTimestamp` | `STRING` | A JSON string specifying an ending timestamp to read until for each TopicPartition. The timestamps need to be provided as a long value of the timestamp in milliseconds since `1970-01-01 00:00:00 UTC`. | None |
| `endingTimestamp` | `STRING` | A string value of the timestamp in milliseconds since `1970-01-01 00:00:00 UTC`. | None |
| `includeHeaders` | `BOOLEAN` | Whether to include the Kafka headers in the row. | `false` |
| `kafka.{consumer_option}` | `STRING` | Any Kafka consumer-specific options can be passed in with the `kafka.` prefix. Note: You should not set the following options with this function: `key.deserializer`, `value.deserializer`, `bootstrap.servers`, `group.id`. | None |
| `maxOffsetsPerTrigger` | `LONG` | Rate limit on the maximum number of offsets or rows processed per trigger interval. The specified total number of offsets will be proportionally split across TopicPartitions. | None |
| `startingOffsets` | `STRING` | The start point when a query is started, either `earliest` which is from the earliest offsets, `latest` which is just from the latest offsets, or a JSON string specifying a starting offset for each TopicPartition. In the JSON, -2 as an offset can be used to refer to earliest, -1 to latest. Note: For batch queries, latest (either implicitly or by using -1 in JSON) is not allowed. For streaming queries, this only applies when a new query is started; restarted streaming queries will continue from the offsets defined in the query checkpoint. Newly discovered partitions during a query will start at earliest. | `latest` for streaming, `earliest` for batch |
| `startingOffsetsByTimestamp` | `STRING` | A JSON string specifying a starting timestamp for each TopicPartition. The timestamps need to be provided as a long value of the timestamp in milliseconds since `1970-01-01 00:00:00 UTC`. Note: For streaming queries, this only applies when a new query is started; restarted streaming queries will continue from the offsets defined in the query checkpoint. Newly discovered partitions during a query will start at earliest. | None |
| `startingOffsetsByTimestampStrategy` | `STRING` | This strategy is used when the specified starting offset by timestamp (either global or per partition) doesn't match the offset Kafka returned. The available strategies are: `error` (fail the query) and `latest` (set the offset to the latest so that Spark can read newer records from these partitions in later micro-batches). | `error` |
| `startingTimestamp` | `STRING` | A string value of the timestamp in milliseconds since `1970-01-01 00:00:00 UTC`. Note: For streaming queries, this only applies when a new query is started; restarted streaming queries will continue from the offsets defined in the query checkpoint. Newly discovered partitions during a query will start at earliest. | None |
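As a hedged sketch of combining several of these options, the batch query below reads a bounded offset range and includes headers; the topic name, partition numbers, and offset values are illustrative placeholders.

-- Batch read bounded by explicit per-partition offsets, with Kafka headers included.
-- The topic name, partition numbers, and offsets are illustrative placeholders;
-- -1 in endingOffsets refers to the latest offset.
> SELECT value::string AS value, headers
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events',
    startingOffsets => '{"events":{"0":10,"1":0}}',
    endingOffsets => '{"events":{"0":50,"1":-1}}',
    includeHeaders => true
  );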
Note
The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. The behavior varies across options if Kafka doesn’t return the matched offset - check the description of each option.
Spark simply passes the timestamp information to KafkaConsumer.offsetsForTimes, and doesn't interpret or reason about the value. For more details on KafkaConsumer.offsetsForTimes, please refer to its documentation. Also, the meaning of timestamp here can vary according to the Kafka configuration (log.message.timestamp.type). For details, see the Apache Kafka documentation.
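For instance, a hedged sketch that starts reading at a timestamp; the epoch-millisecond value, broker address, and topic name are placeholders.

-- Start from the earliest offsets whose record timestamps are at or after the
-- given epoch milliseconds; the value shown is an illustrative placeholder.
> SELECT value::string AS value, timestamp
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events',
    startingTimestamp => '1686444353000'
  ) LIMIT 10;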