    Part 2: Create and optimize a DSPy program for RAG

    This notebook shows how to:

    • Create a basic RAG DSPy program.
    • Run the DSPy program from a notebook.
    • Optimize prompts using the DSPy BootstrapFewShot optimizer.
    • Run the optimized DSPy program.

    This notebook is part 2 of 2 notebooks for creating a DSPy program for RAG.

    Requirements

    This notebook assumes:

    • You have completed and run the Part 1: Prepare data and vector search index for a RAG DSPy program notebook.
    • You have specified the following information in the notebook widgets:
      • vs_index: Databricks Vector Search index to be used in the RAG program.
      • source_catalog: UC catalog of the schema where the index is located.
      • source_schema: UC schema containing the Vector Search index.

    Install dependencies

    %pip install -qqqq "dspy-ai>=2.5.0" "openai<2" "databricks-agents>=0.5.0" "mlflow>=2.16.0"
    dbutils.library.restartPython()

    Define notebook widgets

    dbutils.widgets.removeAll()
    format_widget_name = lambda x: x.replace('_', ' ').title()
    
    widget_defaults = {
        "source_catalog": "", # PLEASE ENTER YOUR CATALOG
        "source_schema": "", # PLEASE ENTER YOUR SCHEMA
        "vs_index": "", # PLEASE ENTER YOUR VECTOR SEARCH INDEX
    }
    for k, v in widget_defaults.items():
        dbutils.widgets.text(k, v, format_widget_name(k))
    
    

    Define configurations

    The following example builds the configuration from the specified notebook widget values and obtains a personal access token from the notebook session. This method is not recommended for production; instead, use a Databricks secret (AWS | Azure).

    from dbruntime.databricks_repl_context import get_context
    import os
    
    # Obtain a personal access token from the notebook session
    # (not recommended for production; use a Databricks secret instead)
    os.environ["DATABRICKS_TOKEN"] = get_context().apiToken
    
    print("CONFIGURATIONS")
    config = {}
    for k in widget_defaults.keys():
        config[k] = dbutils.widgets.get(k)
        assert config[k].strip() != "", f"Please provide a valid {format_widget_name(k)}"
        print(f"- config['{k}']= '{config[k]}'")
    
    config["vs_index_fullname"] = (
        f"{config['source_catalog']}.{config['source_schema']}.{config['vs_index']}"
    )
    
    print(f"- config['vs_index_fullname']= '{config['vs_index_fullname']}'")

    Define the DSPy program

    A DSPy program consists of a Python class that inherits from dspy.Module and implements the forward() method, which runs the following steps:

    • Query a Databricks Vector Search index to retrieve document chunks (context) related to the request.
    • Generate a response by sending the context containing the document chunks and the request to an LLM.

    The __init__ function initializes the resources the forward function uses. In this example, the resources are:

    • retriever: Databricks Vector Search retriever
    • lm: Databricks Foundation Model pay-per-token Llama 3.1 8B Instruct
    • response_generator: The prediction technique, in this case dspy.Predict, which uses an LLM to process retrieved documents and instructions to generate a response. Additional prediction techniques include dspy.ChainOfThought and dspy.ReAct.

    import dspy
    from dspy.retrieve.databricks_rm import DatabricksRM
    
    
    class RAG(dspy.Module):
        def __init__(self, num_passages=3):
            super().__init__()
    
            # Define the retriever that fetches relevant documents from the Databricks Vector Search index
            self.retriever = DatabricksRM(
                databricks_index_name=config["vs_index_fullname"],
                text_column_name="chunk",
                docs_id_column_name="id",
                k=num_passages,
            )
            # Define the language model that will be used for response generation
            self.lm = dspy.LM("databricks/databricks-meta-llama-3-1-8b-instruct")
    
            # Define the program signature
            # The response generator will be provided with a "context" and a "request",
            # and will return a "response"
            signature = "context, request -> response"
    
            # Define response generator
            self.response_generator = dspy.Predict(signature)
    
        def forward(self, request):
    
            # Obtain context by executing a Databricks Vector Search query
            retrieved_context = self.retriever(request)
    
            # Generate a response using the language model defined in the __init__ method
            with dspy.context(lm=self.lm):
                response = self.response_generator(
                    context=retrieved_context.docs, request=request
                ).response
    
            return dspy.Prediction(response=response)
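
    As an aside, not part of the original notebook: the string signature above can equivalently be written as a class-based dspy.Signature, which makes the field roles explicit.

    class GenerateResponse(dspy.Signature):
        """Answer the request using the retrieved context."""
    
        context = dspy.InputField(desc="Retrieved document chunks")
        request = dspy.InputField(desc="The user's question")
        response = dspy.OutputField(desc="The generated answer")
    
    # Equivalent to dspy.Predict("context, request -> response")
    response_generator = dspy.Predict(GenerateResponse)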
    
    
    

    Run the program

    To run the DSPy program, instantiate it and pass in the request.

    # Instantiating DSPy program
    rag = RAG()
    
    # Running a query
    result = rag("Who is Zeus?")
    
    # Printing response
    print(result.response)

    Not bad for such a simple program!

    Try another query:

    # Running another query
    result = rag("Who is the father of the brother of Hercules?")
    
    # Printing response
    print(result.response)

    This response is unexpected, since the program should have responded with something contextually related to the query. When this happens, you can inspect the prompt generated by DSPy.

    Inspecting generated prompt

    rag.lm.inspect_history()

    You can see it is a simple prompt with minimal instructions. Try optimizing it by providing few-shot examples. DSPy selects which few-shot examples are most effective based on an evaluation criterion.

    Optimizing prompts

    Define training set

    First, define eight examples of request and expected_response pairs.

    train_set = [
        # Defining a list of DSPy examples taking "request" as the input
        dspy.Example(**item).with_inputs("request")
        for item in [
            {"request": "Who is the son of Zeus?", "expected_response": "Hercules"},
            {"request": "Who is Zeus?", "expected_response": "A Greek god"},
            {
                "request": "What can you tell me about Greek mythology?",
                "expected_response": "Greek myth takes many forms, from religious myths of origin to folktales and legends of heroes",
            },
            {
                "request": "Who is Frederick Barbarossa?",
                "expected_response": "King of Germany in 1152 and Holy Roman Emperor in 1155",
            },
            {
                "request": "When was Frederick Barbarossa a king?",
                "expected_response": "In the year eleven hundred fifty two",
            },
            {
                "request": "Which kingdom did Frederick Barbarossa rule?",
                "expected_response": "Kingdom of Germany",
            },
            {
                "request": "Who is Tom McNab?",
                "expected_response": "Tom McNab has been national champion for triple jump five times and is the author of 'The Complete Book of Track and Field'",
            },
            {
                "request": "Who wrote 'The Complete Book of Track and Field'?",
                "expected_response": "Tom McNab",
            },
        ]
    ]
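
    As an optional sanity check (not in the original notebook), you can confirm how DSPy interprets each example: only request is marked as an input, and expected_response serves as the label.

    # Inspect one training example
    ex = train_set[0]
    print(ex.request, "->", ex.expected_response)
    print(ex.inputs())  # only "request" is treated as an input field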

    Define a prompt optimization evaluation function

    The following defines a function that evaluates whether the responses from the program are correct. Mosaic Agent Evaluation (AWS | Azure) is an ideal tool for this purpose.

    import mlflow
    from databricks.agents.evals import judges
    
    def evaluate_using_mosaic_agent(example, pred, trace=None):
        # Run the Mosaic Agent Evaluation correctness judge and return True
        # when the response matches the expected response
        return (
            judges.correctness(
                request=example.request,
                response=pred.response,
                expected_response=example.expected_response,
            ).value.name
            == "YES"
        )
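
    Before running the full optimization, you can optionally try the metric on a single example. This quick check is not part of the original flow:

    # Evaluate the unoptimized program on one training example
    example = train_set[0]
    pred = rag(example.request)
    print(evaluate_using_mosaic_agent(example, pred))  # True if judged correct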

    Run optimization

    Now, the final step is to run the optimization. DSPy offers several optimizers; this example uses the BootstrapFewShot optimizer. The BootstrapFewShot optimizer selects the best few-shot examples for all stages of the DSPy program, although this notebook uses only one stage. The examples are obtained from the training set labels (expected_response) and the evaluation executions. For more information about this and other optimizers, see the DSPy documentation.

    from dspy.evaluate.evaluate import Evaluate
    from dspy.teleprompt import BootstrapFewShot
    
    # Set up a bootstrap optimizer, which optimizes the RAG program.
    optimizer = BootstrapFewShot(
        metric=evaluate_using_mosaic_agent, # Use the evaluation function defined above
        max_bootstrapped_demos=4, # Max number of examples obtained from running the train set
        max_labeled_demos=8 # Max number of examples obtained from labels in the train set
    )
    
    # Start a new MLflow run to track all evaluation metrics
    with mlflow.start_run(run_name="dspy_rag_optimization"):
        # Optimize the program by identifying the best few-shot examples for the prompt used by the `response_generator` step
        optimized_rag = optimizer.compile(rag, trainset=train_set)
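
    The cell above also imports DSPy's Evaluate utility. As an optional follow-up, and a minimal sketch rather than part of the original notebook, you can use it to score the optimized program across the training set with the same metric:

    # Score the optimized program over the training set
    evaluator = Evaluate(
        devset=train_set,
        metric=evaluate_using_mosaic_agent,
        display_progress=True,
    )
    evaluator(optimized_rag)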

    Run the optimized DSPy module

    Try the tricky question again:

    result = optimized_rag("Who is the father of the brother of Hercules?")
    print(result.response)

    Inspect the prompt used by the optimized program

    When you inspect the prompt generated by the optimized program, you can see the few-shot examples that DSPy added:

    optimized_rag.lm.inspect_history()