メインコンテンツまでスキップ

Zerobus Ingestコネクタを使用する

備考

プレビュー

Zerobus Ingest コネクタはパブリック プレビュー段階です。お試しいただくには、Databricks アカウント担当者にお問い合わせください。

このページでは、 Lakeflow Connectの Zerobus 直接書き込みコネクタを使用してデータを取り込む方法について説明します。

ワークスペースのURLを取得する

ログイン後に Databricks ワークスペースを表示するときは、ブラウザーで次の形式の URL を確認してください: https://<databricks-instance>.com/o=XXXXX 。URL は/o=XXXXXより前の部分すべてで構成されます。例:

完全なURL: https://abcd-teste2-test-spcse2.cloud.databricks.com/?o=2281745829657864#

ワークスペース URL: https://abcd-teste2-test-spcse2.cloud.databricks.com

ターゲットテーブルを作成または識別する

取り込むターゲット テーブルを特定します。新しいターゲット テーブルを作成するには、 CREATE TABLE SQL コマンドを実行します。例えば:

SQL
    CREATE TABLE unity.default.air_quality (
device_name STRING, temp INT, humidity LONG);

サービスプリンシパルを作成し、権限を付与する

サービスプリンシパルは、パーソナライズされたアカウントよりも高いセキュリティを提供する特殊な ID です。 サービスプリンシパルに関する詳細と、それらを認証に使用する方法については、これらの手順を参照してください。

  1. [設定] > [ID とアクセス] でワークスペースにサービスプリンシパルを作成します。

  2. サービスプリンシパルのクライアント ID とクライアント シークレットを生成して保存します。

  3. カタログ、スキーマ、テーブルに必要な権限をサービスプリンシパルに付与します。

    1. サービスプリンシパルページで、 構成 タブを開きます。
    2. アプリケーション ID (UUID) をコピーします。
    3. 必要に応じて、例の UUID とユーザー名、スキーマ名、およびテーブル名を置き換えて、次の SQL を使用します。
    SQL
    GRANT USE CATALOG ON CATALOG <username> TO `<UUID>`;
    GRANT USE SCHEMA ON SCHEMA <username.default> TO `<UUID>`;
    GRANT MODIFY, SELECT ON TABLE <username.default.table_name> TO `<UUID>`;

エンドポイント形式

Zerobus サーバーのエンドポイントとワークスペースの URL 形式は、クラウド プロバイダーによって異なります。

クラウド

サーバーエンドポイント

ワークスペースURL

AWS

<workspace-id>.zerobus.<region>.cloud.databricks.com

https://<instance>.cloud.databricks.com

Azure

<workspace-id>.zerobus.<region>.azuredatabricks.net

https://<instance>.azuredatabricks.net

AWS の例:

Text
Server endpoint: 1234567890123456.zerobus.us-west-2.cloud.databricks.com
Workspace URL: https://dbc-a1b2c3d4-e5f6.cloud.databricks.com

Azureの例:

Text
Server endpoint: 1234567890123456.zerobus.eastus.azuredatabricks.net
Workspace URL: https://adb-1234567890123456.12.azuredatabricks.net

クライアントを記述する

Python 3.9 以上が必要です。SDK 、 JSON (最も単純) とプロトコル バッファー (本番運用に推奨) をサポートしています。

Bash
pip install databricks-zerobus-ingest-sdk

JSONの例:

Python
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties

# Configuration - see "Before you begin" section for how to obtain these values.
server_endpoint = "1234567890123456.zerobus.us-west-2.cloud.databricks.com"
workspace_url = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
table_name = "main.default.air_quality"
client_id = "your-service-principal-application-id"
client_secret = "your-service-principal-secret"

sdk = ZerobusSdk(server_endpoint, workspace_url)
table_properties = TableProperties(table_name)
options = StreamConfigurationOptions(record_type=RecordType.JSON)

stream = sdk.create_stream(client_id, client_secret, table_properties, options)

try:
for i in range(100):
record = {"device_name": f"sensor-{i}", "temp": 22, "humidity": 55}
ack = stream.ingest_record(record) # Pass dict directly, SDK handles serialization.
ack.wait_for_ack()
finally:
stream.close()

確認コールバック: 取り込みの進行状況を非同期的に追跡するには、 ack_callbackオプションを使用します。コールバックは、そのオフセットまでのすべてのレコードが永続的に書き込まれたことを示すdurability_ack_up_to_offset属性 (int) を持つIngestRecordResponseオブジェクトを受け取ります。

Python
from zerobus.sdk.shared import IngestRecordResponse

def on_ack(response: IngestRecordResponse) -> None:
print(f"Records acknowledged up to offset: {response.durability_ack_up_to_offset}")

options = StreamConfigurationOptions(
record_type=RecordType.JSON,
ack_callback=on_ack
)

プロトコル バッファー: 型セーフな取り込みの場合は、 RecordType.PROTO (デフォルト) でプロトコル バッファーを使用します。python -m zerobus.tools.generate_protoを使用してテーブルからスキーマを生成します。

完全なドキュメント、構成オプション、非同期 API、およびプロトコル バッファーの例については、 Python SDK リポジトリを参照してください。