Use the Zerobus Ingest connector

This page describes how to ingest data using the Zerobus Ingest connector in Lakeflow Connect.

Choose an interface

Zerobus Ingest supports gRPC and REST interfaces. The SDKs provide custom gRPC-based clients with a developer-friendly interface for building high-throughput applications. The REST interface handles architectural constraints when dealing with massive fleets of "chatty" devices.

  • The gRPC "connection tax": gRPC delivers high throughput through persistent connections. However, every open stream counts against your concurrency quotas.
  • The REST "throughput tax": REST is stateless, so it requires a full handshake for every update, which limits per-device throughput. REST aligns well with edge device use cases where status is reported infrequently.

Lean on gRPC for high-volume streams and REST for massive, low-frequency device fleets or unsupported languages.

Get your workspace URL and Zerobus Ingest endpoint

Your workspace URL appears in the browser when you log in. The full URL follows the format https://<databricks-instance>.com/?o=XXXXX, and the workspace URL consists of everything before the /?o=XXXXX. For example, given the following full URL, you can determine the workspace URL and workspace ID.

  • Full URL: https://abcd-teste2-test-spcse2.cloud.databricks.com/?o=2281745829657864#
  • Workspace URL: https://abcd-teste2-test-spcse2.cloud.databricks.com
  • Workspace ID: 2281745829657864
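The split above can be sketched with standard-library URL parsing. This is an illustrative helper, not part of any Databricks SDK; it uses the sample URL from this page:

```python
from urllib.parse import urlparse, parse_qs

def split_workspace_url(full_url: str) -> tuple[str, str]:
    """Return (workspace_url, workspace_id) from a full Databricks URL."""
    parts = urlparse(full_url)
    # The workspace URL is everything before the query string.
    workspace_url = f"{parts.scheme}://{parts.netloc}"
    # The workspace ID is the value of the "o" query parameter.
    workspace_id = parse_qs(parts.query)["o"][0]
    return workspace_url, workspace_id

url, workspace_id = split_workspace_url(
    "https://abcd-teste2-test-spcse2.cloud.databricks.com/?o=2281745829657864#"
)
print(url)           # https://abcd-teste2-test-spcse2.cloud.databricks.com
print(workspace_id)  # 2281745829657864
```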

The server endpoint depends on the workspace and region:

  • Server endpoint: <workspace-id>.zerobus.<region>.cloud.databricks.com

For region availability, see Zerobus Ingest connector limitations.
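Given the endpoint pattern above, the server endpoint can be assembled from the workspace ID and region. A minimal sketch, assuming the region string (here us-west-2) matches one listed in the region availability table:

```python
def zerobus_endpoint(workspace_id: str, region: str) -> str:
    """Build the Zerobus server endpoint from a workspace ID and region."""
    return f"{workspace_id}.zerobus.{region}.cloud.databricks.com"

# Workspace ID from the example above; the region is an assumption.
print(zerobus_endpoint("2281745829657864", "us-west-2"))
# 2281745829657864.zerobus.us-west-2.cloud.databricks.com
```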

Create or identify the target table

Identify the target table that you want to ingest data into. To create a new target table, run the CREATE TABLE SQL command. For example, create a new table named unity.default.air_quality.

SQL
    CREATE TABLE unity.default.air_quality (
      device_name STRING,
      temp INT,
      humidity LONG
    );

Create a service principal and grant permissions

A service principal is a specialized identity that provides more security than personal user accounts. For more information about service principals and how to use them for authentication, see Authorize service principal access to Databricks with OAuth.

  1. To create a service principal, navigate to Settings > Identity and Access.

  2. Under Service principals, select Manage.

  3. Click Add service principal.

  4. In the Add service principal window, create a new service principal by clicking Add new.

  5. Generate and save the client ID and the client secret for the service principal.

  6. Grant the required permissions for the catalog, the schema, and the table to the service principal.

    1. On the Service principal page, navigate to the Configurations tab.
    2. Copy the Application ID (UUID).
    3. Use the following SQL to grant permissions, replacing the example UUID and the catalog, schema, and table names as required.

    SQL
    GRANT USE CATALOG ON CATALOG <catalog> TO `<UUID>`;
    GRANT USE SCHEMA ON SCHEMA <catalog.schema> TO `<UUID>`;
    GRANT MODIFY, SELECT ON TABLE <catalog.schema.table_name> TO `<UUID>`;

    important

    You must grant MODIFY and SELECT privileges on the table, even for tables with ALL PRIVILEGES granted.
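For fleets with many tables, the three GRANT statements above can be generated programmatically. This helper is hypothetical (not part of any SDK) and assumes a three-level table name:

```python
def grant_statements(uuid: str, table: str) -> list[str]:
    """Generate the GRANT statements for a three-level name (catalog.schema.table)."""
    catalog, schema, _ = table.split(".")
    return [
        f"GRANT USE CATALOG ON CATALOG {catalog} TO `{uuid}`;",
        f"GRANT USE SCHEMA ON SCHEMA {catalog}.{schema} TO `{uuid}`;",
        f"GRANT MODIFY, SELECT ON TABLE {table} TO `{uuid}`;",
    ]

# Placeholder UUID; substitute the Application ID copied from the Configurations tab.
for stmt in grant_statements("00000000-0000-0000-0000-000000000000", "unity.default.air_quality"):
    print(stmt)
```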

Write a client

Use a Zerobus SDK in your preferred programming language or the REST API to ingest data into your target table.

Python 3.9 or higher is required. The SDK uses PyO3 bindings to the high-performance Rust SDK, providing up to 40x higher throughput than pure Python and efficient network I/O through the Rust async runtime. It supports JSON (the simplest option) and Protocol Buffers (recommended for production). The SDK also supports both sync and async implementations, as well as three ingestion methods: future-based, offset-based, and fire-and-forget.

Bash
pip install databricks-zerobus-ingest-sdk

JSON example:

Python
import logging
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties

# See "Get your workspace URL and Zerobus Ingest endpoint" for information on obtaining these values.
SERVER_ENDPOINT = "https://1234567890123456.zerobus.us-west-2.cloud.databricks.com"
DATABRICKS_WORKSPACE_URL = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
TABLE_NAME = "main.default.air_quality"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"

sdk = ZerobusSdk(SERVER_ENDPOINT, DATABRICKS_WORKSPACE_URL)

table_properties = TableProperties(TABLE_NAME)
options = StreamConfigurationOptions(record_type=RecordType.JSON)
stream = sdk.create_stream(CLIENT_ID, CLIENT_SECRET, table_properties, options)

try:
    for i in range(1000):
        record_dict = {
            "device_name": f"sensor-{i}",
            "temp": 20 + i % 15,
            "humidity": 50 + i % 40,
        }
        offset = stream.ingest_record_offset(record_dict)

    # Optional: Wait for durability confirmation of the last offset
    stream.wait_for_offset(offset)
finally:
    stream.close()

Acknowledgment callback: To track ingestion progress asynchronously, use the ack_callback option. Pass a subclass of AckCallback with the on_ack(offset: int) and on_error(offset: int, error_message: str) methods, which are called when records are either acknowledged or fail.

Python
from zerobus.sdk.shared import AckCallback, StreamConfigurationOptions, RecordType

class MyAckCallback(AckCallback):
    def on_ack(self, offset: int) -> None:
        print(f"Record acknowledged at offset: {offset}")

    def on_error(self, offset: int, error_message: str) -> None:
        print(f"Error at offset {offset}: {error_message}")

options = StreamConfigurationOptions(
    record_type=RecordType.JSON,
    ack_callback=MyAckCallback(),
)

Protocol Buffers: For type-safe ingestion, use Protocol Buffers with RecordType.PROTO (default) and provide a descriptorProto in table properties.

For complete documentation, configuration options, batch ingestion, and Protocol Buffer examples, see the Python SDK repository.

Next steps