Last March, our team shipped a RAG system that now handles ~10,000 daily active users asking questions over a corpus of 2.3 million documents. Getting there took six months of iteration, three major rewrites of the retrieval layer, and more late-night debugging sessions than I care to admit. This is the production RAG architecture guide I wish existed when we started.
I am going to walk through every layer of our pipeline, share the actual Python code we use, and be honest about what broke along the way. If you are building a retrieval-augmented generation system for real users (not a weekend demo), this should save you a few weeks of pain.
Note: All latency and cost numbers are from our production environment as of late 2025. Your mileage will vary depending on document types, query patterns, and infrastructure choices.
Key Takeaways (TL;DR)
- Chunk size matters more than embedding model choice. We spent weeks comparing embedding models when the real problem was 512-token chunks splitting tables and code blocks in half.
- Reranking is not optional. A cross-encoder reranker cut our hallucination rate by 38% with only 40ms added latency.
- pgvector is good enough for most teams. We migrated from Pinecone to pgvector at 1.5M vectors and saved $1,400/month with no measurable recall drop.
- Evaluation is the bottleneck, not model quality. You cannot improve what you cannot measure. Build your eval harness before optimizing retrieval.
- Cache aggressively. 60% of our queries are near-duplicates. Semantic caching dropped our p95 latency from 3.2s to 0.8s.
Architecture Overview
Before diving into code, here is the full pipeline layout. Every production RAG pipeline tutorial covers these stages, but the devil is in how they connect:
"""
Production RAG Pipeline Architecture
=====================================
INGESTION LAYER
Raw Documents (PDF, HTML, Markdown, DOCX)
│
▼
Document Parser (unstructured.io + custom extractors)
│
▼
Content Cleaner (normalize unicode, strip boilerplate, detect language)
│
▼
CHUNKING LAYER
Semantic Chunker (recursive split + structure-aware boundaries)
│
▼
Chunk Metadata Enrichment (source, section headers, doc type, dates)
│
▼
EMBEDDING LAYER
Embedding Model (text-embedding-3-small, batched)
│
▼
Vector Store (pgvector on PostgreSQL 16)
│
▼
RETRIEVAL LAYER
Query → Hybrid Search (vector similarity + BM25 keyword)
│
▼
Cross-Encoder Reranker (ms-marco-MiniLM-L-12-v2)
│
▼
GENERATION LAYER
Prompt Construction (system + context + query + guardrails)
│
▼
LLM (GPT-4o-mini for speed, GPT-4o for complex queries)
│
▼
Response + Citations
"""
Each stage is a separate service communicating over internal gRPC. But you do not need microservices to start. A single FastAPI app with background workers works fine up to ~500 concurrent users.
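If you go the single-app route, the background-worker half is the only non-obvious part. Here is a minimal, stdlib-only sketch of that shape (names are illustrative; in our app the producer side is a FastAPI route that awaits `queue.put`):

```python
import asyncio

RESULTS: list[str] = []  # stands in for the vector store


async def ingest_worker(queue: asyncio.Queue) -> None:
    """Background worker: drains ingestion jobs so request handlers never block."""
    while True:
        doc = await queue.get()
        # parse -> clean -> chunk -> embed -> upsert would happen here
        RESULTS.append(f"indexed:{doc}")
        queue.task_done()


async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    workers = [asyncio.create_task(ingest_worker(queue)) for _ in range(2)]

    # A request handler would just do: await queue.put(document)
    for doc in ["doc-a", "doc-b", "doc-c"]:
        await queue.put(doc)

    await queue.join()  # wait until every queued job is processed
    for w in workers:
        w.cancel()


asyncio.run(main())
```

The same queue/worker split survives the move to separate services later; only the transport changes.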
Chunking: Where Most RAG Pipelines Silently Fail
This is the part that took us the longest to get right. Everyone starts with LangChain's RecursiveCharacterTextSplitter at 512 tokens, and honestly, that is fine for blog posts. But our corpus had API docs, legal contracts, research papers with tables, and support tickets. A fixed-size splitter destroyed all of them.
The core problem: a chunk that splits a table in half, or cuts a code example before the return statement, is worse than useless. It actively causes hallucinations because the LLM tries to complete the partial information.
Here is the chunking strategy we settled on:
from dataclasses import dataclass, field

import tiktoken

enc = tiktoken.encoding_for_model("gpt-4o")


@dataclass
class Chunk:
    text: str
    metadata: dict = field(default_factory=dict)
    token_count: int = 0

    def __post_init__(self):
        self.token_count = len(enc.encode(self.text))


def semantic_chunker(
    text: str,
    doc_type: str = "general",
    max_tokens: int = 768,
    min_tokens: int = 100,
    overlap_tokens: int = 64,
) -> list[Chunk]:
    """
    Structure-aware chunker that respects document boundaries.

    Key insight: different doc types need different split points.
    Code docs split on function/class boundaries.
    Legal docs split on section headers.
    General text splits on paragraph boundaries with semantic overlap.
    """
    # Define the split hierarchy by document type
    split_rules = {
        "code_docs": ["\n## ", "\n### ", "\n```\n", "\n\n", "\n"],
        "legal": ["\nARTICLE ", "\nSection ", "\n\n", "\n"],
        "research": ["\n## ", "\n### ", "\n\n", ". "],
        "general": ["\n## ", "\n### ", "\n\n", "\n", ". "],
    }
    separators = split_rules.get(doc_type, split_rules["general"])

    chunks = []
    current_text = text
    previous_tail = ""  # for overlap

    while current_text:
        token_count = len(enc.encode(current_text))
        if token_count <= max_tokens:
            chunks.append(Chunk(
                text=(previous_tail + current_text).strip(),
                metadata={"doc_type": doc_type, "chunk_index": len(chunks)},
            ))
            break

        # Find the best split point within max_tokens
        split_pos = _find_split_point(current_text, separators, max_tokens)
        chunk_text = previous_tail + current_text[:split_pos]

        if len(enc.encode(chunk_text.strip())) >= min_tokens:
            chunks.append(Chunk(
                text=chunk_text.strip(),
                metadata={"doc_type": doc_type, "chunk_index": len(chunks)},
            ))
            # Grab overlap from the end of the current chunk
            previous_tail = current_text[
                max(0, split_pos - _tokens_to_chars(overlap_tokens)):split_pos
            ]
        else:
            # Too short to stand alone: carry it forward into the next
            # chunk instead of silently dropping content
            previous_tail = chunk_text
        current_text = current_text[split_pos:]

    return chunks


def _find_split_point(text: str, separators: list[str], max_tokens: int) -> int:
    """Find the latest separator position that keeps us under max_tokens."""
    # Convert max_tokens to an approximate char position (rough: 4 chars/token)
    max_chars = max_tokens * 4
    search_region = text[:max_chars]
    for sep in separators:
        pos = search_region.rfind(sep)
        if pos > 0:
            # Verify the actual token count before accepting this split
            candidate = text[:pos + len(sep)]
            if len(enc.encode(candidate)) <= max_tokens:
                return pos + len(sep)
    # Fallback: hard split at the approximate max_tokens boundary
    return min(max_chars, len(text))


def _tokens_to_chars(tokens: int) -> int:
    return tokens * 4  # rough approximation
We use 768 tokens instead of the typical 512. After A/B testing five chunk sizes (256, 512, 768, 1024, 1536), 768 gave the best recall@10. Smaller chunks had better precision but missed context; larger chunks diluted the signal with noise.
The 64-token overlap is deliberate. Without it, queries about content near chunk boundaries had 23% worse retrieval quality. Sixty-four tokens captures a full sentence of context without bloating storage.
Embedding Pipeline: What We Tried and What Stuck
We evaluated five embedding models over two months. Here is the honest comparison from our benchmarks on a held-out set of 500 query-document pairs:
| Model | Recall@10 | Latency (p50) | Cost per 1M tokens | Dimensions |
|---|---|---|---|---|
| OpenAI text-embedding-3-small | 0.89 | 45ms | $0.02 | 1536 |
| OpenAI text-embedding-3-large | 0.91 | 62ms | $0.13 | 3072 |
| Cohere embed-v3 | 0.90 | 55ms | $0.10 | 1024 |
| BGE-large-en-v1.5 (self-hosted) | 0.87 | 28ms | ~$0.005* | 1024 |
| E5-mistral-7b (self-hosted) | 0.92 | 180ms | ~$0.02* | 4096 |
* Self-hosted costs estimated from GPU rental (A10G on AWS) amortized over monthly volume.
We went with text-embedding-3-small. The 2% recall gap versus the large model was not worth 6.5x the cost at our volume (~15M tokens/day). If you are under 1M tokens/day, just use the large model. For teams that need to self-host, BGE-large is remarkably good on a single A10G (200 req/s with batching).
Here is our embedding pipeline code:
import hashlib

import numpy as np
import redis.asyncio as redis
from openai import AsyncOpenAI

client = AsyncOpenAI()
cache = redis.from_url("redis://localhost:6379/1")

EMBED_MODEL = "text-embedding-3-small"
BATCH_SIZE = 100  # OpenAI allows up to 2048 inputs per request, but 100 is safer for rate limits
EMBEDDING_DIM = 1536


async def embed_texts(
    texts: list[str],
    use_cache: bool = True,
) -> np.ndarray:
    """
    Embed a list of texts with Redis caching and batching.
    Returns a numpy array of shape (len(texts), EMBEDDING_DIM).

    Caches individual embeddings by content hash to avoid
    re-embedding unchanged chunks on re-index.
    """
    embeddings = np.zeros((len(texts), EMBEDDING_DIM), dtype=np.float32)
    uncached_indices = []
    uncached_texts = []

    if use_cache:
        # Check the cache for each text
        cache_keys = [f"emb:{hashlib.md5(t.encode()).hexdigest()}" for t in texts]
        cached = await cache.mget(cache_keys)
        for i, val in enumerate(cached):
            if val is not None:
                embeddings[i] = np.frombuffer(val, dtype=np.float32)
            else:
                uncached_indices.append(i)
                uncached_texts.append(texts[i])
    else:
        uncached_indices = list(range(len(texts)))
        uncached_texts = texts

    if not uncached_texts:
        return embeddings

    # Batch-embed the uncached texts
    for batch_start in range(0, len(uncached_texts), BATCH_SIZE):
        batch = uncached_texts[batch_start:batch_start + BATCH_SIZE]
        response = await client.embeddings.create(
            model=EMBED_MODEL,
            input=batch,
        )
        for j, item in enumerate(response.data):
            idx = uncached_indices[batch_start + j]
            vec = np.array(item.embedding, dtype=np.float32)
            embeddings[idx] = vec
            # Cache the new embedding
            if use_cache:
                cache_key = f"emb:{hashlib.md5(batch[j].encode()).hexdigest()}"
                await cache.set(cache_key, vec.tobytes(), ex=86400 * 7)  # 7-day TTL

    return embeddings
The Redis caching layer matters. During daily re-indexing (~5,000 new documents/day), 70% of chunks are unchanged. Caching saves ~$8/day in API costs and cuts re-index time from 45 to 12 minutes.
Vector Store: pgvector vs. Pinecone (We Switched)
We started on Pinecone. At 500K vectors ($70/month, s1 pod), it was fine. Then our corpus grew to 1.5M vectors and costs hit $1,800/month. We evaluated Pinecone serverless (400ms+ cold starts), Weaviate Cloud ($900/month), and pgvector on our existing PostgreSQL 16 instance. pgvector won:
- Cost: $0/month incremental (we already had a 64GB RAM Postgres instance)
- Latency: 8ms p50 for top-20 retrieval with HNSW index (Pinecone was 12ms)
- Operational simplicity: One less service to monitor, one less vendor to manage
- Filtering: Native SQL WHERE clauses are far more flexible than Pinecone metadata filters
The setup is straightforward:
-- pgvector setup (run once)
CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE document_chunks (
    id          BIGSERIAL PRIMARY KEY,
    document_id BIGINT REFERENCES documents(id) ON DELETE CASCADE,
    chunk_index INTEGER NOT NULL,
    content     TEXT NOT NULL,
    token_count INTEGER NOT NULL,
    embedding   vector(1536),
    metadata    JSONB DEFAULT '{}',
    created_at  TIMESTAMPTZ DEFAULT NOW(),
    -- The unique constraint doubles as a composite index on
    -- (document_id, chunk_index) for per-document lookups
    CONSTRAINT unique_doc_chunk UNIQUE (document_id, chunk_index)
);

-- HNSW index: ef_construction=128 gives good recall, m=16 is the default
-- This takes ~20 minutes to build on 1.5M vectors
CREATE INDEX ON document_chunks
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 128);

-- GIN index for metadata filtering
CREATE INDEX ON document_chunks USING GIN (metadata);
One gotcha with pgvector: you need to set hnsw.ef_search at query time. The default of 40 gives decent recall, but we bumped it to 100 for production. The latency penalty is about 3ms but recall@10 improved from 0.86 to 0.91.
import asyncpg
from typing import Optional

# Note: register the vector type on each connection with
# pgvector.asyncpg.register_vector (from the pgvector Python package)
# so asyncpg can encode list[float] parameters as vectors.


async def vector_search(
    pool: asyncpg.Pool,
    query_embedding: list[float],
    top_k: int = 20,
    doc_type_filter: Optional[str] = None,
    min_date: Optional[str] = None,
) -> list[dict]:
    """
    Vector search: cosine similarity + optional metadata filters.

    We retrieve 2x top_k from vector search because the reranker
    will rescore and prune. Over-retrieval is cheap; missed relevant
    docs are expensive.
    """
    async with pool.acquire() as conn:
        # Set HNSW search parameters for this session
        await conn.execute("SET hnsw.ef_search = 100")

        filters = []
        params = [query_embedding, top_k * 2]  # over-retrieve for the reranker
        param_idx = 3

        if doc_type_filter:
            filters.append(f"metadata->>'doc_type' = ${param_idx}")
            params.append(doc_type_filter)
            param_idx += 1
        if min_date:
            filters.append(f"created_at >= ${param_idx}::timestamptz")
            params.append(min_date)
            param_idx += 1

        where_clause = "WHERE " + " AND ".join(filters) if filters else ""

        query = f"""
            SELECT id, document_id, content, metadata,
                   1 - (embedding <=> $1::vector) AS similarity
            FROM document_chunks
            {where_clause}
            ORDER BY embedding <=> $1::vector
            LIMIT $2
        """
        rows = await conn.fetch(query, *params)
        return [dict(r) for r in rows]
The Reranking Layer That Changed Everything
This is the single highest-ROI change we made to the pipeline. Before adding a cross-encoder reranker, our answer quality scores (judged by a human eval panel of 5 people on a 1-5 scale) averaged 3.2. After reranking: 4.1. That is a massive jump.
Bi-encoder embeddings are fast but lossy: they compress a chunk into a single vector. A cross-encoder examines the query and document together, token by token, producing a much more accurate relevance score. The tradeoff is speed: you can only rerank ~50-100 candidates in real time.
from sentence_transformers import CrossEncoder

# Load once at startup, ~500MB memory
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-12-v2", max_length=512)


def rerank_chunks(
    query: str,
    chunks: list[dict],
    top_k: int = 5,
    score_threshold: float = 0.15,
) -> list[dict]:
    """
    Rerank retrieved chunks using the cross-encoder.

    Takes ~40ms for 20 candidates on CPU.
    On GPU (T4), it's ~8ms for 20 candidates.

    We filter by score_threshold to avoid including irrelevant
    chunks that happened to be the "least bad" in the vector store.
    This is critical for hallucination reduction.
    """
    if not chunks:
        return []

    pairs = [(query, chunk["content"]) for chunk in chunks]
    scores = reranker.predict(pairs)

    # Attach scores and sort
    for chunk, score in zip(chunks, scores):
        chunk["rerank_score"] = float(score)
    ranked = sorted(chunks, key=lambda x: x["rerank_score"], reverse=True)

    # Filter by threshold AND top_k
    filtered = [c for c in ranked if c["rerank_score"] >= score_threshold]
    return filtered[:top_k]
The score_threshold parameter is important. Without it, the LLM always gets top_k chunks, even if the bottom ones are barely relevant. Those low-relevance chunks are hallucination fuel. We found 0.15 to be a good cutoff for ms-marco-MiniLM, but you should calibrate this on your own eval set.
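Calibrating that cutoff does not require anything fancy. Here is a sketch of the sweep, run over a hand-labeled list of (rerank_score, is_relevant) pairs; the helper name and sample scores are illustrative:

```python
def sweep_thresholds(scored: list[tuple[float, bool]], thresholds: list[float]) -> list[dict]:
    """For each candidate cutoff, compute precision/recall of the kept chunks."""
    total_relevant = sum(1 for _, rel in scored if rel)
    rows = []
    for t in thresholds:
        kept = [(s, rel) for s, rel in scored if s >= t]
        true_pos = sum(1 for _, rel in kept if rel)
        precision = true_pos / len(kept) if kept else 0.0
        recall = true_pos / total_relevant if total_relevant else 0.0
        rows.append({"threshold": t, "precision": precision, "recall": recall})
    return rows


# Reranker scores on a labeled eval set (synthetic example data)
labeled = [(0.92, True), (0.55, True), (0.20, False), (0.12, False), (0.18, True)]
rows = sweep_thresholds(labeled, [0.10, 0.15, 0.30])
```

Pick the highest threshold whose recall you can live with; precision past that point is what starves hallucinations of fuel.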
Prompt Construction: Less Is More
After all that retrieval work, the prompt itself is surprisingly simple. We tried elaborate chain-of-thought prompts, multi-step verification prompts, and XML-structured prompts. The winner was the most straightforward version.
def build_rag_prompt(
    query: str,
    chunks: list[dict],
    conversation_history: list[dict] | None = None,
    max_context_tokens: int = 6000,
) -> list[dict]:
    """
    Construct the final prompt for the LLM.

    Key design decisions:
    1. System prompt is short and specific about behavior
    2. Context chunks include source metadata for citations
    3. Hard instruction to say "I don't know" when context is insufficient
    4. No chain-of-thought — it increased latency 3x with minimal quality gain
    """
    # Build the context string with source attribution
    context_parts = []
    total_tokens = 0
    for i, chunk in enumerate(chunks):
        source = chunk.get("metadata", {}).get("source", "Unknown")
        section = chunk.get("metadata", {}).get("section", "")
        label = f"[Source {i + 1}: {source}"
        if section:
            label += f" > {section}"
        label += "]"

        chunk_text = f"{label}\n{chunk['content']}"
        chunk_tokens = len(enc.encode(chunk_text))
        if total_tokens + chunk_tokens > max_context_tokens:
            break
        context_parts.append(chunk_text)
        total_tokens += chunk_tokens

    context = "\n\n---\n\n".join(context_parts)

    system_msg = (
        "You are a technical assistant. Answer the user's question using ONLY "
        "the provided context. Always cite your sources using [Source N] notation. "
        "If the context does not contain enough information to answer confidently, "
        "say so explicitly. Do not make up information."
    )

    messages = [{"role": "system", "content": system_msg}]

    # Add conversation history if multi-turn
    if conversation_history:
        messages.extend(conversation_history[-4:])  # last 2 turns

    user_msg = f"Context:\n{context}\n\nQuestion: {query}"
    messages.append({"role": "user", "content": user_msg})
    return messages
One surprise: removing "think step by step" from the system prompt actually improved answer quality. Step-by-step reasoning caused the model to "fill in" missing context with plausible-sounding but unsupported information. With a direct answer instruction, it was more likely to say "I don't have enough information" when the context was thin.
RAG Best Practices: Hard-Won Lessons
Here are the specific pitfalls we hit and how we solved them. Consider this the RAG best practices section distilled from six months of production experience.
1. The Chunk Boundary Problem
Symptom: Users ask "what are the pricing tiers?" and the answer only mentions two of four tiers because the pricing table got split across chunks.
Fix: Structure-aware chunking (shown above) plus a "chunk merging" step at retrieval time. If two consecutive chunks from the same document both appear in the top-10, we merge them into a single context block.
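The merging step itself is a few lines. This sketch assumes each retrieved row carries `document_id` and `chunk_index` (we store both at ingest time); the function name is ours:

```python
def merge_adjacent_chunks(retrieved: list[dict]) -> list[dict]:
    """Fuse retrieved chunks that sit next to each other in the same document."""
    ordered = sorted(retrieved, key=lambda c: (c["document_id"], c["chunk_index"]))
    merged: list[dict] = []
    for chunk in ordered:
        prev = merged[-1] if merged else None
        if (prev is not None
                and prev["document_id"] == chunk["document_id"]
                and chunk["chunk_index"] == prev["chunk_index"] + 1):
            # Consecutive neighbors: join into one context block
            prev["content"] += "\n" + chunk["content"]
            prev["chunk_index"] = chunk["chunk_index"]  # track the last fused index
        else:
            merged.append(dict(chunk))  # copy so we don't mutate the input rows
    return merged
```

Run this after reranking, just before prompt construction, so a split table arrives at the LLM in one piece.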
2. Embedding Drift Over Time
Symptom: Retrieval quality slowly degrades over months even though you did not change anything.
Fix: Your documents change but your eval set does not. We re-sample 50 new eval queries from production logs every week and add them to our test suite. Also, if you switch embedding models, you must re-embed everything. There is no way around this.
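The weekly re-sampling step is trivial to automate; the discipline is actually doing it. A stdlib-only sketch (names illustrative):

```python
import random


def sample_new_eval_queries(
    production_queries: list[str],
    existing_eval: set[str],
    n: int = 50,
    seed: int = 0,
) -> list[str]:
    """Pick n queries from recent production logs not already in the eval set."""
    candidates = sorted(set(production_queries) - existing_eval)
    rng = random.Random(seed)  # seeded so the weekly job is reproducible
    return rng.sample(candidates, min(n, len(candidates)))
```

The sampled queries still need expert labels before they join the test suite; this only keeps the candidate pool fresh.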
3. The "Confident Hallucination" Problem
Symptom: The LLM gives a precise, well-formatted answer that is completely wrong, citing a chunk that is only tangentially related.
Fix: Three things, in order of impact: (a) reranker with score threshold to exclude weak matches, (b) explicit "say I don't know" instruction in the system prompt, (c) a lightweight post-generation check that verifies cited source numbers actually exist in the context.
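The post-generation check in (c) is deliberately dumb. A sketch of the idea (regex and function name are ours), matching the [Source N] notation our prompt enforces:

```python
import re

SOURCE_REF = re.compile(r"\[Source (\d+)\]")


def invalid_citations(answer: str, num_context_sources: int) -> set[int]:
    """Return cited source numbers that don't exist in the supplied context."""
    cited = {int(n) for n in SOURCE_REF.findall(answer)}
    return {n for n in cited if not 1 <= n <= num_context_sources}
```

A non-empty result means the model invented a source; we treat that as a failed generation and retry or refuse.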
4. Query-Document Mismatch
Symptom: User asks "how do I reset my password?" but the relevant doc says "credential recovery process" and never uses the word "password."
Fix: Hybrid search. We added keyword matching alongside vector search using PostgreSQL's full-text search (ts_rank is not true BM25, but it is close enough in practice), then merge the results before reranking. You can also add query expansion (an LLM rephrases the question into 2-3 variants). We do both.
async def hybrid_search(
    pool: asyncpg.Pool,
    query: str,
    query_embedding: list[float],
    top_k: int = 20,
    vector_weight: float = 0.7,
) -> list[dict]:
    """
    Combine vector similarity with BM25-style keyword scoring.

    RRF (Reciprocal Rank Fusion) merges the two ranked lists.
    vector_weight controls the balance — 0.7 works well when
    your embedding model is strong.
    """
    async with pool.acquire() as conn:
        await conn.execute("SET hnsw.ef_search = 100")

        # Combined query using RRF
        results = await conn.fetch("""
            WITH vector_results AS (
                SELECT id, content, metadata, document_id,
                       ROW_NUMBER() OVER (
                           ORDER BY embedding <=> $1::vector
                       ) AS vector_rank
                FROM document_chunks
                ORDER BY embedding <=> $1::vector
                LIMIT $2
            ),
            keyword_results AS (
                SELECT id, content, metadata, document_id,
                       ROW_NUMBER() OVER (
                           ORDER BY ts_rank(
                               to_tsvector('english', content),
                               plainto_tsquery('english', $3)
                           ) DESC
                       ) AS keyword_rank
                FROM document_chunks
                WHERE to_tsvector('english', content) @@
                      plainto_tsquery('english', $3)
                -- Order before LIMIT so we keep the best keyword
                -- matches, not an arbitrary subset
                ORDER BY keyword_rank
                LIMIT $2
            )
            SELECT COALESCE(v.id, k.id) AS id,
                   COALESCE(v.content, k.content) AS content,
                   COALESCE(v.metadata, k.metadata) AS metadata,
                   COALESCE(v.document_id, k.document_id) AS document_id,
                   (COALESCE($4::float / (60 + v.vector_rank), 0) +
                    COALESCE((1 - $4::float) / (60 + k.keyword_rank), 0)
                   ) AS rrf_score
            FROM vector_results v
            FULL OUTER JOIN keyword_results k ON v.id = k.id
            ORDER BY rrf_score DESC
            LIMIT $2
        """, query_embedding, top_k, query, vector_weight)
        return [dict(r) for r in results]
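The query-expansion half needs only one short LLM call per query (ask for 2-3 rephrasings, one per line), so the piece worth sketching is the fusion step once each variant has produced its own ranked result list. This is the same RRF idea as the SQL above, in Python; the function name is ours:

```python
def rrf_fuse(result_lists: list[list[dict]], k: int = 60) -> list[dict]:
    """Reciprocal Rank Fusion across per-variant result lists.

    Each inner list is ranked best-first; chunks are keyed by their id,
    so a chunk retrieved by several variants accumulates score.
    """
    scores: dict[int, float] = {}
    by_id: dict[int, dict] = {}
    for results in result_lists:
        for rank, chunk in enumerate(results, start=1):
            cid = chunk["id"]
            scores[cid] = scores.get(cid, 0.0) + 1.0 / (k + rank)
            by_id[cid] = chunk
    return [by_id[cid] for cid in sorted(scores, key=scores.get, reverse=True)]
```

Chunks that show up under multiple phrasings of the question float to the top, which is exactly the behavior you want for vocabulary-mismatch queries.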
5. Cost Spirals at Scale
At 10K DAU, we were burning through LLM tokens. Our monthly OpenAI bill hit $4,200 before we optimized. Here is what brought it down to $1,100:
- Semantic cache: Redis-based, hashes the query embedding and checks for near-neighbors (cosine > 0.97). Hit rate: 58%. Savings: ~$1,800/month.
- Model routing: Simple questions (detected via query length + keyword heuristics) go to GPT-4o-mini ($0.15/1M tokens). Complex multi-hop questions go to GPT-4o ($2.50/1M tokens). 73% of queries are "simple." Savings: ~$900/month.
- Context compression: We reduced max context from 8K to 6K tokens. Minimal quality impact, 25% fewer output tokens on average. Savings: ~$400/month.
LangChain vs. LlamaIndex: Our Take
We started with LangChain (v0.1) and migrated most of our pipeline to custom code by month three. The abstraction layers got in our way more than they helped: debugging retrieval was opaque, customizing chunking required monkey-patching internals, and error handling was inconsistent across chain types.
LlamaIndex was better for our use case. If we were starting today, we would use LlamaIndex for ingestion and indexing (their document loaders and node parsers are genuinely good) and write the retrieval and generation layers ourselves. That said, if your team is small and you want to ship fast, LangChain with LangSmith tracing is productive for getting a v1 out the door. Just be prepared to replace pieces as you scale.
Evaluation: The Part Nobody Wants to Build
You need three things to evaluate a RAG pipeline properly:
- Retrieval eval: Given a query, are the right chunks in the top-K? Metrics: Recall@K, MRR, NDCG.
- Generation eval: Given the right context, is the answer correct? Metrics: Faithfulness (does it only use provided context?), relevance (does it answer the question?), completeness.
- End-to-end eval: From user query to final answer, is the user satisfied? Metrics: thumbs up/down from users, support ticket deflection rate.
For retrieval eval, we maintain 200 query-chunk pairs labeled by domain experts. Every pipeline change gets benchmarked in CI. If recall@10 drops below 0.85, the deploy is blocked. For generation eval, we use LLM-as-judge (GPT-4o scoring faithfulness 1-5), which correlates at 0.82 with our human panel. We still do weekly human evals on 20 random production queries as a sanity check.
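The retrieval metrics are simple enough that there is no excuse to skip them. A sketch of recall@K and MRR plus the CI gate, over synthetic eval data (the real set is the 200 expert-labeled pairs):

```python
def recall_at_k(relevant: set[int], retrieved: list[int], k: int = 10) -> float:
    """Fraction of labeled-relevant chunks that appear in the top-k results."""
    return len(relevant & set(retrieved[:k])) / len(relevant)


def mrr(relevant: set[int], retrieved: list[int]) -> float:
    """Reciprocal rank of the first relevant hit (0 if none retrieved)."""
    for rank, chunk_id in enumerate(retrieved, start=1):
        if chunk_id in relevant:
            return 1.0 / rank
    return 0.0


# CI gate: each item pairs relevant chunk ids with the retrieved ranking
eval_set = [({101, 102}, [101, 300, 102, 301]), ({205}, [400, 205])]
avg_recall = sum(recall_at_k(rel, ret) for rel, ret in eval_set) / len(eval_set)
assert avg_recall >= 0.85, f"recall@10 regression: {avg_recall:.2f}"
```

Wire the assert into the test suite and a bad retrieval change fails the build instead of reaching users.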
Production Monitoring
We track seven key metrics in Prometheus/Grafana: retrieval latency (target < 50ms p95), generation latency (target < 2s p95), cache hit rate (target > 50%), reranker score distribution (watches for corpus drift), empty result rate (should be < 5%), token usage per query by model, and user feedback ratio (target > 0.8).
The "empty result rate" metric saved us once. It spiked from 3% to 22% one Monday morning. A deploy had changed the embedding model config without re-indexing, so query embeddings from the new model had low similarity with old vectors. We caught it in 15 minutes because of the alert.
Final Architecture and Numbers
Here is where our production RAG pipeline stands today:
| Metric | Value |
|---|---|
| Corpus size | 2.3M documents, 8.4M chunks |
| Daily active users | ~10,000 |
| Queries per day | ~45,000 |
| End-to-end latency (p50) | 1.1s |
| End-to-end latency (p95) | 2.4s |
| Retrieval recall@10 | 0.91 |
| User satisfaction (thumbs up rate) | 83% |
| Monthly infrastructure cost | ~$2,800 (compute + API) |
| Hallucination rate (human-judged) | ~4% |
The biggest lever on quality was the reranker. The biggest lever on cost was semantic caching. The biggest lever on latency was model routing. And the biggest lever on all three was having an eval harness to actually measure the impact of each change.
What I Would Do Differently
If I were starting a new production RAG pipeline today, I would change three things:
- Build the eval harness first. Before writing a single line of retrieval code. We wasted three weeks optimizing chunk size by vibes before we had numbers. Never again.
- Start with pgvector. Unless you have a specific reason for a managed vector database (multi-region, serverless scaling), pgvector on a decent Postgres instance is all you need up to at least 10M vectors.
- Use structured outputs from the start. We bolted on JSON-mode output formatting in month four to get reliable citations. Should have done it from day one. OpenAI's structured outputs and Anthropic's tool use both make this straightforward now.
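For the citations use case the schema can stay small. Here is a sketch of the kind of JSON Schema we mean; with OpenAI it slots into `response_format={"type": "json_schema", "json_schema": ...}`, and the field names are our choice, not a fixed API:

```python
# Schema for a cited answer: the model must return its citations as
# structured data instead of free-text [Source N] markers.
CITED_ANSWER_SCHEMA = {
    "name": "cited_answer",
    "strict": True,
    "schema": {
        "type": "object",
        "properties": {
            "answer": {"type": "string"},
            "citations": {
                "type": "array",
                "items": {"type": "integer"},
                "description": "Source numbers from the supplied context",
            },
            "confident": {"type": "boolean"},
        },
        "required": ["answer", "citations", "confident"],
        "additionalProperties": False,
    },
}
```

With citations as an integer array, the post-generation validity check becomes a set comparison instead of a regex.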
Building a production RAG system is more engineering than research. The models are good enough. The hard part is monitoring, evaluation, cost management, and handling the long tail of weird queries your users will inevitably throw at it.