
Ingestion Pipeline

When a document is uploaded to Forge, it passes through a multi-stage pipeline that transforms raw text into a richly indexed, multi-level, graph-connected, tri-modal vector representation. This page traces the complete data flow.

Pipeline Overview

Document (PDF/DOCX/TXT)


┌──────────────────────────────────────────────────────────────┐
│  Stage 1: PARSE                                              │
│  forge/ingestion/parser.py                                   │
│  PDF → PyMuPDF  |  DOCX → python-docx  |  TXT → utf-8 read │
│  Output: raw text + metadata (title, pages, headings)        │
└──────────────────────┬───────────────────────────────────────┘


┌──────────────────────────────────────────────────────────────┐
│  Stage 2: HIERARCHY CONSTRUCTION                             │
│  forge/ingestion/hierarchy.py                                │
│                                                              │
│  ┌──────────────┐                                            │
│  │ L0: Document  │  LLM generates document-level summary     │
│  │    Summary    │  (300-500 tokens)                         │
│  └──────┬───────┘                                            │
│         │                                                    │
│  ┌──────▼───────┐                                            │
│  │ L1: Section   │  Split at headings, LLM summarizes each   │
│  │    Summaries  │  (200-300 tokens per section)             │
│  └──────┬───────┘                                            │
│         │                                                    │
│  ┌──────▼───────┐                                            │
│  │ L2: Semantic  │  Semantic chunking within sections         │
│  │    Chunks     │  (~512 tokens, topic-boundary aware)       │
│  └──────────────┘                                            │
└──────────────────────┬───────────────────────────────────────┘

           ┌───────────┼───────────────────┐
           │           │                   │
           ▼           ▼                   ▼
┌────────────────┐ ┌───────────────┐ ┌──────────────────┐
│ Stage 3:       │ │ Stage 4:      │ │ Stage 5:         │
│ CONTEXTUAL     │ │ PROPOSITION   │ │ GRAPH            │
│ ENRICHMENT     │ │ EXTRACTION    │ │ EXTRACTION       │
│                │ │               │ │                  │
│ Per L2 chunk:  │ │ Per L2 chunk: │ │ Per L2 chunk:    │
│ LLM generates  │ │ LLM extracts  │ │ LLM extracts     │
│ context prefix │ │ atomic claims │ │ entities +       │
│                │ │ → L3 points   │ │ relationships    │
│ contextual.py  │ │ propositions. │ │ graph_extractor. │
│                │ │ py            │ │ py               │
└───────┬────────┘ └──────┬────────┘ └─────────┬────────┘
        │                 │                     │
        └────────┬────────┘                     │
                 │                              │
                 ▼                              ▼
┌──────────────────────────────┐  ┌───────────────────────────┐
│  Stage 6: BGE-M3 EMBEDDING   │  │  Stage 7: GRAPH STORAGE    │
│  forge/ingestion/embedder.py │  │  forge/storage/redis_graph  │
│                              │  │  .py                        │
│  Each L0/L1/L2/L3 point:    │  │                             │
│  → Dense vector (1024-dim)   │  │  Entities → Qdrant payload  │
│  → Sparse vector (lexical)   │  │  Relationships → Redis      │
│  → ColBERT vectors (N×1024)  │  │  adjacency lists            │
└──────────────┬───────────────┘  └──────────────┬──────────────┘
               │                                 │
               ▼                                 ▼
┌──────────────────────────────┐  ┌───────────────────────────┐
│         QDRANT               │  │          REDIS             │
│  Collection: forge_documents │  │  Keys:                     │
│                              │  │  graph:out:{entity_id}     │
│  Named vectors:              │  │  graph:in:{entity_id}      │
│  - dense (1024)              │  │  graph:entity:{name}       │
│  - sparse (variable)         │  │                             │
│  - colbert (N×1024)          │  │                             │
│                              │  │                             │
│  Payload:                    │  │                             │
│  - text, original_text       │  │                             │
│  - level (L0/L1/L2/L3)      │  │                             │
│  - document_id, parent_id    │  │                             │
│  - entities[], relationships │  │                             │
└──────────────────────────────┘  └───────────────────────────┘

Stage-by-Stage Breakdown

Stage 1: Document Parsing

# forge/ingestion/parser.py
from pathlib import Path

class DocumentParser:
    """Extracts text and structure from uploaded documents."""
 
    async def parse(self, file_path: str) -> ParsedDocument:
        ext = Path(file_path).suffix.lower()
 
        if ext == ".pdf":
            return await self._parse_pdf(file_path)
        elif ext in (".docx", ".doc"):
            return await self._parse_docx(file_path)
        elif ext == ".txt":
            return await self._parse_txt(file_path)
        else:
            raise UnsupportedFormatError(f"Unsupported: {ext}")
 
    async def _parse_pdf(self, path: str) -> ParsedDocument:
        """Extract text from PDF using PyMuPDF (fitz)."""
        import fitz
        doc = fitz.open(path)
        pages = []
        for page in doc:
            text = page.get_text("text")
            pages.append(PageContent(
                page_number=page.number + 1,
                text=text,
                headings=self._detect_headings(page),
            ))
        return ParsedDocument(
            filename=Path(path).name,
            pages=pages,
            total_pages=len(pages),
            full_text="\n\n".join(p.text for p in pages),
        )

Output: ParsedDocument with full text, page-by-page content, and detected heading structure.
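The `_detect_headings` helper referenced above is not shown. A minimal sketch of one plausible heuristic: flag spans whose font size is noticeably larger than the body text, using the span shape produced by PyMuPDF's `page.get_text("dict")`. The 1.2x threshold and the flat span list are assumptions, not Forge's actual logic.

```python
# Sketch of a font-size heading heuristic (not the actual Forge implementation).
# Assumes spans shaped like PyMuPDF span dicts: {"text": str, "size": float}.
# The 1.2x body-size threshold is an assumption.

def detect_headings(spans: list[dict], body_size: float = 11.0) -> list[str]:
    """Return span texts whose font size is noticeably larger than body text."""
    headings = []
    for span in spans:
        text = span["text"].strip()
        if text and span["size"] >= body_size * 1.2:
            headings.append(text)
    return headings
```

In practice the spans would come from iterating `page.get_text("dict")["blocks"]`; real documents also need guards against large-font artifacts like page headers.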

Stage 2: Hierarchy Construction

The HierarchyBuilder creates the first three levels (L0-L2); the L3 propositions are added later, in Stage 4:

# forge/ingestion/hierarchy.py
class HierarchyBuilder:
    async def build(self, document: ParsedDocument) -> DocumentHierarchy:
        # L0: Document-level summary
        l0 = await self.build_l0(document)
 
        # L1: Section-level summaries
        l1_sections = await self.build_l1(document)
 
        # L2: Semantic chunks within sections
        l2_chunks = await self.build_l2(l1_sections)
 
        # Set parent references
        for chunk in l2_chunks:
            chunk.parent_section_id = chunk.section.id
            chunk.document_id = document.id
 
        return DocumentHierarchy(
            l0=l0,
            l1=l1_sections,
            l2=l2_chunks,
        )

The semantic chunking algorithm for L2:

    async def _semantic_chunk(self, text: str, target_size: int = 512) -> list[str]:
        """Split text at semantic boundaries using embedding similarity."""
        sentences = self._split_sentences(text)
 
        # Embed all sentences
        embeddings = self.bge_m3.encode(sentences, return_dense=True)
        dense_vecs = embeddings["dense_vecs"]
 
        # Compute similarity between consecutive sentences
        similarities = [
            cosine_similarity(dense_vecs[i], dense_vecs[i + 1])
            for i in range(len(dense_vecs) - 1)
        ]
 
        # Find split points (where similarity drops)
        threshold = np.percentile(similarities, 25)  # Bottom 25% = topic shifts
        split_points = [i + 1 for i, sim in enumerate(similarities) if sim < threshold]
 
        # Group sentences into chunks, merging small ones
        chunks = self._group_sentences(sentences, split_points, target_size)
        return chunks
Semantic vs. fixed chunking

Semantic chunking costs ~200ms more per section (one BGE-M3 batch encode for sentence embeddings) but produces dramatically better chunk coherence. Each chunk covers a single topic instead of arbitrarily cutting mid-paragraph. Use hierarchy.L2.method: "fixed" in config.yml if you need faster ingestion for very large corpora.
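The `_group_sentences` helper used in the chunking code is not shown. A minimal sketch of one way it could work: cut the sentence list at the detected topic boundaries, then greedily merge adjacent groups that stay under the token budget. The whitespace-based token estimate and the greedy merge policy are assumptions.

```python
# Sketch of the sentence-grouping step (not the actual Forge implementation).
# Cuts at topic boundaries, then greedily merges adjacent groups under the
# target token budget. Whitespace word count stands in for a real tokenizer.

def group_sentences(sentences: list[str], split_points: list[int],
                    target_size: int = 512) -> list[str]:
    # Cut the sentence list at each boundary index.
    bounds = [0] + sorted(split_points) + [len(sentences)]
    groups = [" ".join(sentences[a:b]) for a, b in zip(bounds, bounds[1:]) if b > a]

    # Greedily merge small neighbouring groups up to the token budget.
    chunks: list[str] = []
    for group in groups:
        if chunks and len(chunks[-1].split()) + len(group.split()) <= target_size:
            chunks[-1] = chunks[-1] + " " + group
        else:
            chunks.append(group)
    return chunks
```

With a generous budget, adjacent topic groups collapse into one chunk; with a tight budget, each boundary survives as a chunk break.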

Stage 3: Contextual Enrichment

Every L2 chunk gets a context prefix generated by the LLM:

# forge/ingestion/contextual.py
async def enrich_batch(self, chunks: list[L2Chunk], l0_summary: str) -> list[L2Chunk]:
    """Enrich a batch of chunks with contextual prefixes."""
    tasks = [
        self.enrich(chunk, l0_summary)
        for chunk in chunks
    ]
    enriched_texts = await asyncio.gather(*tasks)
 
    for chunk, enriched_text in zip(chunks, enriched_texts):
        chunk.enriched_text = enriched_text  # prefix + original
        chunk.original_text = chunk.text      # preserved for generation
 
    return chunks

See Contextual Retrieval for the full technique description.
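The per-chunk `enrich` call that `enrich_batch` fans out to is not shown above. A minimal sketch, assuming an injected async `llm(prompt)` callable and simplified prompt wording (both are assumptions, not Forge's actual prompt):

```python
import asyncio

# Sketch of a per-chunk enrichment call. The prompt wording and the injected
# async `llm` callable are assumptions, not Forge's actual implementation.

async def enrich(chunk_text: str, l0_summary: str, llm) -> str:
    """Ask the LLM for a short situating prefix, then prepend it to the chunk."""
    prompt = (
        "Document summary:\n" + l0_summary + "\n\n"
        "Chunk:\n" + chunk_text + "\n\n"
        "Write 1-2 sentences situating this chunk within the document."
    )
    prefix = await llm(prompt)
    return prefix.strip() + "\n\n" + chunk_text
```

Because only the prefix is generated, the original chunk text survives verbatim inside `enriched_text`, which is why `original_text` can be preserved separately for generation.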

Stage 4: Proposition Extraction

Each L2 chunk yields 1-10 L3 propositions:

# forge/ingestion/propositions.py
async def extract_batch(self, chunks: list[L2Chunk]) -> list[L3Proposition]:
    """Extract propositions from all chunks."""
    all_propositions = []
    for chunk in chunks:
        props = await self.extract(chunk)
        for prop in props:
            prop.parent_chunk_id = chunk.id
            prop.document_id = chunk.document_id
            all_propositions.append(prop)
    return all_propositions

See Proposition Indexing for details.
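The `extract_batch` loop above awaits one chunk at a time. A hedged sketch of a concurrency-limited variant (the semaphore cap of 4 is an assumption, and parent-id tagging is elided):

```python
import asyncio

# Sketch: run per-chunk extraction concurrently with a cap, instead of the
# sequential loop above. The default limit of 4 is an assumption.

async def extract_batch_concurrent(chunks, extract, limit: int = 4):
    sem = asyncio.Semaphore(limit)

    async def one(chunk):
        async with sem:
            return await extract(chunk)

    results = await asyncio.gather(*(one(c) for c in chunks))
    # Flatten in input order (parent-id tagging as in extract_batch would go here).
    return [prop for chunk_props in results for prop in chunk_props]
```

`asyncio.gather` preserves input order, so the flattened list matches the sequential version; the semaphore keeps the number of in-flight LLM calls bounded.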

Stage 5: Graph Extraction

Entities and relationships are extracted from each L2 chunk:

# forge/ingestion/graph_extractor.py
async def extract_batch(self, chunks: list[L2Chunk]) -> GraphData:
    """Extract graph data from all chunks."""
    all_entities = []
    all_relationships = []
 
    for chunk in chunks:
        result = await self.extract(chunk)
        all_entities.extend(result.entities)
        all_relationships.extend(result.relationships)
 
    # Deduplicate entities by name (case-insensitive)
    unique_entities = self._deduplicate_entities(all_entities)
 
    return GraphData(
        entities=unique_entities,
        relationships=all_relationships,
    )

See Knowledge Graph for the full technique.
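The `_deduplicate_entities` step can be sketched as a first-seen-wins merge keyed on the lowercased name. The merge policy (keep the first-seen casing and type) is an assumption; the real implementation may merge attributes instead.

```python
# Sketch of case-insensitive entity deduplication (the merge policy --
# keep the first-seen entity unchanged -- is an assumption).

def deduplicate_entities(entities: list[dict]) -> list[dict]:
    """Collapse entities whose names differ only in case or whitespace."""
    seen: dict[str, dict] = {}
    for entity in entities:
        key = entity["name"].strip().lower()
        if key not in seen:
            seen[key] = entity
    return list(seen.values())
```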

Stage 6: BGE-M3 Embedding

All points (L0, L1, L2, L3) are embedded with BGE-M3:

# forge/ingestion/embedder.py
async def embed_all(self, hierarchy: DocumentHierarchy, propositions: list[L3Proposition]) -> list[PointStruct]:
    """Embed all hierarchy levels and propositions."""
    all_texts = []
    all_metadata = []
 
    # L0 summary
    all_texts.append(hierarchy.l0.text)
    all_metadata.append({"level": "L0", "document_id": hierarchy.l0.document_id})
 
    # L1 sections
    for section in hierarchy.l1:
        all_texts.append(section.text)
        all_metadata.append({"level": "L1", "document_id": section.document_id})
 
    # L2 chunks (use enriched text for embedding)
    for chunk in hierarchy.l2:
        all_texts.append(chunk.enriched_text)
        all_metadata.append({
            "level": "L2",
            "document_id": chunk.document_id,
            "original_text": chunk.original_text,  # Stored separately
        })
 
    # L3 propositions
    for prop in propositions:
        all_texts.append(prop.text)
        all_metadata.append({
            "level": "L3",
            "document_id": prop.document_id,
            "parent_chunk_id": prop.parent_chunk_id,
        })
 
    # Batch encode
    output = self.bge_m3.encode(
        all_texts,
        batch_size=self.config.batch_size,
        return_dense=True,
        return_sparse=True,
        return_colbert_vecs=True,
    )
 
    # Build Qdrant points
    points = []
    for i, (text, meta) in enumerate(zip(all_texts, all_metadata)):
        points.append(PointStruct(
            id=str(uuid.uuid4()),
            vector={
                "dense": output["dense_vecs"][i].tolist(),
                "sparse": SparseVector(
                    indices=list(output["lexical_weights"][i].keys()),
                    values=list(output["lexical_weights"][i].values()),
                ),
                "colbert": output["colbert_vecs"][i].tolist(),
            },
            payload={"text": text, **meta},
        ))
 
    return points

Stage 7: Storage

# forge/ingestion/pipeline.py (orchestrator)
class IngestionPipeline:
    async def ingest(self, file_path: str) -> IngestionResult:
        # Parse
        document = await self.parser.parse(file_path)
 
        # Build hierarchy
        hierarchy = await self.hierarchy_builder.build(document)
 
        # Enrich (contextual retrieval)
        if self.config.contextual_retrieval.enabled:
            hierarchy.l2 = await self.enricher.enrich_batch(
                hierarchy.l2, hierarchy.l0.text
            )
 
        # Extract propositions
        propositions = []
        if self.config.propositions.enabled:
            propositions = await self.prop_extractor.extract_batch(hierarchy.l2)
 
        # Extract graph
        graph_data = None
        if self.config.graph.enabled:
            graph_data = await self.graph_extractor.extract_batch(hierarchy.l2)
 
        # Embed everything
        points = await self.embedder.embed_all(hierarchy, propositions)
 
        # Store in Qdrant
        await self.qdrant.upsert(
            collection_name="forge_documents",
            points=points,
        )
 
        # Store graph in Redis
        if graph_data:
            await self.graph_store.store(graph_data)
 
        return IngestionResult(
            document_id=document.id,
            points_created=len(points),
            entities_extracted=len(graph_data.entities) if graph_data else 0,
            relationships_extracted=len(graph_data.relationships) if graph_data else 0,
        )
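The graph store's key layout follows the `graph:out:{entity_id}` / `graph:in:{entity_id}` / `graph:entity:{name}` scheme from the diagram. A sketch of how the key-to-value mapping might be built before writing to Redis; the value encoding and the entity-id field are assumptions:

```python
# Sketch of the Stage 7 Redis key layout (value encoding and entity-id scheme
# are assumptions; the real store writes these through a Redis client).

def build_graph_keys(entities: list[dict], relationships: list[dict]) -> dict:
    keys: dict[str, object] = {}
    for ent in entities:
        # graph:entity:{name} -> entity id, for name lookups
        keys[f"graph:entity:{ent['name'].lower()}"] = ent["id"]
    for rel in relationships:
        # Adjacency lists in both directions
        keys.setdefault(f"graph:out:{rel['source']}", []).append(
            (rel["type"], rel["target"]))
        keys.setdefault(f"graph:in:{rel['target']}", []).append(
            (rel["type"], rel["source"]))
    return keys
```

Keeping both `out` and `in` lists doubles the writes but makes one-hop traversal in either direction a single key lookup.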

Qdrant Collection Schema

The final Qdrant collection has this structure:

{
  "collection_name": "forge_documents",
  "vectors_config": {
    "dense": { "size": 1024, "distance": "Cosine" },
    "colbert": { "size": 1024, "distance": "Cosine", "multivector": { "comparator": "max_sim" } }
  },
  "sparse_vectors_config": {
    "sparse": { "index": { "on_disk": false } }
  }
}
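The schema above maps onto a qdrant-client `create_collection` call roughly as follows. This is a sketch: check the parameter names against your installed qdrant-client release (multivector support requires a recent version, and the URL is a placeholder).

```python
from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance, VectorParams, MultiVectorConfig, MultiVectorComparator,
    SparseVectorParams, SparseIndexParams,
)

client = QdrantClient(url="http://localhost:6333")  # placeholder URL
client.create_collection(
    collection_name="forge_documents",
    vectors_config={
        "dense": VectorParams(size=1024, distance=Distance.COSINE),
        "colbert": VectorParams(
            size=1024,
            distance=Distance.COSINE,
            multivector_config=MultiVectorConfig(
                comparator=MultiVectorComparator.MAX_SIM),
        ),
    },
    sparse_vectors_config={
        "sparse": SparseVectorParams(index=SparseIndexParams(on_disk=False)),
    },
)
```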

Each point payload contains:

{
  "text": "enriched or original text",
  "original_text": "original text without context prefix",
  "level": "L0 | L1 | L2 | L3",
  "document_id": "doc_abc123",
  "parent_id": "parent_section_or_chunk_id",
  "entities": [{"name": "...", "type": "PERSON"}],
  "relationships": [{"source": "...", "target": "...", "type": "AUTHORED"}],
  "type": "summary | section | chunk | proposition",
  "page_numbers": [1, 2],
  "heading": "Section 3: Financial Results"
}

Ingestion Performance

Benchmarked on a 100-page technical document, RTX 4080:

Stage                    Time       Bottleneck
Parse                    ~1s        I/O
Hierarchy (L0-L2)        ~45s       LLM calls for summaries
Contextual Enrichment    ~120s      LLM call per chunk (~400 chunks)
Proposition Extraction   ~90s       LLM call per chunk
Graph Extraction         ~60s       LLM call per chunk
BGE-M3 Embedding         ~30s       CPU batch encoding
Qdrant Upsert            ~2s        Network I/O
Redis Graph Store        ~1s        Network I/O
Total                    ~5-6 min   LLM inference
Speeding up ingestion

The LLM is the bottleneck. Three techniques for faster ingestion: (1) Use a faster/smaller LLM for ingestion only (set FORGE_INGESTION_MODEL separately), (2) Increase LLM_THREADS for parallel inference, (3) Disable costly stages (propositions.enabled: false, graph.enabled: false) for a 2x speedup.
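Combined, the config-side options look roughly like the following sketch. Only the keys mentioned in this page are shown; the surrounding nesting is an assumption about config.yml's structure.

```yaml
# config.yml (sketch -- only keys mentioned on this page; nesting is assumed)
hierarchy:
  L2:
    method: fixed      # skip semantic chunking for faster ingestion
propositions:
  enabled: false       # disable Stage 4 (proposition extraction)
graph:
  enabled: false       # disable Stage 5 (graph extraction)
```

`FORGE_INGESTION_MODEL` and `LLM_THREADS` are set in the environment rather than config.yml.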

Next Steps