CQRS-lite (separate write model from fast read view)

When to use

  • Writes store truth (normalized). Reads serve fast, denormalized views (dashboards, counts).
  • You want cheap queries without slowing writes.
  • You can tolerate eventual consistency (reads may lag a bit).

Avoid when one table + indexes is fast enough, or strong consistency is required for reads.

Diagram (text)

RunService
  ├─ write.add(Run)     → append-only store (truth)
  └─ read.inc(...)      → denormalized counters (dashboard)
Rebuild: read.reset(); for r in write.all(): read.inc(...)

Python example (≤40 lines, type-hinted)

from __future__ import annotations
from dataclasses import dataclass, field
from typing import Protocol, Dict, List

@dataclass(frozen=True)
class Run: tenant: str; status: str

class RunsWrite(Protocol):
    def add(self, r: Run) -> None: ...
    def all(self) -> List[Run]: ...

@dataclass
class MemoryWrite(RunsWrite):
    data: List[Run] = field(default_factory=list)
    def add(self, r: Run) -> None: self.data.append(r)
    def all(self) -> List[Run]: return list(self.data)

class RunsRead(Protocol):
    def inc(self, tenant: str, status: str) -> None: ...
    def summary(self, tenant: str) -> Dict[str, int]: ...
    def reset(self) -> None: ...

@dataclass
class MemoryRead(RunsRead):
    counts: Dict[str, Dict[str, int]] = field(default_factory=dict)
    def inc(self, tenant: str, status: str) -> None:  # names match the RunsRead protocol
        by_status = self.counts.setdefault(tenant, {})
        by_status[status] = by_status.get(status, 0) + 1
    def summary(self, tenant: str) -> Dict[str, int]: return dict(self.counts.get(tenant, {}))
    def reset(self) -> None: self.counts.clear()

@dataclass
class RunService:
    write: RunsWrite; read: RunsRead
    def record(self, tenant: str, status: str) -> None:
        self.write.add(Run(tenant, status)); self.read.inc(tenant, status)  # projection is inline here; it could be queued and applied later
    def rebuild_read_model(self) -> None:
        self.read.reset()
        for r in self.write.all(): self.read.inc(r.tenant, r.status)

Tiny pytest (cements it)

def test_cqrs_lite_summary_and_rebuild():
    w, q = MemoryWrite(), MemoryRead()
    svc = RunService(w, q)
    for s in ["ok","ok","fail"]: svc.record("t1", s)
    assert q.summary("t1") == {"ok": 2, "fail": 1}
    # simulate rebuilding the read model from the write side (e.g., after a bug fix)
    svc.rebuild_read_model()
    # the rebuilt view must match the original exactly (no double counting)
    assert q.summary("t1") == {"ok": 2, "fail": 1} and len(w.all()) == 3

Trade-offs & pitfalls

  • Pros: Fast, tailored reads; write path stays simple; easy to rebuild projections.
  • Cons: Duplication (truth + views); eventual consistency; more moving parts.
  • Pitfalls:
    • Forgetting a rebuild path for read models (add one, as above).
    • No idempotency in projectors → double increments on replays.
    • Cross-process apps need durable events/queues (not just in-memory calls).
    • Transactions: make the write and the projection atomic, or retry the projection reliably; otherwise a crash between the two leaves the view stale until the next rebuild.
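
One way to guard against the double-increment pitfall is to give each event an ID and have the projector skip IDs it has already applied. The following is a minimal sketch under assumptions: RunEvent, event_id, and IdempotentRead are hypothetical names that do not appear in the example above, and the in-memory seen set stands in for whatever durable dedupe store a real system would need.

```python
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Dict, Set

@dataclass(frozen=True)
class RunEvent:
    event_id: str  # hypothetical unique ID assigned on the write side
    tenant: str
    status: str

@dataclass
class IdempotentRead:
    counts: Dict[str, Dict[str, int]] = field(default_factory=dict)
    seen: Set[str] = field(default_factory=set)  # IDs already applied

    def apply(self, e: RunEvent) -> bool:
        """Apply the event once; return False if it was a duplicate."""
        if e.event_id in self.seen:
            return False
        self.seen.add(e.event_id)
        by_status = self.counts.setdefault(e.tenant, {})
        by_status[e.status] = by_status.get(e.status, 0) + 1
        return True

    def summary(self, tenant: str) -> Dict[str, int]:
        return dict(self.counts.get(tenant, {}))

    def reset(self) -> None:
        # A rebuild must forget applied IDs too, or the replay would be skipped.
        self.counts.clear()
        self.seen.clear()

events = [RunEvent("e1", "t1", "ok"), RunEvent("e2", "t1", "fail")]
read = IdempotentRead()
for e in events + events:  # simulate a replay that delivers every event twice
    read.apply(e)
```

Here each event is counted once even though it is delivered twice. The trade-off is that the set of seen IDs grows with the event stream, so a real projector would likely bound it (for example by tracking a last-applied sequence number instead).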

Pythonic alternatives

  • SQL views / materialized views (e.g., PostgreSQL's REFRESH MATERIALIZED VIEW).
  • OLAP tables updated by scheduled jobs (ETL).
  • ORM read models: separate query objects tuned for specific pages.
  • Single model + indexes when traffic is low; simpler is better.
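
To make the "single model + indexes" and "SQL view" options concrete, here is a rough sketch using Python's built-in sqlite3 module. The table, index, and view names (runs, idx_runs_tenant_status, run_summary) and the summary() helper are illustrative assumptions. SQLite has no materialized views, so this uses a plain view that is computed at query time.

```python
from typing import Dict
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE runs (
        id INTEGER PRIMARY KEY,
        tenant TEXT NOT NULL,
        status TEXT NOT NULL
    );
    CREATE INDEX idx_runs_tenant_status ON runs (tenant, status);
    -- Plain view: evaluated on each query, so it never lags the writes.
    CREATE VIEW run_summary AS
        SELECT tenant, status, COUNT(*) AS n
        FROM runs
        GROUP BY tenant, status;
    """
)
conn.executemany(
    "INSERT INTO runs (tenant, status) VALUES (?, ?)",
    [("t1", "ok"), ("t1", "ok"), ("t1", "fail"), ("t2", "ok")],
)

def summary(tenant: str) -> Dict[str, int]:
    """Per-status counts for one tenant, read through the view."""
    rows = conn.execute(
        "SELECT status, n FROM run_summary WHERE tenant = ?", (tenant,)
    )
    return {status: n for status, n in rows}
```

There is no separate read model to keep in sync here, so reads are always consistent with writes; the cost is that every read pays for the aggregation.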

Mini exercise

Make the projection eventual: instead of calling read.inc directly, push (tenant, status) onto a queue that a projector consumes later.

  • Add drain() that applies all queued events to MemoryRead.
  • Test that before drain() the summary lags; after drain() it is up to date.
  • Bonus: keep rebuild_read_model() replaying write.all(), and make it idempotent with the queue in place: discard pending events and reset before replaying, so running it twice gives the same counts.
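
One possible starting point for the exercise, as a sketch rather than a reference solution: a self-contained projector that holds its own counters and an in-memory queue. QueuedProjector, enqueue, and pending are hypothetical names, and a deque only works within a single process.

```python
from __future__ import annotations
from collections import deque
from dataclasses import dataclass, field
from typing import Deque, Dict, Tuple

@dataclass
class QueuedProjector:
    counts: Dict[str, Dict[str, int]] = field(default_factory=dict)
    pending: Deque[Tuple[str, str]] = field(default_factory=deque)

    def enqueue(self, tenant: str, status: str) -> None:
        # Called on the write path: cheap, does not touch the counters.
        self.pending.append((tenant, status))

    def drain(self) -> int:
        """Apply all queued events in order; return how many were applied."""
        applied = 0
        while self.pending:
            tenant, status = self.pending.popleft()
            by_status = self.counts.setdefault(tenant, {})
            by_status[status] = by_status.get(status, 0) + 1
            applied += 1
        return applied

    def summary(self, tenant: str) -> Dict[str, int]:
        return dict(self.counts.get(tenant, {}))

proj = QueuedProjector()
for s in ["ok", "ok", "fail"]:
    proj.enqueue("t1", s)
before = proj.summary("t1")  # read model lags: nothing applied yet
applied = proj.drain()
after = proj.summary("t1")   # caught up after draining
```

Wiring this into RunService and handling the rebuild bonus are left as the exercise describes.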

Checks (quick checklist)

  • Write and read models are separate with clear roles.
  • Reads can be rebuilt from writes.
  • Projection is idempotent and handles replays.
  • Consistency expectations are documented (eventual vs strict).
  • Prefer simpler approaches (views/indexes) if they meet requirements.