メインコンテンツまでスキップ

カスタム モデルサービング データをUnity Catalogに保存します

備考

ベータ版

この機能はベータ版です。すべての顧客に対して自動的に有効になるわけではなく、機能は変更される可能性があります。アクセスをリクエストするには、Databricks アカウント チームにお問い合わせください。

カスタム モデルサービング エンドポイントからUnity Catalogテーブルに OpenTelemetry ログ、トレース、メトリクスを永続化するようにエンドポイント テレメトリを構成する方法を学びます。 永続化されたテレメトリ データを使用して根本原因分析を実行し、エンドポイントの正常性を監視し、標準の SQL クエリでコンプライアンス要件を満たします。

要件

  • ワークスペースで Unity Catalog が有効になっている必要があります。デフォルトのストレージ (Arclight) はサポートされていません。

  • ログが保存される宛先 Unity Catalog カタログとスキーマに対するUSE CATALOGUSE SCHEMACREATE TABLE 、およびMODIFY権限が必要です。

  • 既存のカスタム モデルビング サー エンドポイント、またはそれを作成するための権限。

  • ワークスペースはサポートされているリージョン内にある必要があります。

    • us-east-1
    • us-east-2
    • us-west-2
    • eu-central-1
    • ap-southeast-1
    • ap-southeast-2
    • ap-northeast-1
    • ca-central-1
    • eu-west-1

ステップ 1: モデルコードをインストルメント化する

テレメトリをキャプチャするには、モデル コードにインストルメンテーションを追加します。

  1. モデルにアプリケーション ログを追加します。エンドポイント テレメトリは、標準の Python logging出力を自動的にキャプチャします。基本的なログ記録には OpenTelemetry SDK インストルメンテーションは必要ありません。

    Python
    import logging

    class MyCustomModel(mlflow.pyfunc.PythonModel):
    def predict(self, context, model_input):
    # This log will be persisted to the <prefix>_otel_logs table
    logging.warning("Received inference request")

    try:
    # Your model logic here
    result = model_input * 2
    return result
    except Exception as e:
    # Error logs are also captured with severity 'ERROR'
    logging.error(f"Inference failed: {e}")
    raise e

    ルート ログ レベルはWARNINGに設定されています。ログ レベルを変更するには、 「トラブルシューティング」を参照してください。

  2. (オプション) OpenTelemetry を使用してカスタム メトリクスとトレースを計測します。 基本的なログを超えてカスタム メトリクスとトレースをキャプチャするには、OpenTelemetry SDKインストルメンテーションをモデルに追加します。 カウンターを作成し、スパンを記録し、カスタム属性を添付する方法を示す完全な例については、次のセクションを展開してください。

括弧の四角いアイコン。 例: OpenTelemetry を使用したカスタム メトリクス、スパン、モデル ロギング

注記

Due to limitations in model serialization, you must write your model to a separate file before logging to avoid errors, as shown below using %%writefile return_input_model.py.

Python
%%writefile return_input_model.py
import os

import mlflow
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.metrics import get_meter, set_meter_provider
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import get_tracer, set_tracer_provider

# ---- OTel initialization (per-worker) ----
resource = Resource.create({
"worker.pid": str(os.getpid()),
})

otlp_trace_exporter = OTLPSpanExporter()
tracer_provider = TracerProvider(resource=resource)
tracer_provider.add_span_processor(BatchSpanProcessor(otlp_trace_exporter))
set_tracer_provider(tracer_provider)

otlp_metric_exporter = OTLPMetricExporter()
metric_reader = PeriodicExportingMetricReader(otlp_metric_exporter)
meter_provider = MeterProvider(metric_readers=[metric_reader], resource=resource)
set_meter_provider(meter_provider)

_tracer = get_tracer(__name__)
_meter = get_meter(__name__)
_prediction_counter = _meter.create_counter(
name="prediction_count",
description="Number of predictions made",
unit="1"
)


class ReturnInputModel(mlflow.pyfunc.PythonModel):
def load_context(self, context):
self.tracer = _tracer
self.prediction_counter = _prediction_counter

def predict(self, context, model_input):
with self.tracer.start_as_current_span("ReturnInputModel.predict") as span:
span.set_attribute("input_shape", str(model_input.shape))
span.set_attribute("input_columns", str(list(model_input.columns)))
self.prediction_counter.add(1)
return model_input

mlflow.models.set_model(ReturnInputModel())
  1. モデルをログに登録します。

    Python
    import pandas as pd
    import mlflow
    from mlflow.models import infer_signature

    # Prepare tabular input/output for signature (pyfunc expects DataFrame)
    input_df = pd.DataFrame({"inputs": ["hello world"]})
    output_df = input_df.copy() # model returns input unchanged

    # Log the model with OpenTelemetry dependencies (using code-based logging to avoid serialization issues)
    with mlflow.start_run():
    signature = infer_signature(input_df, output_df)

    model_info = mlflow.pyfunc.log_model(
    name="model",
    python_model="return_input_model.py",
    signature=signature,
    input_example=input_df,
    pip_requirements=[
    "mlflow==3.1",
    "opentelemetry-sdk",
    "opentelemetry-exporter-otlp-proto-http",
    ],
    )

    # Register with serverless optimized deployment environment packing
    # Use Unity Catalog name: catalog.schema.model_name
    registered = mlflow.register_model(
    model_info.model_uri,
    MODEL_NAME,
    env_pack="databricks_model_serving"
    )

