Command (wrap an operation as an object)
When to use
- You want to queue/record/retry operations (DDL, maintenance, ETL steps) uniformly.
- You need undo/audit later or to replay failed steps.
- You want to decouple “when/how to run” from “what to do”.
Avoid when a simple list of callables is enough and you don’t need queuing/retries/audit.
Diagram (text)
Client ──> CommandBus.dispatch([Command, Command, ...])
▲ ▲
runs with retry SqlCommand(client, "VACUUM ...")
SqlCommand(client, "REFRESH ...")
Step-by-step idea
- Define a tiny Command interface:
execute() -> str. - Make concrete commands (e.g.,
SqlCommand). - A CommandBus (invoker) runs them (optionally with retry/logging).
- You can queue, persist, replay—because commands are just data + behavior.
Python example (≤40 lines, type-hinted)
from __future__ import annotations
from typing import Protocol, Iterable, List, Callable
from dataclasses import dataclass, field
class Command(Protocol):
def execute(self) -> str: ...
@dataclass
class SqlCommand:
client: object
sql: str
def execute(self) -> str:
return self.client.run(self.sql)
@dataclass
class CommandBus:
retries: int = 0
sleep: Callable[[float], None] = lambda s: None
log: List[str] = field(default_factory=list)
def dispatch(self, cmds: Iterable[Command]) -> List[str]:
out: List[str] = []
for cmd in cmds:
attempts = 0
while True:
try:
res = cmd.execute()
self.log.append(res); out.append(res); break
except Exception:
if attempts >= self.retries: raise
attempts += 1; self.sleep(0)
return out
Tiny pytest (cements it)
def test_command_bus_runs_in_order_with_retry():
calls = {"refresh": 0}
class FakeClient:
def __init__(self): self.seen = []
def run(self, sql: str) -> str:
self.seen.append(sql)
if sql.startswith("REFRESH") and (calls["refresh"] := calls["refresh"]+1) == 1:
raise TimeoutError("flaky")
return "OK:"+sql
c = FakeClient()
cmds = [SqlCommand(c, "VACUUM t"), SqlCommand(c, "REFRESH mv")]
bus = CommandBus(retries=1)
assert bus.dispatch(cmds) == ["OK:VACUUM t", "OK:REFRESH mv"]
assert c.seen == ["VACUUM t", "REFRESH mv", "REFRESH mv"] # retried once
Trade-offs & pitfalls
- Pros: Uniform execution; easy queuing/retry/audit; testable; operations become composable data.
- Cons: More ceremony than calling functions directly.
- Pitfalls:
- Commands that hide side effects or depend on hidden global state—pass dependencies explicitly.
- Skipping idempotency—retries may re-run; design commands to be safe or record execution.
- Bloated
CommandBus(scheduling, metrics, DLQ) — keep it thin, compose features instead.
Pythonic alternatives
- Plain callables or
functools.partialin a list; add a small runner for logging/retry. - Dataclass + function:
@dataclass class Job(...); def handle(job): ...(Message Handler style). - Task queues (Celery/RQ/Arq) for distributed execution; your Command is the payload.
- Decorator for simple retry/metrics if you don’t need queuing or audit.
Mini exercise
Add UndoableCommand(Command) with undo() -> None. Implement SqlCommand with both execute and undo (e.g., create table / drop table). Extend CommandBus with run_with_undo(cmds) that rolls back already-run commands on failure.
Checks (quick checklist)
- Commands expose a minimal
execute()and carry their own parameters. - Dependencies are injected (no globals).
- Runner (bus) handles order, retry, logging; keeps policy explicit.
- Commands are idempotent or have clear undo/compensation.
- Tests cover order, retry, and failure behavior.
Say “next” and we’ll do #15 Observer (publish/subscribe for pipeline events like “run finished” → “notify Slack”).




