Stream from Apache Pulsar

Preview

This feature is in Public Preview.

In Databricks Runtime 14.1 and above, you can use Structured Streaming to stream data from Apache Pulsar on Databricks.

Structured Streaming provides exactly-once processing semantics for data read from Pulsar sources.

Syntax example

The following is a basic example of using Structured Streaming to read from Pulsar:

val query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()

You must always provide service.url and exactly one of the following options to specify topics:

  • topic

  • topics

  • topicsPattern

For a complete list of options, see Configure options for Pulsar streaming read.
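The three topic options are mutually exclusive. As an illustration of that rule, a minimal sketch (plain Python, not part of the connector) that validates an options map before building a reader:

```python
def validate_topic_options(options):
    """Check that service.url is set and exactly one topic option is used."""
    if "service.url" not in options:
        raise ValueError("service.url is required")
    topic_keys = [k for k in ("topic", "topics", "topicsPattern") if k in options]
    if len(topic_keys) != 1:
        raise ValueError("specify exactly one of topic, topics, or topicsPattern")
    return topic_keys[0]
```

For example, `validate_topic_options({"service.url": "pulsar://h:6650", "topics": "a,b"})` returns `"topics"`, while passing both topic and topics raises an error.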

Authenticate to Pulsar

Databricks supports truststore and keystore authentication to Pulsar. Databricks recommends using secrets to store configuration details.

You can set the following options during stream configuration:

  • pulsar.client.authPluginClassName

  • pulsar.client.authParams

  • pulsar.client.useKeyStoreTls

  • pulsar.client.tlsTrustStoreType

  • pulsar.client.tlsTrustStorePath

  • pulsar.client.tlsTrustStorePassword

If the stream uses a PulsarAdmin, also set the following:

  • pulsar.admin.authPluginClassName

  • pulsar.admin.authParams

The following example demonstrates configuring authentication options:

val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
// "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

val query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets) // "earliest", "latest", or an offsets JSON string
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath) // path to the truststore on the cluster
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()
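The auth params string above is a comma-separated list of key:value pairs. A quick sketch of how such a string decomposes (plain Python, for illustration only; the connector does this parsing internally):

```python
def parse_auth_params(auth_params):
    """Split a 'k1:v1,k2:v2' auth string into a dict.

    Splits each pair on the first colon only, so values such as
    filesystem paths are preserved intact.
    """
    pairs = (item.split(":", 1) for item in auth_params.split(",") if item)
    return {k: v for k, v in pairs}
```

Applied to the example string above, this yields keyStoreType, keyStorePath, and keyStorePassword entries.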

Pulsar schema

The schema of records read from Pulsar depends on how topics have their schemas encoded.

  • For topics with Avro or JSON schema, field names and field types are preserved in the resulting Spark DataFrame.

  • For topics without schema or with a simple data type in Pulsar, the payload is loaded to a value column.

  • If the reader is configured to read multiple topics with different schemas, set allowDifferentTopicSchemas to load the raw content to a value column.

Pulsar records have the following metadata fields:

| Column | Type |
| --- | --- |
| __key | binary |
| __topic | string |
| __messageId | binary |
| __publishTime | timestamp |
| __eventTime | timestamp |
| __messageProperties | map<String, String> |

Configure options for Pulsar streaming read

All options are configured as part of a Structured Streaming read using .option("<optionName>", "<optionValue>") syntax. You can also configure authentication using options. See Authenticate to Pulsar.

The following table describes required configurations for Pulsar. You must specify exactly one of the options topic, topics, or topicsPattern.

| Option | Default value | Description |
| --- | --- | --- |
| service.url | none | The Pulsar serviceUrl configuration for the Pulsar service. |
| topic | none | A topic name string for the topic to consume. |
| topics | none | A comma-separated list of the topics to consume. |
| topicsPattern | none | A Java regex string matched against the topics to consume. |
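topicsPattern is a Java regex matched against full topic names. For simple patterns, Python's re syntax overlaps with Java's, so a candidate pattern can be sanity-checked before deployment (illustrative sketch; the tenant, namespace, and topic names below are hypothetical):

```python
import re

# Hypothetical full topic names as Pulsar reports them
topics = [
    "persistent://public/default/orders-eu",
    "persistent://public/default/orders-us",
    "persistent://public/default/payments",
]

# Candidate value for the topicsPattern option
pattern = re.compile(r"persistent://public/default/orders-.*")

# Topics the pattern would select
matched = [t for t in topics if pattern.fullmatch(t)]
```

Here matched contains the two orders-* topics and excludes payments.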

The following table describes other options supported for Pulsar:

| Option | Default value | Description |
| --- | --- | --- |
| predefinedSubscription | none | The predefined subscription name the connector uses to track Spark application progress. |
| subscriptionPrefix | none | A prefix the connector uses to generate a random subscription for tracking Spark application progress. |
| pollTimeoutMs | 120000 | The timeout for reading messages from Pulsar, in milliseconds. |
| waitingForNonExistedTopic | false | Whether the connector should wait until the desired topics are created. |
| failOnDataLoss | true | Controls whether to fail the query when data is lost (for example, because topics are deleted or messages are deleted by a retention policy). |
| allowDifferentTopicSchemas | false | When reading multiple topics with different schemas, set this to true to turn off automatic schema-based value deserialization. Only raw values are returned. |
| startingOffsets | latest | With latest, the reader reads the newest records after it starts running. With earliest, the reader reads from the earliest offset. You can also specify a JSON string for an exact starting offset. |
| maxBytesPerTrigger | none | A soft limit on the number of bytes processed per microbatch. If this is specified, admin.url must also be specified. |
| admin.url | none | The Pulsar serviceHttpUrl configuration. Needed only when maxBytesPerTrigger is specified. |
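As the table notes, maxBytesPerTrigger only works when admin.url is also set. A small sketch (plain Python, illustrative only; the URLs are placeholders) of enforcing that dependency when assembling reader options:

```python
def check_option_dependencies(options):
    """Raise if maxBytesPerTrigger is set without admin.url."""
    if "maxBytesPerTrigger" in options and "admin.url" not in options:
        raise ValueError("maxBytesPerTrigger requires admin.url to be set")
    return options

opts = check_option_dependencies({
    "service.url": "pulsar://broker.example.com:6650",
    "admin.url": "http://broker.example.com:8080",
    "maxBytesPerTrigger": "10485760",  # soft limit of ~10 MiB per microbatch
})
```

Dropping admin.url from the dict above would raise a ValueError instead of failing later at runtime.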

You can also specify any Pulsar client, admin, and reader configurations using the following patterns:

| Pattern | Link to configuration options |
| --- | --- |
| pulsar.client.* | Pulsar client configuration |
| pulsar.admin.* | Pulsar admin configuration |
| pulsar.reader.* | Pulsar reader configuration |
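A sketch of how these prefixes sort option names into client, admin, and reader configuration (plain Python, illustrative only; the specific option names used below are examples, not an exhaustive list):

```python
def route_pulsar_option(name):
    """Map a pulsar.* option name to the configuration it targets."""
    for prefix, target in (
        ("pulsar.client.", "client"),
        ("pulsar.admin.", "admin"),
        ("pulsar.reader.", "reader"),
    ):
        if name.startswith(prefix):
            # Return the target config and the bare option name
            return target, name[len(prefix):]
    return None, name  # not a pulsar.* passthrough option
```

For example, pulsar.client.useKeyStoreTls routes to the client configuration, while a connector option such as topics routes nowhere.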

Construct starting offsets JSON

You can manually construct a message ID for a specific offset and pass it as JSON to the startingOffsets option. The following code example demonstrates this syntax:

import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"

// ledgerId, entryId, and partitionIndex identify the message to start from;
// replace them with values for your topic
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

val query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()
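Within a single topic partition, positions compare first by ledger ID and then by entry ID, which is why those two components pin down a starting offset. A rough sketch of that ordering using plain tuples (illustrative only, not the connector's serialization; the numbers are made up):

```python
# Represent message positions within one topic partition as (ledgerId, entryId)
earlier = (2001, 0)       # first entry in ledger 2001
later = (2001, 5)         # a later entry in the same ledger
next_ledger = (2002, 0)   # first entry in a newer ledger

# Tuple comparison mirrors the ledger-then-entry ordering
assert earlier < later < next_ledger
```

Starting from `later` means the reader skips `earlier` and begins with entry 5 of ledger 2001.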