Arrow FlightをZerobus Ingestで使用する
ベータ版
この機能はベータ版です。
Arrow Flight の取り込み機能を使用すると、 Apache Arrow RecordBatchデータを、各行を最初に JSON または Protocol Buffers に変換する代わりに、 Zerobus Ingestに直接送信できます。これは、 JSONとProtocol Buffersに加えて、Zerobus SDKにおける3番目のレコードフォーマットオプションであり、同じgRPC接続上で実行されます。 これは、同じ Zerobus エンドポイント、同じ OAuth フロー、および同じx-databricks-zerobus-table-nameヘッダー規約を使用します。ワイヤプロトコルはArrow Flight DoPutであり、gRPC上でArrow IPCメッセージを伝送します。
Arrow Flightはいつ使うべきか
Arrow Flightは、以下のシナリオに最適です。
- アプリケーションは既に、
pyarrow.Tableやpyarrow.RecordBatch(Python)、 arrow-rsクレートからのarrow_array::RecordBatch(Rust)、またはVectorSchemaRoot(Java) などの Arrow データを生成しています。Arrow 上に構築されたDataFrame ( PolarsやDataFusionなど) は、このパスに自然に適合します。 - レコードを1件ずつ送信するのではなく、行をバッチ処理で取り込みます。
- スキーマが幅広く、数値が多く、またはアナリティクス指向であり、行ごとのシリアル化により CPU オーバーヘッドが顕著に増加します。
- あなたは、短時間のデータを集約し、それを1列形式のバッチとして送信するコレクターまたはゲートウェイを構築しています。
アローフライトは、乗客数が少なく、一度に1列ずつしか利用されないような状況では、通常最適な選択肢ではありません。そのような場合、SDKのgRPCパスを介したJSONまたはProtocol Buffersの方が一般的に簡単です。「インターフェースの選択」を参照してください。
取り込みモデルの仕組み
Arrow Flightのデータ取り込みでは、1つのストリームが1つのターゲットテーブルに書き込みます。データを取り込むには、以下の手順に従ってください。
- 宛先Deltaテーブルのスキーマと一致するArrowスキーマを定義します。
- そのテーブルの Zerobus Arrow ストリームを開きます。
RecordBatch(またはTable)のペイロードを送信します。- 最後のオフセットを待つか、
flush()を呼び出して耐久性を確認します。 - ストリームを閉じます。
Zerobus SDKを使用する場合、SDKがArrow Flightの低レベルの配線詳細を自動的に処理します。ArrowのデータをIPC形式にシリアル化し、サイズが大きすぎるバッチを複数のFlightメッセージに自動的に分割します。サーバーは各バッチを永続的に確認し、SDKはその確認を論理バッチオフセットとして表示します。
Protobufのスキーマ規則と同様に、ストリームに渡すスキーマは、ターゲットのDeltaテーブルと1対1で一致する必要があります。スキーマでは、Deltaテーブルに存在するNULL許容列を省略できます(これは互換性を損なわないスキーマ変更として扱われます)が、それ以外の不一致は拒否されます。
RecordBatch内の各行は、10 MB の gRPC メッセージサイズ制限内に収まる必要があります。Arrow Flightは、サイズが大きすぎるバッチを自動的に複数のワイヤメッセージに分割するため、大きなバッチでも問題ありません。しかし、シリアル化されたサイズが制限を超える単一の行は分割できず、拒否されます。Arrow Flight は同じ gRPC トランスポート上で実行されるため、スループット、レイテンシー、クォータ、およびパーティション テーブルの制限も Arrow Flight に適用されます。 Zerobus Ingestコネクタの制限事項を参照してください。
クライアントを記述する
以下の例では、Zerobus Ingestコネクタの使用例で使用されているのと同じair_qualityテーブルに対してArrow Flightストリームを開きます。 簡潔にするため、PythonとRustで示していますが、同じビルダー、設定オプション、および呼び出しシーケンスは、すべてのZerobus SDKで利用可能です。構文を言語に合わせて調整し、言語固有の矢印タイプについてはSDKリポジトリを参照してください。
- Python SDK
- Rust SDK
Python SDK は、ストリーム作成時にpyarrow.Schemaを、各取り込み呼び出し時にpyarrow.RecordBatchまたはpyarrow.Tableを受け入れます。
pip install "databricks-zerobus-ingest-sdk[arrow]" pyarrow
import pyarrow as pa
from zerobus.sdk.sync import ZerobusSdk
# See "Get your workspace URL and Zerobus Ingest endpoint" in zerobus-ingest.md.
SERVER_ENDPOINT = "https://1234567890123456.zerobus.us-west-2.cloud.databricks.com"
DATABRICKS_WORKSPACE_URL = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
TABLE_NAME = "main.default.air_quality"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"
schema = pa.schema(
[
("device_name", pa.large_utf8()),
("temp", pa.int32()),
("humidity", pa.int64()),
]
)
sdk = ZerobusSdk(SERVER_ENDPOINT, DATABRICKS_WORKSPACE_URL)
stream = sdk.create_arrow_stream(TABLE_NAME, schema, CLIENT_ID, CLIENT_SECRET)
row_count = 1_000
batch = pa.record_batch(
{
"device_name": [f"sensor-{i}" for i in range(row_count)],
"temp": [20 + (i % 5) for i in range(row_count)],
"humidity": [55 + (i % 10) for i in range(row_count)],
},
schema=schema,
)
try:
offset = stream.ingest_batch(batch)
stream.wait_for_offset(offset)
finally:
stream.close()
stream.ingest_batch() pyarrow.Tableも受け入れます。SDKは送信前に内部的にそれを単一のRecordBatchに変換します。各呼び出しは論理オフセットを返します。サーバーがそのバッチを永続的に保存するまで、 stream.wait_for_offset(offset)ブロックします。
Rust SDK は、 arrow-flight Cargo 機能の背後にあるstream_builder() API を通じて Arrow Flight を公開します。SDKと同じArrowメジャーバージョンを使用することで、コンパイル時にRecordBatchと配列型が一致するようにします。
cargo add databricks-zerobus-ingest-sdk --features arrow-flight
cargo add arrow-array
cargo add arrow-schema
cargo add tokio --features macros,rt-multi-thread
use std::sync::Arc;
use arrow_array::{Int32Array, Int64Array, LargeStringArray, RecordBatch};
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use databricks_zerobus_ingest_sdk::ZerobusSdk;
const SERVER_ENDPOINT: &str = "https://1234567890123456.zerobus.us-west-2.cloud.databricks.com";
const DATABRICKS_WORKSPACE_URL: &str = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com";
const TABLE_NAME: &str = "main.default.air_quality";
const CLIENT_ID: &str = "your-client-id";
const CLIENT_SECRET: &str = "your-client-secret";
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let schema = Arc::new(ArrowSchema::new(vec![
Field::new("device_name", DataType::LargeUtf8, false),
Field::new("temp", DataType::Int32, false),
Field::new("humidity", DataType::Int64, false),
]));
let sdk = ZerobusSdk::builder()
.endpoint(SERVER_ENDPOINT)
.unity_catalog_url(DATABRICKS_WORKSPACE_URL)
.build()?;
let mut stream = sdk
.stream_builder()
.table(TABLE_NAME)
.oauth(CLIENT_ID, CLIENT_SECRET)
.arrow(Arc::clone(&schema))
.build_arrow()
.await?;
let row_count: i32 = 1_000;
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(LargeStringArray::from(
(0..row_count)
.map(|i| format!("sensor-{i}"))
.collect::<Vec<_>>(),
)),
Arc::new(Int32Array::from(
(0..row_count).map(|i| 20 + (i % 5)).collect::<Vec<_>>(),
)),
Arc::new(Int64Array::from(
(0..row_count)
.map(|i| 55 + (i % 10) as i64)
.collect::<Vec<_>>(),
)),
],
)?;
let offset = stream.ingest_batch(batch).await?;
stream.wait_for_offset(offset).await?;
stream.close().await?;
Ok(())
}
ビルダーは.arrow(schema)でArrow Flight形式を選択し、 .build_arrow()でストリームを終了させ、 ZerobusArrowStreamを返します。JSONとProtocol Buffersは引き続き.json() / .compiled_proto(...)および.build()を使用します。
IPC圧縮
Arrow IPCペイロードは、通信中に圧縮することができます。SDKは2種類の圧縮コーデックに対応しています。
LZ4_FRAME高速、CPU負荷が低い、圧縮率は控えめ。ZSTD圧縮率が高いほど、バッチあたりのCPU負荷が高くなります。
ネットワーク帯域幅がスループットを制限する場合にのみ、圧縮を有効にしてください。圧縮によって通信データ量は削減されるが、クライアント側のCPU負荷が増加する。
Python SDKで、 ArrowStreamConfigurationOptionsのipc_compressionフィールドを設定します。
from zerobus.sdk.shared.arrow import IPCCompression, ArrowStreamConfigurationOptions
options = ArrowStreamConfigurationOptions(ipc_compression=IPCCompression.ZSTD)
Rust SDKでは、ビルダーで設定します。CompressionType列挙型はarrow-ipcクレートに含まれているので、依存関係として追加してください。
cargo add arrow-ipc
use arrow_ipc::CompressionType;
let stream = sdk
.stream_builder()
.table(TABLE_NAME)
.oauth(CLIENT_ID, CLIENT_SECRET)
.arrow(schema)
.ipc_compression(Some(CompressionType::ZSTD))
.build_arrow()
.await?;
ベストプラクティス
Arrow Flightの吸入機能から最高のパフォーマンスと信頼性を得るには、以下のガイドラインに従ってください。
- バッチごとに新しいストリームを開くのではなく、多くのバッチにストリームを再利用します。 ストリームの作成にはかなりのオーバーヘッドがかかりますが、多くのバッチにわたってストリームを再利用することで軽減できます。
- 1回のバッチ処理で複数の行を送信します。1回の呼び出しで1行ずつ処理するのではなく、アプリケーションのサイズに合わせた自然なバッチ処理から始めましょう。一度に1行ずつ送信する方法は機能しますが、Arrowを使用する際のパフォーマンス上の利点のほとんどが失われます。
- 管理されたチェックポイントで
flush()を呼び出してください。これにより、個々のバッチをブロックすることなく、バッチグループ全体の明確な耐久性境界を設定できます。 - ワークロードがネットワーク負荷の高い場合にのみ、IPC圧縮を有効にしてください。IPC圧縮を参照してください。
- 生産者が既に円柱状になっている場合は、Arrow Flightを使用してください。ソースデータが元々行指向で小規模な場合は、JSONまたはプロトコルバッファを使用してZerobus Ingestコネクタを利用する方が多くの場合簡単です。Zerobus Ingestコネクタの使用方法を参照してください。
エラー処理と回復
Arrow Flight ストリームは、Zerobus Ingest の他の部分と同じ gRPC エラーカテゴリを使用します。エラーコード、再試行に関するガイダンス、およびクライアントとサーバーの完全な分類については、 「Zerobus Ingest エラー処理」を参照してください。
SDKを自動復旧(デフォルト設定)で構成すると、一時的な障害発生時に透過的に再接続し、未確認のバッチを再再生します。ストリームが終了した後でも、サーバーが受信したがまだ確認応答していないバッチを取得できます。これは、ストリームが正常に終了した場合でも、回復不能な障害によって終了した場合でも同様に適用されます。Python SDKでは:
# Retry unacked_batches against a freshly created stream
if stream.is_closed:
unacked_batches = stream.get_unacked_batches()
Rust SDKでは、 stream.get_unacked_batches().await?を呼び出して、再試行のために未確認のバッチを取得します。
次のステップ
- Zerobus Ingest コネクタを使用する: Zerobus Ingest をまだ設定していない場合は、ここからワークスペース URL の検索、ターゲットDeltaテーブルの作成、サービス プリンシパルの構成の手順を開始してください。 これらのステップはすべてのレコード形式で共有されます。
- Zerobus Ingest コネクタの制限: 本番運用に展開する前に、Zerobus クォータを確認してください。 スループット、レイテンシ、およびパーティションテーブルの制限はすべてArrow Flightに適用されます。
- Zerobus Ingestのエラー処理:gRPCエラーコードの完全なリストと、クライアントに対する推奨される再試行および回復動作については、このページを参照してください。