Build a RAG chatbot with Azure Cosmos DB NoSQL API

In this guide, we demonstrate how to build a RAG Pattern application using a subset of the Movie Lens dataset. This sample leverages the Python SDK for Azure Cosmos DB for NoSQL to perform vector search for RAG, store and retrieve chat history, and store vectors of the chat history to use as a semantic cache. Azure OpenAI is used to generate embeddings and Large Language Model (LLM) completions.

At the end, we create a simple UX using Gradio to allow users to type in questions and display responses generated by Azure OpenAI or served from the cache. The responses will also display an elapsed time to show the impact caching has on performance versus generating a response.

Tip

You can find the full Python notebook sample here.

For more RAG samples, visit: AzureDataRetrievalAugmentedGenerationSamples

Important

This sample requires you to set up accounts for Azure Cosmos DB for NoSQL and Azure OpenAI. To get started, visit:

1. Install Required Packages

Install the necessary Python packages to interact with Azure Cosmos DB and other services.

! pip install --user python-dotenv
! pip install --user aiohttp
! pip install --user openai
! pip install --user gradio
! pip install --user ijson
! pip install --user nest_asyncio
! pip install --user tenacity
# Note: ensure you have azure-cosmos version 4.7 or higher installed
! pip install --user azure-cosmos

2. Initialize Your Client Connection

Populate sample_env_file.env with the appropriate credentials for Azure Cosmos DB and Azure OpenAI.

cosmos_uri = "https://<replace with cosmos db account name>.documents.azure.com:443/"
cosmos_key = "<replace with cosmos db account key>"
cosmos_database_name = "database"
cosmos_collection_name = "vectorstore"
cosmos_vector_property_name = "vector"
cosmos_cache_database_name = "database"
cosmos_cache_collection_name = "vectorcache"
openai_endpoint = "<replace with azure openai endpoint>"
openai_key = "<replace with azure openai key>"
openai_type = "azure"
openai_api_version = "2023-05-15"
openai_embeddings_deployment = "<replace with azure openai embeddings deployment name>"
openai_embeddings_model = "<replace with azure openai embeddings model - e.g. text-embedding-3-large"
openai_embeddings_dimensions = "1536"
openai_completions_deployment = "<replace with azure openai completions deployment name>"
openai_completions_model = "<replace with azure openai completions model - e.g. gpt-35-turbo>"
storage_file_url = "https://cosmosdbcosmicworks.blob.core.windows.net/fabcondata/movielens_dataset.json"
# Import the required libraries
import time
import json
import uuid
import urllib 
import ijson
import zipfile
from dotenv import dotenv_values
from openai import AzureOpenAI
from azure.core.exceptions import AzureError
from azure.cosmos import PartitionKey, exceptions
from time import sleep
import gradio as gr

# Cosmos DB imports
from azure.cosmos import CosmosClient

# Load configuration
env_name = "sample_env_file.env"
config = dotenv_values(env_name)

cosmos_conn = config['cosmos_uri']
cosmos_key = config['cosmos_key']
cosmos_database = config['cosmos_database_name']
cosmos_collection = config['cosmos_collection_name']
cosmos_vector_property = config['cosmos_vector_property_name']
comsos_cache_db = config['cosmos_cache_database_name']
cosmos_cache = config['cosmos_cache_collection_name']

# Create the Azure Cosmos DB for NoSQL async client for faster data loading
cosmos_client = CosmosClient(url=cosmos_conn, credential=cosmos_key)

openai_endpoint = config['openai_endpoint']
openai_key = config['openai_key']
openai_api_version = config['openai_api_version']
openai_embeddings_deployment = config['openai_embeddings_deployment']
openai_embeddings_dimensions = int(config['openai_embeddings_dimensions'])
openai_completions_deployment = config['openai_completions_deployment']

# Movies file url
storage_file_url = config['storage_file_url']

