Use the Zerobus Ingest connector
This page describes how to ingest data using the Zerobus Ingest connector in Lakeflow Connect.
Interface choice
Zerobus Ingest supports gRPC and REST interfaces. All SDKs integrate directly with our gRPC APIs, providing a developer-friendly interface for building custom high-throughput clients. The REST interface handles architectural constraints when dealing with massive fleets of "chatty" devices.
- The gRPC "connection tax": the SDKs achieve high throughput through persistent connections, but every open stream counts against your concurrency quotas.
- The REST "throughput tax": REST requires a full handshake for every request, making it stateless. It aligns well with edge-device use cases where status is reported infrequently.
Lean on gRPC for high-volume streams and REST for massive, low-frequency device fleets or unsupported languages.
Get your workspace URL and Zerobus Ingest Endpoint
After you log in to your Databricks workspace, look at the URL in your browser. It has the format https://<databricks-instance>.cloud.databricks.com/?o=XXXXX. The workspace URL is everything before the ?o=XXXXX query parameter, and the workspace ID is the number after it. For example:
Full URL: https://abcd-teste2-test-spcse2.cloud.databricks.com/?o=2281745829657864#
Workspace URL: https://abcd-teste2-test-spcse2.cloud.databricks.com
Workspace ID: 2281745829657864
The server endpoint depends on the workspace and region:
Server endpoint: <workspace-id>.zerobus.<region>.cloud.databricks.com
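The derivation above can be sketched in Python. This is an illustrative helper, not part of any SDK; the region string is an assumption you must replace with your workspace's actual region:

```python
from urllib.parse import urlparse, parse_qs

def zerobus_endpoint(full_url: str, region: str) -> tuple[str, str, str]:
    """Derive the workspace URL, workspace ID, and Zerobus server endpoint
    from a browser URL of the form https://<instance>.cloud.databricks.com/?o=<id>.
    The region parameter is an assumption - use your workspace's region."""
    parsed = urlparse(full_url)
    workspace_url = f"{parsed.scheme}://{parsed.netloc}"
    workspace_id = parse_qs(parsed.query)["o"][0]
    endpoint = f"{workspace_id}.zerobus.{region}.cloud.databricks.com"
    return workspace_url, workspace_id, endpoint

url = "https://abcd-teste2-test-spcse2.cloud.databricks.com/?o=2281745829657864"
workspace_url, workspace_id, endpoint = zerobus_endpoint(url, "us-west-2")
```

For the example URL above, this yields the workspace URL, the workspace ID 2281745829657864, and the endpoint 2281745829657864.zerobus.us-west-2.cloud.databricks.com.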
Create or identify the target table
Identify the target table that you want to ingest into. To create a new target table, run the CREATE TABLE SQL command. For example:
CREATE TABLE main.default.air_quality (
  device_name STRING,
  temp INT,
  humidity LONG
);
Create a service principal and grant permissions
A service principal is a specialized identity that provides more security than a personal account. For more information about service principals and how to use them for authentication, see these instructions.
1. Create a service principal in your workspace under Settings > Identity and Access.
2. Generate and save the client ID and the client secret for the service principal.
3. Grant the required permissions for the catalog, the schema, and the table to the service principal:
   - In the Service Principal page, open the Configurations tab.
   - Copy the Application Id (UUID).
   - Use the following SQL, replacing the example UUID and the username, schema name, and table names if required:
GRANT USE CATALOG ON CATALOG <username> TO `<UUID>`;
GRANT USE SCHEMA ON SCHEMA <username>.default TO `<UUID>`;
GRANT MODIFY, SELECT ON TABLE <username>.default.<table_name> TO `<UUID>`;
Write a client
- Python SDK
- Rust SDK
- Java SDK
- Go SDK
- TypeScript SDK
- REST API
Python 3.9 or higher is required. The SDK uses PyO3 bindings to the high-performance Rust SDK, providing up to 40x higher throughput than pure Python and efficient network I/O through the Rust async runtime. It supports JSON (simplest) and Protocol Buffers (recommended for production). The SDK also supports both sync and async implementations, as well as 3 different ingestion methods (future-based, offset-based and fire-and-forget).
pip install databricks-zerobus-ingest-sdk
JSON example:
import logging

from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties

# Configuration - see "Before you begin" section for how to obtain these values.
SERVER_ENDPOINT = "https://1234567890123456.zerobus.us-west-2.cloud.databricks.com"
DATABRICKS_WORKSPACE_URL = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
TABLE_NAME = "main.default.air_quality"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"

sdk = ZerobusSdk(
    SERVER_ENDPOINT,
    DATABRICKS_WORKSPACE_URL
)

table_properties = TableProperties(TABLE_NAME)
options = StreamConfigurationOptions(record_type=RecordType.JSON)
stream = sdk.create_stream(CLIENT_ID, CLIENT_SECRET, table_properties, options)

try:
    for i in range(1000):
        record_dict = {
            "device_name": f"sensor-{i}",
            "temp": 20 + i % 15,
            "humidity": 50 + i % 40
        }
        offset = stream.ingest_record_offset(record_dict)
        # Optional: wait for durability confirmation
        stream.wait_for_offset(offset)
finally:
    stream.close()
Protocol Buffers: For type-safe ingestion, use Protocol Buffers with RecordType.PROTO (default) and provide a descriptorProto in table properties.
For complete documentation, configuration options, batch ingestion, and Protocol Buffer examples, see the Python SDK repository.
Rust 1.70 or higher is required. The SDK leverages async I/O and gRPC for high-throughput ingestion and serves as the core of all other SDKs. It supports JSON (simplest) and Protocol Buffers (recommended for production).
First, add the package as a dependency:
cargo add databricks-zerobus-ingest-sdk
Or add it to your Cargo.toml.
[dependencies]
databricks-zerobus-ingest-sdk = "0.5.0" # Latest version at time of publication
JSON example:
use databricks_zerobus_ingest_sdk::{
    databricks::zerobus::RecordType, JsonString, StreamConfigurationOptions,
    TableProperties, ZerobusSdk,
};
use std::error::Error;

// Configuration - see "Before you begin" section for how to obtain these values.
const DATABRICKS_WORKSPACE_URL: &str = "https://<your-workspace>.cloud.databricks.com";
const SERVER_ENDPOINT: &str = "<your-shard-id>.zerobus.<region>.cloud.databricks.com";
const TABLE_NAME: &str = "main.default.air_quality";
const CLIENT_ID: &str = "your-client-id";
const CLIENT_SECRET: &str = "your-client-secret";

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let table_properties = TableProperties {
        table_name: TABLE_NAME.to_string(),
        descriptor_proto: None,
    };

    let stream_configuration_options = StreamConfigurationOptions {
        max_inflight_requests: 100,
        record_type: RecordType::Json,
        ..Default::default()
    };

    let sdk_handle = ZerobusSdk::builder()
        .endpoint(SERVER_ENDPOINT)
        .unity_catalog_url(DATABRICKS_WORKSPACE_URL)
        .build()?;

    let mut stream = sdk_handle
        .create_stream(
            table_properties.clone(),
            CLIENT_ID.to_string(),
            CLIENT_SECRET.to_string(),
            Some(stream_configuration_options),
        )
        .await?;

    let offset = stream
        .ingest_record_offset(JsonString(
            "{\"device_name\": \"sensor\", \"temp\": 22, \"humidity\": 55}".to_string(),
        ))
        .await?;
    stream.wait_for_offset(offset).await?;
    println!("Record ingested successfully");

    stream.close().await?;
    println!("Stream closed successfully");
    Ok(())
}
Protocol Buffers: For type-safe ingestion, use Protocol Buffers with RecordType::Proto (default) and provide a descriptor_proto in table properties. Generate the needed files using the generate_proto tool and import into your project.
For complete documentation, configuration options, batch ingestion, generate_proto tool and Protocol Buffer examples, see the Rust SDK repository.
Java 8 or higher is required. The SDK uses JNI (Java Native Interface) bindings to the high-performance Rust SDK, providing lower latency than pure Java gRPC and efficient network I/O through the Rust async runtime. It supports JSON (simplest) and Protocol Buffers (recommended for production).
Maven:
<dependency>
<groupId>com.databricks</groupId>
<artifactId>zerobus-ingest-sdk</artifactId>
<version>0.2.0</version>
</dependency>
JSON example:
import com.databricks.zerobus.*;

public class ZerobusClient {
    // Configuration - see "Before you begin" section for how to obtain these values.
    private static final String SERVER_ENDPOINT =
        "https://1234567890123456.zerobus.us-west-2.cloud.databricks.com";
    private static final String DATABRICKS_WORKSPACE_URL =
        "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com";
    private static final String TABLE_NAME = "main.default.air_quality";
    private static final String CLIENT_ID = "your-client-id";
    private static final String CLIENT_SECRET = "your-client-secret";

    public static void main(String[] args) throws Exception {
        ZerobusSdk sdk = new ZerobusSdk(
            SERVER_ENDPOINT,
            DATABRICKS_WORKSPACE_URL
        );

        ZerobusJsonStream stream = sdk.createJsonStream(
            TABLE_NAME, CLIENT_ID, CLIENT_SECRET
        ).join();

        try {
            long lastOffset = 0;
            for (int i = 0; i < 100; i++) {
                String record = String.format(
                    "{\"device_name\": \"sensor-%d\", \"temp\": 22, \"humidity\": 55}", i
                );
                lastOffset = stream.ingestRecordOffset(record);
            }
            stream.waitForOffset(lastOffset);
        } finally {
            stream.close();
        }
    }
}
Protocol Buffers: For type-safe ingestion, use ZerobusProtoStream with createProtoStream(). Generate a schema from your table using the bundled JAR tool, then compile it with protoc.
For complete documentation, configuration options, batch ingestion, and Protocol Buffer examples, see the Java SDK repository.
Go 1.21 or higher is required. The SDK wraps the high-performance Rust SDK using CGO and FFI, providing the same throughput and performance. It supports JSON (simplest) and Protocol Buffers (recommended for production).
go get github.com/databricks/zerobus-sdk-go@latest
JSON example:
NOTE: For simplicity, errors are ignored here. In production code, always check errors.
package main

import (
    "fmt"

    zerobus "github.com/databricks/zerobus-sdk-go"
)

// Configuration - see "Before you begin" section for how to obtain these values.
const (
    ServerEndpoint         = "https://1234567890123456.zerobus.us-west-2.cloud.databricks.com"
    DatabricksWorkspaceURL = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
    TableName              = "main.default.air_quality"
    ClientID               = "your-client-id"
    ClientSecret           = "your-client-secret"
)

func main() {
    sdk, _ := zerobus.NewZerobusSdk(
        ServerEndpoint,
        DatabricksWorkspaceURL,
    )
    defer sdk.Free()

    options := zerobus.DefaultStreamConfigurationOptions()
    options.RecordType = zerobus.RecordTypeJson

    stream, _ := sdk.CreateStream(
        zerobus.TableProperties{
            TableName: TableName,
        },
        ClientID,
        ClientSecret,
        options,
    )

    offset, _ := stream.IngestRecordOffset(`{
        "device_name": "sensor-001",
        "temp": 20,
        "humidity": 60
    }`)
    _ = stream.WaitForOffset(offset)
    fmt.Println("Record ingested successfully")

    _ = stream.Close()
    fmt.Println("Stream closed successfully")
}
Protocol Buffers: For type-safe ingestion, use Protocol Buffers with RecordTypeProto (default) and provide a descriptorProto in table properties. Create a .proto file matching your table schema and run the generate_proto script to help you import the files into your project.
For complete documentation, configuration options, batch ingestion, generate_proto tool and Protocol Buffer examples, see the Go SDK repository.
Node.js 16 or higher is required. The SDK wraps the high-performance Rust SDK using NAPI-RS native bindings, providing native performance with Rust futures mapped to JavaScript Promises. It supports JSON (simplest) and Protocol Buffers (recommended for production).
npm install @databricks/zerobus-ingest-sdk
JSON example:
import { ZerobusSdk, RecordType } from '@databricks/zerobus-ingest-sdk';
// Configuration - see "Before you begin" section for how to obtain these values.
const SERVER_ENDPOINT = 'https://1234567890123456.zerobus.us-west-2.cloud.databricks.com';
const DATABRICKS_WORKSPACE_URL = 'https://dbc-a1b2c3d4-e5f6.cloud.databricks.com';
const TABLE_NAME = 'main.default.air_quality';
const CLIENT_ID = 'your-client-id';
const CLIENT_SECRET = 'your-client-secret';
const sdk = new ZerobusSdk(SERVER_ENDPOINT, DATABRICKS_WORKSPACE_URL);
const stream = await sdk.createStream({ tableName: TABLE_NAME }, CLIENT_ID, CLIENT_SECRET, {
  recordType: RecordType.Json,
});

try {
  let lastOffset = BigInt(0);
  for (let i = 0; i < 100; i++) {
    const record = { device_name: `sensor-${i}`, temp: 22, humidity: 55 };
    lastOffset = await stream.ingestRecordOffset(record);
  }
  await stream.waitForOffset(lastOffset);
} finally {
  await stream.close();
}
Protocol Buffers: For type-safe ingestion, use Protocol Buffers with RecordType.Proto (default) and provide a descriptorProto in table properties.
For complete documentation, configuration options, batch ingestion, and Protocol Buffer examples, see the TypeScript SDK repository.
The REST API for the Zerobus Ingest connector is in Beta.
The REST API allows you to ingest a single record by sending an HTTP POST request to the /zerobus/v1/tables/<table-name>/insert endpoint. The record itself is included in the request body and must be in JSON format.
This example walks you through how to use CURL to push data to Zerobus Ingest using the REST API.
Headers
The request requires two specific HTTP headers to authenticate and format the request correctly.
- Content-Type: application/json - Mandatory header specifying the content type. Currently, JSON is the only supported message format.
- Authorization: Bearer <token> - Replace <token> with the OAuth token you fetched using the curl command provided later.
Fetch OAuth Token: These tokens expire every hour and must be refreshed. You can refresh them by re-fetching the OAuth token.
Fill in the following parameters:
- $CATALOG, $SCHEMA, $TABLE, $WORKSPACE_ID, $WORKSPACE_URL
- $DATABRICKS_CLIENT_ID and $DATABRICKS_CLIENT_SECRET - These two parameters correspond to the service principal you created.
authorization_details=$(cat <<EOF
[{
"type": "unity_catalog_privileges",
"privileges": ["USE CATALOG"],
"object_type": "CATALOG",
"object_full_path": "$CATALOG"
},
{
"type": "unity_catalog_privileges",
"privileges": ["USE SCHEMA"],
"object_type": "SCHEMA",
"object_full_path": "$CATALOG.$SCHEMA"
},
{
"type": "unity_catalog_privileges",
"privileges": ["SELECT", "MODIFY"],
"object_type": "TABLE",
"object_full_path": "$CATALOG.$SCHEMA.$TABLE"
}]
EOF
)
export OAUTH_TOKEN=$(curl -X POST \
-u "$DATABRICKS_CLIENT_ID:$DATABRICKS_CLIENT_SECRET" \
-d "grant_type=client_credentials" \
-d "scope=all-apis" \
-d "resource=api://databricks/workspaces/$WORKSPACE_ID/zerobusDirectWriteApi" \
--data-urlencode "authorization_details=$authorization_details" \
"$WORKSPACE_URL/oidc/v1/token" | jq -r '.access_token')
Record Ingestion:
Fill in the following parameters:
- $ZEROBUS_ENDPOINT - As defined in the Get your workspace URL and Zerobus Ingest Endpoint section.
- $CATALOG, $SCHEMA, $TABLE, $WORKSPACE_ID, $WORKSPACE_URL
- $OAUTH_TOKEN - This was created in the previous step.
NOTE: The request body must be a list of JSON objects.
curl -X POST \
"$ZEROBUS_ENDPOINT/zerobus/v1/tables/$CATALOG.$SCHEMA.$TABLE/insert" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $OAUTH_TOKEN" \
-d '[{ "device_name": "device_num_1", "temp": 28, "humidity": 60 },
{ "device_name": "device_num_1", "temp": 28, "humidity": 60 }]'
If everything is correct, you should receive an empty JSON response with an HTTP status code of 200.
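The same insert request can be sketched in Python. As with the token sketch, this is illustrative rather than an official client: it only assembles the URL, headers, and body that the curl command above sends, leaving the actual POST to your HTTP client:

```python
import json

def build_insert_request(zerobus_endpoint, table_name, records, oauth_token):
    """Assemble the URL, headers, and JSON body for the Zerobus REST
    insert call shown in the curl example above. The body must be a
    list of JSON objects, one per record."""
    url = f"{zerobus_endpoint}/zerobus/v1/tables/{table_name}/insert"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {oauth_token}",
    }
    body = json.dumps(records)
    # POST `body` to `url` with `headers`; an HTTP 200 with an empty
    # JSON response means the records were accepted.
    return url, headers, body

url, headers, body = build_insert_request(
    "https://2281745829657864.zerobus.us-west-2.cloud.databricks.com",
    "main.default.air_quality",
    [{"device_name": "device_num_1", "temp": 28, "humidity": 60}],
    "your-oauth-token")
```

The function name is hypothetical; the endpoint path, headers, and body shape come from the curl example.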