メインコンテンツまでスキップ

Zerobus Ingestコネクタを使用する

このページでは、 LakeFlow Connectの Zerobus Ingest コネクタを使用してデータを取り込む方法について説明します。

インターフェースの選択

Zerobus Ingest は gRPC および REST インターフェースをサポートしています。すべての SDK は当社の gRPC APIsと直接統合され、カスタムの高スループット クライアントを構築するための開発者向けのインターフェースを提供します。 REST インターフェースは、大量の「チャッティ」デバイスを扱う際のアーキテクチャ上の制約を処理します。

  • gRPC「接続税」を備えた SDK : gRPC は、永続的な接続による高スループット パフォーマンスに特化しています。ただし、開いているストリームはすべて同時実行クォータにカウントされます。
  • REST「スループット税」 :REST では、更新ごとに完全なハンドシェイクが必要なため、ステートレスになります。REST は、ステータスが頻繁に報告されないエッジ デバイスのユースケースに適しています。

大容量のストリームには gRPC を活用し、大規模で低頻度のデバイス群やサポートされていない言語には REST を活用します。

ワークスペース URL と Zerobus Ingest Endpoint を取得する

ログイン後に Databricks ワークスペースを表示するときは、ブラウザーで次の形式の URL を確認してください: https://<databricks-instance>.com/o=XXXXX 。URL は/o=XXXXXより前の部分すべてで構成されます。例:

完全なURL: https://abcd-teste2-test-spcse2.cloud.databricks.com/?o=2281745829657864#

ワークスペース URL: https://abcd-teste2-test-spcse2.cloud.databricks.com

ワークスペースID: 2281745829657864

サーバー エンドポイントは、ワークスペースとリージョンによって異なります。

サーバーエンドポイント: <workspace-id>.zerobus.<region>.cloud.databricks.com

ターゲットテーブルを作成または識別する

取り込むターゲット テーブルを特定します。新しいターゲット テーブルを作成するには、 CREATE TABLE SQL コマンドを実行します。例えば:

SQL
    CREATE TABLE unity.default.air_quality (
device_name STRING, temp INT, humidity LONG);

サービスプリンシパルを作成し、権限を付与する

サービスプリンシパルは、パーソナライズされたアカウントよりも高いセキュリティを提供する特殊な ID です。 サービスプリンシパルに関する詳細と、それらを認証に使用する方法については、これらの手順を参照してください。

  1. [設定] > [ID とアクセス] でワークスペースにサービスプリンシパルを作成します。

  2. サービスプリンシパルのクライアント ID とクライアント シークレットを生成して保存します。

  3. カタログ、スキーマ、テーブルに必要な権限をサービスプリンシパルに付与します。

    1. サービスプリンシパルページで、 構成 タブを開きます。
    2. アプリケーション ID (UUID) をコピーします。
    3. 必要に応じて、例の UUID とユーザー名、スキーマ名、およびテーブル名を置き換えて、次の SQL を使用します。
    SQL
    GRANT USE CATALOG ON CATALOG <username> TO `<UUID>`;
    GRANT USE SCHEMA ON SCHEMA <username.default> TO `<UUID>`;
    GRANT MODIFY, SELECT ON TABLE <username.default.table_name> TO `<UUID>`;

クライアントを記述する

Python 3.9 以上が必要です。SDK は、高性能 Rust SDK への PyO3 バインディングを使用して、純粋な Python よりも最大 40 倍高いスループットと、Rust 非同期ランタイムによる効率的なネットワーク I/O を提供します。JSON (最も単純) とプロトコル バッファー (本番運用に推奨) をサポートしています。 SDK は、同期と非同期の両方の実装と、3 つの異なる取り込み方法 (将来ベース、オフセット ベース、ファイア アンド フォーゲット) もサポートしています。

Bash
pip install databricks-zerobus-ingest-sdk

JSONの例:

Python
  import logging
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties

# Configuration - see "Before you begin" section for how to obtain these values.
SERVER_ENDPOINT="https://1234567890123456.zerobus.us-west-2.cloud.databricks.com"
DATABRICKS_WORKSPACE_URL="https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
TABLE_NAME="main.default.air_quality"
CLIENT_ID="your-client-id"
CLIENT_SECRET="your-client-secret"

sdk = ZerobusSdk(
SERVER_ENDPOINT,
DATABRICKS_WORKSPACE_URL
)

table_properties = TableProperties(TABLE_NAME)
options = StreamConfigurationOptions(record_type=RecordType.JSON)
stream = sdk.create_stream(CLIENT_ID, CLIENT_SECRET, table_properties, options)

try:
for i in range(1000):
record_dict = {
"device_name": f"sensor-{i}",
"temp": 20 + i % 15,
"humidity": 50 + i % 40
}
offset = stream.ingest_record_offset(record_dict)

# Optional: Wait for durability confirmation
stream.wait_for_offset(offset)
finally:
stream.close()

プロトコル バッファー : 型セーフな取り込みを行うには、 RecordType.PROTO (デフォルト) でプロトコル バッファーを使用し、テーブル プロパティに記述子 Proto を指定します。

完全なドキュメント、構成オプション、バッチ取り込み、およびプロトコル バッファーの例については、 Python SDK リポジトリを参照してください。