カスタム モデルサービング データをUnity Catalogに保存します
ベータ版
この機能はベータ版です。すべての顧客に対して自動的に有効になるわけではなく、機能は変更される可能性があります。アクセスをリクエストするには、Databricks アカウント チームにお問い合わせください。
カスタム モデルサービング エンドポイントからUnity Catalogテーブルに OpenTelemetry ログ、トレース、メトリクスを永続化するようにエンドポイント テレメトリを構成する方法を学びます。 永続化されたテレメトリ データを使用して根本原因分析を実行し、エンドポイントの正常性を監視し、標準の SQL クエリでコンプライアンス要件を満たします。
要件
-
ワークスペースで Unity Catalog が有効になっている必要があります。デフォルトのストレージ (Arclight) はサポートされていません。
-
ログが保存される宛先 Unity Catalog カタログとスキーマに対する
USE CATALOG、USE SCHEMA、CREATE TABLE、およびMODIFY権限が必要です。 -
既存のカスタム モデルビング サー エンドポイント、またはそれを作成するための権限。
-
ワークスペースはサポートされているリージョン内にある必要があります。
us-east-1us-east-2us-west-2eu-central-1ap-southeast-1ap-southeast-2ap-northeast-1ca-central-1eu-west-1
ステップ 1: モデルコードをインストルメント化する
テレメトリをキャプチャするには、モデル コードにインストルメンテーションを追加します。
-
モデルにアプリケーション ログを追加します。エンドポイント テレメトリは、標準の Python
logging出力を自動的にキャプチャします。基本的なログ記録には OpenTelemetry SDK インストルメンテーションは必要ありません。Pythonimport logging
class MyCustomModel(mlflow.pyfunc.PythonModel):
def predict(self, context, model_input):
# This log will be persisted to the <prefix>_otel_logs table
logging.warning("Received inference request")
try:
# Your model logic here
result = model_input * 2
return result
except Exception as e:
# Error logs are also captured with severity 'ERROR'
logging.error(f"Inference failed: {e}")
raise eルート ログ レベルは
WARNINGに設定されています。ログ レベルを変更するには、 「トラブルシューティング」を参照してください。 -
(オプション) OpenTelemetry を使用してカスタム メトリクスとトレースを計測します。 基本的なログを超えてカスタム メトリクスとトレースをキャプチャするには、OpenTelemetry SDKインストルメンテーションをモデルに追加します。 カウンターを作成し、スパンを記録し、カスタム属性を添付する方法を示す完全な例については、次のセクションを展開してください。
例: OpenTelemetry を使用したカスタム メトリクス、スパン、モデル ロギング
Due to limitations in model serialization, you must write your model to a separate file before logging to avoid errors, as shown below using %%writefile return_input_model.py.
%%writefile return_input_model.py
import os
import mlflow
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.metrics import get_meter, set_meter_provider
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import get_tracer, set_tracer_provider
# ---- OTel initialization (per-worker) ----
resource = Resource.create({
"worker.pid": str(os.getpid()),
})
otlp_trace_exporter = OTLPSpanExporter()
tracer_provider = TracerProvider(resource=resource)
tracer_provider.add_span_processor(BatchSpanProcessor(otlp_trace_exporter))
set_tracer_provider(tracer_provider)
otlp_metric_exporter = OTLPMetricExporter()
metric_reader = PeriodicExportingMetricReader(otlp_metric_exporter)
meter_provider = MeterProvider(metric_readers=[metric_reader], resource=resource)
set_meter_provider(meter_provider)
_tracer = get_tracer(__name__)
_meter = get_meter(__name__)
_prediction_counter = _meter.create_counter(
name="prediction_count",
description="Number of predictions made",
unit="1"
)
class ReturnInputModel(mlflow.pyfunc.PythonModel):
def load_context(self, context):
self.tracer = _tracer
self.prediction_counter = _prediction_counter
def predict(self, context, model_input):
with self.tracer.start_as_current_span("ReturnInputModel.predict") as span:
span.set_attribute("input_shape", str(model_input.shape))
span.set_attribute("input_columns", str(list(model_input.columns)))
self.prediction_counter.add(1)
return model_input
mlflow.models.set_model(ReturnInputModel())
-
モデルをログに登録します。
Pythonimport pandas as pd
import mlflow
from mlflow.models import infer_signature
# Prepare tabular input/output for signature (pyfunc expects DataFrame)
input_df = pd.DataFrame({"inputs": ["hello world"]})
output_df = input_df.copy() # model returns input unchanged
# Log the model with OpenTelemetry dependencies (using code-based logging to avoid serialization issues)
with mlflow.start_run():
signature = infer_signature(input_df, output_df)
model_info = mlflow.pyfunc.log_model(
name="model",
python_model="return_input_model.py",
signature=signature,
input_example=input_df,
pip_requirements=[
"mlflow==3.1",
"opentelemetry-sdk",
"opentelemetry-exporter-otlp-proto-http",
],
)
# Register with serverless optimized deployment environment packing
# Use Unity Catalog name: catalog.schema.model_name
registered = mlflow.register_model(
model_info.model_uri,
MODEL_NAME,
env_pack="databricks_model_serving"
)
ステップ 2: Unity Catalog宛先を準備する
エンドポイントを作成する前に、テレメトリ データを受信するためのカタログとスキーマの準備ができていることを確認してください。Databricks は、このスキーマに必要なテーブルがまだ存在しない場合は自動的に作成します。
- カタログ エクスプローラーで、使用するカタログとスキーマ (例:
my_catalog.observability) に移動します。
ステップ 3: エンドポイント テレメトリを有効にする
新しいエンドポイントを作成するとき、または既存のエンドポイントにテレメトリを追加するときに、テレメトリを有効にすることができます。
- New endpoint
- Existing endpoint
UI でテレメトリを有効にするには:
- 左側のサイドバーの 「サービング」 に移動します。
- サービングエンドポイントの作成 をクリックします。
- エンドポイント テレメトリ セクション (プレビューとマークされています) で、構成オプションを展開します。
- Unity Catalog場所 : ステップ 2 で準備した宛先の カタログ と スキーマ を選択します。
- (オプション) テーブル プレフィックス : 生成されたテーブルのプレフィックスを入力します。空白のままにすると、プレフィックスは付きません。テーブルの名前は
<prefix>_otel_logs、<prefix>_otel_spans、<prefix>_otel_metricsです。 - 残りのエンドポイント構成 (モデルの選択、コンピュート設定) を完了し、 [作成] をクリックします。
API を使用してこれを行うには: APIを使用してテレメトリを有効にする
curl -X POST -H "Authorization: Bearer <your-token>" \
https://<workspace-url>/api/2.0/serving-endpoints \
-d '{
"name": "my-custom-logging-endpoint",
"config": {
"served_entities": [
{
"name": "my-model",
"entity_name": "my-model",
"entity_version": "1",
"workload_size": "Small",
"scale_to_zero_enabled": true
}
],
"telemetry_config": {
"table_names": {
"logs_table": "my_catalog.observability.custom_endpoint_logs",
"metrics_table": "my_catalog.observability.custom_endpoint_metrics",
"traces_table": "my_catalog.observability.custom_endpoint_spans"
}
}
}
}'
更新すると、新しいデプロイメントがトリガーされます。変更はデプロイメントが完了すると有効になります。
UI でテレメトリを有効にするには:
- エンドポイント ビュー ページの右側のパネルの [エンドポイント テレメトリ] セクションで、 [追加] をクリックします。
- Unity Catalog場所 : ステップ 2 で準備した宛先の カタログ と スキーマ を選択します。
- (オプション) テーブル プレフィックス : 生成されたテーブルのプレフィックスを入力します。空白のままにすると、プレフィックスは付きません。テーブルの名前は
<prefix>_otel_logs、<prefix>_otel_spans、<prefix>_otel_metricsです。 - 更新 をクリックします。
ステップ 4: テレメトリ データを確認してクエリする
エンドポイントがトラフィックを受信すると、設定されたUnity Catalogテーブルにテレメトリ データがストリームされます。
-
カタログ エクスプローラー または SQL エディター に移動します。
-
構成されたスキーマ内で
<prefix>_otel_logsという名前のテーブルを見つけます。 -
クエリを実行して、データが流れていることを確認します。
SQLSELECT * FROM <catalog>.<schema>.<prefix>_otel_logs
LIMIT 10;
テレメトリデータのクエリ
次の例は一般的なクエリを示しています。
テレメトリ テーブルの完全なスキーマを表示するには、次のコマンドを実行します。
DESCRIBE TABLE <catalog>.<schema>.<prefix>_otel_logs;
これらの列を使用して、テレメトリ データをフィルター処理および相関させます。
timestampseverity_textbodytrace_idspan_idattributes— イベント固有のメタデータを含むマップ。
過去1時間のエラーを確認する
SELECT
timestamp,
severity_text,
body,
attributes
FROM <catalog>.<schema>.<prefix>_otel_logs
WHERE
severity_text = 'ERROR'
AND timestamp > current_timestamp() - INTERVAL 1 HOUR
ORDER BY timestamp DESC;
トラブルシューティング
ログがテーブルに表示されません : オーバーヘッドを削減するために、ルート ログ レベルはデフォルトでWARNINGに設定されます。重大度の低いログをキャプチャするには、モデル コードのレベルを変更します。
class MyModel(mlflow.pyfunc.PythonModel):
def load_context(self, context):
root = logging.getLogger()
root.setLevel(logging.DEBUG)
for handler in root.handlers:
handler.setLevel(logging.DEBUG)
制限事項
エンドポイント テレメトリには次の制限が適用されます。
-
ターゲットテーブルでのスキーマ進化はサポートされていません。
-
管理されたDeltaテーブルのみがサポートされます。 外部ストレージおよび Arclight デフォルト ストレージはサポートされていません。
-
テーブルの場所は、ワークスペースと同じリージョンにある必要があります。
-
ASCII 文字、数字、アンダースコアを含むテーブル名のみがサポートされます。
-
ターゲット テーブルの再作成はサポートされていません。
-
単一の可用性ゾーン (single-az) の耐久性のみがサポートされます。
-
配信は少なくとも 1 回行われます。サーバーからの確認応答は、レコードが永続的であり、Delta テーブル内にあることを意味します。
-
レコードはそれぞれ 10 MB 未満である必要があります。
-
リクエストはそれぞれ 30 MB 未満である必要があります。
-
ログ行はそれぞれ 1 MB 未満である必要があります。
-
テレメトリのレイテンシは 2500 QPS を超えると低下します。
-
ログは出力されてから数秒後にUnity Catalogテーブルに表示されます。