Event-driven callbacks (emit events, react later)

When to use

  • After something happens (run finished, file landed), you want other actions to react (notify, trigger next job).
  • Producers shouldn’t block on, or even know about, their listeners: emit now, handle later.
  • Useful for loose coupling and asynchronous pipelines.

Avoid when a direct function call is enough and you don’t need decoupling or delay.

Diagram (text)

Producer ── emit("run.finished", data) ──> EventBus.queue ── drain() ──> handlers
                                                    (FIFO)                    (0..N)

Python example (≤40 lines, type-hinted)

Emit events into a queue; process them later (tick/drain). Perfect for pipelines/services.

from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Callable

Listener = Callable[[dict[str, Any]], None]

@dataclass
class EventBus:
    _subs: dict[str, list[Listener]] = field(default_factory=dict)
    _queue: list[dict[str, Any]] = field(default_factory=list)

    def on(self, topic: str, fn: Listener) -> None:
        self._subs.setdefault(topic, []).append(fn)

    def emit(self, topic: str, **data: Any) -> None:
        # Producers only enqueue; nothing runs until drain().
        self._queue.append({"topic": topic, **data})

    def drain(self, limit: int | None = None) -> int:
        # Process queued events FIFO; limit gives simple backpressure.
        n = 0
        while self._queue and (limit is None or n < limit):
            evt = self._queue.pop(0)  # O(n); collections.deque scales better
            n += 1
            for fn in self._subs.get(evt["topic"], []):
                fn(evt)
        return n

# Example: a pipeline run that emits, and two listeners that react
def run_job(bus: EventBus, tenant: str, rows: int) -> None:
    bus.emit("run.finished", tenant=tenant, rows=rows)

audit_log: list[dict[str, Any]] = []
def audit(e: dict[str, Any]) -> None:
    audit_log.append(e)

triggered: list[str] = []
def trigger_downstream(e: dict[str, Any]) -> None:
    triggered.append(f"{e['tenant']}:{e['rows']}")

Tiny pytest (cements it)

def test_event_queue_and_handlers():
    bus = EventBus()
    bus.on("run.finished", audit)
    bus.on("run.finished", trigger_downstream)
    run_job(bus, "t1", 3)
    assert audit_log == [] and triggered == []      # not processed yet
    drained = bus.drain()
    assert drained == 1
    assert audit_log[-1]["tenant"] == "t1" and triggered[-1] == "t1:3"

def test_backpressure_limit():
    bus = EventBus()
    bus.on("run.finished", audit)
    run_job(bus, "t2", 1)
    run_job(bus, "t3", 2)
    before = len(audit_log)                 # audit_log is shared across tests
    assert bus.drain(limit=1) == 1          # exactly one event processed
    assert len(audit_log) == before + 1
    assert bus.drain() == 1                 # the second event was still queued

Trade-offs & pitfalls

  • Pros: Loose coupling; producers stay fast; easy to add listeners; good for retries/replays.
  • Cons: Eventual processing (not immediate); more moving parts (queues, handlers).
  • Pitfalls:
    • A bad handler can stop others—wrap errors or use a safe drain.
    • No durability by default—process crash loses queued events (persist or use a real queue).
    • At-least-once vs exactly-once delivery: duplicates can happen—make handlers idempotent (see the sketch after this list).
    • Ordering guarantees (per key vs global) must be defined.
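
To make "idempotent" concrete, here is a minimal sketch of a deduplicating handler, continuing the example above. It assumes each event carries a unique "event_id" field; the EventBus above does not add one, but stamping str(uuid.uuid4()) onto the event inside emit() would do:

_seen: set[str] = set()

def idempotent_audit(e: dict[str, Any]) -> None:
    # "event_id" is a hypothetical field (not set by the EventBus above).
    eid = str(e.get("event_id"))
    if eid in _seen:
        return                      # duplicate delivery: safe to skip
    _seen.add(eid)
    audit_log.append(e)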

Pythonic alternatives

  • Observer (sync) if you want immediate notifications (we did #15).
  • asyncio.Queue / background task for true async in one process (see the sketch after this list).
  • Real queues (SQS/Kafka/Redis streams) for durability, scaling, and cross-process delivery.
  • Frameworks (Celery/RQ/Arq) when you need workers, retries, and scheduling.
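
For the asyncio route, a minimal single-process sketch (assumes Python 3.9+; the event shape mirrors the example above):

import asyncio
from typing import Any

async def worker(queue: asyncio.Queue[dict[str, Any]]) -> None:
    # Background consumer: awaits events and reacts as they arrive.
    while True:
        evt = await queue.get()
        print("handled", evt["topic"], evt.get("tenant"))
        queue.task_done()

async def main() -> None:
    queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue()
    consumer = asyncio.create_task(worker(queue))
    await queue.put({"topic": "run.finished", "tenant": "t1", "rows": 3})
    await queue.join()       # wait until every queued event is handled
    consumer.cancel()        # then stop the background worker

asyncio.run(main())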

Mini exercise

Add drain_safe() that:

  • Wraps each handler call in try/except,
  • Appends failures to dead_letters: list[tuple[dict, Exception]],
  • Continues processing others.

Write a test where one handler raises but the other still runs, and the event appears in dead_letters.
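
If you want to check your work, one possible shape (a sketch, not the only answer; the test is still yours to write). Add a field to EventBus:

dead_letters: list[tuple[dict[str, Any], Exception]] = field(default_factory=list)

and a method alongside drain():

def drain_safe(self, limit: int | None = None) -> int:
    n = 0
    while self._queue and (limit is None or n < limit):
        evt = self._queue.pop(0)
        n += 1
        for fn in self._subs.get(evt["topic"], []):
            try:
                fn(evt)
            except Exception as exc:    # isolate handlers from each other
                self.dead_letters.append((evt, exc))
    return n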

Checks (quick checklist)

  • Producers only emit; processing happens in drain/tick.
  • Handlers are small and idempotent (safe on retries).
  • Error handling policy is explicit (safe drain / dead letter).
  • Durability & ordering needs documented; use a real queue if required.
  • Tests prove queued behavior and handler independence.