How to stream AI responses and show source documents after completion using FastAPI and a LangChain RAG pipeline

I’m working on a retrieval-augmented generation system using LangChain and want to deploy it through FastAPI. The streaming works fine when I call my API endpoint, but I need help with handling source documents properly.

Right now my response streams correctly, but I want to:

  1. Stream the AI-generated answer first
  2. Show the source documents only after streaming completes
  3. Process the source documents before displaying them to users

Here’s my current implementation:

# API call example: http://127.0.0.1:8000/chat_query?text=What%20are%20the%20main%20features%20of%20our%20system%3F

import time

from fastapi import FastAPI
from fastapi.responses import JSONResponse, StreamingResponse
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

# language_model (the LLM used below) is initialized elsewhere

embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2", model_kwargs={'device': "cpu"})
vector_store = FAISS.load_local("my_vectorstore_path", embedding_model, allow_dangerous_deserialization=True)
doc_retriever = vector_store.as_retriever(search_kwargs={'k': 5, 'score_threshold': 0.7}, search_type="similarity_score_threshold")

def generate_response(user_query: str):
    start_timer = time.time()
    retrieved_docs = doc_retriever.get_relevant_documents(user_query)
    response_data = {"query": user_query, "answer": "", "references": []}
    
    system_prompt = f"""You are a helpful assistant. Please provide accurate information based on the following context:
{retrieved_docs}

User Question: {user_query}

Response:"""
    
    generated_text = ""
    is_first_chunk = True
    
    for chunk in language_model.stream(system_prompt):
        if chunk:
            generated_text += chunk
            if is_first_chunk:
                end_timer = time.time()
                processing_time = round(end_timer - start_timer, 2)
                is_first_chunk = False
                yield f"(Processing took: {processing_time}s)\n"
            yield chunk
    
    if retrieved_docs:
        yield "\n\nSources:\n"
        for idx, document in enumerate(retrieved_docs, start=1):
            yield f"{idx}. {document.metadata['source'].split('/')[-1]}, Page: {document.metadata['page'] + 1}\n"
        response_data["references"] = [{"file": doc.metadata["source"], "page_num": doc.metadata["page"]+1} for doc in retrieved_docs]
    else:
        yield "\n\nNote: No relevant sources found for this query."
    
    yield response_data

api_app = FastAPI(
    title="Document Q&A API",
    description="API for querying documents with streaming responses",
    version="1.0"
)

@api_app.get('/chat_query', response_class=JSONResponse)
async def handle_query(text: str):
    return StreamingResponse(generate_response(text), media_type='text/plain')

My main questions are:

  • How can I modify this to return both streamed content and source documents properly?
  • Is there a way to stream the answer first, then show the sources without the user seeing the raw document data?

I thought about creating a separate endpoint for sources, but that means running the retrieval twice which seems inefficient:

@api_app.get('/get_sources')
async def fetch_sources(text: str):
    docs = doc_retriever.get_relevant_documents(text)
    return docs

Any suggestions would be appreciated!

Just use WebSockets instead of hacking streaming responses: send text chunks as they come, then push the source docs as JSON when you’re done. No weird delimiters or mixed content types to deal with. I built mine with FastAPI’s WebSocket support and it works great.

Had the same issue when I built our document chat system. Here’s what worked: don’t mix data types in the same generator - it’s a nightmare.

I store retrieved docs in a temp cache (just a dict with timestamp cleanup) right after retrieval, then stream only the AI response chunks. When streaming’s done, make a second call to grab the source metadata using a session ID or request hash. Yeah, it’s two requests, but you’re not running retrieval twice since everything’s cached.

For the frontend, I keep it simple - streaming endpoint returns pure text, then immediately hits /sources/{session_id} for clean JSON metadata. Way cleaner than parsing mixed content client-side, and you skip SSE complexity if you don’t need real-time updates.
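A sketch of that cache-and-second-call pattern. The session-ID scheme, the 5-minute TTL, and the helper names are my assumptions, not the poster’s API:

```python
import time
import uuid

# In-memory cache: session ID -> (insertion timestamp, retrieved docs).
# For production, something like Redis with a TTL would replace this dict.
_SOURCE_CACHE: dict = {}
_TTL_SECONDS = 300  # assumed 5-minute lifetime

def cache_sources(docs) -> str:
    """Store retrieved docs under a fresh session ID, evicting stale entries."""
    now = time.time()
    for sid, (ts, _) in list(_SOURCE_CACHE.items()):
        if now - ts > _TTL_SECONDS:
            del _SOURCE_CACHE[sid]
    session_id = uuid.uuid4().hex
    _SOURCE_CACHE[session_id] = (now, docs)
    return session_id

def get_cached_sources(session_id: str):
    """Return the cached docs for a session, or None if missing/expired."""
    entry = _SOURCE_CACHE.get(session_id)
    if entry is None or time.time() - entry[0] > _TTL_SECONDS:
        return None
    return entry[1]
```

The streaming endpoint would call `cache_sources` right after retrieval and hand the session ID to the client (e.g. in a response header), and a `/sources/{session_id}` endpoint would just return `get_cached_sources(session_id)`.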

Had the same issue building my RAG system last year. You need to split your streaming logic from document handling completely. Don’t yield everything in one generator - use Server-Sent Events with FastAPI instead.

Stream the response first, then send the source documents as a separate event: use EventSourceResponse from sse-starlette and emit different event types for text chunks and sources. I cached the retrieved docs in memory (Redis for prod) with a unique request ID, then sent that ID with the final source event. That fixes the double retrieval problem and keeps streaming smooth.

Your current setup’s on the right track, but mixing the response dictionary with streaming just creates confusion. Keep the stream pure text and handle structured data through separate event types.

Your streaming approach is on the right track, but you’re making it way harder than it needs to be by mixing response types in one generator.

I hit this exact same issue building our internal docs chatbot. Skip the caching and SSE stuff - there’s a much simpler way that works perfectly.

Here’s the trick: keep streaming separate from source handling, but do it all in one request. Stream your AI response like normal, then when it’s done, add a JSON delimiter and send the source metadata as structured data.

import json

@api_app.get('/chat_query')
async def handle_query(text: str):
    async def stream_with_sources():
        retrieved_docs = doc_retriever.get_relevant_documents(text)
        prompt = (
            "You are a helpful assistant. Please provide accurate information "
            f"based on the following context:\n{retrieved_docs}\n\n"
            f"User Question: {text}\n\nResponse:"
        )

        # Stream AI response first
        for chunk in language_model.stream(prompt):
            if chunk:
                yield chunk

        # Send delimiter then sources as JSON
        yield "\n---SOURCES---\n"
        source_data = {
            "sources": [{
                "filename": doc.metadata["source"].split("/")[-1],
                "page": doc.metadata["page"] + 1
            } for doc in retrieved_docs]
        }
        yield json.dumps(source_data)

    return StreamingResponse(stream_with_sources(), media_type='text/plain')

On the frontend, just split on your delimiter and parse the JSON separately. No caching, no multiple requests, no SSE headaches. I’ve been running this in production for 8 months with zero problems.
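The client-side split-and-parse step can be this small (the helper name is mine; the delimiter matches the one yielded by the endpoint above):

```python
import json

DELIMITER = "\n---SOURCES---\n"

def split_streamed_body(body: str):
    """Split the fully received streamed body into (answer_text, sources)."""
    answer, _, tail = body.partition(DELIMITER)
    # If the delimiter never appeared, there were no sources to parse.
    sources = json.loads(tail)["sources"] if tail else []
    return answer, sources
```

The same split works in JavaScript with `body.split(delimiter)` followed by `JSON.parse` on the second half.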

The key thing to remember: streaming doesn’t mean everything has to be unstructured text. You can stream structured content at the end.