ベクトル検索外部埋め込みモデル(OpenAI)の例
このノートブックでは、地下鉄検索Python SDK使用方法を説明します。これは、地下鉄検索を操作するための主要なAPIとしてVectorSearchClientを提供します。
このノートブックは、Databricksの外部モデルサポートを利用してOpenAIの埋め込みモデルにアクセスし、埋め込みを生成します。
Python
%pip install --upgrade --force-reinstall databricks-vectorsearch tiktoken
dbutils.library.restartPython()
Python
from databricks.vector_search.client import VectorSearchClient
vsc = VectorSearchClient(disable_notice=True)
Python
# Display help for the Vector Search Client
help(VectorSearchClient)
おもちゃのデータセットをDeltaテーブルにロードします
以下はDeltaテーブルを作成します。
Python
# Specify the catalog and schema to use. You must have USE_CATALOG privilege on the catalog and USE_SCHEMA and CREATE_TABLE privileges on the schema.
# Change the catalog and schema here if necessary.
catalog_name = "main"
schema_name = "default"
Python
source_table_name = "wiki_articles_demo"
source_table_fullname = f"{catalog_name}.{schema_name}.{source_table_name}"
Python
# Uncomment the following line if you want to start from scratch.
# spark.sql(f"DROP TABLE {source_table_fullname}")
Python
source_df = spark.read.parquet("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet").limit(10)
display(source_df)
チャンクサンプルデータセット
サンプルデータセットを分割することで、埋め込みモデルのコンテキスト制限を超えることを回避できます。OpenAIモデルは最大8192個のトークンをサポートしています。ただし、Databricksでは、RAGアプリケーションの推論モデルに、より多様な例を入力できるように、データをより小さなコンテキストチャンクに分割することを推奨しています。
Python
import tiktoken
import pandas as pd
max_chunk_tokens = 1024
encoding = tiktoken.get_encoding("cl100k_base")
def chunk_text(text):
# Encode and then decode within the UDF
tokens = encoding.encode(text)
chunks = []
while tokens:
chunk_tokens = tokens[:max_chunk_tokens]
chunk_text = encoding.decode(chunk_tokens)
chunks.append(chunk_text)
tokens = tokens[max_chunk_tokens:]
return chunks
# Process the data and store in a new list
pandas_df = source_df.toPandas()
processed_data = []
for index, row in pandas_df.iterrows():
text_chunks = chunk_text(row['text'])
chunk_no = 0
for chunk in text_chunks:
row_data = row.to_dict()
# Replace the id column with a new unique chunk id
# and the text column with the text chunk
row_data['id'] = f"{row['id']}_{chunk_no}"
row_data['text'] = chunk
processed_data.append(row_data)
chunk_no += 1
chunked_pandas_df = pd.DataFrame(processed_data)
chunked_spark_df = spark.createDataFrame(chunked_pandas_df)
# Write the chunked DataFrame to a Delta table
spark.sql(f"DROP TABLE IF EXISTS {source_table_fullname}")
chunked_spark_df.write.format("delta") \
.option("delta.enableChangeDataFeed", "true") \
.saveAsTable(source_table_fullname)
Python
display(spark.sql(f"SELECT * FROM {source_table_fullname}"))
トレンド検索エンドポイントの作成
Python
vector_search_endpoint_name = "vector-search-demo-endpoint"
Python
vsc.create_endpoint(
name=vector_search_endpoint_name,
endpoint_type="STANDARD" # or "STORAGE_OPTIMIZED"
)
Python
vsc.get_endpoint(
name=vector_search_endpoint_name
)
OpenAI埋め込みモデルのエンドポイントを登録する
詳細な使用方法については、 OpenAIエンドポイントの設定に関する外部モデルのドキュメントを参照してください。
認証情報を提供するには、 Databricks シークレット マネージャーを使用してください。
Python
embedding_model_endpoint_name = "openai-embedding-endpoint"
Python
import mlflow.deployments
mlflow_deploy_client = mlflow.deployments.get_deploy_client("databricks")
# Configure the secret manager with the OpenAPI key and provide the
# correct scope and key name below.
mlflow_deploy_client.create_endpoint(
name=embedding_model_endpoint_name,
config={
"served_entities": [{
"external_model": {
"name": "text-embedding-ada-002",
"provider": "openai",
"task": "llm/v1/embeddings",
"openai_config": {
"openai_api_key": "{{secrets/demo/openai-api-key}}" # CHANGE ME
}
}
}]
}
)
ベクターインデックスを作成する
Python
# Vector index
vs_index = f"{source_table_name}_openai_index"
vs_index_fullname = f"{catalog_name}.{schema_name}.{vs_index}"
Python
index = vsc.create_delta_sync_index(
endpoint_name=vector_search_endpoint_name,
source_table_name=source_table_fullname,
index_name=vs_index_fullname,
pipeline_type='TRIGGERED',
primary_key="id",
embedding_source_column="text",
embedding_model_endpoint_name=embedding_model_endpoint_name
)
index.describe()['status']['message']
Python
# Wait for index to come online. Expect this command to take several minutes.
# You can also track the status of the index build in Catalog Explorer in the
# Overview tab for the vector index.
import time
index = vsc.get_index(endpoint_name=vector_search_endpoint_name,index_name=vs_index_fullname)
while not index.describe().get('status')['ready']:
print("Waiting for index to be ready...")
time.sleep(30)
print("Index is ready!")
index.describe()
類似性検索
以下のセルは、ベクターインデックスを照会して類似の文書を検索する方法を示しています。
Python
results = index.similarity_search(
query_text="Greek myths",
columns=["id", "text", "title"],
num_results=5
)
rows = results['result']['data_array']
for (id, text, title, score) in rows:
if len(text) > 32:
# trim text output for readability
text = text[0:32] + "..."
print(f"id: {id} title: {title} text: '{text}' score: {score}")
Python
# Search with a filter. Note that the syntax depends on the endpoint type.
# Standard endpoint syntax
results = index.similarity_search(
query_text="Greek myths",
columns=["id", "text", "title"],
num_results=5,
filters={"title NOT": "Hercules"}
)
# Storage-optimized endpoint syntax
# results = index.similarity_search(
# query_text="Greek myths",
# columns=["id", "text", "title"],
# num_results=5,
# filters='title != "Hercules"'
# )
rows = results['result']['data_array']
for (id, text, title, score) in rows:
if len(text) > 32:
# trim text output for readability
text = text[0:32] + "..."
print(f"id: {id} title: {title} text: '{text}' score: {score}")
ベクターインデックスを削除
Python
vsc.delete_index(
endpoint_name=vector_search_endpoint_name,
index_name=vs_index_fullname
)