Subscribe to Google Pub/Sub
Databricks provides a built-in connector to subscribe to Google Pub/Sub in Databricks Runtime 13.3 LTS and above. This connector provides exactly-once processing semantics for records from the subscriber.
Pub/Sub might publish duplicate records, and records might arrive at the subscriber out of order. You should write Databricks code to handle duplicate and out-of-order records.
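For example, one common pattern is to deduplicate on messageId with a watermark. The following is a sketch, not part of the connector API: it assumes df is a streaming DataFrame returned by this connector and uses the messageId and publishTimestampInMillis columns from the stream schema described below.

```scala
import org.apache.spark.sql.functions.col

// Sketch: drop duplicate Pub/Sub records by messageId.
// Assumes df is a streaming DataFrame read with format("pubsub").
val deduped = df
  // Convert the publish timestamp (milliseconds) to a TimestampType column.
  .withColumn("publishTimestamp",
    (col("publishTimestampInMillis") / 1000).cast("timestamp"))
  // Bound deduplication state; records arriving more than 10 minutes late
  // are no longer checked against state.
  .withWatermark("publishTimestamp", "10 minutes")
  // Redelivered messages keep the same messageId and publish time,
  // so this drops delivery duplicates.
  .dropDuplicates("messageId", "publishTimestamp")
```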
Syntax example
The following syntax example demonstrates configuring a Structured Streaming read from Pub/Sub using a service credential. For all authentication options, see Configure access to Pub/Sub.
val query = spark.readStream
  .format("pubsub")
  // A Pub/Sub subscription is created if none exists with this ID
  .option("subscriptionId", "mysub") // required
  .option("topicId", "mytopic") // required
  .option("projectId", "myproject") // required
  .option("serviceCredential", "service-credential-name") // required
  .load()
For more configuration options, see Configure options for Pub/Sub streaming read.
Configure access to Pub/Sub
The following table describes the roles required for the configured credentials:
Roles | Required or optional | How it is used |
---|---|---|
roles/pubsub.viewer or roles/viewer | Required | Check if subscription exists and get subscription |
roles/pubsub.subscriber | Required | Fetch data from a subscription |
roles/pubsub.editor or roles/editor | Optional | Enables creation of a subscription if one doesn't exist and also enables use of the deleteSubscriptionOnStreamStop option to delete subscriptions on stream termination |
Databricks recommends configuring a service credential for Pub/Sub reads. Service credentials for Pub/Sub require Databricks Runtime 16.2 and above. See Manage access to external cloud services using service credentials.
If Databricks service credentials are not available, you can use a Google Service Account (GSA) directly.
If you configure compute to use a GSA, permissions for the GSA are available for all queries running on that cluster. See Google service account.
You cannot attach a GSA to compute configured with standard access mode.
You can configure the following options to pass the GSA directly to the stream:
clientEmail
clientId
privateKey
privateKeyId
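For example, a read that passes a GSA key directly might look like the following sketch. All values are placeholders; the key fields come from the service account's JSON key file, and the secret scope and key names are illustrative.

```scala
// Sketch: authenticate with a Google Service Account passed directly to the stream.
// All option values below are placeholders.
val df = spark.readStream
  .format("pubsub")
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .option("clientEmail", "my-sa@myproject.iam.gserviceaccount.com")
  .option("clientId", "123456789012345678901")
  .option("privateKeyId", "0123456789abcdef")
  // Avoid hard-coding the key; read it from a secret scope instead.
  .option("privateKey", dbutils.secrets.get("my-scope", "gsa-private-key"))
  .load()
```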
Pub/Sub schema
The schema for the stream matches the records that are fetched from Pub/Sub, as described in the following table:
Field | Type |
---|---|
messageId | StringType |
payload | ArrayType[ByteType] |
attributes | StringType |
publishTimestampInMillis | LongType |
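Because the payload holds raw bytes, decoding is left to the query. If your messages are UTF-8 text, a small UDF can convert the byte array to a string; the following is a sketch assuming df is a streaming DataFrame read from this connector.

```scala
import org.apache.spark.sql.functions.{col, udf}

// Sketch: decode the byte-array payload as UTF-8 text.
// Assumes df is a streaming DataFrame read with format("pubsub").
val bytesToString = udf((bytes: Seq[Byte]) => new String(bytes.toArray, "UTF-8"))

val decoded = df.select(
  col("messageId"),
  bytesToString(col("payload")).alias("body"),
  col("attributes")
)
```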
Configure options for Pub/Sub streaming read
The following table describes the options supported for Pub/Sub. All options are configured as part of a Structured Streaming read using .option("&lt;optionName&gt;", "&lt;optionValue&gt;") syntax.
Some Pub/Sub configuration options use the concept of fetches instead of micro-batches. This reflects internal implementation details; the options work similarly to their counterparts in other Structured Streaming connectors, except that records are fetched and then processed.
Option | Default value | Description |
---|---|---|
numFetchPartitions | Set to one half of the number of executors present at stream initialization. | The number of parallel Spark tasks that fetch records from a subscription. |
deleteSubscriptionOnStreamStop | false | If true, the subscription created for the stream is deleted when the stream ends. |
maxBytesPerTrigger | none | A soft limit for the batch size to be processed during each triggered micro-batch. |
maxRecordsPerFetch | 1000 | The number of records to fetch per task before processing records. |
maxFetchPeriod | 10 seconds | The time duration for each task to fetch before processing records. Databricks recommends using the default value. |
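As a sketch, these fetch options slot into the same read shown earlier. The option names follow the connector's documented configuration; the values below are illustrative, not recommendations.

```scala
// Sketch: tuning fetch behavior. Option values are illustrative only.
val tuned = spark.readStream
  .format("pubsub")
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .option("serviceCredential", "service-credential-name")
  .option("numFetchPartitions", "8")    // more parallel fetch tasks
  .option("maxRecordsPerFetch", "5000") // larger fetches per task
  .load()
```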
Incremental batch processing semantics for Pub/Sub
You can use Trigger.AvailableNow to consume available records from the Pub/Sub source as an incremental batch.
Databricks records the timestamp when you begin a read with the Trigger.AvailableNow setting. Records processed by the batch include all previously fetched data and any newly published records with a timestamp less than the recorded stream start timestamp.
See Configuring incremental batch processing.
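For example, an incremental batch read might be triggered as follows. This is a sketch assuming df is a streaming DataFrame read from this connector; the checkpoint path and sink table name are placeholders.

```scala
import org.apache.spark.sql.streaming.Trigger

// Sketch: process all currently available Pub/Sub records, then stop.
val q = df.writeStream
  .option("checkpointLocation", "/tmp/checkpoints/pubsub-demo") // placeholder path
  .trigger(Trigger.AvailableNow())
  .toTable("pubsub_raw") // placeholder table name
```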
Monitoring streaming metrics
Structured Streaming progress metrics report the number of records fetched and ready to process, the size of the records fetched and ready to process, and the number of duplicates seen since stream start. The following is an example of these metrics:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
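These metrics appear in the stream's progress reports. One way to inspect them, as a sketch assuming a running StreamingQuery named q, is:

```scala
// Sketch: print the most recent progress report, which includes the
// Pub/Sub "metrics" map shown above.
val progress = q.lastProgress
if (progress != null) {
  println(progress.json)
}
```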
Limitations
Speculative execution (spark.speculation) is not supported with Pub/Sub.