Event-driven callbacks (emit events, react later)
When to use
- After something happens (run finished, file landed), you want other actions to react (notify, trigger next job).
- Producers shouldn’t block or know listeners—emit now, handle later.
- Useful for loose coupling and asynchronous pipelines.
Avoid when a direct function call is enough and you don’t need decoupling or delay.
Diagram (text)
Producer ── emit("run.finished", data) ──> EventBus.queue ── drain() ──> handlers
                                           (FIFO)                        (0..N)
Python example (≤40 lines, type-hinted)
Emit events into a queue; process them later (tick/drain). Perfect for pipelines/services.
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Callable, Any, Dict, List

Listener = Callable[[dict[str, Any]], None]

@dataclass
class EventBus:
    _subs: Dict[str, List[Listener]] = field(default_factory=dict)
    _queue: List[dict[str, Any]] = field(default_factory=list)

    def on(self, topic: str, fn: Listener) -> None:
        # Register fn as a listener for topic.
        self._subs.setdefault(topic, []).append(fn)

    def emit(self, topic: str, **data: Any) -> None:
        # Only enqueue; no handler runs until drain() is called.
        self._queue.append({"topic": topic, **data})

    def drain(self, limit: int | None = None) -> int:
        # Deliver up to `limit` queued events in FIFO order; return how many were processed.
        n = 0
        while self._queue and (limit is None or n < limit):
            evt = self._queue.pop(0)
            n += 1
            for fn in self._subs.get(evt["topic"], []):
                fn(evt)
        return n

# Example: a pipeline run that emits, and two listeners that react
def run_job(bus: EventBus, tenant: str, rows: int) -> None:
    bus.emit("run.finished", tenant=tenant, rows=rows)

audit_log: List[dict] = []
def audit(e: dict) -> None:
    audit_log.append(e)

triggered: List[str] = []
def trigger_downstream(e: dict) -> None:
    triggered.append(f"{e['tenant']}:{e['rows']}")
Tiny pytest (cements it)
def test_event_queue_and_handlers():
    audit_log.clear(); triggered.clear()  # module-level lists are shared across tests
    bus = EventBus()
    bus.on("run.finished", audit)
    bus.on("run.finished", trigger_downstream)
    run_job(bus, "t1", 3)
    assert audit_log == [] and triggered == []  # not processed yet
    assert bus.drain() == 1
    assert audit_log[-1]["tenant"] == "t1" and triggered[-1] == "t1:3"

def test_backpressure_limit():
    audit_log.clear()
    bus = EventBus()
    bus.on("run.finished", audit)
    run_job(bus, "t2", 1)
    run_job(bus, "t3", 2)
    assert bus.drain(limit=1) == 1  # drained one event
    assert [e["tenant"] for e in audit_log] == ["t2"]  # FIFO: t3 is still queued
Trade-offs & pitfalls
- Pros: Loose coupling; producers stay fast; easy to add listeners; good for retries/replays.
- Cons: Eventual processing (not immediate); more moving parts (queues, handlers).
- Pitfalls:
  - A handler that raises aborts drain(): the remaining handlers for that event never run, and the event is lost because it was already popped. Wrap handler calls or use a safe drain.
  - No durability by default: a process crash loses every queued event (persist the queue or use a real queue).
  - At-least-once vs exactly-once delivery: duplicates can happen, so make handlers idempotent.
  - Ordering guarantees (per key vs global) must be defined.
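The idempotency advice above can be sketched as a small wrapper that skips events it has already handled. This is a minimal sketch under stated assumptions, not part of the EventBus above: the `event_id` field, the `idempotent` helper, and the `charge` handler are all hypothetical names introduced for illustration, and the seen-set lives in memory only.

```python
from typing import Any, Callable, Dict, List, Set

Event = Dict[str, Any]
Handler = Callable[[Event], None]


def idempotent(handler: Handler, key: str = "event_id") -> Handler:
    """Wrap handler so an event whose key was already handled is skipped.

    Assumption: every event carries a unique value under `key`.
    """
    seen: Set[Any] = set()  # in-memory only; a durable store would be needed across restarts

    def wrapper(evt: Event) -> None:
        event_key = evt[key]
        if event_key in seen:
            return  # duplicate delivery: do nothing
        handler(evt)
        seen.add(event_key)  # mark only after success so a failed call can be retried

    return wrapper


charged: List[str] = []


def charge(evt: Event) -> None:
    # Hypothetical side effect that must not happen twice.
    charged.append(evt["tenant"])


safe_charge = idempotent(charge)
evt = {"topic": "run.finished", "event_id": "e-1", "tenant": "t1"}
safe_charge(evt)
safe_charge(evt)  # redelivered duplicate is ignored
```

Marking the key only after the handler returns keeps retries possible when the handler fails; the trade-off is that a crash between the side effect and the mark can still produce one duplicate.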
Pythonic alternatives
- Observer (sync) if you want immediate notifications (we did #15).
- asyncio.Queue / background task for true async in one process.
- Real queues (SQS/Kafka/Redis streams) for durability, scaling, and cross-process delivery.
- Frameworks (Celery/RQ/Arq) when you need workers, retries, and scheduling.
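As a rough illustration of the asyncio.Queue alternative, the sketch below runs one background worker task that consumes events while the producer only enqueues. The `worker` and `main` names and the event fields are assumptions made for this example; a real handler would replace the `handled.append` stand-in.

```python
import asyncio
from typing import Any, Dict, List

Event = Dict[str, Any]


async def worker(queue: "asyncio.Queue[Event]", handled: List[Event]) -> None:
    """Consume events one at a time until the task is cancelled."""
    while True:
        evt = await queue.get()
        try:
            handled.append(evt)  # stand-in for real handler work
        finally:
            queue.task_done()  # lets queue.join() know this event is finished


async def main() -> List[Event]:
    queue: "asyncio.Queue[Event]" = asyncio.Queue()
    handled: List[Event] = []
    task = asyncio.create_task(worker(queue, handled))

    # Producer side: enqueue and move on (never blocks on an unbounded queue).
    queue.put_nowait({"topic": "run.finished", "tenant": "t1", "rows": 3})
    queue.put_nowait({"topic": "run.finished", "tenant": "t2", "rows": 1})

    await queue.join()  # wait until every queued event has been processed
    task.cancel()  # stop the background worker
    await asyncio.gather(task, return_exceptions=True)
    return handled


result = asyncio.run(main())
```

Unlike the manual drain() call, the worker here picks up events as soon as the event loop gives it control, but the queue is still in-process and not durable.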
Mini exercise
Add drain_safe() that:
- Wraps each handler call in try/except,
- Appends failures to dead_letters: list[tuple[event, exc]],
- Continues processing others.
Write a test where one handler raises but the other still runs, and the event appears in dead_letters.
Checks (quick checklist)
- Producers only emit; processing happens in drain/tick.
- Handlers are small and idempotent (safe on retries).
- Error handling policy is explicit (safe drain / dead letter).
- Durability & ordering needs documented; use a real queue if required.
- Tests prove queued behavior and handler independence.




