Zerobus Ingestコネクタを使用する
このページでは、 LakeFlow Connectの Zerobus Ingest コネクタを使用してデータを取り込む方法について説明します。
インターフェースを選択
Zerobus Ingest は gRPC および REST インターフェースをサポートしています。SDK は、高スループット アプリケーションを構築するための開発者向けのインターフェースを備えたカスタム gRPC ベースのクライアントを提供します。REST インターフェースは、大量の「チャッティ」デバイスを扱う際のアーキテクチャ上の制約を処理します。
- gRPC「接続税」を備えた SDK: gRPC は、永続的な接続による高スループット パフォーマンスに特化しています。ただし、開いているストリームはすべて同時実行クォータにカウントされます。
- REST「スループット税」: REST では更新ごとに完全なハンドシェイクが必要なため、ステートレスになります。REST は、ステータスが頻繁に報告されないエッジ デバイスのユースケースに適しています。
大容量のストリームには gRPC を活用し、大規模で低頻度のデバイス群やサポートされていない言語には REST を活用します。
ワークスペース URL と Zerobus Ingest エンドポイントを取得する
ログインすると、ワークスペースの URL がブラウザに表示されます。完全な URL はhttps://<databricks-instance>.com/o=XXXXX形式に従いますが、ワークスペース URL は/o=XXXXXより前のすべての内容で構成されます。たとえば、次の完全な URL を指定すると、ワークスペースの URL とワークスペース ID を特定できます。
- 完全なURL:
https://abcd-teste2-test-spcse2.cloud.databricks.com/?o=2281745829657864# - ワークスペース URL:
https://abcd-teste2-test-spcse2.cloud.databricks.com - ワークスペースID:
2281745829657864
サーバー エンドポイントは、ワークスペースとリージョンによって異なります。
- サーバーエンドポイント:
<workspace-id>.zerobus.<region>.cloud.databricks.com
利用可能なリージョンについては、 Zerobus Ingest コネクタの制限事項を参照してください。
ターゲットテーブルを作成または識別する
データを取り込むターゲット テーブルを特定します。新しいターゲット テーブルを作成するには、 CREATE TABLE SQL コマンドを実行します。たとえば、 unity.default.air_qualityという名前の新しいテーブルを作成します。
CREATE TABLE unity.default.air_quality (
device_name STRING, temp INT, humidity LONG);
サービスプリンシパルを作成し、権限を付与する
サービスプリンシパルは、パーソナライズされたアカウントよりも高いセキュリティを提供する特殊な ID です。 サービスプリンシパルと認証にサービスプリンシパルを使用する方法の詳細については、 「 OAuthを使用したDatabricksへのサービスプリンシパルのアクセスを許可する」を参照してください。
-
サービスプリンシパルを作成するには、 [設定] > [ID とアクセス] に移動します。
-
「サービスシプリンパル」 で、 「管理」 を選択します。
-
[ サービスプリンシパルの追加 ] をクリックします。
-
[サービスプリンシパルの追加] ウィンドウで、 [新規追加 ] をクリックして新しいサービスプリンシパルを作成します。
-
サービスプリンシパルのクライアント ID とクライアント シークレットを生成して保存します。
-
カタログ、スキーマ、テーブルに必要な権限をサービスプリンシパルに付与します。
- サービス プリンシパルページで、 「構成」 タブに移動します。
- アプリケーション ID (UUID) をコピーします。
- 権限を付与するには、以下のSQLを使用してください。必要に応じて、例のUUID、カタログ名、スキーマ名、テーブル名を置き換えてください。
SQLGRANT USE CATALOG ON CATALOG <catalog> TO `<UUID>`;
GRANT USE SCHEMA ON SCHEMA <catalog.schema> TO `<UUID>`;
GRANT MODIFY, SELECT ON TABLE <catalog.schema.table_name> TO `<UUID>`;
ALL PRIVILEGESが付与されているテーブルであっても、テーブルに対するMODIFYおよびSELECT権限を付与する必要があります。
クライアントを記述する
好みのプログラミング言語で Zerobus SDK を使用するか、REST API を使用して、データをターゲット テーブルに取り込みます。
- Python SDK
- Rust SDK
- Java SDK
- Go SDK
- TypeScript SDK
- REST API
Python 3.9 以上が必要です。SDK は、高性能 Rust SDK への PyO3 バインディングを使用して、純粋な Python よりも最大 40 倍高いスループットと、Rust 非同期ランタイムによる効率的なネットワーク I/O を提供します。JSON (最も単純) とプロトコル バッファー (本番運用に推奨) をサポートしています。 SDK は、同期と非同期の両方の実装と、3 つの異なる取り込み方法 (将来ベース、オフセット ベース、ファイア アンド フォーゲット) もサポートしています。
pip install databricks-zerobus-ingest-sdk
JSONの例:
import logging
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties
# See "Get your workspace URL and Zerobus Ingest endpoint" for information on obtaining these values.
SERVER_ENDPOINT="https://1234567890123456.zerobus.us-west-2.cloud.databricks.com"
DATABRICKS_WORKSPACE_URL="https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
TABLE_NAME="main.default.air_quality"
CLIENT_ID="your-client-id"
CLIENT_SECRET="your-client-secret"
sdk = ZerobusSdk(
SERVER_ENDPOINT,
DATABRICKS_WORKSPACE_URL
)
table_properties = TableProperties(TABLE_NAME)
options = StreamConfigurationOptions(record_type=RecordType.JSON)
stream = sdk.create_stream(CLIENT_ID, CLIENT_SECRET, table_properties, options)
try:
for i in range(1000):
record_dict = {
"device_name": f"sensor-{i}",
"temp": 20 + i % 15,
"humidity": 50 + i % 40
}
offset = stream.ingest_record_offset(record_dict)
# Optional: Wait for durability confirmation
stream.wait_for_offset(offset)
finally:
stream.close()
確認コールバック: 取り込みの進行状況を非同期的に追跡するには、 ack_callbackオプションを使用します。レコードが確認されるか失敗したときに呼び出されるon_ack(offset: int)メソッドとon_error(offset: int, error_message: str)メソッドを持つAckCallbackのサブクラスを渡します。
from zerobus.sdk.shared import AckCallback, StreamConfigurationOptions, RecordType
class MyAckCallback(AckCallback):
def on_ack(self, offset: int) -> None:
print(f"Record acknowledged at offset: {offset}")
def on_error(self, offset: int, error_message: str) -> None:
print(f"Error at offset {offset}: {error_message}")
options = StreamConfigurationOptions(
record_type = RecordType.JSON,
ack_callback = MyAckCallback()
)
プロトコル バッファー: 型セーフな取り込みを行うには、 RecordType.PROTO (デフォルト) でプロトコル バッファーを使用し、テーブル プロパティにdescriptorProtoを指定します。
完全なドキュメント、構成オプション、バッチ取り込み、およびプロトコル バッファーの例については、 Python SDK リポジトリを参照してください。
Rust 1.70以上が必要です。SDK は、高スループットの取り込みのために非同期 I/O と gRPC を活用し、他のすべての SDK の中核として機能します。JSON (最も単純) とプロトコル バッファー (本番運用に推奨) をサポートしています。
まず、パッケージをインポートします。
cargo add databricks-zerobus-ingest-sdk
または、 Cargo.tomlに追加します。
[dependencies]
databricks-zerobus-ingest-sdk = "0.5.0" # Latest version at time of publication
JSONの例:
use databricks_zerobus_ingest_sdk::{
databricks::zerobus::RecordType, JsonString, StreamConfigurationOptions,
TableProperties, ZerobusSdk,
};
use std::error::Error;
// See "Get your workspace URL and Zerobus Ingest endpoint" for information on obtaining these values.
const DATABRICKS_WORKSPACE_URL: &str = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com";
const SERVER_ENDPOINT: &str = "https://1234567890123456.zerobus.us-west-2.cloud.databricks.com";
const TABLE_NAME: &str = "main.default.air_quality";
const CLIENT_ID: &str = "your-client-id";
const CLIENT_SECRET: &str = "your-client-secret";
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let table_properties = TableProperties {
table_name: TABLE_NAME.to_string(),
descriptor_proto: None,
};
let stream_configuration_options = StreamConfigurationOptions {
max_inflight_requests: 100,
record_type: RecordType::Json,
..Default::default()
};
let sdk_handle = ZerobusSdk::builder()
.endpoint(SERVER_ENDPOINT)
.unity_catalog_url(DATABRICKS_WORKSPACE_URL)
.build()?;
let mut stream = sdk_handle
.create_stream(
table_properties.clone(),
CLIENT_ID.to_string(),
CLIENT_SECRET.to_string(),
Some(stream_configuration_options),
)
.await?;
let offset = stream.ingest_record_offset(
JsonString("{
\"device_name\": \"sensor\",
\"temp\": 22,
\"humidity\": 55}".to_string())).await?;
stream.wait_for_offset(offset).await?;
println!("Record ingested successfully");
stream.close().await?;
println!("Stream closed successfully");
Ok(())
}
プロトコル バッファー: 型セーフな取り込みを行うには、 RecordType::Proto (デフォルト) でプロトコル バッファーを使用し、テーブル プロパティにdescriptor_protoを指定します。generate_protoツールを使用して必要なファイルを生成し、プロジェクトにインポートします。完全なドキュメント、構成オプション、バッチ取り込み、 generate_protoツール、およびプロトコル バッファーの例については、 Rust SDK リポジトリを参照してください。
Java 8以降が必要です。SDK は、高性能 Rust SDK への JNI (Java Native Interface) バインディングを使用して、純粋な Java gRPC よりも低いレイテンシと、Rust 非同期ランタイムによる効率的なネットワーク I/O を提供します。JSON (最も単純) とプロトコル バッファー (本番運用に推奨) をサポートしています。
Maven :
<dependency>
<groupId>com.databricks</groupId>
<artifactId>zerobus-ingest-sdk</artifactId>
<version>0.2.0</version>
</dependency>
JSONの例:
import com.databricks.zerobus.*;
public class ZerobusClient {
// See "Get your workspace URL and Zerobus Ingest endpoint" for information on obtaining these values.
private static final String SERVER_ENDPOINT =
"https://1234567890123456.zerobus.us-west-2.cloud.databricks.com";
private static final String DATABRICKS_WORKSPACE_URL =
"https://dbc-a1b2c3d4-e5f6.cloud.databricks.com";
private static final String TABLE_NAME = "main.default.air_quality";
private static final String CLIENT_ID = "your-client-id";
private static final String CLIENT_SECRET = "your-client-secret";
public static void main(String[] args) throws Exception {
ZerobusSdk sdk = new ZerobusSdk(
SERVER_ENDPOINT,
DATABRICKS_WORKSPACE_URL
);
ZerobusJsonStream stream = sdk.createJsonStream(
TABLE_NAME, CLIENT_ID, CLIENT_SECRET
).join();
try {
long lastOffset = 0;
for (int i = 0; i < 100; i++) {
String record = String.format(
"{\"device_name\": \"sensor-%d\", \"temp\": 22, \"humidity\": 55}", i
);
lastOffset = stream.ingestRecordOffset(record);
}
stream.waitForOffset(lastOffset);
} finally {
stream.close();
}
}
}
プロトコル バッファー: 型安全な取り込みの場合は、 ZerobusProtoStreamとcreateProtoStream()を使用します。バンドルされた JAR ツールを使用してテーブルからスキーマを生成し、 protocでコンパイルします。
完全なドキュメント、構成オプション、バッチ取り込み、およびプロトコル バッファーの例については、 Java SDK リポジトリを参照してください。
Go 1.21以上が必要です。SDK は CGO と FFI を使用して高性能 Rust SDK をラップし、同じスループットとパフォーマンスを提供します。JSON (最も単純) とプロトコル バッファー (本番運用に推奨) をサポートしています。
go get github.com/databricks/zerobus-sdk-go@latest
JSONの例:
簡潔にするために、ここではエラーは無視されます。本番運用コードでは、常にエラーをチェックしてください。
package main
import (
"fmt"
zerobus "github.com/databricks/zerobus-sdk-go"
)
// See "Get your workspace URL and Zerobus Ingest endpoint" for information on obtaining these values.
const (
ServerEndpoint = "https://1234567890123456.zerobus.us-west-2.cloud.databricks.com"
DatabricksWorkspaceURL = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
TableName = "main.default.air_quality"
ClientID = "your-client-id"
ClientSecret = "your-client-secret"
)
func main() {
sdk, _ := zerobus.NewZerobusSdk(
ServerEndpoint,
DatabricksWorkspaceURL,
)
defer sdk.Free()
options := zerobus.DefaultStreamConfigurationOptions()
options.RecordType = zerobus.RecordTypeJson
stream, _ := sdk.CreateStream(
zerobus.TableProperties{
TableName: TableName,
},
ClientID,
ClientSecret,
options,
)
defer stream.Close()
offset, _ := stream.IngestRecordOffset(`{
"device_name": "sensor-001",
"temp": 20,
"humidity": 60
}`)
_ = stream.WaitForOffset(offset)
fmt.Println("Record ingested successfully")
_ = stream.Close()
fmt.Println("Stream closed successfully")
}
プロトコル バッファー: 型セーフな取り込みを行うには、 RecordTypeProto (デフォルト) でプロトコル バッファーを使用し、テーブル プロパティにdescriptorProtoを指定します。.protoを作成するテーブル スキーマに一致するファイルを検索し、 generate_protoスクリプトを実行してファイルをプロジェクトにインポートします。
完全なドキュメント、構成オプション、バッチ取り込み、generate_proto ツール、およびプロトコル バッファーの例については、 Go SDK リポジトリを参照してください。
Node.js 16 以上が必要です。SDK は、NAPI-RS ネイティブ バインディングを使用して高性能 Rust SDK をラップし、JavaScript Promise にマップされた Rust Futures でネイティブ パフォーマンスを提供します。JSON (最も単純) とプロトコル バッファー (本番運用に推奨) をサポートしています。
npm install @databricks/zerobus-ingest-sdk
JSONの例:
import { ZerobusSdk, RecordType } from '@databricks/zerobus-ingest-sdk';
// See "Get your workspace URL and Zerobus Ingest endpoint" for information on obtaining these values.
const SERVER_ENDPOINT = 'https://1234567890123456.zerobus.us-west-2.cloud.databricks.com';
const DATABRICKS_WORKSPACE_URL = 'https://dbc-a1b2c3d4-e5f6.cloud.databricks.com';
const TABLE_NAME = 'main.default.air_quality';
const CLIENT_ID = 'your-client-id';
const CLIENT_SECRET = 'your-client-secret';
const sdk = new ZerobusSdk(SERVER_ENDPOINT, DATABRICKS_WORKSPACE_URL);
const stream = await sdk.createStream({ tableName: TABLE_NAME }, CLIENT_ID, CLIENT_SECRET, {
recordType: RecordType.Json,
});
try {
let lastOffset = BigInt(0);
for (let i = 0; i < 100; i++) {
const record = { device_name: `sensor-${i}`, temp: 22, humidity: 55 };
lastOffset = await stream.ingestRecordOffset(record);
}
await stream.waitForOffset(lastOffset);
} finally {
await stream.close();
}
プロトコル バッファー: 型セーフな取り込みを行うには、 RecordType.Proto (デフォルト) でプロトコル バッファーを使用し、テーブル プロパティにdescriptorProtoを指定します。
完全なドキュメント、構成オプション、バッチ取り込み、およびプロトコル バッファーの例については、 TypeScript SDK リポジトリを参照してください。
プレビュー
Zerobus Ingest コネクタの REST API はベータ版です。
REST API を使用すると、 /zerobus/v1/tables/<table-name>/insertエンドポイントに HTTP POST リクエストを送信して、単一のレコードを取り込むことができます。レコード自体はリクエスト本文に含まれており、JSON 形式である必要があります。
この例では、CURL を使用して REST API で Zerobus Ingest にデータをプッシュする方法について説明します。
ヘッダー
リクエストを正しく認証してフォーマットするには、2 つの特定の HTTP ヘッダーが必要です。
-
Content-Type: application/json- コンテンツ タイプを指定するための必須フィールド。現在、サポートされているメッセージ形式は JSON のみです。
-
Authorization: Bearer <token><token>後ほど提供される curl コマンドを使用して取得した OAuth トークンに置き換えます。
OAuth取得: これらは 1 時間ごとに期限切れになるため、更新する必要があります。 OAuth トークンを再度取得することで更新できます。
次の事項を記入してください:
$CATALOG、$SCHEMA、$TABLE、$WORKSPACE_ID、$WORKSPACE_URL$DATABRICKS_CLIENT_IDそして$DATABRICKS_CLIENT_SECRET- これら 2 つのパラメーターは、作成したサービス プリンシパルに対応します。
authorization_details=$(cat <<EOF
[{
"type": "unity_catalog_privileges",
"privileges": ["USE CATALOG"],
"object_type": "CATALOG",
"object_full_path": "$CATALOG"
},
{
"type": "unity_catalog_privileges",
"privileges": ["USE SCHEMA"],
"object_type": "SCHEMA",
"object_full_path": "$CATALOG.$SCHEMA"
},
{
"type": "unity_catalog_privileges",
"privileges": ["SELECT", "MODIFY"],
"object_type": "TABLE",
"object_full_path": "$CATALOG.$SCHEMA.$TABLE"
}]
EOF
)
export OAUTH_TOKEN=$(curl -X POST \
-u "$DATABRICKS_CLIENT_ID:$DATABRICKS_CLIENT_SECRET" \
-d "grant_type=client_credentials" \
-d "scope=all-apis" \
-d "resource=api://databricks/workspaces/$WORKSPACE_ID/zerobusDirectWriteApi" \
--data-urlencode "authorization_details=$authorization_details" \
"$WORKSPACE_URL/oidc/v1/token" | jq -r '.access_token')
レコードの取り込み:
次の事項を記入してください:
-
$ZEROBUS_ENDPOINT- 「ワークスペース URL と Zerobus 取り込みエンドポイントを取得する」セクションで定義されているとおりです。
-
$CATALOG、$SCHEMA、$TABLE、$WORKSPACE_ID、$WORKSPACE_URL -
$OAUTH_TOKEN- これは前のステップで作成されました。
リクエスト本体は JSON オブジェクトのリストである必要があります。
curl -X POST \
"$ZEROBUS_ENDPOINT/zerobus/v1/tables/$CATALOG.$SCHEMA.$TABLE/insert" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $OAUTH_TOKEN" \
-d '[{ "device_name": "device_num_1", "temp": 28, "humidity": 60 },
{ "device_name": "device_num_1", "temp": 28, "humidity": 60 }]'
すべての情報が正しく入力されている場合は、HTTP ステータス コード 200 の空の JSON 応答が返されます。