Command (wrap an operation as an object)

When to use

  • You want to queue/record/retry operations (DDL, maintenance, ETL steps) uniformly.
  • You need undo or an audit trail later, or you want to replay failed steps.
  • You want to decouple “when/how to run” from “what to do”.

Avoid when a simple list of callables is enough and you don’t need queuing/retries/audit.

Diagram (text)

Client ──> CommandBus.dispatch([Command, Command, ...])
                 ▲                    ▲
            runs with retry       SqlCommand(client, "VACUUM ...")
                                  SqlCommand(client, "REFRESH ...")

Step-by-step idea

  1. Define a tiny Command interface: execute() -> str.
  2. Make concrete commands (e.g., SqlCommand).
  3. A CommandBus (invoker) runs them (optionally with retry/logging).
  4. Queue, persist, or replay them as needed: a command is just data plus behavior.

Python example (≤40 lines, type-hinted)

from __future__ import annotations
from dataclasses import dataclass, field
from typing import Callable, Iterable, List, Protocol

class Command(Protocol):
    def execute(self) -> str: ...

class SqlClient(Protocol):  # anything exposing run(sql) -> str
    def run(self, sql: str) -> str: ...

@dataclass
class SqlCommand:
    client: SqlClient
    sql: str
    def execute(self) -> str:
        return self.client.run(self.sql)

@dataclass
class CommandBus:
    retries: int = 0    # extra attempts per command after the first failure
    delay: float = 0.0  # seconds handed to sleep() between attempts
    sleep: Callable[[float], None] = lambda s: None  # inject time.sleep in production
    log: List[str] = field(default_factory=list)
    def dispatch(self, cmds: Iterable[Command]) -> List[str]:
        out: List[str] = []
        for cmd in cmds:
            attempts = 0
            while True:
                try:
                    res = cmd.execute()
                except Exception:
                    if attempts >= self.retries:
                        raise  # retries exhausted: surface the original error
                    attempts += 1
                    self.sleep(self.delay)
                else:
                    self.log.append(res)
                    out.append(res)
                    break
        return out

Tiny pytest (cements it)

def test_command_bus_runs_in_order_with_retry():
    class FakeClient:
        def __init__(self):
            self.seen = []
            self.refresh_calls = 0
        def run(self, sql: str) -> str:
            self.seen.append(sql)
            if sql.startswith("REFRESH"):
                self.refresh_calls += 1
                if self.refresh_calls == 1:  # fail only the first REFRESH
                    raise TimeoutError("flaky")
            return "OK:"+sql

    c = FakeClient()
    cmds = [SqlCommand(c, "VACUUM t"), SqlCommand(c, "REFRESH mv")]
    bus = CommandBus(retries=1)
    assert bus.dispatch(cmds) == ["OK:VACUUM t", "OK:REFRESH mv"]
    assert c.seen == ["VACUUM t", "REFRESH mv", "REFRESH mv"]  # retried once
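
Step 4 above says commands can be persisted and replayed. A minimal sketch of that idea, assuming the data part of a command is stored separately from its client: SqlSpec, save, load, replay, and EchoClient are hypothetical names introduced here for illustration, and JSON is just one possible storage format.

```python
import json
from dataclasses import asdict, dataclass
from typing import List

@dataclass
class SqlSpec:
    """Hypothetical serializable description of a SQL command (no client attached)."""
    sql: str

def save(specs: List[SqlSpec]) -> str:
    # Persist only the data; the client is re-attached at replay time.
    return json.dumps([asdict(s) for s in specs])

def load(payload: str) -> List[SqlSpec]:
    return [SqlSpec(**row) for row in json.loads(payload)]

class EchoClient:
    """Stand-in client for illustration; a real one would talk to a database."""
    def run(self, sql: str) -> str:
        return "OK:" + sql

def replay(payload: str, client: EchoClient) -> List[str]:
    # Rebuild each spec from storage and run it in the saved order.
    return [client.run(spec.sql) for spec in load(payload)]

queued = save([SqlSpec("VACUUM t"), SqlSpec("REFRESH mv")])
results = replay(queued, EchoClient())
```

Keeping the client out of the stored payload is a design choice: connections rarely serialize well, so only the parameters travel and the dependency is injected again when the commands are replayed.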

Trade-offs & pitfalls

  • Pros: Uniform execution; easy queuing/retry/audit; testable; operations become composable data.
  • Cons: More ceremony than calling functions directly.
  • Pitfalls:
    • Hidden side effects or reliance on global state: pass every dependency to the command explicitly.
    • Skipping idempotency: a retry re-runs the whole command, so make commands safe to repeat or record which ones already ran.
    • Bloated CommandBus (scheduling, metrics, dead-letter queue): keep it thin and compose those features around it instead.
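
The "record execution" option from the idempotency pitfall can be sketched as a wrapper that skips work it has already done. This is only one possible shape: RecordedCommand, its key field, and the in-memory set used as a ledger are assumptions made for illustration (a real ledger would more likely be a database table).

```python
from dataclasses import dataclass, field
from typing import Callable, List, Set

@dataclass
class RecordedCommand:
    """Hypothetical wrapper: runs `action` at most once per `key`."""
    key: str
    action: Callable[[], str]
    done: Set[str] = field(default_factory=set)  # ledger of completed keys

    def execute(self) -> str:
        if self.key in self.done:
            return "SKIPPED:" + self.key
        res = self.action()
        self.done.add(self.key)  # record only after the action succeeded
        return res

runs: List[str] = []

def refresh() -> str:
    runs.append("refresh")
    return "OK:REFRESH mv"

ledger: Set[str] = set()
cmd = RecordedCommand("refresh-mv", refresh, ledger)
first = cmd.execute()
second = cmd.execute()  # same key, so the action is not run again
```

Because the key is recorded only after success, a command that fails halfway is still eligible for retry; whether that is safe depends on the action itself.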

Pythonic alternatives

  • Plain callables or functools.partial in a list; add a small runner for logging/retry.
  • Dataclass + function: @dataclass class Job(...); def handle(job): ... (Message Handler style).
  • Task queues (Celery/RQ/Arq) for distributed execution; your Command is the payload.
  • Decorator for simple retry/metrics if you don’t need queuing or audit.
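
The first alternative, plain callables plus a small runner, might look like the sketch below. run_sql and run_all are hypothetical helpers written for this illustration; run_sql stands in for a real client call.

```python
from functools import partial
from typing import Callable, Iterable, List

def run_sql(statements: List[str], sql: str) -> str:
    """Hypothetical stand-in for client.run(sql); records the statement it received."""
    statements.append(sql)
    return "OK:" + sql

def run_all(jobs: Iterable[Callable[[], str]], retries: int = 0) -> List[str]:
    """Run jobs in order, retrying each one up to `retries` extra times."""
    out: List[str] = []
    for job in jobs:
        for attempt in range(retries + 1):
            try:
                out.append(job())
                break
            except Exception:
                if attempt == retries:
                    raise  # retries exhausted
    return out

seen: List[str] = []
jobs = [partial(run_sql, seen, "VACUUM t"), partial(run_sql, seen, "REFRESH mv")]
results = run_all(jobs, retries=1)
```

Here partial plays the role of the command object: it binds the parameters now and defers the call. What is lost compared with a dataclass command is easy inspection and serialization of those parameters.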

Mini exercise

Add UndoableCommand(Command) with undo() -> None. Implement SqlCommand with both execute and undo (e.g., create table / drop table). Extend CommandBus with run_with_undo(cmds) that rolls back already-run commands on failure.
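
If you want a starting point, here is one possible shape for the exercise, not a reference solution. It assumes a free function run_with_undo rather than a CommandBus method, and TableCommand and FakeClient (with its fail_on switch) are hypothetical names used only to make the rollback observable.

```python
from dataclasses import dataclass
from typing import Iterable, List, Protocol

class UndoableCommand(Protocol):
    def execute(self) -> str: ...
    def undo(self) -> None: ...

class FakeClient:
    """Hypothetical client that fails on any statement containing `fail_on`."""
    def __init__(self, fail_on: str = "") -> None:
        self.seen: List[str] = []
        self.fail_on = fail_on
    def run(self, sql: str) -> str:
        if self.fail_on and self.fail_on in sql:
            raise RuntimeError("boom")
        self.seen.append(sql)
        return "OK:" + sql

@dataclass
class TableCommand:
    """Hypothetical undoable command: CREATE on execute, DROP on undo."""
    client: FakeClient
    table: str
    def execute(self) -> str:
        return self.client.run(f"CREATE TABLE {self.table} (id INT)")
    def undo(self) -> None:
        self.client.run(f"DROP TABLE {self.table}")

def run_with_undo(cmds: Iterable[UndoableCommand]) -> List[str]:
    done: List[UndoableCommand] = []
    out: List[str] = []
    try:
        for cmd in cmds:
            out.append(cmd.execute())
            done.append(cmd)  # only commands that succeeded are undone later
    except Exception:
        for cmd in reversed(done):  # roll back newest first
            cmd.undo()
        raise
    return out

client = FakeClient(fail_on="CREATE TABLE b")
try:
    run_with_undo([TableCommand(client, "a"), TableCommand(client, "b")])
    rolled_back = False
except RuntimeError:
    rolled_back = True
```

After the failure on table b, client.seen holds the CREATE for table a followed by its DROP. A fuller solution would also decide what to do when an undo itself fails.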

Checks (quick checklist)

  • Commands expose a minimal execute() and carry their own parameters.
  • Dependencies are injected (no globals).
  • Runner (bus) handles order, retry, logging; keeps policy explicit.
  • Commands are idempotent or have clear undo/compensation.
  • Tests cover order, retry, and failure behavior.

Say “next” and we’ll do #15 Observer (publish/subscribe for pipeline events like “run finished” → “notify Slack”).