Ingestion Pipeline
When a document is uploaded to Forge, it passes through a multi-stage pipeline that transforms raw text into a multi-level hierarchy of graph-connected points, each indexed with three vector types (dense, sparse, and ColBERT). This page traces the complete data flow.
Pipeline Overview
Document (PDF/DOCX/TXT)
│
▼
┌──────────────────────────────────────────────────────────────┐
│ Stage 1: PARSE │
│ forge/ingestion/parser.py │
│ PDF → PyMuPDF | DOCX → python-docx | TXT → utf-8 read │
│ Output: raw text + metadata (title, pages, headings) │
└──────────────────────┬───────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────┐
│ Stage 2: HIERARCHY CONSTRUCTION │
│ forge/ingestion/hierarchy.py │
│ │
│ ┌──────────────┐ │
│ │ L0: Document │ LLM generates document-level summary │
│ │ Summary │ (300-500 tokens) │
│ └──────┬───────┘ │
│ │ │
│ ┌──────▼───────┐ │
│ │ L1: Section │ Split at headings, LLM summarizes each │
│ │ Summaries │ (200-300 tokens per section) │
│ └──────┬───────┘ │
│ │ │
│ ┌──────▼───────┐ │
│ │ L2: Semantic │ Semantic chunking within sections │
│ │ Chunks │ (~512 tokens, topic-boundary aware) │
│ └──────────────┘ │
└──────────────────────┬───────────────────────────────────────┘
│
┌───────────┼───────────────────┐
│ │ │
▼ ▼ ▼
┌────────────────┐ ┌───────────────┐ ┌──────────────────┐
│ Stage 3: │ │ Stage 4: │ │ Stage 5: │
│ CONTEXTUAL │ │ PROPOSITION │ │ GRAPH │
│ ENRICHMENT │ │ EXTRACTION │ │ EXTRACTION │
│ │ │ │ │ │
│ Per L2 chunk: │ │ Per L2 chunk: │ │ Per L2 chunk: │
│ LLM generates │ │ LLM extracts │ │ LLM extracts │
│ context prefix │ │ atomic claims │ │ entities + │
│ │ │ → L3 points │ │ relationships │
│ contextual.py │ │ propositions. │ │ graph_extractor. │
│ │ │ py │ │ py │
└───────┬────────┘ └──────┬────────┘ └─────────┬────────┘
│ │ │
└────────┬────────┘ │
│ │
▼ ▼
┌──────────────────────────────┐ ┌───────────────────────────┐
│ Stage 6: BGE-M3 EMBEDDING │ │ Stage 7: GRAPH STORAGE │
│ forge/ingestion/embedder.py │ │ forge/storage/redis_graph │
│ │ │ .py │
│ Each L0/L1/L2/L3 point: │ │ │
│ → Dense vector (1024-dim) │ │ Entities → Qdrant payload │
│ → Sparse vector (lexical) │ │ Relationships → Redis │
│ → ColBERT vectors (N×1024) │ │ adjacency lists │
└──────────────┬───────────────┘ └──────────────┬──────────────┘
│ │
▼ ▼
┌──────────────────────────────┐ ┌───────────────────────────┐
│ QDRANT │ │ REDIS │
│ Collection: forge_documents │ │ Keys: │
│ │ │ graph:out:{entity_id} │
│ Named vectors: │ │ graph:in:{entity_id} │
│ - dense (1024) │ │ graph:entity:{name} │
│ - sparse (variable) │ │ │
│ - colbert (N×1024) │ │ │
│ │ │ │
│ Payload: │ │ │
│ - text, original_text │ │ │
│ - level (L0/L1/L2/L3) │ │ │
│ - document_id, parent_id │ │ │
│ - entities[], relationships │ │ │
└──────────────────────────────┘ └───────────────────────────┘

Stage-by-Stage Breakdown
Stage 1: Document Parsing
# forge/ingestion/parser.py
class DocumentParser:
"""Extracts text and structure from uploaded documents."""
async def parse(self, file_path: str) -> ParsedDocument:
ext = Path(file_path).suffix.lower()
if ext == ".pdf":
return await self._parse_pdf(file_path)
elif ext in (".docx", ".doc"):
return await self._parse_docx(file_path)
elif ext == ".txt":
return await self._parse_txt(file_path)
else:
raise UnsupportedFormatError(f"Unsupported: {ext}")
async def _parse_pdf(self, path: str) -> ParsedDocument:
"""Extract text from PDF using PyMuPDF (fitz)."""
import fitz
doc = fitz.open(path)
pages = []
for page in doc:
text = page.get_text("text")
pages.append(PageContent(
page_number=page.number + 1,
text=text,
headings=self._detect_headings(page),
))
return ParsedDocument(
filename=Path(path).name,
pages=pages,
total_pages=len(pages),
full_text="\n\n".join(p.text for p in pages),
)

Output: ParsedDocument with full text, page-by-page content, and detected heading structure.
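The `_detect_headings` helper is not shown above. A common approach with PyMuPDF is to flag lines rendered noticeably larger than the body font, using the span dictionaries returned by `page.get_text("dict")`. The sketch below illustrates that heuristic — the function name, the 20% size margin, and the `body_size` default are assumptions, not Forge's actual code — written as a pure function over the dict structure so it can be tested without a PDF:

```python
def detect_headings(page_dict: dict, body_size: float = 11.0) -> list[str]:
    """Heuristic heading detection over PyMuPDF's get_text("dict") output.

    `page_dict` has the shape:
    {"blocks": [{"lines": [{"spans": [{"text": ..., "size": ...}]}]}]}.
    A line is treated as a heading when every span in it is rendered
    at least 20% larger than the assumed body font size.
    """
    headings = []
    for block in page_dict.get("blocks", []):
        for line in block.get("lines", []):
            spans = line.get("spans", [])
            if not spans:
                continue
            if all(span["size"] > body_size * 1.2 for span in spans):
                text = "".join(span["text"] for span in spans).strip()
                if text:
                    headings.append(text)
    return headings
```

With a real page you would pass `page.get_text("dict")` directly; a more robust variant would estimate `body_size` from the most common span size on the page instead of hard-coding it.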
Stage 2: Hierarchy Construction
The HierarchyBuilder creates the first three levels (L0-L2); L3 propositions are extracted separately in Stage 4:
# forge/ingestion/hierarchy.py
class HierarchyBuilder:
async def build(self, document: ParsedDocument) -> DocumentHierarchy:
# L0: Document-level summary
l0 = await self.build_l0(document)
# L1: Section-level summaries
l1_sections = await self.build_l1(document)
# L2: Semantic chunks within sections
l2_chunks = await self.build_l2(l1_sections)
# Set parent references
for chunk in l2_chunks:
chunk.parent_section_id = chunk.section.id
chunk.document_id = document.id
return DocumentHierarchy(
l0=l0,
l1=l1_sections,
l2=l2_chunks,
)

The semantic chunking algorithm for L2:
async def _semantic_chunk(self, text: str, target_size: int = 512) -> list[str]:
"""Split text at semantic boundaries using embedding similarity."""
sentences = self._split_sentences(text)
# Embed all sentences
embeddings = self.bge_m3.encode(sentences, return_dense=True)
dense_vecs = embeddings["dense_vecs"]
# Compute similarity between consecutive sentences
similarities = [
cosine_similarity(dense_vecs[i], dense_vecs[i + 1])
for i in range(len(dense_vecs) - 1)
]
# Find split points (where similarity drops)
threshold = np.percentile(similarities, 25) # Bottom 25% = topic shifts
split_points = [i + 1 for i, sim in enumerate(similarities) if sim < threshold]
# Group sentences into chunks, merging small ones
chunks = self._group_sentences(sentences, split_points, target_size)
return chunks

Semantic chunking costs ~200ms more per section (one BGE-M3 batch encode for sentence embeddings) but produces dramatically better chunk coherence: each chunk covers a single topic instead of cutting arbitrarily mid-paragraph. Set hierarchy.L2.method: "fixed" in config.yml if you need faster ingestion for very large corpora.
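The split-point selection depends only on the similarity series, so it can be exercised without BGE-M3. This standalone sketch mirrors the logic above, with a stdlib nearest-rank percentile standing in for `np.percentile` (its interpolation differs slightly from NumPy's default):

```python
def find_split_points(similarities: list[float], percentile: float = 25.0) -> list[int]:
    """Return sentence indices where a new chunk should start.

    `similarities[i]` is the cosine similarity between sentences i and i+1;
    positions whose preceding similarity falls in the bottom `percentile`
    of the series are treated as topic shifts.
    """
    if not similarities:
        return []
    # Nearest-rank percentile over the sorted series (stand-in for np.percentile).
    ranked = sorted(similarities)
    k = max(0, min(len(ranked) - 1, round(percentile / 100.0 * (len(ranked) - 1))))
    threshold = ranked[k]
    return [i + 1 for i, sim in enumerate(similarities) if sim < threshold]
```

Because the threshold is a percentile rather than a fixed value, the algorithm adapts to documents whose sentences are uniformly similar (few splits) or uniformly dissimilar (many splits).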
Stage 3: Contextual Enrichment
Every L2 chunk gets a context prefix generated by the LLM:
# forge/ingestion/contextual.py
async def enrich_batch(self, chunks: list[L2Chunk], l0_summary: str) -> list[L2Chunk]:
"""Enrich a batch of chunks with contextual prefixes."""
tasks = [
self.enrich(chunk, l0_summary)
for chunk in chunks
]
enriched_texts = await asyncio.gather(*tasks)
for chunk, enriched_text in zip(chunks, enriched_texts):
chunk.enriched_text = enriched_text # prefix + original
chunk.original_text = chunk.text # preserved for generation
return chunks

See Contextual Retrieval for the full technique description.
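The single-chunk `enrich` call that `enrich_batch` fans out to is not shown here. A minimal sketch of the pattern follows; the prompt wording and the injected async `llm` callable are illustrative assumptions (the real template lives in `contextual.py`):

```python
async def enrich(chunk_text: str, l0_summary: str, llm) -> str:
    """Generate a short situating prefix for one chunk and prepend it.

    `llm` is any async callable mapping prompt -> completion; the prompt
    below is a sketch, not Forge's actual template.
    """
    prompt = (
        "Here is a summary of the whole document:\n"
        f"{l0_summary}\n\n"
        "Here is a chunk from that document:\n"
        f"{chunk_text}\n\n"
        "Write 1-2 sentences situating this chunk within the document, "
        "to improve search retrieval. Answer with only the context."
    )
    prefix = (await llm(prompt)).strip()
    # Enriched text = context prefix + original chunk; the original is
    # preserved separately for generation.
    return f"{prefix}\n\n{chunk_text}"
```

Injecting the LLM as a callable keeps the function unit-testable with a stub and matches the batch shape above, where `enrich_batch` gathers one coroutine per chunk.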
Stage 4: Proposition Extraction
Each L2 chunk yields 1-10 L3 propositions:
# forge/ingestion/propositions.py
async def extract_batch(self, chunks: list[L2Chunk]) -> list[L3Proposition]:
"""Extract propositions from all chunks."""
all_propositions = []
for chunk in chunks:
props = await self.extract(chunk)
for prop in props:
prop.parent_chunk_id = chunk.id
prop.document_id = chunk.document_id
all_propositions.append(prop)
return all_propositions

See Proposition Indexing for details.
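The per-chunk `extract` call is not shown. A common implementation asks the LLM for a JSON array of atomic claims and parses the reply defensively; the sketch below illustrates only that parsing step (the fence-stripping and `min_length` filter are assumptions, not Forge's actual code):

```python
import json

def parse_propositions(llm_output: str, min_length: int = 10) -> list[str]:
    """Parse an LLM response expected to be a JSON array of atomic claims.

    Tolerates an optional ```json code fence around the array and drops
    non-string or too-short entries rather than failing the whole chunk.
    """
    text = llm_output.strip()
    if text.startswith("```"):
        # Drop the opening ```json line and the closing ``` fence.
        lines = [ln for ln in text.splitlines() if not ln.startswith("```")]
        text = "\n".join(lines)
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        return []  # caller can retry or fall back to the raw chunk
    return [p.strip() for p in data if isinstance(p, str) and len(p.strip()) >= min_length]
```

Returning an empty list on malformed output lets the batch loop above continue ingesting the remaining chunks instead of aborting the whole document.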
Stage 5: Graph Extraction
Entities and relationships are extracted from each L2 chunk:
# forge/ingestion/graph_extractor.py
async def extract_batch(self, chunks: list[L2Chunk]) -> GraphData:
"""Extract graph data from all chunks."""
all_entities = []
all_relationships = []
for chunk in chunks:
result = await self.extract(chunk)
all_entities.extend(result.entities)
all_relationships.extend(result.relationships)
# Deduplicate entities by name (case-insensitive)
unique_entities = self._deduplicate_entities(all_entities)
return GraphData(
entities=unique_entities,
relationships=all_relationships,
)

See Knowledge Graph for the full technique.
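`_deduplicate_entities` is referenced but not shown. One plausible implementation of the case-insensitive merge it performs — the `Entity` fields here are illustrative, not Forge's actual model:

```python
from dataclasses import dataclass

@dataclass
class Entity:
    name: str
    type: str
    mentions: int = 1

def deduplicate_entities(entities: list[Entity]) -> list[Entity]:
    """Merge entities that share a case-insensitive name, keeping the
    casing of the first mention and summing mention counts."""
    merged: dict[str, Entity] = {}
    for ent in entities:
        key = ent.name.lower()
        if key in merged:
            merged[key].mentions += ent.mentions
        else:
            merged[key] = Entity(ent.name, ent.type, ent.mentions)
    # dicts preserve insertion order, so first-seen entities come first.
    return list(merged.values())
```

A production version would also need to reconcile conflicting types (e.g. the same name tagged PERSON in one chunk and ORG in another), which simple name-keying glosses over.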
Stage 6: BGE-M3 Embedding
All points (L0, L1, L2, L3) are embedded with BGE-M3:
# forge/ingestion/embedder.py
async def embed_all(self, hierarchy: DocumentHierarchy, propositions: list) -> list[PointStruct]:
"""Embed all hierarchy levels and propositions."""
all_texts = []
all_metadata = []
# L0 summary
all_texts.append(hierarchy.l0.text)
all_metadata.append({"level": "L0", "document_id": hierarchy.l0.document_id})
# L1 sections
for section in hierarchy.l1:
all_texts.append(section.text)
all_metadata.append({"level": "L1", "document_id": section.document_id})
# L2 chunks (use enriched text for embedding)
for chunk in hierarchy.l2:
all_texts.append(chunk.enriched_text)
all_metadata.append({
"level": "L2",
"document_id": chunk.document_id,
"original_text": chunk.original_text, # Stored separately
})
# L3 propositions
for prop in propositions:
all_texts.append(prop.text)
all_metadata.append({
"level": "L3",
"document_id": prop.document_id,
"parent_chunk_id": prop.parent_chunk_id,
})
# Batch encode
output = self.bge_m3.encode(
all_texts,
batch_size=self.config.batch_size,
return_dense=True,
return_sparse=True,
return_colbert_vecs=True,
)
# Build Qdrant points
points = []
for i, (text, meta) in enumerate(zip(all_texts, all_metadata)):
points.append(PointStruct(
id=str(uuid.uuid4()),
vector={
"dense": output["dense_vecs"][i].tolist(),
"sparse": SparseVector(
indices=list(output["lexical_weights"][i].keys()),
values=list(output["lexical_weights"][i].values()),
),
"colbert": output["colbert_vecs"][i].tolist(),
},
payload={"text": text, **meta},
))
return points

Stage 7: Storage
# forge/ingestion/pipeline.py (orchestrator)
class IngestionPipeline:
async def ingest(self, file_path: str) -> IngestionResult:
# Parse
document = await self.parser.parse(file_path)
# Build hierarchy
hierarchy = await self.hierarchy_builder.build(document)
# Enrich (contextual retrieval)
if self.config.contextual_retrieval.enabled:
hierarchy.l2 = await self.enricher.enrich_batch(
hierarchy.l2, hierarchy.l0.text
)
# Extract propositions
propositions = []
if self.config.propositions.enabled:
propositions = await self.prop_extractor.extract_batch(hierarchy.l2)
# Extract graph
graph_data = None
if self.config.graph.enabled:
graph_data = await self.graph_extractor.extract_batch(hierarchy.l2)
# Embed everything
points = await self.embedder.embed_all(hierarchy, propositions)
# Store in Qdrant
await self.qdrant.upsert(
collection_name="forge_documents",
points=points,
)
# Store graph in Redis
if graph_data:
await self.graph_store.store(graph_data)
return IngestionResult(
document_id=document.id,
points_created=len(points),
entities_extracted=len(graph_data.entities) if graph_data else 0,
relationships_extracted=len(graph_data.relationships) if graph_data else 0,
)

Qdrant Collection Schema
The final Qdrant collection has this structure:
{
"collection_name": "forge_documents",
"vectors_config": {
"dense": { "size": 1024, "distance": "Cosine" },
"colbert": { "size": 1024, "distance": "Cosine", "multivector": { "comparator": "max_sim" } }
},
"sparse_vectors_config": {
"sparse": { "index": { "on_disk": false } }
}
}

Each point payload contains:
{
"text": "enriched or original text",
"original_text": "original text without context prefix",
"level": "L0 | L1 | L2 | L3",
"document_id": "doc_abc123",
"parent_id": "parent_section_or_chunk_id",
"entities": [{"name": "...", "type": "PERSON"}],
"relationships": [{"source": "...", "target": "...", "type": "AUTHORED"}],
"type": "summary | section | chunk | proposition",
"page_numbers": [1, 2],
"heading": "Section 3: Financial Results"
}

Ingestion Performance
Benchmarked on a 100-page technical document, on an RTX 4080:
| Stage | Time | Bottleneck |
|---|---|---|
| Parse | ~1s | I/O |
| Hierarchy (L0-L2) | ~45s | LLM calls for summaries |
| Contextual Enrichment | ~120s | LLM call per chunk (~400 chunks) |
| Proposition Extraction | ~90s | LLM call per chunk |
| Graph Extraction | ~60s | LLM call per chunk |
| BGE-M3 Embedding | ~30s | CPU batch encoding |
| Qdrant Upsert | ~2s | Network I/O |
| Redis Graph Store | ~1s | Network I/O |
| Total | ~5-6 min | LLM inference |
The LLM is the bottleneck. Three techniques for faster ingestion:

- Use a faster/smaller LLM for ingestion only (set FORGE_INGESTION_MODEL separately).
- Increase LLM_THREADS for parallel inference.
- Disable costly stages (propositions.enabled: false, graph.enabled: false) for a ~2x speedup.
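The stage toggles map to config.yml entries used by the orchestrator above. A sketch of the relevant keys — the key paths for hierarchy.L2.method, contextual_retrieval, propositions, and graph come from this page, but the surrounding layout is assumed:

```yaml
# Illustrative config.yml fragment — key names from this page, nesting assumed.
hierarchy:
  L2:
    method: semantic        # set to "fixed" to skip per-section sentence encoding
contextual_retrieval:
  enabled: true             # Stage 3 (~120s on the benchmark document)
propositions:
  enabled: false            # skip Stage 4 (~90s)
graph:
  enabled: false            # skip Stage 5 (~60s)
```

With propositions and graph disabled, the benchmark document above drops from roughly 6 minutes to about 3, at the cost of L3 retrieval granularity and graph-based query expansion.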
Next Steps
- Query Pipeline — How queries flow through the system
- Streaming Protocol — SSE event format for real-time updates