メインコンテンツまでスキップ

OpenTelemetry (OTLP) クライアントを設定して、データをUnity Catalogに送信するようにします。

備考

ベータ版

この機能はベータ版です。

Zerobus Ingestには、OpenTelemetry Protocol(OTLP)エンドポイントが含まれています。カスタム ライブラリを使用せずに、標準の OpenTelemetry SDK とコレクターを使用して、トレース、ログ、メトリクスをUnity Catalog Deltaテーブルに直接プッシュできます。 このページでは、エンドポイントの取得、ターゲットテーブルの作成、サービスプリンシパルの設定、および最初のテレメトリデータの送信について説明します。

Zerobus Ingest エンドポイントとワークスペース URL を取得する

エンドポイントURLは次のパターンに従います。

  • ワークスペースURL: https://<databricks-instance>.gcp.databricks.com
  • サーバーエンドポイント: <workspace-id>.zerobus.<region>.gcp.databricks.com

例えば:

  • ワークスペースURL: https://1234567890123456.0.gcp.databricks.com
  • サーバーエンドポイント: 1234567890123456.zerobus.us-central1.gcp.databricks.com

ワークスペース ID、URL、およびリージョンの検索の詳細については、 「ワークスペース URL と Zerobus Ingest エンドポイントを取得する」を参照してください。

Unity Catalogでターゲットテーブルを作成する

データを送信する前に、ターゲットとなるDeltaテーブルを作成する必要があります。 各信号タイプ(トレース、ログ、メトリクス)には、それぞれ固有のスキーマを持つ独自のテーブルが必要です。

前提条件:

テーブルをセットアップするには:

  1. <catalog>.<schema>.<prefix> 、カタログ、スキーマ、および希望するテーブル名のプレフィックスに置き換えてください。
  2. <service-principal-uuid>サービスプリンシパルのアプリ ID (UUID) に置き換えてください。これを見つけるには、 Databricksワークスペースのサービスプリンシパルの [構成] タブに移動します。
  3. Databricks SQLでスクリプトを実行してください。

スパン表

スパンテーブルには、各スパンのタイミング、ステータス、属性などの分散トレースデータが格納されます。

SQL
CREATE TABLE <catalog>.<schema>.<prefix>_otel_spans (
record_id STRING,
time TIMESTAMP,
date DATE,
service_name STRING,
trace_id STRING,
span_id STRING,
trace_state STRING,
parent_span_id STRING,
flags INT,
name STRING,
kind STRING,
start_time_unix_nano LONG,
end_time_unix_nano LONG,
attributes VARIANT,
dropped_attributes_count INT,
events ARRAY<STRUCT<
time_unix_nano: LONG,
name: STRING,
attributes: VARIANT,
dropped_attributes_count: INT
>>,
dropped_events_count INT,
links ARRAY<STRUCT<
trace_id: STRING,
span_id: STRING,
trace_state: STRING,
attributes: VARIANT,
dropped_attributes_count: INT,
flags: INT
>>,
dropped_links_count INT,
status STRUCT<
message: STRING,
code: STRING
>,
resource STRUCT<
attributes: VARIANT,
dropped_attributes_count: INT
>,
resource_schema_url STRING,
instrumentation_scope STRUCT<
name: STRING,
version: STRING,
attributes: VARIANT,
dropped_attributes_count: INT
>,
span_schema_url STRING
) USING DELTA
CLUSTER BY (time, service_name, trace_id)
TBLPROPERTIES (
'otel.schemaVersion' = 'v2',
'delta.checkpointPolicy' = 'classic',
'delta.enableVariantShredding' = 'true', -- optional
'delta.feature.variantShredding-preview' = 'supported', -- optional
'delta.feature.variantType-preview' = 'supported' -- optional
);

ログテーブル

ログテーブルには、重要度、本文、リソース属性などの構造化されたログレコードが格納されます。

