Mediator (a coordinator that reduces many-to-many chatter)

When to use

  • Several components must cooperate, but you don’t want them calling each other directly.
  • You want one place to control workflow (ordering, retries, branching).
  • You need to swap/extend components without touching the others.

Avoid when only two things talk, or a plain function calling helpers is enough.

Diagram (text)

Downloader ─┐
Transformer ├───> Mediator (coordinates) ───> decides who calls next
Loader     ─┘
Components never call each other directly; they notify the Mediator.

Step-by-step idea

  1. Components hold a reference to the mediator.
  2. They notify the mediator when done (on_fetched, on_transformed).
  3. Mediator decides next step (and handles errors/policy).

Python example (≤40 lines, type-hinted)

from __future__ import annotations
from dataclasses import dataclass
from typing import List, Dict

Row = Dict[str, int]

@dataclass
class Mediator:
    dl: "Downloader"; tr: "Transformer"; ld: "Loader"
    def start(self, src: str) -> None: self.dl.fetch(src, self)
    def on_fetched(self, rows: List[Row]) -> None: self.tr.process(rows, self)
    def on_transformed(self, rows: List[Row]) -> None: self.ld.load(rows, self)

@dataclass
class Downloader:
    def fetch(self, src: str, m: Mediator) -> None:
        m.on_fetched([{"i": 1}, {"i": 2}, {"i": 3}])  # pretend IO

@dataclass
class Transformer:
    def process(self, rows: List[Row], m: Mediator) -> None:
        m.on_transformed([r for r in rows if r["i"] % 2 == 0])

@dataclass
class Loader:
    out: List[Row]
    def load(self, rows: List[Row], m: Mediator) -> None:
        self.out.extend(rows)

# usage/test
def test_mediator_flow():
    out: List[Row] = []
    med = Mediator(Downloader(), Transformer(), Loader(out))
    med.start("s3://bucket/file.json")
    assert out == [{"i": 2}]

Trade-offs & pitfalls

  • Pros: Components stay decoupled; flow/policy centralized; easy to add steps or change order.
  • Cons: Mediator can grow into a god object; debugging jumps between callbacks.
  • Pitfalls:
    • Embedding business logic inside components—keep orchestration in the Mediator.
    • Hidden side effects—document mediator callbacks clearly.
    • Tight coupling to one mediator—prefer an interface if you’ll swap mediators.

Pythonic alternatives

  • Orchestrator function that calls helpers in order (simplest case).
  • Observer/EventBus if you want broadcast events instead of a single coordinator.
  • Workflow engines (Airflow/Dagster/Prefect) when you need retries, scheduling, UI, distributed workers.
  • Async queues when components run in different processes.

Mini exercise

Add error handling: let Downloader.fetch sometimes raise. Add Mediator.on_error(stage, err) that logs and retries once, then aborts. Write a test proving a first failure retries, a second failure stops and nothing is loaded.

Checks (quick checklist)

  • Components never call each other—only the mediator.
  • Mediator owns ordering, branching, and policy.
  • Clear callback names (on_fetched, on_transformed, …).
  • Tests cover happy path and failure policy.
  • Keep mediator small; split when flows diversify.