メインコンテンツまでスキップ

Zerobus Ingestコネクタを使用する

このページでは、 LakeFlow Connectの Zerobus Ingest コネクタを使用してデータを取り込む方法について説明します。

インターフェースを選択

Zerobus Ingest は gRPC および REST インターフェースをサポートしています。SDK は、高スループット アプリケーションを構築するための開発者向けのインターフェースを備えたカスタム gRPC ベースのクライアントを提供します。REST インターフェースは、大量の「チャッティ」デバイスを扱う際のアーキテクチャ上の制約を処理します。

  • gRPC「接続税」を備えた SDK: gRPC は、永続的な接続による高スループット パフォーマンスに特化しています。ただし、開いているストリームはすべて同時実行クォータにカウントされます。
  • REST「スループット税」: REST では更新ごとに完全なハンドシェイクが必要なため、ステートレスになります。REST は、ステータスが頻繁に報告されないエッジ デバイスのユースケースに適しています。

大容量のストリームには gRPC を活用し、大規模で低頻度のデバイス群やサポートされていない言語には REST を活用します。

ワークスペース URL と Zerobus Ingest エンドポイントを取得する

ログインすると、ワークスペースの URL がブラウザに表示されます。完全な URL はhttps://<databricks-instance>.com/o=XXXXX形式に従いますが、ワークスペース URL は/o=XXXXXより前のすべての内容で構成されます。たとえば、次の完全な URL を指定すると、ワークスペースの URL とワークスペース ID を特定できます。

  • 完全なURL: https://abcd-teste2-test-spcse2.cloud.databricks.com/?o=2281745829657864#
  • ワークスペース URL: https://abcd-teste2-test-spcse2.cloud.databricks.com
  • ワークスペースID: 2281745829657864

サーバー エンドポイントは、ワークスペースとリージョンによって異なります。

  • サーバーエンドポイント: <workspace-id>.zerobus.<region>.cloud.databricks.com

利用可能なリージョンについては、 Zerobus Ingest コネクタの制限事項を参照してください。

ターゲットテーブルを作成または識別する

データを取り込むターゲット テーブルを特定します。新しいターゲット テーブルを作成するには、 CREATE TABLE SQL コマンドを実行します。たとえば、 unity.default.air_qualityという名前の新しいテーブルを作成します。

SQL
    CREATE TABLE unity.default.air_quality (
device_name STRING, temp INT, humidity LONG);

サービスプリンシパルを作成し、権限を付与する

サービスプリンシパルは、パーソナライズされたアカウントよりも高いセキュリティを提供する特殊な ID です。 サービスプリンシパルと認証にサービスプリンシパルを使用する方法の詳細については、 「 OAuthを使用したDatabricksへのサービスプリンシパルのアクセスを許可する」を参照してください。

  1. サービスプリンシパルを作成するには、 [設定] > [ID とアクセス] に移動します。

  2. 「サービスシプリンパル」 で、 「管理」 を選択します。

  3. [ サービスプリンシパルの追加 ] をクリックします。

  4. [サービスプリンシパルの追加] ウィンドウで、 [新規追加 ] をクリックして新しいサービスプリンシパルを作成します。

  5. サービスプリンシパルのクライアント ID とクライアント シークレットを生成して保存します。

  6. カタログ、スキーマ、テーブルに必要な権限をサービスプリンシパルに付与します。

    1. サービス プリンシパルページで、 「構成」 タブに移動します。
    2. アプリケーション ID (UUID) をコピーします。
    3. 権限を付与するには、以下のSQLを使用してください。必要に応じて、例のUUID、カタログ名、スキーマ名、テーブル名を置き換えてください。
    SQL
    GRANT USE CATALOG ON CATALOG <catalog> TO `<UUID>`;
    GRANT USE SCHEMA ON SCHEMA <catalog.schema> TO `<UUID>`;
    GRANT MODIFY, SELECT ON TABLE <catalog.schema.table_name> TO `<UUID>`;
重要

ALL PRIVILEGESが付与されているテーブルであっても、テーブルに対するMODIFYおよびSELECT権限を付与する必要があります。

クライアントを記述する

好みのプログラミング言語で Zerobus SDK を使用するか、REST API を使用して、データをターゲット テーブルに取り込みます。

Python 3.9 以上が必要です。SDK は、高性能 Rust SDK への PyO3 バインディングを使用して、純粋な Python よりも最大 40 倍高いスループットと、Rust 非同期ランタイムによる効率的なネットワーク I/O を提供します。JSON (最も単純) とプロトコル バッファー (本番運用に推奨) をサポートしています。 SDK は、同期と非同期の両方の実装と、3 つの異なる取り込み方法 (将来ベース、オフセット ベース、ファイア アンド フォーゲット) もサポートしています。

Bash
pip install databricks-zerobus-ingest-sdk

JSONの例:

Python
import logging
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties

# See "Get your workspace URL and Zerobus Ingest endpoint" for information on obtaining these values.
SERVER_ENDPOINT="https://1234567890123456.zerobus.us-west-2.cloud.databricks.com"
DATABRICKS_WORKSPACE_URL="https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
TABLE_NAME="main.default.air_quality"
CLIENT_ID="your-client-id"
CLIENT_SECRET="your-client-secret"

sdk = ZerobusSdk(
SERVER_ENDPOINT,
DATABRICKS_WORKSPACE_URL
)

table_properties = TableProperties(TABLE_NAME)
options = StreamConfigurationOptions(record_type=RecordType.JSON)
stream = sdk.create_stream(CLIENT_ID, CLIENT_SECRET, table_properties, options)

try:
for i in range(1000):
record_dict = {
"device_name": f"sensor-{i}",
"temp": 20 + i % 15,
"humidity": 50 + i % 40
}
offset = stream.ingest_record_offset(record_dict)

# Optional: Wait for durability confirmation
stream.wait_for_offset(offset)
finally:
stream.close()

確認コールバック: 取り込みの進行状況を非同期的に追跡するには、 ack_callbackオプションを使用します。レコードが確認されるか失敗したときに呼び出されるon_ack(offset: int)メソッドとon_error(offset: int, error_message: str)メソッドを持つAckCallbackのサブクラスを渡します。

Python
from zerobus.sdk.shared import AckCallback, StreamConfigurationOptions, RecordType

class MyAckCallback(AckCallback):
def on_ack(self, offset: int) -> None:
print(f"Record acknowledged at offset: {offset}")

def on_error(self, offset: int, error_message: str) -> None:
print(f"Error at offset {offset}: {error_message}")

options = StreamConfigurationOptions(
record_type = RecordType.JSON,
ack_callback = MyAckCallback()
)

プロトコル バッファー: 型セーフな取り込みを行うには、 RecordType.PROTO (デフォルト) でプロトコル バッファーを使用し、テーブル プロパティにdescriptorProtoを指定します。

完全なドキュメント、構成オプション、バッチ取り込み、およびプロトコル バッファーの例については、 Python SDK リポジトリを参照してください。

次のステップ