Connect to Lakebase
This feature is in Public Preview.
Use Structured Streaming to write to Lakebase with built-in batching, automatic retries, and workspace-managed authentication.
When to use the Lakebase sink
Use the Lakebase sink for low-latency streaming writes to Lakebase. With this sink, you don't need to write custom foreachBatch functions to handle batching, connection management, and error handling.
Common use cases include:
- Update application databases in real-time for operational dashboards or customer-facing features.
- Sync continuously changing data, such as aggregated or filtered streaming results, into a transactional database.
- Write the output of a Structured Streaming query into a Lakebase table with sub-second latency using real-time mode.
For the reverse direction, syncing data from Lakebase to Delta Lake tables in the lakehouse, see Lakebase Change Data Feed.
Requirements
- Databricks Runtime 18 and above.
- Classic compute with dedicated or standard access mode.
- A Lakebase database.
Connect to a database
The Lakebase sink supports the following connection methods:
Lakebase tables registered with Unity Catalog
For Lakebase tables registered with Unity Catalog, the connector automatically manages the credentials and uses the identity of the user or Databricks service principal running the query. If the table doesn't exist, the connector creates the table.
To register a Lakebase database with Unity Catalog, see Register a Lakebase database in Unity Catalog.
To write to a Lakebase table, use the .toTable() method with a fully qualified table name, catalog.schema.table. The following example shows the required options, plus the optional upsertkey option:
Python:
(df.writeStream
    .format("postgresql")
    .outputMode("update")
    .option("upsertkey", "<primary-key-column>")  # Optional. Inferred from the table's primary key if omitted.
    .option("checkpointLocation", "/Volumes/<catalog>/<schema>/<volume>/<checkpoint-name>")
    .toTable("<catalog>.<schema>.<table>")
)
Scala:
df.writeStream
  .format("postgresql")
  .outputMode("update")
  .option("upsertkey", "<primary-key-column>") // Optional. Inferred from the table's primary key if omitted.
  .option("checkpointLocation", "/Volumes/<catalog>/<schema>/<volume>/<checkpoint-name>")
  .toTable("<catalog>.<schema>.<table>")
Replace the following placeholders:
- <catalog>.<schema>.<table>: The fully qualified name of the target table. The catalog is the Unity Catalog catalog that you created when you registered the Lakebase database; see Register a Lakebase database in Unity Catalog. If the table doesn't exist, the connector creates it.
- <primary-key-column>: Optional. A comma-separated list of the columns that form the upsert key, for example id or user_id,event_type. If you omit upsertkey, the sink infers the key from the target table's primary key. See Upsert behavior.
- /Volumes/<catalog>/<schema>/<volume>/<checkpoint-name>: A Unity Catalog volume path where the query stores its checkpoint. You can also use a cloud object storage URI. The location must be storage that you can write to, not local disk, and must be unique to each streaming query. The checkpoint location is independent of the target table. See Structured Streaming checkpoints.
For optional configurations, such as batchsize and batchinterval, see Configuration options.
Lakebase tables not registered with Unity Catalog
For Lakebase tables not registered with Unity Catalog, the connector automatically manages the credentials and uses the identity of the user or Databricks service principal running the query.
To write to a Lakebase table, use the endpoint and dbtable options. The following example also includes the optional database and upsertkey options:
Python:
(df.writeStream
    .format("postgresql")
    .outputMode("update")
    .option("endpoint", "<project-id>.<branch-id>.<endpoint-id>")
    .option("database", "<database>")  # Optional. Defaults to databricks_postgres.
    .option("dbtable", "<schema>.<table>")
    .option("upsertkey", "<primary-key-column>")  # Optional. Inferred from the table's primary key if omitted.
    .option("checkpointLocation", "/Volumes/<catalog>/<schema>/<volume>/<checkpoint-name>")
    .start()
)
Scala:
df.writeStream
  .format("postgresql")
  .outputMode("update")
  .option("endpoint", "<project-id>.<branch-id>.<endpoint-id>")
  .option("database", "<database>") // Optional. Defaults to databricks_postgres.
  .option("dbtable", "<schema>.<table>")
  .option("upsertkey", "<primary-key-column>") // Optional. Inferred from the table's primary key if omitted.
  .option("checkpointLocation", "/Volumes/<catalog>/<schema>/<volume>/<checkpoint-name>")
  .start()
Replace the following placeholders:
- <project-id>.<branch-id>.<endpoint-id>: Your Lakebase endpoint. Find all three values in the Resource name, shown on the Get ID menu of the Computes tab. The Resource name has the format projects/<project-id>/branches/<branch-id>/endpoints/<endpoint-id>. See Compute identifiers.
- <database>: Optional. The name of the target Postgres database. Defaults to databricks_postgres. See Manage databases.
- <schema>.<table>: The target table in schema.table format. If you omit the schema, the sink uses the public schema. Use simple identifiers that start with a letter or underscore and contain only letters, numbers, and underscores. Quoted identifiers and special characters, such as hyphens, are not supported.
- <primary-key-column>: Optional. A comma-separated list of the columns that form the upsert key, for example id or user_id,event_type. If you omit upsertkey, the sink infers the key from the target table's primary key. See Upsert behavior.
- /Volumes/<catalog>/<schema>/<volume>/<checkpoint-name>: A Unity Catalog volume path where the query stores its checkpoint. You can also use a cloud object storage URI. The location must be storage that you can write to, not local disk, and must be unique to each streaming query. The checkpoint location is independent of the target table. See Structured Streaming checkpoints.
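Because the Resource name already contains all three IDs, you can derive the endpoint option value from it instead of copying each ID by hand. The following is a minimal sketch, assuming the Resource name has exactly the projects/<project-id>/branches/<branch-id>/endpoints/<endpoint-id> shape described above. The function name endpoint_option_from_resource_name and the sample IDs are hypothetical and are not part of the sink.

```python
def endpoint_option_from_resource_name(resource_name: str) -> str:
    """Convert a Lakebase compute Resource name into the endpoint option format.

    Input:  projects/<project-id>/branches/<branch-id>/endpoints/<endpoint-id>
    Output: <project-id>.<branch-id>.<endpoint-id>
    """
    parts = resource_name.strip().strip("/").split("/")
    # Expect three (label, value) pairs in a fixed order, with no empty values.
    expected_labels = ["projects", "branches", "endpoints"]
    if len(parts) != 6 or parts[0::2] != expected_labels or not all(parts[1::2]):
        raise ValueError(f"Unexpected resource name format: {resource_name!r}")
    project_id, branch_id, endpoint_id = parts[1::2]
    return f"{project_id}.{branch_id}.{endpoint_id}"


# Hypothetical IDs, for illustration only.
print(endpoint_option_from_resource_name(
    "projects/my-project/branches/br-main/endpoints/ep-primary"
))
```

You would then pass the returned string to .option("endpoint", ...).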
For optional configurations, such as batchsize and batchinterval, see Configuration options.
Configuration options
The sink raises a JDBC_STREAMING_SINK_INVALID_OPTIONS error for unrecognized options.
The following options apply to all connection methods:
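| Key | Default | Description |
|---|---|---|
| batchinterval | 100 milliseconds | Optional. The maximum time to hold rows in the buffer before flushing. |
| batchsize | 1000 | Optional. The maximum number of rows for each database transaction. |
| checkpointLocation | None | Required. Path to a checkpoint directory, such as a Unity Catalog volume (/Volumes/<catalog>/<schema>/<volume>/<checkpoint-name>) or a cloud object storage URI. |
| upsertkey | None | Optional. A comma-separated list of column names that form the upsert key. For example, id or user_id,event_type. If omitted, the sink infers the key from the target table's primary key. |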
Lakebase tables not registered with Unity Catalog
The following options apply when you connect to a Lakebase table not registered with Unity Catalog:
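| Key | Default | Description |
|---|---|---|
| database | databricks_postgres | Optional. The target PostgreSQL database name. |
| dbtable | None | Required. The target table name in schema.table format. If you omit the schema, the sink uses the public schema. |
| endpoint | None | Required. The Lakebase endpoint, in <project-id>.<branch-id>.<endpoint-id> format. |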
Upsert behavior
When an upsert key exists, whether you specify it with upsertkey or the sink infers it from the table's primary key, the sink upserts into the table using PostgreSQL's INSERT INTO ... ON CONFLICT (<upsert_key>) DO UPDATE SET ... syntax.
When no upsert keys exist, the sink performs inserts. A query's output mode has no effect on the upsert or insert behavior.
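To make the upsert statement concrete, the following sketch renders an INSERT ... ON CONFLICT ... DO UPDATE statement of the shape described above for a given table, column list, and upsert key. It's an illustration only, not the SQL text the sink actually generates. The function names build_upsert_sql and quote_ident, the %s parameter placeholders, overwriting every non-key column with its EXCLUDED value, and falling back to DO NOTHING when all columns are key columns are all assumptions. Column names are double-quoted, matching the sink's column-name quoting.

```python
from typing import List


def quote_ident(name: str) -> str:
    # PostgreSQL quoted identifier: wrap in double quotes, double any embedded quotes.
    return '"' + name.replace('"', '""') + '"'


def build_upsert_sql(table: str, columns: List[str], upsert_key: List[str]) -> str:
    """Render an INSERT ... ON CONFLICT ... DO UPDATE statement (illustrative only)."""
    if not upsert_key or not set(upsert_key) <= set(columns):
        raise ValueError("upsert key must be a non-empty subset of the columns")
    col_list = ", ".join(quote_ident(c) for c in columns)
    placeholders = ", ".join(["%s"] * len(columns))
    key_list = ", ".join(quote_ident(k) for k in upsert_key)
    # On conflict, overwrite each non-key column with the incoming (EXCLUDED) value.
    updates = ", ".join(
        f"{quote_ident(c)} = EXCLUDED.{quote_ident(c)}"
        for c in columns
        if c not in upsert_key
    )
    sql = (f"INSERT INTO {table} ({col_list}) VALUES ({placeholders}) "
           f"ON CONFLICT ({key_list})")
    # Assumption: if every column is a key column, there is nothing to update.
    return sql + (f" DO UPDATE SET {updates}" if updates else " DO NOTHING")


# Hypothetical table and columns, for illustration only.
print(build_upsert_sql("public.events",
                       ["user_id", "event_type", "count"],
                       ["user_id", "event_type"]))
```

Without an upsert key, the statement would reduce to a plain INSERT INTO ... VALUES ..., which corresponds to the insert-only behavior described above.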
The upsertkey columns must:
- Be a non-empty subset of the DataFrame columns.
- Match the target table's PRIMARY KEY exactly. If the columns you specify don't match the primary key, the query fails.
- Be comparable types, such as numeric or string types. To prevent database deadlocks during concurrent writes, the sink sorts rows by upsert key within each batch. Upsert keys do not support complex or struct types.
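Sorting prevents deadlocks because transactions that lock overlapping rows in the same order can't end up waiting on each other in a cycle, and it's also why key columns must be comparable. The following is a minimal sketch of ordering one batch by its upsert key, assuming rows are represented as Python dicts. The function name sort_batch_by_key is hypothetical; this is not the sink's implementation.

```python
from operator import itemgetter
from typing import Dict, List


def sort_batch_by_key(rows: List[Dict], upsert_key: List[str]) -> List[Dict]:
    """Return the batch ordered by its upsert key columns.

    Writers that touch overlapping keys then acquire row locks in the same
    order, which avoids lock-order cycles (deadlocks) between transactions.
    Key values must be mutually comparable, such as numbers or strings.
    """
    return sorted(rows, key=itemgetter(*upsert_key))


# Hypothetical batch, for illustration only.
batch = [
    {"user_id": 2, "event_type": "view"},
    {"user_id": 1, "event_type": "view"},
    {"user_id": 1, "event_type": "click"},
]
for row in sort_batch_by_key(batch, ["user_id", "event_type"]):
    print(row)
```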
The sink automatically quotes column names with double quotes ("), the PostgreSQL identifier quoting character, so reserved keywords and mixed-case column names work as expected.
Table and schema names must use simple identifiers that start with a letter or underscore and contain only letters, numbers, and underscores. The sink does not support quoted identifiers or special characters, such as hyphens, in table or schema names.
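A client-side pre-check can catch unsupported table names before you start a query. The following sketch applies the identifier rule above to a dbtable value and fills in the public schema when the schema is omitted, as described for the dbtable option. It assumes "letters" means ASCII letters. The function name parse_dbtable is hypothetical and is not part of the sink.

```python
import re
from typing import Tuple

# Simple identifier: a letter or underscore, then letters, digits, or underscores.
SIMPLE_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def parse_dbtable(dbtable: str) -> Tuple[str, str]:
    """Split a dbtable value into (schema, table), defaulting the schema to public."""
    parts = dbtable.split(".")
    if len(parts) == 1:
        parts = ["public", parts[0]]
    if len(parts) != 2 or not all(SIMPLE_IDENTIFIER.fullmatch(p) for p in parts):
        raise ValueError(f"Unsupported table name: {dbtable!r}")
    schema, table = parts
    return schema, table


print(parse_dbtable("events"))                # schema defaults to public
print(parse_dbtable("analytics.user_events"))
```

Names such as my-table or "Events" fail this check, consistent with the restriction on hyphens and quoted identifiers.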
Performance tuning
Batching and backpressure
The sink flushes the buffer when either of the following conditions is met:
- The buffer reaches batchsize rows, which defaults to 1000.
- The buffer age exceeds batchinterval, which defaults to 100 milliseconds.
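The two flush conditions can be modeled as a small buffer with a size trigger and an age trigger. This is a minimal sketch, not the sink's implementation. It assumes buffer age is measured from the oldest buffered row, and the class name FlushBuffer and its methods are hypothetical. The defaults mirror batchsize = 1000 rows and batchinterval = 100 milliseconds.

```python
import time
from typing import Callable, List, Optional


class FlushBuffer:
    """Buffers rows and reports when a flush is due, by size or by age."""

    def __init__(self, batchsize: int = 1000, batchinterval_s: float = 0.1,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.batchsize = batchsize              # size trigger, in rows
        self.batchinterval_s = batchinterval_s  # age trigger, in seconds
        self.clock = clock                      # injectable for testing
        self.rows: List[object] = []
        self.first_row_at: Optional[float] = None

    def add(self, row: object) -> None:
        if not self.rows:
            # Assumption: buffer age is measured from the oldest buffered row.
            self.first_row_at = self.clock()
        self.rows.append(row)

    def should_flush(self) -> bool:
        if not self.rows:
            return False
        reached_size = len(self.rows) >= self.batchsize
        exceeded_age = self.clock() - self.first_row_at > self.batchinterval_s
        return reached_size or exceeded_age

    def drain(self) -> List[object]:
        # Hand the buffered rows to one write transaction and reset the buffer.
        rows, self.rows, self.first_row_at = self.rows, [], None
        return rows
```

A writer loop would call add() for each incoming row and drain() whenever should_flush() returns True. The injectable clock exists only so that the logic can be tested deterministically.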
When the database cannot keep up with the incoming data rate, the sink propagates backpressure upstream to the source.
Latency and throughput guidance:
- For low-latency workloads with real-time mode, decrease batchinterval to shorten the maximum time that rows wait in the buffer before flushing. See Real-time mode in Structured Streaming.
- For high-throughput workloads, increase batchsize to reduce the overhead of each transaction.
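A rough rule of thumb for this guidance: under a simplified model where a buffer receives rows at a steady rate r, it flushes after min(batchsize / r, batchinterval) seconds, so each transaction carries about min(batchsize, r × batchinterval) rows. The sketch below assumes a steady per-task arrival rate and ignores database latency and backpressure. The function name rows_per_transaction and the sample rates are hypothetical.

```python
def rows_per_transaction(rows_per_second: float, batchsize: int = 1000,
                         batchinterval_s: float = 0.1) -> float:
    """Approximate rows in each flush for a steady per-task input rate."""
    # Whichever trigger fires first bounds the batch: the size limit, or the
    # number of rows that arrive within one batchinterval.
    return min(batchsize, rows_per_second * batchinterval_s)


for rate in (500, 5_000, 50_000):
    rows = rows_per_transaction(rate)
    print(f"{rate} rows/s -> ~{rows:.0f} rows/txn, ~{rate / rows:.0f} txn/s")
```

Under this model with the defaults, increasing batchsize only reduces per-transaction overhead once a task receives more than batchsize / batchinterval = 10,000 rows per second. Below that rate, the age trigger fires first and batchinterval governs batch size.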
Connection behavior
The sink uses connection pooling on executors. By default, each task uses one database connection.
Databricks recommends that you keep the default of 1 task for each connection. If you increase the number of tasks for each connection, you might cause connection contention and higher latency for high-throughput workloads.
To configure the ratio of tasks to connections, set the spark.databricks.sql.streaming.jdbc.tasksPerConnection Spark configuration. If the target database has a low connection limit, reduce the number of shuffle partitions or increase spark.databricks.sql.streaming.jdbc.tasksPerConnection.
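To compare tasksPerConnection against a database connection limit, you can roughly estimate a query's connection count as ceil(concurrent tasks / tasksPerConnection). Concurrent writer tasks are bounded by both the number of partitions and the cluster's task slots. This is a simplified model: because pooling happens on each executor, the actual count can differ. The function name estimate_connections and the cluster sizes are hypothetical.

```python
import math


def estimate_connections(num_partitions: int, task_slots: int,
                         tasks_per_connection: int = 1) -> int:
    """Rough estimate of concurrent Lakebase connections for one query."""
    # At most min(partitions, slots) writer tasks run at the same time.
    concurrent_tasks = min(num_partitions, task_slots)
    return math.ceil(concurrent_tasks / tasks_per_connection)


# Hypothetical cluster: 64 task slots, 200 shuffle partitions.
print(estimate_connections(200, 64))     # 64: default, one connection per task
print(estimate_connections(200, 64, 4))  # 16: four tasks share each connection
print(estimate_connections(16, 64))      # 16: fewer shuffle partitions

# The Spark configuration key comes from this page. Setting it at session level
# with spark.conf.set is an assumption; it might need to be set in the cluster's
# Spark config instead:
# spark.conf.set("spark.databricks.sql.streaming.jdbc.tasksPerConnection", "4")
```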
The sink automatically retries transient JDBC errors, including connection failures, deadlocks, and rate limiting. If the sink exhausts all retries, the query fails.
Supported triggers and output modes
Triggers
This table shows support for Structured Streaming trigger types:
Trigger | Supported |
|---|---|
| Yes |
| Yes |
| Yes |
| Yes |
Output modes
This table shows support for Structured Streaming output modes:
Output mode | Supported |
|---|---|
| Yes |
| Yes. Behavior is identical to |
| No |
Limitations
- Serverless compute and Lakeflow Spark Declarative Pipelines are not supported.
- Only Lakebase is supported as a write target. External PostgreSQL-compatible databases are not supported.