SQL
CREATE TABLE <catalog>.<schema>.<prefix>_otel_logs (
record_id STRING,
time TIMESTAMP,
date DATE,
service_name STRING,
event_name STRING,
trace_id STRING,
span_id STRING,
time_unix_nano LONG,
observed_time_unix_nano LONG,
severity_number STRING,
severity_text STRING,
body VARIANT,
attributes VARIANT,
dropped_attributes_count INT,
flags INT,
resource STRUCT<
attributes: VARIANT,
dropped_attributes_count: INT
>,
resource_schema_url STRING,
instrumentation_scope STRUCT<
name: STRING,
version: STRING,
attributes: VARIANT,
dropped_attributes_count: INT
>,
log_schema_url STRING
) USING DELTA
CLUSTER BY (time, service_name)
TBLPROPERTIES (
'otel.schemaVersion' = 'v2',
'delta.checkpointPolicy' = 'classic',
'delta.enableVariantShredding' = 'true', -- optional
'delta.feature.variantShredding-preview' = 'supported', -- optional
'delta.feature.variantType-preview' = 'supported' -- optional
);

メトリクステーブル

メトリクステーブルには、ゲージ、合計、ヒストグラムの測定値と、それらに関連付けられたリソースおよび計測範囲の属性が格納されます。

SQL
CREATE TABLE <catalog>.<schema>.<prefix>_otel_metrics (
record_id STRING,
time TIMESTAMP,
date DATE,
service_name STRING,
start_time_unix_nano LONG,
time_unix_nano LONG,
name STRING,
description STRING,
unit STRING,
metric_type STRING,
gauge STRUCT<
value: DOUBLE,
exemplars: ARRAY<STRUCT<
time_unix_nano: LONG,
value: DOUBLE,
span_id: STRING,
trace_id: STRING,
filtered_attributes: VARIANT
>>,
attributes: VARIANT,
flags: INT
>,
sum STRUCT<
value: DOUBLE,
exemplars: ARRAY<STRUCT<
time_unix_nano: LONG,
value: DOUBLE,
span_id: STRING,
trace_id: STRING,
filtered_attributes: VARIANT
>>,
attributes: VARIANT,
flags: INT,
aggregation_temporality: STRING,
is_monotonic: BOOLEAN
>,
histogram STRUCT<
count: LONG,
sum: DOUBLE,
bucket_counts: ARRAY<LONG>,
explicit_bounds: ARRAY<DOUBLE>,
exemplars: ARRAY<STRUCT<
time_unix_nano: LONG,
value: DOUBLE,
span_id: STRING,
trace_id: STRING,
filtered_attributes: VARIANT
>>,
attributes: VARIANT,
flags: INT,
min: DOUBLE,
max: DOUBLE,
aggregation_temporality: STRING
>,
exponential_histogram STRUCT<
attributes: VARIANT,
count: LONG,
sum: DOUBLE,
scale: INT,
zero_count: LONG,
positive_bucket: STRUCT<
offset: INT,
bucket_counts: ARRAY<LONG>
>,
negative_bucket: STRUCT<
offset: INT,
bucket_counts: ARRAY<LONG>
>,
flags: INT,
exemplars: ARRAY<STRUCT<
time_unix_nano: LONG,
value: DOUBLE,
span_id: STRING,
trace_id: STRING,
filtered_attributes: VARIANT
>>,
min: DOUBLE,
max: DOUBLE,
zero_threshold: DOUBLE,
aggregation_temporality: STRING
>,
summary STRUCT<
count: LONG,
sum: DOUBLE,
quantile_values: ARRAY<STRUCT<
quantile: DOUBLE,
value: DOUBLE
>>,
attributes: VARIANT,
flags: INT
>,
metadata VARIANT,
resource STRUCT<
attributes: VARIANT,
dropped_attributes_count: INT
>,
resource_schema_url STRING,
instrumentation_scope STRUCT<
name: STRING,
version: STRING,
attributes: VARIANT,
dropped_attributes_count: INT
>,
metric_schema_url STRING
) USING DELTA
CLUSTER BY (time, service_name)
TBLPROPERTIES (
'otel.schemaVersion' = 'v2',
'delta.checkpointPolicy' = 'classic',
'delta.enableVariantShredding' = 'true', -- optional
'delta.feature.variantShredding-preview' = 'supported', -- optional
'delta.feature.variantType-preview' = 'supported' -- optional
);

サービスプリンシパルを作成し、権限を付与する

OAuth認証情報を使用してサービスプリンシパルを設定し、テーブルへのアクセス権を付与します。サービスプリンシパルの設定の詳細については、 OAuthを使用したDatabricksへのサービスプリンシパルのアクセスを許可する」を参照してください。

