The Apache Kafka connectors for Structured Streaming are packaged in Databricks Runtime.
You use the kafka connector to connect to Kafka 0.10+ and the kafka08 connector to connect to Kafka 0.8+ (deprecated).
The schema of the records is:
The key and the value are always deserialized as byte arrays with the ByteArrayDeserializer. Use DataFrame operations (cast("string"), UDFs) to explicitly deserialize the keys and values.
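As a minimal sketch of the explicit deserialization described above, the helper below casts the binary key and value columns to strings using SQL expressions. The name decode_kafka_strings is hypothetical, and the sketch assumes the payloads are UTF-8 text; other formats would need a UDF or a parsing function instead.

```python
# SQL expressions that cast the binary Kafka columns to strings.
STRING_CASTS = [
    "CAST(key AS STRING) AS key",
    "CAST(value AS STRING) AS value",
]


def decode_kafka_strings(df):
    """Return a DataFrame whose key and value columns are strings.

    `df` is assumed to be a DataFrame read from the Kafka source,
    where key and value arrive as byte arrays.
    """
    return df.selectExpr(*STRING_CASTS)
```

Because the casts are plain SQL expressions, the same helper works for both batch and streaming DataFrames.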
Let’s start with the canonical WordCount example. The following notebook demonstrates how to run WordCount using Structured Streaming with Kafka.
This notebook example uses Kafka 0.10. To use Kafka 0.8, change the format to kafka08 (that is, .format("kafka08")).
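The notebook itself is not reproduced here, so the following is only a rough sketch of what a Kafka WordCount might look like, not the notebook's actual code. The function name kafka_word_count and its parameters are hypothetical, and words are assumed to be separated by single spaces.

```python
def kafka_word_count(spark, bootstrap_servers, topic):
    """Build a streaming DataFrame of running word counts from a Kafka topic.

    `spark` is assumed to be an active SparkSession.
    """
    lines = (
        spark.readStream
        .format("kafka")  # use "kafka08" for Kafka 0.8
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("subscribe", topic)
        .load()
    )
    # Values arrive as byte arrays: cast to string, then split into words.
    words = lines.selectExpr(
        "explode(split(CAST(value AS STRING), ' ')) AS word"
    )
    return words.groupBy("word").count()
```

The returned DataFrame is an unbounded aggregation, so a query over it would typically be started with writeStream in complete output mode.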
Refer to the Spark Structured Streaming + Kafka Integration Guide for the comprehensive list of configurations. To get you started, here is a subset of configurations.
As Structured Streaming is still under development, this list may not be up to date.
There are multiple ways of specifying which topics to subscribe to. You should provide only one of these parameters:
|Option||Value||Supported Kafka Versions||Description|
|subscribe||A comma-separated list of topics.||0.8, 0.10||The topic list to subscribe to.|
|subscribePattern||Java regex string.||0.10||The pattern used to subscribe to topic(s).|
||0.8, 0.10||Specific topicPartitions to consume.|
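Since only one of these parameters should be provided, a small guard can catch mistakes before the query starts. The sketch below is illustrative only: subscription_option is a hypothetical helper, and the option name assign (for specific topic partitions) is taken from the Spark integration guide rather than from the table above.

```python
def subscription_option(subscribe=None, subscribe_pattern=None, assign=None):
    """Return the single (option, value) pair that selects the topics to read.

    Raises ValueError unless exactly one argument is provided.
    """
    candidates = {
        "subscribe": subscribe,                  # comma-separated topic list
        "subscribePattern": subscribe_pattern,   # Java regex string
        "assign": assign,                        # specific topic partitions (name assumed)
    }
    chosen = [(name, value) for name, value in candidates.items() if value is not None]
    if len(chosen) != 1:
        raise ValueError(
            "Provide exactly one of subscribe, subscribePattern, assign"
        )
    return chosen[0]
```

The returned pair can be passed straight to the reader, for example reader.option(*subscription_option(subscribe="topicA,topicB")).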
Other notable configurations:
|Option||Value||Default Value||Supported Kafka Versions||Description|
|kafka.bootstrap.servers||Comma-separated list of host:port.||empty||0.8, 0.10||[Required] The Kafka bootstrap.servers configuration.|
||0.10||[Optional] Whether to fail the query when it’s possible that data was lost. Queries can permanently fail to read data from Kafka in many scenarios, such as deleted topics or topic truncation before processing. We try to estimate conservatively whether data was possibly lost, which can sometimes cause false alarms. Set this option to false if it does not work as expected.|
|minPartitions||Integer >= 0, 0 = disabled.||0 (disabled)||0.10||[Optional] Minimum number of partitions to read from Kafka. With Spark 2.1.0-db2 and above, you can configure Spark to use an arbitrary minimum number of partitions to read from Kafka using the minPartitions option.|
|kafka.group.id||A Kafka consumer group ID.||not set||0.10||[Optional] Group ID to use while reading from Kafka. Supported in Spark 2.2+. Use this with caution. By default, each query generates a unique group ID for reading data. This ensures that each query has its own consumer group that does not face interference from any other consumer, and therefore can read all of the partitions of its subscribed topics. In some scenarios (for example, Kafka group-based authorization), you may want to use specific authorized group IDs to read data. You can optionally set the group ID. However, do this with extreme caution as it can cause unexpected behavior.|
See Structured Streaming Kafka Integration Guide for other optional configurations.
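To make the options above concrete, here is a hedged sketch that collects a few of them into a dictionary. The helper name notable_options and its parameters are hypothetical; only the option keys come from the table above.

```python
def notable_options(bootstrap_servers, min_partitions=0, group_id=None):
    """Build a dict of Kafka source options from the table above.

    `bootstrap_servers` is a list of "host:port" strings.
    """
    if min_partitions < 0:
        raise ValueError("minPartitions must be >= 0 (0 disables it)")
    options = {"kafka.bootstrap.servers": ",".join(bootstrap_servers)}
    if min_partitions > 0:
        options["minPartitions"] = str(min_partitions)
    if group_id is not None:
        # Use with caution: a fixed group ID can cause unexpected behavior.
        options["kafka.group.id"] = group_id
    return options
```

The resulting dictionary can be applied in one call, for example spark.readStream.format("kafka").options(**notable_options(["host1:9092"])).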
Do not set the following Kafka parameters for the Kafka 0.10 connector; doing so throws an exception:
- group.id: Setting this parameter is not allowed for Spark versions below 2.2.
- auto.offset.reset: Instead, set the source option startingOffsets to specify where to start. To maintain consistency, Structured Streaming (as opposed to the Kafka Consumer) manages the consumption of offsets internally. This ensures that you don’t miss any data after dynamically subscribing to new topics/partitions. Note that startingOffsets applies only when you start a new streaming query; resuming from a checkpoint always picks up from where the query left off.
- key.deserializer: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the keys.
- value.deserializer: Values are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the values.
- enable.auto.commit: Setting this parameter is not allowed. Spark keeps track of Kafka offsets internally and doesn’t commit any offset.
- interceptor.classes: The Kafka source always reads keys and values as byte arrays. It’s not safe to use ConsumerInterceptor as it may break the query.
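For the startingOffsets option mentioned above, the Spark integration guide describes three forms: "earliest", "latest", or a JSON string of per-partition offsets in which -2 stands for earliest and -1 for latest. The sketch below builds that JSON form; the helper name starting_offsets is hypothetical, and the sentinel values are taken from that guide rather than from this page.

```python
import json

EARLIEST = -2  # sentinel offset meaning "earliest" in the JSON form
LATEST = -1    # sentinel offset meaning "latest" in the JSON form


def starting_offsets(offsets_by_topic):
    """Serialize {topic: {partition: offset}} into a startingOffsets JSON string."""
    return json.dumps(
        {
            topic: {str(partition): offset for partition, offset in partitions.items()}
            for topic, partitions in offsets_by_topic.items()
        },
        sort_keys=True,
    )
```

The result would be passed as .option("startingOffsets", starting_offsets({...})), and it only takes effect when a new query starts, not when one resumes from a checkpoint.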
To enable SSL connections to Kafka, follow the instructions in the Confluent documentation Encryption and Authentication with SSL. You can provide the configurations described there, prefixed with kafka., as options. For example, you specify the trust store location in the property kafka.ssl.truststore.location.
We recommend that you:
- Store your certificates in S3 and access them through a DBFS mount point. Combined with cluster and job ACLs, you can restrict access to the certificates only to clusters that can access Kafka.
- Store your certificate passwords as secrets in a secret scope.
Once paths are mounted and secrets stored, you can do the following:
    df = spark.readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", ...) \
      .option("kafka.ssl.truststore.location", <dbfs-truststore-location>) \
      .option("kafka.ssl.keystore.location", <dbfs-keystore-location>) \
      .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>)) \
      .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))