Observer (publish/subscribe to events)
When to use
- You want parts of the system to react to events (pipeline finished → notify Slack, trigger next step).
- Producers shouldn’t know who listens; loose coupling.
- You may have many listeners per event, added/removed at runtime.
Avoid when a simple direct call is fine or you don’t need multiple listeners.
Diagram (text)
Producer (pipeline) ── publishes ──> EventBus ── notifies ──> Listeners
             "started/finished/failed"                         (0..N)
Step-by-step idea
- Make a tiny EventBus with subscribe(topic, fn) and publish(topic, **data).
- Producers publish events; listeners subscribe to topics.
- Publisher doesn’t know (or care) who’s listening.
Python example (≤40 lines, type-hinted)
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Callable

# A listener receives the event payload as a plain dict.
Listener = Callable[[dict[str, Any]], None]


@dataclass
class EventBus:
    # Maps each topic to its listeners, kept in subscription order.
    _subs: dict[str, list[Listener]] = field(default_factory=dict)

    def subscribe(self, topic: str, fn: Listener) -> None:
        self._subs.setdefault(topic, []).append(fn)

    def publish(self, topic: str, **data: Any) -> None:
        evt = {"topic": topic, **data}
        for fn in self._subs.get(topic, []):
            fn(evt)


def run_pipeline(bus: EventBus, name: str, rows: int) -> int:
    bus.publish("pipeline.started", name=name)
    # ... do work ...
    if rows <= 0:
        bus.publish("pipeline.failed", name=name, reason="no rows")
        raise ValueError("no rows")
    bus.publish("pipeline.finished", name=name, rows=rows)
    return rows
Tiny pytest (cements it)
import pytest


def test_observer_pubsub_flow():
    bus = EventBus()
    seen = {"started": 0, "finished": 0, "msgs": []}

    def on_start(e):
        seen["started"] += 1

    def on_finish(e):
        seen["finished"] += 1

    def notify(e):
        seen["msgs"].append(f"{e['name']}:{e['rows']}")

    bus.subscribe("pipeline.started", on_start)
    bus.subscribe("pipeline.finished", on_finish)
    bus.subscribe("pipeline.finished", notify)
    assert run_pipeline(bus, "daily", 3) == 3
    assert seen == {"started": 1, "finished": 1, "msgs": ["daily:3"]}


def test_failed_emits_event():
    bus = EventBus()
    captured = []
    bus.subscribe("pipeline.failed", lambda e: captured.append(e["reason"]))
    with pytest.raises(ValueError):
        run_pipeline(bus, "empty", 0)
    assert captured == ["no rows"]
Trade-offs & pitfalls
- Pros: Loose coupling; many listeners; runtime extensibility; great for side-effects (alerts, logging).
- Cons: Harder to trace flow; listener ordering is implicit (here, subscription order); an exception in one listener propagates out of publish and skips the remaining listeners.
- Pitfalls:
  - Letting a bad listener crash the bus: consider a safe publish that catches and logs.
  - Blocking work in listeners: consider async or background workers for slow handlers.
  - Global, hidden buses: prefer injecting a bus so tests can control it.
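To make the first pitfall concrete, here is a minimal sketch of how a single raising listener halts delivery when publish is a plain loop. It repeats a stripped-down version of the EventBus from the Python example so it runs on its own; the listener names (broken_alert, audit_log) are hypothetical and only serve the illustration.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Callable

Listener = Callable[[dict[str, Any]], None]


@dataclass
class EventBus:
    """Same shape as the bus in the Python example, repeated to stay standalone."""

    _subs: dict[str, list[Listener]] = field(default_factory=dict)

    def subscribe(self, topic: str, fn: Listener) -> None:
        self._subs.setdefault(topic, []).append(fn)

    def publish(self, topic: str, **data: Any) -> None:
        evt = {"topic": topic, **data}
        for fn in self._subs.get(topic, []):
            fn(evt)  # an exception here propagates and skips later listeners


delivered: list[str] = []


def broken_alert(evt: dict[str, Any]) -> None:  # hypothetical faulty listener
    raise RuntimeError("alert service down")


def audit_log(evt: dict[str, Any]) -> None:  # hypothetical healthy listener
    delivered.append(evt["name"])


bus = EventBus()
bus.subscribe("pipeline.finished", broken_alert)  # subscribed first, so it runs first
bus.subscribe("pipeline.finished", audit_log)

publish_error: Exception | None = None
try:
    bus.publish("pipeline.finished", name="daily", rows=3)
except RuntimeError as exc:
    publish_error = exc  # the producer sees the listener's failure
```

In this run audit_log is never called and the exception surfaces in the producer, which is why the error policy is worth choosing deliberately rather than inheriting by accident.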
Pythonic alternatives
- Callbacks list on the object if there’s only one event.
- Signals libs (e.g., blinker) for robust dispatch.
- Asyncio: async def listeners + await publish for concurrency.
- Message queues (Kafka/SNS/SQS) when you need cross-process delivery and durability.
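As a rough illustration of the asyncio alternative, the sketch below shows one possible shape for a bus with async def listeners and an awaited publish. AsyncEventBus and the listener names are assumptions made for this example, not part of the code above; asyncio.gather is used to run the listeners for a topic concurrently.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable

AsyncListener = Callable[[dict[str, Any]], Awaitable[None]]


class AsyncEventBus:
    """Hypothetical async variant of the bus, for illustration only."""

    def __init__(self) -> None:
        self._subs: dict[str, list[AsyncListener]] = {}

    def subscribe(self, topic: str, fn: AsyncListener) -> None:
        self._subs.setdefault(topic, []).append(fn)

    async def publish(self, topic: str, **data: Any) -> None:
        evt = {"topic": topic, **data}
        # Start every listener for this topic and wait until all have finished.
        await asyncio.gather(*(fn(evt) for fn in self._subs.get(topic, [])))


async def demo() -> list[str]:
    bus = AsyncEventBus()
    seen: list[str] = []

    async def slow_notify(evt: dict[str, Any]) -> None:
        await asyncio.sleep(0.01)  # stands in for a slow network call
        seen.append(f"notified:{evt['name']}")

    async def fast_log(evt: dict[str, Any]) -> None:
        seen.append(f"logged:{evt['name']}")

    bus.subscribe("pipeline.finished", slow_notify)
    bus.subscribe("pipeline.finished", fast_log)
    await bus.publish("pipeline.finished", name="daily", rows=3)
    return seen


result = asyncio.run(demo())
```

Because the listeners overlap, fast_log completes while slow_notify is still sleeping, so completion order no longer matches subscription order. With gather's default settings, the first listener exception is raised from publish while the other listeners keep running; whether that is acceptable is part of the error policy.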
Mini exercise
Add unsubscribe(topic, fn) to remove a listener, and publish_safe(topic, **data), which calls listeners inside try/except and returns a list of exceptions (if any).
Write a test proving a faulty listener doesn’t stop others and that unsubscribe works.
Checks (quick checklist)
- Producers only publish; they don’t know listeners.
- Listeners are small, fast, and independent.
- Ordering and error policy are explicit (documented/tested).
- Tests cover success, failure, and multiple listeners.
- Consider async/queueing if handlers might block.
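One way to make the ordering check explicit is to pin it down in a test. The sketch below assumes the list-based EventBus from the Python example, where listeners fire in subscription order, and repeats a minimal version so the test runs standalone; the labels appended by the listeners are hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Callable

Listener = Callable[[dict[str, Any]], None]


@dataclass
class EventBus:
    """Minimal copy of the example bus so this test is self-contained."""

    _subs: dict[str, list[Listener]] = field(default_factory=dict)

    def subscribe(self, topic: str, fn: Listener) -> None:
        self._subs.setdefault(topic, []).append(fn)

    def publish(self, topic: str, **data: Any) -> None:
        evt = {"topic": topic, **data}
        for fn in self._subs.get(topic, []):
            fn(evt)


def test_listeners_run_in_subscription_order() -> None:
    bus = EventBus()
    calls: list[str] = []
    bus.subscribe("pipeline.finished", lambda e: calls.append("first"))
    bus.subscribe("pipeline.finished", lambda e: calls.append("second"))
    # A listener on another topic must not be triggered by this publish.
    bus.subscribe("pipeline.started", lambda e: calls.append("other-topic"))

    bus.publish("pipeline.finished", name="daily", rows=3)

    assert calls == ["first", "second"]
```

If the bus later changes its storage (for example to a set, or to concurrent dispatch), this test fails and forces the ordering decision to be revisited instead of silently changing.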




