Generators / Iterators (Pythonic streaming)
When to use
- Process large data without loading it all (JSONL, CSV, log streams).
- Paged/streamed sources (DB `fetchmany`, API pages, S3 line reads); see the paging sketch after this list.
- Pipeline composition where each stage is lazy (parse → filter → batch → load).
Avoid when data is tiny or you truly need a full in-memory list.
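For the paged-source case, a minimal sketch, assuming a DB-API 2.0 cursor (`fetchmany` is part of that spec; the function name here is illustrative):

```python
from typing import Any, Iterator

def iter_rows(cursor: Any, page_size: int = 1000) -> Iterator[tuple]:
    # Pull one page per round trip; only one page is ever held in memory.
    while True:
        page = cursor.fetchmany(page_size)
        if not page:
            return
        yield from page
```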
Diagram (text)
lines ──> parse_jsonl ──> filter_events ──> batched(n) ──> sink
(each arrow is a generator yielding items lazily)
Python example (≤40 lines, type-hinted)
```python
from __future__ import annotations

import json
from typing import Any, Callable, Dict, Iterable, Iterator, List

def batched(it: Iterable[Any], n: int) -> Iterator[List[Any]]:
    """Yield lists of up to n items; the final partial batch is emitted too."""
    batch: List[Any] = []
    for x in it:
        batch.append(x)
        if len(batch) == n:
            yield batch
            batch = []
    if batch:
        yield batch

def parse_jsonl(lines: Iterable[str]) -> Iterator[Dict[str, Any]]:
    """Parse each non-blank line as one JSON object, lazily."""
    for line in lines:
        s = line.strip()
        if s:
            yield json.loads(s)

def filter_events(rows: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    for r in rows:
        if r.get("type") == "event":
            yield r

def load_jsonl(lines: Iterable[str], batch: int,
               sink: Callable[[List[Dict[str, Any]]], None]) -> int:
    """Drive the pipeline: parse -> filter -> batch -> sink; return row count."""
    count = 0
    for chunk in batched(filter_events(parse_jsonl(lines)), batch):
        sink(chunk)
        count += len(chunk)
    return count
```
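A typical call site, with a hypothetical file name and a stand-in sink; text-mode file objects iterate line by line, so memory stays flat:

```python
from pathlib import Path

def print_sink(chunk):  # stand-in sink for illustration
    print(len(chunk), "rows")

with Path("events.jsonl").open() as f:  # hypothetical input file
    total = load_jsonl(f, batch=500, sink=print_sink)
```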
Tiny pytest (cements it)
```python
def test_generator_pipeline_is_lazy_and_batched():
    calls, seen = {"n": 0}, []

    def source():
        for s in ['{"type":"event"}\n', '{"type":"other"}\n',
                  ' \n', '{"type":"event"}\n']:
            calls["n"] += 1
            yield s

    src = source()
    assert calls["n"] == 0  # lazy: creating the generator does no work
    out = load_jsonl(src, 2, lambda chunk: seen.append([r["type"] for r in chunk]))
    assert out == 2 and seen == [["event", "event"]]
    assert calls["n"] == 4  # lines were pulled only during iteration
```
Trade-offs & pitfalls
- Pros: Constant memory; backpressure-friendly; composable; testable.
- Cons: One-pass only (usually); errors surface mid-iteration; debugging can feel indirect.
- Pitfalls:
  - Accidentally materializing with `list(iterator)` and losing laziness.
  - Forgetting to close external resources (wrap sources with `contextlib.closing`; sketch below).
  - Mixing I/O and CPU-heavy work in one stage; keep stages small and focused.
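On the resource point, a minimal sketch: a generator that owns its file handle, plus `contextlib.closing` to guarantee cleanup even mid-iteration (the file name is hypothetical):

```python
import contextlib
from typing import Iterator

def read_lines(path: str) -> Iterator[str]:
    # The generator owns the handle; gen.close() unwinds the with-block.
    with open(path) as f:
        yield from f

gen = read_lines("events.jsonl")  # hypothetical input file
with contextlib.closing(gen) as lines:
    first = next(lines, None)
# Exiting the block called gen.close(), which closed the underlying file.
```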
Pythonic alternatives
- itertools: `chain.from_iterable`, `islice`, `takewhile`, `pairwise`, `groupby`.
- `yield from` to delegate to inner generators.
- Async generators (`async def`, `async for`) for HTTP/DB drivers.
- `fileinput` for multi-file line streaming; `pathlib.Path.open()` with text iteration.
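A small sketch combining two of these tools (the function and variable names are illustrative):

```python
import itertools
from typing import Any, Dict, Iterable, Iterator, List

def first_n_events(pages: Iterable[List[Dict[str, Any]]],
                   n: int) -> Iterator[Dict[str, Any]]:
    rows = itertools.chain.from_iterable(pages)  # flatten pages lazily
    events = (r for r in rows if r.get("type") == "event")
    return itertools.islice(events, n)  # stop pulling once n are taken
```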
Mini exercise
Add `dedupe(rows, keys: list[str])` as a generator that drops rows where the tuple of `row[k]` values was seen before (keep the first occurrence). Insert it between `filter_events` and `batched`, and test with duplicate events. One possible shape is sketched below.
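One possible sketch (not the only answer; it assumes key values are hashable):

```python
from typing import Any, Dict, Iterable, Iterator, List

def dedupe(rows: Iterable[Dict[str, Any]],
           keys: List[str]) -> Iterator[Dict[str, Any]]:
    seen: set = set()
    for r in rows:
        key = tuple(r.get(k) for k in keys)  # assumes hashable values
        if key not in seen:
            seen.add(key)
            yield r
```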
Checks (quick checklist)
- Each stage yields lazily and does one job.
- No accidental materialization (`list`, `join`, `sum` on huge streams).
- Resource lifetime managed (close cursors/files).
- Batch boundaries correct; last partial batch emitted.
- Tests prove laziness and correctness on small samples.
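To cement the partial-batch item, one more tiny test against the `batched` helper above:

```python
def test_batched_emits_last_partial_batch():
    # list() is fine here: the sample is deliberately tiny.
    assert list(batched(iter(range(5)), 2)) == [[0, 1], [2, 3], [4]]
```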