サービスプリンシパルに、カタログ、スキーマ、および各テーブルへのアクセス権を付与します。ALL PRIVILEGESを付与するだけでは不十分です。各テーブルに対して、 MODIFYSELECT明示的に付与する必要があります。

SQL
GRANT USE CATALOG ON CATALOG <catalog> TO `<service-principal-uuid>`;
GRANT USE SCHEMA ON SCHEMA <catalog>.<schema> TO `<service-principal-uuid>`;
GRANT MODIFY, SELECT ON TABLE <catalog>.<schema>.<prefix>_otel_spans TO `<service-principal-uuid>`;
GRANT MODIFY, SELECT ON TABLE <catalog>.<schema>.<prefix>_otel_logs TO `<service-principal-uuid>`;
GRANT MODIFY, SELECT ON TABLE <catalog>.<schema>.<prefix>_otel_metrics TO `<service-principal-uuid>`;

エクスポートツールを設定する

以下の例では、OpenTelemetryのゼロコード計測機能を使用して、アプリケーションのコードを変更することなく、トレース、ログ、およびメトリクスを自動的に収集し、Zerobus Ingestに転送します。 gRPCとカスタムメタデータヘッダーをサポートする、OTLP互換の他のエクスポーターを使用することもできます。

必須ヘッダー

すべてのOTLPリクエストには、以下のメタデータヘッダーを含める必要があります。

  • x-databricks-zerobus-table-name: 完全修飾されたUnity Catalogテーブル名 ( <catalog>.<schema>.<table>形式)。 各リクエストは単一のテーブルを対象とします。
  • Authorizationサービスプリンシパルの認証情報から生成されたOAuthベアラートークン。

サービスプリンシパルの認証情報から静的ベアラートークンを生成するには、 「OAuth を使用して Databricks へのサービスプリンシパルのアクセスを承認する」を参照してください。静的トークンは1時間後に有効期限が切れます。長時間実行されるアプリケーションについては、トークンの自動更新機能を備えた OpenTelemetry Collector を参照してください。

変数設定

いずれの例を実行する前にも、以下の変数を定義してください。

変数

DATABRICKS_CLIENT_ID

abc123-... (サービスプリンシパルアプリID)

DATABRICKS_CLIENT_SECRET

dose1234...

WORKSPACE_URL

1234567890123456.0.gcp.databricks.com

WORKSPACE_ID

1234567890123456

REGION

us-central1

CATALOG

my_catalog

SCHEMA

my_schema

TABLE_PREFIX

my_prefix

これらの変数は、Bashを使用して環境変数として定義できます。例えば:

Bash
export DATABRICKS_CLIENT_ID="<your-client-id>"
export DATABRICKS_CLIENT_SECRET="<your-client-secret>"

静的トークンを使用したクイックスタート

以下の例では、各シグナルタイプ(トレース、ログ、メトリクス)ごとに静的なベアラートークンを使用しています。この例を実行する前に、サービスプリンシパルの認証情報からトークンを生成してください。「 OAuthを使用したDatabricksへのサービスプリンシパル アクセスの承認」を参照してください。

トークンの更新管理が問題とならない、短期間または臨時のパイプラインには、この方法を使用してください。静的OAuth 1時間後に期限切れとなり、長時間実行されるプロセスには適していません。 本番運用ワークロードの場合は、代わりに自動更新を備えた OpenTelemetry Collector を使用してください。

信号の種類ごとに個別のトークンを生成する必要があります。authorization_detailsペイロードでは、 $TABLE_NAME ${TABLE_PREFIX}_otel_spans${TABLE_PREFIX}_otel_logs${TABLE_PREFIX}_otel_metricsなどの各シグナルの完全なテーブル名に置き換えます。

Bash
authorization_details=$(cat <<EOF
[{
"type": "unity_catalog_privileges",
"privileges": ["USE CATALOG"],
"object_type": "CATALOG",
"object_full_path": "$CATALOG"
},
{
"type": "unity_catalog_privileges",
"privileges": ["USE SCHEMA"],
"object_type": "SCHEMA",
"object_full_path": "$CATALOG.$SCHEMA"
},
{
"type": "unity_catalog_privileges",
"privileges": ["SELECT", "MODIFY"],
"object_type": "TABLE",
"object_full_path": "$CATALOG.$SCHEMA.$TABLE_NAME"
}]
EOF
)

