Query Pipeline
Forge supports two query modes: agentic (a LangGraph agent autonomously selects tools and iterates) and direct (a fixed pipeline, single pass). Both stream responses as Server-Sent Events (SSE).
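Every SSE frame on the wire follows the `event: <type>` / `data: <json>` shape produced by the streaming layer shown later in this page. As a quick orientation, here is a minimal client-side parser for one such frame (`parse_sse_event` is a hypothetical helper for illustration, not part of Forge):

```python
import json


def parse_sse_event(raw: str) -> tuple[str, dict]:
    """Parse one SSE frame of the form 'event: <type>\\ndata: <json>\\n\\n'."""
    event_type, data = "message", {}
    for line in raw.strip().splitlines():
        if line.startswith("event:"):
            event_type = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data = json.loads(line[len("data:"):].strip())
    return event_type, data
```

A `token` event, for example, parses to `("token", {"content": "..."})`.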
Agentic Mode (Default)
The full agentic flow for a query like “How does the authentication system relate to compliance requirements?”:
User Query
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 1. QUERY ANALYSIS │
│ forge/retrieval/agent.py → analyze_query() │
│ │
│ Input: "How does the authentication system relate to │
│ compliance requirements?" │
│ Output: complexity="complex", needs_decomposition=true │
│ │
│ SSE event: query_analysis │
└──────────────────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 2. CACHE CHECK │
│ forge/cache/semantic_cache.py │
│ │
│ Embed query → search Redis for semantically similar │
│ previous queries (threshold: 0.95 cosine similarity) │
│ Hit → return cached response immediately │
│ Miss → proceed to agent loop │
│ │
│ SSE event: cache_hit or cache_miss │
└──────────────────────────┬──────────────────────────────────────┘
│ cache miss
▼
┌─────────────────────────────────────────────────────────────────┐
│ 3. AGENT LOOP (LangGraph StateGraph) │
│ forge/retrieval/agent.py → ForgeAgent.run() │
│ │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ Iteration 1: PLAN ││
│ │ Agent decides: "Complex query → decompose first" ││
│ │ Tool selected: decompose_query ││
│ │ ││
│ │ EXECUTE: decompose_query(query) ││
│ │ Result: ││
│ │ Sub-query 1: "What authentication system is described?" ││
│ │ Sub-query 2: "What are the compliance requirements?" ││
│ │ Sub-query 3: "How do they connect?" ││
│ │ ││
│ │ SSE events: tool_call, tool_result ││
│ └─────────────────────────────────────────────────────────────┘│
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ Iteration 2: PLAN → semantic_search ││
│ │ ││
│ │ EXECUTE: semantic_search("authentication system") ││
│ │ Result: 8 chunks retrieved ││
│ │ ││
│ │ CRAG GATE: ││
│ │ 4 CORRECT (scores: 0.87, 0.82, 0.79, 0.73) ││
│ │ 2 AMBIGUOUS (scores: 0.58, 0.45) ││
│ │ 2 INCORRECT (scores: 0.28, 0.19) ││
│ │ → Expand ambiguous to parent sections ││
│ │ → 1 additional CORRECT after expansion ││
│ │ ││
│ │ SSE events: retrieval_start, retrieval_result, ││
│ │ crag_evaluation ││
│ └─────────────────────────────────────────────────────────────┘│
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ Iteration 3: PLAN → semantic_search ││
│ │ ││
│ │ EXECUTE: semantic_search("compliance requirements") ││
│ │ Result: 6 chunks, 5 CORRECT after CRAG ││
│ └─────────────────────────────────────────────────────────────┘│
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ Iteration 4: PLAN → graph_traverse ││
│ │ ││
│ │ EXECUTE: graph_traverse("authentication", max_hops=2) ││
│ │ Result: ││
│ │ authentication → RELATED_TO → compliance_framework ││
│ │ authentication → PART_OF → security_architecture ││
│ │ compliance_framework → REFERENCES → NIST_800-53 ││
│ │ ││
│ │ SSE events: tool_call, tool_result ││
│ └─────────────────────────────────────────────────────────────┘│
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ Iteration 5: PLAN → rerank_colbert ││
│ │ ││
│ │ EXECUTE: rerank_colbert(query, all_chunks, top_k=8) ││
│ │ Result: top 8 chunks, reranked by token-level precision ││
│ │ ││
│ │ SSE event: rerank_result ││
│ └─────────────────────────────────────────────────────────────┘│
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ Iteration 6: PLAN → generate_answer ││
│ │ Agent reasoning: "Sufficient evidence from 3 sub-queries ││
│ │ + graph context. Generating answer." ││
│ │ ││
│ │ EXECUTE: generate_answer(query, top_8_chunks) ││
│ │ LLM generates with context, tokens streamed via SSE ││
│ │ ││
│ │ SSE events: generation_start, token (many), generation_end ││
│ └─────────────────────────────────────────────────────────────┘│
└──────────────────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 4. SELF-VERIFICATION │
│ forge/verification/verifier.py │
│ │
│ Extract claims from answer → verify each against sources │
│ 6 claims checked, 6 supported → confidence: 0.92 │
│ │
│ SSE event: verification │
└──────────────────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 5. RESPONSE COMPLETION │
│ forge/api/streaming.py │
│ │
│ Send final SSE events: sources, done │
│ Cache response in Redis (for future semantic cache hits) │
│ │
│ SSE event: sources, done │
└─────────────────────────────────────────────────────────────────┘

Direct Mode
Direct mode follows a fixed, single-pass pipeline. No agent, no iteration, no tool selection:
User Query
│
▼
Cache Check → hit? return cached
│ miss
▼
BGE-M3 Encode Query (~50ms)
│
├──▶ Dense Search (Qdrant, top 20)
├──▶ Sparse Search (Qdrant, top 20)
│
▼
RRF Fusion → combined top 20
│
▼
CRAG Quality Gate → filter to CORRECT only
│
▼
ColBERT Reranking → top 5
│
▼
LLM Generation (streaming tokens)
│
▼
Self-Verification → claim check
│
▼
SSE done event

Latency: 2-5 seconds (vs. 5-15 seconds for agentic mode).
Trade-off: Direct mode cannot do multi-hop reasoning, adaptive retrieval, or graph traversal. It works well for simple factual queries but struggles with complex questions.
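The RRF Fusion step above merges the dense and sparse rankings by summing reciprocal-rank scores and re-sorting. A minimal sketch of the idea (the function name and the conventional `k=60` constant are illustrative assumptions; this doc does not state Forge's actual values):

```python
def rrf_fuse(
    dense_ids: list[str],
    sparse_ids: list[str],
    k: int = 60,
    top_n: int = 20,
) -> list[str]:
    """Reciprocal Rank Fusion over two ranked result lists."""
    scores: dict[str, float] = {}
    for ranking in (dense_ids, sparse_ids):
        for rank, doc_id in enumerate(ranking):
            # Each list contributes 1 / (k + rank), with 1-based ranks.
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank + 1)
    # Documents ranked highly in both lists accumulate the largest scores.
    return sorted(scores, key=scores.get, reverse=True)[:top_n]
```

A chunk that appears near the top of both the dense and sparse lists outranks one that appears in only a single list, which is why fusion beats either search alone.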
LangGraph State Machine
The agent is implemented as a LangGraph StateGraph:
```python
# forge/retrieval/agent.py
from langgraph.graph import StateGraph, END


class ForgeAgent:
    def build_graph(self):
        """Build the agent state machine and return the compiled graph."""
        graph = StateGraph(ForgeAgentState)

        # Add nodes
        graph.add_node("analyze", self.analyze_query)
        graph.add_node("plan", self.plan_next_action)
        graph.add_node("execute", self.execute_tool)
        graph.add_node("evaluate", self.evaluate_evidence)
        graph.add_node("generate", self.generate_answer)
        graph.add_node("verify", self.verify_answer)

        # Add edges
        graph.set_entry_point("analyze")
        graph.add_edge("analyze", "plan")
        graph.add_conditional_edges(
            "plan",
            self.should_execute_or_generate,
            {
                "execute": "execute",
                "generate": "generate",
            },
        )
        graph.add_edge("execute", "evaluate")
        graph.add_conditional_edges(
            "evaluate",
            self.is_evidence_sufficient,
            {
                "plan": "plan",          # Need more evidence
                "generate": "generate",  # Sufficient
            },
        )
        graph.add_edge("generate", "verify")
        graph.add_conditional_edges(
            "verify",
            self.is_verification_passed,
            {
                "plan": "plan",  # Retry with a different strategy
                "end": END,      # Done
            },
        )
        return graph.compile()
```

Evidence Sufficiency Check
The agent evaluates whether it has enough evidence to generate a reliable answer:
```python
async def is_evidence_sufficient(self, state: ForgeAgentState) -> str:
    """Determine if gathered evidence is sufficient for generation."""
    correct_chunks = [
        r for r in state["crag_results"]
        if r.classification == "CORRECT"
    ]

    # Heuristics for sufficiency:
    if len(correct_chunks) >= 3 and state["iteration"] >= 2:
        return "generate"

    if state["iteration"] >= state["max_iterations"]:
        # Max iterations reached, generate with what we have
        return "generate"

    # Check if we've covered all sub-queries
    if state.get("sub_queries"):
        covered = self._check_sub_query_coverage(state)
        if covered:
            return "generate"

    return "plan"  # Need more evidence
```

Streaming Architecture
Both modes produce real-time SSE events. The streaming layer wraps the query engine:
```python
# forge/api/streaming.py
import json
from typing import AsyncGenerator


class StreamingQueryHandler:
    async def stream_query(
        self,
        query: str,
        mode: str,
    ) -> AsyncGenerator[str, None]:
        """Run a query and yield SSE events."""
        # Query analysis
        analysis = await self.analyzer.analyze(query)
        yield self._sse_event("query_analysis", analysis)

        # Cache check
        cached = await self.cache.get(query)
        if cached:
            yield self._sse_event("cache_hit", cached)
            for token in cached.answer:
                yield self._sse_event("token", {"content": token})
            yield self._sse_event("done", cached.metadata)
            return
        yield self._sse_event("cache_miss", {})

        if mode == "agentic":
            async for event in self.agent.run_stream(query):
                yield self._sse_event(event.type, event.data)
        else:
            async for event in self.direct_pipeline.run_stream(query):
                yield self._sse_event(event.type, event.data)

    def _sse_event(self, event_type: str, data: dict) -> str:
        return f"event: {event_type}\ndata: {json.dumps(data)}\n\n"
```

```python
# forge/api/router.py
from fastapi.responses import StreamingResponse


@router.post("/api/query/stream")
async def stream_query(request: QueryRequest):
    handler = StreamingQueryHandler(config)
    return StreamingResponse(
        handler.stream_query(request.query, request.mode),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
```

Comparison: Agentic vs. Direct
| Aspect | Agentic | Direct |
|---|---|---|
| Latency | 5-15s | 2-5s |
| Multi-hop | Yes (decomposes + iterates) | No |
| Adaptive | Yes (chooses tools per query) | No (fixed pipeline) |
| Graph | Yes (graph_traverse tool) | No |
| Error recovery | Yes (retries with different strategy) | No |
| Best for | Complex, multi-hop, ambiguous | Simple factual, speed-critical |
| CRAG | Per-iteration | Once |
| ColBERT | Agent decides when | Always (on CRAG results) |
| Verification | Yes (can trigger retry) | Yes (report only) |
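The CRAG gate both columns refer to buckets each retrieved chunk by its relevance score. A minimal sketch, with cut-offs chosen only to be consistent with the scores in the walkthrough above (the 0.7 and 0.4 thresholds are assumptions, not Forge's actual configuration):

```python
def crag_classify(score: float) -> str:
    """Bucket a chunk's relevance score, CRAG-style.

    Thresholds (0.7 / 0.4) are illustrative, not Forge's actual values.
    """
    if score >= 0.7:
        return "CORRECT"    # kept as evidence
    if score >= 0.4:
        return "AMBIGUOUS"  # expanded to its parent section and re-scored
    return "INCORRECT"      # discarded


def crag_gate(scored_chunks: dict[str, float]) -> list[str]:
    """Return the chunk ids that pass the gate (CORRECT only)."""
    return [
        chunk_id for chunk_id, score in scored_chunks.items()
        if crag_classify(score) == "CORRECT"
    ]
```

In agentic mode this gate runs after every retrieval tool call; in direct mode it runs once, on the fused top-20 list.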
The Tauri frontend exposes a toggle for agentic vs. direct mode. For most users, agentic mode is the right default: it handles simple queries quickly (the agent converges in two iterations) and shines on complex ones.
Next Steps
- Streaming Protocol — All 11 SSE event types with schemas
- API Reference — Full endpoint documentation
- Agentic RAG — Deep dive into the agent architecture