Repository & Unit of Work (clean persistence boundary + batched commits)
When to use
- You want business logic to talk to a small, testable API (Repository), not raw DB code.
- You need a transactional boundary: stage changes, then commit or roll back as one unit.
- You’d like easy swaps (in-memory vs DB repo) for tests.
Avoid when your ORM already gives a clean session + simple queries (extra layer may be redundant).
Diagram (text)
Service ── uses ──> UnitOfWork
                     ├─ .repo (staging view)
                     ├─ commit() → flush staged → real repo
                     └─ rollback() → drop staged
Real Repo (DB) ◄─── flush from UoW on commit
Python example (≤40 lines, type-hinted)
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Protocol

@dataclass(frozen=True)
class Event:
    id: int
    type: str

class EventRepo(Protocol):
    def add(self, e: Event) -> None: ...
    def list(self) -> list[Event]: ...

@dataclass
class MemoryRepo(EventRepo):
    data: list[Event] = field(default_factory=list)
    def add(self, e: Event) -> None: self.data.append(e)
    def list(self) -> list[Event]: return list(self.data)

@dataclass
class UnitOfWork:
    target: EventRepo  # any EventRepo, not only MemoryRepo
    staged: list[Event] = field(default_factory=list)

    class _Staging(EventRepo):  # repo view that writes to the staging buffer only
        def __init__(self, buf: list[Event]): self.buf = buf
        def add(self, e: Event) -> None: self.buf.append(e)
        def list(self) -> list[Event]: return list(self.buf)

    @property
    def repo(self) -> EventRepo: return UnitOfWork._Staging(self.staged)
    def __enter__(self) -> UnitOfWork: return self
    # Commit on a clean exit, roll back if the block raised; the exception still propagates.
    def __exit__(self, t, v, tb) -> None: self.commit() if t is None else self.rollback()
    def commit(self) -> None:
        for e in self.staged: self.target.add(e)  # flush through the repo API
        self.staged.clear()
    def rollback(self) -> None: self.staged.clear()

def import_batch(uow: UnitOfWork, rows: list[dict]) -> int:
    with uow as tx:
        for r in rows: tx.repo.add(Event(r["id"], r["type"]))
    return len(rows)
Tiny pytest (cements it)
def test_commit_and_rollback():
    repo = MemoryRepo()
    assert import_batch(UnitOfWork(repo), [{"id": 1, "type": "x"}]) == 1
    assert [e.id for e in repo.list()] == [1]  # committed
    uow = UnitOfWork(repo)
    try:
        with uow as tx:
            tx.repo.add(Event(2, "y"))
            raise ValueError("boom")  # forces rollback
    except ValueError:
        pass
    assert [e.id for e in repo.list()] == [1]  # 2 not persisted
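The test uses MemoryRepo as the commit target. As a rough sketch of the backend swap mentioned under "When to use", the block below shows a SQLite-backed repository with the same add/list shape. The SqliteRepo name, the events table, its schema, and the event_ids helper are assumptions made for illustration; Event and EventRepo are redeclared so the block runs on its own.

```python
from __future__ import annotations
import sqlite3
from dataclasses import dataclass
from typing import Protocol


@dataclass(frozen=True)
class Event:
    id: int
    type: str


class EventRepo(Protocol):
    def add(self, e: Event) -> None: ...
    def list(self) -> list[Event]: ...


class SqliteRepo:
    """Hypothetical DB-backed repo; matches EventRepo structurally."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self.conn = conn
        # Assumed schema, for illustration only.
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS events "
            "(id INTEGER PRIMARY KEY, type TEXT NOT NULL)"
        )

    def add(self, e: Event) -> None:
        # No conn.commit() here: deciding when to commit is left to the caller.
        self.conn.execute(
            "INSERT INTO events (id, type) VALUES (?, ?)", (e.id, e.type)
        )

    def list(self) -> list[Event]:
        rows = self.conn.execute(
            "SELECT id, type FROM events ORDER BY id"
        ).fetchall()
        return [Event(id=r[0], type=r[1]) for r in rows]


def event_ids(repo: EventRepo) -> list[int]:
    # Caller code depends only on the protocol, not on the storage.
    return [e.id for e in repo.list()]


repo = SqliteRepo(sqlite3.connect(":memory:"))
repo.add(Event(1, "x"))
repo.add(Event(2, "y"))
ids = event_ids(repo)
```

Because the repository never commits on its own, the transactional decision stays with whatever wraps it, which is the role the Unit of Work plays in this pattern.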
Trade-offs & pitfalls
- Pros: Clear persistence API; transactional safety; trivial tests; swap storage backends easily.
- Cons: Extra layer to maintain; can duplicate ORM features.
- Pitfalls:
  - Doing business logic inside the repo/UoW; keep them about persistence + transactions only.
  - Forgetting to commit/rollback (use the context manager as above).
  - Sharing entities across UoWs and mutating them; prefer new instances or re-fetch per UoW.
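For the shared-entity pitfall, one way to avoid accidental mutation is to keep entities immutable and derive changed copies. A minimal sketch, reusing the frozen Event dataclass from the example (redeclared here so the block runs alone); the retype helper is hypothetical.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class Event:
    id: int
    type: str


def retype(e: Event, new_type: str) -> Event:
    # Hypothetical helper: returns a new instance instead of mutating e.
    return replace(e, type=new_type)


original = Event(1, "x")
changed = retype(original, "y")

try:
    original.type = "z"  # frozen dataclasses reject attribute assignment
    mutated = True
except FrozenInstanceError:
    mutated = False
```

Each UoW can then stage the new copy without affecting instances another UoW may still hold.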
Pythonic alternatives
- ORM sessions/transactions:
  - SQLAlchemy: Session + session.begin() (already a Unit of Work).
  - Django: transaction.atomic() context manager; skip custom UoW unless you need extra layering.
- Simple functions: if persistence is tiny, a thin gateway function can be enough.
- Pydantic/dataclasses for entities; keep them persistence-agnostic.
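As an illustration of the "simple functions" option, the sketch below uses a standard-library sqlite3 connection as a context manager, which commits when the block succeeds and rolls back when it raises. The save_events function and the events table are assumptions for illustration, not part of the example above.

```python
from __future__ import annotations
import sqlite3


def save_events(conn: sqlite3.Connection, rows: list[dict]) -> int:
    # Hypothetical thin gateway: one function, one transaction.
    with conn:  # commits on success, rolls back if the block raises
        conn.executemany(
            "INSERT INTO events (id, type) VALUES (:id, :type)", rows
        )
    return len(rows)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, type TEXT NOT NULL)")

saved = save_events(conn, [{"id": 1, "type": "x"}])

try:
    # The second row repeats id 1, so the whole batch is rolled back.
    save_events(conn, [{"id": 2, "type": "y"}, {"id": 1, "type": "dup"}])
except sqlite3.IntegrityError:
    pass

ids = [r[0] for r in conn.execute("SELECT id FROM events ORDER BY id")]
```

When the whole persistence story fits in a function like this, the connection's own transaction handling may already cover what a custom UoW would add.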
Mini exercise
Add an Outbox to the UoW:
- Track domain events in uow.outbox: list[dict]; allow handlers to push events during work.
- On commit(), flush both staged records and outbox to real stores; on rollback(), drop both.
- Test that events are not emitted on rollback, but are present after commit.
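One possible starting shape for the exercise, as a sketch rather than a reference solution. The OutboxUnitOfWork class, its records and published lists (stand-ins for the real stores), and the message format are all assumptions; a fuller solution would keep the repo view from the main example.

```python
from __future__ import annotations
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Event:
    id: int
    type: str


@dataclass
class OutboxUnitOfWork:
    """Sketch: stages records and outbox messages, flushes both on commit."""
    records: list[Event]    # stand-in for the real record store
    published: list[dict]   # stand-in for the real outbox store
    staged: list[Event] = field(default_factory=list)
    outbox: list[dict] = field(default_factory=list)

    def add(self, e: Event) -> None:
        self.staged.append(e)
        # Assumed message format; handlers could also append to outbox directly.
        self.outbox.append({"event": "created", "id": e.id})

    def __enter__(self) -> OutboxUnitOfWork:
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        if exc_type is None:
            self.commit()
        else:
            self.rollback()

    def commit(self) -> None:
        # Flush both buffers together, then clear them.
        self.records.extend(self.staged)
        self.published.extend(self.outbox)
        self.staged.clear()
        self.outbox.clear()

    def rollback(self) -> None:
        # Drop both buffers; nothing reaches the stores.
        self.staged.clear()
        self.outbox.clear()


records: list[Event] = []
published: list[dict] = []

with OutboxUnitOfWork(records, published) as uow:
    uow.add(Event(1, "x"))

try:
    with OutboxUnitOfWork(records, published) as uow:
        uow.add(Event(2, "y"))
        raise ValueError("boom")  # forces rollback
except ValueError:
    pass
```

After both blocks run, only the first event and its message should be in the stores, which is the behavior the exercise asks you to test.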
Checks (quick checklist)
- Repository exposes a small API (add/get/list) and hides storage details.
- UnitOfWork provides a commit/rollback boundary (context manager preferred).
- Real writes happen only on commit; rollback clears staged changes.
- Tests prove commit vs rollback behavior.
- Keep domain logic outside repos/UoW; they’re persistence plumbing.




