Zerobus Ingestコネクタを使用する
プレビュー
Zerobus Ingest コネクタはパブリック プレビュー段階です。お試しいただくには、Databricks アカウント担当者にお問い合わせください。
このページでは、 LakeFlow Connectの Zerobus 直接書き込みコネクタを使用してデータを取り込む方法について説明します。
ワークスペースのURLを取得する
ログイン後に Databricks ワークスペースを表示するときは、ブラウザーで次の形式の URL を確認してください: https://<databricks-instance>.com/o=XXXXX 。URL は/o=XXXXXより前の部分すべてで構成されます。例:
完全なURL: https://abcd-teste2-test-spcse2.cloud.databricks.com/?o=2281745829657864#
ワークスペース URL: https://abcd-teste2-test-spcse2.cloud.databricks.com
ターゲットテーブルを作成または識別する
取り込むターゲット テーブルを特定します。新しいターゲット テーブルを作成するには、 CREATE TABLE SQL コマンドを実行します。例えば:
CREATE TABLE unity.default.air_quality (
device_name STRING, temp INT, humidity LONG);
サービスプリンシパルを作成し、権限を付与する
サービスプリンシパルは、パーソナライズされたアカウントよりも高いセキュリティを提供する特殊な ID です。 サービスプリンシパルに関する詳細と、それらを認証に使用する方法については、これらの手順を参照してください。
-
[設定] > [ID とアクセス] でワークスペースにサービスプリンシパルを作成します。
-
サービスプリンシパルのクライアント ID とクライアント シークレットを生成して保存します。
-
カタログ、スキーマ、テーブルに必要な権限をサービスプリンシパルに付与します。
- サービスプリンシパルページで、 「構成」 タブを開きます。
- アプリケーション ID (UUID) をコピーします。
- 必要に応じて、例の UUID とユーザー名、スキーマ名、およびテーブル名を置き換えて、次の SQL を使用します。
SQLGRANT USE CATALOG ON CATALOG <username> TO `<UUID>`;
GRANT USE SCHEMA ON SCHEMA <username.default> TO `<UUID>`;
GRANT MODIFY, SELECT ON TABLE <username.default.table_name> TO `<UUID>`;
クライアントを書く
- Python SDK
- Rust SDK
- Java SDK
Python 3.9 以上が必要です。
-
Python SDK をインストールします。
Bashpip install databricks-zerobus-ingest-sdkあるいは、ソースからインストールします。
Bashgit clone https://github.com/databricks/zerobus-sdk-py.git
cd zerobus-sdk-py
pip install -e . -
プロトコル バッファー定義を生成します。
generate_protoツールを使用して、Unity Catalog テーブル スキーマから proto 定義を自動的に生成します。Bashpython -m zerobus.tools.generate_proto \
--uc-endpoint "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com" \
--client-id "<service-principal-application-id>" \
--client-secret "<service-principal-secret>" \
--table "<catalog.schema.table_name>" \
--output "record.proto"テーブルの出力例:
SQLCREATE TABLE main.default.air_quality (
device_name STRING,
temp INT,
humidity BIGINT
)
USING DELTA;record.protoを生成します:Protosyntax = "proto2";
message air_quality {
optional string device_name = 1;
optional int32 temp = 2;
optional int64 humidity = 3;
}Protobuf メッセージの詳細については、ここを参照してください。
-
プロトコル バッファ定義をコンパイルします。
grpcio-toolsをインストールして proto ファイルをコンパイルします:Bashpip install "grpcio-tools>=1.60.0,<2.0"
python -m grpc_tools.protoc --python_out=. --proto_path=. record.protoこれにより、コンパイルされたプロトコル バッファ定義を含む
record_pb2.pyファイルが生成されます。 -
Python クライアントを作成します。
SDK を使用するには、次の情報が必要です。
- Databricks ワークスペース URL
- ワークスペース ID (ワークスペース URL の
/o=以降にあります) - テーブル名
- サービスシプリンパルのクライアントIDとクライアントシークレット
- Zerobusエンドポイントの形式
https://<workspace_id>.zerobus.<region>.cloud.databricks.com
同期の例:
Pythonimport logging
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import TableProperties
import record_pb2
# Configure logging (optional).
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# Configuration.
server_endpoint = "1234567890123456.zerobus.us-west-2.cloud.databricks.com"
workspace_url = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
table_name = "main.default.air_quality"
client_id = "your-service-principal-application-id"
client_secret = "your-service-principal-secret"
# Initialize SDK.
sdk = ZerobusSdk(server_endpoint, workspace_url)
# Configure table properties.
table_properties = TableProperties(
table_name,
record_pb2.AirQuality.DESCRIPTOR
)
# Create stream.
stream = sdk.create_stream(
client_id,
client_secret,
table_properties
)
try:
# Ingest records.
for i in range(100):
record = record_pb2.AirQuality(
device_name=f"sensor-{i % 10}",
temp=20 + (i % 15),
humidity=50 + (i % 40)
)
ack = stream.ingest_record(record)
ack.wait_for_ack() # Wait for durability.
print(f"Ingested record {i + 1}")
print("Successfully ingested 100 records!")
finally:
stream.close()
設定オプション
SDK はStreamConfigurationOptionsを介してさまざまな構成オプションをサポートしています。
オプション | デフォルト | 説明 |
|---|---|---|
最大飛行記録数 | 50000 | 未確認レコードの最大数 |
回復 | True | 自動ストリーム回復を有効にする |
リカバリタイムアウトミリ秒 | 15000 | 回復操作のタイムアウト(ミリ秒) |
リカバリバックオフミリ秒 | 2000 | 回復試行間の遅延(ミリ秒) |
回復再試行 | 3 | 回復試行の最大回数 |
フラッシュタイムアウトミリ秒 | 300000 | フラッシュ操作のタイムアウト(ミリ秒) |
サーバー_不足_ack_timeout_ms | 60000 | サーバー確認応答タイムアウト(ミリ秒) |
ack_コールバック | なし | レコード確認時に呼び出されるコールバック |
エラー処理
SDK は 2 つの例外タイプを提供します。
ZerobusException: 再試行可能なエラー(ネットワークの問題、一時的な障害)NonRetriableException: 再試行不可能なエラー (無効な資格情報、テーブルがありません)
from zerobus.sdk.shared import ZerobusException, NonRetriableException
try:
stream.ingest_record(record)
except NonRetriableException as e:
print(f"Fatal error: {e}")
raise
except ZerobusException as e:
print(f"Retriable error: {e}")
# Implement retry logic with backoff.
-
Zerobus SDK リポジトリをクローンします (スキーマ生成ツールと例に必要)。
Bashgit clone https://github.com/databricks/zerobus-sdk-rs.git
cd zerobus-sdk-rs次のような画面が表示されます。
Textzerobus-sdk-rs/
├── sdk/ # Core Zerobus SDK library
│ ├── src/ # Core SDK logic
│ ├── zerobus_service.proto # gRPC protocol definition
│ ├── build.rs # Build script for protobuf compilation
│ └── Cargo.toml
│
├── tools/
│ └── generate_files/ # Schema generation CLI tool
│ ├── src/ # Unity Catalog -> Proto conversion
│ ├── README.md # Tool documentation
│ └── Cargo.toml
│
├── examples/
│ └── basic_example/ # Working example application
│ ├── README.md # Example documentation
│ ├── src/main.rs # Example usage code
│ ├── output/ # Generated schema files
│ │ ├── orders.proto
│ │ ├── orders.rs
│ │ └── orders.descriptor
│ └── Cargo.toml
│
└── README.md # Main README file -
Rust プロジェクトを作成します。
Bashcd .. # Go back to your workspace directory
cargo new example_project
cd example_project次のようなものが表示されるはずです。
Textexample_project/
├── Cargo.toml
└── src/
└── main.rs -
Cargo.tomlに依存関係を追加します:Toml[dependencies]
zerobus-ingest-sdk = "0.1.0" # From crates.io
prost = "0.13.3"
prost-types = "0.13.3"
tokio = { version = "1.42.0", features = ["macros", "rt-multi-thread"] } -
スキーマ生成ツールを実行して、取り込みに必要な protobuf ファイルを生成します。このツールは、テーブルの proto 定義、記述子、および対応する Rust ファイルを生成するのに役立ちます。
Bash# From your example_project directory
cd ../zerobus-sdk-rs/tools/generate_files
cargo run --
--uc-endpoint "https://your-workspace.cloud.databricks.com"
--client-id "your-client-id"
--client-secret "your-client-secret"
--table "catalog.schema.table"
--output-dir "../../output"これにより、プロジェクト内に 3 つのファイルを含む
outputフォルダーが作成されます。Textexample_project/
├── Cargo.toml
├── output/
│ ├── <table_name>.rs
│ ├── <table_name>.proto
│ └── <table_name>.descriptor
└── src/
└── main.rs -
クライアント アプリケーションを作成します。
SDK を使用するには、次の情報が必要です。
- Databricks ワークスペース URL (例:
https://your-workspace.cloud.databricks.com) - テーブル名(例:
catalog.schema.table) - サービスプリンシパルのクライアントIDとシークレット
- Zerobusエンドポイントの形式
https://<workspace_id>.zerobus.<region>.cloud.databricks.com
モックテーブル
air_qualityの例:すべてのファイルを生成すると、ファイル ツリーは次のようになります。
Textexample_project/
├── Cargo.toml
├── output/
│ ├── air_quality.rs
│ ├── air_quality.proto
│ └── air_quality.descriptor
└── src/
└── main.rsair_quality.protoProtosyntax = "proto2";
message table_AirQuality {
optional string device_name = 1;
optional int32 temp = 2;
optional int64 humidity = 3;
}air_quality.rs(自動生成)Rust// @generated by prost-build
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct table_AirQuality {
#[prost(string, optional, tag = "1")]
pub device_name: Option<String>,
#[prost(int32, optional, tag = "2")]
pub temp: Option<i32>,
#[prost(int64, optional, tag = "3")]
pub humidity: Option<i64>,
}main.rs(アプリケーションコード):Rustuse std::error::Error;
use databricks_zerobus_ingest_sdk::{
StreamConfigurationOptions, TableProperties, ZerobusSdk
};
use prost_types::DescriptorProto;
use std::fs;
use prost::Message;
// Change constants to match your setup
const DATABRICKS_WORKSPACE_URL: &str = "https://your-workspace.cloud.databricks.com";
const TABLE_NAME: &str = "catalog.schema.air_quality";
const DATABRICKS_CLIENT_ID: &str = "your-client-id";
const DATABRICKS_CLIENT_SECRET: &str = "your-client-secret";
const ZEROBUS_ENDPOINT: &str = "https://workspace-id.zerobus.region.cloud.databricks.com";
pub mod air_quality {
include!("../output/air_quality.rs");
}
use crate::air_quality::table_AirQuality;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let descriptor_proto = load_descriptor_proto(
"output/air_quality.descriptor",
"air_quality.proto",
"table_AirQuality"
);
let table_properties = TableProperties {
table_name: TABLE_NAME.to_string(),
descriptor_proto,
};
let stream_configuration_options = StreamConfigurationOptions {
max_inflight_records: 1000,
..Default::default()
};
let sdk = ZerobusSdk::new(
ZEROBUS_ENDPOINT.to_string(),
DATABRICKS_WORKSPACE_URL.to_string(),
)?;
let mut stream = sdk
.create_stream(
table_properties,
DATABRICKS_CLIENT_ID.to_string(),
DATABRICKS_CLIENT_SECRET.to_string(),
Some(stream_configuration_options),
)
.await?;
println!("Stream created successfully!");
let record = table_AirQuality {
device_name: Some("sensor-001".to_string()),
temp: Some(22),
humidity: Some(55),
};
let ack_future = stream
.ingest_record(record.encode_to_vec())
.await?;
println!("Record submitted for ingestion");
let offset_id = ack_future.await?;
println!("Record acknowledged at offset: {}", offset_id);
stream.close().await?;
println!("Stream closed successfully");
Ok(())
}
fn load_descriptor_proto(
path: &str,
file_name: &str,
message_name: &str
) -> DescriptorProto {
let descriptor_bytes = fs::read(path)
.expect("Failed to read proto descriptor file");
let file_descriptor_set = prost_types::FileDescriptorSet::decode(
descriptor_bytes.as_ref()
).unwrap();
let file_descriptor_proto = file_descriptor_set
.file
.into_iter()
.find(|f| f.name.as_ref().map(|n| n.as_str()) == Some(file_name))
.expect("File descriptor not found");
file_descriptor_proto
.message_type
.into_iter()
.find(|m| m.name.as_ref().map(|n| n.as_str()) == Some(message_name))
.expect("Message descriptor not found")
} - Databricks ワークスペース URL (例:
-
アプリケーションを実行します。
Bashcd example_project
cargo run次のような出力が表示されます。
TextStream created successfully!
Record submitted for ingestion
Record acknowledged at offset: 0
Stream closed successfully
Java 8以降が必要です。
-
Java SDK をインストールします。
オプションA: Maven
依存関係を
pom.xmlに追加します:XML<dependency>
<groupId>com.databricks</groupId>
<artifactId>zerobus-ingest-sdk</artifactId>
<version>0.1.0</version>
</dependency>すべての依存関係がバンドルされた fat JAR を使用するには:
XML<dependency>
<groupId>com.databricks</groupId>
<artifactId>zerobus-ingest-sdk</artifactId>
<version>0.1.0</version>
<classifier>jar-with-dependencies</classifier>
</dependency>オプションB: ソースからビルドする
SDK をクローンしてビルドします。
Bashgit clone https://github.com/databricks/zerobus-sdk-java.git
cd zerobus-sdk-java
mvn clean packageこれにより、
target/ディレクトリに 2 つの JAR ファイルが生成されます。-
Fat JAR(推奨) :
zerobus-ingest-sdk-0.1.0-jar-with-dependencies.jar(約18MB)- すべての依存関係がバンドルされた自己完結型
- 追加の依存関係は不要
-
通常の JAR :
zerobus-ingest-sdk-0.1.0.jar(~144KB)- SDKクラスのみ
- クラスパスへの依存関係が必要
依存関係 (Maven によって自動的に管理されるか、通常の JAR を手動で使用する場合に必要):
protobuf-java3.24.0grpc-netty-shaded1.58.0grpc-protobuf1.58.0grpc-stub1.58.0javax.annotation-api1.3.2slf4j-api1.7.36
-
-
プロトコル バッファー定義を生成します。
組み込みツールを使用して、 Unity Catalogテーブル スキーマからプロトタイプ定義を自動的に生成します。
ビルドから fat JAR を使用する場合:
Bashjava -jar zerobus-ingest-sdk-0.1.0-jar-with-dependencies.jar \
--uc-endpoint "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com" \
--client-id "<service-principal-application-id>" \
--client-secret "<service-principal-secret>" \
--table "<catalog.schema.table_name>" \
--output "record.proto" \
--proto-msg "AirQuality"Maven 依存関係を使用する場合:
Bashmvn dependency:copy-dependencies
java -jar target/dependency/zerobus-ingest-sdk-0.1.0-jar-with-dependencies.jar \
--uc-endpoint "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com" \
--client-id "<service-principal-application-id>" \
--client-secret "<service-principal-secret>" \
--table "<catalog.schema.table_name>" \
--output "record.proto" \
--proto-msg "AirQuality"テーブルの出力例:
SQLCREATE TABLE main.default.air_quality (
device_name STRING,
temp INT,
humidity BIGINT
)
USING DELTA;record.protoを生成します:Protosyntax = "proto2";
package com.example;
option java_package = "com.example.proto";
option java_outer_classname = "Record";
message AirQuality {
optional string device_name = 1;
optional int32 temp = 2;
optional int64 humidity = 3;
}Protobuf メッセージの詳細については、ここを参照してください。
-
プロトコル バッファ定義をコンパイルします。
Mavenの使用
protobuf プラグインを
pom.xmlに追加します:XML<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.1</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.24.0:exe:${os.detected.classifier}</protocArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>record.protosrc/main/proto/に配置して実行します:Bashmvn compileprotocを直接使用する
プロトコル バッファ コンパイラ (protoc 3.24.0) をインストールします。そしてコンパイルします:
Bashprotoc --java_out=src/main/java src/main/proto/record.protoこれにより、Java ファイル (たとえば、
src/main/java/com/example/proto/Record.java) が生成されます。 -
Java クライアントを作成します。
SDK を使用するには、次の情報が必要です。
- Databricks ワークスペース URL
- ワークスペース ID (ワークスペース URL の
/o=以降にあります) - テーブル名
- サービスシプリンパルのクライアントIDとクライアントシークレット
- Zerobusエンドポイントの形式
https://<workspace_id>.zerobus.<region>.cloud.databricks.com
ブロック取り込みの例:
Javapackage com.example;
import com.databricks.zerobus.*;
import com.example.proto.Record.AirQuality;
public class ZerobusClient {
public static void main(String[] args) throws Exception {
// Configuration.
String serverEndpoint = "1234567890123456.zerobus.us-west-2.cloud.databricks.com";
String workspaceUrl = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com";
String tableName = "main.default.air_quality";
String clientId = "your-service-principal-application-id";
String clientSecret = "your-service-principal-secret";
// Initialize SDK.
ZerobusSdk sdk = new ZerobusSdk(serverEndpoint, workspaceUrl);
// Configure table properties.
TableProperties<AirQuality> tableProperties = new TableProperties<>(
tableName,
AirQuality.getDefaultInstance()
);
// Create stream.
ZerobusStream<AirQuality> stream = sdk.createStream(
tableProperties,
clientId,
clientSecret
).join();
try {
// Ingest records.
for (int i = 0; i < 100; i++) {
AirQuality record = AirQuality.newBuilder()
.setDeviceName("sensor-" + (i % 10))
.setTemp(20 + (i % 15))
.setHumidity(50 + (i % 40))
.build();
stream.ingestRecord(record).join(); // Wait for durability.
System.out.println("Ingested record " + (i + 1));
}
System.out.println("Successfully ingested 100 records!");
} finally {
stream.close();
}
}
}非ブロッキング取り込みの例:
高スループットの取り込みの場合:
Javapackage com.example;
import com.databricks.zerobus.*;
import com.example.proto.Record.AirQuality;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class NonBlockingClient {
public static void main(String[] args) throws Exception {
// Configuration.
String serverEndpoint = "1234567890123456.zerobus.us-west-2.cloud.databricks.com";
String workspaceUrl = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com";
String tableName = "main.default.air_quality";
String clientId = "your-service-principal-application-id";
String clientSecret = "your-service-principal-secret";
// Initialize SDK.
ZerobusSdk sdk = new ZerobusSdk(serverEndpoint, workspaceUrl);
// Configure table properties.
TableProperties<AirQuality> tableProperties = new TableProperties<>(
tableName,
AirQuality.getDefaultInstance()
);
// Configure stream options.
StreamConfigurationOptions options = StreamConfigurationOptions.builder()
.setMaxInflightRecords(50000)
.setAckCallback(response ->
System.out.println("Acknowledged offset: " +
response.getDurabilityAckUpToOffset()))
.build();
// Create stream.
ZerobusStream<AirQuality> stream = sdk.createStream(
tableProperties,
clientId,
clientSecret,
options
).join();
List<CompletableFuture<Void>> futures = new ArrayList<>();
try {
// Ingest records.
for (int i = 0; i < 100000; i++) {
AirQuality record = AirQuality.newBuilder()
.setDeviceName("sensor-" + (i % 10))
.setTemp(20 + (i % 15))
.setHumidity(50 + (i % 40))
.build();
futures.add(stream.ingestRecord(record));
}
// Flush and wait for all records to be acknowledged.
stream.flush();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
System.out.println("Successfully ingested 100,000 records!");
} finally {
stream.close();
}
}
} -
クライアントを実行します。
Maven の使用:
Bashmvn compile exec:java -Dexec.mainClass="com.example.ZerobusClient"手動でコンパイルして実行します。
Bashjavac -cp "lib/*" -d out src/main/java/com/example/ZerobusClient.java src/main/java/com/example/proto/Record.java
java -cp "lib/*:out" com.example.ZerobusClient
設定オプション
SDK はStreamConfigurationOptionsを介してさまざまな構成オプションをサポートしています。
オプション | デフォルト | 説明 |
|---|---|---|
最大飛行記録数 | 50000 | 未確認レコードの最大数 |
回復 | True | 自動ストリーム回復を有効にする |
回復タイムアウトミリ秒 | 15000 | 回復操作のタイムアウト(ミリ秒) |
回復バックオフMs | 2000 | 回復試行間の遅延(ミリ秒) |
回復再試行 | 3 | 回復試行の最大回数 |
フラッシュタイムアウトミリ秒 | 300000 | フラッシュ操作のタイムアウト(ミリ秒) |
サーバーのAckTimeoutMs不足 | 60000 | サーバー確認応答タイムアウト(ミリ秒) |
ackコールバック | null | レコード確認時に呼び出されるコールバック |
構成例:
StreamConfigurationOptions options = StreamConfigurationOptions.builder()
.setMaxInflightRecords(10000)
.setRecovery(true)
.setRecoveryTimeoutMs(20000)
.setAckCallback(response ->
System.out.println("Ack: " + response.getDurabilityAckUpToOffset()))
.build();
ロギング
SDK はログ記録に SLF4J を使用します。
デバッグ ログを有効にする:
java -Dorg.slf4j.simpleLogger.log.com.databricks.zerobus=debug -cp "lib/*:out" com.example.ZerobusClient
Maven を使用する場合:
mvn exec:java -Dexec.mainClass="com.example.ZerobusClient" -Dorg.slf4j.simpleLogger.log.com.databricks.zerobus=debug
使用可能なログレベル: trace 、 debug 、 info 、 warn 、 error
エラー処理
SDK は 2 つの例外タイプを提供します。
ZerobusException: 再試行可能なエラー(ネットワークの問題、一時的な障害)NonRetriableException: 再試行不可能なエラー (無効な資格情報、テーブルがありません)
try {
stream.ingestRecord(record);
} catch (NonRetriableException e) {
System.err.println("Fatal error: " + e.getMessage());
throw e;
} catch (ZerobusException e) {
System.err.println("Retriable error: " + e.getMessage());
// Implement retry logic with backoff.
}