curl -X POST \
-u "$DATABRICKS_CLIENT_ID:$DATABRICKS_CLIENT_SECRET" \
-d "grant_type=client_credentials" \
-d "scope=all-apis" \
-d "resource=api://databricks/workspaces/$WORKSPACE_ID/zerobusDirectWriteApi" \
--data-urlencode "authorization_details=$authorization_details" \
"https://$WORKSPACE_URL/oidc/v1/token"

自動インストルメンテーション パッケージをインストールする前に、返された 3 つのアクセスウイルスをTOKEN_SPANSTOKEN_LOGS 、およびTOKEN_METRICSとして保存します。 次に、アプリケーションを実行します。

Bash
OTEL_SERVICE_NAME="my-service" \
OTEL_EXPORTER_OTLP_PROTOCOL="grpc" \
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT="https://${WORKSPACE_ID}.zerobus.${REGION}.gcp.databricks.com:443" \
OTEL_EXPORTER_OTLP_LOGS_ENDPOINT="https://${WORKSPACE_ID}.zerobus.${REGION}.gcp.databricks.com:443" \
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT="https://${WORKSPACE_ID}.zerobus.${REGION}.gcp.databricks.com:443" \
OTEL_EXPORTER_OTLP_TRACES_HEADERS="authorization=Bearer ${TOKEN_SPANS},x-databricks-zerobus-table-name=${CATALOG}.${SCHEMA}.${TABLE_PREFIX}_otel_spans" \
OTEL_EXPORTER_OTLP_LOGS_HEADERS="authorization=Bearer ${TOKEN_LOGS},x-databricks-zerobus-table-name=${CATALOG}.${SCHEMA}.${TABLE_PREFIX}_otel_logs" \
OTEL_EXPORTER_OTLP_METRICS_HEADERS="authorization=Bearer ${TOKEN_METRICS},x-databricks-zerobus-table-name=${CATALOG}.${SCHEMA}.${TABLE_PREFIX}_otel_metrics" \
OTEL_TRACES_EXPORTER="otlp" \
OTEL_METRICS_EXPORTER="otlp" \
OTEL_LOGS_EXPORTER="otlp" \
opentelemetry-instrument python my_app.py

トークンの自動更新機能を備えたOpenTelemetryコレクター

Databricks OAuth 1 時間後に期限切れになります。 アプリケーションコード内でトークンの更新を管理するのではなく、アプリケーションとZerobus Ingest間のプロキシとしてOpenTelemetry Collectorをデプロイしてください。コレクターは、起動時にサービスプリンシパルの認証情報からトークンを生成し、有効期限が切れる前に自動的に更新するためにoauth2clientauthextensionを使用します。

これは、長時間実行される本番運用ワークロードに推奨されるアプローチです。 静的なアプローチとは異なり、Collector はOAuthの取得と更新を自動的に処理します。アプリケーション コードを変更する必要はありません。

コレクターは、アプリケーションとZerobus Ingestの間に位置します。アプリケーションは認証なしで、 localhost:4317上のコレクターにプレーンな OTLP を送信します。コレクターは、各リクエストにOAuthとテーブルヘッダーを追加し、Zerobus Ingestエンドポイントに転送します。

コレクター構成

コレクターを設定するためのcollector.yamlファイルを作成します。

YAML
extensions:
oauth2client/spans:
client_id: ${env:DATABRICKS_CLIENT_ID}
client_secret: ${env:DATABRICKS_CLIENT_SECRET}
token_url: https://${env:WORKSPACE_URL}/oidc/v1/token
scopes: ['all-apis']
endpoint_params:
resource: 'api://databricks/workspaces/${env:WORKSPACE_ID}/zerobusDirectWriteApi'
authorization_details:
- '[{"type":"unity_catalog_privileges","privileges":["USE CATALOG"],"object_type":"CATALOG","object_full_path":"${env:CATALOG}"},{"type":"unity_catalog_privileges","privileges":["USE SCHEMA"],"object_type":"SCHEMA","object_full_path":"${env:CATALOG}.${env:SCHEMA}"},{"type":"unity_catalog_privileges","privileges":["SELECT","MODIFY"],"object_type":"TABLE","object_full_path":"${env:CATALOG}.${env:SCHEMA}.${env:TABLE_PREFIX}_otel_spans"}]'

