read_pubsub streaming table-valued function
Applies to: Databricks SQL and Databricks Runtime 13.3 LTS and above
Returns a table of records read from a Pub/Sub topic. Only streaming queries are supported, and streaming can be used only in Lakeflow Declarative Pipelines.
Syntax
read_pubsub( { parameter => value } [, ...])
Arguments
read_pubsub requires named parameter invocation.
The only required arguments are subscriptionId, projectId, and topicId. All other arguments are optional.
For full argument descriptions, see Configure options for Pub/Sub streaming read.
Databricks recommends using secrets when providing authorization options. See the secret function.
For details on configuring access to Pub/Sub, see Configure access to Pub/Sub. An example that authenticates with a Databricks service credential is sketched after the following table.
| Parameter | Type | Description |
|---|---|---|
| subscriptionId | STRING | Required. The unique identifier assigned to a Pub/Sub subscription. |
| projectId | STRING | Required. The Google Cloud project ID associated with the Pub/Sub topic. |
| topicId | STRING | Required. The ID or name of the Pub/Sub topic to subscribe to. |
| serviceCredential | STRING | The name of your Databricks service credential. |
| clientEmail | STRING | The email address associated with a service account for authentication. |
| clientId | STRING | The client ID associated with the service account for authentication. |
| privateKeyId | STRING | The ID of the private key associated with the service account. |
| privateKey | STRING | The private key associated with the service account for authentication. |
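As an alternative to passing individual service account keys, authentication can go through a single Databricks service credential. The following is a minimal sketch, assuming the serviceCredential parameter described above and a service credential named 'pubsub-credential' that has already been created as described in Configure access to Pub/Sub; both names are illustrative.
-- Streaming ingestion authenticated with a Databricks service credential (illustrative)
> CREATE STREAMING TABLE testing.sc_streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => 'app-events-1234',
                projectId => 'app-events-project',
                topicId => 'app-events-topic',
                serviceCredential => 'pubsub-credential'
);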
These arguments are used for further fine-tuning when reading from Pub/Sub (a usage sketch follows the table):
| Parameter | Type | Description |
|---|---|---|
| numFetchPartitions | INTEGER | Optional, with the number of executors as the default. The number of parallel Spark tasks that fetch records from a subscription. |
| deleteSubscriptionOnStreamStop | BOOLEAN | Optional, with default false. If true, the subscription created for the stream is deleted when the stream ends. |
| maxBytesPerTrigger | STRING | A soft limit for the batch size to be processed during each triggered micro-batch. The default is 'none'. |
| maxRecordsPerFetch | INTEGER | The number of records to fetch per task before processing records. The default is '1000'. |
| maxFetchPeriod | STRING | The time duration for each task to fetch before processing records. The default is '10s'. |
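The fine-tuning options can be combined with the required arguments in a single invocation. The sketch below uses illustrative values, not recommendations, and assumes a service account attached to the cluster for authentication, as in the second example further down.
-- Streaming ingestion with illustrative fine-tuning options
> CREATE STREAMING TABLE testing.tuned_streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => 'app-events-1234',
                projectId => 'app-events-project',
                topicId => 'app-events-topic',
                numFetchPartitions => '8',
                maxRecordsPerFetch => '2000',
                maxFetchPeriod => '15s'
);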
Returns
A table of Pub/Sub records with the following schema. The attributes column can be null; all other columns are not null. A sketch that decodes these columns follows the table.
| Name | Data type | Nullable | Description |
|---|---|---|---|
| messageId | STRING | No | Unique identifier for the Pub/Sub message. |
| payload | BINARY | No | The content of the Pub/Sub message. |
| attributes | STRING | Yes | Key-value pairs representing the attributes of the Pub/Sub message. This is a JSON-encoded string. |
| publishTimestampInMillis | LONG | No | The timestamp when the message was published, in milliseconds. |
| sequenceNumber | LONG | No | The unique identifier of the record within its shard. |
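Because payload is BINARY and attributes is a JSON-encoded string, downstream queries typically decode them. The following sketch assumes the payload holds UTF-8 text and reuses the topic from the examples below; the target table name testing.decoded_events is hypothetical.
-- Decoding the returned columns (illustrative)
> CREATE STREAMING TABLE testing.decoded_events AS
  SELECT messageId,
         cast(payload AS STRING) AS payload_str,
         from_json(attributes, 'map<string,string>') AS attribute_map,
         timestamp_millis(publishTimestampInMillis) AS publish_ts
  FROM STREAM read_pubsub (
                subscriptionId => 'app-events-1234',
                projectId => 'app-events-project',
                topicId => 'app-events-topic'
);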
Examples
-- Streaming ingestion from Pub/Sub
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => 'app-events-1234',
                projectId => 'app-events-project',
                topicId => 'app-events-topic',
                clientEmail => secret('app-events', 'clientEmail'),
                clientId => secret('app-events', 'clientId'),
                privateKeyId => secret('app-events', 'privateKeyId'),
                privateKey => secret('app-events', 'privateKey')
);
-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => 'app-events-1234',
                projectId => 'app-events-project',
                topicId => 'app-events-topic'
);
The data can now be queried from testing.streaming_table for further analysis, for example:
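-- Hypothetical downstream analysis of the ingested messages
> SELECT date_trunc('HOUR', timestamp_millis(publishTimestampInMillis)) AS event_hour,
         count(*) AS message_count
  FROM testing.streaming_table
  GROUP BY event_hour
  ORDER BY event_hour;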
Erroneous queries:
-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => 'app-events-1234',
                projectId => 'app-events-project'
);
-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => 'app-events-1234',
                projectId => 'app-events-project',
                topicId => 'app-events-topic',
                maxRecordsPerFetchLimit => '1000001'
);