
Query Pipeline

Forge supports two query modes: agentic (LangGraph agent autonomously selects tools and iterates) and direct (fixed pipeline, single pass). Both produce streaming SSE responses.

Agentic Mode (Default)

The full agentic flow for a query like “How does the authentication system relate to compliance requirements?”:

User Query


┌─────────────────────────────────────────────────────────────────┐
│  1. QUERY ANALYSIS                                              │
│     forge/retrieval/agent.py → analyze_query()                  │
│                                                                 │
│     Input:  "How does the authentication system relate to       │
│              compliance requirements?"                          │
│     Output: complexity="complex", needs_decomposition=true      │
│                                                                 │
│     SSE event: query_analysis                                   │
└──────────────────────────┬──────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────┐
│  2. CACHE CHECK                                                 │
│     forge/cache/semantic_cache.py                               │
│                                                                 │
│     Embed query → search Redis for semantically similar         │
│     previous queries (threshold: 0.95 cosine similarity)        │
│     Hit → return cached response immediately                    │
│     Miss → proceed to agent loop                                │
│                                                                 │
│     SSE event: cache_hit or cache_miss                          │
└──────────────────────────┬──────────────────────────────────────┘
                           │ cache miss

┌─────────────────────────────────────────────────────────────────┐
│  3. AGENT LOOP (LangGraph StateGraph)                           │
│     forge/retrieval/agent.py → ForgeAgent.run()                 │
│                                                                 │
│  ┌─────────────────────────────────────────────────────────────┐│
│  │  Iteration 1: PLAN                                          ││
│  │  Agent decides: "Complex query → decompose first"           ││
│  │  Tool selected: decompose_query                             ││
│  │                                                             ││
│  │  EXECUTE: decompose_query(query)                            ││
│  │  Result:                                                    ││
│  │    Sub-query 1: "What authentication system is described?"  ││
│  │    Sub-query 2: "What are the compliance requirements?"     ││
│  │    Sub-query 3: "How do they connect?"                      ││
│  │                                                             ││
│  │  SSE events: tool_call, tool_result                         ││
│  └─────────────────────────────────────────────────────────────┘│
│  ┌─────────────────────────────────────────────────────────────┐│
│  │  Iteration 2: PLAN → semantic_search                        ││
│  │                                                             ││
│  │  EXECUTE: semantic_search("authentication system")          ││
│  │  Result: 8 chunks retrieved                                 ││
│  │                                                             ││
│  │  CRAG GATE:                                                 ││
│  │    4 CORRECT (scores: 0.87, 0.82, 0.79, 0.73)             ││
│  │    2 AMBIGUOUS (scores: 0.58, 0.45)                        ││
│  │    2 INCORRECT (scores: 0.28, 0.19)                        ││
│  │    → Expand ambiguous to parent sections                    ││
│  │    → 1 additional CORRECT after expansion                   ││
│  │                                                             ││
│  │  SSE events: retrieval_start, retrieval_result,             ││
│  │              crag_evaluation                                ││
│  └─────────────────────────────────────────────────────────────┘│
│  ┌─────────────────────────────────────────────────────────────┐│
│  │  Iteration 3: PLAN → semantic_search                        ││
│  │                                                             ││
│  │  EXECUTE: semantic_search("compliance requirements")        ││
│  │  Result: 6 chunks, 5 CORRECT after CRAG                    ││
│  └─────────────────────────────────────────────────────────────┘│
│  ┌─────────────────────────────────────────────────────────────┐│
│  │  Iteration 4: PLAN → graph_traverse                         ││
│  │                                                             ││
│  │  EXECUTE: graph_traverse("authentication", max_hops=2)     ││
│  │  Result:                                                    ││
│  │    authentication → RELATED_TO → compliance_framework       ││
│  │    authentication → PART_OF → security_architecture         ││
│  │    compliance_framework → REFERENCES → NIST_800-53          ││
│  │                                                             ││
│  │  SSE events: tool_call, tool_result                         ││
│  └─────────────────────────────────────────────────────────────┘│
│  ┌─────────────────────────────────────────────────────────────┐│
│  │  Iteration 5: PLAN → rerank_colbert                         ││
│  │                                                             ││
│  │  EXECUTE: rerank_colbert(query, all_chunks, top_k=8)       ││
│  │  Result: top 8 chunks, reranked by token-level precision    ││
│  │                                                             ││
│  │  SSE event: rerank_result                                   ││
│  └─────────────────────────────────────────────────────────────┘│
│  ┌─────────────────────────────────────────────────────────────┐│
│  │  Iteration 6: PLAN → generate_answer                        ││
│  │  Agent reasoning: "Sufficient evidence from 3 sub-queries   ││
│  │  + graph context. Generating answer."                       ││
│  │                                                             ││
│  │  EXECUTE: generate_answer(query, top_8_chunks)              ││
│  │  LLM generates with context, tokens streamed via SSE        ││
│  │                                                             ││
│  │  SSE events: generation_start, token (many), generation_end ││
│  └─────────────────────────────────────────────────────────────┘│
└──────────────────────────┬──────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────┐
│  4. SELF-VERIFICATION                                           │
│     forge/verification/verifier.py                              │
│                                                                 │
│     Extract claims from answer → verify each against sources    │
│     6 claims checked, 6 supported → confidence: 0.92            │
│                                                                 │
│     SSE event: verification                                     │
└──────────────────────────┬──────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────┐
│  5. RESPONSE COMPLETION                                         │
│     forge/api/streaming.py                                      │
│                                                                 │
│     Send final SSE events: sources, done                        │
│     Cache response in Redis (for future semantic cache hits)    │
│                                                                 │
│     SSE event: sources, done                                    │
└─────────────────────────────────────────────────────────────────┘
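The CRAG gate in iterations 2 and 3 buckets each retrieved chunk by its relevance score. A minimal sketch of that classification follows; the threshold values (0.7 and 0.4) are assumptions inferred from the example scores above, not Forge's actual configuration:

```python
# Illustrative CRAG-style quality gate. The 0.7 / 0.4 cutoffs are
# assumptions chosen to match the example scores shown in the diagram;
# Forge's real thresholds may differ.

def classify_chunk(score: float) -> str:
    """Bucket a retrieval relevance score into a CRAG class."""
    if score >= 0.7:
        return "CORRECT"      # keep as-is
    if score >= 0.4:
        return "AMBIGUOUS"    # expand to parent section and re-evaluate
    return "INCORRECT"        # discard

# The scores from iteration 2 above:
scores = [0.87, 0.82, 0.79, 0.73, 0.58, 0.45, 0.28, 0.19]
buckets = [classify_chunk(s) for s in scores]
# → 4 CORRECT, 2 AMBIGUOUS, 2 INCORRECT, matching the diagram
```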

Direct Mode

Direct mode follows a fixed, single-pass pipeline. No agent, no iteration, no tool selection:

User Query


Cache Check → hit? return cached
    │ miss

BGE-M3 Encode Query (~50ms)

    ├──▶ Dense Search (Qdrant, top 20)
    └──▶ Sparse Search (Qdrant, top 20)


RRF Fusion → combined top 20


CRAG Quality Gate → filter to CORRECT only


ColBERT Reranking → top 5


LLM Generation (streaming tokens)


Self-Verification → claim check


SSE done event

Latency: 2-5 seconds (vs. 5-15 seconds for agentic mode).

Trade-off: Direct mode cannot do multi-hop reasoning, adaptive retrieval, or graph traversal. It works well for simple factual queries but struggles with complex questions.
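The RRF fusion step merges the dense and sparse result lists by reciprocal rank. A minimal sketch, where the function name `rrf_fuse` and the smoothing constant k=60 (the conventional default from the RRF literature) are assumptions, not Forge's actual code:

```python
# Minimal reciprocal rank fusion (RRF) sketch. Each input is a list of
# chunk IDs ordered best-first; k=60 is the conventional smoothing
# constant, assumed here rather than taken from Forge's config.

def rrf_fuse(rankings: list[list[str]], k: int = 60, top_n: int = 20) -> list[str]:
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            # Each list contributes 1 / (k + rank) for every doc it contains
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)[:top_n]

dense = ["a", "b", "c", "d"]   # dense search results, best first
sparse = ["c", "a", "e"]       # sparse search results, best first
fused = rrf_fuse([dense, sparse])
# "a" and "c" rise to the top because both lists rank them highly
```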

LangGraph State Machine

The agent is implemented as a LangGraph StateGraph:

# forge/retrieval/agent.py
from langgraph.graph import StateGraph, END
 
class ForgeAgent:
    def build_graph(self):  # returns the compiled, runnable graph (not the StateGraph builder)
        graph = StateGraph(ForgeAgentState)
 
        # Add nodes
        graph.add_node("analyze", self.analyze_query)
        graph.add_node("plan", self.plan_next_action)
        graph.add_node("execute", self.execute_tool)
        graph.add_node("evaluate", self.evaluate_evidence)
        graph.add_node("generate", self.generate_answer)
        graph.add_node("verify", self.verify_answer)
 
        # Add edges
        graph.set_entry_point("analyze")
        graph.add_edge("analyze", "plan")
        graph.add_conditional_edges(
            "plan",
            self.should_execute_or_generate,
            {
                "execute": "execute",
                "generate": "generate",
            },
        )
        graph.add_edge("execute", "evaluate")
        graph.add_conditional_edges(
            "evaluate",
            self.is_evidence_sufficient,
            {
                "plan": "plan",      # Need more evidence
                "generate": "generate",  # Sufficient
            },
        )
        graph.add_edge("generate", "verify")
        graph.add_conditional_edges(
            "verify",
            self.is_verification_passed,
            {
                "plan": "plan",  # Retry with different strategy
                "end": END,      # Done
            },
        )
 
        return graph.compile()
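The nodes above all read and write a shared ForgeAgentState. Its exact schema isn't shown here; a plausible minimal reconstruction, inferred from the fields the conditional-edge functions access, would be a TypedDict like the following (field names beyond those seen in the source are hypothetical):

```python
# Hypothetical reconstruction of ForgeAgentState, inferred from the
# fields accessed by the agent's conditional-edge functions; the real
# class in forge/retrieval/agent.py may carry more fields.
from typing import Any, TypedDict

class ForgeAgentState(TypedDict, total=False):
    query: str
    sub_queries: list[str]    # filled by decompose_query
    crag_results: list[Any]   # retrieved chunks with CRAG classifications
    iteration: int            # current agent-loop iteration
    max_iterations: int       # hard stop for the loop
    answer: str               # filled by generate_answer

state: ForgeAgentState = {"query": "...", "iteration": 0, "max_iterations": 6}
```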

Evidence Sufficiency Check

The agent evaluates whether it has enough evidence to generate a reliable answer:

async def is_evidence_sufficient(self, state: ForgeAgentState) -> str:
    """Determine if gathered evidence is sufficient for generation."""
    correct_chunks = [
        r for r in state["crag_results"]
        if r.classification == "CORRECT"
    ]
 
    # Heuristics for sufficiency:
    if len(correct_chunks) >= 3 and state["iteration"] >= 2:
        return "generate"
 
    if state["iteration"] >= state["max_iterations"]:
        # Max iterations reached, generate with what we have
        return "generate"
 
    # Check if we've covered all sub-queries
    if state.get("sub_queries"):
        covered = self._check_sub_query_coverage(state)
        if covered:
            return "generate"
 
    return "plan"  # Need more evidence
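To make the routing concrete, here is a self-contained rendering of the same heuristics applied to fake states. The CragResult stand-in is illustrative, and the sub-query coverage branch is omitted:

```python
# Standalone rendering of the sufficiency heuristics above, using a
# hypothetical CragResult stand-in so the routing can be exercised
# without the rest of the agent. The sub-query coverage check is omitted.
from dataclasses import dataclass

@dataclass
class CragResult:
    classification: str

def route(state: dict) -> str:
    correct = [r for r in state["crag_results"] if r.classification == "CORRECT"]
    if len(correct) >= 3 and state["iteration"] >= 2:
        return "generate"
    if state["iteration"] >= state["max_iterations"]:
        return "generate"
    return "plan"

# Three CORRECT chunks by iteration 2 → enough evidence to generate
rich = {"crag_results": [CragResult("CORRECT")] * 3, "iteration": 2, "max_iterations": 6}
# One CORRECT chunk on iteration 1 → loop back to planning
thin = {"crag_results": [CragResult("CORRECT")], "iteration": 1, "max_iterations": 6}
```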

Streaming Architecture

Both modes produce real-time SSE events. The streaming layer wraps the query engine:

# forge/api/streaming.py
import json

class StreamingQueryHandler:
    async def stream_query(
        self,
        query: str,
        mode: str,
    ) -> AsyncGenerator[str, None]:
        """Run a query and yield SSE events."""
 
        # Query analysis
        analysis = await self.analyzer.analyze(query)
        yield self._sse_event("query_analysis", analysis)
 
        # Cache check
        cached = await self.cache.get(query)
        if cached:
            yield self._sse_event("cache_hit", cached)
            for token in cached.answer:
                yield self._sse_event("token", {"content": token})
            yield self._sse_event("done", cached.metadata)
            return
 
        yield self._sse_event("cache_miss", {})
 
        if mode == "agentic":
            async for event in self.agent.run_stream(query):
                yield self._sse_event(event.type, event.data)
        else:
            async for event in self.direct_pipeline.run_stream(query):
                yield self._sse_event(event.type, event.data)
 
    def _sse_event(self, event_type: str, data: dict) -> str:
        return f"event: {event_type}\ndata: {json.dumps(data)}\n\n"

# forge/api/router.py
from fastapi.responses import StreamingResponse

@router.post("/api/query/stream")
async def stream_query(request: QueryRequest):
    handler = StreamingQueryHandler(config)
    return StreamingResponse(
        handler.stream_query(request.query, request.mode),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
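On the client side, each frame emitted by `_sse_event` can be decoded with a few lines. A minimal sketch; `parse_sse_frame` is a hypothetical helper, not part of Forge:

```python
# Minimal parser for the "event: ...\ndata: ...\n\n" frames produced by
# _sse_event above; parse_sse_frame is a hypothetical client-side helper.
import json

def parse_sse_frame(frame: str) -> tuple[str, dict]:
    event_type, data = "", {}
    for line in frame.strip().splitlines():
        if line.startswith("event: "):
            event_type = line[len("event: "):]
        elif line.startswith("data: "):
            data = json.loads(line[len("data: "):])
    return event_type, data

frame = 'event: token\ndata: {"content": "auth"}\n\n'
etype, payload = parse_sse_frame(frame)
```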

Comparison: Agentic vs. Direct

Aspect          Agentic                                Direct
Latency         5-15s                                  2-5s
Multi-hop       Yes (decomposes + iterates)            No
Adaptive        Yes (chooses tools per query)          No (fixed pipeline)
Graph           Yes (graph_traverse tool)              No
Error recovery  Yes (retries with different strategy)  No
Best for        Complex, multi-hop, ambiguous          Simple factual, speed-critical
CRAG            Per-iteration                          Once
ColBERT         Agent decides when                     Always (on CRAG results)
Verification    Yes (can trigger retry)                Yes (report only)

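Both modes end with ColBERT reranking. The core scoring idea, MaxSim over per-token embeddings, can be illustrated with toy 2-d vectors. This is a conceptual sketch, not Forge's implementation; real ColBERT uses learned, high-dimensional token embeddings:

```python
# Token-level MaxSim scoring as used by ColBERT-style rerankers: for each
# query token embedding, take its best match among the document's token
# embeddings, then sum. Toy 2-d vectors stand in for learned embeddings.

def dot(u, v):
    return sum(a * b for a, b in zip(u, v))

def maxsim(query_tokens, doc_tokens):
    return sum(max(dot(q, d) for d in doc_tokens) for q in query_tokens)

query = [[1.0, 0.0], [0.0, 1.0]]
doc_a = [[0.9, 0.1], [0.1, 0.9]]   # strong match for both query tokens
doc_b = [[0.5, 0.5]]               # weak match for either token
ranked = sorted([("a", doc_a), ("b", doc_b)],
                key=lambda t: maxsim(query, t[1]), reverse=True)
```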
Let the user choose

The Tauri frontend exposes a toggle for agentic vs. direct mode. For most users, agentic mode is the right default — it handles simple queries quickly (agent converges in 2 iterations) and shines on complex ones.

Next Steps