Zerobus Ingestコネクタを使用する
プレビュー
Zerobus Ingest コネクタはパブリック プレビュー段階です。お試しいただくには、Databricks アカウント担当者にお問い合わせください。
このページでは、 Lakeflow Connectの Zerobus 直接書き込みコネクタを使用してデータを取り込む方法について説明します。
ワークスペースのURLを取得する
ログイン後に Databricks ワークスペースを表示するときは、ブラウザーで次の形式の URL を確認してください: https://<databricks-instance>.com/o=XXXXX 。URL は/o=XXXXXより前の部分すべてで構成されます。例:
完全なURL: https://abcd-teste2-test-spcse2.cloud.databricks.com/?o=2281745829657864#
ワークスペース URL: https://abcd-teste2-test-spcse2.cloud.databricks.com
ターゲットテーブルを作成または識別する
取り込むターゲット テーブルを特定します。新しいターゲット テーブルを作成するには、 CREATE TABLE SQL コマンドを実行します。例えば:
CREATE TABLE unity.default.air_quality (
device_name STRING, temp INT, humidity LONG);
サービスプリンシパルを作成し、権限を付与する
サービスプリンシパルは、パーソナライズされたアカウントよりも高いセキュリティを提供する特殊な ID です。 サービスプリンシパルに関する詳細と、それらを認証に使用する方法については、これらの手順を参照してください。
-
[設定] > [ID とアクセス] でワークスペースにサービスプリンシパルを作成します。
-
サービスプリンシパルのクライアント ID とクライアント シークレットを生成して保存します。
-
カタログ、スキーマ、テーブルに必要な権限をサービスプリンシパルに付与します。
- サービスプリンシパルページで、 構成 タブを開きます。
- アプリケーション ID (UUID) をコピーします。
- 必要に応じて、例の UUID とユーザー名、スキーマ名、およびテーブル名を置き換えて、次の SQL を使用します。
SQLGRANT USE CATALOG ON CATALOG <username> TO `<UUID>`;
GRANT USE SCHEMA ON SCHEMA <username.default> TO `<UUID>`;
GRANT MODIFY, SELECT ON TABLE <username.default.table_name> TO `<UUID>`;
エンドポイント形式
Zerobus サーバーのエンドポイントとワークスペースの URL 形式は、クラウド プロバイダーによって異なります。
クラウド | サーバーエンドポイント | ワークスペースURL |
|---|---|---|
AWS |
|
|
Azure |
|
|
AWS の例:
Server endpoint: 1234567890123456.zerobus.us-west-2.cloud.databricks.com
Workspace URL: https://dbc-a1b2c3d4-e5f6.cloud.databricks.com
Azureの例:
Server endpoint: 1234567890123456.zerobus.eastus.azuredatabricks.net
Workspace URL: https://adb-1234567890123456.12.azuredatabricks.net
クライアントを記述する
- Python SDK
- Rust SDK
- Java SDK
- Go SDK
- TypeScript SDK
Python 3.9 以上が必要です。SDK 、 JSON (最も単純) とプロトコル バッファー (本番運用に推奨) をサポートしています。
pip install databricks-zerobus-ingest-sdk
JSONの例:
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties
# Configuration - see "Before you begin" section for how to obtain these values.
server_endpoint = "1234567890123456.zerobus.us-west-2.cloud.databricks.com"
workspace_url = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
table_name = "main.default.air_quality"
client_id = "your-service-principal-application-id"
client_secret = "your-service-principal-secret"
sdk = ZerobusSdk(server_endpoint, workspace_url)
table_properties = TableProperties(table_name)
options = StreamConfigurationOptions(record_type=RecordType.JSON)
stream = sdk.create_stream(client_id, client_secret, table_properties, options)
try:
for i in range(100):
record = {"device_name": f"sensor-{i}", "temp": 22, "humidity": 55}
ack = stream.ingest_record(record) # Pass dict directly, SDK handles serialization.
ack.wait_for_ack()
finally:
stream.close()
確認コールバック: 取り込みの進行状況を非同期的に追跡するには、 ack_callbackオプションを使用します。コールバックは、そのオフセットまでのすべてのレコードが永続的に書き込まれたことを示すdurability_ack_up_to_offset属性 (int) を持つIngestRecordResponseオブジェクトを受け取ります。
from zerobus.sdk.shared import IngestRecordResponse
def on_ack(response: IngestRecordResponse) -> None:
print(f"Records acknowledged up to offset: {response.durability_ack_up_to_offset}")
options = StreamConfigurationOptions(
record_type=RecordType.JSON,
ack_callback=on_ack
)
プロトコル バッファー: 型セーフな取り込みの場合は、 RecordType.PROTO (デフォルト) でプロトコル バッファーを使用します。python -m zerobus.tools.generate_protoを使用してテーブルからスキーマを生成します。
完全なドキュメント、構成オプション、非同期 API、およびプロトコル バッファーの例については、 Python SDK リポジトリを参照してください。
Rust 1.70以上が必要です。SDK 、 JSON (最も単純) とプロトコル バッファー (本番運用に推奨) をサポートしています。
cargo add databricks-zerobus-ingest-sdk
cargo add tokio --features macros,rt-multi-thread
JSONの例:
use std::error::Error;
use databricks_zerobus_ingest_sdk::{
RecordType, StreamConfigurationOptions, TableProperties, ZerobusSdk
};
// Configuration - see "Before you begin" section for how to obtain these values.
const ZEROBUS_ENDPOINT: &str = "workspace-id.zerobus.region.cloud.databricks.com";
const WORKSPACE_URL: &str = "https://your-workspace.cloud.databricks.com";
const TABLE_NAME: &str = "catalog.schema.air_quality";
const CLIENT_ID: &str = "your-client-id";
const CLIENT_SECRET: &str = "your-client-secret";
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let table_properties = TableProperties {
table_name: TABLE_NAME.to_string(),
descriptor_proto: None,
};
let options = StreamConfigurationOptions {
record_type: RecordType::Json,
..Default::default()
};
let sdk = ZerobusSdk::new(ZEROBUS_ENDPOINT.to_string(), WORKSPACE_URL.to_string())?;
let mut stream = sdk.create_stream(
table_properties, CLIENT_ID.to_string(), CLIENT_SECRET.to_string(), Some(options)
).await?;
for i in 0..100 {
let record = format!(r#"{{"device_name": "sensor-{}", "temp": 22, "humidity": 55}}"#, i);
let ack = stream.ingest_record(record.into_bytes()).await?;
ack.await?;
}
stream.close().await?;
Ok(())
}
プロトコル バッファー: 型セーフな取り込みの場合は、 RecordType::Proto (デフォルト) でプロトコル バッファーを使用します。リポジトリ内のtools/generate_filesツールを使用してスキーマを生成します。
完全なドキュメント、構成オプション、バッチ取り込み、およびプロトコル バッファーの例については、 Rust SDK リポジトリを参照してください。
Java 8以降が必要です。SDK は、型安全なスキーマ定義にプロトコル バッファーを使用します。
Maven :
<dependency>
<groupId>com.databricks</groupId>
<artifactId>zerobus-ingest-sdk</artifactId>
<version>0.1.0</version>
</dependency>
プロトコル バッファの設定:
テーブルからプロト スキーマを生成し、コンパイルします。
# Generate proto from Unity Catalog table schema.
java -jar zerobus-ingest-sdk-0.1.0-jar-with-dependencies.jar \
--uc-endpoint "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com" \
--client-id "<client-id>" --client-secret "<client-secret>" \
--table "<catalog.schema.table>" --output "record.proto"
# Compile proto to Java.
protoc --java_out=src/main/java record.proto
例:
import com.databricks.zerobus.*;
import com.example.proto.Record.AirQuality;
public class ZerobusClient {
public static void main(String[] args) throws Exception {
// Configuration - see "Before you begin" section for how to obtain these values.
ZerobusSdk sdk = new ZerobusSdk(
"1234567890123456.zerobus.us-west-2.cloud.databricks.com",
"https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
);
TableProperties<AirQuality> tableProperties = new TableProperties<>(
"main.default.air_quality",
AirQuality.getDefaultInstance()
);
ZerobusStream<AirQuality> stream = sdk.createStream(
tableProperties, "your-client-id", "your-client-secret"
).join();
try {
for (int i = 0; i < 100; i++) {
AirQuality record = AirQuality.newBuilder()
.setDeviceName("sensor-" + i)
.setTemp(22)
.setHumidity(55)
.build();
stream.ingestRecord(record).join();
}
} finally {
stream.close();
}
}
}
完全なドキュメント、構成オプション、非ブロッキング取り込み、およびログ設定については、 Java SDK リポジトリを参照してください。
Go 1.21以上が必要です。SDK 、 JSON (最も単純) とプロトコル バッファー (本番運用に推奨) をサポートしています。
go get github.com/databricks/zerobus-go-sdk
JSONの例:
package main
import (
"fmt"
"log"
zerobus "github.com/databricks/zerobus-go-sdk/sdk"
)
func main() {
// Configuration - see "Before you begin" section for how to obtain these values.
sdk, _ := zerobus.NewZerobusSdk(
"1234567890123456.zerobus.us-west-2.cloud.databricks.com",
"https://dbc-a1b2c3d4-e5f6.cloud.databricks.com",
)
defer sdk.Free()
options := zerobus.DefaultStreamConfigurationOptions()
options.RecordType = zerobus.RecordTypeJson
stream, _ := sdk.CreateStream(
zerobus.TableProperties{TableName: "main.default.air_quality"},
"your-client-id", "your-client-secret", options,
)
defer stream.Close()
for i := 0; i < 100; i++ {
record := fmt.Sprintf(`{"device_name": "sensor-%d", "temp": 22, "humidity": 55}`, i)
ack, _ := stream.IngestRecord(record)
ack.Await()
}
stream.Flush()
}
プロトコル バッファー: 型安全な取り込みを行うには、 RecordTypeProto (デフォルト) でプロトコル バッファーを使用し、 TablePropertiesにDescriptorProtoを指定します。
完全なドキュメント、構成オプション、およびプロトコル バッファーの例については、 Go SDK リポジトリを参照してください。
Node.js 16 以上が必要です。SDK 、 JSON (最も単純) とプロトコル バッファー (本番運用に推奨) をサポートしています。
npm install @databricks/zerobus-ingest-sdk
JSONの例:
import { ZerobusSdk, RecordType } from '@databricks/zerobus-ingest-sdk';
// Configuration - see "Before you begin" section for how to obtain these values.
const sdk = new ZerobusSdk(
'1234567890123456.zerobus.us-west-2.cloud.databricks.com',
'https://dbc-a1b2c3d4-e5f6.cloud.databricks.com',
);
const stream = await sdk.createStream(
{ tableName: 'main.default.air_quality' },
'your-client-id',
'your-client-secret',
{ recordType: RecordType.Json },
);
try {
for (let i = 0; i < 100; i++) {
const record = { device_name: `sensor-${i}`, temp: 22, humidity: 55 };
await stream.ingestRecord(record);
}
await stream.flush();
} finally {
await stream.close();
}
プロトコル バッファー: 型セーフな取り込みを行うには、 RecordType.Proto (デフォルト) でプロトコル バッファーを使用し、テーブル プロパティにdescriptorProtoを指定します。
完全なドキュメント、構成オプション、およびプロトコル バッファーの例については、 TypeScript SDK リポジトリを参照してください。