Use the Zerobus Ingest connector
The Zerobus Ingest connector is in Public Preview. To try it, contact your Databricks account representative.
This page describes how to ingest data using the Zerobus direct write connector in Lakeflow Connect.
Get your workspace URL
After you log in to your Databricks workspace, look at the URL in your browser. It has the following format: `https://<databricks-instance>.com/?o=XXXXX`. The workspace URL is everything before the `/?o=XXXXX`, for example:

- Full URL: `https://abcd-teste2-test-spcse2.cloud.databricks.com/?o=2281745829657864#`
- Workspace URL: `https://abcd-teste2-test-spcse2.cloud.databricks.com`
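If you script this step, the split is mechanical. A minimal sketch in Python using only the standard library (the URL below is the example value from above, not a real workspace):

```python
from urllib.parse import urlsplit, parse_qs

# Example browser URL copied after login (from the example above).
full_url = "https://abcd-teste2-test-spcse2.cloud.databricks.com/?o=2281745829657864#"

parts = urlsplit(full_url)
workspace_url = f"{parts.scheme}://{parts.netloc}"        # everything before /?o=...
workspace_id = parse_qs(parts.query).get("o", [None])[0]  # the value after ?o=

print(workspace_url)  # https://abcd-teste2-test-spcse2.cloud.databricks.com
print(workspace_id)   # 2281745829657864
```

You need both values later: the workspace URL for authentication and the workspace ID to build the Zerobus endpoint.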
Create or identify the target table
Identify the target table that you want to ingest into. To create a new target table, run the `CREATE TABLE` SQL command. For example:

```sql
CREATE TABLE unity.default.air_quality (
  device_name STRING,
  temp INT,
  humidity LONG
);
```
Create a service principal and grant permissions
A service principal is a specialized identity that provides better security than a personal account. For more information about service principals and how to use them for authentication, see these instructions.
- Create a service principal in your workspace under Settings > Identity and Access.
- Generate and save the client ID and the client secret for the service principal.
- Grant the required permissions on the catalog, the schema, and the table to the service principal:
  - On the Service Principal page, open the Configurations tab.
  - Copy the Application Id (UUID).
  - Run the following SQL, replacing the example UUID, the username (catalog), the schema name, and the table name as required:

```sql
GRANT USE CATALOG ON CATALOG <username> TO `<UUID>`;
GRANT USE SCHEMA ON SCHEMA <username>.default TO `<UUID>`;
GRANT MODIFY, SELECT ON TABLE <username>.default.<table_name> TO `<UUID>`;
```
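Before streaming, you can optionally confirm that the grants took effect by authenticating as the service principal and reading the table metadata. A minimal sketch using the separate `databricks-sdk` Python package (an assumption: this package is not part of the Zerobus SDK; the host and table name are the example values from this page):

```python
from databricks.sdk import WorkspaceClient

# Authenticate as the service principal via OAuth M2M.
w = WorkspaceClient(
    host="https://abcd-teste2-test-spcse2.cloud.databricks.com",  # your workspace URL
    client_id="<service-principal-application-id>",
    client_secret="<service-principal-secret>",
)

# Fails with a permission error if the grants above are missing.
table = w.tables.get(full_name="unity.default.air_quality")
print(table.full_name, [c.name for c in table.columns])
```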
Write a client
The SDK is available for Python, Rust, and Java; the following sections cover each in turn.

Python SDK
Python 3.9 or higher is required.
- Install the Python SDK.

```bash
pip install databricks-zerobus-ingest-sdk
```

Alternatively, install from source:

```bash
git clone https://github.com/databricks/zerobus-sdk-py.git
cd zerobus-sdk-py
pip install -e .
```
- Generate a Protocol Buffer definition.

Automatically generate a proto definition from your Unity Catalog table schema using the `generate_proto` tool:

```bash
python -m zerobus.tools.generate_proto \
    --uc-endpoint "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com" \
    --client-id "<service-principal-application-id>" \
    --client-secret "<service-principal-secret>" \
    --table "<catalog.schema.table_name>" \
    --output "record.proto"
```

Example output for a table:
```sql
CREATE TABLE main.default.air_quality (
  device_name STRING,
  temp INT,
  humidity BIGINT
)
USING DELTA;
```

Generates `record.proto`:

```proto
syntax = "proto2";

message air_quality {
  optional string device_name = 1;
  optional int32 temp = 2;
  optional int64 humidity = 3;
}
```

You can learn more about Protobuf messages here.
- Compile the Protocol Buffer definition.

Install `grpcio-tools` and compile your proto file:

```bash
pip install "grpcio-tools>=1.60.0,<2.0"
python -m grpc_tools.protoc --python_out=. --proto_path=. record.proto
```

This generates a `record_pb2.py` file containing the compiled protocol buffer definition.
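Optionally, check that the generated bindings import and expose the expected fields before wiring up the stream. A minimal sketch, assuming the message is named `air_quality` as in the example output above (protoc exposes the Python class under that exact name):

```python
import record_pb2

# The generated class name matches the message name in record.proto.
record = record_pb2.air_quality(device_name="sensor-0", temp=21, humidity=55)
print([f.name for f in record.DESCRIPTOR.fields])  # ['device_name', 'temp', 'humidity']
print(record.SerializeToString())                  # non-empty bytes when fields are set
```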
- Create a Python client.

To use the SDK, you need the following information:

- Databricks workspace URL
- Workspace ID (found in your workspace URL after `/o=`)
- Table name
- Service principal client ID and client secret
- Zerobus endpoint in the format `https://<workspace_id>.zerobus.<region>.cloud.databricks.com`
Synchronous example:
```python
import logging

from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import TableProperties

import record_pb2

# Configure logging (optional).
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)

# Configuration.
server_endpoint = "1234567890123456.zerobus.us-west-2.cloud.databricks.com"
workspace_url = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
table_name = "main.default.air_quality"
client_id = "your-service-principal-application-id"
client_secret = "your-service-principal-secret"

# Initialize SDK.
sdk = ZerobusSdk(server_endpoint, workspace_url)

# Configure table properties. The class name matches the generated
# message name (air_quality in the example proto above).
table_properties = TableProperties(
    table_name,
    record_pb2.air_quality.DESCRIPTOR
)

# Create stream.
stream = sdk.create_stream(
    client_id,
    client_secret,
    table_properties
)

try:
    # Ingest records.
    for i in range(100):
        record = record_pb2.air_quality(
            device_name=f"sensor-{i % 10}",
            temp=20 + (i % 15),
            humidity=50 + (i % 40)
        )
        ack = stream.ingest_record(record)
        ack.wait_for_ack()  # Wait for durability.
        print(f"Ingested record {i + 1}")

    print("Successfully ingested 100 records!")
finally:
    stream.close()
```
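Waiting for each acknowledgment before sending the next record serializes the stream. For higher throughput you can keep ingesting and wait for the acknowledgments at the end. A sketch using only the calls shown above, reusing `stream` and `record_pb2` from the example; the `max_inflight_records` option (see the table below) bounds how many unacknowledged records the stream holds:

```python
# Non-blocking variant: queue all records first, then wait for durability once.
acks = []
for i in range(100):
    record = record_pb2.air_quality(
        device_name=f"sensor-{i % 10}",
        temp=20 + (i % 15),
        humidity=50 + (i % 40)
    )
    acks.append(stream.ingest_record(record))

for ack in acks:
    ack.wait_for_ack()  # Blocks until each record is durable.
print("All records acknowledged.")
```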
Configuration options
The SDK supports various configuration options via `StreamConfigurationOptions`:

| Option | Default | Description |
|---|---|---|
| `max_inflight_records` | 50000 | Maximum number of unacknowledged records |
| `recovery` | `True` | Enable automatic stream recovery |
| `recovery_timeout_ms` | 15000 | Timeout for recovery operations (ms) |
| `recovery_backoff_ms` | 2000 | Delay between recovery attempts (ms) |
| `recovery_retries` | 3 | Maximum number of recovery attempts |
| `flush_timeout_ms` | 300000 | Timeout for flush operations (ms) |
| `server_lack_of_ack_timeout_ms` | 60000 | Server acknowledgment timeout (ms) |
| `ack_callback` | `None` | Callback invoked on record acknowledgment |
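A sketch of passing these options when creating the stream. The Java section below shows the equivalent builder pattern; for Python, the import path and the exact `create_stream` signature here are assumptions based on the option names in the table, so check the SDK reference before relying on them:

```python
from zerobus.sdk.shared import StreamConfigurationOptions  # assumed import path

options = StreamConfigurationOptions(
    max_inflight_records=10000,
    recovery=True,
    recovery_timeout_ms=20000,
    ack_callback=lambda response: print(f"Ack: {response}"),
)

# Assumes create_stream accepts the options as a trailing argument.
stream = sdk.create_stream(client_id, client_secret, table_properties, options)
```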
Error handling
The SDK provides two exception types:

- `ZerobusException`: Retriable errors (network issues, temporary failures)
- `NonRetriableException`: Non-retriable errors (invalid credentials, missing table)
```python
from zerobus.sdk.shared import ZerobusException, NonRetriableException

try:
    stream.ingest_record(record)
except NonRetriableException as e:
    print(f"Fatal error: {e}")
    raise
except ZerobusException as e:
    print(f"Retriable error: {e}")
    # Implement retry logic with backoff.
```
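One way to fill in the retry comment above is exponential backoff around the ingest call. A sketch using the two documented exception types; the attempt count and delays are arbitrary choices for illustration, not SDK defaults:

```python
import time

def ingest_with_retry(stream, record, max_attempts=5, base_delay_s=0.5):
    """Retry retriable failures with exponential backoff; re-raise fatal ones."""
    for attempt in range(1, max_attempts + 1):
        try:
            return stream.ingest_record(record)
        except NonRetriableException:
            raise  # Bad credentials, missing table, etc. -- retrying won't help.
        except ZerobusException:
            if attempt == max_attempts:
                raise
            time.sleep(base_delay_s * 2 ** (attempt - 1))  # 0.5s, 1s, 2s, ...
```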
Rust SDK

- Clone the Zerobus SDK repository (needed for the schema generation tool and examples).

```bash
git clone https://github.com/databricks/zerobus-sdk-rs.git
cd zerobus-sdk-rs
```

You should see something like this:

```text
zerobus-sdk-rs/
├── sdk/ # Core Zerobus SDK library
│ ├── src/ # Core SDK logic
│ ├── zerobus_service.proto # gRPC protocol definition
│ ├── build.rs # Build script for protobuf compilation
│ └── Cargo.toml
│
├── tools/
│ └── generate_files/ # Schema generation CLI tool
│ ├── src/ # Unity Catalog -> Proto conversion
│ ├── README.md # Tool documentation
│ └── Cargo.toml
│
├── examples/
│ └── basic_example/ # Working example application
│ ├── README.md # Example documentation
│ ├── src/main.rs # Example usage code
│ ├── output/ # Generated schema files
│ │ ├── orders.proto
│ │ ├── orders.rs
│ │ └── orders.descriptor
│ └── Cargo.toml
│
└── README.md                # Main README file
```
- Create your Rust project.

```bash
cd ..   # Go back to your workspace directory
cargo new example_project
cd example_project
```

You should get something like the following:

```text
example_project/
├── Cargo.toml
└── src/
    └── main.rs
```
- Add dependencies to your `Cargo.toml`:

```toml
[dependencies]
databricks-zerobus-ingest-sdk = "0.1.0"  # From crates.io; matches the `use` path in main.rs below
prost = "0.13.3"
prost-types = "0.13.3"
tokio = { version = "1.42.0", features = ["macros", "rt-multi-thread"] }
```
- Run the schema generation tool to generate the protobuf files needed for ingestion. This tool generates the proto definition for your table, along with a descriptor and the corresponding Rust file.

```bash
# From your example_project directory
cd ../zerobus-sdk-rs/tools/generate_files
cargo run -- \
    --uc-endpoint "https://your-workspace.cloud.databricks.com" \
    --client-id "your-client-id" \
    --client-secret "your-client-secret" \
    --table "catalog.schema.table" \
    --output-dir "../../output"
```

This will create an `output` folder in your project with three files inside:

```text
example_project/
├── Cargo.toml
├── output/
│ ├── <table_name>.rs
│ ├── <table_name>.proto
│ └── <table_name>.descriptor
└── src/
    └── main.rs
```
- Create your client application.

To use the SDK, you need the following information:

- Databricks workspace URL (for example, `https://your-workspace.cloud.databricks.com`)
- Table name (for example, `catalog.schema.table`)
- Service principal client ID and secret
- Zerobus endpoint in the format `https://<workspace_id>.zerobus.<region>.cloud.databricks.com`
Example with a mock table `air_quality`. After generating all of the files, your file tree looks like this:

```text
example_project/
├── Cargo.toml
├── output/
│   ├── air_quality.rs
│   ├── air_quality.proto
│   └── air_quality.descriptor
└── src/
    └── main.rs
```

`air_quality.proto`:

```proto
syntax = "proto2";

message table_AirQuality {
  optional string device_name = 1;
  optional int32 temp = 2;
  optional int64 humidity = 3;
}
```

`air_quality.rs` (auto-generated):

```rust
// @generated by prost-build
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct table_AirQuality {
    #[prost(string, optional, tag = "1")]
    pub device_name: Option<String>,
    #[prost(int32, optional, tag = "2")]
    pub temp: Option<i32>,
    #[prost(int64, optional, tag = "3")]
    pub humidity: Option<i64>,
}
```

`main.rs` (your application code):

```rust
use std::error::Error;
use std::fs;

use databricks_zerobus_ingest_sdk::{
    StreamConfigurationOptions, TableProperties, ZerobusSdk,
};
use prost::Message;
use prost_types::DescriptorProto;

// Change constants to match your setup.
const DATABRICKS_WORKSPACE_URL: &str = "https://your-workspace.cloud.databricks.com";
const TABLE_NAME: &str = "catalog.schema.air_quality";
const DATABRICKS_CLIENT_ID: &str = "your-client-id";
const DATABRICKS_CLIENT_SECRET: &str = "your-client-secret";
const ZEROBUS_ENDPOINT: &str = "https://workspace-id.zerobus.region.cloud.databricks.com";

pub mod air_quality {
    include!("../output/air_quality.rs");
}
use crate::air_quality::table_AirQuality;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let descriptor_proto = load_descriptor_proto(
        "output/air_quality.descriptor",
        "air_quality.proto",
        "table_AirQuality",
    );

    let table_properties = TableProperties {
        table_name: TABLE_NAME.to_string(),
        descriptor_proto,
    };

    let stream_configuration_options = StreamConfigurationOptions {
        max_inflight_records: 1000,
        ..Default::default()
    };

    let sdk = ZerobusSdk::new(
        ZEROBUS_ENDPOINT.to_string(),
        DATABRICKS_WORKSPACE_URL.to_string(),
    )?;

    let mut stream = sdk
        .create_stream(
            table_properties,
            DATABRICKS_CLIENT_ID.to_string(),
            DATABRICKS_CLIENT_SECRET.to_string(),
            Some(stream_configuration_options),
        )
        .await?;
    println!("Stream created successfully!");

    let record = table_AirQuality {
        device_name: Some("sensor-001".to_string()),
        temp: Some(22),
        humidity: Some(55),
    };

    let ack_future = stream
        .ingest_record(record.encode_to_vec())
        .await?;
    println!("Record submitted for ingestion");

    let offset_id = ack_future.await?;
    println!("Record acknowledged at offset: {}", offset_id);

    stream.close().await?;
    println!("Stream closed successfully");

    Ok(())
}

fn load_descriptor_proto(
    path: &str,
    file_name: &str,
    message_name: &str,
) -> DescriptorProto {
    let descriptor_bytes = fs::read(path)
        .expect("Failed to read proto descriptor file");

    let file_descriptor_set = prost_types::FileDescriptorSet::decode(
        descriptor_bytes.as_ref(),
    ).unwrap();

    let file_descriptor_proto = file_descriptor_set
        .file
        .into_iter()
        .find(|f| f.name.as_ref().map(|n| n.as_str()) == Some(file_name))
        .expect("File descriptor not found");

    file_descriptor_proto
        .message_type
        .into_iter()
        .find(|m| m.name.as_ref().map(|n| n.as_str()) == Some(message_name))
        .expect("Message descriptor not found")
}
```
- Run your application:

```bash
cd example_project
cargo run
```

You should see output like:

```text
Stream created successfully!
Record submitted for ingestion
Record acknowledged at offset: 0
Stream closed successfully
```
Java SDK

Java 8 or higher is required.
- Install the Java SDK.

Option A: Maven

Add the dependency to your `pom.xml`:

```xml
<dependency>
  <groupId>com.databricks</groupId>
  <artifactId>zerobus-ingest-sdk</artifactId>
  <version>0.1.0</version>
</dependency>
```

To use the fat JAR with all dependencies bundled:

```xml
<dependency>
  <groupId>com.databricks</groupId>
  <artifactId>zerobus-ingest-sdk</artifactId>
  <version>0.1.0</version>
  <classifier>jar-with-dependencies</classifier>
</dependency>
```

Option B: Build from source

Clone and build the SDK:

```bash
git clone https://github.com/databricks/zerobus-sdk-java.git
cd zerobus-sdk-java
mvn clean package
```

This generates two JAR files in the `target/` directory:

- Fat JAR (recommended): `zerobus-ingest-sdk-0.1.0-jar-with-dependencies.jar` (~18 MB)
  - Self-contained with all dependencies bundled
  - No additional dependencies required
- Regular JAR: `zerobus-ingest-sdk-0.1.0.jar` (~144 KB)
  - SDK classes only
  - Requires dependencies on the classpath

Dependencies (automatically managed by Maven, or needed if using the regular JAR manually):

- `protobuf-java` 3.24.0
- `grpc-netty-shaded` 1.58.0
- `grpc-protobuf` 1.58.0
- `grpc-stub` 1.58.0
- `javax.annotation-api` 1.3.2
- `slf4j-api` 1.7.36
- Generate a Protocol Buffer definition.

Automatically generate a proto definition from your Unity Catalog table schema using the built-in tool.

If using the fat JAR from the build:

```bash
java -jar zerobus-ingest-sdk-0.1.0-jar-with-dependencies.jar \
    --uc-endpoint "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com" \
    --client-id "<service-principal-application-id>" \
    --client-secret "<service-principal-secret>" \
    --table "<catalog.schema.table_name>" \
    --output "record.proto" \
    --proto-msg "AirQuality"
```

If using the Maven dependency:

```bash
mvn dependency:copy-dependencies
java -jar target/dependency/zerobus-ingest-sdk-0.1.0-jar-with-dependencies.jar \
    --uc-endpoint "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com" \
    --client-id "<service-principal-application-id>" \
    --client-secret "<service-principal-secret>" \
    --table "<catalog.schema.table_name>" \
    --output "record.proto" \
    --proto-msg "AirQuality"
```

Example output for a table:
```sql
CREATE TABLE main.default.air_quality (
  device_name STRING,
  temp INT,
  humidity BIGINT
)
USING DELTA;
```

Generates `record.proto`:

```proto
syntax = "proto2";

package com.example;

option java_package = "com.example.proto";
option java_outer_classname = "Record";

message AirQuality {
  optional string device_name = 1;
  optional int32 temp = 2;
  optional int64 humidity = 3;
}
```

You can learn more about Protobuf messages here.
- Compile the Protocol Buffer definition.

Using Maven

Add the protobuf plugin to your `pom.xml`:

```xml
<build>
  <extensions>
    <extension>
      <groupId>kr.motd.maven</groupId>
      <artifactId>os-maven-plugin</artifactId>
      <version>1.7.1</version>
    </extension>
  </extensions>
  <plugins>
    <plugin>
      <groupId>org.xolstice.maven.plugins</groupId>
      <artifactId>protobuf-maven-plugin</artifactId>
      <version>0.6.1</version>
      <configuration>
        <protocArtifact>com.google.protobuf:protoc:3.24.0:exe:${os.detected.classifier}</protocArtifact>
      </configuration>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
```

Place your `record.proto` in `src/main/proto/` and run:

```bash
mvn compile
```

Using protoc directly

Install the Protocol Buffers compiler (protoc 3.24.0) and compile:

```bash
protoc --java_out=src/main/java src/main/proto/record.proto
```

This generates a Java file (for example, `src/main/java/com/example/proto/Record.java`).
- Create a Java client.

To use the SDK, you need the following information:

- Databricks workspace URL
- Workspace ID (found in your workspace URL after `/o=`)
- Table name
- Service principal client ID and client secret
- Zerobus endpoint in the format `https://<workspace_id>.zerobus.<region>.cloud.databricks.com`
Blocking ingestion example:
```java
package com.example;

import com.databricks.zerobus.*;
import com.example.proto.Record.AirQuality;

public class ZerobusClient {
  public static void main(String[] args) throws Exception {
    // Configuration.
    String serverEndpoint = "1234567890123456.zerobus.us-west-2.cloud.databricks.com";
    String workspaceUrl = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com";
    String tableName = "main.default.air_quality";
    String clientId = "your-service-principal-application-id";
    String clientSecret = "your-service-principal-secret";

    // Initialize SDK.
    ZerobusSdk sdk = new ZerobusSdk(serverEndpoint, workspaceUrl);

    // Configure table properties.
    TableProperties<AirQuality> tableProperties = new TableProperties<>(
        tableName,
        AirQuality.getDefaultInstance()
    );

    // Create stream.
    ZerobusStream<AirQuality> stream = sdk.createStream(
        tableProperties,
        clientId,
        clientSecret
    ).join();

    try {
      // Ingest records.
      for (int i = 0; i < 100; i++) {
        AirQuality record = AirQuality.newBuilder()
            .setDeviceName("sensor-" + (i % 10))
            .setTemp(20 + (i % 15))
            .setHumidity(50 + (i % 40))
            .build();

        stream.ingestRecord(record).join(); // Wait for durability.
        System.out.println("Ingested record " + (i + 1));
      }
      System.out.println("Successfully ingested 100 records!");
    } finally {
      stream.close();
    }
  }
}
```

Non-blocking ingestion example:
For high-throughput ingestion:
```java
package com.example;

import com.databricks.zerobus.*;
import com.example.proto.Record.AirQuality;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class NonBlockingClient {
  public static void main(String[] args) throws Exception {
    // Configuration.
    String serverEndpoint = "1234567890123456.zerobus.us-west-2.cloud.databricks.com";
    String workspaceUrl = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com";
    String tableName = "main.default.air_quality";
    String clientId = "your-service-principal-application-id";
    String clientSecret = "your-service-principal-secret";

    // Initialize SDK.
    ZerobusSdk sdk = new ZerobusSdk(serverEndpoint, workspaceUrl);

    // Configure table properties.
    TableProperties<AirQuality> tableProperties = new TableProperties<>(
        tableName,
        AirQuality.getDefaultInstance()
    );

    // Configure stream options.
    StreamConfigurationOptions options = StreamConfigurationOptions.builder()
        .setMaxInflightRecords(50000)
        .setAckCallback(response ->
            System.out.println("Acknowledged offset: " +
                response.getDurabilityAckUpToOffset()))
        .build();

    // Create stream.
    ZerobusStream<AirQuality> stream = sdk.createStream(
        tableProperties,
        clientId,
        clientSecret,
        options
    ).join();

    List<CompletableFuture<Void>> futures = new ArrayList<>();

    try {
      // Ingest records.
      for (int i = 0; i < 100000; i++) {
        AirQuality record = AirQuality.newBuilder()
            .setDeviceName("sensor-" + (i % 10))
            .setTemp(20 + (i % 15))
            .setHumidity(50 + (i % 40))
            .build();

        futures.add(stream.ingestRecord(record));
      }

      // Flush and wait for all records to be acknowledged.
      stream.flush();
      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
      System.out.println("Successfully ingested 100,000 records!");
    } finally {
      stream.close();
    }
  }
}
```
- Run your client.

Using Maven:

```bash
mvn compile exec:java -Dexec.mainClass="com.example.ZerobusClient"
```

Compile and run manually:

```bash
javac -cp "lib/*" -d out src/main/java/com/example/ZerobusClient.java src/main/java/com/example/proto/Record.java
java -cp "lib/*:out" com.example.ZerobusClient
```
Configuration options
The SDK supports various configuration options via `StreamConfigurationOptions`:

| Option | Default | Description |
|---|---|---|
| `maxInflightRecords` | 50000 | Maximum number of unacknowledged records |
| `recovery` | `true` | Enable automatic stream recovery |
| `recoveryTimeoutMs` | 15000 | Timeout for recovery operations (ms) |
| `recoveryBackoffMs` | 2000 | Delay between recovery attempts (ms) |
| `recoveryRetries` | 3 | Maximum number of recovery attempts |
| `flushTimeoutMs` | 300000 | Timeout for flush operations (ms) |
| `serverLackOfAckTimeoutMs` | 60000 | Server acknowledgment timeout (ms) |
| `ackCallback` | `null` | Callback invoked on record acknowledgment |
Example configuration:
```java
StreamConfigurationOptions options = StreamConfigurationOptions.builder()
    .setMaxInflightRecords(10000)
    .setRecovery(true)
    .setRecoveryTimeoutMs(20000)
    .setAckCallback(response ->
        System.out.println("Ack: " + response.getDurabilityAckUpToOffset()))
    .build();
```
Logging
The SDK uses SLF4J for logging.
Enable debug logging:
```bash
java -Dorg.slf4j.simpleLogger.log.com.databricks.zerobus=debug -cp "lib/*:out" com.example.ZerobusClient
```
With Maven:
```bash
mvn exec:java -Dexec.mainClass="com.example.ZerobusClient" -Dorg.slf4j.simpleLogger.log.com.databricks.zerobus=debug
```
Available log levels: `trace`, `debug`, `info`, `warn`, `error`.
Error handling
The SDK provides two exception types:

- `ZerobusException`: Retriable errors (network issues, temporary failures)
- `NonRetriableException`: Non-retriable errors (invalid credentials, missing table)
```java
try {
  stream.ingestRecord(record);
} catch (NonRetriableException e) {
  System.err.println("Fatal error: " + e.getMessage());
  throw e;
} catch (ZerobusException e) {
  System.err.println("Retriable error: " + e.getMessage());
  // Implement retry logic with backoff.
}
```