# Create the OpenAI client
openai_client = AzureOpenAI(azure_endpoint=openai_endpoint, api_key=openai_key, api_version=openai_api_version)

3. Create a Database and Containers with Vector Policies

This function takes a database object, a collection name, the name of the document property that stores vectors, and the number of vector dimensions used for the embeddings.

db = cosmos_client.create_database_if_not_exists(cosmos_database)

# Create the vector embedding policy to specify vector details
vector_embedding_policy = {
    "vectorEmbeddings": [ 
        { 
            "path":"/" + cosmos_vector_property,
            "dataType":"float32",
            "distanceFunction":"cosine",
            "dimensions":openai_embeddings_dimensions
        }, 
    ]
}

# Create the vector index policy to specify vector details
indexing_policy = {
    "includedPaths": [ 
    { 
        "path": "/*" 
    } 
    ], 
    "excludedPaths": [ 
    { 
        "path": "/\"_etag\"/?",
        "path": "/" + cosmos_vector_property + "/*",
    } 
    ], 
    "vectorIndexes": [ 
        {
            "path": "/"+cosmos_vector_property, 
            "type": "quantizedFlat" 
        }
    ]
} 

# Create the data collection with vector index (note: this creates a container with 10000 RUs to allow fast data load)
try:
    movies_container = db.create_container_if_not_exists(id=cosmos_collection, 
                                                  partition_key=PartitionKey(path='/id'),
                                                  indexing_policy=indexing_policy, 
                                                  vector_embedding_policy=vector_embedding_policy,
                                                  offer_throughput=10000) 
    print('Container with id \'{0}\' created'.format(movies_container.id)) 

except exceptions.CosmosHttpResponseError: 
    raise 

# Create the cache collection with vector index
try:
    cache_container = db.create_container_if_not_exists(id=cosmos_cache, 
                                                  partition_key=PartitionKey(path='/id'), 
                                                  indexing_policy=indexing_policy,
                                                  vector_embedding_policy=vector_embedding_policy,
                                                  offer_throughput=1000) 
    print('Container with id \'{0}\' created'.format(cache_container.id)) 

except exceptions.CosmosHttpResponseError: 
    raise

4. Generate Embeddings from Azure OpenAI

This function vectorizes the user input for vector search. Ensure the dimensionality and model used match the sample data provided, or else regenerate vectors with your desired model.

from tenacity import retry, stop_after_attempt, wait_random_exponential 
import logging
@retry(wait=wait_random_exponential(min=2, max=300), stop=stop_after_attempt(20))
def generate_embeddings(text):
    try:        
        response = openai_client.embeddings.create(
            input=text,
            model=openai_embeddings_deployment,
            dimensions=openai_embeddings_dimensions
        )
        embeddings = response.model_dump()
        return embeddings['data'][0]['embedding']
    except Exception as e:
        # Log the exception with traceback for easier debugging
        logging.error("An error occurred while generating embeddings.", exc_info=True)
        raise

5. Load Data from the JSON File

Extract the prevectorized MovieLens dataset from the zip file (see its location in notebook repo here).

# Unzip the data file
with zipfile.ZipFile("../../DataSet/Movies/MovieLens-4489-256D.zip", 'r') as zip_ref:
    zip_ref.extractall("/Data")
zip_ref.close()

# Load the data file
data = []
with open('/Data/MovieLens-4489-256D.json', 'r') as d:
    data = json.load(d)

# View the number of documents in the data (4489)
len(data)

6. Store Data in Azure Cosmos DB

Upsert data into Azure Cosmos DB for NoSQL. Records are written asynchronously.

#The following code to get raw movies data is commented out in favour of
#getting prevectorized data. If you want to vectorize the raw data from
#storage_file_url, uncomment the below, and set vectorizeFlag=True

#data = urllib.request.urlopen(storage_file_url)
#data = json.load(data)

vectorizeFlag=False

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

