
Streaming Protocol

Forge uses Server-Sent Events (SSE) to stream query results in real time. The frontend receives structured events as the agent works, from initial analysis through retrieval, generation, and verification. This page documents every event type.

Event Types

1. query_analysis

Emitted at the start of every query. Contains the analysis of query complexity.

{
  "type": "query_analysis",
  "query": "How does authentication relate to compliance?",
  "complexity": "complex",
  "decomposed": false,
  "estimated_mode": "agentic"
}
Field           Type     Description
query           string   The original query text
complexity      string   "simple" | "moderate" | "complex"
decomposed      boolean  Whether the query will be decomposed into sub-queries
estimated_mode  string   "agentic" | "direct"
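
For typed clients, this payload maps directly onto a typed dictionary. A minimal Python sketch; the field names and values come from the table above, while the class name QueryAnalysis is our own:

```python
from typing import Literal, TypedDict

class QueryAnalysis(TypedDict):
    """Payload of a query_analysis event (fields per the table above)."""
    type: Literal["query_analysis"]
    query: str
    complexity: Literal["simple", "moderate", "complex"]
    decomposed: bool
    estimated_mode: Literal["agentic", "direct"]

# The example payload from above, as a typed value
event: QueryAnalysis = {
    "type": "query_analysis",
    "query": "How does authentication relate to compliance?",
    "complexity": "complex",
    "decomposed": False,
    "estimated_mode": "agentic",
}
```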

2. cache_hit / cache_miss

Emitted after the semantic cache check.

// Cache hit
{
  "type": "cache_hit",
  "similarity": 0.97,
  "original_query": "What is the auth system?",
  "cached_at": "2024-12-15T10:30:00Z"
}
 
// Cache miss
{
  "type": "cache_miss"
}

3. retrieval_start

Emitted when a retrieval tool begins executing.

{
  "type": "retrieval_start",
  "tool": "semantic_search",
  "query": "authentication system architecture",
  "iteration": 2
}
Field      Type    Description
tool       string  Tool name: semantic_search, proposition_search, graph_traverse, hyde_search
query      string  The search query (may differ from the original if decomposed)
iteration  number  Current agent iteration

4. retrieval_result

Emitted when a retrieval tool completes.

{
  "type": "retrieval_result",
  "tool": "semantic_search",
  "chunks_found": 8,
  "top_score": 0.847,
  "levels": { "L1": 1, "L2": 6, "L3": 1 },
  "duration_ms": 62
}
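
In the example, the per-level counts sum to chunks_found (1 + 6 + 1 = 8). Assuming the levels breakdown always covers every returned chunk, a client can run a cheap sanity check; a sketch:

```python
def check_retrieval_result(event: dict) -> bool:
    """Return True if the per-level counts add up to chunks_found."""
    return sum(event.get("levels", {}).values()) == event.get("chunks_found")

example = {
    "type": "retrieval_result",
    "tool": "semantic_search",
    "chunks_found": 8,
    "top_score": 0.847,
    "levels": {"L1": 1, "L2": 6, "L3": 1},
    "duration_ms": 62,
}
print(check_retrieval_result(example))  # True
```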

5. tool_call

Emitted for non-retrieval tool calls (decompose, rerank, graph).

{
  "type": "tool_call",
  "tool": "decompose_query",
  "input": { "query": "How does auth relate to compliance?" },
  "iteration": 1
}

6. tool_result

Result of a non-retrieval tool call.

{
  "type": "tool_result",
  "tool": "decompose_query",
  "result": {
    "sub_queries": [
      "What authentication system is described?",
      "What are the compliance requirements?",
      "How do authentication and compliance connect?"
    ]
  },
  "duration_ms": 320
}

7. crag_evaluation

Emitted after CRAG quality gate evaluates retrieved documents.

{
  "type": "crag_evaluation",
  "correct": 5,
  "ambiguous": 2,
  "incorrect": 1,
  "action": "proceed",
  "expanded": 1,
  "details": [
    { "chunk_id": "c_1a2b", "score": 0.87, "classification": "CORRECT" },
    { "chunk_id": "c_3c4d", "score": 0.82, "classification": "CORRECT" },
    { "chunk_id": "c_5e6f", "score": 0.62, "classification": "AMBIGUOUS", "expanded": true, "new_score": 0.78 },
    { "chunk_id": "c_7g8h", "score": 0.22, "classification": "INCORRECT" }
  ]
}
Field      Type    Description
correct    number  Documents classified as CORRECT
ambiguous  number  Documents classified as AMBIGUOUS
incorrect  number  Documents classified as INCORRECT
action     string  "proceed" | "re-retrieve" | "expand"
expanded   number  How many ambiguous docs were expanded to parent
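
A client will often reduce this event to a one-line status message. A sketch; the output formatting is our own choice, not part of the protocol:

```python
def summarize_crag(event: dict) -> str:
    """Render a crag_evaluation event as a short status line."""
    return (
        f"{event['correct']} correct / {event['ambiguous']} ambiguous / "
        f"{event['incorrect']} incorrect -> {event['action']} "
        f"({event['expanded']} expanded)"
    )

status = summarize_crag({
    "type": "crag_evaluation",
    "correct": 5, "ambiguous": 2, "incorrect": 1,
    "action": "proceed", "expanded": 1,
})
print(status)  # 5 correct / 2 ambiguous / 1 incorrect -> proceed (1 expanded)
```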

8. rerank_result

Emitted after ColBERT reranking.

{
  "type": "rerank_result",
  "method": "colbert",
  "input_count": 12,
  "output_count": 5,
  "top_score": 4.23,
  "duration_ms": 85
}

9. generation_start

Emitted when the LLM begins generating the answer.

{
  "type": "generation_start",
  "context_chunks": 5,
  "total_context_tokens": 3420,
  "model": "mistral-7b-instruct-v0.2.Q4_K_M"
}

10. token

Individual tokens streamed during generation. These arrive rapidly (60-80+ per second on GPU).

{
  "type": "token",
  "content": "The authentication"
}
{
  "type": "token",
  "content": " system described in Section 4"
}
Token batching

Tokens may arrive individually or in small batches (2-5 tokens per event) depending on llama.cpp’s output buffer. The frontend should append content strings directly to the display buffer.
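
Per the note above, accumulation is a plain string append regardless of batch size. A minimal sketch:

```python
class TokenBuffer:
    """Accumulates token event content into the displayed answer."""

    def __init__(self) -> None:
        self.text = ""

    def feed(self, event: dict) -> None:
        # Append content directly; batching does not change the logic.
        if event.get("type") == "token":
            self.text += event["content"]

buf = TokenBuffer()
buf.feed({"type": "token", "content": "The authentication"})
buf.feed({"type": "token", "content": " system described in Section 4"})
print(buf.text)  # The authentication system described in Section 4
```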

11. verification

Emitted after self-verification completes.

{
  "type": "verification",
  "claims_checked": 5,
  "claims_supported": 4,
  "claims_partially_supported": 1,
  "claims_unsupported": 0,
  "confidence": 0.90,
  "details": [
    { "claim": "The authentication uses OAuth 2.0.", "verdict": "SUPPORTED" },
    { "claim": "Section 4 describes MFA requirements.", "verdict": "SUPPORTED" },
    { "claim": "Compliance framework references NIST 800-53.", "verdict": "SUPPORTED" },
    { "claim": "All three control families are addressed.", "verdict": "PARTIALLY_SUPPORTED" },
    { "claim": "The system was certified in Q2 2024.", "verdict": "SUPPORTED" }
  ]
}
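
For display, the claim counts can be collapsed into a support ratio. A sketch; note that the confidence field is reported by the backend and is not derived from this ratio:

```python
def verification_summary(event: dict) -> str:
    """Render a verification event as 'supported/checked' plus confidence."""
    return (
        f"{event['claims_supported']}/{event['claims_checked']} claims supported, "
        f"confidence {event['confidence']:.2f}"
    )

summary = verification_summary({
    "claims_checked": 5, "claims_supported": 4,
    "claims_partially_supported": 1, "claims_unsupported": 0,
    "confidence": 0.90,
})
print(summary)  # 4/5 claims supported, confidence 0.90
```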

12. sources

Emitted with the source documents used for the answer.

{
  "type": "sources",
  "sources": [
    {
      "chunk_id": "c_1a2b",
      "document_id": "doc_xyz",
      "text": "Section 4.2: The authentication system implements OAuth 2.0...",
      "level": "L2",
      "score": 0.87,
      "page_numbers": [15, 16],
      "heading": "4.2 Authentication Architecture"
    },
    {
      "chunk_id": "c_3c4d",
      "document_id": "doc_xyz",
      "text": "The compliance framework requires all access control...",
      "level": "L1",
      "score": 0.82,
      "page_numbers": [28],
      "heading": "7.1 Compliance Requirements"
    }
  ]
}

13. done

Final event for every query.

{
  "type": "done",
  "total_time_ms": 7240,
  "tokens_generated": 312,
  "iterations": 6,
  "tools_used": ["decompose_query", "semantic_search", "graph_traverse", "rerank_colbert", "generate_answer"],
  "sources_count": 5,
  "confidence": 0.90,
  "mode": "agentic",
  "cached": false
}
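
The done payload is enough to compute end-to-end throughput: with the example values, 312 tokens over 7.24 s is roughly 43 tokens/s. Note this divides by total query time (including retrieval), so it understates raw generation speed. A sketch:

```python
def tokens_per_second(done_event: dict) -> float:
    """Overall token throughput implied by a done event."""
    seconds = done_event["total_time_ms"] / 1000
    return done_event["tokens_generated"] / seconds

tps = tokens_per_second({"total_time_ms": 7240, "tokens_generated": 312})
print(round(tps, 1))  # 43.1
```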

14. error

Emitted if something goes wrong.

{
  "type": "error",
  "message": "LLM inference failed: CUDA out of memory",
  "code": "LLM_ERROR",
  "recoverable": false
}

Event Ordering

Agentic Query (typical)

query_analysis
cache_miss
tool_call (decompose_query)            ─┐
tool_result (decompose_query)          ─┘ Iteration 1
retrieval_start (semantic_search)      ─┐
retrieval_result (semantic_search)      │ Iteration 2
crag_evaluation                        ─┘
retrieval_start (semantic_search)      ─┐
retrieval_result (semantic_search)      │ Iteration 3
crag_evaluation                        ─┘
tool_call (graph_traverse)             ─┐
tool_result (graph_traverse)           ─┘ Iteration 4
rerank_result                             Iteration 5
generation_start                       ─┐
token (many)                            │ Generation
token (many)                           ─┘
verification
sources
done

Direct Query

query_analysis
cache_miss
retrieval_start (hybrid_search)
retrieval_result
crag_evaluation
rerank_result
generation_start
token (many)
verification
sources
done

Cached Query

query_analysis
cache_hit
token (many)     ← from cache
sources
done
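
The orderings above can be consumed with a small dispatcher that folds events into display state. A hedged sketch driven by the cached-query sequence; the helper and its return shape are our own, not part of the protocol:

```python
def consume(events) -> tuple[str, bool]:
    """Fold a stream of events into (answer_text, finished) state."""
    answer, finished = "", False
    for event in events:
        kind = event["type"]
        if kind == "token":
            answer += event["content"]       # append tokens directly
        elif kind == "done":
            finished = True                  # final event for every query
    return answer, finished

# The cached-query ordering: query_analysis, cache_hit, tokens, sources, done
cached_stream = [
    {"type": "query_analysis", "query": "What is the auth system?"},
    {"type": "cache_hit", "similarity": 0.97},
    {"type": "token", "content": "The auth system "},
    {"type": "token", "content": "uses OAuth 2.0."},
    {"type": "sources", "sources": []},
    {"type": "done", "cached": True},
]
print(consume(cached_stream))  # ('The auth system uses OAuth 2.0.', True)
```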

Consuming the Stream

curl

curl -N http://localhost:8000/api/query/stream \
  -H "Content-Type: application/json" \
  -d '{"query": "What are the main findings?", "mode": "agentic"}'

The -N flag disables curl’s output buffering, so events appear in real time.

JavaScript (Frontend)

// frontend/src/lib/sse.ts
export async function streamQuery(
  query: string,
  mode: "agentic" | "direct",
  onEvent: (event: ForgeSSEEvent) => void,
): Promise<void> {
  const response = await fetch("/api/query/stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ query, mode }),
  });
  if (!response.ok || !response.body) {
    throw new Error(`Stream request failed: ${response.status}`);
  }
 
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";
 
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
 
    // Accumulate decoded bytes; keep any partial trailing line in the buffer.
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split("\n");
    buffer = lines.pop() || "";
 
    let currentEvent = "";
    for (const line of lines) {
      if (line.startsWith("event: ")) {
        currentEvent = line.slice(7).trim();
      } else if (line.startsWith("data: ") && currentEvent) {
        const data = JSON.parse(line.slice(6));
        onEvent({ type: currentEvent, ...data });
        currentEvent = "";
      }
    }
  }
}

Python

import asyncio
import json
 
import httpx
 
async def stream_query(query: str, mode: str = "agentic"):
    # timeout=None keeps httpx from cancelling long-running streams
    # (the default timeout would abort mid-generation).
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST",
            "http://localhost:8000/api/query/stream",
            json={"query": query, "mode": mode},
        ) as response:
            event_type = None
            async for line in response.aiter_lines():
                if line.startswith("event: "):
                    event_type = line[7:].strip()
                elif line.startswith("data: ") and event_type:
                    data = json.loads(line[6:])
                    print(f"[{event_type}] {data}")
                    event_type = None
 
# Usage: asyncio.run(stream_query("What are the main findings?"))

Error Handling

The frontend should handle these error scenarios:

Scenario              Event                            Frontend Action
LLM OOM               error with code LLM_ERROR        Show error message, suggest reducing context size
Qdrant unavailable    error with code VECTOR_DB_ERROR  Show connection error
Timeout (30s)         error with code TIMEOUT          Show partial results if available
No documents indexed  error with code NO_DOCUMENTS     Prompt user to upload
Stream disconnect     Network error                    Attempt reconnect, show last known state
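
The table maps naturally onto a code lookup. A sketch; the user-facing strings are the table's, while the fallback message is our own:

```python
ERROR_ACTIONS = {
    "LLM_ERROR": "Show error message, suggest reducing context size",
    "VECTOR_DB_ERROR": "Show connection error",
    "TIMEOUT": "Show partial results if available",
    "NO_DOCUMENTS": "Prompt user to upload",
}

def handle_error(event: dict) -> str:
    """Pick a frontend action for an error event by its code."""
    return ERROR_ACTIONS.get(event.get("code"), "Show generic error")

action = handle_error({"type": "error", "code": "LLM_ERROR", "recoverable": False})
print(action)  # Show error message, suggest reducing context size
```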

Next Steps