Use the Zerobus Ingest connector
The Zerobus Ingest connector is in Public Preview. To try it, contact your Databricks account representative.
This page describes how to ingest data using the Zerobus direct write connector in Lakeflow Connect.
Get your workspace URL
After you log in to your Databricks workspace, look at the URL in your browser. It has the format https://<databricks-instance>.com/?o=XXXXX. The workspace URL is everything before the ?o=XXXXX. For example:
- Full URL: https://abcd-teste2-test-spcse2.cloud.databricks.com/?o=2281745829657864#
- Workspace URL: https://abcd-teste2-test-spcse2.cloud.databricks.com
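If you need to derive the workspace URL in code, it is just the scheme and host of the browser URL. A minimal sketch in Python (the helper function is illustrative, not part of any SDK):

from urllib.parse import urlparse

def workspace_url_from_browser_url(full_url: str) -> str:
    """Keep the scheme and host; drop the path, query (?o=...), and fragment."""
    parts = urlparse(full_url)
    return f"{parts.scheme}://{parts.netloc}"

print(workspace_url_from_browser_url(
    "https://abcd-teste2-test-spcse2.cloud.databricks.com/?o=2281745829657864#"
))
# https://abcd-teste2-test-spcse2.cloud.databricks.com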
Create or identify the target table
Identify the target table that you want to ingest into. To create a new target table, run the CREATE TABLE SQL command. For example:
CREATE TABLE unity.default.air_quality (
  device_name STRING,
  temp INT,
  humidity LONG
);
Create a service principal and grant permissions
A service principal is a dedicated identity that provides better security than a personal account. For more information about service principals and how to use them for authentication, see these instructions.
1. Create a service principal in your workspace under Settings > Identity and Access.
2. Generate and save the client ID and the client secret for the service principal.
3. Grant the required permissions on the catalog, the schema, and the table to the service principal:
- In the Service Principal page, open the Configurations tab.
- Copy the Application Id (UUID).
- Use the following SQL, replacing the example UUID and the username (catalog), schema, and table names as required:
GRANT USE CATALOG ON CATALOG <username> TO `<UUID>`;
GRANT USE SCHEMA ON SCHEMA <username>.default TO `<UUID>`;
GRANT MODIFY, SELECT ON TABLE <username>.default.<table_name> TO `<UUID>`;
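Before wiring the credentials into an SDK, you can verify that the service principal authenticates by requesting an OAuth token from the workspace token endpoint. A minimal sketch using Python's requests library, assuming the standard Databricks OAuth machine-to-machine flow (endpoint /oidc/v1/token, scope all-apis):

import requests

workspace_url = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"  # Your workspace URL.
client_id = "your-service-principal-application-id"
client_secret = "your-service-principal-secret"

# Client-credentials grant; a 200 response confirms the ID and secret are valid.
response = requests.post(
    f"{workspace_url}/oidc/v1/token",
    auth=(client_id, client_secret),
    data={"grant_type": "client_credentials", "scope": "all-apis"},
)
response.raise_for_status()
print("Token acquired; expires in", response.json().get("expires_in"), "seconds")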
Endpoint formats
The Zerobus server endpoint and workspace URL formats differ by cloud provider:
| Cloud | Server endpoint | Workspace URL |
|---|---|---|
| AWS | <workspace-id>.zerobus.<region>.cloud.databricks.com | https://<workspace-instance>.cloud.databricks.com |
| Azure | <workspace-id>.zerobus.<region>.azuredatabricks.net | https://adb-<workspace-id>.<random-number>.azuredatabricks.net |
Example for AWS:
- Server endpoint: 1234567890123456.zerobus.us-west-2.cloud.databricks.com
- Workspace URL: https://dbc-a1b2c3d4-e5f6.cloud.databricks.com

Example for Azure:
- Server endpoint: 1234567890123456.zerobus.eastus.azuredatabricks.net
- Workspace URL: https://adb-1234567890123456.12.azuredatabricks.net
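Given a workspace ID and region, the server endpoint follows directly from the formats above. A small sketch (the helper is illustrative only):

def zerobus_endpoint(workspace_id: str, region: str, cloud: str) -> str:
    """Assemble a Zerobus server endpoint from the per-cloud formats above."""
    domain = {"aws": "cloud.databricks.com", "azure": "azuredatabricks.net"}[cloud]
    return f"{workspace_id}.zerobus.{region}.{domain}"

print(zerobus_endpoint("1234567890123456", "us-west-2", "aws"))
# 1234567890123456.zerobus.us-west-2.cloud.databricks.com
print(zerobus_endpoint("1234567890123456", "eastus", "azure"))
# 1234567890123456.zerobus.eastus.azuredatabricks.net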
Write a client
Zerobus provides SDKs for several languages. The following sections show how to write a minimal ingestion client with each:
- Python SDK
- Rust SDK
- Java SDK
- Go SDK
- TypeScript SDK
Python SDK
Python 3.9 or higher is required. The SDK supports JSON (simplest) and Protocol Buffers (recommended for production). Install the SDK:
pip install databricks-zerobus-ingest-sdk
JSON example:
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties

# Configuration - see the sections above for how to obtain these values.
server_endpoint = "1234567890123456.zerobus.us-west-2.cloud.databricks.com"
workspace_url = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
table_name = "main.default.air_quality"
client_id = "your-service-principal-application-id"
client_secret = "your-service-principal-secret"

sdk = ZerobusSdk(server_endpoint, workspace_url)
table_properties = TableProperties(table_name)
options = StreamConfigurationOptions(record_type=RecordType.JSON)
stream = sdk.create_stream(client_id, client_secret, table_properties, options)

try:
    for i in range(100):
        record = {"device_name": f"sensor-{i}", "temp": 22, "humidity": 55}
        ack = stream.ingest_record(record)  # Pass the dict directly; the SDK handles serialization.
        ack.wait_for_ack()
finally:
    stream.close()
Acknowledgment callback: To track ingestion progress asynchronously, use the ack_callback option. The callback receives an IngestRecordResponse object with a durability_ack_up_to_offset attribute (int) indicating all records up to that offset have been durably written:
from zerobus.sdk.shared import IngestRecordResponse

def on_ack(response: IngestRecordResponse) -> None:
    print(f"Records acknowledged up to offset: {response.durability_ack_up_to_offset}")

options = StreamConfigurationOptions(
    record_type=RecordType.JSON,
    ack_callback=on_ack,
)
Protocol Buffers: For type-safe ingestion, use Protocol Buffers with RecordType.PROTO (default). Generate a schema from your table using python -m zerobus.tools.generate_proto.
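For example, if the generated module is named record_pb2 and contains an AirQuality message, ingestion might look like the following sketch. The module name and the TableProperties signature for proto streams are assumptions here; confirm both against the SDK repository:

import record_pb2  # Assumed name of the module generated by python -m zerobus.tools.generate_proto.
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import TableProperties

sdk = ZerobusSdk(server_endpoint, workspace_url)
# Assumption: for proto streams, TableProperties takes the message descriptor.
table_properties = TableProperties(table_name, record_pb2.AirQuality.DESCRIPTOR)
stream = sdk.create_stream(client_id, client_secret, table_properties)

try:
    record = record_pb2.AirQuality(device_name="sensor-1", temp=22, humidity=55)
    stream.ingest_record(record).wait_for_ack()
finally:
    stream.close()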
For complete documentation, configuration options, async API, and Protocol Buffer examples, see the Python SDK repository.
Rust SDK
Rust 1.70 or higher is required. The SDK supports JSON (simplest) and Protocol Buffers (recommended for production). Add the dependencies:
cargo add databricks-zerobus-ingest-sdk
cargo add tokio --features macros,rt-multi-thread
JSON example:
use std::error::Error;
use databricks_zerobus_ingest_sdk::{
    RecordType, StreamConfigurationOptions, TableProperties, ZerobusSdk,
};

// Configuration - see the sections above for how to obtain these values.
const ZEROBUS_ENDPOINT: &str = "workspace-id.zerobus.region.cloud.databricks.com";
const WORKSPACE_URL: &str = "https://your-workspace.cloud.databricks.com";
const TABLE_NAME: &str = "catalog.schema.air_quality";
const CLIENT_ID: &str = "your-client-id";
const CLIENT_SECRET: &str = "your-client-secret";

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let table_properties = TableProperties {
        table_name: TABLE_NAME.to_string(),
        descriptor_proto: None,
    };
    let options = StreamConfigurationOptions {
        record_type: RecordType::Json,
        ..Default::default()
    };

    let sdk = ZerobusSdk::new(ZEROBUS_ENDPOINT.to_string(), WORKSPACE_URL.to_string())?;
    let mut stream = sdk
        .create_stream(
            table_properties,
            CLIENT_ID.to_string(),
            CLIENT_SECRET.to_string(),
            Some(options),
        )
        .await?;

    for i in 0..100 {
        let record = format!(
            r#"{{"device_name": "sensor-{}", "temp": 22, "humidity": 55}}"#,
            i
        );
        let ack = stream.ingest_record(record.into_bytes()).await?;
        ack.await?;
    }

    stream.close().await?;
    Ok(())
}
Protocol Buffers: For type-safe ingestion, use Protocol Buffers with RecordType::Proto (default). Generate a schema using the tools/generate_files tool in the repository.
For complete documentation, configuration options, batch ingestion, and Protocol Buffer examples, see the Rust SDK repository.
Java SDK
Java 8 or higher is required. The SDK uses Protocol Buffers for type-safe schema definition.
Maven:
<dependency>
  <groupId>com.databricks</groupId>
  <artifactId>zerobus-ingest-sdk</artifactId>
  <version>0.1.0</version>
</dependency>
Protocol Buffer setup:
Generate a proto schema from your table, then compile it:
# Generate proto from Unity Catalog table schema.
java -jar zerobus-ingest-sdk-0.1.0-jar-with-dependencies.jar \
--uc-endpoint "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com" \
--client-id "<client-id>" --client-secret "<client-secret>" \
--table "<catalog.schema.table>" --output "record.proto"
# Compile proto to Java.
protoc --java_out=src/main/java record.proto
Example:
import com.databricks.zerobus.*;
import com.example.proto.Record.AirQuality;

public class ZerobusClient {
    public static void main(String[] args) throws Exception {
        // Configuration - see the sections above for how to obtain these values.
        ZerobusSdk sdk = new ZerobusSdk(
            "1234567890123456.zerobus.us-west-2.cloud.databricks.com",
            "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
        );

        TableProperties<AirQuality> tableProperties = new TableProperties<>(
            "main.default.air_quality",
            AirQuality.getDefaultInstance()
        );

        ZerobusStream<AirQuality> stream = sdk.createStream(
            tableProperties, "your-client-id", "your-client-secret"
        ).join();

        try {
            for (int i = 0; i < 100; i++) {
                AirQuality record = AirQuality.newBuilder()
                    .setDeviceName("sensor-" + i)
                    .setTemp(22)
                    .setHumidity(55)
                    .build();
                stream.ingestRecord(record).join();
            }
        } finally {
            stream.close();
        }
    }
}
For complete documentation, configuration options, non-blocking ingestion, and logging setup, see the Java SDK repository.
Go SDK
Go 1.21 or higher is required. The SDK supports JSON (simplest) and Protocol Buffers (recommended for production). Install the SDK:
go get github.com/databricks/zerobus-go-sdk
JSON example:
package main

import (
	"fmt"
	"log"

	zerobus "github.com/databricks/zerobus-go-sdk/sdk"
)

func main() {
	// Configuration - see the sections above for how to obtain these values.
	sdk, err := zerobus.NewZerobusSdk(
		"1234567890123456.zerobus.us-west-2.cloud.databricks.com",
		"https://dbc-a1b2c3d4-e5f6.cloud.databricks.com",
	)
	if err != nil {
		log.Fatal(err)
	}
	defer sdk.Free()

	options := zerobus.DefaultStreamConfigurationOptions()
	options.RecordType = zerobus.RecordTypeJson

	stream, err := sdk.CreateStream(
		zerobus.TableProperties{TableName: "main.default.air_quality"},
		"your-client-id", "your-client-secret", options,
	)
	if err != nil {
		log.Fatal(err)
	}
	defer stream.Close()

	for i := 0; i < 100; i++ {
		record := fmt.Sprintf(`{"device_name": "sensor-%d", "temp": 22, "humidity": 55}`, i)
		ack, err := stream.IngestRecord(record)
		if err != nil {
			log.Fatal(err)
		}
		ack.Await()
	}

	stream.Flush()
}
Protocol Buffers: For type-safe ingestion, use Protocol Buffers with RecordTypeProto (default) and provide a DescriptorProto in TableProperties.
For complete documentation, configuration options, and Protocol Buffer examples, see the Go SDK repository.
TypeScript SDK
Node.js 16 or higher is required. The SDK supports JSON (simplest) and Protocol Buffers (recommended for production). Install the SDK:
npm install @databricks/zerobus-ingest-sdk
JSON example:
import { ZerobusSdk, RecordType } from '@databricks/zerobus-ingest-sdk';

// Configuration - see the sections above for how to obtain these values.
const sdk = new ZerobusSdk(
  '1234567890123456.zerobus.us-west-2.cloud.databricks.com',
  'https://dbc-a1b2c3d4-e5f6.cloud.databricks.com',
);

const stream = await sdk.createStream(
  { tableName: 'main.default.air_quality' },
  'your-client-id',
  'your-client-secret',
  { recordType: RecordType.Json },
);

try {
  for (let i = 0; i < 100; i++) {
    const record = { device_name: `sensor-${i}`, temp: 22, humidity: 55 };
    await stream.ingestRecord(record);
  }
  await stream.flush();
} finally {
  await stream.close();
}
Protocol Buffers: For type-safe ingestion, use Protocol Buffers with RecordType.Proto (default) and provide a descriptorProto in table properties.
For complete documentation, configuration options, and Protocol Buffer examples, see the TypeScript SDK repository.