How can I retrieve the source context used by a LangChain RAG pipeline in Python?

I created a RAG application using LangChain and I need help getting the source text that was used to generate each answer. The system works fine, but I only get back the final JSON response; I also want to capture which document chunks were actually used by the model.

from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import CohereRerank
from langchain.output_parsers import ResponseSchema, StructuredOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnablePassthrough

doc_loader = PyPDFLoader(file_path)
source_docs = doc_loader.load()

chunk_splitter = RecursiveCharacterTextSplitter(chunk_size=800, chunk_overlap=150)
processed_chunks = chunk_splitter.split_documents(source_docs)

reranker = CohereRerank(
    top_n=num_results,
    model="rerank-english-v3.0",
    cohere_api_key="your_key_here"
)

base_retriever = vector_db.as_retriever(
    search_type="similarity", 
    search_kwargs={"k": num_results}
)

compressed_retriever = ContextualCompressionRetriever(
    base_compressor=reranker, 
    base_retriever=base_retriever
)

def combine_chunks(chunk_list):
    return "\n\n".join(chunk.page_content for chunk in chunk_list)

schema_list = [
    ResponseSchema(name="cost", description="Total cost", type="float"),
    ResponseSchema(name="quantity", description="Number of items", type="int"),
]
parser = StructuredOutputParser.from_response_schemas(schema_list)

prompt_template = PromptTemplate(
    input_variables=["retrieved_text", "user_query"],
    template=my_template,
    partial_variables={"format_instructions": parser.get_format_instructions()},
)

processing_chain = (
    {"retrieved_text": compressed_retriever | combine_chunks, "user_query": RunnablePassthrough()}
    | prompt_template
    | model
    | parser
)

user_question = "What is the total cost and how many items are there?"
result = processing_chain.invoke(user_question)

My current output only contains the structured data I requested. But I also want to include a field that shows the actual document excerpts that were referenced to create this answer. How can I modify my chain to return both the parsed response and the source context that was used?

You need to modify your chain to capture the retrieved documents with the final response. Here’s how to restructure it so you keep the source chunks while getting your parsed output.

Replace your current processing_chain with this:

def process_with_sources(query):
    retrieved_docs = compressed_retriever.invoke(query)
    combined_context = combine_chunks(retrieved_docs)
    
    chain_input = {"retrieved_text": combined_context, "user_query": query}
    parsed_result = (prompt_template | model | parser).invoke(chain_input)
    
    return {
        "parsed_response": parsed_result,
        "source_contexts": [doc.page_content for doc in retrieved_docs],
        "metadata": [doc.metadata for doc in retrieved_docs]
    }

result = process_with_sources(user_question)

This splits retrieval from generation, so you can access both the original documents and the final structured output. The metadata field’s especially useful since it usually has page numbers and file paths from your PDF loader.
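To show what you can do with that metadata, here's a sketch that formats the captured metadata into human-readable citations. It assumes PyPDFLoader's usual metadata keys (`source` for the file path and a 0-based `page` number), and the `result` dict below is hypothetical stand-in data for what `process_with_sources` returns:

```python
# Hypothetical result, shaped like the output of process_with_sources above
result = {
    "parsed_response": {"cost": 42.5, "quantity": 3},
    "source_contexts": ["Total cost: 42.5 ...", "Items shipped: 3 ..."],
    "metadata": [
        {"source": "invoice.pdf", "page": 0},
        {"source": "invoice.pdf", "page": 2},
    ],
}

# PyPDFLoader page numbers are 0-based, so add 1 for display
citations = [
    f"{m['source']}, p. {m['page'] + 1}"
    for m in result["metadata"]
]
print(citations)  # ['invoice.pdf, p. 1', 'invoice.pdf, p. 3']
```

This gives you an audit trail you can attach to each answer without any extra LLM calls.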

The manual approach works, but this screams automation to me. Multi-step data processing with source tracking? Perfect workflow material. You can automate the entire RAG pipeline - document retrieval, parsing, source context - without all that custom Python.

I’ve built similar systems for compliance tracking. Instead of babysitting retriever chains and capturing intermediate results manually, I just automated the whole thing.

The workflow handles everything: PDF loading, chunking, vector search, reranking, LLM processing. Source metadata gets preserved automatically at every step. You get your structured JSON response AND a complete audit trail showing which document chunks fed each answer.

You can also monitor retrieval quality, set alerts for dropping confidence scores, and A/B test different chunking strategies - all without touching code.

Best part? Your RAG pipeline becomes reusable. Non-technical team members can configure and run it themselves. No more debugging chain compositions or losing source context in complex transformations.

You can modify your existing chain to track sources without breaking anything. Just extend your combine_chunks function to store the document references too.

from langchain_core.runnables import RunnableLambda, RunnablePassthrough

def combine_chunks_with_tracking(chunk_list):
    combined_text = "\n\n".join(chunk.page_content for chunk in chunk_list)
    return {"text": combined_text, "chunks": chunk_list}

def enhanced_parser(output):
    parsed = parser.invoke(output["llm_response"])
    parsed["source_chunks"] = [doc.page_content for doc in output["original_docs"]]
    return parsed

processing_chain = (
    # Dict literals are coerced to RunnableParallel: both keys see the same query
    {"context_data": compressed_retriever | RunnableLambda(combine_chunks_with_tracking), "user_query": RunnablePassthrough()}
    | RunnableLambda(lambda x: {
        "retrieved_text": x["context_data"]["text"],
        "user_query": x["user_query"],
        "original_docs": x["context_data"]["chunks"]
    })
    # Run the LLM while carrying the original docs alongside its response
    | {"llm_response": prompt_template | model, "original_docs": lambda x: x["original_docs"]}
    | RunnableLambda(enhanced_parser)
)
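If the runnable plumbing is hard to follow, here's the same data flow sketched in plain Python with no LangChain involved. `fake_retriever`, `fake_model`, and the JSON reply are all hypothetical stand-ins for the real retriever and LLM:

```python
import json

# Stand-ins for compressed_retriever and the LLM (hypothetical)
def fake_retriever(query):
    return [{"page_content": "chunk A"}, {"page_content": "chunk B"}]

def fake_model(prompt):
    return '{"cost": 10.0, "quantity": 2}'

def run_chain(query):
    # Step 1: retrieval branch + combine_chunks_with_tracking
    docs = fake_retriever(query)
    context = {"text": "\n\n".join(d["page_content"] for d in docs), "chunks": docs}
    # Step 2: run the model on the combined context, keeping the docs around
    llm_response = fake_model(f"{context['text']}\n\nQ: {query}")
    # Step 3: enhanced_parser merges the parsed answer with the sources
    parsed = json.loads(llm_response)
    parsed["source_chunks"] = [d["page_content"] for d in context["chunks"]]
    return parsed

print(run_chain("total cost?"))
# {'cost': 10.0, 'quantity': 2, 'source_chunks': ['chunk A', 'chunk B']}
```

The key idea is the same in both versions: the original docs ride along in the pipeline state until the final step merges them into the parsed output.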

This keeps your chain approach but automatically adds source context to every response. Best part? Minimal changes to your working system.

Easiest approach I’ve found is just adding sources directly to your schema. Throw in another ResponseSchema for source chunks and let the LLM handle it:

schema_list = [
    ResponseSchema(name="cost", description="Total cost", type="float"),
    ResponseSchema(name="quantity", description="Number of items", type="int"),
    ResponseSchema(name="sources", description="relevant text excerpts used", type="string")
]

Update your prompt to ask the model to include relevant chunks it used. Works surprisingly well and keeps everything in one clean response without messing with chains.
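For reference, here's a sketch of how the prompt might ask for quoted sources. The wording below is illustrative, not the asker's actual `my_template`:

```python
# Illustrative template; the real my_template's wording is unknown.
# {format_instructions} will now describe the extra "sources" field too.
my_template = """Answer using only the context below.

Context:
{retrieved_text}

Question: {user_query}

In the "sources" field, copy verbatim the context excerpts that support
your answer.

{format_instructions}"""
```

One caveat: the model may paraphrase or truncate excerpts rather than quote them exactly, so this works well for informal source hints, while the retriever-capture approaches in the other answers give exact provenance.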

Had this exact problem last year building a document analysis system. Use a custom RunnableLambda that grabs the retrieval results and passes them to your final output.

Here’s what worked:

from langchain_core.runnables import RunnableLambda

def capture_retrieval_and_response():
    def retrieval_step(query_input):
        docs = compressed_retriever.invoke(query_input)
        combined_text = combine_chunks(docs)
        return {
            "retrieved_text": combined_text,
            "user_query": query_input,
            "source_docs": docs  # keep the original docs around
        }

    def final_step(chain_data):
        parsed_output = (prompt_template | model | parser).invoke({
            "retrieved_text": chain_data["retrieved_text"],
            "user_query": chain_data["user_query"]
        })

        # Add source context to the structured output
        parsed_output["source_contexts"] = [doc.page_content for doc in chain_data["source_docs"]]
        return parsed_output

    return RunnableLambda(retrieval_step) | RunnableLambda(final_step)

enhanced_chain = capture_retrieval_and_response()
result = enhanced_chain.invoke(user_question)

Keeps everything in one chain while capturing the intermediate retrieval step. Your result gets both the structured fields you defined plus the source_contexts field with the actual text chunks used.