Subscribe to Google Pub/Sub
Use the built-in connector to subscribe to Google Pub/Sub. This connector provides exactly-once processing semantics for records from the subscriber.
Pub/Sub might publish duplicate records, and records might arrive at the subscriber out of order. Write your code to handle duplicate and out-of-order records.
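As a rough illustration of what handling duplicate and out-of-order records can look like, the following plain-Python sketch drops repeated records and restores event order. The record keys `event_id` and `event_time_ms` are hypothetical application-level fields, not part of the connector. In a Structured Streaming job the same idea is usually expressed with `withWatermark` and `dropDuplicates` rather than in-memory collections.

```python
from typing import Dict, Iterable, List


def dedupe_and_order(records: Iterable[Dict]) -> List[Dict]:
    """Drop records whose event_id was already seen, then sort by event time.

    Assumes each record is a dict with the hypothetical keys
    "event_id" (a unique business key) and "event_time_ms".
    """
    seen = set()
    unique = []
    for record in records:
        if record["event_id"] in seen:
            continue  # duplicate record: keep only the first copy
        seen.add(record["event_id"])
        unique.append(record)
    # sorted() is stable, so records with equal timestamps keep arrival order
    return sorted(unique, key=lambda r: r["event_time_ms"])
```

The first copy of each record wins here. Whether to keep the first or the latest copy depends on the application.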
Configure a Pub/Sub stream
The following code example demonstrates the basic syntax for configuring a Structured Streaming read from Pub/Sub.
Python:

    auth_options = {
        "clientId": client_id,
        "clientEmail": client_email,
        "privateKey": private_key,
        "privateKeyId": private_key_id
    }
    query = (spark.readStream
        .format("pubsub")
        .option("subscriptionId", "mysub")
        .option("topicId", "mytopic")
        .option("projectId", "myproject")
        .options(**auth_options)
        .load()
    )

SQL:

    CREATE OR REFRESH STREAMING TABLE pubsub_raw
    AS SELECT * FROM STREAM read_pubsub(
      subscriptionId => 'mysub',
      projectId => 'myproject',
      topicId => 'mytopic',
      clientEmail => secret('pubsub-scope', 'clientEmail'),
      clientId => secret('pubsub-scope', 'clientId'),
      privateKeyId => secret('pubsub-scope', 'privateKeyId'),
      privateKey => secret('pubsub-scope', 'privateKey')
    );

Scala:

    val authOptions: Map[String, String] =
      Map("clientId" -> clientId,
          "clientEmail" -> clientEmail,
          "privateKey" -> privateKey,
          "privateKeyId" -> privateKeyId)
    val query = spark.readStream
      .format("pubsub")
      // Creates a Pub/Sub subscription if one does not already exist with this ID
      .option("subscriptionId", "mysub")
      .option("topicId", "mytopic")
      .option("projectId", "myproject")
      .options(authOptions)
      .load()
For more configuration options, see Configure options for Pub/Sub streaming read.
Configure access to Pub/Sub
The credentials you configure must have the following roles.
| Roles | Required or optional | How role is used |
|---|---|---|
| | Required | Checks if subscription exists and gets subscription. |
| | Required | Fetches data from a subscription. |
| | Optional | Enables creation of a subscription if one doesn't exist and enables use of the |
Databricks recommends using secrets when providing authorization options. The following options are required to authorize a connection:
- clientEmail
- clientId
- privateKey
- privateKeyId
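One possible way to assemble these four options from secrets is sketched below. The helper `build_auth_options` is hypothetical, and it assumes each secret is stored under a key with the same name as the option, as in the SQL example's `pubsub-scope` scope. The secret lookup is passed in as a function so the helper can be exercised outside a notebook.

```python
from typing import Callable, Dict

# The four authorization options listed above.
REQUIRED_AUTH_KEYS = ("clientEmail", "clientId", "privateKey", "privateKeyId")


def build_auth_options(get_secret: Callable[[str], str]) -> Dict[str, str]:
    """Look up every required option and fail fast if any value is empty."""
    options = {key: get_secret(key) for key in REQUIRED_AUTH_KEYS}
    missing = [key for key, value in options.items() if not value]
    if missing:
        raise ValueError(f"Missing Pub/Sub auth options: {missing}")
    return options


# In a Databricks notebook, assuming a secret scope named "pubsub-scope":
# auth_options = build_auth_options(
#     lambda key: dbutils.secrets.get(scope="pubsub-scope", key=key)
# )
```

The resulting dictionary can then be supplied to the stream reader as its authorization options.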
Understand the Pub/Sub schema
The schema for the stream matches the records that are fetched from Pub/Sub, as described in the following table.
Field | Type |
|---|---|
|
|
|
|
|
|
|
|
Configure options for Pub/Sub streaming read
The following table describes the options supported for Pub/Sub. All options are configured as part of a Structured Streaming read using .option("<optionName>", "<optionValue>") syntax.
Some Pub/Sub configuration options are expressed in terms of fetches instead of micro-batches. This reflects internal implementation details: the options work like their counterparts in other Structured Streaming connectors, except that records are first fetched and then processed.
| Option | Default value | Description |
|---|---|---|
| | Set to one half of the number of executors present at stream initialization. | The number of parallel Spark tasks that fetch records from a subscription. |
| | | If |
| | | A soft limit for the batch size to be processed during each triggered micro-batch. |
| | | The number of records to fetch per task before processing records. |
| | | The time duration for each task to fetch before processing records. Accepts a duration string, for example, |
Use incremental batch processing with Pub/Sub
You can use Trigger.AvailableNow to consume the records available from Pub/Sub sources as an incremental batch.
Databricks records the timestamp when you begin a read with the Trigger.AvailableNow setting. Records processed by the batch include all previously fetched data and any newly published records with a timestamp less than the recorded stream start timestamp. For more information, see AvailableNow: Incremental batch processing.
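A minimal PySpark sketch of such an incremental batch write follows. The helper name, checkpoint path, and target table are hypothetical. `df` is assumed to be the streaming DataFrame returned by `.load()` in the read example, and `trigger(availableNow=True)` is the PySpark spelling of Trigger.AvailableNow.

```python
def run_available_now(df, checkpoint_path: str, target_table: str):
    """Start a write that processes the available records and then stops.

    df: a streaming DataFrame read from Pub/Sub.
    checkpoint_path and target_table are placeholders chosen by the caller.
    Returns the StreamingQuery handle produced by toTable().
    """
    return (
        df.writeStream
        .option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)  # consume what is available, then stop
        .toTable(target_table)
    )
```

Calling `awaitTermination()` on the returned query blocks until the batch has finished.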
Monitor Pub/Sub streaming metrics
Structured Streaming progress metrics report the number of records fetched and ready to process, the size of the records fetched and ready to process, and the number of duplicates seen since stream start. The following is an example of these metrics:
"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}
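The metric values are reported as strings, so a monitoring job typically converts them before comparing or alerting. The sketch below assumes the progress object is available as a dictionary (for example from `query.lastProgress` in PySpark) and that the `metrics` block sits under each entry of `sources`, as with other Structured Streaming sources. The helper name is hypothetical.

```python
from typing import Dict, List

# Metric names taken from the example above.
PUBSUB_METRIC_KEYS = (
    "numDuplicatesSinceStreamStart",
    "numRecordsReadyToProcess",
    "sizeOfRecordsReadyToProcess",
)


def pubsub_source_metrics(progress: dict) -> List[Dict[str, int]]:
    """Return the Pub/Sub metrics of each source, converted to integers."""
    results = []
    for source in progress.get("sources", []):
        metrics = source.get("metrics") or {}
        # Only convert the known Pub/Sub metrics; ignore anything else.
        results.append(
            {key: int(metrics[key]) for key in PUBSUB_METRIC_KEYS if key in metrics}
        )
    return results
```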
Limitations
The Pub/Sub connector does not support speculative execution (spark.speculation).