oauth2client/logs:
client_id: ${env:DATABRICKS_CLIENT_ID}
client_secret: ${env:DATABRICKS_CLIENT_SECRET}
token_url: https://${env:WORKSPACE_URL}/oidc/v1/token
scopes: ['all-apis']
endpoint_params:
resource: 'api://databricks/workspaces/${env:WORKSPACE_ID}/zerobusDirectWriteApi'
authorization_details:
- '[{"type":"unity_catalog_privileges","privileges":["USE CATALOG"],"object_type":"CATALOG","object_full_path":"${env:CATALOG}"},{"type":"unity_catalog_privileges","privileges":["USE SCHEMA"],"object_type":"SCHEMA","object_full_path":"${env:CATALOG}.${env:SCHEMA}"},{"type":"unity_catalog_privileges","privileges":["SELECT","MODIFY"],"object_type":"TABLE","object_full_path":"${env:CATALOG}.${env:SCHEMA}.${env:TABLE_PREFIX}_otel_logs"}]'

oauth2client/metrics:
client_id: ${env:DATABRICKS_CLIENT_ID}
client_secret: ${env:DATABRICKS_CLIENT_SECRET}
token_url: https://${env:WORKSPACE_URL}/oidc/v1/token
scopes: ['all-apis']
endpoint_params:
resource: 'api://databricks/workspaces/${env:WORKSPACE_ID}/zerobusDirectWriteApi'
authorization_details:
- '[{"type":"unity_catalog_privileges","privileges":["USE CATALOG"],"object_type":"CATALOG","object_full_path":"${env:CATALOG}"},{"type":"unity_catalog_privileges","privileges":["USE SCHEMA"],"object_type":"SCHEMA","object_full_path":"${env:CATALOG}.${env:SCHEMA}"},{"type":"unity_catalog_privileges","privileges":["SELECT","MODIFY"],"object_type":"TABLE","object_full_path":"${env:CATALOG}.${env:SCHEMA}.${env:TABLE_PREFIX}_otel_metrics"}]'

exporters:
otlp/spans:
endpoint: ${env:WORKSPACE_ID}.zerobus.${env:REGION}.gcp.databricks.com:443
auth:
authenticator: oauth2client/spans
headers:
x-databricks-zerobus-table-name: '${env:CATALOG}.${env:SCHEMA}.${env:TABLE_PREFIX}_otel_spans'

otlp/logs:
endpoint: ${env:WORKSPACE_ID}.zerobus.${env:REGION}.gcp.databricks.com:443
auth:
authenticator: oauth2client/logs
headers:
x-databricks-zerobus-table-name: '${env:CATALOG}.${env:SCHEMA}.${env:TABLE_PREFIX}_otel_logs'

otlp/metrics:
endpoint: ${env:WORKSPACE_ID}.zerobus.${env:REGION}.gcp.databricks.com:443
auth:
authenticator: oauth2client/metrics
headers:
x-databricks-zerobus-table-name: '${env:CATALOG}.${env:SCHEMA}.${env:TABLE_PREFIX}_otel_metrics'

receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317

processors:
batch:
timeout: 5s # adjust as needed
send_batch_size: 10 # adjust as needed

service:
extensions: [oauth2client/spans, oauth2client/logs, oauth2client/metrics]
pipelines:
traces:
receivers: [otlp] # adjust as needed
processors: [batch]
exporters: [otlp/spans]
logs:
receivers: [otlp]
processors: [batch]
exporters: [otlp/logs]
metrics:
receivers: [otlp]
processors: [batch]
exporters: [otlp/metrics]

次にコレクターを実行します。

Bash
./otelcol-contrib --config collector.yaml

アプリケーションを計測する

このコードサンプルを実行する前に、必要な変数を設定してください。次に、自動計測パッケージをインストールし、アプリケーションを実行します。

Bash
OTEL_SERVICE_NAME=my-service \
OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 \
OTEL_EXPORTER_OTLP_PROTOCOL=grpc \
OTEL_TRACES_EXPORTER=otlp \
OTEL_METRICS_EXPORTER=otlp \
OTEL_LOGS_EXPORTER=otlp \
opentelemetry-instrument python my_app.py

次のステップ