Prompt Engineering for Data Pipelines: Using LLMs to Clean, Classify, and Enrich Data

Six months ago, I started sneaking LLM calls into our data pipelines at work. Not as some grand AI initiative, just out of frustration. We had a customer feedback ingestion pipeline that spent 40% of its processing time in a rat's nest of regex patterns and hand-tuned heuristics for classifying support tickets. Every new product launch meant another week of updating rules. So one Friday afternoon, I replaced the whole classification stage with a single GPT-4o-mini call. It worked better on the first try than our rules engine did after two years of tweaking.

That experiment turned into a systematic effort to figure out where LLMs actually belong in data pipelines, and more importantly, where they absolutely do not. This article is the result of six months of production use across four different pipelines, roughly $12,000 in API costs, and a lot of lessons about prompt design for structured data work.

Scope note: Everything here uses OpenAI's API (GPT-4o-mini and GPT-4o), but the patterns apply to any LLM with structured output support. I have tested the same prompts on Claude 3.5 Sonnet and Gemini 1.5 Flash with minor adjustments. Cost numbers are from OpenAI pricing as of early 2026.

Key Takeaways (TL;DR)

  • LLMs excel at fuzzy classification, entity extraction, and data standardization where rules would require hundreds of edge cases.
  • JSON mode and structured outputs are non-negotiable for pipeline integration. Free-form text responses will break your downstream consumers.
  • Batching is the difference between a $50/month task and a $5,000/month disaster. Group records, use async calls, and cache aggressively.
  • Always build fallback paths. LLM APIs go down, rate limits hit, and models occasionally hallucinate. Your pipeline cannot stop.
  • For many tasks, GPT-4o-mini at $0.15/1M input tokens is cheaper than maintaining a custom ML model when you factor in training data curation, retraining, and monitoring.

Where LLMs Actually Fit in a Data Pipeline

Not every stage of an ETL pipeline benefits from an LLM. I wasted real money learning this. Here is where I have found them genuinely useful versus where traditional approaches still win.

The sweet spot: tasks that are easy for humans but hard to codify

Think about the work you do when manually reviewing data quality issues. You glance at a company name and know "Microsft Corp." is Microsoft Corporation. You read a support ticket and immediately know it is about billing, not a technical issue, even though the word "billing" never appears. You see an address and know "123 Main St, Apt 4B, Springfield IL" needs to become a structured object with street, unit, city, and state fields.

These tasks share a common trait: a human can do them in seconds, but writing deterministic code to handle all edge cases takes weeks and still misses things. That is where LLMs shine in pipelines.

The five use cases that actually work

  1. Text classification - Categorizing free-text into predefined labels (support tickets, feedback sentiment, document types, lead scoring)
  2. Entity extraction - Pulling structured fields from unstructured text (names, dates, amounts, product references from emails or PDFs)
  3. Data cleaning and standardization - Normalizing messy inputs (company names, addresses, job titles, product descriptions)
  4. Data enrichment - Adding context that requires world knowledge (industry classification from company descriptions, technology stack detection from job postings)
  5. Schema mapping - Matching fields between incompatible schemas when you are integrating data from a new vendor or acquisition
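Schema mapping is the one item on this list without a full example later, so here is a minimal sketch of how I frame it: build the prompt mechanically from the two field lists and ask for machine-parseable JSON. The helper and field names are illustrative, not production code.

```python
import json

def build_schema_mapping_prompt(source_fields: list[str], target_fields: list[str]) -> str:
    """Build a prompt asking an LLM to propose a source -> target field mapping.

    The model is asked to answer in JSON so the response stays machine-parseable;
    unmatched fields should map to null rather than being dropped silently.
    """
    return (
        "You are a schema-mapping assistant. Map each source field to the "
        "best-matching target field, or null if there is no reasonable match.\n"
        "Respond with a JSON object of {source_field: target_field_or_null}.\n\n"
        f"Source fields: {json.dumps(source_fields)}\n"
        f"Target fields: {json.dumps(target_fields)}"
    )

prompt = build_schema_mapping_prompt(
    ["cust_nm", "addr_1", "zip"],
    ["customer_name", "street_line_1", "zip_code"],
)
```

The important part is the explicit null instruction: without it, models tend to force a match for every field, which is exactly the failure mode a human reviewer then has to untangle.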

Where LLMs are a waste of money

Do not use an LLM for: deduplication on structured fields (use fuzzy matching), date parsing (use dateutil), currency conversion (use a lookup table), anything with a clear deterministic rule. I burned about $800 in API calls using GPT-4o to parse dates from invoices before realizing that a combination of dateutil.parser and three regex patterns handled 99.7% of cases. The LLM handled 99.9%, but that 0.2% improvement was not worth $200/month.

Prompt Design for Structured Pipeline Output

The single most important lesson I learned: prompt engineering for pipelines is fundamentally different from prompt engineering for chatbots. In a chatbot, you optimize for helpfulness and nuance. In a pipeline, you optimize for consistency, parseable output, and graceful failure.

Rule 1: Always use JSON mode or structured outputs

Never ask an LLM to return free-form text in a pipeline. Ever. I do not care if it "usually" returns the right format. "Usually" means your pipeline crashes at 3 AM on a Saturday when the model decides to add a helpful explanation before the JSON.

from openai import OpenAI
from pydantic import BaseModel, Field
from enum import Enum
from typing import Optional

client = OpenAI()

class TicketCategory(str, Enum):
    BILLING = "billing"
    TECHNICAL = "technical"
    FEATURE_REQUEST = "feature_request"
    ACCOUNT = "account"
    OTHER = "other"

class TicketClassification(BaseModel):
    category: TicketCategory
    confidence: float = Field(ge=0.0, le=1.0)
    subcategory: Optional[str] = None
    reasoning: str = Field(max_length=200)

def classify_ticket(text: str) -> TicketClassification:
    """Classify a support ticket using GPT-4o-mini with structured output."""
    response = client.beta.chat.completions.parse(
        model="gpt-4o-mini",
        response_format=TicketClassification,
        messages=[
            {
                "role": "system",
                "content": (
                    "You are a support ticket classifier for a SaaS company. "
                    "Classify each ticket into exactly one category. "
                    "If uncertain, choose the closest match and set confidence below 0.7. "
                    "Never refuse to classify - always pick the best category."
                ),
            },
            {
                "role": "user",
                "content": f"Classify this support ticket:\n\n{text}",
            },
        ],
        temperature=0.0,
    )
    return response.choices[0].message.parsed

The key details here: temperature=0.0 for deterministic output, a Pydantic model as the response format so the API guarantees valid JSON matching the schema, and explicit instructions about what to do when uncertain. That last part matters more than you think. Without it, the model sometimes returns "I cannot determine the category" which is not a valid enum value and blows up your pipeline.

Rule 2: Include examples in the system prompt for edge cases

For classification tasks, I found that three to five examples in the system prompt improved accuracy on edge cases by roughly 15% without meaningfully increasing token usage. The trick is to pick examples that sit on category boundaries, not obvious ones.

CLASSIFIER_SYSTEM_PROMPT = """You are a support ticket classifier. Classify each ticket into exactly one category.

Categories:
- billing: Payment issues, invoices, pricing questions, subscription changes
- technical: Bugs, errors, integration problems, API issues, performance
- feature_request: New feature suggestions, enhancement requests, workflow improvements
- account: Login issues, permissions, team management, account settings
- other: Anything that does not fit the above categories

Edge case examples:
- "I can't log in to see my invoice" → billing (primary intent is viewing invoice)
- "The API keeps timing out when I try to export reports" → technical (API timeout is the issue)
- "Can you add SSO? Our security team requires it" → feature_request (requesting new capability)
- "I was charged twice and now I can't access my account" → billing (billing is root cause)

Always respond with your best classification. Never refuse. Set confidence below 0.7 if genuinely ambiguous."""

Rule 3: Design prompts that fail gracefully

Your prompt should have a built-in "I don't know" path that is still machine-parseable. I use a confidence field for this purpose. Downstream, any record with confidence below 0.6 gets routed to a manual review queue instead of flowing through automatically.
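That routing step is a few lines of pure Python. The 0.6 threshold mirrors the one described above; the `Classified` dataclass is a stand-in for whatever result type your pipeline uses.

```python
from dataclasses import dataclass

REVIEW_THRESHOLD = 0.6  # below this, a human reviews the record

@dataclass
class Classified:
    record_id: str
    category: str
    confidence: float

def route(results: list[Classified]) -> tuple[list[Classified], list[Classified]]:
    """Split classified records into auto-accept and manual-review lists."""
    auto = [r for r in results if r.confidence >= REVIEW_THRESHOLD]
    review = [r for r in results if r.confidence < REVIEW_THRESHOLD]
    return auto, review

auto, review = route([
    Classified("t1", "billing", 0.95),
    Classified("t2", "other", 0.41),
])
```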

Building an LLM-Powered Data Classifier

Let me show you the full classifier I use in production, including batching, error handling, and fallback logic. This handles about 3,000 support tickets per day at a cost of roughly $4.50.

import asyncio
import logging
from dataclasses import dataclass
from typing import Optional
from openai import AsyncOpenAI
from pydantic import BaseModel, Field
from tenacity import retry, stop_after_attempt, wait_exponential

logger = logging.getLogger(__name__)
client = AsyncOpenAI()

BATCH_SIZE = 20  # Records per LLM call
MAX_CONCURRENT = 10  # Parallel API calls

class ClassificationResult(BaseModel):
    record_id: str
    category: str
    confidence: float = Field(ge=0.0, le=1.0)
    subcategory: Optional[str] = None

class BatchClassificationResponse(BaseModel):
    classifications: list[ClassificationResult]

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=30),
)
async def classify_batch(records: list[dict]) -> list[ClassificationResult]:
    """Classify a batch of records in a single LLM call."""
    records_text = "\n---\n".join(
        f"[ID: {r['id']}] {r['text'][:500]}" for r in records
    )

    response = await client.beta.chat.completions.parse(
        model="gpt-4o-mini",
        response_format=BatchClassificationResponse,
        messages=[
            {"role": "system", "content": CLASSIFIER_SYSTEM_PROMPT},
            {
                "role": "user",
                "content": (
                    f"Classify each of the following {len(records)} records. "
                    f"Return a classification for every record ID.\n\n"
                    f"{records_text}"
                ),
            },
        ],
        temperature=0.0,
    )
    return response.choices[0].message.parsed.classifications

def fallback_classify(record: dict) -> ClassificationResult:
    """Rule-based fallback when LLM is unavailable."""
    text = record["text"].lower()
    keyword_map = {
        "billing": ["invoice", "charge", "payment", "refund", "subscription", "price"],
        "technical": ["error", "bug", "crash", "timeout", "api", "500", "404"],
        "feature_request": ["would be nice", "can you add", "feature", "suggest", "wish"],
        "account": ["login", "password", "permission", "access", "sso", "role"],
    }
    for category, keywords in keyword_map.items():
        if any(kw in text for kw in keywords):
            return ClassificationResult(
                record_id=record["id"],
                category=category,
                confidence=0.5,  # Lower confidence for rule-based
                subcategory=None,
            )
    return ClassificationResult(
        record_id=record["id"],
        category="other",
        confidence=0.3,
        subcategory=None,
    )

async def classify_all(records: list[dict]) -> list[ClassificationResult]:
    """Classify all records with batching, concurrency, and fallback."""
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)
    results = []

    async def process_batch(batch):
        async with semaphore:
            try:
                return await classify_batch(batch)
            except Exception as e:
                logger.warning(f"LLM batch failed, using fallback: {e}")
                return [fallback_classify(r) for r in batch]

    batches = [
        records[i : i + BATCH_SIZE] for i in range(0, len(records), BATCH_SIZE)
    ]
    tasks = [process_batch(batch) for batch in batches]
    batch_results = await asyncio.gather(*tasks)

    for batch_result in batch_results:
        results.extend(batch_result)

    return results

A few things worth noting about this design. First, the batching: instead of one API call per record, I pack 20 records into a single prompt. At 3,000 records/day, that is 150 API calls instead of 3,000. The cost difference is dramatic because you amortize the system prompt tokens across the batch. Second, the tenacity retry with exponential backoff handles transient API failures. Third, when all retries fail, the fallback classifier keeps the pipeline moving with lower-confidence, keyword-based results that get flagged for manual review.

Address Standardization: Where LLMs Beat Regex

Address parsing was the use case that convinced my skeptical tech lead. We were getting customer addresses from a web form with no field validation, so the data looked like this:

raw_addresses = [
    "123 Main Street, Apt 4B, Springfield, IL 62701",
    "456 oak ave springfield illinois",
    "789 N. Elm Blvd., Suite 200, Chicago IL, 60601",
    "PO Box 42, Rural Route 7, Peoria, IL",
    "1010 lake shore drive #18C chicago 60611",
]

Our regex-based parser handled maybe 70% of these correctly. The LLM handles about 96%.

from pydantic import BaseModel, Field
from typing import Optional

class StandardizedAddress(BaseModel):
    street_line_1: str
    street_line_2: Optional[str] = None
    city: str
    state: str = Field(description="Two-letter state abbreviation")
    zip_code: Optional[str] = None
    is_po_box: bool = False
    confidence: float = Field(ge=0.0, le=1.0)

class AddressBatchResponse(BaseModel):
    addresses: list[StandardizedAddress]

ADDRESS_SYSTEM_PROMPT = """You are an address standardization engine for US addresses.

Rules:
- Always use two-letter state abbreviations (IL, not Illinois)
- Capitalize properly (Chicago, not chicago or CHICAGO)
- Separate unit/apt/suite into street_line_2
- If zip code is missing, leave it as null (do not guess)
- Abbreviate: Street→St, Avenue→Ave, Boulevard→Blvd, Drive→Dr, Road→Rd
- Set confidence below 0.7 if the address seems incomplete or ambiguous
- Never fabricate information that is not in the input"""

async def standardize_addresses(
    raw: list[str],
) -> list[StandardizedAddress]:
    """Standardize a batch of messy addresses."""
    numbered = "\n".join(f"{i+1}. {addr}" for i, addr in enumerate(raw))

    response = await client.beta.chat.completions.parse(
        model="gpt-4o-mini",
        response_format=AddressBatchResponse,
        messages=[
            {"role": "system", "content": ADDRESS_SYSTEM_PROMPT},
            {
                "role": "user",
                "content": (
                    f"Standardize these {len(raw)} addresses:\n\n{numbered}"
                ),
            },
        ],
        temperature=0.0,
    )
    return response.choices[0].message.parsed.addresses

The output for the messy inputs above comes back clean and consistent every time. "456 oak ave springfield illinois" becomes street_line_1: "456 Oak Ave", city: "Springfield", state: "IL", zip_code: null, confidence: 0.85. The lower confidence flag on the missing zip code is exactly what I want, because it routes that record to enrichment via a USPS API lookup downstream.
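The downstream routing looks roughly like this. `zip_lookup` stands in for whatever address-verification API you use; the real USPS integration is more involved, so treat this as a sketch of the control flow, not the lookup itself.

```python
from typing import Callable, Optional

def enrich_zip(
    address: dict,  # parsed address as a plain dict for illustration
    zip_lookup: Callable[[str, str, str], Optional[str]],
    confidence_floor: float = 0.7,
) -> dict:
    """Fill in a missing zip code via an external lookup when the LLM
    flagged the record as incomplete (null zip or low confidence)."""
    if address["zip_code"] is None or address["confidence"] < confidence_floor:
        found = zip_lookup(address["street_line_1"], address["city"], address["state"])
        if found:
            address = {**address, "zip_code": found, "confidence": 0.9}
    return address

# Stubbed lookup for illustration; a real one would call an address API.
def fake_lookup(street: str, city: str, state: str) -> str:
    return "62704"

enriched = enrich_zip(
    {"street_line_1": "456 Oak Ave", "city": "Springfield", "state": "IL",
     "zip_code": None, "confidence": 0.85},
    fake_lookup,
)
```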

Entity Extraction with Structured Output

The third pipeline where LLMs replaced a fragile custom solution is extracting structured entities from vendor invoices that arrive as text (already OCR'd). Previously we had a spaCy NER model fine-tuned on 2,000 labeled invoices. It worked, but every new vendor format required a labeling session and retrain. The LLM-based extractor handles new formats with zero additional training.

from pydantic import BaseModel
from typing import Optional
from datetime import date

class LineItem(BaseModel):
    description: str
    quantity: Optional[float] = None
    unit_price: Optional[float] = None
    total: Optional[float] = None

class InvoiceEntity(BaseModel):
    vendor_name: str
    invoice_number: Optional[str] = None
    invoice_date: Optional[date] = None
    due_date: Optional[date] = None
    total_amount: Optional[float] = None
    currency: str = "USD"
    line_items: list[LineItem] = []
    tax_amount: Optional[float] = None
    po_number: Optional[str] = None
    confidence: float

INVOICE_PROMPT = """Extract structured data from this invoice text.

Rules:
- Dates should be in ISO format (YYYY-MM-DD)
- Amounts should be numeric (no currency symbols)
- If a field is not present in the text, leave it as null
- For line_items, extract: description, quantity, unit_price, total
- Set confidence below 0.7 if the text is poorly formatted or ambiguous
- Never invent data that is not in the source text"""

async def extract_invoice(text: str) -> InvoiceEntity:
    """Extract structured invoice data from OCR text."""
    response = await client.beta.chat.completions.parse(
        model="gpt-4o",  # Use 4o for complex extraction
        response_format=InvoiceEntity,
        messages=[
            {"role": "system", "content": INVOICE_PROMPT},
            {"role": "user", "content": f"Extract data from this invoice:\n\n{text[:4000]}"},
        ],
        temperature=0.0,
    )
    return response.choices[0].message.parsed

I use GPT-4o instead of 4o-mini for invoice extraction because the accuracy difference on tabular data justifies the 10x cost difference. A single mis-extracted total_amount causes more downstream damage than $0.01 per invoice.
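That trade-off is just an expected-value calculation. The error rates and cleanup cost below are invented for illustration (the source only gives the ~$0.01/invoice API cost); plug in your own measurements.

```python
def expected_cost_per_record(api_cost: float, error_rate: float, damage_per_error: float) -> float:
    """Expected total cost of one record: the API fee plus the
    expected downstream cost of a mistake."""
    return api_cost + error_rate * damage_per_error

# Hypothetical numbers: assume the small model mis-extracts 2% of invoices,
# the large model 0.5%, and a bad total_amount costs ~$5 of cleanup downstream.
mini = expected_cost_per_record(api_cost=0.001, error_rate=0.02, damage_per_error=5.0)
full = expected_cost_per_record(api_cost=0.010, error_rate=0.005, damage_per_error=5.0)
```

Under these assumptions the 10x-pricier model is cheaper in expectation, because the damage term dominates the API fee. When errors are cheap (e.g., a misrouted support ticket), the math flips back toward the small model.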

LLM vs Regex vs ML Model: When to Use What

After running these pipelines for six months, I have a much clearer picture of which tool fits each job. Here is the comparison I wish someone had given me at the start.

| Task | Regex / Rules | Custom ML Model | LLM API | Recommendation |
| --- | --- | --- | --- | --- |
| Email validation | 99.9% accurate, free | Overkill | Overkill | Regex |
| Date parsing | 95%+ with dateutil | Unnecessary | 99% but expensive | Regex/dateutil |
| Sentiment analysis | 60-70% keyword match | 88-92% fine-tuned BERT | 90-94% zero-shot | LLM (no training data needed) |
| Multi-class text classification | 50-75% keyword rules | 85-93% with labeled data | 88-95% zero-shot | LLM unless you have 10K+ labels |
| Named entity extraction | Poor on varied text | 90%+ fine-tuned spaCy | 92-96% structured output | LLM for varied formats, ML for fixed formats |
| Address standardization | 70% with pattern lib | 85% with training data | 95%+ zero-shot | LLM |
| Deduplication | Good for exact/fuzzy match | Better for semantic dedup | Expensive per pair | Regex + embedding similarity |
| Language detection | 95%+ with fasttext | Same as rules | 99% but expensive | Regex/fasttext |
| Schema mapping | Manual effort | Possible but fragile | 85-90% automatic | LLM for initial mapping, human review |

The pattern I see: use LLMs for tasks where the input is highly variable and writing rules for every edge case is impractical. Use regex and deterministic rules for well-defined formats. Use custom ML models only when you have abundant labeled data AND the task is high-volume enough to justify the training and maintenance overhead.

Batching Strategies for Cost Control

Cost was my biggest concern going in, and it is the question I get asked most. Here is the real breakdown from our three production pipelines.

The math that matters

GPT-4o-mini pricing: $0.15 per 1M input tokens, $0.60 per 1M output tokens. A typical classification prompt with system instructions, few-shot examples, and one record is about 800 tokens in, 100 tokens out. That is $0.00018 per record. At 3,000 records/day, that is $0.54/day or $16.20/month.

Now here is where batching changes everything:

# Cost comparison: single vs batched classification
# System prompt: ~500 tokens (paid once per call)
# Per-record input: ~300 tokens
# Per-record output: ~80 tokens

# SINGLE RECORD PER CALL (3,000 calls/day)
single_input_cost = 3000 * (500 + 300) * 0.15 / 1_000_000   # $0.36/day
single_output_cost = 3000 * 80 * 0.60 / 1_000_000            # $0.144/day
single_total = single_input_cost + single_output_cost          # $0.504/day

# BATCHED 20 RECORDS PER CALL (150 calls/day)
batch_input_cost = 150 * (500 + 300 * 20) * 0.15 / 1_000_000  # $0.146/day
batch_output_cost = 150 * (80 * 20) * 0.60 / 1_000_000         # $0.144/day
batch_total = batch_input_cost + batch_output_cost               # $0.290/day

# Savings: 42% cost reduction from batching alone
# Monthly: $8.70 vs $15.12 — not huge, but at scale it compounds

The real savings come when your system prompt is long (mine has 500+ tokens of instructions and examples). Batching amortizes that cost. With a 1,500-token system prompt and 50 records per batch, I have seen 60% cost reductions versus single-record calls.
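The amortization generalizes to a one-line formula. This sketch reproduces the numbers from the comparison above (GPT-4o-mini prices as defaults):

```python
def cost_per_record(
    system_tokens: int,
    record_in_tokens: int,
    record_out_tokens: int,
    batch_size: int,
    in_price_per_m: float = 0.15,   # $/1M input tokens (GPT-4o-mini)
    out_price_per_m: float = 0.60,  # $/1M output tokens (GPT-4o-mini)
) -> float:
    """Per-record API cost when the system prompt is amortized over a batch."""
    input_tokens = system_tokens / batch_size + record_in_tokens
    return (input_tokens * in_price_per_m + record_out_tokens * out_price_per_m) / 1_000_000

single = cost_per_record(500, 300, 80, batch_size=1)        # one record per call
batched = cost_per_record(500, 300, 80, batch_size=20)      # 20 records per call
long_prompt = cost_per_record(1500, 300, 80, batch_size=50) # big prompt, big batch
```

With the 500-token prompt, 3,000 records cost about $0.50/day unbatched versus $0.29/day batched, matching the comment-level math above; with a 1,500-token prompt and batches of 50, the reduction versus single-record calls lands in the 60-70% range.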

Practical batching tips

  • Batch size sweet spot: 15-25 records for classification, 5-10 for complex extraction. Beyond that, accuracy drops as the model loses track of which record is which.
  • Number your records with explicit IDs in the prompt. Without IDs, batch responses occasionally skip or duplicate records.
  • Set a token budget: Keep total input under 8,000 tokens per call. Longer prompts increase latency and can hit context limits on cheaper models.
  • Use async with concurrency limits: 10 parallel calls is usually safe for OpenAI Tier 2. Check your rate limits.

Fallback Patterns: What Happens When the LLM Fails

LLM APIs fail. Not often, but often enough that any production pipeline needs a plan. In six months, I have experienced: two major OpenAI outages (30+ minutes), dozens of 429 rate limit errors, a handful of responses that were valid JSON but contained hallucinated data, and one memorable incident where the model started returning Spanish classifications for English tickets after an apparent model update.

The three-tier fallback pattern

import hashlib

# Tier 1: LLM cache (Redis in production, dict for illustration)
_cache: dict[str, dict] = {}

def cache_key(text: str, task: str) -> str:
    """Deterministic cache key from input text and task type."""
    content = f"{task}:{text[:500]}"
    return hashlib.sha256(content.encode()).hexdigest()

async def classify_with_fallback(
    record: dict,
    task: str = "ticket_classification",
) -> ClassificationResult:
    """Three-tier classification: cache → LLM → rules."""

    # Tier 1: Check cache
    key = cache_key(record["text"], task)
    if key in _cache:
        return ClassificationResult(**_cache[key])

    # Tier 2: LLM call with retry
    try:
        result = await classify_batch([record])
        classification = result[0]
        # Cache successful results
        _cache[key] = classification.model_dump()
        return classification
    except Exception as e:
        logger.error(f"LLM classification failed: {e}")

    # Tier 3: Rule-based fallback
    classification = fallback_classify(record)
    classification.confidence = min(classification.confidence, 0.4)
    return classification

The cache layer is more important than you might think. In our support ticket pipeline, about 30% of tickets are near-duplicates (different customers reporting the same issue). Caching the classification result on a hash of the input text means those duplicates never hit the API at all.
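One way to squeeze more out of that cache, beyond hashing the raw text, is to normalize before hashing so cosmetic differences between near-duplicate tickets still produce cache hits. The normalization step here is my own extension of the `cache_key` idea above, not something the exact-hash version does:

```python
import hashlib
import re

def normalized_cache_key(text: str, task: str) -> str:
    """Cache key that tolerates trivial variation between near-duplicates:
    lowercase, collapse whitespace, then hash the first 500 characters."""
    normalized = re.sub(r"\s+", " ", text.lower()).strip()
    return hashlib.sha256(f"{task}:{normalized[:500]}".encode()).hexdigest()

k1 = normalized_cache_key("The API  keeps timing out!\n", "ticket_classification")
k2 = normalized_cache_key("the api keeps timing out!", "ticket_classification")
```

The trade-off: normalization can conflate tickets that differ only in casing of an identifier, so do not apply it to tasks where case or spacing carries meaning (e.g., address parsing).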

Validation: trust but verify

Even when the LLM returns valid JSON matching your schema, the data can be wrong. I run a validation layer after every LLM stage:

from dataclasses import dataclass

@dataclass
class ValidationResult:
    is_valid: bool
    errors: list[str]
    needs_review: bool

def validate_classification(result: ClassificationResult) -> ValidationResult:
    """Post-LLM validation for classification results."""
    errors = []

    # Check confidence threshold
    if result.confidence < 0.5:
        errors.append(f"Low confidence: {result.confidence}")

    # Check for known bad patterns
    if result.category == "other" and result.confidence > 0.9:
        errors.append("Suspiciously high confidence for 'other' category")

    # Check category is in allowed set
    allowed = {"billing", "technical", "feature_request", "account", "other"}
    if result.category not in allowed:
        errors.append(f"Unknown category: {result.category}")

    return ValidationResult(
        is_valid=len(errors) == 0,
        errors=errors,
        needs_review=result.confidence < 0.6 or len(errors) > 0,
    )

Records that fail validation go to a review queue. In practice, about 4-6% of LLM-classified records get flagged, and of those, roughly half actually need correction. That 2-3% error rate is better than our old rules engine (8-10% error rate on new ticket types) and comparable to our fine-tuned ML model (2-4%), but with zero training data and no retraining burden.

Cost Analysis: When the LLM Is Actually Cheaper

This is the part that surprised me. In three of our four LLM pipeline stages, the LLM is not just better, it is cheaper than the alternative when you account for total cost of ownership.

Classification pipeline: LLM vs custom model

| Cost Factor | Fine-tuned BERT | GPT-4o-mini API |
| --- | --- | --- |
| Initial labeling (2,000 examples) | $4,000 (contractor) | $0 |
| Model training + iteration | $200 (GPU hours) | $0 |
| Engineer time (training pipeline) | 40 hours (~$6,000) | 8 hours (~$1,200) |
| Inference hosting (GPU) | $150/month | $0 |
| API costs | $0 | $9/month |
| Quarterly retraining | $1,500/quarter | $0 (update prompt) |
| Year 1 total | $18,000 | $1,308 |

The LLM approach is 14x cheaper in year one. The gap narrows over time because the custom model's recurring costs are lower, but it takes about three years for the custom model to break even, and that assumes zero concept drift requiring relabeling.

The calculus flips at high volume. If you are classifying 1 million records per day, the batched per-record cost from the math above (~$0.0001/record) puts the LLM API at roughly $2,900/month versus the same $150/month for a self-hosted model. At that scale, train a custom model. But most teams I talk to are processing thousands to low tens of thousands of records per day, where the LLM is clearly the better economic choice.
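You can compute the crossover volume directly. Note this compares API spend against hosting cost only; it ignores the labeling, engineering, and retraining costs that dominate the year-one comparison, so the true break-even volume is higher than this figure suggests.

```python
def breakeven_records_per_day(
    hosting_per_month: float = 150.0,       # self-hosted GPU cost from the table
    per_record_api_cost: float = 0.0000968, # batched GPT-4o-mini cost from earlier
    days_per_month: int = 30,
) -> float:
    """Daily volume at which LLM API spend matches self-hosted inference cost."""
    return hosting_per_month / (per_record_api_cost * days_per_month)

volume = breakeven_records_per_day()  # roughly 50K records/day with these inputs
```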

Production Monitoring: What to Track

Running LLMs in a pipeline means adding new metrics to your observability stack. Here is what I monitor:

  • Confidence distribution: A histogram of confidence scores per LLM stage. If the median drops below 0.8, something changed in your input data.
  • Fallback rate: Percentage of records handled by the rule-based fallback. Spikes indicate API issues or rate limiting.
  • Latency per batch: p50, p95, p99. GPT-4o-mini typically returns in 800ms-2s for batches of 20. If p95 exceeds 10s, check for API degradation.
  • Cost per record: Track actual token usage, not estimates. I was off by 30% in my initial projections because I underestimated output tokens.
  • Category distribution drift: If "other" suddenly jumps from 5% to 20% of classifications, your categories may need updating.
  • Cache hit rate: Should stabilize around 20-40% for most workloads. Much lower means your input data is highly unique (expected for some use cases).
from dataclasses import dataclass, field
from collections import Counter

@dataclass
class PipelineMetrics:
    """Tracks LLM pipeline health metrics."""
    total_records: int = 0
    llm_classified: int = 0
    fallback_classified: int = 0
    cache_hits: int = 0
    total_tokens_in: int = 0
    total_tokens_out: int = 0
    latencies: list[float] = field(default_factory=list)
    confidences: list[float] = field(default_factory=list)
    category_counts: Counter = field(default_factory=Counter)

    @property
    def fallback_rate(self) -> float:
        if self.total_records == 0:
            return 0.0
        return self.fallback_classified / self.total_records

    @property
    def cache_hit_rate(self) -> float:
        total_lookups = self.cache_hits + self.llm_classified + self.fallback_classified
        if total_lookups == 0:
            return 0.0
        return self.cache_hits / total_lookups

    @property
    def estimated_cost_usd(self) -> float:
        input_cost = self.total_tokens_in * 0.15 / 1_000_000
        output_cost = self.total_tokens_out * 0.60 / 1_000_000
        return input_cost + output_cost

    def summary(self) -> dict:
        import statistics
        return {
            "total_records": self.total_records,
            "fallback_rate": f"{self.fallback_rate:.1%}",
            "cache_hit_rate": f"{self.cache_hit_rate:.1%}",
            "median_confidence": statistics.median(self.confidences) if self.confidences else 0,
            "p95_latency_ms": (
                sorted(self.latencies)[int(len(self.latencies) * 0.95)]
                if self.latencies else 0
            ),
            "estimated_cost": f"${self.estimated_cost_usd:.4f}",
            "top_categories": self.category_counts.most_common(5),
        }

Lessons After Six Months in Production

Here is what I would tell my past self before starting this journey:

  1. Start with GPT-4o-mini, not GPT-4o. The small model handles 80% of pipeline tasks just as well at 1/10 the cost. Only upgrade to GPT-4o for complex extraction tasks where accuracy directly impacts dollars.
  2. Temperature 0.0 is your friend. Creativity is the enemy of data pipeline consistency. You want the same input to produce the same output every time.
  3. Your system prompt is production code. Version control it. Code review it. Test it against a golden dataset before deploying changes. A prompt typo can silently degrade classification accuracy across your entire pipeline.
  4. Build the fallback first. Before writing a single LLM call, build the rule-based fallback. It gives you a baseline to measure against and a safety net for day one.
  5. Cache everything. LLM calls are slow and expensive relative to a Redis lookup. Even partial input matching (hashing the first 500 characters) catches a surprising number of near-duplicates.
  6. Monitor confidence scores religiously. They are your early warning system. When OpenAI shipped an update to GPT-4o-mini in January, our median confidence on address standardization dropped from 0.92 to 0.84 overnight. The alerts fired, and we updated our prompt before any bad data hit the warehouse.
  7. Structured outputs changed everything. Six months ago, I was parsing LLM responses with regex. That was terrible. OpenAI's structured output mode and Pydantic integration made LLMs viable for pipelines. Do not use an LLM API that does not support guaranteed JSON output.

Bottom line: LLMs in data pipelines are not a gimmick. For classification, entity extraction, and data standardization tasks processing under 100K records/day, an LLM API with good prompt design is often both more accurate and cheaper than the alternatives. The key is treating your prompts as production code, building robust fallbacks, and monitoring relentlessly.

The code examples in this article are simplified from our production implementations, but the patterns are real. If you are considering adding LLM stages to your ETL, start with your messiest classification task, the one where your rules engine has the most edge cases. Build the fallback, write the prompt, batch aggressively, and measure everything. You will probably be surprised at how well it works.
