
Use the Zerobus Ingest connector

Preview

The Zerobus Ingest connector is in Public Preview. To try it, contact your Databricks account representative.

This page describes how to ingest data using the Zerobus direct write connector in Lakeflow Connect.

Get your workspace URL

After you log in to your Databricks workspace, look at the URL in your browser. It has the following format: https://<databricks-instance>.com/?o=XXXXX. The workspace URL consists of everything before /?o=XXXXX, for example:

Full URL: https://abcd-teste2-test-spcse2.cloud.databricks.com/?o=2281745829657864#

Workspace URL: https://abcd-teste2-test-spcse2.cloud.databricks.com
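The split described above can also be done programmatically. The following is a minimal sketch that uses only the Python standard library; the function name split_workspace_url is hypothetical and is not part of any Databricks SDK. It also returns the value of the o query parameter, which is the workspace ID needed later on this page.

```python
from typing import Optional, Tuple
from urllib.parse import parse_qs, urlsplit


def split_workspace_url(full_url: str) -> Tuple[str, Optional[str]]:
    """Return (workspace_url, workspace_id) for a full browser URL.

    Hypothetical helper for illustration only.
    """
    parts = urlsplit(full_url)
    # The workspace URL is the scheme plus host, without path or query.
    workspace_url = f"{parts.scheme}://{parts.netloc}"
    # The workspace ID is the value of the `o` query parameter, if present.
    ids = parse_qs(parts.query).get("o")
    workspace_id = ids[0] if ids else None
    return workspace_url, workspace_id


url, workspace_id = split_workspace_url(
    "https://abcd-teste2-test-spcse2.cloud.databricks.com/?o=2281745829657864#"
)
print(url)
print(workspace_id)
```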

Create or identify the target table

Identify the target table that you want to ingest into. To create a new target table, run the CREATE TABLE SQL command. For example:

SQL
    CREATE TABLE unity.default.air_quality (
      device_name STRING,
      temp INT,
      humidity LONG
    );

Create a service principal and grant permissions

A service principal is a specialized identity that provides more security than a personal user account. For more information about service principals and how to use them for authentication, see the Databricks documentation on service principals.

  1. Create a service principal in your workspace under Settings > Identity and Access.

  2. Generate and save the client ID and the client secret for the service principal.

  3. Grant the required permissions for the catalog, the schema, and the table to the service principal:

    1. On the service principal page, open the Configurations tab.
    2. Copy the Application Id (UUID).
    3. Run the following SQL, replacing the placeholder UUID, catalog name (<username>), schema name, and table name as needed:
    SQL
    GRANT USE CATALOG ON CATALOG <username> TO `<UUID>`;
    GRANT USE SCHEMA ON SCHEMA <username.default> TO `<UUID>`;
    GRANT MODIFY, SELECT ON TABLE <username.default.table_name> TO `<UUID>`;
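If you manage several tables or service principals, it can help to generate the three GRANT statements from their parts rather than editing them by hand. The sketch below only builds the SQL strings shown above; the function name grant_statements and the example UUID are hypothetical, and running the statements is left to whichever SQL client you already use.

```python
from typing import List


def grant_statements(catalog: str, schema: str, table: str, principal_id: str) -> List[str]:
    """Build the three GRANT statements for one table (illustrative helper)."""
    full_schema = f"{catalog}.{schema}"
    full_table = f"{full_schema}.{table}"
    # The service principal's application ID is quoted with backticks.
    principal = f"`{principal_id}`"
    return [
        f"GRANT USE CATALOG ON CATALOG {catalog} TO {principal};",
        f"GRANT USE SCHEMA ON SCHEMA {full_schema} TO {principal};",
        f"GRANT MODIFY, SELECT ON TABLE {full_table} TO {principal};",
    ]


# Placeholder UUID for illustration; use your service principal's Application Id.
for statement in grant_statements(
    "main", "default", "air_quality", "00000000-0000-0000-0000-000000000000"
):
    print(statement)
```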

Write a client

Python 3.9 or higher is required.

  1. Install the Python SDK.

    Bash
    pip install databricks-zerobus-ingest-sdk

    Alternatively, install from source:

    Bash
    git clone https://github.com/databricks/zerobus-sdk-py.git
    cd zerobus-sdk-py
    pip install -e .
  2. Generate a Protocol Buffer definition.

    Automatically generate a proto definition from your Unity Catalog table schema using the generate_proto tool:

    Bash
    python -m zerobus.tools.generate_proto \
    --uc-endpoint "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com" \
    --client-id "<service-principal-application-id>" \
    --client-secret "<service-principal-secret>" \
    --table "<catalog.schema.table_name>" \
    --output "record.proto"

    Example output for a table:

    SQL
    CREATE TABLE main.default.air_quality (
      device_name STRING,
      temp INT,
      humidity BIGINT
    )
    USING DELTA;

    Generates record.proto:

    Proto
    syntax = "proto2";

    message air_quality {
      optional string device_name = 1;
      optional int32 temp = 2;
      optional int64 humidity = 3;
    }

    To learn more about Protobuf messages, see the Protocol Buffers documentation.

  3. Compile the Protocol Buffer definition.

    Install grpcio-tools and compile your proto file:

    Bash
    pip install "grpcio-tools>=1.60.0,<2.0"
    python -m grpc_tools.protoc --python_out=. --proto_path=. record.proto

    This generates a record_pb2.py file containing the compiled protocol buffer definition.

  4. Create a Python client.

    To use the SDK, you need the following information:

    • Databricks workspace URL
    • Workspace ID (the value after ?o= in the full URL shown in your browser)
    • Table name
    • Service principal client ID and client secret
    • Zerobus endpoint in the format https://<workspace_id>.zerobus.<region>.cloud.databricks.com

    Synchronous example:

    Python
    import logging
    from zerobus.sdk.sync import ZerobusSdk
    from zerobus.sdk.shared import TableProperties
    import record_pb2

    # Configure logging (optional).
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )

    # Configuration.
    server_endpoint = "1234567890123456.zerobus.us-west-2.cloud.databricks.com"
    workspace_url = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
    table_name = "main.default.air_quality"
    client_id = "your-service-principal-application-id"
    client_secret = "your-service-principal-secret"

    # Initialize SDK.
    sdk = ZerobusSdk(server_endpoint, workspace_url)

    # Configure table properties. The generated class name matches the
    # message name in record.proto (air_quality).
    table_properties = TableProperties(
        table_name,
        record_pb2.air_quality.DESCRIPTOR
    )

    # Create stream.
    stream = sdk.create_stream(
        client_id,
        client_secret,
        table_properties
    )

    try:
        # Ingest records.
        for i in range(100):
            record = record_pb2.air_quality(
                device_name=f"sensor-{i % 10}",
                temp=20 + (i % 15),
                humidity=50 + (i % 40)
            )

            ack = stream.ingest_record(record)
            ack.wait_for_ack()  # Wait for durability.

            print(f"Ingested record {i + 1}")

        print("Successfully ingested 100 records!")
    finally:
        stream.close()
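The synchronous example hardcodes the endpoint and the service principal secret for brevity. One possible alternative, sketched below with the standard library only, is to read these values from environment variables. All environment variable names, the ZerobusSettings class, and the helper functions are hypothetical. The endpoint is built in the host-only form used for server_endpoint in the example above, which is an assumption about what your deployment expects.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class ZerobusSettings:
    """Values the client example needs (illustrative container)."""
    server_endpoint: str
    workspace_url: str
    table_name: str
    client_id: str
    client_secret: str


def zerobus_endpoint(workspace_id: str, region: str) -> str:
    # Host-only form, matching the server_endpoint value in the example.
    return f"{workspace_id}.zerobus.{region}.cloud.databricks.com"


def load_settings(env: Mapping[str, str] = os.environ) -> ZerobusSettings:
    """Read settings from (hypothetical) environment variable names."""
    required = [
        "ZEROBUS_WORKSPACE_ID",
        "ZEROBUS_REGION",
        "DATABRICKS_WORKSPACE_URL",
        "ZEROBUS_TABLE_NAME",
        "ZEROBUS_CLIENT_ID",
        "ZEROBUS_CLIENT_SECRET",
    ]
    # Fail early with one message that lists every missing variable.
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise ValueError(f"Missing environment variables: {', '.join(missing)}")
    return ZerobusSettings(
        server_endpoint=zerobus_endpoint(
            env["ZEROBUS_WORKSPACE_ID"], env["ZEROBUS_REGION"]
        ),
        workspace_url=env["DATABRICKS_WORKSPACE_URL"],
        table_name=env["ZEROBUS_TABLE_NAME"],
        client_id=env["ZEROBUS_CLIENT_ID"],
        client_secret=env["ZEROBUS_CLIENT_SECRET"],
    )
```

With the variables set, settings = load_settings() returns values that can be passed to ZerobusSdk and create_stream in place of the literals in the example.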

Configuration options

The SDK supports various configuration options via StreamConfigurationOptions:

Option                          Default   Description
max_inflight_records            50000     Maximum number of unacknowledged records
recovery                        True      Enable automatic stream recovery
recovery_timeout_ms             15000     Timeout for recovery operations (ms)
recovery_backoff_ms             2000      Delay between recovery attempts (ms)
recovery_retries                3         Maximum number of recovery attempts
flush_timeout_ms                300000    Timeout for flush operations (ms)
server_lack_of_ack_timeout_ms   60000     Server acknowledgment timeout (ms)
ack_callback                    None      Callback invoked on record acknowledgment
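To keep overrides explicit and catch misspelled option names early, you could hold the defaults from the table in a plain dictionary and merge your changes into it. This is a standard-library sketch only: DEFAULT_STREAM_OPTIONS and stream_options are hypothetical names, and it does not call the SDK. Whether StreamConfigurationOptions accepts these names as keyword arguments is an assumption to verify against the SDK before unpacking the result into it.

```python
# Defaults copied from the table above.
DEFAULT_STREAM_OPTIONS = {
    "max_inflight_records": 50000,
    "recovery": True,
    "recovery_timeout_ms": 15000,
    "recovery_backoff_ms": 2000,
    "recovery_retries": 3,
    "flush_timeout_ms": 300000,
    "server_lack_of_ack_timeout_ms": 60000,
    "ack_callback": None,
}


def stream_options(**overrides):
    """Return the defaults with overrides applied, rejecting unknown names."""
    unknown = sorted(set(overrides) - set(DEFAULT_STREAM_OPTIONS))
    if unknown:
        raise ValueError(f"Unknown option(s): {', '.join(unknown)}")
    return {**DEFAULT_STREAM_OPTIONS, **overrides}


# Lower the in-flight limit and allow more recovery attempts.
options = stream_options(max_inflight_records=10000, recovery_retries=5)
print(options)
```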

Error handling

The SDK provides two exception types:

  • ZerobusException: Retriable errors (network issues, temporary failures)
  • NonRetriableException: Non-retriable errors (invalid credentials, missing table)
Python
from zerobus.sdk.shared import ZerobusException, NonRetriableException

try:
    stream.ingest_record(record)
except NonRetriableException as e:
    print(f"Fatal error: {e}")
    raise
except ZerobusException as e:
    print(f"Retriable error: {e}")
    # Implement retry logic with backoff.
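The retry logic mentioned in the last comment could look like the following. This is a generic sketch of exponential backoff with a small random jitter, written against the standard library rather than the SDK. The function call_with_backoff and its parameters are hypothetical, and the exception types are passed in so that the helper makes no assumptions about the SDK's class hierarchy.

```python
import random
import time


def call_with_backoff(
    operation,
    retriable,
    fatal=(),
    max_attempts=5,
    base_delay=0.5,
    max_delay=30.0,
    sleep=time.sleep,
):
    """Call operation(), retrying on `retriable` errors with exponential backoff.

    Errors listed in `fatal` are re-raised immediately, even if they are
    also subclasses of a retriable type.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except fatal:
            raise
        except retriable:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the last error.
            # Delay doubles each attempt, capped at max_delay, plus up to 10% jitter.
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(delay + random.uniform(0, delay / 10))
```

With the SDK, a call might look like call_with_backoff(lambda: stream.ingest_record(record), retriable=(ZerobusException,), fatal=(NonRetriableException,)). Checking fatal errors first mirrors the order of the except clauses in the example above.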