
Use the Zerobus Ingest connector

This page describes how to ingest data using the Zerobus Ingest connector in Lakeflow Connect.

Interface choice

Zerobus Ingest supports gRPC and REST interfaces. All SDKs integrate directly with the gRPC APIs, providing a developer-friendly interface for building custom high-throughput clients. The REST interface addresses the architectural constraints of massive fleets of "chatty" devices.

  • The gRPC "connection tax": gRPC achieves high throughput through persistent connections. However, every open stream counts against your concurrency quotas.
  • The REST "throughput tax": REST is stateless, so every update pays a full connection handshake, which limits per-device throughput. REST aligns well with edge device use cases where status is reported infrequently.

Lean on gRPC for high-volume streams, and on REST for massive, low-frequency device fleets or for languages without an SDK.

Get your workspace URL and Zerobus Ingest Endpoint

After logging in to your Databricks workspace, look at the URL in your browser; it has the format https://<databricks-instance>.com/?o=XXXXX. The workspace URL is everything before the ?o=XXXXX, and the number after ?o= is the workspace ID. For example:

Full URL: https://abcd-teste2-test-spcse2.cloud.databricks.com/?o=2281745829657864#

Workspace URL: https://abcd-teste2-test-spcse2.cloud.databricks.com

Workspace ID: 2281745829657864

The server endpoint depends on the workspace and region:

Server endpoint: <workspace-id>.zerobus.<region>.cloud.databricks.com
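Putting the above together, the workspace URL, workspace ID, and server endpoint can all be derived from the browser URL. A minimal sketch (the function name, return shape, and the us-west-2 region are illustrative assumptions, not part of the SDK):

```python
from urllib.parse import parse_qs, urlparse


def derive_zerobus_endpoint(full_url: str, region: str) -> dict:
    """Split a browser URL into workspace URL, workspace ID, and Zerobus endpoint."""
    parsed = urlparse(full_url)
    workspace_url = f"{parsed.scheme}://{parsed.netloc}"
    workspace_id = parse_qs(parsed.query)["o"][0]  # the value after ?o=
    return {
        "workspace_url": workspace_url,
        "workspace_id": workspace_id,
        "server_endpoint": f"{workspace_id}.zerobus.{region}.cloud.databricks.com",
    }


info = derive_zerobus_endpoint(
    "https://abcd-teste2-test-spcse2.cloud.databricks.com/?o=2281745829657864#",
    region="us-west-2",  # assumption: replace with your workspace's region
)
print(info["server_endpoint"])  # 2281745829657864.zerobus.us-west-2.cloud.databricks.com
```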

Create or identify the target table

Identify the target table that you want to ingest into. To create a new target table, run the CREATE TABLE SQL command. For example:

SQL
    CREATE TABLE unity.default.air_quality (
      device_name STRING,
      temp INT,
      humidity LONG
    );

Create a service principal and grant permissions

A service principal is a specialized identity that provides better security than a personal account. For more information about service principals and how to use them for authentication, see these instructions.

  1. Create a service principal in your workspace under Settings > Identity and Access.

  2. Generate and save the client ID and the client secret for the service principal.

  3. Grant the required permissions for the catalog, the schema, and the table to the service principal:

    1. In the Service Principal page, open the Configurations tab.
    2. Copy the Application Id (UUID).
    3. Use the following SQL, replacing the example UUID with the application ID, and the catalog (shown here as a username), schema, and table names as required:
    SQL
    GRANT USE CATALOG ON CATALOG <username> TO `<UUID>`;
    GRANT USE SCHEMA ON SCHEMA <username>.default TO `<UUID>`;
    GRANT MODIFY, SELECT ON TABLE <username>.default.<table_name> TO `<UUID>`;
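When onboarding several tables, it can help to render these statements programmatically. A small sketch (the helper name is hypothetical; the GRANT syntax matches the statements above):

```python
def render_grants(catalog: str, schema: str, table: str, app_id: str) -> list[str]:
    """Render the three GRANT statements needed by a Zerobus service principal."""
    return [
        f"GRANT USE CATALOG ON CATALOG {catalog} TO `{app_id}`;",
        f"GRANT USE SCHEMA ON SCHEMA {catalog}.{schema} TO `{app_id}`;",
        f"GRANT MODIFY, SELECT ON TABLE {catalog}.{schema}.{table} TO `{app_id}`;",
    ]


# Example with the table created earlier and a placeholder application ID.
for stmt in render_grants("unity", "default", "air_quality", "123e4567-e89b-12d3-a456-426614174000"):
    print(stmt)
```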

Write a client

Python 3.9 or higher is required. The SDK uses PyO3 bindings to the high-performance Rust SDK, providing up to 40x higher throughput than pure Python and efficient network I/O through the Rust async runtime. It supports JSON (simplest) and Protocol Buffers (recommended for production). The SDK also supports both sync and async implementations, as well as three ingestion methods: future-based, offset-based, and fire-and-forget.

Bash
pip install databricks-zerobus-ingest-sdk

JSON example:

Python
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties

# Configuration - see "Get your workspace URL and Zerobus Ingest Endpoint" for how to obtain these values.
SERVER_ENDPOINT = "https://1234567890123456.zerobus.us-west-2.cloud.databricks.com"
DATABRICKS_WORKSPACE_URL = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
TABLE_NAME = "main.default.air_quality"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"

sdk = ZerobusSdk(SERVER_ENDPOINT, DATABRICKS_WORKSPACE_URL)

table_properties = TableProperties(TABLE_NAME)
options = StreamConfigurationOptions(record_type=RecordType.JSON)
stream = sdk.create_stream(CLIENT_ID, CLIENT_SECRET, table_properties, options)

try:
    for i in range(1000):
        record_dict = {
            "device_name": f"sensor-{i}",
            "temp": 20 + i % 15,
            "humidity": 50 + i % 40,
        }
        offset = stream.ingest_record_offset(record_dict)

        # Optional: wait for durability confirmation
        stream.wait_for_offset(offset)
finally:
    stream.close()

Protocol Buffers: For type-safe ingestion, use Protocol Buffers with RecordType.PROTO (default) and provide a descriptorProto in table properties.

For complete documentation, configuration options, batch ingestion, and Protocol Buffer examples, see the Python SDK repository.