I’m working on a retrieval-augmented generation (RAG) system using LangChain and want to deploy it through FastAPI. Streaming works fine when I call my API endpoint, but I need help handling source documents properly.
Right now my response streams correctly, but I want to:
- Stream the AI-generated answer first
- Show the source documents only after streaming completes
- Process the source documents before displaying them to users
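To make the goal concrete, here is a standalone sketch of the stream shape I’m after (placeholder data, no LangChain): answer tokens first, then a delimiter, then one formatted line per source.

```python
# Standalone sketch of the desired stream shape (placeholder data, no LangChain):
# answer tokens stream first, then a delimiter, then one line per source.
def desired_stream(answer_tokens, sources):
    for token in answer_tokens:
        yield token
    yield "\n\nSources:\n"
    for src in sources:
        yield f"{src['file']}, Page: {src['page']}\n"

print("".join(desired_stream(["The system ", "supports X."],
                             [{"file": "manual.pdf", "page": 4}])))
# → The system supports X.
#
#   Sources:
#   manual.pdf, Page: 4
```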
Here’s my current implementation:
# API call example: http://127.0.0.1:8000/chat_query?text=What%20are%20the%20main%20features%20of%20our%20system%3F
import time

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS

embedding_model = HuggingFaceEmbeddings(
    model_name="sentence-transformers/all-MiniLM-L6-v2",
    model_kwargs={"device": "cpu"},
)
vector_store = FAISS.load_local(
    "my_vectorstore_path", embedding_model, allow_dangerous_deserialization=True
)
doc_retriever = vector_store.as_retriever(
    search_type="similarity_score_threshold",
    search_kwargs={"k": 5, "score_threshold": 0.7},
)
# language_model is my LLM instance, initialized elsewhere
def generate_response(user_query: str):
    start_timer = time.time()
    retrieved_docs = doc_retriever.get_relevant_documents(user_query)
    response_data = {"query": user_query, "answer": "", "references": []}
    # Join page contents so the prompt gets clean text instead of Document reprs
    context_text = "\n\n".join(doc.page_content for doc in retrieved_docs)
    system_prompt = f"""You are a helpful assistant. Please provide accurate information based on the following context:
{context_text}
User Question: {user_query}
Response:"""
    generated_text = ""
    is_first_chunk = True
    for chunk in language_model.stream(system_prompt):
        if chunk:
            generated_text += chunk
            if is_first_chunk:
                processing_time = round(time.time() - start_timer, 2)
                is_first_chunk = False
                yield f"(Processing took: {processing_time}s)\n"
            yield chunk
    if retrieved_docs:
        yield "\n\nSources:\n"
        for document in retrieved_docs:
            file_name = document.metadata["source"].split("/")[-1]
            yield f"{file_name}, Page: {document.metadata['page'] + 1}\n"
        response_data["references"] = [
            {"file": doc.metadata["source"], "page_num": doc.metadata["page"] + 1}
            for doc in retrieved_docs
        ]
    else:
        yield "\n\nNote: No relevant sources found for this query."
    yield response_data  # problem: StreamingResponse expects str/bytes chunks, not a dict
api_app = FastAPI(
    title="Document Q&A API",
    description="API for querying documents with streaming responses",
    version="1.0",
)

@api_app.get("/chat_query")
async def handle_query(text: str):
    return StreamingResponse(generate_response(text), media_type="text/plain")
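I’m aware the trailing `yield response_data` is suspect, since `StreamingResponse` only sends `str`/`bytes` chunks; a dict would need to be serialized first, e.g. with `json.dumps` (standalone sketch):

```python
import json

# Standalone sketch: a trailing metadata dict has to be serialized to text
# before it can travel through a plain-text StreamingResponse.
response_data = {"query": "example", "answer": "hi", "references": []}
encoded = json.dumps(response_data)  # a str chunk the stream can carry
print(encoded)
```

But then the client sees raw JSON mixed into the text stream, which is exactly what I’m trying to avoid.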
My main questions are:
- How can I modify this to return both streamed content and source documents properly?
- Is there a way to stream the answer first, then show the sources without the user seeing the raw document data?
I thought about creating a separate endpoint for sources, but that means running the retrieval twice, which seems inefficient:
@api_app.get('/get_sources')
async def fetch_sources(text: str):
    docs = doc_retriever.get_relevant_documents(text)
    return docs
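One workaround I’m considering is caching retrieval per query so the two endpoints share a single lookup. A minimal standalone sketch with a stubbed retriever (`fake_retrieve` is hypothetical, standing in for `doc_retriever.get_relevant_documents`):

```python
from functools import lru_cache

CALL_COUNT = {"retrievals": 0}

def fake_retrieve(query):
    # Stub standing in for doc_retriever.get_relevant_documents
    CALL_COUNT["retrievals"] += 1
    return [{"source": "manual.pdf", "page": 0}]

@lru_cache(maxsize=128)
def cached_retrieve(query: str):
    # lru_cache needs hashable return values, so freeze each metadata dict
    return tuple(tuple(sorted(d.items())) for d in fake_retrieve(query))

cached_retrieve("what are the main features?")
cached_retrieve("what are the main features?")  # cache hit, no second retrieval
print(CALL_COUNT["retrievals"])  # 1
```

I’m not sure this is the right approach though, since user queries rarely repeat exactly.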
Any suggestions would be appreciated!