async def generate_vectors(items, vector_property):
    # Create a thread pool executor for the synchronous generate_embeddings
    loop = asyncio.get_event_loop()
    
    # Define a function to call generate_embeddings using run_in_executor
    async def generate_embedding_for_item(item):
        try:
            # Offload the sync generate_embeddings to a thread
            vectorArray = await loop.run_in_executor(None, generate_embeddings, item['overview'])
            item[vector_property] = vectorArray
        except Exception as e:
            # Log or handle exceptions if needed
            logging.error(f"Error generating embedding for item: {item['overview'][:50]}...", exc_info=True)
    
    # Create tasks for all the items to generate embeddings concurrently
    tasks = [generate_embedding_for_item(item) for item in items]
    
    # Run all the tasks concurrently and wait for their completion
    await asyncio.gather(*tasks)
    
    return items

async def insert_data(vectorize=False):
    start_time = time.time()  # Record the start time
    
    # If vectorize flag is True, generate vectors for the data
    if vectorize:
        print("Vectorizing data, please wait...")
        global data
        data = await generate_vectors(data, "vector")

    counter = 0
    tasks = []
    max_concurrency = 5  # Adjust this value to control the level of concurrency
    semaphore = asyncio.Semaphore(max_concurrency)
    print("Starting doc load, please wait...")
    
    def upsert_item_sync(obj):
        movies_container.upsert_item(body=obj)
    
    async def upsert_object(obj):
        nonlocal counter
        async with semaphore:
            await asyncio.get_event_loop().run_in_executor(None, upsert_item_sync, obj)
            # Progress reporting
            counter += 1
            if counter % 100 == 0:
                print(f"Sent {counter} documents for insertion into collection.")
    
    for obj in data:
        tasks.append(asyncio.create_task(upsert_object(obj)))
    
    # Run all upsert tasks concurrently within the limits set by the semaphore
    await asyncio.gather(*tasks)
    
    end_time = time.time()  # Record the end time
    duration = end_time - start_time  # Calculate the duration
    print(f"All {counter} documents inserted!")
    print(f"Time taken: {duration:.2f} seconds ({duration:.3f} milliseconds)")

# Run the async function with the vectorize flag set to True or False as needed
await insert_data(vectorizeFlag)  # or await insert_data() for default

This function defines a vector search over the movies data and chat cache collections.

def vector_search(container, vectors, similarity_score=0.02, num_results=5):
    results = container.query_items(
        query='''
        SELECT TOP @num_results c.overview, VectorDistance(c.vector, @embedding) as SimilarityScore 
        FROM c
        WHERE VectorDistance(c.vector,@embedding) > @similarity_score
        ORDER BY VectorDistance(c.vector,@embedding)
        ''',
        parameters=[
            {"name": "@embedding", "value": vectors},
            {"name": "@num_results", "value": num_results},
            {"name": "@similarity_score", "value": similarity_score}
        ],
        enable_cross_partition_query=True,
        populate_query_metrics=True
    )
    results = list(results)
    formatted_results = [{'SimilarityScore': result.pop('SimilarityScore'), 'document': result} for result in results]

    return formatted_results

8. Get Recent Chat History

This function provides conversational context to the LLM, allowing it to better have a conversation with the user.

def get_chat_history(container, completions=3):
    results = container.query_items(
        query='''
        SELECT TOP @completions *
        FROM c
        ORDER BY c._ts DESC
        ''',
        parameters=[
            {"name": "@completions", "value": completions},
        ],
        enable_cross_partition_query=True
    )
    results = list(results)
    return results

9. Chat Completion Functions

Define the functions to handle the chat completion process, including caching responses.

