Composite (treat a group of tasks like a single task)

When to use

  • You want a pipeline made of steps where a step can be either a single task or a group of tasks.
  • Callers should invoke a single .run(ctx) whether the target is one task or a whole pipeline/sub-pipeline.
  • You want to nest groups (e.g., extract group, transform group, then load).

Avoid when a flat list of functions is enough and you don’t need nesting.

Diagram (text)

Client ──> Task (interface) ── run(ctx)
             ▲
      ┌──────┴─────────┐
   FuncTask        TaskGroup (Composite)
                     └─ [Task, Task, Task or nested TaskGroup...]

Step-by-step idea

  1. Define a tiny Task interface: run(ctx) -> None.
  2. Leaf: wrap a function as FuncTask.
  3. Composite: TaskGroup holds a list of Task and runs them in order.
  4. Client calls .run(ctx) on either a FuncTask or a TaskGroup—same API.

Python example (≤40 lines, type-hinted)

from __future__ import annotations
from dataclasses import dataclass, field
from typing import Protocol, Callable

class Task(Protocol):
    """Common interface shared by leaves and groups."""
    def run(self, ctx: dict) -> None: ...

@dataclass
class FuncTask:
    """Leaf: wraps a plain function as a Task."""
    name: str
    fn: Callable[[dict], None]

    def run(self, ctx: dict) -> None:
        self.fn(ctx)

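@dataclass
class TaskGroup:
    """Composite: holds Tasks (leaves or nested groups) and runs them in order."""
    name: str
    steps: list[Task] = field(default_factory=list)

    def add(self, *tasks: Task) -> "TaskGroup":
        self.steps.extend(tasks)
        return self  # returning self allows chained .add(...) calls

    def run(self, ctx: dict) -> None:
        for t in self.steps:
            t.run(ctx)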

# Example steps: each leaf reads and/or writes keys on the shared ctx dict
extract = FuncTask("extract", lambda c: c.update(src=["a", "b", "c"]))
transform = FuncTask("transform", lambda c: c.update(out=[s.upper() for s in c["src"]]))
load = FuncTask("load", lambda c: c.setdefault("loaded", []).extend(c["out"]))
# Nested composite: the "stage" group (extract, transform) runs first, then load
pipeline = TaskGroup("etl").add(TaskGroup("stage").add(extract, transform), load)

Tiny pytest (cements it)

def test_composite_runs_nested_groups():
    ctx: dict = {}
    pipeline.run(ctx)
    assert ctx["src"] == ["a", "b", "c"]
    assert ctx["out"] == ["A", "B", "C"]
    assert ctx["loaded"] == ["A", "B", "C"]

Trade-offs & pitfalls

  • Pros: Uniform API for leaf vs group; easy nesting; clean composition of pipelines and sub-pipelines.
  • Cons: Execution order and data dependencies are implicit in list position, so a misordered step fails only at run time unless you validate.
  • Pitfalls:
    • Letting TaskGroup grow “smart” (branching, retries, parallelism) → becomes a framework. Keep it small.
    • Hidden coupling via ctx keys—document what each task reads/writes or use typed objects.
    • Unstated error policy: as written, the first exception aborts the whole group. Decide explicitly whether a group stops on the first error or continues, and never swallow exceptions silently.
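
One way to make the ctx contract and the ordering checkable is to have each leaf declare the keys it reads and writes, then verify the sequence before running it. The sketch below is only an illustration: DeclaredTask, its reads/writes fields, and check_order are hypothetical names, not part of the example above, and the check covers a flat list only (nested groups would need flattening first).

```python
from __future__ import annotations
from dataclasses import dataclass
from typing import Callable


@dataclass
class DeclaredTask:
    """Hypothetical leaf that declares which ctx keys it reads and writes."""
    name: str
    fn: Callable[[dict], None]
    reads: frozenset[str] = frozenset()
    writes: frozenset[str] = frozenset()

    def run(self, ctx: dict) -> None:
        self.fn(ctx)


def check_order(tasks: list[DeclaredTask], initial: frozenset[str] = frozenset()) -> list[str]:
    """Return one message per key that is read before any earlier task writes it."""
    available = set(initial)
    problems: list[str] = []
    for task in tasks:
        for key in sorted(task.reads - available):
            problems.append(f"{task.name} reads '{key}' before it is written")
        available |= task.writes
    return problems


extract = DeclaredTask(
    "extract",
    lambda c: c.update(src=["a", "b", "c"]),
    writes=frozenset({"src"}),
)
transform = DeclaredTask(
    "transform",
    lambda c: c.update(out=[s.upper() for s in c["src"]]),
    reads=frozenset({"src"}),
    writes=frozenset({"out"}),
)

good = check_order([extract, transform])  # correct order: no problems reported
bad = check_order([transform, extract])   # transform needs "src" too early
```

Running the check in a test or at pipeline construction time turns a run-time KeyError into an early, readable message.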

Pythonic alternatives

  • Simple list of callables when you don’t need nesting: for fn in steps: fn(ctx).
  • Generators for streaming pipelines (data = step2(step1(data))).
  • Workflow libs (Airflow, Dagster, Prefect) when you need scheduling, retries, UI, parallelism.
  • dataclasses + explicit IO types to make ctx safer/typed.
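
As a rough illustration of the generator alternative, the same extract/transform/load flow can be written as functions that pass a lazy stream to each other instead of sharing a ctx dict. The function names and sample data here are assumptions chosen to mirror the example above.

```python
from typing import Iterable, Iterator


def extract() -> Iterator[str]:
    """Yield source records one at a time."""
    yield from ["a", "b", "c"]


def transform(records: Iterable[str]) -> Iterator[str]:
    """Upper-case each record lazily, as it is pulled by the consumer."""
    for record in records:
        yield record.upper()


def load(records: Iterable[str]) -> list[str]:
    """Consume the stream and collect the results."""
    return list(records)


# Composition is plain function nesting; nothing runs until load() iterates.
loaded = load(transform(extract()))
```

Here each step's input and output are visible in its signature, which removes the hidden coupling through ctx keys at the cost of losing named, nestable groups.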

Mini exercise

Add stop_on_error: bool = True to TaskGroup. If a task raises, stop and re-raise when True; otherwise continue and collect exceptions in ctx["errors"].
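
One possible solution sketch, assuming only Exception subclasses should be collected and that ctx["errors"] holds the exception objects themselves (both are choices the exercise leaves open). The boom helper and the "lenient" group are hypothetical and exist only to exercise the behavior.

```python
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Callable, Protocol


class Task(Protocol):
    def run(self, ctx: dict) -> None: ...


@dataclass
class FuncTask:
    name: str
    fn: Callable[[dict], None]

    def run(self, ctx: dict) -> None:
        self.fn(ctx)


@dataclass
class TaskGroup:
    name: str
    steps: list[Task] = field(default_factory=list)
    stop_on_error: bool = True

    def add(self, *tasks: Task) -> TaskGroup:
        self.steps.extend(tasks)
        return self

    def run(self, ctx: dict) -> None:
        for task in self.steps:
            try:
                task.run(ctx)
            except Exception as exc:
                if self.stop_on_error:
                    raise  # propagate unchanged, keeping the original traceback
                # Lenient mode: record the failure and move on to the next task
                ctx.setdefault("errors", []).append(exc)


def boom(ctx: dict) -> None:
    """Hypothetical task that always fails."""
    raise ValueError("boom")


ctx: dict = {}
lenient = TaskGroup("lenient", stop_on_error=False).add(
    FuncTask("fail", boom),
    FuncTask("mark", lambda c: c.update(done=True)),
)
lenient.run(ctx)  # "fail" is recorded in ctx["errors"], "mark" still runs
```

Note that a nested group with stop_on_error=True still raises into its parent, so the parent's own setting decides whether the outer pipeline continues.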

Checks (quick checklist)

  • Both leaf and group share the same interface (run(ctx)).
  • Groups just coordinate; no business logic.
  • Nesting works: groups can contain groups.
  • Error/ordering policy is explicit and tested.
  • ctx contract is documented or typed.
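
The first and third checks can be partly automated. A minimal sketch, assuming you are willing to mark the protocol with typing.runtime_checkable: isinstance then verifies that an object has a run method (it does not check the signature). Leaf and Group are simplified stand-ins for FuncTask and TaskGroup, used here only to keep the snippet self-contained.

```python
from typing import Callable, Protocol, runtime_checkable


@runtime_checkable
class Task(Protocol):
    def run(self, ctx: dict) -> None: ...


class Leaf:
    """Stand-in for FuncTask."""
    def __init__(self, fn: Callable[[dict], None]) -> None:
        self.fn = fn

    def run(self, ctx: dict) -> None:
        self.fn(ctx)


class Group:
    """Stand-in for TaskGroup."""
    def __init__(self, *steps: Task) -> None:
        self.steps = list(steps)

    def run(self, ctx: dict) -> None:
        for step in self.steps:
            step.run(ctx)


# A group containing a group: the inner leaf must run before the outer leaf.
nested = Group(
    Group(Leaf(lambda c: c.setdefault("calls", []).append("inner"))),
    Leaf(lambda c: c.setdefault("calls", []).append("outer")),
)
ctx: dict = {}
nested.run(ctx)

# Structural check: leaf and group satisfy Task, a bare object does not.
checks = [isinstance(obj, Task) for obj in (Leaf(print), nested, object())]
```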