Caching layers (put a fast layer in front of slow I/O)
When to use
- You call expensive/remote things (schemas, feature flags, secrets) repeatedly.
- You can safely reuse results for some time (TTL) or while capacity allows (LRU).
- You need read-through behavior: on miss → load → cache → return.
Avoid when data must be strictly fresh every time, or keys are unbounded with no eviction.
Diagram (text)
```
Client ──> SchemaService.get(table)
                  │
                  cache.get(table) ── miss ──> loader(table) ──> cache.put ──> return
                        └─ hit ──────────────────────────────────────────────> return cached value
```
Python example (≤40 lines, type-hinted)
Simple LRU + TTL in-memory cache used by a schema service.
```python
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Callable, TypeVar, Generic, Dict, Tuple
import time
from collections import OrderedDict

T = TypeVar("T")


@dataclass
class LRUTTLCache(Generic[T]):
    capacity: int = 128
    ttl: float | None = 300.0               # seconds; None disables expiry
    _store: "OrderedDict[str, Tuple[float, T]]" = field(default_factory=OrderedDict)
    _now: Callable[[], float] = time.time   # injectable clock (handy in tests)

    def get(self, key: str) -> T | None:
        if key in self._store:
            ts, val = self._store.pop(key)
            if self.ttl is None or self._now() - ts < self.ttl:
                self._store[key] = (ts, val)  # re-insert to refresh recency
                return val
        return None  # miss, or expired (the expired entry stays popped)

    def put(self, key: str, val: T) -> None:
        if key in self._store:
            self._store.pop(key)
        elif len(self._store) >= self.capacity:
            self._store.popitem(last=False)  # evict least-recently-used
        self._store[key] = (self._now(), val)


@dataclass
class SchemaService:
    load: Callable[[str], Dict[str, str]]
    cache: LRUTTLCache[Dict[str, str]]

    def get(self, table: str) -> Dict[str, str]:
        v = self.cache.get(table)
        if v is None:  # read-through: miss -> load -> store -> return
            v = self.load(table)
            self.cache.put(table, v)
        return v
```
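To see the read-through path in use, here is a small wiring sketch; `fetch_schema` is a made-up stand-in for the slow remote call, and the snippet reuses the classes and imports from the block above.

```python
# Hypothetical loader: stands in for a slow remote call (DB, HTTP, etc.).
def fetch_schema(table: str) -> Dict[str, str]:
    # e.g. query information_schema.columns and build {column_name: data_type}
    return {"id": "int", "name": "text"}

schemas = SchemaService(load=fetch_schema, cache=LRUTTLCache(capacity=256, ttl=60.0))

cols = schemas.get("users")   # first call: miss -> fetch_schema runs
cols = schemas.get("users")   # within 60 s: served from the cache, no remote call
```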
Tiny pytest (cements it)
```python
def test_cache_hits_and_ttl_and_lru():
    calls = {"n": 0}

    def loader(table):
        calls["n"] += 1
        return {"table": table, "cols": "a,b"}

    # no expiry: second get is a hit
    svc = SchemaService(loader, LRUTTLCache(capacity=2, ttl=None))
    assert svc.get("users")["table"] == "users"
    assert svc.get("users")["table"] == "users"
    assert calls["n"] == 1  # hit

    # TTL expiry: advance the injected clock past the TTL
    t = 0.0
    svc2 = SchemaService(loader, LRUTTLCache(capacity=2, ttl=1.0, _now=lambda: t))
    svc2.get("accounts")
    t = 2.0
    svc2.get("accounts")
    assert calls["n"] >= 3  # reloaded after TTL

    # LRU eviction
    svc3 = SchemaService(loader, LRUTTLCache(capacity=2, ttl=None))
    svc3.get("a"); svc3.get("b"); svc3.get("a")  # 'b' is LRU now
    svc3.get("c")  # evicts 'b'
    svc3.get("b")  # miss -> reload
    assert calls["n"] >= 5
```
Trade-offs & pitfalls
- Pros: Big latency cuts, fewer remote calls, lower cost; drop-in around existing services.
- Cons: Staleness risk; memory use; eviction tuning needed.
- Pitfalls:
  - Unbounded keys → memory blow-ups (always set a capacity).
  - TTL too long (stale data) or too short (cache thrash).
  - Multi-process apps: each process has its own cache unless you use a shared backend.
  - Cache stampede under high concurrency: protect it with single-flight locking (see the sketch below).
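To make the stampede pitfall concrete, here is a minimal per-key single-flight sketch. `SingleFlightLoader` and its method names are illustrative, it reuses `LRUTTLCache` from above, and the mini exercise below asks you to fold the same idea into the cache as `get_or_set`.

```python
import threading
from typing import Callable, Dict, Generic, TypeVar

V = TypeVar("V")

class SingleFlightLoader(Generic[V]):
    """Illustrative sketch: concurrent misses on one key trigger a single load."""

    def __init__(self, cache: "LRUTTLCache[V]", loader: Callable[[str], V]) -> None:
        self.cache = cache
        self.loader = loader
        self._locks: Dict[str, threading.Lock] = {}  # per-key locks (grows with keys)

    def get(self, key: str) -> V:
        val = self.cache.get(key)
        if val is not None:
            return val                                        # fast path: hit, no lock taken
        lock = self._locks.setdefault(key, threading.Lock())  # races just discard a spare Lock
        with lock:                                            # serialize loads for this key
            val = self.cache.get(key)                         # re-check: a peer may have loaded it
            if val is None:
                val = self.loader(key)
                self.cache.put(key, val)
            return val
```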
Pythonic alternatives
- `functools.lru_cache` for pure functions (no TTL).
- `cachetools` (LRU/TTL/LFU) for robust policies.
- Redis/Memcached for shared caches across processes/hosts.
- Decorators (e.g., `@cached(ttl=...)`) for simple read-through on functions.
- Proxy pattern when you want caching at the object boundary (same interface).
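To make the first two alternatives concrete, a short sketch: `functools.lru_cache` is standard library, `cachetools` is a third-party package (`pip install cachetools`), and the decorated functions are placeholders for real expensive calls.

```python
from functools import lru_cache

from cachetools import TTLCache, cached  # third-party: pip install cachetools

@lru_cache(maxsize=128)                  # LRU only, no TTL: fine for pure functions
def country_for_code(code: str) -> str:
    return code.upper()                  # placeholder for an expensive lookup

@cached(cache=TTLCache(maxsize=128, ttl=300))  # LRU capacity + 300 s freshness
def load_feature_flags(env: str) -> dict:
    return {"env": env, "new_ui": True}  # placeholder for a remote call
```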
Mini exercise
Add a `get_or_set(key, loader)` helper to `LRUTTLCache` that loads under a simple per-key lock to avoid stampedes (use a `dict[str, threading.Lock]`). Write a test that fires two threads asking for the same key and assert that the loader runs only once.
Checks (quick checklist)
- Clear policy: TTL for freshness, capacity for memory.
- Read-through: miss → load → store → return.
- Eviction works (LRU) and is tested.
- Concurrency story (locks) if used in multithreaded contexts.
- For multi-process, consider a shared cache (Redis) or accept per-process caches.
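Following up on the last item, a minimal shared-cache sketch against Redis, assuming `redis-py` is installed and a server is reachable on `localhost:6379`; the key scheme, the JSON encoding, and the `fetch_schema` loader (from the wiring sketch above) are illustrative choices, not requirements.

```python
import json
from typing import Dict

import redis  # third-party: pip install redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

def get_schema_shared(table: str, ttl: int = 300) -> Dict[str, str]:
    key = f"schema:{table}"                # illustrative key naming scheme
    cached = r.get(key)
    if cached is not None:
        return json.loads(cached)          # hit: shared across processes/hosts
    schema = fetch_schema(table)           # the hypothetical slow loader from above
    r.setex(key, ttl, json.dumps(schema))  # store with a server-side TTL
    return schema
```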