Use the Zerobus Ingest connector

Preview

The Zerobus Ingest connector is in Public Preview. To try it, contact your Databricks account representative.

This page describes how to ingest data using the Zerobus direct write connector in Lakeflow Connect.

Get your workspace URL

After logging in to your Databricks workspace, look at the URL in your browser. It has the format https://<databricks-instance>/?o=XXXXX. The workspace URL is everything before the /?o=XXXXX, for example:

Full URL: https://abcd-teste2-test-spcse2.cloud.databricks.com/?o=2281745829657864#

Workspace URL: https://abcd-teste2-test-spcse2.cloud.databricks.com
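
The workspace URL is just the scheme and host of the browser URL. As a quick sanity check, here is a minimal Python sketch (using the example URL above) that drops the /?o=... suffix:

Python
from urllib.parse import urlsplit

# Full browser URL, as shown above.
full_url = "https://abcd-teste2-test-spcse2.cloud.databricks.com/?o=2281745829657864#"

# Keep only the scheme and host; drop the /?o=... suffix.
parts = urlsplit(full_url)
workspace_url = f"{parts.scheme}://{parts.netloc}"
print(workspace_url)  # https://abcd-teste2-test-spcse2.cloud.databricks.com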

Create or identify the target table

Identify the target table that you want to ingest into. To create a new target table, run the CREATE TABLE SQL command. For example:

SQL
CREATE TABLE unity.default.air_quality (
  device_name STRING,
  temp INT,
  humidity LONG
);

Create a service principal and grant permissions

A service principal is a specialized identity that provides more security than a personal account. For more information about service principals and how to use them for authentication, see the Databricks service principal documentation.

  1. Create a service principal in your workspace under Settings > Identity and Access.

  2. Generate and save the client ID and the client secret for the service principal.

  3. Grant the required permissions for the catalog, the schema, and the table to the service principal:

    1. In the Service Principal page, open the Configurations tab.
    2. Copy the Application ID (a UUID).
    3. Run the following SQL, replacing `<UUID>` with the application ID and the catalog, schema, and table placeholders with your own names:
    SQL
    GRANT USE CATALOG ON CATALOG <catalog> TO `<UUID>`;
    GRANT USE SCHEMA ON SCHEMA <catalog>.<schema> TO `<UUID>`;
    GRANT MODIFY, SELECT ON TABLE <catalog>.<schema>.<table> TO `<UUID>`;

Endpoint formats

The Zerobus server endpoint and workspace URL formats differ by cloud provider:

AWS
  Server endpoint: <workspace-id>.zerobus.<region>.cloud.databricks.com
  Workspace URL: https://<instance>.cloud.databricks.com

Azure
  Server endpoint: <workspace-id>.zerobus.<region>.azuredatabricks.net
  Workspace URL: https://<instance>.azuredatabricks.net

Example for AWS:

Text
Server endpoint: 1234567890123456.zerobus.us-west-2.cloud.databricks.com
Workspace URL: https://dbc-a1b2c3d4-e5f6.cloud.databricks.com

Example for Azure:

Text
Server endpoint: 1234567890123456.zerobus.eastus.azuredatabricks.net
Workspace URL: https://adb-1234567890123456.12.azuredatabricks.net
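
For illustration only, this sketch assembles endpoints from their parts, using the example values above and the domain patterns listed earlier:

Python
# Illustration: assemble Zerobus server endpoints from workspace ID and region.
workspace_id = "1234567890123456"

# AWS pattern: <workspace-id>.zerobus.<region>.cloud.databricks.com
aws_endpoint = f"{workspace_id}.zerobus.us-west-2.cloud.databricks.com"

# Azure pattern: <workspace-id>.zerobus.<region>.azuredatabricks.net
azure_endpoint = f"{workspace_id}.zerobus.eastus.azuredatabricks.net"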

Write a client

Python 3.9 or higher is required. The SDK supports JSON (simplest to start with) and Protocol Buffers (recommended for production). Install the SDK:

Bash
pip install databricks-zerobus-ingest-sdk

JSON example:

Python
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties

# Configuration - see the sections above for how to obtain these values.
server_endpoint = "1234567890123456.zerobus.us-west-2.cloud.databricks.com"
workspace_url = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
table_name = "main.default.air_quality"
client_id = "your-service-principal-application-id"
client_secret = "your-service-principal-secret"

sdk = ZerobusSdk(server_endpoint, workspace_url)
table_properties = TableProperties(table_name)
options = StreamConfigurationOptions(record_type=RecordType.JSON)

stream = sdk.create_stream(client_id, client_secret, table_properties, options)

try:
    for i in range(100):
        record = {"device_name": f"sensor-{i}", "temp": 22, "humidity": 55}
        ack = stream.ingest_record(record)  # Pass a dict directly; the SDK handles serialization.
        ack.wait_for_ack()
finally:
    stream.close()

Acknowledgment callback: To track ingestion progress asynchronously, use the ack_callback option. The callback receives an IngestRecordResponse object whose durability_ack_up_to_offset attribute (int) indicates that all records up to that offset have been durably written:

Python
from zerobus.sdk.shared import IngestRecordResponse

def on_ack(response: IngestRecordResponse) -> None:
    print(f"Records acknowledged up to offset: {response.durability_ack_up_to_offset}")

options = StreamConfigurationOptions(
    record_type=RecordType.JSON,
    ack_callback=on_ack,
)
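
A minimal usage sketch with the callback: records are not individually awaited, and on_ack reports durability progress instead. This assumes (verify against the SDK repository) that close() flushes and waits for outstanding records:

Python
# Sketch: callback-driven ingestion without per-record wait_for_ack().
stream = sdk.create_stream(client_id, client_secret, table_properties, options)
try:
    for i in range(100):
        stream.ingest_record({"device_name": f"sensor-{i}", "temp": 22, "humidity": 55})
finally:
    stream.close()  # Assumption: close() waits for pending acknowledgments.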

Protocol Buffers: For type-safe ingestion, use Protocol Buffers with RecordType.PROTO (the default). Generate a schema from your table using python -m zerobus.tools.generate_proto.
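
A minimal sketch of the Protocol Buffers path, assuming the generated schema has been compiled into a local module named record_pb2 with an AirQuality message, and that TableProperties accepts the message descriptor as its second argument (check the Python SDK repository for the exact signature):

Python
import record_pb2  # Hypothetical module compiled from the generated .proto.

from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import TableProperties

sdk = ZerobusSdk(server_endpoint, workspace_url)

# Assumption: the descriptor tells the SDK how to serialize PROTO records.
table_properties = TableProperties("main.default.air_quality",
                                   record_pb2.AirQuality.DESCRIPTOR)

stream = sdk.create_stream(client_id, client_secret, table_properties)
try:
    record = record_pb2.AirQuality(device_name="sensor-1", temp=22, humidity=55)
    stream.ingest_record(record).wait_for_ack()
finally:
    stream.close()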

For complete documentation, configuration options, async API, and Protocol Buffer examples, see the Python SDK repository.