メインコンテンツまでスキップ

Zerobus Ingestコネクタを使用する

このページでは、 LakeFlow Connectの Zerobus Ingest コネクタを使用してデータを取り込む方法について説明します。

インターフェースを選択

Zerobus Ingestは、gRPC、REST、およびOpenTelemetry(OTLP)インターフェースをサポートしています。SDKは、高スループットアプリケーションを構築するための開発者向けインターフェースを備えた、カスタムgRPCベースのクライアントを提供します。RESTインターフェースは、大量の「通信量の多い」デバイスを扱う際のアーキテクチャ上の制約に対応します。OTLPインターフェースは、カスタムライブラリを必要とせずに、標準的なOpenTelemetryデータを受け入れます。

  • gRPC「接続税」を備えた SDK: gRPC は、永続的な接続による高スループット パフォーマンスに特化しています。ただし、開いているストリームはすべて同時実行クォータにカウントされます。
  • REST「スループット税」: REST では更新ごとに完全なハンドシェイクが必要なため、ステートレスになります。REST は、ステータスが頻繁に報告されないエッジ デバイスのユースケースに適しています。
  • OpenTelemetry (OTLP): OpenTelemetry SDK またはコレクターを既に使用している場合、OTLP エンドポイントはトレース、ログ、およびメトリクスをカスタム統合を必要とせずにUnity Catalog Deltaテーブルに取り込みます。 詳細については、 「Zerobus Ingest を使用した OpenTelemetry データの取り込み」を参照してください。

大容量ストリームにはgRPC、大規模で低頻度なデバイス群にはREST、OpenTelemetryで既に計測済みの環境にはOTLPを活用する。

ワークスペース URL と Zerobus Ingest エンドポイントを取得する

ログインすると、ワークスペースの URL がブラウザに表示されます。完全な URL はhttps://<databricks-instance>.gcp.databricks.com/?o=XXXXX形式に従いますが、ワークスペース URL は/?o=XXXXXより前のすべての内容で構成されます。たとえば、次の完全な URL を指定すると、ワークスペースの URL とワークスペース ID を特定できます。

  • 完全なURL: https://1234567890123456.0.gcp.databricks.com/?o=1234567890123456#
  • ワークスペース URL: https://1234567890123456.0.gcp.databricks.com
  • ワークスペースID: 1234567890123456

サーバー エンドポイントは、ワークスペースとリージョンによって異なります。

  • サーバーエンドポイント: <workspace-id>.zerobus.<region>.gcp.databricks.com

利用可能なリージョンについては、 Zerobus Ingest コネクタの制限事項を参照してください。

ターゲットテーブルを作成または識別する

データを取り込むターゲット テーブルを特定します。新しいターゲット テーブルを作成するには、 CREATE TABLE SQL コマンドを実行します。たとえば、 unity.default.air_qualityという名前の新しいテーブルを作成します。

SQL
    CREATE TABLE unity.default.air_quality (
device_name STRING, temp INT, humidity LONG);
注記

OpenTelemetry 取り込みの場合、テーブルは信号タイプ (トレース、ログ、メトリクス) ごとに事前定義されたスキーマを使用する必要があります。 Unity Catalogでターゲットテーブルを作成するを参照してください。

サービスプリンシパルを作成し、権限を付与する

