Generators / Iterators (Pythonic streaming)

When to use

  • Process large data without loading it all (JSONL, CSV, log streams).
  • Paged/streamed sources (DB fetchmany, API pages, S3 line reads).
  • Pipeline composition where each stage is lazy (parse → filter → batch → load).

Avoid when data is tiny or you truly need a full in-memory list.

Diagram (text)

lines ──> parse_jsonl ──> filter_events ──> batched(n) ──> sink
(each arrow is a generator yielding items lazily)

Python example (≤40 lines, type-hinted)

from __future__ import annotations

import json
from typing import Any, Callable, Dict, Iterable, Iterator, List

def batched(it: Iterable[Any], n: int) -> Iterator[List[Any]]:
    """Yield lists of up to n items; the final batch may be short."""
    batch: List[Any] = []
    for x in it:
        batch.append(x)
        if len(batch) == n:
            yield batch
            batch = []
    if batch:
        yield batch

def parse_jsonl(lines: Iterable[str]) -> Iterator[Dict[str, Any]]:
    """Parse each non-blank line as one JSON object."""
    for line in lines:
        s = line.strip()
        if s:
            yield json.loads(s)

def filter_events(rows: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Keep only rows whose "type" is "event"."""
    for r in rows:
        if r.get("type") == "event":
            yield r

def load_jsonl(lines: Iterable[str], batch: int,
               sink: Callable[[List[Dict[str, Any]]], None]) -> int:
    """Drive the lazy pipeline; return how many rows reached the sink."""
    count = 0
    for chunk in batched(filter_events(parse_jsonl(lines)), batch):
        sink(chunk)
        count += len(chunk)
    return count

Tiny pytest (cements it)

def test_generator_pipeline_is_lazy_and_batched():
    calls, seen = {"n": 0}, []

    def source():
        for s in ['{"type":"event"}\n', '{"type":"other"}\n', ' \n', '{"type":"event"}\n']:
            calls["n"] += 1
            yield s

    lines = source()                    # generator object built, nothing consumed
    assert calls["n"] == 0              # lazy: no work yet
    out = load_jsonl(lines, 2, lambda chunk: seen.append([r["type"] for r in chunk]))
    assert out == 2 and seen == [["event", "event"]]
    assert calls["n"] == 4              # lines pulled only during iteration

Trade-offs & pitfalls

  • Pros: Constant memory; backpressure-friendly; composable; testable.
  • Cons: One-pass only (usually); errors surface mid-iteration; debugging can feel indirect.
  • Pitfalls:
    • Accidentally doing list(iterator) and losing laziness.
    • Forgetting to close external resources (wrap sources with contextlib.closing; see the sketch after this list).
    • Mixing I/O and CPU-heavy work in one stage—keep stages small and focused.
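
A minimal sketch of the resource fix, assuming a DB-API-style cursor (anything with fetchmany() and close() works); contextlib.closing guarantees close() runs even if the consumer abandons the generator mid-iteration.

from contextlib import closing

def stream_rows(cursor, size: int = 1000):
    """Lazily page rows out of a DB-API cursor; cursor.close() is
    guaranteed by closing() on exhaustion, error, or early shutdown."""
    with closing(cursor):
        while True:
            rows = cursor.fetchmany(size)
            if not rows:
                break
            yield from rows              # flatten each page lazily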

Pythonic alternatives

  • itertools: chain.from_iterable, islice, takewhile, pairwise, groupby.
  • yield from to delegate to inner generators (both sketched after this list).
  • Async generators (async def, async for) for HTTP/DB drivers.
  • fileinput for multi-file line streaming; pathlib.Path.open() with text iteration.
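
A minimal sketch of the first two alternatives: batched rebuilt on islice, and yield from flattening paged sources. On Python 3.12+, itertools.batched covers the batching case directly (it yields tuples).

from itertools import islice
from typing import Any, Iterable, Iterator, List

def batched_islice(it: Iterable[Any], n: int) -> Iterator[List[Any]]:
    """Same contract as batched above, built on islice."""
    iterator = iter(it)                       # one shared iterator across batches
    while chunk := list(islice(iterator, n)):
        yield chunk

def lines_from_pages(pages: Iterable[Iterable[str]]) -> Iterator[str]:
    """Flatten paged sources (API pages, fetchmany chunks) into one stream."""
    for page in pages:
        yield from page                       # delegate to the inner iterable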

Mini exercise

Add dedupe(keys: list[str]) as a generator that drops rows where the tuple of row[k] was seen before (keep first occurrence). Insert it between filter_events and batched, and test with duplicate events.
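
One possible solution sketch (the stage takes the row stream plus the key list; key values must be hashable, and dict.get means a missing key counts as None):

from typing import Any, Dict, Iterable, Iterator, List, Set, Tuple

def dedupe(rows: Iterable[Dict[str, Any]], keys: List[str]) -> Iterator[Dict[str, Any]]:
    """Yield the first row per key tuple; drop later duplicates."""
    seen: Set[Tuple[Any, ...]] = set()   # grows with unique keys: not constant-memory
    for r in rows:
        ident = tuple(r.get(k) for k in keys)
        if ident not in seen:
            seen.add(ident)
            yield r

Wire it in as batched(dedupe(filter_events(parse_jsonl(lines)), ["id"]), batch) (assuming events carry an "id" field) and assert that a duplicated id reaches the sink exactly once.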

Checks (quick checklist)

  • Each stage yields lazily and does one job.
  • No accidental materialization (list, sorted, ''.join on huge streams).
  • Resource lifetime managed (close cursors/files).
  • Batch boundaries correct; last partial batch emitted.
  • Tests prove laziness and correctness on small samples.