Streaming Protocol
Forge uses Server-Sent Events (SSE) to stream query results in real time. The frontend receives structured events as the agent works — from initial analysis through retrieval, generation, and verification. This page documents every event type.
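Each event arrives as a standard SSE frame: an event: line naming the event type, a data: line carrying a JSON payload (which repeats the type, as in the examples below), and a blank line terminating the frame. For example:

```
event: token
data: {"type": "token", "content": "The authentication"}

```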
Event Types
1. query_analysis
Emitted at the start of every query. Contains the analysis of query complexity.
{
"type": "query_analysis",
"query": "How does authentication relate to compliance?",
"complexity": "complex",
"decomposed": false,
"estimated_mode": "agentic"
}

| Field | Type | Description |
|---|---|---|
| query | string | The original query text |
| complexity | string | "simple", "moderate", or "complex" |
| decomposed | boolean | Whether the query will be decomposed into sub-queries |
| estimated_mode | string | "agentic" or "direct" |
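A minimal sketch of a client-side sanity check for this payload — field names and allowed values come from the table above; the helper name itself is hypothetical:

```python
def is_query_analysis(event: dict) -> bool:
    """Return True if the event looks like a well-formed query_analysis payload."""
    return (
        event.get("type") == "query_analysis"
        and isinstance(event.get("query"), str)
        and event.get("complexity") in {"simple", "moderate", "complex"}
        and isinstance(event.get("decomposed"), bool)
        and event.get("estimated_mode") in {"agentic", "direct"}
    )
```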
2. cache_hit / cache_miss
Emitted after the semantic cache check.
// Cache hit
{
"type": "cache_hit",
"similarity": 0.97,
"original_query": "What is the auth system?",
"cached_at": "2024-12-15T10:30:00Z"
}
// Cache miss
{
"type": "cache_miss"
}

3. retrieval_start
Emitted when a retrieval tool begins executing.
{
"type": "retrieval_start",
"tool": "semantic_search",
"query": "authentication system architecture",
"iteration": 2
}

| Field | Type | Description |
|---|---|---|
| tool | string | Tool name: semantic_search, proposition_search, graph_traverse, or hyde_search |
| query | string | The search query (may differ from the original if decomposed) |
| iteration | number | Current agent iteration |
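Frontends typically map these events to progress indicators. A small sketch — the display labels here are invented for illustration, not part of the protocol:

```python
# Human-readable labels for retrieval tools (labels are illustrative).
TOOL_LABELS = {
    "semantic_search": "Searching embeddings",
    "proposition_search": "Searching propositions",
    "graph_traverse": "Walking the knowledge graph",
    "hyde_search": "HyDE search",
}

def progress_label(event: dict) -> str:
    """Build a status line from a retrieval_start event."""
    label = TOOL_LABELS.get(event["tool"], event["tool"])
    return f"{label} (iteration {event['iteration']}): {event['query']}"
```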
4. retrieval_result
Emitted when a retrieval tool completes.
{
"type": "retrieval_result",
"tool": "semantic_search",
"chunks_found": 8,
"top_score": 0.847,
"levels": { "L1": 1, "L2": 6, "L3": 1 },
"duration_ms": 62
}

5. tool_call
Emitted for non-retrieval tool calls (decompose, rerank, graph).
{
"type": "tool_call",
"tool": "decompose_query",
"input": { "query": "How does auth relate to compliance?" },
"iteration": 1
}

6. tool_result
Result of a non-retrieval tool call.
{
"type": "tool_result",
"tool": "decompose_query",
"result": {
"sub_queries": [
"What authentication system is described?",
"What are the compliance requirements?",
"How do authentication and compliance connect?"
]
},
"duration_ms": 320
}

7. crag_evaluation
Emitted after CRAG quality gate evaluates retrieved documents.
{
"type": "crag_evaluation",
"correct": 5,
"ambiguous": 2,
"incorrect": 1,
"action": "proceed",
"expanded": 1,
"details": [
{ "chunk_id": "c_1a2b", "score": 0.87, "classification": "CORRECT" },
{ "chunk_id": "c_3c4d", "score": 0.82, "classification": "CORRECT" },
{ "chunk_id": "c_5e6f", "score": 0.62, "classification": "AMBIGUOUS", "expanded": true, "new_score": 0.78 },
{ "chunk_id": "c_7g8h", "score": 0.22, "classification": "INCORRECT" }
]
}

| Field | Type | Description |
|---|---|---|
| correct | number | Documents classified as CORRECT |
| ambiguous | number | Documents classified as AMBIGUOUS |
| incorrect | number | Documents classified as INCORRECT |
| action | string | "proceed", "re-retrieve", or "expand" |
| expanded | number | How many ambiguous docs were expanded to parent |
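For display purposes the counts collapse naturally into one status line. A sketch — the phrasing is an assumption, not part of the protocol:

```python
def summarize_crag(event: dict) -> str:
    """One-line UI summary of a crag_evaluation event."""
    total = event["correct"] + event["ambiguous"] + event["incorrect"]
    return (
        f"{event['correct']}/{total} chunks relevant, "
        f"{event['expanded']} expanded (action: {event['action']})"
    )
```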
8. rerank_result
Emitted after ColBERT reranking.
{
"type": "rerank_result",
"method": "colbert",
"input_count": 12,
"output_count": 5,
"top_score": 4.23,
"duration_ms": 85
}

9. generation_start
Emitted when the LLM begins generating the answer.
{
"type": "generation_start",
"context_chunks": 5,
"total_context_tokens": 3420,
"model": "mistral-7b-instruct-v0.2.Q4_K_M"
}

10. token
Individual tokens streamed during generation. These arrive rapidly (60-80+ per second on GPU).
{
"type": "token",
"content": "The authentication"
}

{
"type": "token",
"content": " system described in Section 4"
}

Tokens may arrive individually or in small batches (2-5 tokens per event), depending on llama.cpp’s output buffer. The frontend should append content strings directly to the display buffer.
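Because content strings concatenate cleanly, accumulating the answer is a straightforward append; a minimal sketch:

```python
class AnswerBuffer:
    """Accumulates streamed token events into the displayed answer text."""

    def __init__(self) -> None:
        self.text = ""

    def feed(self, event: dict) -> None:
        # Only token events carry answer text; ignore everything else.
        if event.get("type") == "token":
            self.text += event["content"]
```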
11. verification
Emitted after self-verification completes.
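The claim counts and the confidence figure in the example below are consistent with weighting partially supported claims at half; whether the server uses exactly this formula is an assumption:

```python
def claim_confidence(supported: int, partial: int, unsupported: int) -> float:
    """Fraction of supported claims, counting partial support at half weight
    (assumed formula; matches the example event below)."""
    checked = supported + partial + unsupported
    return (supported + 0.5 * partial) / checked if checked else 0.0
```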
{
"type": "verification",
"claims_checked": 5,
"claims_supported": 4,
"claims_partially_supported": 1,
"claims_unsupported": 0,
"confidence": 0.90,
"details": [
{ "claim": "The authentication uses OAuth 2.0.", "verdict": "SUPPORTED" },
{ "claim": "Section 4 describes MFA requirements.", "verdict": "SUPPORTED" },
{ "claim": "Compliance framework references NIST 800-53.", "verdict": "SUPPORTED" },
{ "claim": "All three control families are addressed.", "verdict": "PARTIALLY_SUPPORTED" },
{ "claim": "The system was certified in Q2 2024.", "verdict": "SUPPORTED" }
]
}

12. sources
Emitted with the source documents used for the answer.
{
"type": "sources",
"sources": [
{
"chunk_id": "c_1a2b",
"document_id": "doc_xyz",
"text": "Section 4.2: The authentication system implements OAuth 2.0...",
"level": "L2",
"score": 0.87,
"page_numbers": [15, 16],
"heading": "4.2 Authentication Architecture"
},
{
"chunk_id": "c_3c4d",
"document_id": "doc_xyz",
"text": "The compliance framework requires all access control...",
"level": "L1",
"score": 0.82,
"page_numbers": [28],
"heading": "7.1 Compliance Requirements"
}
]
}

13. done
Final event for every query.
{
"type": "done",
"total_time_ms": 7240,
"tokens_generated": 312,
"iterations": 6,
"tools_used": ["decompose_query", "semantic_search", "graph_traverse", "rerank_colbert", "generate_answer"],
"sources_count": 5,
"confidence": 0.90,
"mode": "agentic",
"cached": false
}

14. error
Emitted if something goes wrong.
{
"type": "error",
"message": "LLM inference failed: CUDA out of memory",
"code": "LLM_ERROR",
"recoverable": false
}

Event Ordering
Agentic Query (typical)
query_analysis
cache_miss
tool_call (decompose_query) ─┐
tool_result (decompose_query) │ Iteration 1
retrieval_start (semantic_search) ─┐
retrieval_result (semantic_search) │ Iteration 2
crag_evaluation ─┘
retrieval_start (semantic_search) ─┐
retrieval_result (semantic_search) │ Iteration 3
crag_evaluation ─┘
tool_call (graph_traverse) ─┐
tool_result (graph_traverse) │ Iteration 4
rerank_result ─┘ Iteration 5
generation_start ─┐
token (many) │ Generation
token (many) ─┘
verification
sources
done

Direct Query
query_analysis
cache_miss
retrieval_start (hybrid_search)
retrieval_result
crag_evaluation
rerank_result
generation_start
token (many)
verification
sources
done

Cached Query
query_analysis
cache_hit
token (many) ← from cache
sources
done

Consuming the Stream
curl
curl -N http://localhost:8000/api/query/stream \
-H "Content-Type: application/json" \
-d '{"query": "What are the main findings?", "mode": "agentic"}'

The -N flag disables curl’s output buffering, so events appear in real time.
JavaScript (Frontend)
// frontend/src/lib/sse.ts
export async function streamQuery(
  query: string,
  mode: "agentic" | "direct",
  onEvent: (event: ForgeSSEEvent) => void,
): Promise<void> {
  const response = await fetch("/api/query/stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ query, mode }),
  });
  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = "";
  // Persists across chunks: an "event:" line may arrive in one chunk
  // and its "data:" line in the next.
  let currentEvent = "";
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split("\n");
    buffer = lines.pop() || "";
    for (const line of lines) {
      if (line.startsWith("event: ")) {
        currentEvent = line.slice(7);
      } else if (line.startsWith("data: ") && currentEvent) {
        const data = JSON.parse(line.slice(6));
        onEvent({ type: currentEvent, ...data });
        currentEvent = "";
      }
    }
  }
}

Python
import httpx
import json

async def stream_query(query: str, mode: str = "agentic"):
    # timeout=None: httpx's default 5s timeout would cut off long streams
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST",
            "http://localhost:8000/api/query/stream",
            json={"query": query, "mode": mode},
        ) as response:
            event_type = None
            async for line in response.aiter_lines():
                if line.startswith("event: "):
                    event_type = line[7:]
                elif line.startswith("data: ") and event_type:
                    data = json.loads(line[6:])
                    print(f"[{event_type}] {data}")
                    event_type = None

Error Handling
The frontend should handle these error scenarios:
| Scenario | Event | Frontend Action |
|---|---|---|
| LLM OOM | error with code LLM_ERROR | Show error message, suggest reducing context size |
| Qdrant unavailable | error with code VECTOR_DB_ERROR | Show connection error |
| Timeout (30s) | error with code TIMEOUT | Show partial results if available |
| No documents indexed | error with code NO_DOCUMENTS | Prompt user to upload |
| Stream disconnect | Network error | Attempt reconnect, show last known state |
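For the stream-disconnect case, wrapping the streaming call in a capped, jittered exponential backoff is a reasonable default; a sketch (the retry parameters are arbitrary):

```python
import asyncio
import random

async def with_retry(stream_fn, max_attempts: int = 3, base_delay: float = 1.0):
    """Call stream_fn (e.g. a stream_query coroutine factory), retrying
    connection failures with capped, jittered exponential backoff."""
    for attempt in range(max_attempts):
        try:
            return await stream_fn()
        except (ConnectionError, OSError):
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error to the caller
            delay = min(base_delay * 2 ** attempt, 10.0)
            # jitter: sleep 50-100% of the computed delay
            await asyncio.sleep(delay * (0.5 + random.random() / 2))
```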
Next Steps
- API Reference — Full endpoint documentation with request/response schemas
- Query Pipeline — How the agent orchestrates the flow