メインコンテンツまでスキップ

Zerobus Ingestコネクタを使用する

備考

プレビュー

Zerobus Ingest コネクタはパブリック プレビュー段階です。お試しいただくには、Databricks アカウント担当者にお問い合わせください。

このページでは、 LakeFlow Connectの Zerobus 直接書き込みコネクタを使用してデータを取り込む方法について説明します。

ワークスペースのURLを取得する

ログイン後に Databricks ワークスペースを表示するときは、ブラウザーで次の形式の URL を確認してください: https://<databricks-instance>.com/o=XXXXX 。URL は/o=XXXXXより前の部分すべてで構成されます。例:

完全なURL: https://abcd-teste2-test-spcse2.cloud.databricks.com/?o=2281745829657864#

ワークスペース URL: https://abcd-teste2-test-spcse2.cloud.databricks.com

ターゲットテーブルを作成または識別する

取り込むターゲット テーブルを特定します。新しいターゲット テーブルを作成するには、 CREATE TABLE SQL コマンドを実行します。例えば:

SQL
    CREATE TABLE unity.default.air_quality (
device_name STRING, temp INT, humidity LONG);

サービスプリンシパルを作成し、権限を付与する

サービスプリンシパルは、パーソナライズされたアカウントよりも高いセキュリティを提供する特殊な ID です。 サービスプリンシパルに関する詳細と、それらを認証に使用する方法については、これらの手順を参照してください。

  1. [設定] > [ID とアクセス] でワークスペースにサービスプリンシパルを作成します。

  2. サービスプリンシパルのクライアント ID とクライアント シークレットを生成して保存します。

  3. カタログ、スキーマ、テーブルに必要な権限をサービスプリンシパルに付与します。

    1. サービスプリンシパルページで、 「構成」 タブを開きます。
    2. アプリケーション ID (UUID) をコピーします。
    3. 必要に応じて、例の UUID とユーザー名、スキーマ名、およびテーブル名を置き換えて、次の SQL を使用します。
    SQL
    GRANT USE CATALOG ON CATALOG <username> TO `<UUID>`;
    GRANT USE SCHEMA ON SCHEMA <username.default> TO `<UUID>`;
    GRANT MODIFY, SELECT ON TABLE <username.default.table_name> TO `<UUID>`;

クライアントを書く

Python 3.9 以上が必要です。

  1. Python SDK をインストールします。

    Bash
    pip install databricks-zerobus-ingest-sdk

    あるいは、ソースからインストールします。

    Bash
    git clone https://github.com/databricks/zerobus-sdk-py.git
    cd zerobus-sdk-py
    pip install -e .
  2. プロトコル バッファー定義を生成します。

    generate_protoツールを使用して、Unity Catalog テーブル スキーマから proto 定義を自動的に生成します。

    Bash
    python -m zerobus.tools.generate_proto \
    --uc-endpoint "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com" \
    --client-id "<service-principal-application-id>" \
    --client-secret "<service-principal-secret>" \
    --table "<catalog.schema.table_name>" \
    --output "record.proto"

    テーブルの出力例:

    SQL
    CREATE TABLE main.default.air_quality (
    device_name STRING,
    temp INT,
    humidity BIGINT
    )
    USING DELTA;

    record.protoを生成します:

    Proto
    syntax = "proto2";

    message air_quality {
    optional string device_name = 1;
    optional int32 temp = 2;
    optional int64 humidity = 3;
    }

    Protobuf メッセージの詳細については、ここを参照してください。

  3. プロトコル バッファ定義をコンパイルします。

    grpcio-toolsをインストールして proto ファイルをコンパイルします:

    Bash
    pip install "grpcio-tools>=1.60.0,<2.0"
    python -m grpc_tools.protoc --python_out=. --proto_path=. record.proto

    これにより、コンパイルされたプロトコル バッファ定義を含むrecord_pb2.pyファイルが生成されます。

  4. Python クライアントを作成します。

    SDK を使用するには、次の情報が必要です。

    • Databricks ワークスペース URL
    • ワークスペース ID (ワークスペース URL の/o=以降にあります)
    • テーブル名
    • サービスシプリンパルのクライアントIDとクライアントシークレット
    • Zerobusエンドポイントの形式 https://<workspace_id>.zerobus.<region>.cloud.databricks.com

    同期の例:

    Python
    import logging
    from zerobus.sdk.sync import ZerobusSdk
    from zerobus.sdk.shared import TableProperties
    import record_pb2

    # Configure logging (optional).
    logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )

    # Configuration.
    server_endpoint = "1234567890123456.zerobus.us-west-2.cloud.databricks.com"
    workspace_url = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
    table_name = "main.default.air_quality"
    client_id = "your-service-principal-application-id"
    client_secret = "your-service-principal-secret"

    # Initialize SDK.
    sdk = ZerobusSdk(server_endpoint, workspace_url)

    # Configure table properties.
    table_properties = TableProperties(
    table_name,
    record_pb2.AirQuality.DESCRIPTOR
    )

    # Create stream.
    stream = sdk.create_stream(
    client_id,
    client_secret,
    table_properties
    )

    try:
    # Ingest records.
    for i in range(100):
    record = record_pb2.AirQuality(
    device_name=f"sensor-{i % 10}",
    temp=20 + (i % 15),
    humidity=50 + (i % 40)
    )

    ack = stream.ingest_record(record)
    ack.wait_for_ack() # Wait for durability.

    print(f"Ingested record {i + 1}")

    print("Successfully ingested 100 records!")
    finally:
    stream.close()

設定オプション

SDK はStreamConfigurationOptionsを介してさまざまな構成オプションをサポートしています。

オプション

デフォルト

説明

最大飛行記録数

50000

未確認レコードの最大数

回復

True

自動ストリーム回復を有効にする

リカバリタイムアウトミリ秒

15000

回復操作のタイムアウト(ミリ秒)

リカバリバックオフミリ秒

2000

回復試行間の遅延(ミリ秒)

回復再試行

3

回復試行の最大回数

フラッシュタイムアウトミリ秒

300000

フラッシュ操作のタイムアウト(ミリ秒)

サーバー_不足_ack_timeout_ms

60000

サーバー確認応答タイムアウト(ミリ秒)

ack_コールバック

なし

レコード確認時に呼び出されるコールバック

エラー処理

SDK は 2 つの例外タイプを提供します。

  • ZerobusException: 再試行可能なエラー(ネットワークの問題、一時的な障害)
  • NonRetriableException: 再試行不可能なエラー (無効な資格情報、テーブルがありません)
Python
from zerobus.sdk.shared import ZerobusException, NonRetriableException

try:
stream.ingest_record(record)
except NonRetriableException as e:
print(f"Fatal error: {e}")
raise
except ZerobusException as e:
print(f"Retriable error: {e}")
# Implement retry logic with backoff.