Connect to Lakebase

Preview

This feature is in Public Preview.

Use Structured Streaming to write to Lakebase with built-in batching, automatic retries, and workspace-managed authentication.

When to use the Lakebase sink

Use the Lakebase sink for low-latency streaming writes to Lakebase. With this sink, you don't need to implement custom foreachBatch functions for batching, connection management, or error handling.

Common use cases include:

  • Update application databases in real time for operational dashboards or customer-facing features.
  • Sync continuously changing data, such as aggregated or filtered streaming results, into a transactional database.
  • Write the output of a Structured Streaming query into a Lakebase table with sub-second latency using real-time mode.

Requirements

  • Databricks Runtime 18 and above
  • Classic compute with dedicated or standard access mode
  • A Lakebase database

Connect to a database

The Lakebase sink supports the following connection methods:

Lakebase tables not registered with Unity Catalog

For Lakebase tables not registered with Unity Catalog, the connector automatically manages the credentials and uses the identity of the user or Databricks service principal running the query.

To write to a Lakebase table, use the endpoint and dbtable options. The following example also includes the optional database and upsertkey options:

Python
(df.writeStream
    .format("postgresql")
    .outputMode("update")
    .option("endpoint", "<project-id>.<branch-id>.<endpoint-id>")
    .option("database", "<database>")  # Optional. Defaults to databricks_postgres.
    .option("dbtable", "<schema>.<table>")
    .option("upsertkey", "<primary-key-column>")  # Optional. Inferred from the table's primary key if omitted.
    .option("checkpointLocation", "/Volumes/<catalog>/<schema>/<volume>/<checkpoint-name>")
    .start()
)

Replace the following placeholders:

  • <project-id>.<branch-id>.<endpoint-id>: Your Lakebase endpoint. Find all three values in the Resource name on the Get ID menu of the Computes tab, which has the format projects/<project-id>/branches/<branch-id>/endpoints/<endpoint-id>. See Compute identifiers.
  • <database>: Optional. The name of the target Postgres database. Defaults to databricks_postgres. See Manage databases.
  • <schema>.<table>: The target table in schema.table format. If you omit the schema, the sink uses the public schema. Use simple identifiers that start with a letter or underscore and contain only letters, numbers, and underscores; quoted identifiers and special characters, such as hyphens, are not supported.
  • <primary-key-column>: Optional. A comma-separated list of the columns that form the upsert key, for example id or user_id,event_type. If you omit upsertkey, the sink infers the key from the target table's primary key. See Upsert behavior.
  • /Volumes/<catalog>/<schema>/<volume>/<checkpoint-name>: A Unity Catalog volume path where the query stores its checkpoint. You can also use a cloud object storage URI. The location must be storage that you can write to, not local disk, and must be unique to each streaming query. This is independent of the target table. See Structured Streaming checkpoints.
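
The endpoint value uses the same three IDs as the resource name, joined with dots. The following is a minimal sketch of that conversion. The helper name endpoint_option_from_resource_name and the sample IDs are hypothetical and are not part of the Lakebase sink API.

```python
import re

# Resource name format described in the placeholder list above:
# projects/<project-id>/branches/<branch-id>/endpoints/<endpoint-id>
_RESOURCE_NAME = re.compile(
    r"^projects/(?P<project>[^/]+)"
    r"/branches/(?P<branch>[^/]+)"
    r"/endpoints/(?P<endpoint>[^/]+)$"
)


def endpoint_option_from_resource_name(resource_name: str) -> str:
    """Hypothetical helper: turn a resource name into the dotted endpoint value."""
    match = _RESOURCE_NAME.match(resource_name.strip())
    if match is None:
        raise ValueError(f"Unexpected resource name format: {resource_name!r}")
    # group() with several names returns a tuple in the order requested.
    return ".".join(match.group("project", "branch", "endpoint"))


# The IDs below are made up for illustration only.
print(endpoint_option_from_resource_name(
    "projects/my-project/branches/main/endpoints/primary"
))
```

You can pass the returned string as the value of the endpoint option.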

For optional configurations, such as batchsize and batchinterval, see Configuration options.

Configuration options

If you specify an unrecognized option, the sink raises the error JDBC_STREAMING_SINK_INVALID_OPTIONS.

The following options apply to all connection methods:

| Key | Default | Description |
| --- | --- | --- |
| batchinterval | 100 milliseconds | Optional. The maximum time to hold rows in the buffer before flushing. For example, "50 milliseconds". |
| batchsize | 1000 | Optional. The maximum number of rows for each database transaction. |
| checkpointLocation | None | Required. Path to a checkpoint directory, such as a Unity Catalog volume (/Volumes/<catalog>/<schema>/<volume>/<checkpoint-name>). Must be unique to each query. See Structured Streaming checkpoints. |
| upsertkey | None | Optional. A comma-separated list of column names that form the upsert key. For example, "id" or "user_id,event_type". If you specify upsertkey, the columns must match the table's primary key, or the query fails. If you omit it, the sink uses the primary key automatically. For more information, see Upsert behavior. |

Lakebase tables not registered with Unity Catalog

The following options apply when you connect to a Lakebase table not registered with Unity Catalog:

| Key | Default | Description |
| --- | --- | --- |
| database | databricks_postgres | Optional. The target PostgreSQL database name. |
| dbtable | None | Required. The target table name in schema.table format. If you don't specify a schema, the default schema value is public. Use simple identifiers that start with a letter or underscore and contain only letters, numbers, and underscores. Do not quote table or schema names; quoted identifiers and names with special characters, such as hyphens, are not supported. |
| endpoint | None | Required. The Lakebase endpoint, in project_id.branch_id or project_id.branch_id.endpoint_id format. The endpoint_id is optional; if you omit it and the branch has a single read-write endpoint, the sink selects that endpoint by default. |

Upsert behavior

When an upsert key exists, either specified with upsertkey or inferred by the sink from the table's primary key, the sink upserts into the table using PostgreSQL's INSERT INTO ... ON CONFLICT (<upsert_key>) DO UPDATE SET ... syntax.

When no upsert key exists, the sink performs plain inserts. A query's output mode has no effect on the upsert or insert behavior.

The upsertkey columns must:

  • Be a non-empty subset of the DataFrame columns.
  • Match the target table's PRIMARY KEY exactly. If the columns you specify don't match the primary key, the query fails.
  • Have comparable types, such as numeric or string types, because the sink sorts rows by upsert key within each batch to prevent database deadlocks during concurrent writes. Complex and struct types are not supported as upsert keys.
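
The first two requirements can be checked locally before you start a query. The following is a minimal sketch, not the sink's own validation: the helper names are hypothetical, and it assumes that "match exactly" means the same set of columns regardless of order. The last function illustrates the idea of ordering a batch by upsert key, using plain Python dictionaries as rows.

```python
from typing import Any, Dict, List, Sequence


def parse_upsertkey(value: str) -> List[str]:
    """Split a comma-separated upsertkey value such as "user_id,event_type"."""
    return [name.strip() for name in value.split(",") if name.strip()]


def check_upsertkey(
    upsertkey: str,
    df_columns: Sequence[str],
    primary_key: Sequence[str],
) -> List[str]:
    """Hypothetical pre-flight check for the rules listed above."""
    keys = parse_upsertkey(upsertkey)
    if not keys:
        raise ValueError("upsertkey must name at least one column")
    missing = [k for k in keys if k not in df_columns]
    if missing:
        raise ValueError(f"upsertkey columns not in the DataFrame: {missing}")
    # Assumption: an exact match means the same set of columns, order ignored.
    if set(keys) != set(primary_key):
        raise ValueError(
            f"upsertkey {keys} does not match primary key {list(primary_key)}"
        )
    return keys


def sort_batch(rows: List[Dict[str, Any]], keys: Sequence[str]) -> List[Dict[str, Any]]:
    """Order rows by upsert key, so concurrent writers touch rows in the same order."""
    return sorted(rows, key=lambda row: tuple(row[k] for k in keys))


keys = check_upsertkey(
    "user_id, event_type",
    df_columns=["user_id", "event_type", "count"],
    primary_key=["user_id", "event_type"],
)
print(keys)
```

Sorting works only when the key values can be compared with each other, which is why numeric and string keys are suitable and nested types are not.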

The sink automatically quotes column names with double quotes ("), the PostgreSQL default, so reserved keywords and mixed-case names work as column names.

Table and schema names must use simple identifiers that start with a letter or underscore and contain only letters, numbers, and underscores. The sink does not support quoted identifiers or special characters, such as hyphens, in table or schema names.
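
To make the statement shape concrete, the following sketch builds an insert or upsert statement from a table name, a column list, and an upsert key. It illustrates the PostgreSQL syntax named above and is not the sink's actual implementation. The function names, the %s parameter placeholders, the ASCII-only reading of "letters", and the DO NOTHING fallback for tables whose columns are all key columns are assumptions made for this example.

```python
import re
from typing import Sequence, Tuple

# Assumption: "letters" means ASCII letters.
_SIMPLE_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def quote_column(name: str) -> str:
    """Double-quote a column name, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def split_dbtable(dbtable: str) -> Tuple[str, str]:
    """Split schema.table, defaulting to the public schema, and reject non-simple names."""
    parts = dbtable.split(".")
    if len(parts) == 1:
        parts = ["public"] + parts
    if len(parts) != 2 or not all(_SIMPLE_IDENTIFIER.match(p) for p in parts):
        raise ValueError(f"Unsupported table identifier: {dbtable!r}")
    return parts[0], parts[1]


def build_statement(
    dbtable: str, columns: Sequence[str], upsert_keys: Sequence[str]
) -> str:
    """Build a plain INSERT, or an INSERT ... ON CONFLICT upsert when keys exist."""
    schema, table = split_dbtable(dbtable)
    column_list = ", ".join(quote_column(c) for c in columns)
    placeholders = ", ".join(["%s"] * len(columns))
    sql = f"INSERT INTO {schema}.{table} ({column_list}) VALUES ({placeholders})"
    if not upsert_keys:
        return sql
    conflict = ", ".join(quote_column(k) for k in upsert_keys)
    non_key = [c for c in columns if c not in upsert_keys]
    if not non_key:
        # Assumption for this sketch: nothing to update, so skip conflicting rows.
        return f"{sql} ON CONFLICT ({conflict}) DO NOTHING"
    updates = ", ".join(
        f"{quote_column(c)} = EXCLUDED.{quote_column(c)}" for c in non_key
    )
    return f"{sql} ON CONFLICT ({conflict}) DO UPDATE SET {updates}"


print(build_statement("events", ["id", "name"], ["id"]))
```

A name such as my-schema.events is rejected by split_dbtable, which mirrors the restriction on hyphens and quoted identifiers in table and schema names.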

Performance tuning

Batching and backpressure

The sink flushes the buffer when either of the following conditions is met:

  • The buffer reaches batchsize rows, which defaults to 1000.
  • The buffer age exceeds batchinterval, which defaults to 100 milliseconds.

When the database cannot keep up with the incoming data rate, the sink propagates backpressure upstream to the source.
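
The two flush conditions can be sketched as a small buffer class. This is a conceptual model only, not the sink's implementation: the class name BatchBuffer, the injectable clock, and the synchronous flush callback are assumptions made for illustration.

```python
import time
from typing import Any, Callable, List, Optional


class BatchBuffer:
    """Conceptual model of a buffer that flushes on size or on age."""

    def __init__(
        self,
        flush: Callable[[List[Any]], None],
        batch_size: int = 1000,
        batch_interval_s: float = 0.1,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._flush = flush
        self._batch_size = batch_size
        self._batch_interval_s = batch_interval_s
        self._clock = clock
        self._rows: List[Any] = []
        self._first_row_at: Optional[float] = None

    def add(self, row: Any) -> None:
        """Buffer a row, then flush if a condition is met."""
        if not self._rows:
            # The buffer age is measured from the first buffered row.
            self._first_row_at = self._clock()
        self._rows.append(row)
        self.maybe_flush()

    def maybe_flush(self) -> None:
        """Flush when the buffer is full or older than the interval."""
        if not self._rows:
            return
        full = len(self._rows) >= self._batch_size
        aged = self._clock() - self._first_row_at > self._batch_interval_s
        if full or aged:
            self.flush()

    def flush(self) -> None:
        """Hand the buffered rows to the callback and reset the buffer."""
        if self._rows:
            batch, self._rows = self._rows, []
            self._first_row_at = None
            self._flush(batch)


batches: List[List[int]] = []
buffer = BatchBuffer(batches.append, batch_size=3)
for value in range(7):
    buffer.add(value)
buffer.flush()  # Flush the remaining row at the end of the demo.
print([len(batch) for batch in batches])
```

Because the flush callback in this sketch runs synchronously, a slow database call blocks add, which is one simple way that backpressure can reach the producer.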

Latency and throughput guidance:

  • For low-latency workloads with real-time mode, decrease batchinterval to guarantee a shorter maximum time before flushing. See Real-time mode in Structured Streaming.
  • For high-throughput workloads, increase batchsize to reduce overhead for each transaction.
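
As a rough illustration of this guidance, the following sketch returns option overrides for each kind of workload. The helper name and the profile names are hypothetical, and the batchsize value of 5000 is an arbitrary example rather than a recommendation. Measure your own workload before choosing values.

```python
from typing import Dict


def lakebase_tuning_options(profile: str) -> Dict[str, str]:
    """Hypothetical helper: return sink option overrides for a workload profile."""
    if profile == "default":
        # No overrides: batchsize 1000 and batchinterval 100 milliseconds apply.
        return {}
    if profile == "low_latency":
        # Flush sooner than the 100 millisecond default.
        return {"batchinterval": "50 milliseconds"}
    if profile == "high_throughput":
        # Larger transactions than the default of 1000 rows (illustrative value).
        return {"batchsize": "5000"}
    raise ValueError(f"Unknown profile: {profile!r}")


# Usage sketch, assuming an existing streaming DataFrame named df:
#
# (df.writeStream
#     .format("postgresql")
#     .options(**lakebase_tuning_options("low_latency"))
#     .option("endpoint", "<project-id>.<branch-id>.<endpoint-id>")
#     .option("dbtable", "<schema>.<table>")
#     .option("checkpointLocation", "/Volumes/<catalog>/<schema>/<volume>/<checkpoint-name>")
#     .start()
# )
print(lakebase_tuning_options("low_latency"))
```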

Connection behavior

The sink uses connection pooling on executors. By default, each task uses one database connection.

Databricks recommends that you use the default value of one task for each connection. If you increase the number of tasks for each connection, you might cause connection contention and increase latency for high-throughput connections.

To configure the ratio of tasks to connections, set the spark.databricks.sql.streaming.jdbc.tasksPerConnection Spark configuration. If the target database has a low connection limit, reduce the number of shuffle partitions or increase spark.databricks.sql.streaming.jdbc.tasksPerConnection.
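
The trade-off can be estimated with simple arithmetic. The following sketch assumes that every write task runs at the same time and that the number of connections is the task count divided by the tasks-per-connection ratio, rounded up. Both are simplifying assumptions, the function names are hypothetical, and the numbers are examples only.

```python
import math


def estimated_connections(concurrent_tasks: int, tasks_per_connection: int = 1) -> int:
    """Upper-bound estimate of database connections opened by the sink."""
    if concurrent_tasks < 0 or tasks_per_connection < 1:
        raise ValueError("concurrent_tasks must be >= 0 and tasks_per_connection >= 1")
    return math.ceil(concurrent_tasks / tasks_per_connection)


def min_tasks_per_connection(concurrent_tasks: int, connection_limit: int) -> int:
    """Smallest ratio that keeps the estimate within the connection limit."""
    if connection_limit < 1:
        raise ValueError("connection_limit must be >= 1")
    return max(1, math.ceil(concurrent_tasks / connection_limit))


# Example: 200 concurrent tasks against a database that allows 64 connections.
ratio = min_tasks_per_connection(200, 64)
print(ratio, estimated_connections(200, ratio))

# The ratio would then be applied through the Spark configuration named above.
# Whether it can be set on a running session, as shown here, or only in the
# compute configuration is an assumption:
#
# spark.conf.set("spark.databricks.sql.streaming.jdbc.tasksPerConnection", str(ratio))
```

Reducing the number of shuffle partitions lowers the task count in the same formula, so it has a similar effect without sharing connections between tasks.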

The sink automatically retries transient JDBC errors, including connection failures, deadlocks, and rate limiting. If the sink exhausts all retries, the query fails.

Supported triggers and output modes

Triggers

This table shows support for Structured Streaming trigger types:

| Trigger | Supported |
| --- | --- |
| realTime | Yes |
| ProcessingTime | Yes |
| AvailableNow | Yes |
| Once | Yes |

Output modes

This table shows support for Structured Streaming output modes:

| Output mode | Supported |
| --- | --- |
| update | Yes |
| append | Yes. Behavior is identical to update. The query upserts when the target table has a primary key; otherwise, the query inserts. See Upsert behavior. |
| complete | No |

Limitations

  • Serverless compute and Lakeflow Spark Declarative Pipelines are not supported.
  • Only Lakebase is supported as a write target. External PostgreSQL-compatible databases are not supported.