Composite (treat a group of tasks like a single task)
When to use
- You want a pipeline made of steps where a step can be either a single task or a group of tasks.
- Callers should call a single .run(), whether it's one task or a whole DAG/sub-DAG.
- You want to nest groups (e.g., an extract group, a transform group, then load).
Avoid when a flat list of functions is enough and you don’t need nesting.
Diagram (text)
Client ──> Task (interface) ── run(ctx)
             ▲
       ┌─────┴────────┐
   FuncTask        TaskGroup (Composite)
                      └─ [Task, Task, Task or nested TaskGroup...]
Step-by-step idea
- Define a tiny Task interface: run(ctx) -> None.
- Leaf: wrap a function as FuncTask.
- Composite: TaskGroup holds a list of Task and runs them in order.
- Client calls .run(ctx) on either a FuncTask or a TaskGroup; same API.
Python example (≤40 lines, type-hinted)
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Protocol, Callable

class Task(Protocol):
    def run(self, ctx: dict) -> None: ...

@dataclass
class FuncTask:
    name: str
    fn: Callable[[dict], None]
    def run(self, ctx: dict) -> None: self.fn(ctx)

@dataclass
class TaskGroup:
    name: str
    steps: list[Task] = field(default_factory=list)
    def add(self, *tasks: Task) -> "TaskGroup":
        self.steps.extend(tasks); return self
    def run(self, ctx: dict) -> None:
        for t in self.steps: t.run(ctx)

# Example steps
extract = FuncTask("extract", lambda c: c.update(src=["a", "b", "c"]))
transform = FuncTask("transform", lambda c: c.update(out=[s.upper() for s in c["src"]]))
load = FuncTask("load", lambda c: c.setdefault("loaded", []).extend(c["out"]))
pipeline = TaskGroup("etl").add(TaskGroup("stage").add(extract, transform), load)
Tiny pytest (cements it)
def test_composite_runs_nested_groups():
    ctx = {}
    pipeline.run(ctx)
    assert ctx["src"] == ["a", "b", "c"]
    assert ctx["out"] == ["A", "B", "C"]
    assert ctx["loaded"] == ["A", "B", "C"]
Trade-offs & pitfalls
- Pros: Uniform API for leaf vs group; easy nesting; clean composition of pipelines/DAGs.
- Cons: Order and dependencies are implicit—can get tricky without validation.
- Pitfalls:
  - Letting TaskGroup grow "smart" (branching, retries, parallelism) → it becomes a framework. Keep it small.
  - Hidden coupling via ctx keys: document what each task reads/writes, or use typed objects (a sketch follows this list).
  - Silent failures: decide the group's error-handling policy (stop on first error vs. continue).
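A minimal sketch of the typed-objects idea, assuming you are willing to swap the bare dict for a dataclass; EtlContext and extract_step are illustrative names, not part of the example above:

from dataclasses import dataclass, field

@dataclass
class EtlContext:
    src: list[str] = field(default_factory=list)
    out: list[str] = field(default_factory=list)
    loaded: list[str] = field(default_factory=list)

# Tasks take EtlContext instead of a bare dict, so a type checker can flag
# a task that reads a field nothing has written yet.
def extract_step(ctx: EtlContext) -> None:
    ctx.src = ["a", "b", "c"]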
Pythonic alternatives
- Simple list of callables: for fn in steps: fn(ctx) if you don't need nesting (sketched after this list).
- Generators for streaming pipelines (data = step2(step1(data))).
- Workflow libs (Airflow, Dagster, Prefect) when you need scheduling, retries, UI, parallelism.
- dataclasses + explicit IO types to make ctx safer/typed.
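A minimal sketch of the first two alternatives; the step functions here are made up for illustration:

from typing import Callable, Iterable, Iterator

# 1) Flat list of callables: enough when there is no nesting.
steps: list[Callable[[dict], None]] = [
    lambda c: c.update(src=["a", "b", "c"]),
    lambda c: c.update(out=[s.upper() for s in c["src"]]),
]
ctx: dict = {}
for fn in steps:
    fn(ctx)   # ctx now holds "src" and "out"

# 2) Generator-style streaming pipeline: data = step2(step1(data)).
def read() -> Iterator[str]:
    yield from ["a", "b", "c"]

def upper(items: Iterable[str]) -> Iterator[str]:
    return (s.upper() for s in items)

data = list(upper(read()))  # ["A", "B", "C"]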
Mini exercise
Add stop_on_error: bool = True to TaskGroup. If a task raises, stop and re-raise when True; otherwise continue and collect exceptions in ctx["errors"].
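One possible shape of the solution, building on the Task protocol and the dataclass/field imports from the example above; storing the raw exception objects in ctx["errors"] is an assumption, since the exercise leaves the exact error format open:

@dataclass
class TaskGroup:
    name: str
    steps: list[Task] = field(default_factory=list)
    stop_on_error: bool = True

    def add(self, *tasks: Task) -> "TaskGroup":
        self.steps.extend(tasks); return self

    def run(self, ctx: dict) -> None:
        for t in self.steps:
            try:
                t.run(ctx)
            except Exception as exc:
                if self.stop_on_error:
                    raise
                # Keep going, but record what failed.
                ctx.setdefault("errors", []).append(exc)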
Checks (quick checklist)
- Both leaf and group share the same interface (run(ctx)).
- Groups just coordinate; no business logic.
- Nesting works: groups can contain groups.
- Error/ordering policy is explicit and tested.
- ctx contract is documented or typed.




