Repository & Unit of Work (clean persistence boundary + batched commits)

When to use

  • You want business logic to talk to a small, testable API (Repository), not raw DB code.
  • You need a transactional boundary: stage changes, then commit or roll back as one unit.
  • You’d like easy swaps (in-memory vs DB repo) for tests.

Avoid when your ORM already gives you a clean session and your queries are simple; the extra layer is then mostly redundant.

Diagram (text)

Service ── uses ──> UnitOfWork
                      ├─ .repo (staging view)
                      ├─ commit()  → flush staged → real repo
                      └─ rollback() → drop staged
Real Repo (DB)  ◄─── flush from UoW on commit

Python example (≤40 lines, type-hinted)

from __future__ import annotations
from dataclasses import dataclass, field
from typing import Protocol

@dataclass(frozen=True)
class Event: id: int; type: str

class EventRepo(Protocol):
    def add(self, e: Event) -> None: ...
    def list(self) -> list[Event]: ...

@dataclass
class MemoryRepo(EventRepo):
    data: list[Event] = field(default_factory=list)
    def add(self, e: Event) -> None: self.data.append(e)
    def list(self) -> list[Event]: return list(self.data)

@dataclass
class UnitOfWork:
    target: EventRepo                                  # any repo, not only MemoryRepo
    staged: list[Event] = field(default_factory=list)
    class _Staging(EventRepo):                         # buffers writes until commit
        def __init__(self, buf: list[Event]) -> None: self.buf = buf
        def add(self, e: Event) -> None: self.buf.append(e)
        def list(self) -> list[Event]: return list(self.buf)
    @property
    def repo(self) -> EventRepo: return UnitOfWork._Staging(self.staged)
    def __enter__(self) -> UnitOfWork: return self
    def __exit__(self, t: type[BaseException] | None, v: object, tb: object) -> None:
        if t is None: self.commit()
        else: self.rollback()                          # the exception still propagates
    def commit(self) -> None:
        for e in self.staged: self.target.add(e)       # flush through the repo API
        self.staged.clear()
    def rollback(self) -> None: self.staged.clear()

def import_batch(uow: UnitOfWork, rows: list[dict]) -> int:
    with uow as tx:
        for r in rows: tx.repo.add(Event(r["id"], r["type"]))
    return len(rows)

Tiny pytest (cements it)

def test_commit_and_rollback():
    repo = MemoryRepo()
    assert import_batch(UnitOfWork(repo), [{"id":1,"type":"x"}]) == 1
    assert [e.id for e in repo.list()] == [1]            # committed

    uow = UnitOfWork(repo)
    try:
        with uow as tx:
            tx.repo.add(Event(2,"y"))
            raise ValueError("boom")                      # forces rollback
    except ValueError: pass
    assert [e.id for e in repo.list()] == [1]            # 2 not persisted
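
Swapping the backend: a minimal sketch, assuming a hypothetical SqliteRepo built on the standard-library sqlite3 module. The class, the events table, and the count_by_type helper are illustrative names, not part of the example above. The point is only that code written against the EventRepo protocol does not care which store sits behind it.

```python
from __future__ import annotations
import sqlite3
from dataclasses import dataclass
from typing import Protocol

@dataclass(frozen=True)
class Event:
    id: int
    type: str

class EventRepo(Protocol):
    def add(self, e: Event) -> None: ...
    def list(self) -> list[Event]: ...

class SqliteRepo:
    """Hypothetical DB-backed repo; matches EventRepo structurally."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self.conn = conn
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS events "
            "(id INTEGER PRIMARY KEY, type TEXT NOT NULL)"
        )

    def add(self, e: Event) -> None:
        # No commit here: the transaction boundary belongs to the caller.
        self.conn.execute(
            "INSERT INTO events (id, type) VALUES (?, ?)", (e.id, e.type)
        )

    def list(self) -> list[Event]:
        rows = self.conn.execute(
            "SELECT id, type FROM events ORDER BY id"
        ).fetchall()
        return [Event(id=r[0], type=r[1]) for r in rows]

def count_by_type(repo: EventRepo, type_: str) -> int:
    """Illustrative service code: depends on the protocol, not on SQLite."""
    return sum(1 for e in repo.list() if e.type == type_)
```

Passing SqliteRepo(sqlite3.connect(":memory:")) or an in-memory repo to count_by_type should behave the same, which is what makes the in-memory variant a reasonable test double.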

Trade-offs & pitfalls

  • Pros: Clear persistence API; transactional safety; trivial tests; swap storage backends easily.
  • Cons: Extra layer to maintain; can duplicate ORM features.
  • Pitfalls:
    • Putting business logic inside the repo/UoW. Keep both limited to persistence and transactions.
    • Forgetting to commit/rollback (use the context manager as above).
    • Sharing entities across UoWs and mutating them—prefer new instances or re-fetch per UoW.
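
The last pitfall is easy to miss, so here is a small illustration. It assumes a hypothetical mutable Draft entity and plain lists standing in for a committed store; none of these names come from the example above, where Event is already frozen and so avoids the problem.

```python
from dataclasses import dataclass, replace

@dataclass
class Draft:                      # hypothetical *mutable* entity
    id: int
    title: str

store: list[Draft] = []           # stands in for an already committed repo
d = Draft(1, "first")
store.append(d)                   # "committed" by an earlier unit of work
d.title = "edited later"          # mutation outside any UoW...
leaked = store[0].title           # ...is visible in the store (same object)

@dataclass(frozen=True)
class FrozenDraft:                # frozen variant: edits require a new instance
    id: int
    title: str

safe_store: list[FrozenDraft] = []
f = FrozenDraft(1, "first")
safe_store.append(f)
f2 = replace(f, title="edited later")   # new object; the stored one is untouched
```

With the frozen variant, the edited copy only reaches the store if some unit of work explicitly stages and commits it.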

Pythonic alternatives

  • ORM sessions/transactions: SQLAlchemy Session + session.begin() (already a Unit of Work).
  • Django: transaction.atomic() context manager; skip custom UoW unless you need extra layering.
  • Simple functions: if persistence is tiny, a thin gateway function can be enough.
  • Pydantic/dataclasses for entities; keep them persistence-agnostic.
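
As a sketch of the "simple functions" option, the block below uses only the standard-library sqlite3 module, whose connection object works as a context manager that commits on success and rolls back when the block raises. The save_events function and the events table are hypothetical names chosen for illustration.

```python
from __future__ import annotations
import sqlite3

def save_events(conn: sqlite3.Connection, rows: list[tuple[int, str]]) -> None:
    """Hypothetical thin gateway: one function, one transaction."""
    with conn:  # commits if the block succeeds, rolls back if it raises
        conn.executemany("INSERT INTO events (id, type) VALUES (?, ?)", rows)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, type TEXT NOT NULL)")

save_events(conn, [(1, "x"), (2, "y")])           # committed as one unit
try:
    save_events(conn, [(3, "z"), (1, "dup")])     # duplicate id fails the batch
except sqlite3.IntegrityError:
    pass                                          # row 3 was rolled back too

ids = [r[0] for r in conn.execute("SELECT id FROM events ORDER BY id")]
```

Here the connection already plays the Unit of Work role, so a custom UoW class would add little.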

Mini exercise

Add an Outbox to the UoW:

  • Track domain events in uow.outbox: list[dict]; allow handlers to push events during work.
  • On commit(), flush both staged records and outbox to real stores; on rollback(), drop both.
  • Test that events are not emitted on rollback, but are present after commit.
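
One possible starting point for the exercise, kept deliberately small. OutboxUnitOfWork, records, and published are hypothetical names, and two plain lists stand in for the real record store and the real outbox store; treat it as a sketch to adapt, not a reference solution.

```python
from __future__ import annotations
from dataclasses import dataclass, field

@dataclass(frozen=True)
class Event:
    id: int
    type: str

@dataclass
class OutboxUnitOfWork:
    """Hypothetical UoW variant that stages records and outgoing messages."""
    records: list[Event]                                # stand-in for the real repo
    published: list[dict]                               # stand-in for the real outbox
    staged: list[Event] = field(default_factory=list)
    outbox: list[dict] = field(default_factory=list)    # handlers append here

    def add(self, e: Event) -> None:
        self.staged.append(e)

    def __enter__(self) -> OutboxUnitOfWork:
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        if exc_type is None:
            self.commit()
        else:
            self.rollback()          # the exception still propagates

    def commit(self) -> None:
        # Flush both buffers together, then reset them.
        self.records.extend(self.staged)
        self.published.extend(self.outbox)
        self.staged.clear()
        self.outbox.clear()

    def rollback(self) -> None:
        # Drop both buffers; nothing reaches the real stores.
        self.staged.clear()
        self.outbox.clear()
```

A handler would call uow.add(...) and uow.outbox.append({...}) inside the with block; the third bullet of the exercise then reduces to asserting on published after a successful block and after a failing one.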

Checks (quick checklist)

  • Repository exposes a small API (add/list here, plus get if needed) and hides storage details.
  • UnitOfWork provides a commit/rollback boundary (context manager preferred).
  • Real writes happen only on commit; rollback clears staged changes.
  • Tests prove commit vs rollback behavior.
  • Domain logic lives outside repos/UoW; they’re persistence plumbing.