サービスプリンシパルは、パーソナライズされたアカウントよりも高いセキュリティを提供する特殊な ID です。 サービスプリンシパルと認証にサービスプリンシパルを使用する方法の詳細については、 「 OAuthを使用したDatabricksへのサービスプリンシパルのアクセスを許可する」を参照してください。

  1. サービスプリンシパルを作成するには、 [設定] > [ID とアクセス] に移動します。

  2. 「サービスシプリンパル」 で、 「管理」 を選択します。

  3. [ サービスプリンシパルの追加 ] をクリックします。

  4. [サービスプリンシパルの追加] ウィンドウで、 [新規追加 ] をクリックして新しいサービスプリンシパルを作成します。

  5. サービスプリンシパルのクライアント ID とクライアント シークレットを生成して保存します。

  6. カタログ、スキーマ、テーブルに必要な権限をサービスプリンシパルに付与します。

    1. サービス プリンシパルページで、 「構成」 タブに移動します。
    2. アプリケーション ID (UUID) をコピーします。
    3. 権限を付与するには、以下のSQLを使用してください。必要に応じて、例のUUID、カタログ名、スキーマ名、テーブル名を置き換えてください。
    SQL
    GRANT USE CATALOG ON CATALOG <catalog> TO `<UUID>`;
    GRANT USE SCHEMA ON SCHEMA <catalog.schema> TO `<UUID>`;
    GRANT MODIFY, SELECT ON TABLE <catalog.schema.table_name> TO `<UUID>`;
重要

ALL PRIVILEGESが付与されているテーブルであっても、テーブルに対するMODIFYおよびSELECT権限を付与する必要があります。

クライアントを記述する

好みのプログラミング言語で Zerobus SDK を使用するか、REST API を使用して、データをターゲット テーブルに取り込みます。

Python 3.9 以上が必要です。SDK は、高性能 Rust SDK への PyO3 バインディングを使用して、純粋な Python よりも最大 40 倍高いスループットと、Rust 非同期ランタイムによる効率的なネットワーク I/O を提供します。JSON (最も単純) とプロトコル バッファー (本番運用に推奨) をサポートしています。 SDK は、同期と非同期の両方の実装と、3 つの異なる取り込み方法 (将来ベース、オフセット ベース、ファイア アンド フォーゲット) もサポートしています。

Bash
pip install databricks-zerobus-ingest-sdk

JSONの例:

Python
import logging
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties

# See "Get your workspace URL and Zerobus Ingest endpoint" for information on obtaining these values.
SERVER_ENDPOINT="https://1234567890123456.zerobus.us-east1.gcp.databricks.com"
DATABRICKS_WORKSPACE_URL="https://1234567890123456.0.gcp.databricks.com"
TABLE_NAME="main.default.air_quality"
CLIENT_ID="your-client-id"
CLIENT_SECRET="your-client-secret"

sdk = ZerobusSdk(
SERVER_ENDPOINT,
DATABRICKS_WORKSPACE_URL
)

table_properties = TableProperties(TABLE_NAME)
options = StreamConfigurationOptions(record_type=RecordType.JSON)
stream = sdk.create_stream(CLIENT_ID, CLIENT_SECRET, table_properties, options)

try:
for i in range(1000):
record_dict = {
"device_name": f"sensor-{i}",
"temp": 20 + i % 15,
"humidity": 50 + i % 40
}
offset = stream.ingest_record_offset(record_dict)

# Optional: Wait for durability confirmation
stream.wait_for_offset(offset)
finally:
stream.close()

確認コールバック: 取り込みの進行状況を非同期的に追跡するには、 ack_callbackオプションを使用します。レコードが確認されるか失敗したときに呼び出されるon_ack(offset: int)メソッドとon_error(offset: int, error_message: str)メソッドを持つAckCallbackのサブクラスを渡します。

Python
from zerobus.sdk.shared import AckCallback, StreamConfigurationOptions, RecordType

class MyAckCallback(AckCallback):
def on_ack(self, offset: int) -> None:
print(f"Record acknowledged at offset: {offset}")

def on_error(self, offset: int, error_message: str) -> None:
print(f"Error at offset {offset}: {error_message}")

options = StreamConfigurationOptions(
record_type = RecordType.JSON,
ack_callback = MyAckCallback()
)

プロトコル バッファー: 型セーフな取り込みを行うには、 RecordType.PROTO (デフォルト) でプロトコル バッファーを使用し、テーブル プロパティにdescriptorProtoを指定します。

完全なドキュメント、構成オプション、バッチ取り込み、およびプロトコル バッファーの例については、 Python SDK リポジトリを参照してください。

次のステップ