RAG

RAG Pipelines

Advanced

RAG Pipelines
This guide brings everything together into complete RAG Pipelines — from simple query-answer flows to sophisticated agentic, multi-step, and stateful RAG systems using LangGraph. You will learn end-to-end architectures, query transformation, context management, streaming, evaluation, hallucination controls, and production-ready patterns with full working code.

RAG Pipelines

What Is a RAG Pipeline?

A RAG Pipeline is an orchestrated sequence of steps that turns a user query into a grounded, accurate response by retrieving relevant context and feeding it to an LLM. In LangGraph, this becomes a stateful, controllable, debuggable graph instead of fragile prompt chains.

End-to-End Retrieval Workflows

Basic structure:
  1. Query → Transformation
  2. Retrieval (possibly multi-step)
  3. Context Post-processing / Compression
  4. Generation
  5. (Optional) Self-correction / Verification

Query Transformation

Improve retrieval quality before searching.
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini")

query_rewriter = ChatPromptTemplate.from_template(
    "Rewrite the following question to be more specific and effective for vector search.\n"
    "Original: {question}\nRewritten:"
)

def transform_query(state):
    question = state["messages"][-1].content
    rewritten = llm.invoke(query_rewriter.format(question=question))
    return {"messages": [rewritten], "original_question": question}

Retrieval Pipelines

from typing import TypedDict, Annotated, List
from langchain_core.documents import Document
import operator

class RAGState(TypedDict):
    messages: Annotated[list, operator.add]
    context: List[Document]
    original_question: str

def retrieve(state: RAGState):
    query = state.get("original_question") or state["messages"][-1].content
    docs = retriever.invoke(query)          # ensemble or advanced retriever
    return {"context": docs}

Context Injection Pipelines

def inject_context(state: RAGState):
    context_text = "\n\n".join([
        f"Source: {doc.metadata.get('source', 'unknown')}\n{doc.page_content}"
        for doc in state["context"]
    ])
    
    prompt = f"""You are a helpful assistant. Use only the provided context to answer.

Context:
{context_text}

Question: {state["messages"][-1].content}

Answer:"""
    
    return {"prompt": prompt}

Multi-Step RAG Workflows

def grade_documents(state: RAGState):
    relevant = []
    for doc in state["context"]:
        # Use LLM to grade relevance
        score = llm.invoke(f"Is this relevant to '{state['messages'][-1].content}'? YES/NO only.\n{doc.page_content[:400]}")
        if "YES" in score.content.upper():
            relevant.append(doc)
    return {"context": relevant}

# Graph with conditional routing
graph.add_node("retrieve", retrieve)
graph.add_node("grade", grade_documents)
graph.add_node("generate", generate)

graph.add_edge(START, "retrieve")
graph.add_edge("retrieve", "grade")
graph.add_conditional_edges(
    "grade",
    lambda s: "generate" if len(s["context"]) > 0 else "rewrite_query",
    {"generate": "generate", "rewrite_query": "transform_query"}
)

Agentic RAG Pipelines

Use tools + ReAct style.
from langgraph.prebuilt import create_react_agent
from langchain.tools import tool

@tool
def retrieve_knowledge(query: str) -> str:
    """Retrieve relevant documents for the query."""
    docs = retriever.invoke(query)
    return "\n\n".join([doc.page_content for doc in docs[:5]])

agent = create_react_agent(
    model=ChatOpenAI(model="gpt-4o"),
    tools=[retrieve_knowledge],
    checkpointer=MemorySaver()
)

# Run with memory
result = agent.invoke({
    "messages": [("user", "Compare LangGraph and CrewAI in 2026")]
}, config={"configurable": {"thread_id": "rag_agent_42"}})

ReAct + RAG Systems

Combine reasoning with retrieval in a loop.
from langgraph.graph import StateGraph, END

def should_retrieve(state):
    # LLM decides if more retrieval is needed
    decision = llm.invoke("Do I need more information? YES/NO")
    return "retrieve" if "YES" in decision.content.upper() else "generate"

Multi-Agent RAG Architectures

  • Router Agent → decides which retriever / knowledge base to use
  • Critic Agent → grades answer quality
  • Research Agent → performs multi-hop retrieval
  • Summarizer Agent → final synthesis

Streaming RAG Pipelines

async for event in app.astream_events({"messages": [HumanMessage(content=query)]}, version="v2"):
    if event["event"] == "on_chat_model_stream":
        print(event["data"]["chunk"].content, end="")

Stateful RAG Workflows (LangGraph Advantage)

Use MemorySaver or PostgresSaver for:
  • Conversation memory
  • Retrieved context persistence
  • Human-in-the-loop corrections
  • Multi-turn research sessions

RAG Evaluation Strategies

from ragas import evaluate
from ragas.metrics import (
    faithfulness, answer_relevancy, context_precision, context_recall
)

dataset = ...  # your test questions + ground truth

results = evaluate(dataset, metrics=[faithfulness, answer_relevancy, context_precision])
print(results)
Track: Faithfulness, Answer Relevancy, Context Precision/Recall, Latency, Cost.

Hallucination Reduction Techniques

  1. Strict context-only prompting
  2. Document grading + filtering
  3. Self-reflection / self-critique
  4. Citation enforcement
  5. Post-generation verification with another LLM
  6. Knowledge cutoff awareness
def verify_answer(state):
    verification_prompt = f"""Does the answer contain information not present in the context? 
Context: {state['context']}
Answer: {state['messages'][-1].content}
Reply ONLY with VERIFIED or HALLUCINATED."""
    result = llm.invoke(verification_prompt)
    if "HALLUCINATED" in result.content:
        return "rewrite"
    return END

Common RAG Pipeline Mistakes

  • Single naive retrieval step for complex queries
  • No query transformation or routing
  • Feeding raw noisy context to LLM
  • No hallucination / relevance checks
  • Ignoring state management across turns
  • No evaluation or monitoring
  • Fixed k without dynamic adjustment
  • Poor error handling and fallback strategies

Best Practices for RAG Pipelines

  1. Start simple, then add agentic/multi-step capabilities
  2. Always include query rewriting and document grading
  3. Use hybrid/ensemble retrievers
  4. Implement streaming + stateful graphs with LangGraph
  5. Add self-correction loops
  6. Enforce citations and source transparency
  7. Continuously evaluate with real user queries
  8. Monitor latency, token usage, and relevance scores
  9. Design for human-in-the-loop intervention
  10. Abstract pipeline components for easy experimentation
Pro Tip – Full Production RAG Pipeline in LangGraph
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.postgres import PostgresSaver

workflow = StateGraph(RAGState)

workflow.add_node("transform_query", transform_query)
workflow.add_node("retrieve", retrieve)
workflow.add_node("grade", grade_documents)
workflow.add_node("generate", generate)
workflow.add_node("verify", verify_answer)

workflow.add_edge(START, "transform_query")
workflow.add_edge("transform_query", "retrieve")
workflow.add_edge("retrieve", "grade")
workflow.add_edge("grade", "generate")
workflow.add_edge("generate", "verify")
workflow.add_conditional_edges("verify", lambda s: END if "VERIFIED" in ... else "generate")

app = workflow.compile(checkpointer=PostgresSaver.from_conn_string(...))
RAG Pipelines powered by LangGraph turn fragile prompt chains into robust, observable, and evolvable AI systems.

AI agent LangGraph Python RAG

← All training