def generate_completion(user_prompt, vector_search_results, chat_history):
    system_prompt = '''
    You are an intelligent assistant for movies. You are designed to provide helpful answers to user questions about movies in your database.
    You are friendly, helpful, and informative and can be lighthearted. Be concise in your responses, but still friendly.
     - Only answer questions related to the information provided below. Provide at least 3 candidate movie answers in a list.
     - Write two lines of whitespace between each answer in the list.
    '''

    messages = [{'role': 'system', 'content': system_prompt}]
    for chat in chat_history:
        messages.append({'role': 'user', 'content': chat['prompt'] + " " + chat['completion']})
    messages.append({'role': 'user', 'content': user_prompt})
    for result in vector_search_results:
        messages.append({'role': 'system', 'content': json.dumps(result['document'])})

    response = openai_client.chat.completions.create(
        model=openai_completions_deployment,
        messages=messages,
        temperature=0.1
    )    
    return response.model_dump()

def chat_completion(cache_container, movies_container, user_input):
    print("starting completion")
    # Generate embeddings from the user input
    user_embeddings = generate_embeddings(user_input)
    # Query the chat history cache first to see if this question has been asked before
    cache_results = get_cache(container=cache_container, vectors=user_embeddings, similarity_score=0.99, num_results=1)
    if len(cache_results) > 0:
        print("Cached Result\n")
        return cache_results[0]['completion'], True
        
    else:
        # Perform vector search on the movie collection
        print("New result\n")
        search_results = vector_search(movies_container, user_embeddings)

        print("Getting Chat History\n")
        # Chat history
        chat_history = get_chat_history(cache_container, 3)
        # Generate the completion
        print("Generating completions \n")
        completions_results = generate_completion(user_input, search_results, chat_history)

        print("Caching response \n")
        # Cache the response
        cache_response(cache_container, user_input, user_embeddings, completions_results)

        print("\n")
        # Return the generated LLM completion
        return completions_results['choices'][0]['message']['content'], False

10. Cache Generated Responses

Save the user prompts and generated completions to the cache for faster future responses.

def cache_response(container, user_prompt, prompt_vectors, response):
    chat_document = {
        'id': str(uuid.uuid4()),
        'prompt': user_prompt,
        'completion': response['choices'][0]['message']['content'],
        'completionTokens': str(response['usage']['completion_tokens']),
        'promptTokens': str(response['usage']['prompt_tokens']),
        'totalTokens': str(response['usage']['total_tokens']),
        'model': response['model'],
        'vector': prompt_vectors
    }
    container.create_item(body=chat_document)

def get_cache(container, vectors, similarity_score=0.0, num_results=5):
    # Execute the query
    results = container.query_items(
        query= '''
        SELECT TOP @num_results *
        FROM c
        WHERE VectorDistance(c.vector,@embedding) > @similarity_score
        ORDER BY VectorDistance(c.vector,@embedding)
        ''',
        parameters=[
            {"name": "@embedding", "value": vectors},
            {"name": "@num_results", "value": num_results},
            {"name": "@similarity_score", "value": similarity_score},
        ],
        enable_cross_partition_query=True, populate_query_metrics=True)
    results = list(results)
    return results

11. Create a Simple UX in Gradio

Build a user interface using Gradio for interacting with the AI application.

chat_history = []

with gr.Blocks() as demo:
    chatbot = gr.Chatbot(label="Cosmic Movie Assistant")
    msg = gr.Textbox(label="Ask me about movies in the Cosmic Movie Database!")
    clear = gr.Button("Clear")

    def user(user_message, chat_history):
        start_time = time.time()
        response_payload, cached = chat_completion(cache_container, movies_container, user_message)
        end_time = time.time()
        elapsed_time = round((end
        time - start_time) * 1000, 2)
        details = f"\n (Time: {elapsed_time}ms)"
        if cached:
            details += " (Cached)"
        chat_history.append([user_message, response_payload + details])
        
        return gr.update(value=""), chat_history
    
    msg.submit(user, [msg, chatbot], [msg, chatbot], queue=False)
    clear.click(lambda: None, None, chatbot, queue=False)

# Launch the Gradio interface
demo.launch(debug=True)

# Be sure to run this cell to close or restart the Gradio demo
demo.close()

Vector Database Solutions

Azure PostgreSQL Server pgvector Extension

Next step