Zerobus Ingestコネクタを使用する
このページでは、 LakeFlow Connectの Zerobus Ingest コネクタを使用してデータを取り込む方法について説明します。
インターフェースの選択
Zerobus Ingest は gRPC および REST インターフェースをサポートしています。すべての SDK は当社の gRPC APIsと直接統合され、カスタムの高スループット クライアントを構築するための開発者向けのインターフェースを提供します。 REST インターフェースは、大量の「チャッティ」デバイスを扱う際のアーキテクチャ上の制約を処理します。
- gRPC「接続税」を備えた SDK : gRPC は、永続的な接続による高スループット パフォーマンスに特化しています。ただし、開いているストリームはすべて同時実行クォータにカウントされます。
- REST「スループット税」 :REST では、更新ごとに完全なハンドシェイクが必要なため、ステートレスになります。REST は、ステータスが頻繁に報告されないエッジ デバイスのユースケースに適しています。
大容量のストリームには gRPC を活用し、大規模で低頻度のデバイス群やサポートされていない言語には REST を活用します。
ワークスペース URL と Zerobus Ingest Endpoint を取得する
ログイン後に Databricks ワークスペースを表示するときは、ブラウザーで次の形式の URL を確認してください: https://<databricks-instance>.com/o=XXXXX 。URL は/o=XXXXXより前の部分すべてで構成されます。例:
完全なURL: https://abcd-teste2-test-spcse2.cloud.databricks.com/?o=2281745829657864#
ワークスペース URL: https://abcd-teste2-test-spcse2.cloud.databricks.com
ワークスペースID: 2281745829657864
サーバー エンドポイントは、ワークスペースとリージョンによって異なります。
サーバーエンドポイント: <workspace-id>.zerobus.<region>.cloud.databricks.com
ターゲットテーブルを作成または識別する
取り込むターゲット テーブルを特定します。新しいターゲット テーブルを作成するには、 CREATE TABLE SQL コマンドを実行します。例えば:
CREATE TABLE unity.default.air_quality (
device_name STRING, temp INT, humidity LONG);
サービスプリンシパルを作成し、権限を付与する
サービスプリンシパルは、パーソナライズされたアカウントよりも高いセキュリティを提供する特殊な ID です。 サービスプリンシパルに関する詳細と、それらを認証に使用する方法については、これらの手順を参照してください。
-
[設定] > [ID とアクセス] でワークスペースにサービスプリンシパルを作成します。
-
サービスプリンシパルのクライアント ID とクライアント シークレットを生成して保存します。
-
カタログ、スキーマ、テーブルに必要な権限をサービスプリンシパルに付与します。
- サービスプリンシパルページで、 構成 タブを開きます。
- アプリケーション ID (UUID) をコピーします。
- 必要に応じて、例の UUID とユーザー名、スキーマ名、およびテーブル名を置き換えて、次の SQL を使用します。
SQLGRANT USE CATALOG ON CATALOG <username> TO `<UUID>`;
GRANT USE SCHEMA ON SCHEMA <username.default> TO `<UUID>`;
GRANT MODIFY, SELECT ON TABLE <username.default.table_name> TO `<UUID>`;
クライアントを記述する
- Python SDK
- Rust SDK
- Java SDK
- Go SDK
- TypeScript SDK
- REST API
Python 3.9 以上が必要です。SDK は、高性能 Rust SDK への PyO3 バインディングを使用して、純粋な Python よりも最大 40 倍高いスループットと、Rust 非同期ランタイムによる効率的なネットワーク I/O を提供します。JSON (最も単純) とプロトコル バッファー (本番運用に推奨) をサポートしています。 SDK は、同期と非同期の両方の実装と、3 つの異なる取り込み方法 (将来ベース、オフセット ベース、ファイア アンド フォーゲット) もサポートしています。
pip install databricks-zerobus-ingest-sdk
JSONの例:
import logging
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties
# Configuration - see "Before you begin" section for how to obtain these values.
SERVER_ENDPOINT="https://1234567890123456.zerobus.us-west-2.cloud.databricks.com"
DATABRICKS_WORKSPACE_URL="https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
TABLE_NAME="main.default.air_quality"
CLIENT_ID="your-client-id"
CLIENT_SECRET="your-client-secret"
sdk = ZerobusSdk(
SERVER_ENDPOINT,
DATABRICKS_WORKSPACE_URL
)
table_properties = TableProperties(TABLE_NAME)
options = StreamConfigurationOptions(record_type=RecordType.JSON)
stream = sdk.create_stream(CLIENT_ID, CLIENT_SECRET, table_properties, options)
try:
for i in range(1000):
record_dict = {
"device_name": f"sensor-{i}",
"temp": 20 + i % 15,
"humidity": 50 + i % 40
}
offset = stream.ingest_record_offset(record_dict)
# Optional: Wait for durability confirmation
stream.wait_for_offset(offset)
finally:
stream.close()
プロトコル バッファー : 型セーフな取り込みを行うには、 RecordType.PROTO (デフォルト) でプロトコル バッファーを使用し、テーブル プロパティに記述子 Proto を指定します。
完全なドキュメント、構成オプション、バッチ取り込み、およびプロトコル バッファーの例については、 Python SDK リポジトリを参照してください。
Rust 1.70以上が必要です。SDK は、高スループットの取り込みのために非同期 I/O と gRPC を活用し、他のすべての SDK の中核として機能します。JSON (最も単純) とプロトコル バッファー (本番運用に推奨) をサポートしています。
まず、パッケージをインポートします。
cargo add databricks-zerobus-ingest-sdk
または、 Cargo.tomlに追加します。
[dependencies]
databricks-zerobus-ingest-sdk = "0.5.0" # Latest version at time of publication
JSONの例:
use databricks_zerobus_ingest_sdk::{
databricks::zerobus::RecordType, JsonString, StreamConfigurationOptions,
TableProperties, ZerobusSdk,
};
use std::error::Error;
//Configuration - see "Before you begin" section for how to obtain these values.
const DATABRICKS_WORKSPACE_URL: &str = "https://<your-workspace>.cloud.databricks.com";
const SERVER_ENDPOINT: &str = "<your-shard-id>.zerobus.<region>.cloud.databricks.com";
const TABLE_NAME: &str = "main.default.air_quality";
const CLIENT_ID: &str = "your-client-id";
const CLIENT_SECRET: &str = "your-client-secret";
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let table_properties = TableProperties {
table_name: TABLE_NAME.to_string(),
descriptor_proto: None,
};
let stream_configuration_options = StreamConfigurationOptions {
max_inflight_requests: 100,
record_type: RecordType::Json,
..Default::default()
};
let sdk_handle = ZerobusSdk::builder()
.endpoint(SERVER_ENDPOINT)
.unity_catalog_url(DATABRICKS_WORKSPACE_URL)
.build()?;
let mut stream = sdk_handle
.create_stream(
table_properties.clone(),
CLIENT_ID.to_string(),
CLIENT_SECRET.to_string(),
Some(stream_configuration_options),
)
.await?;
let offset = stream.ingest_record_offset(
JsonString("{
\"device_name\": \"sensor\",
\"temp\": 22,
\"humidity\": 55}".to_string())).await?;
stream.wait_for_offset(offset).await?;
println!("Record ingested successfully");
stream.close().await?;
println!("Stream closed successfully");
Ok(())
}
プロトコル バッファー: 型セーフな取り込みを行うには、 RecordType::Proto (デフォルト) でプロトコル バッファーを使用し、テーブル プロパティに descriptor_proto を指定します。generate_protoツールを使用して必要なファイルを生成し、プロジェクトにインポートします。完全なドキュメント、構成オプション、バッチ取り込み、 generate_protoツール、およびプロトコル バッファーの例については、 Rust SDK リポジトリを参照してください。
Java 8以降が必要です。SDK は、高性能 Rust SDK への JNI (Java Native Interface) バインディングを使用して、純粋な Java gRPC よりも低いレイテンシと、Rust 非同期ランタイムによる効率的なネットワーク I/O を提供します。JSON (最も単純) とプロトコル バッファー (本番運用に推奨) をサポートしています。
Maven :
<dependency>
<groupId>com.databricks</groupId>
<artifactId>zerobus-ingest-sdk</artifactId>
<version>0.2.0</version>
</dependency>
JSONの例:
import com.databricks.zerobus.*;
public class ZerobusClient {
// Configuration - see "Before you begin" section for how to obtain these values.
private static final String SERVER_ENDPOINT =
"https://1234567890123456.zerobus.us-west-2.cloud.databricks.com";
private static final String DATABRICKS_WORKSPACE_URL =
"https://dbc-a1b2c3d4-e5f6.cloud.databricks.com";
private static final String TABLE_NAME = "main.default.air_quality";
private static final String CLIENT_ID = "your-client-id";
private static final String CLIENT_SECRET = "your-client-secret";
public static void main(String[] args) throws Exception {
ZerobusSdk sdk = new ZerobusSdk(
SERVER_ENDPOINT,
DATABRICKS_WORKSPACE_URL
);
ZerobusJsonStream stream = sdk.createJsonStream(
TABLE_NAME, CLIENT_ID, CLIENT_SECRET
).join();
try {
long lastOffset = 0;
for (int i = 0; i < 100; i++) {
String record = String.format(
"{\"device_name\": \"sensor-%d\", \"temp\": 22, \"humidity\": 55}", i
);
lastOffset = stream.ingestRecordOffset(record);
}
stream.waitForOffset(lastOffset);
} finally {
stream.close();
}
}
}
プロトコル バッファー: 型安全な取り込みの場合は、 ZerobusProtoStreamとcreateProtoStream()を使用します。バンドルされた JAR ツールを使用してテーブルからスキーマを生成し、 protocでコンパイルします。
完全なドキュメント、構成オプション、バッチ取り込み、およびプロトコル バッファーの例については、 Java SDK リポジトリを参照してください。
Go 1.21以上が必要です。SDK は CGO と FFI を使用して高性能 Rust SDK をラップし、同じスループットとパフォーマンスを提供します。JSON (最も単純) とプロトコル バッファー (本番運用に推奨) をサポートしています。
go get github.com/databricks/zerobus-sdk-go@latest
JSONの例:
注: 簡潔にするために、ここではエラーは無視されます。本番運用コードでは、常にエラーをチェックしてください。
package main
import (
"fmt"
zerobus "github.com/databricks/zerobus-sdk-go"
)
//Configuration - see "Before you begin" section for how to obtain these values.
const (
ServerEndpoint = "https://1234567890123456.zerobus.us-west-2.cloud.databricks.com"
DatabricksWorkspaceURL = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
TableName = "main.default.air_quality"
ClientID = "your-client-id"
ClientSecret = "your-client-secret"
)
func main() {
sdk, _ := zerobus.NewZerobusSdk(
ServerEndpoint,
DatabricksWorkspaceURL,
)
defer sdk.Free()
options := zerobus.DefaultStreamConfigurationOptions()
options.RecordType = zerobus.RecordTypeJson
stream, _ := sdk.CreateStream(
zerobus.TableProperties{
TableName: TableName,
},
ClientID,
ClientSecret,
options,
)
defer stream.Close()
offset, _ := stream.IngestRecordOffset(`{
"device_name": "sensor-001",
"temp": 20,
"humidity": 60
}`)
_ = stream.WaitForOffset(offset)
fmt.Println("Record ingested successfully")
_ = stream.Close()
fmt.Println("Stream closed successfully")
}
プロトコル バッファー: 型セーフな取り込みを行うには、 RecordTypeProto (デフォルト) でプロトコル バッファーを使用し、テーブル プロパティにdescriptorProtoを指定します。.protoを作成するテーブル スキーマに一致するファイルを検索し、 generate_protoスクリプトを実行してファイルをプロジェクトにインポートします。
完全なドキュメント、構成オプション、バッチ取り込み、generate_proto ツール、およびプロトコル バッファーの例については、 Go SDK リポジトリを参照してください。
Node.js 16 以上が必要です。SDK は、NAPI-RS ネイティブ バインディングを使用して高性能 Rust SDK をラップし、JavaScript Promise にマップされた Rust Futures でネイティブ パフォーマンスを提供します。JSON (最も単純) とプロトコル バッファー (本番運用に推奨) をサポートしています。
npm install @databricks/zerobus-ingest-sdk
JSONの例:
import { ZerobusSdk, RecordType } from '@databricks/zerobus-ingest-sdk';
// Configuration - see "Before you begin" section for how to obtain these values.
const SERVER_ENDPOINT = 'https://1234567890123456.zerobus.us-west-2.cloud.databricks.com';
const DATABRICKS_WORKSPACE_URL = 'https://dbc-a1b2c3d4-e5f6.cloud.databricks.com';
const TABLE_NAME = 'main.default.air_quality';
const CLIENT_ID = 'your-client-id';
const CLIENT_SECRET = 'your-client-secret';
const sdk = new ZerobusSdk(SERVER_ENDPOINT, DATABRICKS_WORKSPACE_URL);
const stream = await sdk.createStream({ tableName: TABLE_NAME }, CLIENT_ID, CLIENT_SECRET, {
recordType: RecordType.Json,
});
try {
let lastOffset = BigInt(0);
for (let i = 0; i < 100; i++) {
const record = { device_name: `sensor-${i}`, temp: 22, humidity: 55 };
lastOffset = await stream.ingestRecordOffset(record);
}
await stream.waitForOffset(lastOffset);
} finally {
await stream.close();
}
プロトコル バッファー: 型セーフな取り込みを行うには、 RecordType.Proto (デフォルト) でプロトコル バッファーを使用し、テーブル プロパティにdescriptorProtoを指定します。
完全なドキュメント、構成オプション、バッチ取り込み、およびプロトコル バッファーの例については、 TypeScript SDK リポジトリを参照してください。
プレビュー
Zerobus Ingest コネクタの REST API はベータ版です。
REST API を使用すると、 /zerobus/v1/tables/<table-name>/insertエンドポイントに HTTP POST リクエストを送信して、単一のレコードを取り込むことができます。レコード自体はリクエスト本文に含まれており、JSON 形式である必要があります。
この例では、CURL を使用して REST API で Zerobus Ingest にデータをプッシュする方法について説明します。
ヘッダー
リクエストを正しく認証してフォーマットするには、2 つの特定の HTTP ヘッダーが必要です。
-
Content-Type: application/json- コンテンツ タイプを指定するための必須フィールド。現在、サポートされているメッセージ形式は JSON のみです。
-
Authorization: Bearer <token><token>後ほど提供される curl コマンドを使用して取得した OAuth トークンに置き換えます。
OAuth取得: これらは 1 時間ごとに期限切れになるため、更新する必要があります。 OAuth トークンを再度取得することで更新できます。
次の事項を記入してください:
$CATALOG、$SCHEMA、$TABLE、$WORKSPACE_ID、$WORKSPACE_URL$DATABRICKS_CLIENT_IDそして$DATABRICKS_CLIENT_SECRET- これら 2 つのパラメーターは、作成したサービス プリンシパルに対応します。
authorization_details=$(cat <<EOF
[{
"type": "unity_catalog_privileges",
"privileges": ["USE CATALOG"],
"object_type": "CATALOG",
"object_full_path": "$CATALOG"
},
{
"type": "unity_catalog_privileges",
"privileges": ["USE SCHEMA"],
"object_type": "SCHEMA",
"object_full_path": "$CATALOG.$SCHEMA"
},
{
"type": "unity_catalog_privileges",
"privileges": ["SELECT", "MODIFY"],
"object_type": "TABLE",
"object_full_path": "$CATALOG.$SCHEMA.$TABLE"
}]
EOF
)
export OAUTH_TOKEN=$(curl -X POST \
-u "$DATABRICKS_CLIENT_ID:$DATABRICKS_CLIENT_SECRET" \
-d "grant_type=client_credentials" \
-d "scope=all-apis" \
-d "resource=api://databricks/workspaces/$WORKSPACE_ID/zerobusDirectWriteApi" \
--data-urlencode "authorization_details=$authorization_details" \
"$WORKSPACE_URL/oidc/v1/token" | jq -r '.access_token')
レコードの取り込み:
次の事項を記入してください:
-
$ZEROBUS_ENDPOINT- 「ワークスペース URL と Zerobus 取り込みエンドポイントを取得する」セクションで定義されているとおりです。
-
$CATALOG、$SCHEMA、$TABLE、$WORKSPACE_ID、$WORKSPACE_URL -
$OAUTH_TOKEN- これは前のステップで作成されました。
注意: リクエスト本体は JSON オブジェクトのリストである必要があります。
curl -X POST \
"$ZEROBUS_ENDPOINT/zerobus/v1/tables/$CATALOG.$SCHEMA.$TABLE/insert" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $OAUTH_TOKEN" \
-d '[{ "device_name": "device_num_1", "temp": 28, "humidity": 60 },
{ "device_name": "device_num_1", "temp": 28, "humidity": 60 }]'
すべてが正しければ、HTTP ステータス コード 200 の空の JSON 応答が返されます。