ステップ 2: Unity Catalog宛先を準備する

エンドポイントを作成する前に、テレメトリ データを受信するためのカタログとスキーマの準備ができていることを確認してください。Databricks は、このスキーマに必要なテーブルがまだ存在しない場合は自動的に作成します。

  1. カタログ エクスプローラーで、使用するカタログとスキーマ (例: my_catalog.observability ) に移動します。

ステップ 3: エンドポイント テレメトリを有効にする

新しいエンドポイントを作成するとき、または既存のエンドポイントにテレメトリを追加するときに、テレメトリを有効にすることができます。

UI でテレメトリを有効にするには:

  1. 左側のサイドバーの 「サービング」 に移動します。
  2. サービングエンドポイントの作成 をクリックします。
  3. エンドポイント テレメトリ セクション (プレビューとマークされています) で、構成オプションを展開します。
  4. Unity Catalog場所 : ステップ 2 で準備した宛先の カタログスキーマ を選択します。
  5. (オプション) テーブル プレフィックス : 生成されたテーブルのプレフィックスを入力します。空白のままにすると、プレフィックスは付きません。テーブルの名前は<prefix>_otel_logs<prefix>_otel_spans<prefix>_otel_metricsです。
  6. 残りのエンドポイント構成 (モデルの選択、コンピュート設定) を完了し、 [作成] をクリックします。

API を使用してこれを行うには:

括弧の四角いアイコン。 APIを使用してテレメトリを有効にする

Bash
curl -X POST -H "Authorization: Bearer <your-token>" \
https://<workspace-url>/api/2.0/serving-endpoints \
-d '{
"name": "my-custom-logging-endpoint",
"config": {
"served_entities": [
{
"name": "my-model",
"entity_name": "my-model",
"entity_version": "1",
"workload_size": "Small",
"scale_to_zero_enabled": true
}
],
"telemetry_config": {
"table_names": {
"logs_table": "my_catalog.observability.custom_endpoint_logs",
"metrics_table": "my_catalog.observability.custom_endpoint_metrics",
"traces_table": "my_catalog.observability.custom_endpoint_spans"
}
}
}
}'

ステップ 4: テレメトリ データを確認してクエリする

エンドポイントがトラフィックを受信すると、設定されたUnity Catalogテーブルにテレメトリ データがストリームされます。

  1. カタログ エクスプローラー または SQL エディター に移動します。

  2. 構成されたスキーマ内で<prefix>_otel_logsという名前のテーブルを見つけます。

  3. クエリを実行して、データが流れていることを確認します。

    SQL
    SELECT * FROM <catalog>.<schema>.<prefix>_otel_logs
    LIMIT 10;

テレメトリデータのクエリ

次の例は一般的なクエリを示しています。

テレメトリ テーブルの完全なスキーマを表示するには、次のコマンドを実行します。

SQL
DESCRIBE TABLE <catalog>.<schema>.<prefix>_otel_logs;

これらの列を使用して、テレメトリ データをフィルター処理および相関させます。

  • timestamp
  • severity_text
  • body
  • trace_id
  • span_id
  • attributes — イベント固有のメタデータを含むマップ。

過去1時間のエラーを確認する

SQL
SELECT
timestamp,
severity_text,
body,
attributes
FROM <catalog>.<schema>.<prefix>_otel_logs
WHERE
severity_text = 'ERROR'
AND timestamp > current_timestamp() - INTERVAL 1 HOUR
ORDER BY timestamp DESC;

トラブルシューティング

ログがテーブルに表示されません : オーバーヘッドを削減するために、ルート ログ レベルはデフォルトでWARNINGに設定されます。重大度の低いログをキャプチャするには、モデル コードのレベルを変更します。

Python
class MyModel(mlflow.pyfunc.PythonModel):
def load_context(self, context):
root = logging.getLogger()
root.setLevel(logging.DEBUG)
for handler in root.handlers:
handler.setLevel(logging.DEBUG)

制限事項

エンドポイント テレメトリには次の制限が適用されます。

  • ターゲットテーブルでのスキーマ進化はサポートされていません。

  • 管理されたDeltaテーブルのみがサポートされます。 外部ストレージおよび Arclight デフォルト ストレージはサポートされていません。

  • テーブルの場所は、ワークスペースと同じリージョンにある必要があります。

  • ASCII 文字、数字、アンダースコアを含むテーブル名のみがサポートされます。

  • ターゲット テーブルの再作成はサポートされていません。

  • 単一の可用性ゾーン (single-az) の耐久性のみがサポートされます。

  • 配信は少なくとも 1 回行われます。サーバーからの確認応答は、レコードが永続的であり、Delta テーブル内にあることを意味します。

  • レコードはそれぞれ 10 MB 未満である必要があります。

  • リクエストはそれぞれ 30 MB 未満である必要があります。

  • ログ行はそれぞれ 1 MB 未満である必要があります。

  • テレメトリのレイテンシは 2500 QPS を超えると低下します。

  • ログは出力されてから数秒後にUnity Catalogテーブルに表示されます。