Connect to Lakebase
This feature is in Public Preview.
Use Structured Streaming to write to Lakebase with built-in batching, automatic retries, and workspace-managed authentication.
When to use the Lakebase sink
Use the Lakebase sink for low-latency streaming writes to Lakebase. This sink doesn't require you to implement custom foreachBatch functions to handle batching, connection management, and error handling.
Common use cases include:
- Update application databases in real time for operational dashboards or customer-facing features.
- Sync continuously changing data, such as aggregated or filtered streaming results, into a transactional database.
- Write the output of a Structured Streaming query into a Lakebase table with sub-second latency using real-time mode.
Requirements
- Databricks Runtime 18.3 or above
- Classic compute with dedicated or standard access mode
- A Lakebase database
Connect to a database
The Lakebase sink supports the following connection methods:
Lakebase tables not registered with Unity Catalog
For Lakebase tables not registered with Unity Catalog, the connector automatically manages the credentials and uses the identity of the user or Databricks service principal running the query.
To write to a Lakebase table, use the endpoint, database, dbtable, and upsertkey options:
Python:

    # Start a streaming write to a Lakebase table, upserting on the key column.
    (df.writeStream
        .format("postgresql")
        .outputMode("update")
        .option("endpoint", "<project-id>.<branch-id>.<endpoint-id>")
        .option("database", "<database>")
        .option("dbtable", "<schema>.<table>")
        .option("upsertkey", "<primary-key-column>")
        .option("checkpointLocation", "/checkpoints/<query-name>")
        .start()
    )

Scala:

    // Start a streaming write to a Lakebase table, upserting on the key column.
    df.writeStream
      .format("postgresql")
      .outputMode("update")
      .option("endpoint", "<project-id>.<branch-id>.<endpoint-id>")
      .option("database", "<database>")
      .option("dbtable", "<schema>.<table>")
      .option("upsertkey", "<primary-key-column>")
      .option("checkpointLocation", "/checkpoints/<query-name>")
      .start()
Configuration options
If you specify an unrecognized option, the sink raises a JDBC_STREAMING_SINK_INVALID_OPTIONS error.
The following options apply to all connection methods:
| Key | Default | Description |
|---|---|---|
| batchinterval | 100 milliseconds | The maximum time to hold rows in the buffer before flushing. |
| batchsize | 1000 | The maximum number of rows for each database transaction. |
| checkpointLocation | None | Required. Path to the checkpoint directory. |
| upsertkey | None | A comma-separated list of column names that form the upsert key. |
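Because the sink rejects unrecognized options when the query starts, it can help to catch typos earlier. The following is a minimal sketch of a client-side check, not part of the sink: `KNOWN_OPTIONS` and `check_sink_options` are hypothetical names, the key list is taken only from this page, and the comparison is case-sensitive, which may be stricter than the sink itself.

```python
# Option keys documented on this page. This list is an assumption of the
# sketch; the sink may accept additional keys that are not listed here.
KNOWN_OPTIONS = {
    "endpoint", "database", "dbtable", "upsertkey",
    "batchinterval", "batchsize", "checkpointLocation",
}


def check_sink_options(options: dict) -> dict:
    """Return the options unchanged, or raise ValueError on a likely mistake."""
    unknown = sorted(set(options) - KNOWN_OPTIONS)
    if unknown:
        raise ValueError(f"Unrecognized sink options: {unknown}")
    if "checkpointLocation" not in options:
        raise ValueError("checkpointLocation is required")
    return options


opts = check_sink_options({
    "endpoint": "<project-id>.<branch-id>.<endpoint-id>",
    "database": "<database>",
    "dbtable": "<schema>.<table>",
    "batchsize": "500",
    "checkpointLocation": "/checkpoints/<query-name>",
})
# On a cluster, the checked dictionary could then be passed to the writer:
# df.writeStream.format("postgresql").options(**opts).start()
```

A misspelled key such as `batchSize` fails this check locally instead of failing the streaming query at startup.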
Lakebase tables not registered with Unity Catalog
The following options apply when you connect to a Lakebase table not registered with Unity Catalog:
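| Key | Default | Description |
|---|---|---|
| database | None | The target PostgreSQL database name. |
| dbtable | None | The target table name in <schema>.<table> format. |
| endpoint | None | Specify the Lakebase endpoint in <project-id>.<branch-id>.<endpoint-id> format. |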
Upsert behavior
When upsert keys exist, either specified with upsertkey or inferred by the sink from the table's primary keys, the sink upserts into the table with PostgreSQL's INSERT INTO ... ON CONFLICT (<upsert_key>) DO UPDATE SET ... syntax.
When no upsert keys exist, the sink performs inserts. A query's output mode has no effect on the upsert or insert behavior.
The upsertkey columns must:
- Be a non-empty subset of the DataFrame columns.
- Reference a target table column with a PRIMARY KEY constraint.
- Be comparable types, such as numeric or string types. To prevent database deadlocks during concurrent writes, the sink sorts rows by upsert key within each batch. Upsert keys do not support complex or struct types.
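The rules above can be sketched in plain Python. This is an illustration only, not the sink's implementation: `validate_upsert_key` and `sort_by_upsert_key` are hypothetical helpers, and rows are modeled as dictionaries. Sorting helps because concurrent transactions that touch rows in the same key order cannot wait on each other in a cycle.

```python
def validate_upsert_key(upsertkey: str, df_columns: list, pk_columns: list) -> list:
    """Parse a comma-separated upsertkey value and check it against the rules."""
    keys = [k.strip() for k in upsertkey.split(",") if k.strip()]
    if not keys:
        raise ValueError("upsertkey must name at least one column")
    missing = [k for k in keys if k not in df_columns]
    if missing:
        raise ValueError(f"upsertkey columns not in the DataFrame: {missing}")
    not_pk = [k for k in keys if k not in pk_columns]
    if not_pk:
        raise ValueError(f"upsertkey columns without a PRIMARY KEY constraint: {not_pk}")
    return keys


def sort_by_upsert_key(rows: list, keys: list) -> list:
    """Order a batch by upsert key so every writer touches rows in the same order."""
    return sorted(rows, key=lambda row: tuple(row[k] for k in keys))


keys = validate_upsert_key("region, id", ["id", "region", "total"], ["region", "id"])
batch = sort_by_upsert_key(
    [{"region": "eu", "id": 2, "total": 5}, {"region": "eu", "id": 1, "total": 9}],
    keys,
)
```

The tuple comparison in `sort_by_upsert_key` only works when the key values are comparable, which mirrors the requirement that upsert keys use types such as numbers or strings.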
The sink automatically quotes column names with double quotes ("), the PostgreSQL default. Quoting handles reserved keywords, mixed-case names, and special characters.
The sink does not quote table names and passes them as-is to the database. You must quote table names with special characters, such as "my-schema"."my-table".
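To make the statement shape and the quoting rules concrete, here is a minimal sketch that builds the kind of SQL described in this section. The exact statement the sink generates is not documented here, so `quote_ident` and `build_write_sql` are hypothetical helpers. Doubling embedded quotes is the standard PostgreSQL identifier rule, and falling back to `DO NOTHING` when every column is part of the key is a choice made by this sketch.

```python
def quote_ident(name: str) -> str:
    """Double-quote a PostgreSQL identifier, doubling any embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_write_sql(table: str, columns: list, upsert_keys: list) -> str:
    """Build an INSERT, or an upsert when upsert keys exist.

    The table name is used as-is, so the caller must quote it when needed.
    """
    cols = ", ".join(quote_ident(c) for c in columns)
    params = ", ".join(["?"] * len(columns))
    sql = f"INSERT INTO {table} ({cols}) VALUES ({params})"
    if not upsert_keys:
        return sql  # no upsert keys: plain insert
    conflict = ", ".join(quote_ident(k) for k in upsert_keys)
    updates = ", ".join(
        f"{quote_ident(c)} = EXCLUDED.{quote_ident(c)}"
        for c in columns
        if c not in upsert_keys
    )
    if not updates:
        # Sketch-only choice: nothing to update when all columns are keys.
        return f"{sql} ON CONFLICT ({conflict}) DO NOTHING"
    return f"{sql} ON CONFLICT ({conflict}) DO UPDATE SET {updates}"


upsert_sql = build_write_sql('"my-schema"."my-table"', ["id", "order"], ["id"])
insert_sql = build_write_sql("public.events", ["id", "payload"], [])
```

In the first call, the reserved word `order` is safe as a column name because it is quoted, while the table name is only valid because the caller quoted it.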
Performance tuning
Batching and backpressure
A flush is triggered when either condition is met:
- The buffer reaches batchsize rows, which defaults to 1000.
- The buffer age exceeds batchinterval, which defaults to 100 milliseconds.
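The two flush conditions can be modeled with a small buffer. This is a sketch under stated assumptions, not the sink's code: `FlushBuffer` is a hypothetical class, buffer age is measured from the first row buffered since the last flush, and the clock is injectable so that the behavior can be checked without waiting.

```python
import time


class FlushBuffer:
    """Toy model of a buffer that flushes on row count or on age."""

    def __init__(self, batchsize=1000, batchinterval_s=0.1, clock=time.monotonic):
        self.batchsize = batchsize
        self.batchinterval_s = batchinterval_s
        self.clock = clock
        self.rows = []
        self.first_row_at = None

    def add(self, row):
        """Buffer a row; return a batch if a flush condition is met, else None."""
        if not self.rows:
            self.first_row_at = self.clock()  # age starts at the first buffered row
        self.rows.append(row)
        return self.poll()

    def poll(self):
        """Return the buffered rows if either condition is met, else None."""
        if not self.rows:
            return None
        full = len(self.rows) >= self.batchsize
        aged = self.clock() - self.first_row_at > self.batchinterval_s
        if full or aged:
            batch, self.rows = self.rows, []
            return batch
        return None


now = [0.0]  # fake clock, in seconds
buf = FlushBuffer(batchsize=3, batchinterval_s=0.1, clock=lambda: now[0])
first = [buf.add("a"), buf.add("b"), buf.add("c")]  # third add fills the buffer
buf.add("d")
now[0] = 0.2  # advance past batchinterval
second = buf.poll()  # flushes on age with a single row
```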
When the database cannot keep up with the incoming data rate, the sink propagates backpressure upstream to the source.
Latency and throughput guidance:
- For low-latency workloads with real-time mode, decrease batchinterval to guarantee a shorter maximum time before flushing. See Real-time mode in Structured Streaming.
- For high-throughput workloads, increase batchsize to reduce overhead for each transaction.
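A rough way to reason about this guidance is to work out which flush condition fires first at a given input rate. The sketch below is back-of-the-envelope arithmetic that assumes a single buffer and a steady rate. `flush_profile` is a hypothetical helper, and the defaults mirror the documented values of 1000 rows and 100 milliseconds.

```python
def flush_profile(rows_per_second: float, batchsize: int = 1000,
                  batchinterval_ms: float = 100.0):
    """Return (trigger, wait_ms, rows_per_flush) for a steady input rate."""
    if rows_per_second <= 0:
        raise ValueError("rows_per_second must be positive")
    fill_ms = batchsize * 1000.0 / rows_per_second  # time to collect batchsize rows
    if fill_ms <= batchinterval_ms:
        # The buffer fills before it ages out: throughput-bound.
        return ("batchsize", fill_ms, batchsize)
    # The buffer ages out first: latency-bound, with smaller transactions.
    return ("batchinterval", batchinterval_ms,
            int(rows_per_second * batchinterval_ms / 1000.0))


fast = flush_profile(50_000)  # batchsize fires first, after about 20 ms
slow = flush_profile(2_000)   # batchinterval fires first, with about 200 rows
```

Under these assumptions, lowering batchinterval only shortens the wait for streams in the second case, and raising batchsize only enlarges transactions for streams in the first.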
Connection behavior
The sink uses connection pooling on executors. By default, each task uses one database connection.
Databricks recommends that you use the default value of 1 task for each connection. If you increase the number of tasks for each connection, you might cause connection contention and increase latency for high-throughput workloads.
To configure the ratio of tasks to connections, set the spark.databricks.sql.streaming.jdbc.tasksPerConnection Spark configuration. If the target database has a low connection limit, reduce the number of shuffle partitions or increase spark.databricks.sql.streaming.jdbc.tasksPerConnection.
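To compare these settings against a database connection limit, a rough estimate is the number of concurrent write tasks divided by the tasks-per-connection ratio, rounded up. This is a sketch only: `estimated_connections` is a hypothetical helper, the real pooling behavior on executors may differ, and setting the configuration at the session level, as in the comments, is an assumption.

```python
import math


def estimated_connections(num_tasks: int, tasks_per_connection: int = 1) -> int:
    """Estimate how many database connections a set of write tasks needs."""
    if num_tasks < 0 or tasks_per_connection < 1:
        raise ValueError("num_tasks must be >= 0 and tasks_per_connection >= 1")
    return math.ceil(num_tasks / tasks_per_connection)


default_ratio = estimated_connections(200)     # one connection for each task
shared_ratio = estimated_connections(200, 4)   # four tasks share a connection
fewer_tasks = estimated_connections(50)        # fewer shuffle partitions

# On a cluster, the two levers might be applied like this (assumed usage):
# spark.conf.set("spark.sql.shuffle.partitions", "50")
# spark.conf.set("spark.databricks.sql.streaming.jdbc.tasksPerConnection", "4")
```

Both levers reduce the estimate, but sharing connections trades connection count for possible contention, which is why the default ratio remains the recommendation.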
The sink automatically retries transient JDBC errors, including connection failures, deadlocks, and rate limiting. If the sink exhausts all retries, the query fails.
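The retry behavior resembles a bounded retry loop. The sketch below illustrates the pattern, not the sink's implementation: the attempt count, the exponential backoff, and the names `TransientError` and `run_with_retries` are all assumptions, because this page does not document the sink's retry policy.

```python
import time


class TransientError(Exception):
    """Stand-in for a retryable failure, such as a dropped connection or deadlock."""


def run_with_retries(op, max_attempts=3, base_delay_s=0.1, sleep=time.sleep):
    """Run op, retrying transient failures; re-raise when attempts are exhausted."""
    for attempt in range(1, max_attempts + 1):
        try:
            return op()
        except TransientError:
            if attempt == max_attempts:
                raise  # retries exhausted: the failure propagates to the caller
            sleep(base_delay_s * 2 ** (attempt - 1))  # back off before retrying


calls = {"n": 0}
delays = []


def flaky_write():
    """Fail twice with a transient error, then succeed."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientError("deadlock detected")
    return "committed"


# Record the delays instead of sleeping so the example runs instantly.
result = run_with_retries(flaky_write, sleep=delays.append)
```

Errors that are not `TransientError` propagate immediately in this sketch, which matches the idea that only transient failures are worth retrying.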
Supported triggers and output modes
Triggers
This table shows support for Structured Streaming trigger types:
Trigger | Supported |
|---|---|
| Yes |
| Yes |
| Yes |
| Yes |
Output modes
This table shows support for Structured Streaming output modes:
Output mode | Supported |
|---|---|
| Yes |
| Yes. Behavior is identical to |
| No |
Limitations
- Serverless compute and Lakeflow Spark Declarative Pipelines are not supported.
- Only Lakebase is supported as a write target. External PostgreSQL-compatible databases are not supported.