OSS埋め込みモデルを登録およびサービングする
このノートブックは、AI検索で使用できるModel Servingエンドポイントに、オープンソースのテキスト埋め込みモデルe5-small-v2を設定します。
- Hugging Face Hubからモデルをダウンロードします。
- MLflow Model Registry に登録します。
- モデルをサーブするために、モデルサービングエンドポイントを起動します。
モデルe5-small-v2は、https://huggingface.co/intfloat/e5-small-v2で利用可能です。
- MITライセンス
- バリエーション:
Databricks Runtime に含まれるライブラリバージョンのリストについては、ご利用の Databricks Runtime バージョンのリリースノートを参照してください。
Databricks Python SDK をインストールする
このノートブックは、Pythonクライアントを使用してサービングエンドポイントを操作します。
Python
%pip install -U databricks-sdk python-snappy
%pip install sentence-transformers
dbutils.library.restartPython()
モデルのダウンロード
Python
# Download model using the sentence_transformers library.
from sentence_transformers import SentenceTransformer
source_model_name = 'intfloat/e5-small-v2' # model name on Hugging Face Hub
model = SentenceTransformer(source_model_name)
Python
# Test the model, just to show it works.
sentences = ["This is an example sentence", "Each sentence is converted"]
embeddings = model.encode(sentences)
print(embeddings)
モデルを MLflow に登録する
Python
import mlflow
mlflow.set_registry_uri("databricks-uc")
Python
# Specify the catalog and schema to use. You must have USE_CATALOG privilege on the catalog and USE_SCHEMA and CREATE_TABLE privileges on the schema.
# Change the catalog and schema here if necessary.
catalog = "main"
schema = "default"
model_name = "e5-small-v2"
Python
# MLflow model name. The Model Registry uses this name for the model.
registered_model_name = f"{catalog}.{schema}.{model_name}"
Python
# Compute input and output schema.
signature = mlflow.models.signature.infer_signature(sentences, embeddings)
print(signature)
Python
model_info = mlflow.sentence_transformers.log_model(
model,
artifact_path="model",
signature=signature,
input_example=sentences,
registered_model_name=registered_model_name)
Python
inference_test = ["I enjoy pies of both apple and cherry.", "I prefer cookies."]
# Load the custom model by providing the URI for where the model was logged.
loaded_model_pyfunc = mlflow.pyfunc.load_model(model_info.model_uri)
# Perform a quick test to ensure that the loaded model generates the correct output.
embeddings_test = loaded_model_pyfunc.predict(inference_test)
embeddings_test
Python
# Extract the version of the model you just registered.
mlflow_client = mlflow.MlflowClient()
def get_latest_model_version(model_name):
client = mlflow_client
model_version_infos = client.search_model_versions("name = '%s'" % model_name)
return max([int(model_version_info.version) for model_version_info in model_version_infos])
model_version = get_latest_model_version(registered_model_name)
model_version
モデルサービングエンドポイントを作成
詳細については、基盤モデルのサービングエンドポイントの作成を参照してください。
注 :この例では、0 にスケールダウンする_小規模な_ CPU エンドポイントを作成します。これは、迅速な小規模テストに適しています。より現実的なユースケースでは、埋め込み計算を高速化するためにGPUエンドポイントの使用をご検討ください。また、頻繁なクエリが予想される場合は、0へのスケールダウンは行わないでください。これは、モデルサービングのエンドポイントにはコールドスタートのオーバーヘッドがあるためです。
Python
endpoint_name = "e5-small-v2" # Name of endpoint to create
Python
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import EndpointCoreConfigInput
w = WorkspaceClient()
Python
endpoint_config_dict = {
"served_entities": [
{
"name": f'{registered_model_name.replace(".", "_")}_{1}',
"entity_name": registered_model_name,
"entity_version": model_version,
"workload_type": "CPU",
"workload_size": "Small",
"scale_to_zero_enabled": True,
}
]
}
endpoint_config = EndpointCoreConfigInput.from_dict(endpoint_config_dict)
# The endpoint may take several minutes to get ready.
w.serving_endpoints.create_and_wait(name=endpoint_name, config=endpoint_config)
エンドポイントのクエリー
上記のcreate_and_waitコマンドは、エンドポイントの準備ができるまで待機します。Databricks UI でサービングエンドポイントのステータスも確認できます。
詳細については、「基盤モデルをクエリする」を参照してください。
Python
# Only run this command after the Model Serving endpoint is in the Ready state.
import time
start = time.time()
# If the endpoint is not yet ready, you might get a timeout error. If so, wait and then rerun the command.
endpoint_response = w.serving_endpoints.query(name=endpoint_name, dataframe_records=['Hello world', 'Good morning'])
end = time.time()
print(endpoint_response)
print(f'Time taken for querying endpoint in seconds: {end-start}')