Use o Arrow Flight com o Zerobus Ingest
Beta
Este recurso está em versão Beta.
A ingestão de voos Arrow permite enviar dados do Apache Arrow RecordBatch diretamente para o Zerobus Ingest em vez de converter cada linha para JSON ou Protocol Buffers primeiro. É uma terceira opção de formato de registro nos SDKs Zerobus, juntamente com JSON e Protocol Buffers, e sua execução ocorre na mesma conexão gRPC. Ele usa o mesmo endpoint Zerobus, o mesmo fluxo OAuth e a mesma convenção de cabeçalho x-databricks-zerobus-table-name . O protocolo de comunicação é o Arrow Flight DoPut, que transporta mensagens Arrow IPC sobre gRPC.
Quando usar o Arrow Flight
Arrow Flight é a melhor opção nos seguintes cenários:
- Seu aplicativo já produz dados Arrow, como
pyarrow.Tableoupyarrow.RecordBatch(Python),arrow_array::RecordBatchdos crates arrow-rs (Rust) ouVectorSchemaRoot(Java). Bibliotecas DataFrame construídas sobre Arrow — como Polars ou DataFusion — se encaixam naturalmente nesse caminho. - Você ingere linhas em lotes, em vez de enviar um registro por vez.
- Seu esquema é amplo, envolve muitos dados numéricos ou é orientado para análises, onde a serialização linha por linha adiciona uma sobrecarga de CPU considerável.
- Você está criando coletores ou gateways que agregam dados por um curto período e, em seguida, os enviam como lotes em formato de coluna única.
O Arrow Flight geralmente não é a melhor opção para voos com pouco movimento, em que apenas uma fileira de passageiros por vez. Nesses casos, JSON ou Protocol Buffers sobre o caminho gRPC do SDK são normalmente mais simples. Consulte Escolher uma interface.
Como funciona o modelo de ingestão
Com a ingestão do Arrow Flight, uma transmissão grava em uma tabela de destino. Para ingerir dados, siga esta sequência:
- Defina um esquema Arrow que corresponda ao esquema da tabela Delta de destino.
- Abra uma transmissão Zerobus Arrow para essa mesa.
- Enviar cargas úteis
RecordBatch(ouTable). - Aguarde o último deslocamento ou chame
flush()para confirmar a durabilidade. - Feche a transmissão.
Se você usar um SDK Zerobus, o SDK cuidará dos detalhes de baixo nível da fiação do Arrow Flight para você. Ele serializa seus dados Arrow para o formato IPC e divide automaticamente lotes muito grandes em várias mensagens Flight. O servidor confirma cada lote de forma duradoura, e o SDK apresenta essas confirmações como deslocamentos lógicos de lotes.
Semelhante à regra de esquemaProtobuf , o esquema que você passa para a transmissão deve corresponder à tabela Delta de destino em uma proporção de 1:1. Seu esquema pode omitir colunas anuláveis que existam na tabela Delta (isso é tratado como uma alteração de esquema não incompatível), mas qualquer outra incompatibilidade será rejeitada.
Cada linha individual dentro de um RecordBatch deve caber dentro do limite de tamanho de mensagem gRPC de 10 MB. O Arrow Flight divide automaticamente lotes muito grandes em várias mensagens transmitidas, portanto, lotes grandes não são um problema — mas uma única linha cujo tamanho serializado exceda o limite não pode ser dividida e é rejeitada. Taxa de transferência, latência, cota e limites de tabela particionada também se aplicam ao Arrow Flight, uma vez que ele é executado no mesmo transporte gRPC. Consulte as limitações do conector Zerobus Ingest.
Escreva para um cliente
Os exemplos abaixo abrem uma transmissão Arrow Flight contra a mesma tabela air_quality usada nos exemplos do conector Use the Zerobus Ingest . Eles são mostrados em Python e Rust por brevidade, mas o mesmo construtor, opções de configuração e sequência de chamadas estão disponíveis em todos os SDK da Zerobus. Adapte a sintaxe para o seu idioma e consulte o repositório do SDK para obter informações sobre os tipos Arrow específicos de cada idioma.
- Python SDK
- Rust SDK
O SDK Python aceita um pyarrow.Schema no momento da criação da transmissão e um pyarrow.RecordBatch ou pyarrow.Table para cada chamada de ingestão.
pip install "databricks-zerobus-ingest-sdk[arrow]" pyarrow
import pyarrow as pa
from zerobus.sdk.sync import ZerobusSdk
# See "Get your workspace URL and Zerobus Ingest endpoint" in zerobus-ingest.md.
SERVER_ENDPOINT = "https://1234567890123456.zerobus.us-west-2.cloud.databricks.com"
DATABRICKS_WORKSPACE_URL = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
TABLE_NAME = "main.default.air_quality"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"
schema = pa.schema(
[
("device_name", pa.large_utf8()),
("temp", pa.int32()),
("humidity", pa.int64()),
]
)
sdk = ZerobusSdk(SERVER_ENDPOINT, DATABRICKS_WORKSPACE_URL)
stream = sdk.create_arrow_stream(TABLE_NAME, schema, CLIENT_ID, CLIENT_SECRET)
row_count = 1_000
batch = pa.record_batch(
{
"device_name": [f"sensor-{i}" for i in range(row_count)],
"temp": [20 + (i % 5) for i in range(row_count)],
"humidity": [55 + (i % 10) for i in range(row_count)],
},
schema=schema,
)
try:
offset = stream.ingest_batch(batch)
stream.wait_for_offset(offset)
finally:
stream.close()
stream.ingest_batch() também aceita um pyarrow.Table. O SDK converte-o internamente em um único RecordBatch antes de enviar. Cada chamada retorna um deslocamento lógico. stream.wait_for_offset(offset) blocos até que o servidor tenha persistido de forma duradoura esses lotes.
O SDK Rust expõe o Arrow Flight através da API stream_builder() , por trás do recurso arrow-flight Cargo. Use a mesma versão principal do Arrow que o SDK para que os tipos RecordBatch e array correspondam em tempo de compilação.
cargo add databricks-zerobus-ingest-sdk --features arrow-flight
cargo add arrow-array
cargo add arrow-schema
cargo add tokio --features macros,rt-multi-thread
use std::sync::Arc;
use arrow_array::{Int32Array, Int64Array, LargeStringArray, RecordBatch};
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use databricks_zerobus_ingest_sdk::ZerobusSdk;
const SERVER_ENDPOINT: &str = "https://1234567890123456.zerobus.us-west-2.cloud.databricks.com";
const DATABRICKS_WORKSPACE_URL: &str = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com";
const TABLE_NAME: &str = "main.default.air_quality";
const CLIENT_ID: &str = "your-client-id";
const CLIENT_SECRET: &str = "your-client-secret";
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let schema = Arc::new(ArrowSchema::new(vec![
Field::new("device_name", DataType::LargeUtf8, false),
Field::new("temp", DataType::Int32, false),
Field::new("humidity", DataType::Int64, false),
]));
let sdk = ZerobusSdk::builder()
.endpoint(SERVER_ENDPOINT)
.unity_catalog_url(DATABRICKS_WORKSPACE_URL)
.build()?;
let mut stream = sdk
.stream_builder()
.table(TABLE_NAME)
.oauth(CLIENT_ID, CLIENT_SECRET)
.arrow(Arc::clone(&schema))
.build_arrow()
.await?;
let row_count: i32 = 1_000;
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(LargeStringArray::from(
(0..row_count)
.map(|i| format!("sensor-{i}"))
.collect::<Vec<_>>(),
)),
Arc::new(Int32Array::from(
(0..row_count).map(|i| 20 + (i % 5)).collect::<Vec<_>>(),
)),
Arc::new(Int64Array::from(
(0..row_count)
.map(|i| 55 + (i % 10) as i64)
.collect::<Vec<_>>(),
)),
],
)?;
let offset = stream.ingest_batch(batch).await?;
stream.wait_for_offset(offset).await?;
stream.close().await?;
Ok(())
}
O construtor seleciona o formato Arrow Flight com .arrow(schema) e finaliza a transmissão com .build_arrow(), que retorna um ZerobusArrowStream. JSON e Protocol Buffers continuam a usar .json() / .compiled_proto(...) e .build().
Compressão IPC
Os dados de carga útil do Arrow IPC podem ser comprimidos no fio. O SDK aceita dois codecs de compressão.
LZ4_FRAMERápido, baixo consumo de CPU, taxa de compressão moderada.ZSTDTaxa de compressão mais alta, mais CPU por lote.
Habilite a compactação somente quando a largura de banda da rede limitar Taxa de transferência. A compressão reduz a quantidade de bytes transmitidos pela rede, mas aumenta o custo de processamento da CPU no cliente.
No SDK do Python, defina o campo ipc_compression em ArrowStreamConfigurationOptions:
from zerobus.sdk.shared.arrow import IPCCompression, ArrowStreamConfigurationOptions
options = ArrowStreamConfigurationOptions(ipc_compression=IPCCompression.ZSTD)
No SDK do Rust, configure-o no construtor. O enum CompressionType reside no crate arrow-ipc , portanto, adicione-o como uma dependência:
cargo add arrow-ipc
use arrow_ipc::CompressionType;
let stream = sdk
.stream_builder()
.table(TABLE_NAME)
.oauth(CLIENT_ID, CLIENT_SECRET)
.arrow(schema)
.ipc_compression(Some(CompressionType::ZSTD))
.build_arrow()
.await?;
Melhores práticas
Siga estas orientações para obter o melhor desempenho e confiabilidade da ingestão de dados do Arrow Flight.
- Reutilizar uma transmissão por muitos lotes em vez de abrir uma nova transmissão por lotes. A criação de transmissões acarreta custos indiretos significativos que podem ser amortizados reutilizando uma transmissão em vários lotes.
- Enviar várias linhas por lote. Comece com lotes do tamanho ideal para a aplicação, e não com uma linha por chamada. Enviar uma linha de cada vez funciona, mas anula a maior parte da vantagem de desempenho de usar o Arrow.
- Chame
flush()em pontos de verificação controlados. Isso proporciona um limite de durabilidade claro para um grupo de lotes, sem bloquear cada um individualmente. - Habilite a compressão IPC somente quando a carga de trabalho estiver limitada pela rede. Veja compressão IPC.
- Use a opção Arrow Flight quando seu produtor já for colunar. Se os seus dados de origem forem naturalmente orientados a linhas e pequenos, usar o conector Zerobus Ingest com JSON ou Protocol Buffers costuma ser mais simples. Consulte Usar o conector Zerobus Ingest.
Tratamento e recuperação de erros
As transmissões do Arrow Flight usam as mesmas categorias de erro gRPC que o restante do Zerobus Ingest. Para obter informações sobre códigos de erro, orientações sobre como tentar novamente e a taxonomia completa de cliente versus servidor, consulte Tratamento de erros do Zerobus Ingest.
Ao configurar o SDK com recuperação automática (o default), ele se reconecta e reproduz lotes não reconhecidos de forma transparente em caso de falhas transitórias. Após o encerramento da transmissão, você poderá recuperar quaisquer lotes que o servidor tenha recebido, mas ainda não tenha confirmado. Isso se aplica tanto se a transmissão foi encerrada normalmente quanto devido a uma falha irrecuperável. No SDK do Python:
# Retry unacked_batches against a freshly created stream
if stream.is_closed:
unacked_batches = stream.get_unacked_batches()
No SDK Rust, chame stream.get_unacked_batches().await? para recuperar lotes não reconhecidos para nova tentativa.
Próximos passos
- Utilize o conector Zerobus Ingest: Se você ainda não configurou o Zerobus Ingest, comece aqui para obter instruções sobre como encontrar o URL do seu workspace , criar a tabela Delta de destino e configurar uma entidade de serviço. Esses passos são comuns a todos os formatos de gravação.
- Limitações do conector Zerobus Ingest: revise as quotas do Zerobus antes de implantá-lo em produção. Taxas de transferência, latência e limites de tabelas particionadas se aplicam ao Arrow Flight.
- Tratamento de erros do Zerobus Ingest: Consulte esta página para obter uma lista completa dos códigos de erro gRPC e o comportamento recomendado de repetição e recuperação para o seu cliente.