Use Arrow Flight with Zerobus Ingest

Beta

This feature is in Beta.

Arrow Flight ingestion lets you send Apache Arrow RecordBatch data directly to Zerobus Ingest instead of converting every row to JSON or Protocol Buffers first. It is a third record format option in the Zerobus SDKs, alongside JSON and Protocol Buffers, and it uses the same Zerobus endpoint, the same OAuth flow, and the same x-databricks-zerobus-table-name header convention. The wire protocol is Arrow Flight DoPut, which carries Arrow IPC messages over gRPC.

When to use Arrow Flight

Arrow Flight is the best fit in the following scenarios:

  • Your application already produces Arrow data, such as pyarrow.Table or pyarrow.RecordBatch (Python), arrow_array::RecordBatch from the arrow-rs crates (Rust), or VectorSchemaRoot (Java). DataFrame libraries built on Arrow — such as Polars or DataFusion — fit naturally into this path.
  • You ingest rows in batches instead of sending one record at a time.
  • Your schema is wide, numeric-heavy, or analytics-oriented, where row-by-row serialization adds noticeable CPU overhead.
  • You are building collectors or gateways that aggregate data for a short interval and then send it as a single columnar batch.

Arrow Flight is usually not the best choice for sparse, one-row-at-a-time traffic. In those cases, JSON or Protocol Buffers over the SDK gRPC path are typically simpler. See Choose an interface.

How the ingestion model works

With Arrow Flight ingestion, one stream writes to one target table. To ingest data, follow this sequence:

  1. Define an Arrow schema that matches the destination Delta table schema.
  2. Open a Zerobus Arrow stream for that table.
  3. Send RecordBatch (or Table) payloads.
  4. Wait for the last offset or call flush() to confirm durability.
  5. Close the stream.
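
In Python SDK terms, the sequence maps onto the calls below. This is a condensed outline, not a complete program; the full runnable example appears in Write a client later on this page.

Python
# 1. Define an Arrow schema that mirrors the Delta table schema.
schema = pa.schema([("device_name", pa.large_utf8()), ("temp", pa.int32())])

# 2. Open one stream for the target table.
stream = sdk.create_arrow_stream(TABLE_NAME, schema, CLIENT_ID, CLIENT_SECRET)

# 3. and 4. Send batches, then confirm durability.
offset = stream.ingest_batch(batch)
stream.wait_for_offset(offset)  # or stream.flush() after a group of batches

# 5. Close the stream.
stream.close()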

If you use a Zerobus SDK, the SDK handles the low-level Arrow Flight wire details for you. It serializes your Arrow data to IPC format and automatically splits oversized batches across multiple Flight messages. The server confirms each batch durably, and the SDK surfaces those confirmations as logical batch offsets.

Similar to the Protobuf schema rule, the schema you pass to the stream must match the target Delta table schema exactly, with one exception: your schema may omit nullable columns that exist in the Delta table (this is treated as a non-breaking schema change). Any other mismatch is rejected.
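
For example, given a hypothetical Delta table defined as (id BIGINT, name STRING, note STRING) where note is nullable, the following Arrow schema is accepted because the only difference is the omitted nullable column:

Python
import pyarrow as pa

# Hypothetical Delta table: id BIGINT, name STRING, note STRING (nullable).
# Omitting the nullable "note" column is allowed; renaming a column or
# changing its type is a mismatch and is rejected.
schema = pa.schema(
    [
        ("id", pa.int64()),
        ("name", pa.large_utf8()),
    ]
)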

Each individual row inside a RecordBatch must fit within the 10 MB gRPC message size limit. The SDK splits oversized batches across multiple Flight messages, so large batches are not a problem, but a single row whose serialized size exceeds the limit cannot be split and is rejected. Throughput, latency, quota, and partitioned-table limits also apply to Arrow Flight because it runs over the same gRPC transport. See Zerobus Ingest connector limitations.
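
If you are unsure whether individual rows are close to the limit, you can approximate their serialized size on the client. The sketch below serializes single-row slices to Arrow IPC and compares the largest one against the limit. The exact on-wire size also includes Flight framing, so treat the result as an estimate.

Python
import pyarrow as pa

def ipc_size(batch: pa.RecordBatch) -> int:
    # Size of the batch serialized as an Arrow IPC stream, in bytes.
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, batch.schema) as writer:
        writer.write_batch(batch)
    return sink.getvalue().size

LIMIT = 10 * 1024 * 1024  # 10 MB gRPC message size limit
largest_row = max(ipc_size(batch.slice(i, 1)) for i in range(batch.num_rows))
assert largest_row < LIMIT, f"row of {largest_row} bytes cannot be split"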

Write a client

The examples below open an Arrow Flight stream against the same air_quality table used in the Use the Zerobus Ingest connector examples. They are shown in Python and Rust for brevity, but the same builder, configuration options, and call sequence are available in every Zerobus SDK. Adapt the syntax for your language and consult the SDK repository for language-specific Arrow types.

The Python SDK accepts a pyarrow.Schema at stream creation time and a pyarrow.RecordBatch or pyarrow.Table for each ingest call.

Bash
pip install "databricks-zerobus-ingest-sdk[arrow]" pyarrow
Python
import pyarrow as pa

from zerobus.sdk.sync import ZerobusSdk

# See "Get your workspace URL and Zerobus Ingest endpoint" in zerobus-ingest.md.
SERVER_ENDPOINT = "https://1234567890123456.zerobus.us-west-2.cloud.databricks.com"
DATABRICKS_WORKSPACE_URL = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
TABLE_NAME = "main.default.air_quality"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"

schema = pa.schema(
    [
        ("device_name", pa.large_utf8()),
        ("temp", pa.int32()),
        ("humidity", pa.int64()),
    ]
)

sdk = ZerobusSdk(SERVER_ENDPOINT, DATABRICKS_WORKSPACE_URL)

stream = sdk.create_arrow_stream(TABLE_NAME, schema, CLIENT_ID, CLIENT_SECRET)

row_count = 1_000
batch = pa.record_batch(
    {
        "device_name": [f"sensor-{i}" for i in range(row_count)],
        "temp": [20 + (i % 5) for i in range(row_count)],
        "humidity": [55 + (i % 10) for i in range(row_count)],
    },
    schema=schema,
)

try:
    offset = stream.ingest_batch(batch)
    stream.wait_for_offset(offset)
finally:
    stream.close()

stream.ingest_batch() also accepts a pyarrow.Table. The SDK converts it to a single RecordBatch internally before sending. Each call returns a logical offset. stream.wait_for_offset(offset) blocks until the server has durably persisted that batch.
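
For example, the following sketch ingests a pyarrow.Table and uses flush() as the durability checkpoint instead of waiting on each offset. It assumes the sdk, schema, TABLE_NAME, CLIENT_ID, and CLIENT_SECRET values from the example above.

Python
import pyarrow as pa

table = pa.table(
    {
        "device_name": ["sensor-a", "sensor-b"],
        "temp": [21, 23],
        "humidity": [60, 58],
    },
    schema=schema,
)

stream = sdk.create_arrow_stream(TABLE_NAME, schema, CLIENT_ID, CLIENT_SECRET)
try:
    stream.ingest_batch(table)  # converted to a single RecordBatch internally
    stream.flush()  # blocks until all outstanding batches are durable
finally:
    stream.close()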

IPC compression

Arrow IPC payloads can be compressed on the wire. The SDK accepts two compression codecs.

  • LZ4_FRAME: Fast, low CPU overhead, modest compression ratio.
  • ZSTD: Higher compression ratio, more CPU per batch.

Enable compression only when network bandwidth limits throughput. Compression reduces bytes on the wire but adds CPU cost on the client.

In the Python SDK, set the ipc_compression field on ArrowStreamConfigurationOptions:

Python
from zerobus.sdk.shared.arrow import IPCCompression, ArrowStreamConfigurationOptions

options = ArrowStreamConfigurationOptions(ipc_compression=IPCCompression.ZSTD)
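
Then pass the options object when you create the stream. The keyword shown below is an assumption; check the SDK reference for your version for the exact signature.

Python
# The "options" parameter name is assumed here, not confirmed by this page.
stream = sdk.create_arrow_stream(
    TABLE_NAME, schema, CLIENT_ID, CLIENT_SECRET, options=options
)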

In the Rust SDK, set it on the builder. The CompressionType enum lives in the arrow-ipc crate, so add it as a dependency:

Bash
cargo add arrow-ipc
Rust
use arrow_ipc::CompressionType;

let stream = sdk
    .stream_builder()
    .table(TABLE_NAME)
    .oauth(CLIENT_ID, CLIENT_SECRET)
    .arrow(schema)
    .ipc_compression(Some(CompressionType::ZSTD))
    .build_arrow()
    .await?;

Best practices

Follow these guidelines to get the best performance and reliability from Arrow Flight ingestion.

  • Reuse a stream for many batches instead of opening a new stream per batch. Stream creation carries significant overhead, which you amortize by keeping one stream open across many batches.
  • Send multiple rows per batch. Start with natural application-sized batches, not one row per call. Sending one row at a time works, but it negates most of the performance advantage of Arrow.
  • Call flush() at controlled checkpoints. This gives you a clear durability boundary for a group of batches without blocking on every single one. The sketch after this list shows the pattern.
  • Enable IPC compression only when the workload is network-bound. See IPC compression.
  • Use Arrow Flight when your producer is already columnar. If your source data is naturally row-oriented and small, using the Zerobus Ingest connector with JSON or Protocol Buffers is often simpler. See Use the Zerobus Ingest connector.
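
The following sketch combines the first three guidelines: one long-lived stream, application-sized batches, and a flush() checkpoint every N batches. The batch_source() generator is a hypothetical stand-in for your application's batching logic, and the sdk, schema, and credential values are assumed from the earlier example.

Python
CHECKPOINT_EVERY = 100  # batches per durability checkpoint; tune for your workload

stream = sdk.create_arrow_stream(TABLE_NAME, schema, CLIENT_ID, CLIENT_SECRET)
try:
    for i, batch in enumerate(batch_source(), start=1):  # hypothetical generator
        stream.ingest_batch(batch)
        if i % CHECKPOINT_EVERY == 0:
            stream.flush()  # one durability boundary for the preceding batches
    stream.flush()  # final checkpoint for any remaining batches
finally:
    stream.close()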

Error handling and recovery

Arrow Flight streams use the same gRPC error categories as the rest of Zerobus Ingest. For error codes, retry guidance, and the full client-vs-server taxonomy, see Zerobus Ingest Error Handling.

When you configure the SDK with automatic recovery (the default), it transparently reconnects and replays unacknowledged batches on transient failures. After the stream closes, whether gracefully or due to an unrecoverable failure, you can retrieve any batches that were sent but never acknowledged. In the Python SDK:

Python
# Retry unacked_batches against a freshly created stream
if stream.is_closed:
    unacked_batches = stream.get_unacked_batches()

In the Rust SDK, call stream.get_unacked_batches().await? to retrieve unacknowledged batches for retry.
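
A retry pass might look like the following sketch, assuming get_unacked_batches() returns the original RecordBatch payloads in order:

Python
if stream.is_closed:
    unacked = stream.get_unacked_batches()
    retry_stream = sdk.create_arrow_stream(TABLE_NAME, schema, CLIENT_ID, CLIENT_SECRET)
    try:
        for batch in unacked:
            retry_stream.ingest_batch(batch)
        retry_stream.flush()  # confirm the replayed batches are durable
    finally:
        retry_stream.close()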

Next steps

  • Use the Zerobus Ingest connector: If you haven't set up Zerobus Ingest yet, start here for instructions on finding your workspace URL, creating the target Delta table, and configuring a service principal. These steps are shared across all record formats.
  • Zerobus Ingest connector limitations: Review Zerobus quotas before deploying to production. Throughput, latency, and partitioned-table limits all apply to Arrow Flight.
  • Zerobus Ingest Error Handling: Consult this page for a full list of gRPC error codes and recommended retry and recovery behavior for your client.