Observer (publish/subscribe to events)

When to use

  • You want parts of the system to react to events (pipeline finished → notify Slack, trigger next step).
  • Producers shouldn’t know who listens; loose coupling.
  • You may have many listeners per event, added/removed at runtime.

Avoid when a simple direct call is fine or you don’t need multiple listeners.

Diagram (text)

Producer (pipeline) ── publishes ──> EventBus ── notifies ──> Listeners
       "started/finished/failed"                      (0..N)

Step-by-step idea

  1. Make a tiny EventBus with subscribe(topic, fn) and publish(topic, **data).
  2. Producers publish events; listeners subscribe to topics.
  3. Publisher doesn’t know (or care) who’s listening.

Python example (≤40 lines, type-hinted)

from __future__ import annotations
from dataclasses import dataclass, field
from typing import Callable, Any

Listener = Callable[[dict[str, Any]], None]

@dataclass
class EventBus:
    _subs: dict[str, list[Listener]] = field(default_factory=dict)
    def subscribe(self, topic: str, fn: Listener) -> None:
        self._subs.setdefault(topic, []).append(fn)
    def publish(self, topic: str, **data: Any) -> None:
        evt = {"topic": topic, **data}
        for fn in self._subs.get(topic, []):
            fn(evt)

def run_pipeline(bus: EventBus, name: str, rows: int) -> int:
    bus.publish("pipeline.started", name=name)
    # ... do work ...
    if rows <= 0:
        bus.publish("pipeline.failed", name=name, reason="no rows")
        raise ValueError("no rows")
    bus.publish("pipeline.finished", name=name, rows=rows)
    return rows

Tiny pytest (cements it)

def test_observer_pubsub_flow():
    bus = EventBus()
    seen = {"started": 0, "finished": 0, "msgs": []}
    def on_start(e): seen["started"] += 1
    def on_finish(e): seen["finished"] += 1
    def notify(e): seen["msgs"].append(f"{e['name']}:{e['rows']}")
    bus.subscribe("pipeline.started", on_start)
    bus.subscribe("pipeline.finished", on_finish)
    bus.subscribe("pipeline.finished", notify)
    assert run_pipeline(bus, "daily", 3) == 3
    assert seen == {"started": 1, "finished": 1, "msgs": ["daily:3"]}

def test_failed_emits_event():
    bus = EventBus()
    captured = []
    bus.subscribe("pipeline.failed", lambda e: captured.append(e["reason"]))
    import pytest
    with pytest.raises(ValueError):
        run_pipeline(bus, "empty", 0)
    assert captured == ["no rows"]

Trade-offs & pitfalls

  • Pros: Loose coupling; many listeners; runtime extensibility; great for side-effects (alerts, logging).
  • Cons: Harder to trace flow; ordering of listeners is implicit; exceptions in listeners can break publish.
  • Pitfalls:
    • Letting a bad listener crash the bus—consider a safe publish that catches and logs.
    • Blocking work in listeners—consider async or background workers for slow handlers.
    • Global, hidden buses—prefer injecting a bus so tests can control it.
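The first pitfall can be addressed by wrapping each listener call in try/except so one bad handler can't stop the rest. A minimal sketch (SafeEventBus is an illustrative name, not part of the example above; error policy here is "log and continue"):

```python
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from typing import Any, Callable

Listener = Callable[[dict[str, Any]], None]
log = logging.getLogger(__name__)

@dataclass
class SafeEventBus:
    _subs: dict[str, list[Listener]] = field(default_factory=dict)

    def subscribe(self, topic: str, fn: Listener) -> None:
        self._subs.setdefault(topic, []).append(fn)

    def publish(self, topic: str, **data: Any) -> None:
        evt = {"topic": topic, **data}
        for fn in self._subs.get(topic, []):
            try:
                fn(evt)
            except Exception:
                # one faulty listener must not break delivery to the others
                log.exception("listener %r failed for topic %r", fn, topic)
```

Whether failures should be logged, collected, or re-raised is a policy decision; the point is to make it explicit rather than letting the first exception abort the publish loop.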

Pythonic alternatives

  • Callbacks list on the object if there’s only one event.
  • Signals libs (e.g., blinker) for robust dispatch.
  • Asyncio: async def listeners + await publish for concurrency.
  • Message queues (Kafka/SNS/SQS) when you need cross-process delivery and durability.
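The asyncio bullet can be sketched as an async variant of the bus (AsyncEventBus is a hypothetical name; listeners are coroutines and publish awaits them all via asyncio.gather):

```python
from __future__ import annotations
import asyncio
from typing import Any, Awaitable, Callable

AsyncListener = Callable[[dict[str, Any]], Awaitable[None]]

class AsyncEventBus:
    def __init__(self) -> None:
        self._subs: dict[str, list[AsyncListener]] = {}

    def subscribe(self, topic: str, fn: AsyncListener) -> None:
        self._subs.setdefault(topic, []).append(fn)

    async def publish(self, topic: str, **data: Any) -> None:
        evt = {"topic": topic, **data}
        # gather runs all listeners concurrently and waits for them to finish
        await asyncio.gather(*(fn(evt) for fn in self._subs.get(topic, [])))

seen: list[str] = []

async def on_finish(evt: dict[str, Any]) -> None:
    await asyncio.sleep(0)  # stand-in for slow I/O, e.g. posting to Slack
    seen.append(evt["name"])

async def main() -> None:
    bus = AsyncEventBus()
    bus.subscribe("pipeline.finished", on_finish)
    await bus.publish("pipeline.finished", name="daily")

asyncio.run(main())
```

This keeps slow handlers from serializing each other, at the cost of requiring every listener to be a coroutine.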

Mini exercise

Add:

  • unsubscribe(topic, fn) to remove a listener, and
  • publish_safe(topic, **data) that calls listeners inside try/except and returns a list of exceptions (if any).
    Write a test proving a faulty listener doesn’t stop others and that unsubscribe works.

Checks (quick checklist)

  • Producers only publish; they don’t know listeners.
  • Listeners are small, fast, and independent.
  • Ordering and error policy are explicit (documented/tested).
  • Tests cover success, failure, and multiple listeners.
  • Consider async/queueing if handlers might block.