AI Search foundational embedding model (GTE) example

This notebook demonstrates how to use the AI Search Python SDK, which provides a VectorSearchClient as the primary API for working with AI Search.

This notebook uses the Databricks Foundation Model APIs to access the GTE embedding model and generate embeddings.

Python
%pip install --upgrade --force-reinstall databricks-vectorsearch
dbutils.library.restartPython()
Python
from databricks.vector_search.client import VectorSearchClient

vsc = VectorSearchClient(disable_notice=True)
Python
help(VectorSearchClient)

Load sample dataset into source Delta table

The following cells create the source Delta table.

Python
# Specify the catalog and schema to use. You must have USE_CATALOG privilege on the catalog and USE_SCHEMA and CREATE_TABLE privileges on the schema.
# Change the catalog and schema here if necessary.

catalog_name = "main"
schema_name = "default"
Python
source_table_name = "wiki_articles_demo"
source_table_fullname = f"{catalog_name}.{schema_name}.{source_table_name}"
Python
# Uncomment if you want to start from scratch.

# spark.sql(f"DROP TABLE {source_table_fullname}")
Python
source_df = spark.read.parquet("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet").limit(10)
display(source_df)

Chunk sample dataset

Chunking the sample dataset helps you avoid exceeding the context limit of the embedding model. The GTE model supports up to 8192 tokens. However, Databricks recommends that you split the data into smaller context chunks so that you can feed a wider variety of examples into the reasoning model for your RAG application.

Python
import tiktoken
import pandas as pd

# The GTE model has been trained on a max context length of 8192 tokens.
max_chunk_tokens = 8192
encoding = tiktoken.get_encoding("cl100k_base")

def chunk_text(text):
    # Encode the text, then decode it back in slices of at most max_chunk_tokens tokens
    tokens = encoding.encode(text)
    chunks = []
    while tokens:
        chunk_tokens = tokens[:max_chunk_tokens]
        chunks.append(encoding.decode(chunk_tokens))
        tokens = tokens[max_chunk_tokens:]
    return chunks

# Process the data and store in a new list
pandas_df = source_df.toPandas()
processed_data = []
for index, row in pandas_df.iterrows():
    text_chunks = chunk_text(row['text'])
    chunk_no = 0
    for chunk in text_chunks:
        row_data = row.to_dict()

        # replace the id column with a new unique chunk id
        # and the text column with the text chunk
        row_data['id'] = f"{row['id']}_{chunk_no}"
        row_data['text'] = chunk

        processed_data.append(row_data)
        chunk_no += 1

chunked_pandas_df = pd.DataFrame(processed_data)
chunked_spark_df = spark.createDataFrame(chunked_pandas_df)

# Write the chunked DataFrame to a Delta table
spark.sql(f"DROP TABLE IF EXISTS {source_table_fullname}")
chunked_spark_df.write.format("delta") \
    .option("delta.enableChangeDataFeed", "true") \
    .saveAsTable(source_table_fullname)
Python
display(spark.sql(f"SELECT * FROM {source_table_fullname}"))
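
The recommendation to use chunks smaller than the model limit can be sketched as follows. This is a minimal illustration that works on a list of token IDs (such as the output of `encoding.encode(text)`), not part of the original notebook. The helper name `chunk_token_ids`, the default size of 512 tokens, and the optional `overlap` parameter are all assumptions chosen for illustration; pick values that suit your data.

```python
def chunk_token_ids(token_ids, max_tokens=512, overlap=0):
    """Split a list of token IDs into chunks of at most max_tokens items.

    Hypothetical helper: consecutive chunks share `overlap` tokens so that
    text cut at a chunk boundary still appears with some context.
    """
    if max_tokens <= 0:
        raise ValueError("max_tokens must be positive")
    if not 0 <= overlap < max_tokens:
        raise ValueError("overlap must be in [0, max_tokens)")

    step = max_tokens - overlap
    chunks = []
    for start in range(0, len(token_ids), step):
        chunks.append(token_ids[start:start + max_tokens])
        # Stop once a chunk has reached the end of the input
        if start + max_tokens >= len(token_ids):
            break
    return chunks
```

To use it with the tokenizer above, you could call `chunk_token_ids(encoding.encode(text))` and pass each chunk to `encoding.decode`.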

Create endpoint

Python
vector_search_endpoint_name = "vector-search-demo-endpoint"
Python
vsc.create_endpoint(
    name=vector_search_endpoint_name,
    endpoint_type="STANDARD"  # or "STORAGE_OPTIMIZED"
)
Python
vsc.get_endpoint(
    name=vector_search_endpoint_name
)

Create index

Python
# AI Search index
vs_index = f"{source_table_name}_gte_index"
vs_index_fullname = f"{catalog_name}.{schema_name}.{vs_index}"

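# Model serving endpoint used to compute the embeddings for the index.
# Change this to an embedding endpoint that is available in your workspace.
embedding_model_endpoint = "databricks-qwen3-embedding-0-6b"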
Python
index = vsc.create_delta_sync_index(
    endpoint_name=vector_search_endpoint_name,
    source_table_name=source_table_fullname,
    index_name=vs_index_fullname,
    pipeline_type='TRIGGERED',
    primary_key="id",
    embedding_source_column="text",
    embedding_model_endpoint_name=embedding_model_endpoint
)
index.describe()['status']['message']
Python
# Wait for index to come online. Expect this command to take several minutes.
# You can also track the status of the index build in Catalog Explorer in the
# Overview tab for the index.
import time
index = vsc.get_index(endpoint_name=vector_search_endpoint_name, index_name=vs_index_fullname)
while not index.describe().get('status')['ready']:
    print("Waiting for index to be ready...")
    time.sleep(30)
print("Index is ready!")
index.describe()
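
The wait loop above runs until the index reports that it is ready, with no upper bound. One possible variation is a polling helper with a timeout, sketched below. The function `wait_until` and its parameters are hypothetical and are not part of the databricks-vectorsearch SDK; the default timeout of 30 minutes is an assumption.

```python
import time

def wait_until(is_ready, timeout_s=1800, interval_s=30,
               sleep=time.sleep, clock=time.monotonic):
    """Poll is_ready() until it returns True or timeout_s seconds elapse.

    Hypothetical helper. Returns True if the condition was met and False if
    the timeout expired first. `sleep` and `clock` are injectable for testing.
    """
    deadline = clock() + timeout_s
    while True:
        if is_ready():
            return True
        if clock() >= deadline:
            return False
        sleep(interval_s)
```

With the `index` object from the cell above, this could be called as `wait_until(lambda: index.describe().get('status')['ready'])`, followed by a check of the return value before you query the index.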

Similarity search

The following cells show how to query the index to find similar documents.

Python
results = index.similarity_search(
    query_text="Greek myths",
    columns=["id", "text", "title"],
    num_results=5
)
rows = results['result']['data_array']
for (id, text, title, score) in rows:
    if len(text) > 32:
        # trim text output for readability
        text = text[0:32] + "..."
    print(f"id: {id} title: {title} text: '{text}' score: {score}")
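
Each row in `data_array` is a positional list: the requested columns in order, followed by the score, as the tuple unpacking above shows. If named access is more convenient, the rows can be converted to dictionaries. The sketch below assumes that layout; the helper `rows_to_dicts` is hypothetical, and the sample response is invented purely to show the shape.

```python
def rows_to_dicts(results, columns):
    """Convert similarity-search rows to dictionaries keyed by column name.

    Hypothetical helper. Assumes each row holds the requested columns in
    order, followed by the similarity score as the last element.
    """
    names = list(columns) + ["score"]
    return [dict(zip(names, row)) for row in results["result"]["data_array"]]

# Invented sample response, used only to illustrate the expected shape
sample_results = {
    "result": {
        "data_array": [
            ["12_0", "Hercules is the Roman name for...", "Hercules", 0.61],
            ["47_0", "In Greek mythology, the Titans...", "Titan", 0.58],
        ]
    }
}

for hit in rows_to_dicts(sample_results, ["id", "text", "title"]):
    print(hit["title"], hit["score"])
```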
Python
# Search with a filter. Note that the syntax depends on the endpoint type.

# Standard endpoint syntax
results = index.similarity_search(
    query_text="Greek myths",
    columns=["id", "text", "title"],
    num_results=5,
    filters={"title NOT": "Hercules"}
)

# Storage-optimized endpoint syntax
# results = index.similarity_search(
#     query_text="Greek myths",
#     columns=["id", "text", "title"],
#     num_results=5,
#     filters='title != "Hercules"'
# )

rows = results['result']['data_array']
for (id, text, title, score) in rows:
    if len(text) > 32:
        # trim text output for readability
        text = text[0:32] + "..."
    print(f"id: {id} title: {title} text: '{text}' score: {score}")
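
Because the filter syntax differs by endpoint type, code that has to run against both kinds of endpoint may want to build the filter in one place. The sketch below covers only the "not equal" case shown above and reproduces the two forms from the cell exactly. The helper `not_equal_filter` is hypothetical, and it assumes the value is a string that contains no double quotes.

```python
def not_equal_filter(column, value, endpoint_type="STANDARD"):
    """Build a 'column is not value' filter for the given endpoint type.

    Hypothetical helper that mirrors the two syntaxes shown above:
    a dictionary for standard endpoints and a SQL-like string for
    storage-optimized endpoints. No escaping is applied to `value`.
    """
    if endpoint_type == "STANDARD":
        return {f"{column} NOT": value}
    if endpoint_type == "STORAGE_OPTIMIZED":
        return f'{column} != "{value}"'
    raise ValueError(f"Unknown endpoint type: {endpoint_type}")
```

The result could then be passed as the `filters` argument of `index.similarity_search`.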

Delete index

Python
vsc.delete_index(
    endpoint_name=vector_search_endpoint_name,
    index_name=vs_index_fullname
)
