
    Vector Search Python SDK example usage

    This notebook shows how to use the Vector Search Python SDK, which provides the VectorSearchClient class as the primary API for working with Vector Search.

    Alternatively, you can call the REST API directly.
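
    For example, here is a minimal sketch of listing Vector Search endpoints over REST, assuming it runs inside a Databricks notebook so the API token can be pulled from the notebook context (the workspace URL below is a placeholder):

    import requests

    # Placeholder workspace URL; the token is read from the notebook context.
    workspace_url = "https://<your-workspace>.cloud.databricks.com"
    token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()

    # List Vector Search endpoints via the REST API.
    response = requests.get(
        f"{workspace_url}/api/2.0/vector-search/endpoints",
        headers={"Authorization": f"Bearer {token}"},
    )
    print(response.json())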

    Requirements

    This notebook assumes that a Model Serving endpoint named databricks-gte-large-en exists. To create that endpoint, see the notebook "Call a GTE embeddings model using Mosaic AI Model Serving" (AWS | Azure | GCP).

    %pip install --upgrade --force-reinstall databricks-vectorsearch
    dbutils.library.restartPython()
    from databricks.vector_search.client import VectorSearchClient
    
    vsc = VectorSearchClient()
    help(VectorSearchClient)
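
    By default, VectorSearchClient picks up authentication from the notebook context. To run outside a notebook, you can pass credentials explicitly; a sketch with placeholder values:

    # Sketch: explicit authentication with a personal access token (placeholder values).
    # vsc = VectorSearchClient(
    #     workspace_url="https://<your-workspace>.cloud.databricks.com",
    #     personal_access_token="<your-personal-access-token>",
    # )

    # Or authenticate as a service principal.
    # vsc = VectorSearchClient(
    #     workspace_url="https://<your-workspace>.cloud.databricks.com",
    #     service_principal_client_id="<client-id>",
    #     service_principal_client_secret="<client-secret>",
    # )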

    Load toy dataset into source Delta table

    The following creates the source Delta table.

    # Specify the catalog and schema to use. You must have USE_CATALOG privilege on the catalog and USE_SCHEMA and CREATE_TABLE privileges on the schema.
    # Change the catalog and schema here if necessary.
    
    catalog_name = "main"
    schema_name = "default"
    
    
    source_table_name = "en_wiki"
    source_table_fullname = f"{catalog_name}.{schema_name}.{source_table_name}"
    # Uncomment if you want to start from scratch.
    # spark.sql(f"DROP TABLE {source_table_fullname}")
    source_df = spark.read.parquet("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet").limit(10)
    display(source_df)
    source_df.write.format("delta").option("delta.enableChangeDataFeed", "true").saveAsTable(source_table_fullname)
    display(spark.sql(f"SELECT * FROM {source_table_fullname}"))
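
    Delta Sync indexes require Change Data Feed (CDF) on the source table. The write above enables it at table creation time; for a pre-existing table, you can enable it afterwards with a table property:

    # For a pre-existing table, enable Change Data Feed like this (not needed here,
    # because the table was created with CDF enabled):
    # spark.sql(f"ALTER TABLE {source_table_fullname} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")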

    Create vector search endpoint

    vector_search_endpoint_name = "vector-search-demo-endpoint"
    vsc.create_endpoint(
        name=vector_search_endpoint_name,
        endpoint_type="STANDARD" # or "STORAGE_OPTIMIZED"
    )
    endpoint = vsc.get_endpoint(
      name=vector_search_endpoint_name)
    endpoint
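
    Endpoint provisioning can take several minutes. A minimal polling sketch, assuming get_endpoint() returns a dict whose endpoint_status.state becomes "ONLINE" when the endpoint is ready:

    import time

    # Wait for the endpoint to come online before creating an index on it.
    while vsc.get_endpoint(name=vector_search_endpoint_name)["endpoint_status"]["state"] != "ONLINE":
        print("Waiting for endpoint to be ONLINE...")
        time.sleep(30)
    print("Endpoint is ONLINE")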

    Create vector index

    # Vector index
    vs_index = "en_wiki_index"
    vs_index_fullname = f"{catalog_name}.{schema_name}.{vs_index}"
    
    embedding_model_endpoint = "databricks-gte-large-en"
    index = vsc.create_delta_sync_index(
      endpoint_name=vector_search_endpoint_name,
      source_table_name=source_table_fullname,
      index_name=vs_index_fullname,
      pipeline_type='TRIGGERED',
      primary_key="id",
      embedding_source_column="text",
      embedding_model_endpoint_name=embedding_model_endpoint
    )
    index.describe()
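
    The index above uses Databricks-managed embeddings computed from the text column. If you compute embeddings yourself, pass an embedding vector column instead of an embedding model endpoint; a sketch, assuming a hypothetical source table with a precomputed text_vector column of dimension 1024:

    # Sketch: Delta Sync index over self-managed embeddings.
    # Assumes the source table has a precomputed "text_vector" column.
    # index = vsc.create_delta_sync_index(
    #   endpoint_name=vector_search_endpoint_name,
    #   source_table_name=source_table_fullname,
    #   index_name=f"{catalog_name}.{schema_name}.en_wiki_self_managed_index",
    #   pipeline_type="TRIGGERED",
    #   primary_key="id",
    #   embedding_dimension=1024,
    #   embedding_vector_column="text_vector"
    # )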

    Get a vector index

    Use get_index() to retrieve the vector index object by name. You can also call describe() on the index object to see a summary of its configuration.

    
    index = vsc.get_index(endpoint_name=vector_search_endpoint_name, index_name=vs_index_fullname)
    
    index.describe()
    # Wait for index to come online. Expect this command to take several minutes.
    import time
    while not index.describe().get('status').get('detailed_state').startswith('ONLINE'):
      print("Waiting for index to be ONLINE...")
      time.sleep(5)
    print("Index is ONLINE")
    index.describe()
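
    Because the index uses pipeline_type='TRIGGERED', it does not pick up source-table changes automatically. After the source Delta table changes, trigger a manual sync:

    # Trigger a manual sync to pick up changes in the source table.
    # index.sync()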

    Similarity search

    Query the Vector Index to find similar documents.

    # spark.table(...).columns returns the table's column names as a list.
    # You can set this to any subset of the columns.
    all_columns = spark.table(source_table_fullname).columns
    
    results = index.similarity_search(
      query_text="Greek myths",
      columns=all_columns,
      num_results=2)
    
    results
    # Search with a filter. Note that the syntax depends on the endpoint type.
    
    # Standard endpoint syntax
    results = index.similarity_search(
      query_text="Greek myths",
      columns=all_columns,
      filters={"id NOT": ("13770", "88231")},
      num_results=2)
    
    # Storage-optimized endpoint syntax
    # results = index.similarity_search(
    #   query_text="Greek myths",
    #   columns=all_columns,
    #   filters='id NOT IN ("13770", "88231")',
    #   num_results=2)
    
    results
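
    similarity_search also accepts a query_type parameter; a sketch of a hybrid keyword-plus-vector query (the default is ANN search):

    # Hybrid (keyword + semantic) search.
    # results_hybrid = index.similarity_search(
    #   query_text="Greek myths",
    #   columns=all_columns,
    #   num_results=2,
    #   query_type="HYBRID")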
    
    

    Convert results to LangChain documents

    The first column retrieved is loaded into page_content, and the rest into metadata.

    # You must have langchain installed on your cluster. The following command installs or upgrades langchain.
    %pip install --upgrade langchain
    from langchain.schema import Document
    from typing import List
    
    def convert_vector_search_to_documents(results) -> List[Document]:
      column_names = []
      for column in results["manifest"]["columns"]:
          column_names.append(column)
    
      langchain_docs = []
      for item in results["result"]["data_array"]:
          metadata = {}
          # The last element of each result row is the similarity score.
          score = item[-1]
          i = 1
          for field in item[1:-1]:
              metadata[column_names[i]["name"]] = field
              i = i + 1
          doc = Document(page_content=item[0], metadata=metadata)
          langchain_docs.append(doc)
      return langchain_docs
    
    langchain_docs = convert_vector_search_to_documents(results)
    
    langchain_docs
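
    Alternatively, the databricks-langchain integration can wrap the index as a LangChain vector store directly, instead of converting results by hand; a sketch, assuming the databricks-langchain package is installed:

    # %pip install databricks-langchain
    # from databricks_langchain import DatabricksVectorSearch
    #
    # vector_store = DatabricksVectorSearch(index_name=vs_index_fullname)
    # vector_store.similarity_search("Greek myths", k=2)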

    Delete vector index

    vsc.delete_index(index_name=vs_index_fullname)
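
    To clean up fully, you can also delete the endpoint when you no longer need it:

    # Delete the vector search endpoint.
    # vsc.delete_endpoint(name=vector_search_endpoint_name)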